From b5a8cd7d1b8e6d082bd9ebc1a72acec6a716467f Mon Sep 17 00:00:00 2001 From: Zef Hemel Date: Mon, 29 May 2023 17:05:20 +0200 Subject: [PATCH] Collab work --- .gitignore | 3 +- build_bundle.ts | 13 +- plug-api/silverbullet-syscall/collab.ts | 4 + plugs/collab/collab.ts | 46 ++++++ server/collab.test.ts | 39 +++++ server/collab.ts | 191 ++++++++++++++++++++++++ server/http_server.ts | 18 ++- web/cm_plugins/collab.ts | 33 ++-- web/deps.ts | 2 +- web/editor.tsx | 46 ++++-- web/space.ts | 2 +- web/syscalls/collab.ts | 20 ++- 12 files changed, 382 insertions(+), 35 deletions(-) create mode 100644 server/collab.test.ts create mode 100644 server/collab.ts diff --git a/.gitignore b/.gitignore index f5c88587..37d5d6ed 100644 --- a/.gitignore +++ b/.gitignore @@ -8,4 +8,5 @@ website_build .idea deno.lock fly.toml -env.sh \ No newline at end of file +env.sh +node_modules \ No newline at end of file diff --git a/build_bundle.ts b/build_bundle.ts index a1c4b558..886de3e0 100644 --- a/build_bundle.ts +++ b/build_bundle.ts @@ -13,12 +13,23 @@ await esbuild.build({ sourcemap: false, minify: false, plugins: [ + // ESBuild plugin to make npm modules external + { + name: "npm-external", + setup(build: any) { + build.onResolve({ filter: /^npm:/ }, (args: any) => { + return { + path: args.path, + external: true, + }; + }); + }, + }, { name: "json", setup: (build) => build.onLoad({ filter: /\.json$/ }, () => ({ loader: "json" })), }, - ...denoPlugins({ importMapURL: new URL("./import_map.json", import.meta.url) .toString(), diff --git a/plug-api/silverbullet-syscall/collab.ts b/plug-api/silverbullet-syscall/collab.ts index 8cfe85de..b467bf79 100644 --- a/plug-api/silverbullet-syscall/collab.ts +++ b/plug-api/silverbullet-syscall/collab.ts @@ -7,3 +7,7 @@ export function start(serverUrl: string, token: string, username: string) { export function stop() { return syscall("collab.stop"); } + +export function ping(clientId: string, currentPage: string) { + return syscall("collab.ping", clientId, currentPage); +} diff --git a/plugs/collab/collab.ts b/plugs/collab/collab.ts index abc6d3aa..aacc6346 100644 --- a/plugs/collab/collab.ts +++ b/plugs/collab/collab.ts @@ -110,6 +110,7 @@ export async function detectPage() { console.error("Error parsing YAML", e); } } + await ping(); } export function shareNoop() { @@ -160,3 +161,48 @@ export function writeFileCollab(name: string): FileMeta { perm: "rw", }; } + +const clientId = nanoid(); +let currentCollabId: string | undefined; + +const localCollabServer = location.protocol === "http:" + ? `ws://${location.host}/.ws-collab` + : `wss://${location.host}/.ws-collab`; + +async function ping() { + try { + const currentPage = await editor.getCurrentPage(); + const { collabId } = await collab.ping( + clientId, + currentPage, + ); + console.log("Collab ID", collabId); + if (!collabId && currentCollabId) { + // Stop collab + console.log("Stopping collab"); + // editor.flashNotification("Closing real-time collaboration mode."); + currentCollabId = undefined; + await collab.stop(); + } else if (collabId && collabId !== currentCollabId) { + // Start collab + console.log("Starting collab"); + editor.flashNotification("Opening page in real-time collaboration mode."); + currentCollabId = collabId; + await collab.start( + localCollabServer, + `${collabId}/${currentPage}`, + "you", + ); + } + } catch (e: any) { + // console.error("Ping error", e); + if (e.message.includes("Failed to fetch") && currentCollabId) { + console.log("Offline, stopping collab"); + currentCollabId = undefined; + await collab.stop(); + } + } +} +setInterval(() => { + ping().catch(console.error); +}, 5000); diff --git a/server/collab.test.ts b/server/collab.test.ts new file mode 100644 index 00000000..3d5ebff2 --- /dev/null +++ b/server/collab.test.ts @@ -0,0 +1,39 @@ +import { assert, assertEquals } from "../test_deps.ts"; +import { CollabServer } from "./collab.ts"; + +Deno.test("Collab server", async () => { + const collabServer = new CollabServer(null as any); + console.log("Client 1 joins page 1"); + assertEquals(collabServer.ping("client1", "page1"), {}); + assertEquals(collabServer.clients.size, 1); + assertEquals(collabServer.pages.size, 1); + console.log("Client 1 joins page 2"); + assertEquals(collabServer.ping("client1", "page2"), {}); + assertEquals(collabServer.clients.size, 1); + assertEquals(collabServer.pages.size, 1); + console.log("Client 2 joins to page 2, collab id created"); + const collabId = collabServer.ping("client2", "page2").collabId; + assertEquals(collabServer.clients.size, 2); + assert(collabId !== undefined); + console.log("Client 2 moves to page 1, collab id destroyed"); + assertEquals(collabServer.ping("client2", "page1"), {}); + assertEquals(collabServer.ping("client1", "page2"), {}); + console.log("Going to cleanup, which should have no effect"); + collabServer.cleanup(50); + assertEquals(collabServer.clients.size, 2); + collabServer.ping("client2", "page2"); + console.log("Going to sleep 20ms"); + await sleep(20); + console.log("Then client 1 pings, but client 2 does not"); + assertEquals(collabServer.ping("client1", "page2"), {}); + await sleep(20); + console.log("Going to cleanup, which should clean client 2"); + collabServer.cleanup(35); + assertEquals(collabServer.clients.size, 1); + assertEquals(collabServer.pages.get("page2")!.collabId, undefined); + console.log(collabServer); +}); + +function sleep(ms: number) { + return new Promise((resolve) => setTimeout(resolve, ms)); +} diff --git a/server/collab.ts b/server/collab.ts new file mode 100644 index 00000000..46837974 --- /dev/null +++ b/server/collab.ts @@ -0,0 +1,191 @@ +import { Hocuspocus } from "npm:@hocuspocus/server@2.0.6"; +import { getAvailablePortSync } from "https://deno.land/x/port@1.0.0/mod.ts"; +import { nanoid } from "https://esm.sh/nanoid@4.0.0"; +import { race, timeout } from "../common/async_util.ts"; +import { Application } from "./deps.ts"; +import { SpacePrimitives } from "../common/spaces/space_primitives.ts"; + +type CollabPage = { + clients: Set; // clientIds + collabId?: string; +}; + +const pingInterval = 5000; + +export class CollabServer { + clients: Map = new Map(); + pages: Map = new Map(); + + constructor(private spacePrimitives: SpacePrimitives) { + } + + start() { + setInterval(() => { + this.cleanup(3 * pingInterval); + }, pingInterval); + } + + ping(clientId: string, currentPage: string): { collabId?: string } { + let clientState = this.clients.get(clientId); + let collabId: string | undefined; + if (!clientState) { + clientState = { + openPage: "", + lastPing: Date.now(), + }; + } else { + clientState.lastPing = Date.now(); + } + if (currentPage !== clientState.openPage) { + // Client switched pages + // Update last page record + const lastCollabPage = this.pages.get(clientState.openPage); + if (lastCollabPage) { + lastCollabPage.clients.delete(clientId); + if (lastCollabPage.clients.size === 0) { + // Cleanup + this.pages.delete(clientState.openPage); + } else { + if (lastCollabPage.clients.size === 1) { + delete lastCollabPage.collabId; + } + this.pages.set(clientState.openPage, lastCollabPage); + } + } + // Update new page + let nextCollabPage = this.pages.get(currentPage); + if (!nextCollabPage) { + nextCollabPage = { + clients: new Set(), + }; + } + nextCollabPage.clients.add(clientId); + if (nextCollabPage.clients.size === 2) { + // Create a new collabId + nextCollabPage.collabId = nanoid(); + } + clientState.openPage = currentPage; + this.pages.set(currentPage, nextCollabPage); + collabId = nextCollabPage.collabId; + } else { + // Page didn't change + collabId = this.pages.get(currentPage)?.collabId; + } + this.clients.set(clientId, clientState); + if (collabId) { + return { collabId }; + } else { + return {}; + } + } + + cleanup(timeout: number) { + // Clean up clients that haven't pinged for some time + for (const [clientId, clientState] of this.clients) { + if (Date.now() - clientState.lastPing > timeout) { + console.log("[Collab]", "Ejecting client", clientId); + this.clients.delete(clientId); + const collabPage = this.pages.get(clientState.openPage); + if (collabPage) { + collabPage.clients.delete(clientId); + if (collabPage.clients.size === 0) { + this.pages.delete(clientState.openPage); + } else { + if (collabPage.clients.size === 1) { + delete collabPage.collabId; + } + this.pages.set(clientState.openPage, collabPage); + } + } + } + } + } + + route(app: Application) { + // The way this works is that we spin up a separate WS server locally and then proxy requests to it + // This is the only way I could get Hocuspocus to work with Deno + const internalPort = getAvailablePortSync(); + const hocuspocus = new Hocuspocus({ + port: internalPort, + address: "localhost", + quiet: true, + onLoadDocument: async (doc) => { + console.log("[Hocuspocus]", "Requesting doc load", doc.documentName); + const pageName = doc.documentName.split("/").slice(1).join("/"); + try { + const yText = doc.document.getText("codemirror"); + const { data } = await this.spacePrimitives.readFile( + `${pageName}.md`, + ); + + yText.insert(0, new TextDecoder().decode(data)); + console.log("[Hocuspocus]", "Loaded document from space"); + return doc.document; + } catch (e) { + console.error("Error loading doc", e); + } + }, + onDisconnect: (client) => { + console.log("[Hocuspocus]", "Client disconnected", client.clientsCount); + if (client.clientsCount === 0) { + console.log( + "[Hocuspocus]", + "Last client disconnected from", + client.documentName, + "purging from memory", + ); + hocuspocus.documents.delete(client.documentName); + } + return Promise.resolve(); + }, + }); + + hocuspocus.listen(); + app.use((ctx) => { + if (ctx.request.url.pathname === "/.ws") { + const sock = ctx.upgrade(); + sock.onmessage = (e) => { + console.log("WS: Got message", e.data); + }; + } + // Websocket proxy to hocuspocus + if (ctx.request.url.pathname === "/.ws-collab") { + const sock = ctx.upgrade(); + + const ws = new WebSocket(`ws://localhost:${internalPort}`); + const wsReady = race([ + new Promise((resolve) => { + ws.onopen = () => { + resolve(); + }; + }), + timeout(1000), + ]).catch(() => { + console.error("Timeout waiting for collab to open websocket"); + sock.close(); + }); + sock.onmessage = (e) => { + // console.log("Got message", e); + wsReady.then(() => ws.send(e.data)).catch(console.error); + }; + sock.onclose = () => { + if (ws.OPEN) { + ws.close(); + } + }; + ws.onmessage = (e) => { + if (sock.OPEN) { + sock.send(e.data); + } else { + console.error("Got message from websocket but socket is not open"); + } + }; + ws.onclose = () => { + if (sock.OPEN) { + sock.close(); + } + }; + } + }); + } +} diff --git a/server/http_server.ts b/server/http_server.ts index 620b0b96..2db3933a 100644 --- a/server/http_server.ts +++ b/server/http_server.ts @@ -7,6 +7,7 @@ import { performLocalFetch } from "../common/proxy_fetch.ts"; import { BuiltinSettings } from "../web/types.ts"; import { gitIgnoreCompiler } from "./deps.ts"; import { FilteredSpacePrimitives } from "../common/spaces/filtered_space_primitives.ts"; +import { CollabServer } from "./collab.ts"; export type ServerOptions = { hostname: string; @@ -29,6 +30,7 @@ export class HttpServer { clientAssetBundle: AssetBundle; settings?: BuiltinSettings; spacePrimitives: SpacePrimitives; + collab: CollabServer; constructor( spacePrimitives: SpacePrimitives, @@ -62,6 +64,8 @@ export class HttpServer { } }, ); + this.collab = new CollabServer(this.spacePrimitives); + this.collab.start(); } // Replaces some template variables in index.html in a rather ad-hoc manner, but YOLO @@ -123,7 +127,8 @@ export class HttpServer { this.app.use(({ request, response }, next) => { if ( !request.url.pathname.startsWith("/.fs") && - request.url.pathname !== "/.auth" + request.url.pathname !== "/.auth" && + !request.url.pathname.startsWith("/.ws") ) { response.headers.set("Content-type", "text/html"); response.body = this.renderIndexHtml(); @@ -138,6 +143,8 @@ export class HttpServer { this.app.use(fsRouter.routes()); this.app.use(fsRouter.allowedMethods()); + this.collab.route(this.app); + this.abortController = new AbortController(); const listenOptions: any = { hostname: this.hostname, @@ -273,6 +280,15 @@ export class HttpServer { }); return; } + case "ping": { + response.headers.set("Content-Type", "application/json"); + // console.log("Got ping", body); + response.body = JSON.stringify( + this.collab.ping(body.clientId, body.page), + ); + // console.log(this.collab); + return; + } default: response.headers.set("Content-Type", "text/plain"); response.status = 400; diff --git a/web/cm_plugins/collab.ts b/web/cm_plugins/collab.ts index c7c65599..57f0988a 100644 --- a/web/cm_plugins/collab.ts +++ b/web/cm_plugins/collab.ts @@ -1,4 +1,4 @@ -import { Extension, WebsocketProvider, Y, yCollab } from "../deps.ts"; +import { Extension, HocuspocusProvider, Y, yCollab } from "../deps.ts"; const userColors = [ { color: "#30bced", light: "#30bced33" }, @@ -12,27 +12,27 @@ const userColors = [ ]; export class CollabState { - ydoc: Y.Doc; - collabProvider: WebsocketProvider; - ytext: Y.Text; - yundoManager: Y.UndoManager; + public ytext: Y.Text; + private collabProvider: HocuspocusProvider; + private yundoManager: Y.UndoManager; - constructor(serverUrl: string, token: string, username: string) { - this.ydoc = new Y.Doc(); - this.collabProvider = new WebsocketProvider( - serverUrl, - token, - this.ydoc, - ); + constructor(serverUrl: string, name: string, username: string) { + this.collabProvider = new HocuspocusProvider({ + url: serverUrl, + name: name, + }); this.collabProvider.on("status", (e: any) => { console.log("Collab status change", e); }); - this.collabProvider.on("sync", (e: any) => { - console.log("Sync status", e); - }); + // this.collabProvider.on("sync", (e: any) => { + // console.log("Sync status", e); + // }); + // this.collabProvider.on("synced", (e: any) => { + // console.log("Synced status", e); + // }); - this.ytext = this.ydoc.getText("codemirror"); + this.ytext = this.collabProvider.document.getText("codemirror"); this.yundoManager = new Y.UndoManager(this.ytext); const randomColor = @@ -46,6 +46,7 @@ export class CollabState { } stop() { + this.collabProvider.disconnect(); this.collabProvider.destroy(); } diff --git a/web/deps.ts b/web/deps.ts index c0fecd27..d202b750 100644 --- a/web/deps.ts +++ b/web/deps.ts @@ -21,7 +21,7 @@ export { yCollab, yUndoManagerKeymap, } from "https://esm.sh/y-codemirror.next@0.3.2?external=yjs,@codemirror/state,@codemirror/commands,@codemirror/history,@codemirror/view"; -export { WebsocketProvider } from "https://esm.sh/y-websocket@1.4.5?external=yjs"; +export { HocuspocusProvider } from "https://esm.sh/@hocuspocus/provider@2.0.6?external=yjs,ws"; // Vim mode export { diff --git a/web/editor.tsx b/web/editor.tsx index 9acbd9a9..3e35016e 100644 --- a/web/editor.tsx +++ b/web/editor.tsx @@ -389,7 +389,8 @@ export class Editor { this.space.on({ pageChanged: (meta) => { - if (this.currentPage === meta.name) { + // Only reload when watching the current page (to avoid reloading when switching pages and in collab mode) + if (this.space.watchInterval && this.currentPage === meta.name) { console.log("Page changed elsewhere, reloading"); this.flashNotification("Page changed elsewhere, reloading"); this.reloadPage(); @@ -1144,8 +1145,7 @@ export class Editor { await this.save(true); // And stop the collab session if (this.collabState) { - this.collabState.stop(); - this.collabState = undefined; + this.stopCollab(); } } } @@ -1226,10 +1226,18 @@ export class Editor { if (pageState) { // Restore state editorView.scrollDOM.scrollTop = pageState!.scrollTop; - editorView.dispatch({ - selection: pageState.selection, - scrollIntoView: true, - }); + try { + editorView.dispatch({ + selection: pageState.selection, + scrollIntoView: true, + }); + } catch { + // This is fine, just go to the top + editorView.dispatch({ + selection: { anchor: 0 }, + scrollIntoView: true, + }); + } } else { editorView.scrollDOM.scrollTop = 0; editorView.dispatch({ @@ -1508,12 +1516,24 @@ export class Editor { } const initialText = this.editorView!.state.sliceDoc(); this.collabState = new CollabState(serverUrl, token, username); - this.collabState.collabProvider.once("sync", (synced: boolean) => { - if (this.collabState?.ytext.toString() === "") { - console.log("Synced value is empty, putting back original text"); - this.collabState?.ytext.insert(0, initialText); - } - }); + // this.collabState.collabProvider.on("synced", () => { + // if (this.collabState?.ytext.toString() === "") { + // console.error("Synced value is empty, putting back original text"); + // this.collabState?.ytext.insert(0, initialText); + // } + // }); this.rebuildEditorState(); + // Don't watch for local changes in this mode + this.space.unwatch(); + } + + stopCollab() { + if (this.collabState) { + this.collabState.stop(); + this.collabState = undefined; + this.rebuildEditorState(); + } + // Start file watching again + this.space.watch(); } } diff --git a/web/space.ts b/web/space.ts index 525d0143..0b469788 100644 --- a/web/space.ts +++ b/web/space.ts @@ -51,7 +51,7 @@ export class Space extends EventEmitter { super(); this.kvStore.get("imageHeightCache").then((cache) => { if (cache) { - console.log("Loaded image height cache from KV store", cache); + // console.log("Loaded image height cache from KV store", cache); this.imageHeightCache = cache; } }); diff --git a/web/syscalls/collab.ts b/web/syscalls/collab.ts index da395b00..ac1493d6 100644 --- a/web/syscalls/collab.ts +++ b/web/syscalls/collab.ts @@ -14,7 +14,25 @@ export function collabSyscalls(editor: Editor): SysCallMapping { "collab.stop": ( _ctx, ) => { - editor.collabState?.stop(); + editor.stopCollab(); + }, + "collab.ping": async ( + _ctx, + clientId: string, + currentPage: string, + ) => { + const resp = await editor.remoteSpacePrimitives.authenticatedFetch( + editor.remoteSpacePrimitives.url, + { + method: "POST", + body: JSON.stringify({ + operation: "ping", + clientId, + page: currentPage, + }), + }, + ); + return resp.json(); }, }; }