From fff2690e999f689ca30ccfc1d2179f60b823ee01 Mon Sep 17 00:00:00 2001 From: Zef Hemel Date: Thu, 7 Apr 2022 14:04:50 +0200 Subject: [PATCH] Enormous refactor adding IndexedDB space and syncing. --- plugs/core/materialized_queries.ts | 4 + server/api_server.ts | 1 + server/disk_storage.ts | 2 +- webapp/boot.ts | 36 ++++--- webapp/components/top_bar.tsx | 9 ++ webapp/editor.tsx | 2 +- webapp/spaces/cache_space.ts | 10 +- webapp/spaces/httprest_space.ts | 10 +- webapp/spaces/indexeddb_space.ts | 16 ++- webapp/spaces/space.ts | 4 +- webapp/spaces/sync.test.ts | 56 +++++++---- webapp/spaces/sync.ts | 120 ++++++++++++----------- webapp/syscalls/{indexer.ts => index.ts} | 4 +- webapp/syscalls/system.ts | 4 +- 14 files changed, 168 insertions(+), 110 deletions(-) rename webapp/syscalls/{indexer.ts => index.ts} (74%) diff --git a/plugs/core/materialized_queries.ts b/plugs/core/materialized_queries.ts index 3b681d04..40099c37 100644 --- a/plugs/core/materialized_queries.ts +++ b/plugs/core/materialized_queries.ts @@ -93,17 +93,21 @@ export async function updateMaterializedQueriesOnPage(pageName: string) { return `${startQuery}\n${results.sort().join("\n")}\n${endQuery}`; case "link": let uniqueLinks = new Set(); + console.log("Here!!"); for (let { key, page, value: name } of await scanPrefixGlobal( `pl:${pageName}:` )) { + console.log("Here!!"); let [, pos] = key.split(":"); if (!filter || (filter && name.includes(filter))) { uniqueLinks.add(name); } } + console.log("Here!!"); for (const uniqueResult of uniqueLinks) { results.push(`* [[${uniqueResult}]]`); } + console.log("Here!!"); return `${startQuery}\n${results.sort().join("\n")}\n${endQuery}`; case "item": for (let { diff --git a/server/api_server.ts b/server/api_server.ts index 3d1d52bf..6430a45b 100644 --- a/server/api_server.ts +++ b/server/api_server.ts @@ -68,6 +68,7 @@ export class ExpressServer { // Page list fsRouter.route("/").get(async (req, res) => { + res.header("Now-Timestamp", "" + Date.now()); res.json(await this.storage.listPages()); }); diff --git a/server/disk_storage.ts b/server/disk_storage.ts index 957332f9..c8dcdb5b 100644 --- a/server/disk_storage.ts +++ b/server/disk_storage.ts @@ -141,7 +141,7 @@ export class DiskStorage implements Storage { if (lastModified) { let d = new Date(lastModified); console.log("Going to set the modified time", d); - await utimes(localPath, lastModified, lastModified); + await utimes(localPath, d, d); } // Fetch new metadata const s = await stat(localPath); diff --git a/webapp/boot.ts b/webapp/boot.ts index a2c810a9..4a3cd8ea 100644 --- a/webapp/boot.ts +++ b/webapp/boot.ts @@ -2,23 +2,33 @@ import { Editor } from "./editor"; import { safeRun } from "./util"; import { WatchableSpace } from "./spaces/cache_space"; import { HttpRestSpace } from "./spaces/httprest_space"; +import { IndexedDBSpace } from "./spaces/indexeddb_space"; +import { SpaceSync } from "./spaces/sync"; -// let localSpace = new WatchableSpace(new IndexedDBSpace("pages"), true); -// localSpace.watch(); +let localSpace = new WatchableSpace(new IndexedDBSpace("pages"), true); +localSpace.watch(); let serverSpace = new WatchableSpace(new HttpRestSpace(""), true); -serverSpace.watch(); +// serverSpace.watch(); // @ts-ignore -// window.syncer = async () => { -// let lastSync = +(localStorage.getItem("lastSync") || "0"); -// let syncer = new SpaceSync(serverSpace, localSpace, lastSync, "_trash/"); -// await syncer.syncPages( -// SpaceSync.primaryConflictResolver(serverSpace, localSpace) -// ); -// localStorage.setItem("lastSync", "" + syncer.lastSync); -// console.log("Done!"); -// }; -let editor = new Editor(serverSpace, document.getElementById("root")!); +window.syncer = async () => { + let lastLocalSync = +(localStorage.getItem("lastLocalSync") || "0"), + lastRemoteSync = +(localStorage.getItem("lastRemoteSync") || "0"); + let syncer = new SpaceSync( + serverSpace, + localSpace, + lastRemoteSync, + lastLocalSync, + "_trash/" + ); + await syncer.syncPages( + SpaceSync.primaryConflictResolver(serverSpace, localSpace) + ); + localStorage.setItem("lastLocalSync", "" + syncer.secondaryLastSync); + localStorage.setItem("lastRemoteSync", "" + syncer.primaryLastSync); + console.log("Done!"); +}; +let editor = new Editor(localSpace, document.getElementById("root")!); safeRun(async () => { await editor.init(); diff --git a/webapp/components/top_bar.tsx b/webapp/components/top_bar.tsx index 1a70d20f..dd97d124 100644 --- a/webapp/components/top_bar.tsx +++ b/webapp/components/top_bar.tsx @@ -33,6 +33,15 @@ export function TopBar({ {prettyName(pageName)} + {notifications.length > 0 && (
{notifications.map((notification) => ( diff --git a/webapp/editor.tsx b/webapp/editor.tsx index dde91d0b..2492c7bf 100644 --- a/webapp/editor.tsx +++ b/webapp/editor.tsx @@ -32,7 +32,7 @@ import { smartQuoteKeymap } from "./smart_quotes"; import { WatchableSpace } from "./spaces/cache_space"; import customMarkdownStyle from "./style"; import { editorSyscalls } from "./syscalls/editor"; -import { indexerSyscalls } from "./syscalls/indexer"; +import { indexerSyscalls } from "./syscalls"; import { spaceSyscalls } from "./syscalls/space"; import { Action, AppViewState, initialViewState } from "./types"; import { SilverBulletHooks } from "../common/manifest"; diff --git a/webapp/spaces/cache_space.ts b/webapp/spaces/cache_space.ts index fac692d6..1479be29 100644 --- a/webapp/spaces/cache_space.ts +++ b/webapp/spaces/cache_space.ts @@ -8,7 +8,7 @@ const pageWatchInterval = 2000; const trashPrefix = "_trash/"; const plugPrefix = "_plug/"; -export class WatchableSpace extends EventEmitter implements Space { +export class WatchableSpace extends EventEmitter { pageMetaCache = new Map(); watchedPages = new Set(); private initialPageListLoad = true; @@ -46,7 +46,7 @@ export class WatchableSpace extends EventEmitter implements Space { safeRun(async () => { let newPageList = await this.space.fetchPageList(); let deletedPages = new Set(this.pageMetaCache.keys()); - newPageList.forEach((meta) => { + newPageList.pages.forEach((meta) => { const pageName = meta.name; const oldPageMeta = this.pageMetaCache.get(pageName); const newPageMeta = { @@ -112,11 +112,11 @@ export class WatchableSpace extends EventEmitter implements Space { await this.writePage( `${trashPrefix}${name}`, pageData.text, - true, + false, deleteDate ); } - await this.space.deletePage(name, deleteDate); + await this.space.deletePage(name); this.pageMetaCache.delete(name); this.emit("pageDeleted", name); @@ -210,7 +210,7 @@ export class WatchableSpace extends EventEmitter implements Space { } } - fetchPageList(): Promise> { + fetchPageList(): Promise<{ pages: Set; nowTimestamp: number }> { return this.space.fetchPageList(); } diff --git a/webapp/spaces/httprest_space.ts b/webapp/spaces/httprest_space.ts index f8e78c4a..67d24b39 100644 --- a/webapp/spaces/httprest_space.ts +++ b/webapp/spaces/httprest_space.ts @@ -11,7 +11,10 @@ export class HttpRestSpace implements Space { this.plugUrl = url + "/plug"; } - public async fetchPageList(): Promise> { + public async fetchPageList(): Promise<{ + pages: Set; + nowTimestamp: number; + }> { let req = await fetch(this.pageUrl, { method: "GET", }); @@ -25,7 +28,10 @@ export class HttpRestSpace implements Space { }); }); - return result; + return { + pages: result, + nowTimestamp: +req.headers.get("Now-Timestamp")!, + }; } async readPage(name: string): Promise<{ text: string; meta: PageMeta }> { diff --git a/webapp/spaces/indexeddb_space.ts b/webapp/spaces/indexeddb_space.ts index 263b4bea..7524f8ba 100644 --- a/webapp/spaces/indexeddb_space.ts +++ b/webapp/spaces/indexeddb_space.ts @@ -12,7 +12,7 @@ type Page = { export class IndexedDBSpace implements Space { private pageTable: Table; - constructor(dbName: string) { + constructor(dbName: string, readonly timeSkew: number = 0) { const db = new Dexie(dbName); db.version(1).stores({ page: "name", @@ -42,13 +42,19 @@ export class IndexedDBSpace implements Space { return plug.invoke(name, args); } - async fetchPageList(): Promise> { + async fetchPageList(): Promise<{ + pages: Set; + nowTimestamp: number; + }> { let allPages = await this.pageTable.toArray(); - let set = new Set(allPages.map((p) => p.meta)); - return set; + return { + pages: new Set(allPages.map((p) => p.meta)), + nowTimestamp: Date.now() + this.timeSkew, + }; } proxySyscall(plug: Plug, name: string, args: any[]): Promise { + console.log("Going this", name); return plug.syscall(name, args); } @@ -69,7 +75,7 @@ export class IndexedDBSpace implements Space { ): Promise { let meta = { name, - lastModified: lastModified ? lastModified : new Date().getTime(), + lastModified: lastModified ? lastModified : Date.now() + this.timeSkew, }; await this.pageTable.put({ name, diff --git a/webapp/spaces/space.ts b/webapp/spaces/space.ts index fc4826e0..5689d51c 100644 --- a/webapp/spaces/space.ts +++ b/webapp/spaces/space.ts @@ -13,7 +13,7 @@ export type SpaceEvents = { export interface Space { // Pages - fetchPageList(): Promise>; + fetchPageList(): Promise<{ pages: Set; nowTimestamp: number }>; readPage(name: string): Promise<{ text: string; meta: PageMeta }>; getPageMeta(name: string): Promise; writePage( @@ -22,7 +22,7 @@ export interface Space { selfUpdate?: boolean, lastModified?: number ): Promise; - deletePage(name: string, deleteDate?: number): Promise; + deletePage(name: string): Promise; // Plugs proxySyscall(plug: Plug, name: string, args: any[]): Promise; diff --git a/webapp/spaces/sync.test.ts b/webapp/spaces/sync.test.ts index 056a0958..2bb10e3f 100644 --- a/webapp/spaces/sync.test.ts +++ b/webapp/spaces/sync.test.ts @@ -9,32 +9,33 @@ require("fake-indexeddb/auto"); test("Test store", async () => { let primary = new WatchableSpace(new IndexedDBSpace("primary"), true); - let secondary = new WatchableSpace(new IndexedDBSpace("secondary"), true); - let sync = new SpaceSync(primary, secondary, 0, "_trash/"); + let secondary = new WatchableSpace( + new IndexedDBSpace("secondary", -5000), + true + ); + let sync = new SpaceSync(primary, secondary, 0, 0, "_trash/"); async function conflictResolver(pageMeta1: PageMeta, pageMeta2: PageMeta) {} // Write one page to primary await primary.writePage("start", "Hello"); expect((await secondary.listPages()).size).toBe(0); - await sync.syncPages(conflictResolver); + await syncPages(conflictResolver); expect((await secondary.listPages()).size).toBe(1); expect((await secondary.readPage("start")).text).toBe("Hello"); - let lastSync = sync.lastSync; // Should be a no-op - await sync.syncPages(); - expect(sync.lastSync).toBe(lastSync); + expect(await syncPages()).toBe(0); // Now let's make a change on the secondary await secondary.writePage("start", "Hello!!"); await secondary.writePage("test", "Test page"); // And sync it - await sync.syncPages(); + await syncPages(); - expect((await primary.listPages()).size).toBe(2); - expect((await secondary.listPages()).size).toBe(2); + expect(primary.listPages().size).toBe(2); + expect(secondary.listPages().size).toBe(2); expect((await primary.readPage("start")).text).toBe("Hello!!"); @@ -43,12 +44,12 @@ test("Test store", async () => { await primary.writePage("start2", "2"); await secondary.writePage("start3", "3"); await secondary.writePage("start4", "4"); - await sync.syncPages(); + await syncPages(); expect((await primary.listPages()).size).toBe(5); expect((await secondary.listPages()).size).toBe(5); - expect(await sync.syncPages()).toBe(0); + expect(await syncPages()).toBe(0); console.log("Deleting pages"); // Delete some pages @@ -58,29 +59,29 @@ test("Test store", async () => { console.log("Pages", await primary.listPages()); console.log("Trash", await primary.listTrash()); - await sync.syncPages(); + await syncPages(); expect((await primary.listPages()).size).toBe(3); expect((await secondary.listPages()).size).toBe(3); // No-op - expect(await sync.syncPages()).toBe(0); + expect(await syncPages()).toBe(0); await secondary.deletePage("start4"); await primary.deletePage("start2"); - await sync.syncPages(); + await syncPages(); // Just "test" left expect((await primary.listPages()).size).toBe(1); expect((await secondary.listPages()).size).toBe(1); // No-op - expect(await sync.syncPages()).toBe(0); + expect(await syncPages()).toBe(0); await secondary.writePage("start", "I'm back"); - await sync.syncPages(); + await syncPages(); expect((await primary.readPage("start")).text).toBe("I'm back"); @@ -88,10 +89,10 @@ test("Test store", async () => { await primary.writePage("start", "Hello 1"); await secondary.writePage("start", "Hello 2"); - await sync.syncPages(SpaceSync.primaryConflictResolver(primary, secondary)); + await syncPages(SpaceSync.primaryConflictResolver(primary, secondary)); // Sync conflicting copy back - await sync.syncPages(); + await syncPages(); // Verify that primary won expect((await primary.readPage("start")).text).toBe("Hello 1"); @@ -100,4 +101,23 @@ test("Test store", async () => { // test + start + start.conflicting copy expect((await primary.listPages()).size).toBe(3); expect((await secondary.listPages()).size).toBe(3); + + async function syncPages( + conflictResolver?: ( + pageMeta1: PageMeta, + pageMeta2: PageMeta + ) => Promise + ): Promise { + // Awesome practice: adding sleeps to fix issues! + await sleep(2); + let n = await sync.syncPages(conflictResolver); + await sleep(2); + return n; + } }); + +function sleep(ms: number = 5): Promise { + return new Promise((resolve) => { + setTimeout(resolve, ms); + }); +} diff --git a/webapp/spaces/sync.ts b/webapp/spaces/sync.ts index 1c92da1a..e681f77d 100644 --- a/webapp/spaces/sync.ts +++ b/webapp/spaces/sync.ts @@ -6,7 +6,8 @@ export class SpaceSync { constructor( private primary: WatchableSpace, private secondary: WatchableSpace, - public lastSync: number, + public primaryLastSync: number, + public secondaryLastSync: number, private trashPrefix: string ) {} @@ -33,14 +34,20 @@ export class SpaceSync { }; } - async syncablePages(space: Space): Promise { - return [...(await space.fetchPageList())].filter( - (pageMeta) => !pageMeta.name.startsWith(this.trashPrefix) - ); + async syncablePages( + space: WatchableSpace + ): Promise<{ pages: PageMeta[]; nowTimestamp: number }> { + let fetchResult = await space.fetchPageList(); + return { + pages: [...fetchResult.pages].filter( + (pageMeta) => !pageMeta.name.startsWith(this.trashPrefix) + ), + nowTimestamp: fetchResult.nowTimestamp, + }; } async trashPages(space: Space): Promise { - return [...(await space.fetchPageList())] + return [...(await space.fetchPageList()).pages] .filter((pageMeta) => pageMeta.name.startsWith(this.trashPrefix)) .map((pageMeta) => ({ ...pageMeta, @@ -56,27 +63,28 @@ export class SpaceSync { ): Promise { let syncOps = 0; - let allPagesPrimary = new Map( - (await this.syncablePages(this.primary)).map((p) => [p.name, p]) - ); + let { pages: primaryAllPagesSet, nowTimestamp: primarySyncTimestamp } = + await this.syncablePages(this.primary); + let allPagesPrimary = new Map(primaryAllPagesSet.map((p) => [p.name, p])); + let { pages: secondaryAllPagesSet, nowTimestamp: secondarySyncTimestamp } = + await this.syncablePages(this.secondary); let allPagesSecondary = new Map( - (await this.syncablePages(this.secondary)).map((p) => [p.name, p]) + secondaryAllPagesSet.map((p) => [p.name, p]) ); + let allTrashPrimary = new Map( (await this.trashPages(this.primary)) // Filter out old trash - .filter((p) => p.lastModified > this.lastSync) + .filter((p) => p.lastModified > this.primaryLastSync) .map((p) => [p.name, p]) ); let allTrashSecondary = new Map( (await this.trashPages(this.secondary)) // Filter out old trash - .filter((p) => p.lastModified > this.lastSync) + .filter((p) => p.lastModified > this.secondaryLastSync) .map((p) => [p.name, p]) ); - let createdPagesOnSecondary = new Set(); - // Iterate over all pages on the primary first for (let [name, pageMetaPrimary] of allPagesPrimary.entries()) { let pageMetaSecondary = allPagesSecondary.get(pageMetaPrimary.name); @@ -95,15 +103,14 @@ export class SpaceSync { name, pageData.text, true, - pageData.meta.lastModified + secondarySyncTimestamp // The reason for this is to not include it in the next sync cycle, we cannot blindly use the lastModified date due to time skew ); syncOps++; - createdPagesOnSecondary.add(name); } else { // Existing page - if (pageMetaPrimary.lastModified > this.lastSync) { + if (pageMetaPrimary.lastModified > this.primaryLastSync) { // Primary updated since last sync - if (pageMetaSecondary.lastModified > this.lastSync) { + if (pageMetaSecondary.lastModified > this.secondaryLastSync) { // Secondary also updated! CONFLICT if (conflictResolver) { await conflictResolver(pageMetaPrimary, pageMetaSecondary); @@ -124,11 +131,11 @@ export class SpaceSync { name, pageData.text, false, - pageData.meta.lastModified + secondarySyncTimestamp ); syncOps++; } - } else if (pageMetaSecondary.lastModified > this.lastSync) { + } else if (pageMetaSecondary.lastModified > this.secondaryLastSync) { // Secondary updated, but not primary (checked above) // Push from secondary to primary console.log("Changed page on secondary", name, "syncing to primary"); @@ -137,7 +144,7 @@ export class SpaceSync { name, pageData.text, false, - pageData.meta.lastModified + primarySyncTimestamp ); syncOps++; } else { @@ -161,8 +168,8 @@ export class SpaceSync { await this.primary.writePage( name, pageData.text, - true, - pageData.meta.lastModified + false, + primarySyncTimestamp ); syncOps++; } @@ -170,50 +177,45 @@ export class SpaceSync { // And finally, let's trash some pages for (let pageToDelete of allTrashPrimary.values()) { - if (pageToDelete.lastModified > this.lastSync) { - // New deletion - console.log("Deleting", pageToDelete.name, "on secondary"); - try { - await this.secondary.deletePage( - pageToDelete.name, - pageToDelete.lastModified - ); - syncOps++; - } catch (e: any) { - console.log("Page already gone", e.message); - } + console.log("Deleting", pageToDelete.name, "on secondary"); + try { + await this.secondary.deletePage( + pageToDelete.name, + secondarySyncTimestamp + ); + syncOps++; + } catch (e: any) { + console.log("Page already gone", e.message); } } for (let pageToDelete of allTrashSecondary.values()) { - if (pageToDelete.lastModified > this.lastSync) { - // New deletion - console.log("Deleting", pageToDelete.name, "on primary"); - try { - await this.primary.deletePage( - pageToDelete.name, - pageToDelete.lastModified - ); - syncOps++; - } catch (e: any) { - console.log("Page already gone", e.message); - } + console.log("Deleting", pageToDelete.name, "on primary"); + try { + await this.primary.deletePage(pageToDelete.name, primarySyncTimestamp); + syncOps++; + } catch (e: any) { + console.log("Page already gone", e.message); } } + // Setting last sync time to the timestamps we got back when fetching the page lists on each end + this.primaryLastSync = primarySyncTimestamp; + this.secondaryLastSync = secondarySyncTimestamp; + // Find the latest timestamp and set it as lastSync - allPagesPrimary.forEach((pageMeta) => { - this.lastSync = Math.max(this.lastSync, pageMeta.lastModified); - }); - allPagesSecondary.forEach((pageMeta) => { - this.lastSync = Math.max(this.lastSync, pageMeta.lastModified); - }); - allTrashPrimary.forEach((pageMeta) => { - this.lastSync = Math.max(this.lastSync, pageMeta.lastModified); - }); - allTrashSecondary.forEach((pageMeta) => { - this.lastSync = Math.max(this.lastSync, pageMeta.lastModified); - }); + // allPagesPrimary.forEach((pageMeta) => { + // this.lastSync = Math.max(this.lastSync, pageMeta.lastModified); + // }); + // allPagesSecondary.forEach((pageMeta) => { + // this.lastSync = Math.max(this.lastSync, pageMeta.lastModified); + // }); + // allTrashPrimary.forEach((pageMeta) => { + // this.lastSync = Math.max(this.lastSync, pageMeta.lastModified); + // }); + // allTrashSecondary.forEach((pageMeta) => { + // this.lastSync = Math.max(this.lastSync, pageMeta.lastModified); + // }); return syncOps; } diff --git a/webapp/syscalls/indexer.ts b/webapp/syscalls/index.ts similarity index 74% rename from webapp/syscalls/indexer.ts rename to webapp/syscalls/index.ts index b437bcdb..d3c196e6 100644 --- a/webapp/syscalls/indexer.ts +++ b/webapp/syscalls/index.ts @@ -1,8 +1,8 @@ -import { Space } from "../spaces/space"; import { SysCallMapping } from "../../plugos/system"; import { proxySyscalls } from "../../plugos/syscalls/transport"; +import { WatchableSpace } from "../spaces/cache_space"; -export function indexerSyscalls(space: Space): SysCallMapping { +export function indexerSyscalls(space: WatchableSpace): SysCallMapping { return proxySyscalls( [ "index.scanPrefixForPage", diff --git a/webapp/syscalls/system.ts b/webapp/syscalls/system.ts index a86b1aa4..940419fc 100644 --- a/webapp/syscalls/system.ts +++ b/webapp/syscalls/system.ts @@ -1,7 +1,7 @@ import { SysCallMapping } from "../../plugos/system"; -import { Space } from "../spaces/space"; +import { WatchableSpace } from "../spaces/cache_space"; -export function systemSyscalls(space: Space): SysCallMapping { +export function systemSyscalls(space: WatchableSpace): SysCallMapping { return { "system.invokeFunction": async ( ctx,