import { stat } from "fs/promises"; import { ChangeSet } from "@codemirror/state"; import { Update } from "@codemirror/collab"; import { Server } from "socket.io"; import { Cursor, cursorEffect } from "../../webapp/src/cursorEffect"; import { Socket } from "socket.io"; import { DiskStorage } from "./disk_storage"; import { ClientPageState, Page, PageMeta } from "./types"; import { safeRun } from "./util"; import * as fs from "fs"; import * as path from "path"; import knex, { Knex } from "knex"; type IndexItem = { page: string; key: string; value: any; }; class ClientConnection { openPages = new Set(); constructor(readonly sock: Socket) {} } export class SocketServer { rootPath: string; serverSock: Server; openPages = new Map(); connectedSockets = new Set(); pageStore: DiskStorage; db: Knex; serverSocket: Server; api = { openPage: async (clientConn: ClientConnection, pageName: string) => { let page = this.openPages.get(pageName); if (!page) { try { let { text, meta } = await this.pageStore.readPage(pageName); page = new Page(pageName, text, meta); } catch (e) { console.log("Creating new page", pageName); page = new Page(pageName, "", { name: pageName, lastModified: 0 }); } this.openPages.set(pageName, page); } page.clientStates.add(new ClientPageState(clientConn.sock, page.version)); clientConn.openPages.add(pageName); console.log("Opened page", pageName); this.broadcastCursors(page); return page.toJSON(); }, pushUpdates: async ( clientConn: ClientConnection, pageName: string, version: number, updates: any[] ): Promise => { let page = this.openPages.get(pageName); if (!page) { console.error( "Received updates for not open page", pageName, this.openPages.keys() ); return false; } if (version !== page.version) { console.error("Invalid version", version, page.version); return false; } else { console.log("Applying", updates.length, "updates to", pageName); let transformedUpdates = []; let textChanged = false; for (let update of updates) { let changes = ChangeSet.fromJSON(update.changes); let transformedUpdate = { changes, clientID: update.clientID, effects: update.cursors?.map((c: Cursor) => { page.cursors.set(c.userId, c); return cursorEffect.of(c); }), }; page.updates.push(transformedUpdate); transformedUpdates.push(transformedUpdate); let oldText = page.text; page.text = changes.apply(page.text); if (oldText !== page.text) { textChanged = true; } } if (textChanged) { if (page.saveTimer) { clearTimeout(page.saveTimer); } page.saveTimer = setTimeout(() => { this.flushPageToDisk(pageName, page); }, 1000); } while (page.pending.length) { page.pending.pop()!(transformedUpdates); } return true; } }, pullUpdates: async ( clientConn: ClientConnection, pageName: string, version: number ): Promise => { let page = this.openPages.get(pageName); // console.log("Pulling updates for", pageName); if (!page) { console.error("Fetching updates for not open page"); return []; } // TODO: Optimize this let oldestVersion = Infinity; page.clientStates.forEach((client) => { oldestVersion = Math.min(client.version, oldestVersion); if (client.socket === clientConn.sock) { client.version = version; } }); page.flushUpdates(oldestVersion); if (version < page.version) { return page.updatesSince(version); } else { return new Promise((resolve) => { page.pending.push(resolve); }); } }, readPage: async ( clientConn: ClientConnection, pageName: string ): Promise<{ text: string; meta: PageMeta }> => { let page = this.openPages.get(pageName); if (page) { console.log("Serving page from memory", pageName); return { text: page.text.sliceString(0), meta: page.meta, }; } else { return this.pageStore.readPage(pageName); } }, writePage: async ( clientConn: ClientConnection, pageName: string, text: string ) => { let page = this.openPages.get(pageName); if (page) { for (let client of page.clientStates) { client.socket.emit("reloadPage", pageName); } this.openPages.delete(pageName); } return this.pageStore.writePage(pageName, text); }, deletePage: async (clientConn: ClientConnection, pageName: string) => { this.openPages.delete(pageName); clientConn.openPages.delete(pageName); // Cascading of this to all connected clients will be handled by file watcher return this.pageStore.deletePage(pageName); }, listPages: async (clientConn: ClientConnection): Promise => { return this.pageStore.listPages(); }, getPageMeta: async ( clientConn: ClientConnection, pageName: string ): Promise => { let page = this.openPages.get(pageName); if (page) { return page.meta; } return this.pageStore.getPageMeta(pageName); }, "index:clearPageIndexForPage": async ( clientConn: ClientConnection, page: string ) => { await this.db("page_index").where({ page }).del(); }, "index:set": async ( clientConn: ClientConnection, page: string, key: string, value: any ) => { let changed = await this.db("page_index") .where({ page, key }) .update("value", JSON.stringify(value)); if (changed === 0) { await this.db("page_index").insert({ page, key, value: JSON.stringify(value), }); } }, "index:get": async ( clientConn: ClientConnection, page: string, key: string ) => { let result = await this.db("page_index") .where({ page, key }) .select("value"); if (result.length) { return JSON.parse(result[0].value); } else { return null; } }, "index:delete": async ( clientConn: ClientConnection, page: string, key: string ) => { await this.db("page_index").where({ page, key }).del(); }, "index:scanPrefixForPage": async ( clientConn: ClientConnection, page: string, prefix: string ) => { return ( await this.db("page_index") .where({ page }) .andWhereLike("key", `${prefix}%`) .select("page", "key", "value") ).map(({ page, key, value }) => ({ page, key, value: JSON.parse(value), })); }, "index:scanPrefixGlobal": async ( clientConn: ClientConnection, prefix: string ) => { return ( await this.db("page_index") .andWhereLike("key", `${prefix}%`) .select("page", "key", "value") ).map(({ page, key, value }) => ({ page, key, value: JSON.parse(value), })); }, "index:deletePrefixForPage": async ( clientConn: ClientConnection, page: string, prefix: string ) => { return await this.db("page_index") .where({ page }) .andWhereLike("key", `${prefix}%`) .del(); }, "index:clearPageIndex": async (clientConn: ClientConnection) => { return await this.db("page_index").del(); }, }; constructor(rootPath: string, serverSocket: Server) { this.rootPath = path.resolve(rootPath); this.serverSocket = serverSocket; this.pageStore = new DiskStorage(this.rootPath); this.db = knex({ client: "better-sqlite3", connection: { filename: path.join(rootPath, "data.db"), }, useNullAsDefault: true, }); this.initDb(); serverSocket.on("connection", (socket) => { const clientConn = new ClientConnection(socket); // const socketOpenPages = new Set(); console.log("Connected", socket.id); this.connectedSockets.add(socket); socket.on("disconnect", () => { console.log("Disconnected", socket.id); clientConn.openPages.forEach(disconnectPageSocket); this.connectedSockets.delete(socket); }); socket.on("closePage", (pageName: string) => { console.log("Closing page", pageName); clientConn.openPages.delete(pageName); disconnectPageSocket(pageName); }); const onCall = ( eventName: string, cb: (...args: any[]) => Promise ) => { socket.on(eventName, (reqId: number, ...args) => { cb(...args) .then((result) => { socket.emit(`${eventName}Resp${reqId}`, null, result); }) .catch((err) => { socket.emit(`${eventName}Resp${reqId}`, err.message); }); }); }; const disconnectPageSocket = (pageName: string) => { let page = this.openPages.get(pageName); if (page) { for (let client of page.clientStates) { if (client.socket === socket) { this.disconnectClient(client, page); } } } }; Object.entries(this.api).forEach(([eventName, cb]) => { onCall(eventName, (...args: any[]): any => { return cb.call(this, clientConn, ...args); }); }); }); } async initDb() { if (!(await this.db.schema.hasTable("page_index"))) { await this.db.schema.createTable("page_index", (table) => { table.string("page"); table.string("key"); table.text("value"); table.primary(["page", "key"]); }); console.log("Created table page_index"); } } disconnectClient(client: ClientPageState, page: Page) { page.clientStates.delete(client); if (page.clientStates.size === 0) { console.log("No more clients for", page.name, "flushing"); this.flushPageToDisk(page.name, page); this.openPages.delete(page.name); } else { page.cursors.delete(client.socket.id); this.broadcastCursors(page); } } broadcastCursors(page: Page) { page.clientStates.forEach((client) => { client.socket.emit( "cursorSnapshot", page.name, Object.fromEntries(page.cursors.entries()) ); }); } flushPageToDisk(name: string, page: Page) { safeRun(async () => { let meta = await this.pageStore.writePage(name, page.text.sliceString(0)); console.log(`Wrote page ${name} to disk`); page.meta = meta; }); } fileWatcher() { fs.watch( this.rootPath, { recursive: true, persistent: false, }, (eventType, filename) => { safeRun(async () => { if (!filename.endsWith(".md")) { return; } let localPath = path.join(this.rootPath, filename); let pageName = filename.substring(0, filename.length - 3); // console.log("Edit in", pageName, eventType); let modifiedTime = 0; try { let s = await stat(localPath); modifiedTime = s.mtime.getTime(); } catch (e) { // File was deleted console.log("Deleted", pageName); for (let socket of this.connectedSockets) { socket.emit("pageDeleted", pageName); } return; } const openPage = this.openPages.get(pageName); if (openPage) { if (openPage.meta.lastModified < modifiedTime) { console.log("Page changed on disk outside of editor, reloading"); this.openPages.delete(pageName); const meta = { name: pageName, lastModified: modifiedTime, } as PageMeta; for (let client of openPage.clientStates) { client.socket.emit("pageChanged", meta); } } } if (eventType === "rename") { // This most likely means a new file was created, let's push new file listings to all connected sockets console.log( "New file created, broadcasting to all connected sockets", pageName ); for (let socket of this.connectedSockets) { socket.emit("pageCreated", { name: pageName, lastModified: modifiedTime, } as PageMeta); } } }); } ); } close() { this.db.destroy(); } }