diff --git a/server/package.json b/server/package.json index e56e55d8..5980fa2e 100644 --- a/server/package.json +++ b/server/package.json @@ -13,6 +13,7 @@ "dependencies": { "@codemirror/collab": "^0.19.0", "@codemirror/state": "^0.19.9", + "body-parser": "^1.19.2", "cors": "^2.8.5", "express": "^4.17.3", "socket.io": "^4.4.1", diff --git a/server/src/disk_storage.ts b/server/src/disk_storage.ts new file mode 100644 index 00000000..2d4f3162 --- /dev/null +++ b/server/src/disk_storage.ts @@ -0,0 +1,79 @@ +import { readdir, readFile, stat, unlink, writeFile } from "fs/promises"; +import path from "path"; +import { PageMeta, pagesPath } from "./server"; + +export class DiskStorage { + rootPath: string; + + constructor(rootPath: string) { + this.rootPath = rootPath; + } + + async listPages(): Promise { + let fileNames: PageMeta[] = []; + + let _this = this; + + async function walkPath(dir: string) { + let files = await readdir(dir); + for (let file of files) { + const fullPath = path.join(dir, file); + let s = await stat(fullPath); + if (s.isDirectory()) { + await walkPath(fullPath); + } else { + if (path.extname(file) === ".md") { + fileNames.push({ + name: fullPath.substring( + _this.rootPath.length + 1, + fullPath.length - 3 + ), + lastModified: s.mtime.getTime(), + }); + } + } + } + } + await walkPath(this.rootPath); + return fileNames; + } + + async readPage(pageName: string): Promise<{ text: string; meta: PageMeta }> { + const localPath = path.join(pagesPath, pageName + ".md"); + const s = await stat(localPath); + return { + text: await readFile(localPath, "utf8"), + meta: { + name: pageName, + lastModified: s.mtime.getTime(), + }, + }; + } + + async writePage(pageName: string, text: string): Promise { + let localPath = path.join(pagesPath, pageName + ".md"); + // await pipeline(body, fs.createWriteStream(localPath)); + await writeFile(localPath, text); + + // console.log(`Wrote to ${localPath}`); + const s = await stat(localPath); + return { + name: pageName, + lastModified: s.mtime.getTime(), + }; + } + + async getPageMeta(pageName: string): Promise { + let localPath = path.join(pagesPath, pageName + ".md"); + const s = await stat(localPath); + return { + name: pageName, + lastModified: s.mtime.getTime(), + }; + } + + async deletePage(pageName: string) { + let localPath = path.join(pagesPath, pageName + ".md"); + await unlink(localPath); + } +} diff --git a/server/src/realtime_storage.ts b/server/src/realtime_storage.ts new file mode 100644 index 00000000..bf76928b --- /dev/null +++ b/server/src/realtime_storage.ts @@ -0,0 +1,253 @@ +import fs from "fs"; +import { stat } from "fs/promises"; +import path from "path"; +import { ChangeSet } from "@codemirror/state"; +import { Update } from "@codemirror/collab"; +import { Server } from "socket.io"; +import { Cursor, cursorEffect } from "../../webapp/src/cursorEffect"; +import { Socket } from "socket.io"; +import { DiskStorage } from "./disk_storage"; +import { PageMeta } from "./server"; +import { Client, Page } from "./types"; +import { safeRun } from "./util"; + +export class RealtimeStorage extends DiskStorage { + openPages = new Map(); + + private disconnectClient(client: Client, page: Page) { + page.clients.delete(client); + if (page.clients.size === 0) { + console.log("No more clients for", page.name, "flushing"); + this.flushPageToDisk(page.name, page); + this.openPages.delete(page.name); + } else { + page.cursors.delete(client.socket.id); + this.broadcastCursors(page); + } + } + + private broadcastCursors(page: Page) { + page.clients.forEach((client) => { + client.socket.emit("cursors", Object.fromEntries(page.cursors.entries())); + }); + } + + private flushPageToDisk(name: string, page: Page) { + super + .writePage(name, page.text.sliceString(0)) + .then((meta) => { + console.log(`Wrote page ${name} to disk`); + page.meta = meta; + }) + .catch((e) => { + console.log(`Could not write ${name} to disk:`, e); + }); + } + + // Override + async readPage(pageName: string): Promise<{ text: string; meta: PageMeta }> { + let page = this.openPages.get(pageName); + if (page) { + console.log("Serving page from memory", pageName); + return { + text: page.text.sliceString(0), + meta: page.meta, + }; + } else { + return super.readPage(pageName); + } + } + + async writePage(pageName: string, text: string): Promise { + let page = this.openPages.get(pageName); + if (page) { + for (let client of page.clients) { + client.socket.emit("reload", pageName); + } + this.openPages.delete(pageName); + } + return super.writePage(pageName, text); + } + + disconnectPageSocket(socket: Socket, pageName: string) { + let page = this.openPages.get(pageName); + if (page) { + for (let client of page.clients) { + if (client.socket === socket) { + this.disconnectClient(client, page); + } + } + } + } + + constructor(rootPath: string, io: Server) { + super(rootPath); + + // setInterval(() => { + // console.log("Currently open pages:", this.openPages.keys()); + // }, 10000); + + // Disk watcher + fs.watch( + rootPath, + { + recursive: true, + persistent: false, + }, + (eventType, filename) => { + safeRun(async () => { + if (path.extname(filename) !== ".md") { + return; + } + let localPath = path.join(rootPath, filename); + let pageName = filename.substring(0, filename.length - 3); + let s = await stat(localPath); + // console.log("Edit in", pageName); + const openPage = this.openPages.get(pageName); + if (openPage) { + if (openPage.meta.lastModified < s.mtime.getTime()) { + console.log("Page changed on disk outside of editor, reloading"); + this.openPages.delete(pageName); + for (let client of openPage.clients) { + client.socket.emit("reload", pageName); + } + } + } + }); + } + ); + + io.on("connection", (socket) => { + console.log("Connected", socket.id); + let clientOpenPages = new Set(); + + function onCall(eventName: string, cb: (...args: any[]) => Promise) { + socket.on(eventName, (reqId: number, ...args) => { + cb(...args).then((result) => { + socket.emit(`${eventName}Resp${reqId}`, result); + }); + }); + } + + onCall("openPage", async (pageName: string) => { + let page = this.openPages.get(pageName); + if (!page) { + try { + let { text, meta } = await super.readPage(pageName); + page = new Page(pageName, text, meta); + } catch (e) { + console.log("Creating new page", pageName); + page = new Page(pageName, "", { name: pageName, lastModified: 0 }); + } + this.openPages.set(pageName, page); + } + page.clients.add(new Client(socket, page.version)); + clientOpenPages.add(pageName); + console.log("Opened page", pageName); + this.broadcastCursors(page); + return page.toJSON(); + }); + + socket.on("closePage", (pageName: string) => { + console.log("Closing page", pageName); + clientOpenPages.delete(pageName); + this.disconnectPageSocket(socket, pageName); + }); + + onCall( + "pushUpdates", + async ( + pageName: string, + version: number, + updates: any[] + ): Promise => { + let page = this.openPages.get(pageName); + + if (!page) { + console.error( + "Received updates for not open page", + pageName, + this.openPages.keys() + ); + return; + } + if (version !== page.version) { + console.error("Invalid version", version, page.version); + return false; + } else { + console.log("Applying", updates.length, "updates"); + let transformedUpdates = []; + let textChanged = false; + for (let update of updates) { + let changes = ChangeSet.fromJSON(update.changes); + let transformedUpdate = { + changes, + clientID: update.clientID, + effects: update.cursors?.map((c: Cursor) => { + page.cursors.set(c.userId, c); + return cursorEffect.of(c); + }), + }; + page.updates.push(transformedUpdate); + transformedUpdates.push(transformedUpdate); + let oldText = page.text; + page.text = changes.apply(page.text); + if (oldText !== page.text) { + textChanged = true; + } + } + + if (textChanged) { + if (page.saveTimer) { + clearTimeout(page.saveTimer); + } + + page.saveTimer = setTimeout(() => { + this.flushPageToDisk(pageName, page); + }, 1000); + } + while (page.pending.length) { + page.pending.pop()!(transformedUpdates); + } + return true; + } + } + ); + + onCall( + "pullUpdates", + async (pageName: string, version: number): Promise => { + let page = this.openPages.get(pageName); + // console.log("Pulling updates for", pageName); + if (!page) { + console.error("Fetching updates for not open page"); + return []; + } + // TODO: Optimize this + let oldestVersion = Infinity; + page.clients.forEach((client) => { + oldestVersion = Math.min(client.version, oldestVersion); + if (client.socket === socket) { + client.version = version; + } + }); + page.flushUpdates(oldestVersion); + if (version < page.version) { + return page.updatesSince(version); + } else { + return new Promise((resolve) => { + page.pending.push(resolve); + }); + } + } + ); + + socket.on("disconnect", () => { + console.log("Disconnected", socket.id); + clientOpenPages.forEach((pageName) => { + this.disconnectPageSocket(socket, pageName); + }); + }); + }); + } +} diff --git a/server/src/server.ts b/server/src/server.ts index 152890db..4cab8148 100644 --- a/server/src/server.ts +++ b/server/src/server.ts @@ -1,24 +1,12 @@ +import bodyParser from "body-parser"; import cors from "cors"; -import express, { text } from "express"; -import fs from "fs"; -import { readdir, readFile, stat, unlink } from "fs/promises"; -import path from "path"; -import stream from "stream"; -import { promisify } from "util"; -import { debounce } from "lodash"; - -import { ChangeSet, Text } from "@codemirror/state"; -import { Update } from "@codemirror/collab"; +import express from "express"; +import { readFile } from "fs/promises"; import http from "http"; import { Server } from "socket.io"; - -import { cursorEffect } from "../../webapp/src/cursorEffect"; - -function safeRun(fn: () => Promise) { - fn().catch((e) => { - console.error(e); - }); -} +import stream from "stream"; +import { promisify } from "util"; +import { RealtimeStorage } from "./realtime_storage"; const app = express(); const server = http.createServer(app); @@ -31,324 +19,20 @@ const io = new Server(server, { const port = 3000; const pipeline = promisify(stream.pipeline); -const pagesPath = "../pages"; +export const pagesPath = "../pages"; const distDir = `${__dirname}/../../webapp/dist`; -type PageMeta = { +export type PageMeta = { name: string; lastModified: number; version?: number; }; -class DiskFS { - rootPath: string; - - constructor(rootPath: string) { - this.rootPath = rootPath; - } - - async listPages(): Promise { - let fileNames: PageMeta[] = []; - - let _this = this; - - async function walkPath(dir: string) { - let files = await readdir(dir); - for (let file of files) { - const fullPath = path.join(dir, file); - let s = await stat(fullPath); - if (s.isDirectory()) { - await walkPath(fullPath); - } else { - if (path.extname(file) === ".md") { - fileNames.push({ - name: fullPath.substring( - _this.rootPath.length + 1, - fullPath.length - 3 - ), - lastModified: s.mtime.getTime(), - }); - } - } - } - } - await walkPath(this.rootPath); - return fileNames; - } - - async readPage(pageName: string): Promise<{ text: string; meta: PageMeta }> { - const localPath = path.join(pagesPath, pageName + ".md"); - const s = await stat(localPath); - return { - text: await readFile(localPath, "utf8"), - meta: { - name: pageName, - lastModified: s.mtime.getTime(), - }, - }; - } - - async writePage(pageName: string, body: any): Promise { - let localPath = path.join(pagesPath, pageName + ".md"); - await pipeline(body, fs.createWriteStream(localPath)); - // console.log(`Wrote to ${localPath}`); - const s = await stat(localPath); - return { - name: pageName, - lastModified: s.mtime.getTime(), - }; - } - - async getPageMeta(pageName: string): Promise { - let localPath = path.join(pagesPath, pageName + ".md"); - const s = await stat(localPath); - return { - name: pageName, - lastModified: s.mtime.getTime(), - }; - } - - async deletePage(pageName: string) { - let localPath = path.join(pagesPath, pageName + ".md"); - await unlink(localPath); - } -} - -import { Socket } from "socket.io"; - -class Page { - text: Text; - updates: Update[]; - sockets: Set; - meta: PageMeta; - - pending: ((value: any) => void)[] = []; - - saveTimer: NodeJS.Timeout | undefined; - - constructor(text: string, meta: PageMeta) { - this.updates = []; - this.text = Text.of(text.split("\n")); - this.meta = meta; - this.sockets = new Set(); - } -} - -class RealtimeEditFS extends DiskFS { - openPages = new Map(); - - disconnectSocket(socket: Socket, pageName: string) { - let page = this.openPages.get(pageName); - if (page) { - page.sockets.delete(socket); - if (page.sockets.size === 0) { - console.log("No more sockets for", pageName, "flushing"); - this.flushPageToDisk(pageName, page); - this.openPages.delete(pageName); - } - } - } - - flushPageToDisk(name: string, page: Page) { - super - .writePage(name, page.text.sliceString(0)) - .then((meta) => { - console.log(`Wrote page ${name} to disk`); - page.meta = meta; - }) - .catch((e) => { - console.log(`Could not write ${name} to disk:`, e); - }); - } - - // Override - async readPage(pageName: string): Promise<{ text: string; meta: PageMeta }> { - let page = this.openPages.get(pageName); - if (page) { - console.log("Serving page from memory", pageName); - return { - text: page.text.sliceString(0), - meta: page.meta, - }; - } else { - return super.readPage(pageName); - } - } - - async writePage(pageName: string, body: any): Promise { - let page = this.openPages.get(pageName); - if (page) { - for (let socket of page.sockets) { - socket.emit("reload", pageName); - } - this.openPages.delete(pageName); - } - return super.writePage(pageName, body); - } - - constructor(rootPath: string, io: Server) { - super(rootPath); - - setInterval(() => { - console.log("Currently open pages:", this.openPages.keys()); - }, 10000); - - // Disk watcher - fs.watch( - rootPath, - { - recursive: true, - persistent: false, - }, - (eventType, filename) => { - safeRun(async () => { - if (path.extname(filename) !== ".md") { - return; - } - let localPath = path.join(rootPath, filename); - let pageName = filename.substring(0, filename.length - 3); - let s = await stat(localPath); - // console.log("Edit in", pageName); - const openPage = this.openPages.get(pageName); - if (openPage) { - if (openPage.meta.lastModified < s.mtime.getTime()) { - console.log("Page changed on disk outside of editor, reloading"); - for (let socket of openPage.sockets) { - socket.emit("reload", pageName); - } - this.openPages.delete(pageName); - } - } - }); - } - ); - - io.on("connection", (socket) => { - console.log("Connected", socket.id); - let socketOpenPages = new Set(); - - function onCall(eventName: string, cb: (...args: any[]) => Promise) { - socket.on(eventName, (reqId: number, ...args) => { - cb(...args).then((result) => { - socket.emit(`${eventName}Resp${reqId}`, result); - }); - }); - } - - onCall("openPage", async (pageName: string) => { - let page = this.openPages.get(pageName); - if (!page) { - try { - let { text, meta } = await super.readPage(pageName); - page = new Page(text, meta); - } catch (e) { - // console.log(`Could not open ${pageName}:`, e); - // Page does not exist, let's create a new one - console.log("Creating new page", pageName); - page = new Page("", { name: pageName, lastModified: 0 }); - } - this.openPages.set(pageName, page); - } - page.sockets.add(socket); - socketOpenPages.add(pageName); - console.log("Opened page", pageName); - return [page.updates.length, page.text.toJSON()]; - }); - - socket.on("closePage", (pageName: string) => { - console.log("Closing page", pageName); - this.disconnectSocket(socket, pageName); - socketOpenPages.delete(pageName); - }); - - onCall( - "pushUpdates", - async ( - pageName: string, - version: number, - updates: any[] - ): Promise => { - let page = this.openPages.get(pageName); - - if (!page) { - console.error( - "Received updates for not open page", - pageName, - this.openPages.keys() - ); - return; - } - if (version !== page.updates.length) { - console.error("Invalid version", version, page.updates.length); - return false; - } else { - console.log("Applying", updates.length, "updates"); - let transformedUpdates = []; - for (let update of updates) { - let changes = ChangeSet.fromJSON(update.changes); - console.log("Got effect", update); - let transformedUpdate = { - changes, - clientID: update.clientID, - effects: update.cursors?.map((c) => { - return cursorEffect.of(c); - }), - }; - page.updates.push(transformedUpdate); - transformedUpdates.push(transformedUpdate); - // TODO: save cursors locally as well - page.text = changes.apply(page.text); - } - - if (page.saveTimer) { - clearTimeout(page.saveTimer); - } - - page.saveTimer = setTimeout(() => { - this.flushPageToDisk(pageName, page); - }, 1000); - while (page.pending.length) { - page.pending.pop()!(transformedUpdates); - } - return true; - } - } - ); - - onCall( - "pullUpdates", - async (pageName: string, version: number): Promise => { - let page = this.openPages.get(pageName); - // console.log("Pulling updates for", pageName); - if (!page) { - console.error("Fetching updates for not open page"); - return []; - } - if (version < page.updates.length) { - return page.updates.slice(version); - } else { - return new Promise((resolve) => { - page.pending.push(resolve); - }); - } - } - ); - - socket.on("disconnect", () => { - console.log("Disconnected", socket.id); - socketOpenPages.forEach((page) => { - this.disconnectSocket(socket, page); - }); - }); - }); - } -} - app.use("/", express.static(distDir)); let fsRouter = express.Router(); // let diskFS = new DiskFS(pagesPath); -let filesystem = new RealtimeEditFS(pagesPath, io); +let filesystem = new RealtimeStorage(pagesPath, io); // Page list fsRouter.route("/").get(async (req, res) => { @@ -371,11 +55,11 @@ fsRouter res.send(""); } }) - .put(async (req, res) => { + .put(bodyParser.text({ type: "*/*" }), async (req, res) => { let reqPath = req.params[0]; try { - let meta = await filesystem.writePage(reqPath, req); + let meta = await filesystem.writePage(reqPath, req.body); res.status(200); res.header("Last-Modified", "" + meta.lastModified); res.send("OK"); diff --git a/server/src/types.ts b/server/src/types.ts new file mode 100644 index 00000000..fed1944b --- /dev/null +++ b/server/src/types.ts @@ -0,0 +1,58 @@ +import { Update } from "@codemirror/collab"; +import { Text } from "@codemirror/state"; +import { Socket } from "socket.io"; +import { Cursor } from "../../webapp/src/cursorEffect"; +import { PageMeta } from "./server"; + +export class Client { + constructor(public socket: Socket, public version: number) {} +} + +export class Page { + versionOffset = 0; + updates: Update[] = []; + cursors = new Map(); + clients = new Set(); + + pending: ((value: any) => void)[] = []; + + text: Text; + meta: PageMeta; + + saveTimer: NodeJS.Timeout | undefined; + name: string; + + constructor(name: string, text: string, meta: PageMeta) { + this.name = name; + this.text = Text.of(text.split("\n")); + this.meta = meta; + } + + updatesSince(version: number): Update[] { + return this.updates.slice(version - this.versionOffset); + } + + get version(): number { + return this.updates.length + this.versionOffset; + } + + flushUpdates(version: number) { + if (this.versionOffset > version) { + throw Error("This should never happen"); + } + if (this.versionOffset === version) { + return; + } + this.updates = this.updates.slice(version - this.versionOffset); + this.versionOffset = version; + // console.log("Flushed updates, now got", this.updates.length, "updates"); + } + + toJSON() { + return { + text: this.text, + version: this.version, + cursors: Object.fromEntries(this.cursors.entries()), + }; + } +} diff --git a/server/src/util.ts b/server/src/util.ts new file mode 100644 index 00000000..55b0a08a --- /dev/null +++ b/server/src/util.ts @@ -0,0 +1,5 @@ +export function safeRun(fn: () => Promise) { + fn().catch((e) => { + console.error(e); + }); +} diff --git a/server/yarn.lock b/server/yarn.lock index eb2c3b20..a454af4e 100644 --- a/server/yarn.lock +++ b/server/yarn.lock @@ -670,11 +670,6 @@ "@types/qs" "*" "@types/serve-static" "*" -"@types/lodash@^4.14.179": - version "4.14.179" - resolved "https://registry.yarnpkg.com/@types/lodash/-/lodash-4.14.179.tgz#490ec3288088c91295780237d2497a3aa9dfb5c5" - integrity sha512-uwc1x90yCKqGcIOAT6DwOSuxnrAbpkdPsUOZtwrXb4D/6wZs+6qG7QnIawDuZWg0sWpxl+ltIKCaLoMlna678w== - "@types/mime@^1": version "1.3.2" resolved "https://registry.yarnpkg.com/@types/mime/-/mime-1.3.2.tgz#93e25bf9ee75fe0fd80b594bc4feb0e862111b5a" @@ -792,7 +787,7 @@ binary-extensions@^2.0.0: resolved "https://registry.yarnpkg.com/binary-extensions/-/binary-extensions-2.2.0.tgz#75f502eeaf9ffde42fc98829645be4ea76bd9e2d" integrity sha512-jDctJ/IVQbZoJykoeHbhXpOlNBqGNcwXJKJog42E5HDPUwQTSdjCHdihjj0DlnheQ7blbT6dHOafNAiS8ooQKA== -body-parser@1.19.2: +body-parser@1.19.2, body-parser@^1.19.2: version "1.19.2" resolved "https://registry.yarnpkg.com/body-parser/-/body-parser-1.19.2.tgz#4714ccd9c157d44797b8b5607d72c0b89952f26e" integrity sha512-SAAwOxgoCKMGs9uUAUFHygfLAyaniaoun6I8mFY9pRAJL9+Kec34aU+oIjDhTycub1jozEfEwx1W1IuOYxVSFw== @@ -1740,11 +1735,6 @@ lodash.uniq@^4.5.0: resolved "https://registry.yarnpkg.com/lodash.uniq/-/lodash.uniq-4.5.0.tgz#d0225373aeb652adc1bc82e4945339a842754773" integrity sha1-0CJTc662Uq3BvILklFM5qEJ1R3M= -lodash@^4.17.21: - version "4.17.21" - resolved "https://registry.yarnpkg.com/lodash/-/lodash-4.17.21.tgz#679591c564c3bffaae8454cf0b3df370c3d6911c" - integrity sha512-v2kDEe57lecTulaDIuNTPy3Ry4gLGJ6Z1O3vE1krgXZNrsQ+LFTGHVxVjcXPs17LhbZVGedAJv8XZ1tvj5FvSg== - lowercase-keys@^1.0.0, lowercase-keys@^1.0.1: version "1.0.1" resolved "https://registry.yarnpkg.com/lowercase-keys/-/lowercase-keys-1.0.1.tgz#6f9e30b47084d971a7c820ff15a6c5167b74c26f" diff --git a/webapp/src/boot.ts b/webapp/src/boot.ts index 049c3161..90da6b3c 100644 --- a/webapp/src/boot.ts +++ b/webapp/src/boot.ts @@ -3,7 +3,7 @@ import { HttpRemoteSpace } from "./space"; import { safeRun } from "./util"; import { io } from "socket.io-client"; -let socket = io("http://localhost:3000"); +let socket = io(`http://${location.hostname}:3000`); let editor = new Editor( new HttpRemoteSpace(`http://${location.hostname}:3000/fs`, socket), diff --git a/webapp/src/collab.ts b/webapp/src/collab.ts index f50ef01a..a5402040 100644 --- a/webapp/src/collab.ts +++ b/webapp/src/collab.ts @@ -10,6 +10,7 @@ import { receiveUpdates, sendableUpdates, } from "@codemirror/collab"; +import { RangeSetBuilder, Range } from "@codemirror/rangeset"; import { EditorState, StateEffect, StateField, Text } from "@codemirror/state"; import { Decoration, @@ -19,81 +20,46 @@ import { ViewUpdate, WidgetType, } from "@codemirror/view"; -import { cursorEffect } from "./cursorEffect"; +import { Cursor, cursorEffect } from "./cursorEffect"; import { HttpRemoteSpace } from "./space"; +const throttleInterval = 250; + +const throttle = (func: () => void, limit: number) => { + let timer: any = null; + return function () { + if (!timer) { + timer = setTimeout(() => { + func(); + timer = null; + }, limit); + } + }; +}; + +//@ts-ignore +window.throttle = throttle; + export class Document { text: Text; version: number; + cursors: Map; - constructor(text: Text, version: number) { + constructor(text: Text, version: number, cursors: Map) { this.text = text; this.version = version; + this.cursors = cursors; } } -let meId = ""; - -const cursorField = StateField.define({ - create() { - return Decoration.none; - }, - update(cursors, tr) { - cursors = cursors.map(tr.changes); - for (let e of tr.effects) { - if (e.is(cursorEffect)) { - const newCursorDecoration = Decoration.widget({ - widget: new CursorWidget(e.value.userId, e.value.color, e.value.pos), - side: 1, - }); - cursors = cursors.update({ - filter: (from, to, d) => !d.eq(newCursorDecoration), - // add: [newCursorDecoration.range(e.value.pos)], - sort: true, - }); - } - } - // console.log("New cursors", cursors.size); - return cursors; - }, - provide: (f) => EditorView.decorations.from(f), - fromJSON(cursorJSONs) { - let cursors = []; - for (let cursorJSON of cursorJSONs) { - cursors.push( - Decoration.widget({ - widget: new CursorWidget( - cursorJSON.userId, - cursorJSON.color, - cursorJSON.pos - ), - side: 1, - }).range(cursorJSON.pos) - ); - } - return Decoration.set(cursors); - }, - toJSON(cursors) { - let cursor = cursors.iter(); - let results = []; - while (cursor.value) { - results.push({ ...cursor.value.spec.widget }); - cursor.next(); - } - return results; - }, -}); - class CursorWidget extends WidgetType { userId: string; color: string; - pos: number; - constructor(userId: string, color: string, pos: number) { + constructor(userId: string, color: string) { super(); this.userId = userId; this.color = color; - this.pos = pos; } eq(other: CursorWidget) { @@ -104,9 +70,13 @@ class CursorWidget extends WidgetType { let el = document.createElement("span"); el.className = "other-cursor"; el.style.backgroundColor = this.color; - if (this.userId == meId) { - el.style.display = "none"; - } + // let nameSpanContainer = document.createElement("span"); + // nameSpanContainer.className = "cursor-label-container"; + // let nameSpanLabel = document.createElement("label"); + // nameSpanLabel.className = "cursor-label"; + // nameSpanLabel.textContent = this.userId; + // nameSpanContainer.appendChild(nameSpanLabel); + // el.appendChild(nameSpanContainer); return el; } } @@ -114,28 +84,71 @@ class CursorWidget extends WidgetType { export function collabExtension( pageName: string, clientID: string, - startVersion: number, + doc: Document, space: HttpRemoteSpace, reloadCallback: () => void ) { - meId = clientID; let plugin = ViewPlugin.fromClass( class { private pushing = false; private done = false; private failedPushes = 0; + decorations: DecorationSet; + private cursorPositions: Map = doc.cursors; + throttledPush: () => void; + + buildDecorations(view: EditorView) { + let builder = new RangeSetBuilder(); + + let list = []; + for (let [userId, def] of this.cursorPositions) { + if (userId == clientID) { + continue; + } + list.push({ + pos: def.pos, + widget: Decoration.widget({ + widget: new CursorWidget(userId, def.color), + side: 1, + }), + }); + } + + list + .sort((a, b) => a.pos - b.pos) + .forEach((r) => { + builder.add(r.pos, r.pos, r.widget); + }); + + return builder.finish(); + } constructor(private view: EditorView) { if (pageName) { this.pull(); } + this.decorations = this.buildDecorations(view); + this.throttledPush = throttle(() => this.push(), throttleInterval); + + console.log("Created collabo plug"); + space.addEventListener("cursors", this.updateCursors); + } + + updateCursors(cursorEvent: any) { + this.cursorPositions = new Map(); + console.log("Received new cursor snapshot", cursorEvent.detail, this); + for (let userId in cursorEvent.detail) { + this.cursorPositions.set(userId, cursorEvent.detail[userId]); + } } update(update: ViewUpdate) { if (update.selectionSet) { let pos = update.state.selection.main.head; - console.log("New pos", pos); - // return; + // if (pos === 0) { + // console.error("Warning: position reset? at 0"); + // console.trace(); + // } setTimeout(() => { update.view.dispatch({ effects: [ @@ -144,17 +157,32 @@ export function collabExtension( }); }); } - let foundEffect = false; + let foundCursorMoves = new Set(); for (let tx of update.transactions) { - if (tx.effects.some((e) => e.is(cursorEffect))) { - foundEffect = true; + let cursorMove = tx.effects.find((e) => e.is(cursorEffect)); + if (cursorMove) { + foundCursorMoves.add(cursorMove.value.userId); } } - if (update.docChanged || foundEffect) this.push(); + // Update cursors + for (let cursor of this.cursorPositions.values()) { + if (foundCursorMoves.has(cursor.userId)) { + // Already got a cursor update for this one, no need to manually map + continue; + } + update.transactions.forEach((tx) => { + cursor.pos = tx.changes.mapPos(cursor.pos); + }); + } + this.decorations = this.buildDecorations(update.view); + if (update.docChanged || foundCursorMoves.size > 0) { + this.throttledPush(); + } } async push() { let updates = sendableUpdates(this.view.state); + // TODO: compose multiple updates into one if (this.pushing || !updates.length) return; console.log("Updates", updates); this.pushing = true; @@ -178,7 +206,8 @@ export function collabExtension( // Regardless of whether the push failed or new updates came in // while it was running, try again if there's updates remaining if (sendableUpdates(this.view.state).length) { - setTimeout(() => this.push(), 100); + // setTimeout(() => this.push(), 100); + this.throttledPush(); } } @@ -187,26 +216,43 @@ export function collabExtension( let version = getSyncedVersion(this.view.state); let updates = await space.pullUpdates(pageName, version); let d = receiveUpdates(this.view.state, updates); - console.log("Received", d); + // Pull out cursor updates and update local state + for (let update of updates) { + if (update.effects) { + for (let effect of update.effects) { + if (effect.is(cursorEffect)) { + this.cursorPositions.set(effect.value.userId, { + userId: effect.value.userId, + pos: effect.value.pos, + color: effect.value.color, + }); + } + } + } + } this.view.dispatch(d); } } destroy() { this.done = true; + space.removeEventListener("cursors", this.updateCursors); } + }, + { + decorations: (v) => v.decorations, } ); return [ collab({ - startVersion, + startVersion: doc.version, clientID, sharedEffects: (tr) => { return tr.effects.filter((e) => e.is(cursorEffect)); }, }), - cursorField, + // cursorField, plugin, ]; } diff --git a/webapp/src/cursorEffect.ts b/webapp/src/cursorEffect.ts index c088e509..3e05bfde 100644 --- a/webapp/src/cursorEffect.ts +++ b/webapp/src/cursorEffect.ts @@ -1,10 +1,11 @@ import { StateEffect } from "@codemirror/state"; - -export const cursorEffect = StateEffect.define<{ +export type Cursor = { pos: number; userId: string; color: string; -}>({ +}; + +export const cursorEffect = StateEffect.define({ map({ pos, userId, color }, changes) { return { pos: changes.mapPos(pos), userId, color }; }, diff --git a/webapp/src/editor.tsx b/webapp/src/editor.tsx index 1a7e850d..3885ed5a 100644 --- a/webapp/src/editor.tsx +++ b/webapp/src/editor.tsx @@ -65,6 +65,7 @@ import { collabExtension } from "./collab"; import { Document } from "./collab"; import { EditorSelection } from "@codemirror/state"; +import { Cursor } from "./cursorEffect"; class PageState { scrollTop: number; @@ -100,7 +101,10 @@ export class Editor implements AppEventDispatcher { this.viewDispatch = () => {}; this.render(parent); this.editorView = new EditorView({ - state: this.createEditorState("", new Document(Text.of([""]), 0)), + state: this.createEditorState( + "", + new Document(Text.of([""]), 0, new Map()) + ), parent: document.getElementById("editor")!, }); this.pageNavigator = new PathPageNavigator(); @@ -238,7 +242,7 @@ export class Editor implements AppEventDispatcher { collabExtension( pageName, this.space.socket.id, - doc.version, + doc, this.space, this.reloadPage.bind(this) ), @@ -435,6 +439,9 @@ export class Editor implements AppEventDispatcher { if (!pageState) { pageState = new PageState(0, editorState.selection); this.openPages.set(pageName, pageState!); + editorView.dispatch({ + selection: { anchor: 0 }, + }); } else { // Restore state console.log("Restoring selection state"); diff --git a/webapp/src/parser.ts b/webapp/src/parser.ts index 6ab22742..ec30d9aa 100644 --- a/webapp/src/parser.ts +++ b/webapp/src/parser.ts @@ -134,7 +134,7 @@ const TagLink: MarkdownConfig = { const WikiMarkdown = commonmark.configure([ WikiLink, AtMention, - TagLink, + // TagLink, TaskList, UnmarkedUrl, Comment, diff --git a/webapp/src/space.ts b/webapp/src/space.ts index 4b4e834a..f5b3ae1c 100644 --- a/webapp/src/space.ts +++ b/webapp/src/space.ts @@ -4,7 +4,7 @@ import { Update } from "@codemirror/collab"; import { Transaction, Text, ChangeSet } from "@codemirror/state"; import { Document } from "./collab"; -import { cursorEffect } from "./cursorEffect"; +import { Cursor, cursorEffect } from "./cursorEffect"; export interface Space { listPages(): Promise; @@ -32,6 +32,10 @@ export class HttpRemoteSpace extends EventTarget implements Space { socket.on("reload", (pageName: string) => { this.dispatchEvent(new CustomEvent("reload", { detail: pageName })); }); + + socket.on("cursors", (cursors) => { + this.dispatchEvent(new CustomEvent("cursors", { detail: cursors })); + }); } private wsCall(eventName: string, ...args: any[]): Promise { @@ -68,7 +72,6 @@ export class HttpRemoteSpace extends EventTarget implements Space { effects: u.effects?.map((e) => cursorEffect.of(e.value)), clientID: u.clientID, })); - console.log("Got updates", ups); return ups; } @@ -85,8 +88,12 @@ export class HttpRemoteSpace extends EventTarget implements Space { async openPage(name: string): Promise { this.reqId++; - let [version, text] = await this.wsCall("openPage", name); - return new Document(Text.of(text), version); + let pageJSON = await this.wsCall("openPage", name); + let cursors = new Map(); + for (let p in pageJSON.cursors) { + cursors.set(p, pageJSON.cursors[p]); + } + return new Document(Text.of(pageJSON.text), pageJSON.version, cursors); } async closePage(name: string): Promise { diff --git a/webapp/src/styles/editor.scss b/webapp/src/styles/editor.scss index 8f54f31d..53de173d 100644 --- a/webapp/src/styles/editor.scss +++ b/webapp/src/styles/editor.scss @@ -11,11 +11,34 @@ .other-cursor { display: inline-block; - width: 1px; - margin-right: -1px; + width: 2px; + margin-right: -2px; height: 1em; } + .cursor-label-container { + // display: none; + position: relative; + top: 2ch; + float: left; + width: 120px; + height: 2.2ch; + margin: 0; + padding: 0; + overflow: hidden; + font-family: Arial, Helvetica, sans-serif; + color: #fff; + border: gray 1px solid; + background-color: purple; + // font-size: 0.5em; + } + + .cursor-label-container label { + margin: 0; + padding: 0; + font-size: 0.7em; + } + .cm-selectionBackground { background-color: #d7e1f6 !important; }