Server refactor, cursor update broadcast, history compaction

pull/3/head
Zef Hemel 2022-03-10 14:09:59 +01:00
parent 7ae3496749
commit 5c5e232034
14 changed files with 577 additions and 423 deletions

View File

@ -13,6 +13,7 @@
"dependencies": { "dependencies": {
"@codemirror/collab": "^0.19.0", "@codemirror/collab": "^0.19.0",
"@codemirror/state": "^0.19.9", "@codemirror/state": "^0.19.9",
"body-parser": "^1.19.2",
"cors": "^2.8.5", "cors": "^2.8.5",
"express": "^4.17.3", "express": "^4.17.3",
"socket.io": "^4.4.1", "socket.io": "^4.4.1",

View File

@ -0,0 +1,79 @@
import { readdir, readFile, stat, unlink, writeFile } from "fs/promises";
import path from "path";
import { PageMeta, pagesPath } from "./server";
export class DiskStorage {
rootPath: string;
constructor(rootPath: string) {
this.rootPath = rootPath;
}
async listPages(): Promise<PageMeta[]> {
let fileNames: PageMeta[] = [];
let _this = this;
async function walkPath(dir: string) {
let files = await readdir(dir);
for (let file of files) {
const fullPath = path.join(dir, file);
let s = await stat(fullPath);
if (s.isDirectory()) {
await walkPath(fullPath);
} else {
if (path.extname(file) === ".md") {
fileNames.push({
name: fullPath.substring(
_this.rootPath.length + 1,
fullPath.length - 3
),
lastModified: s.mtime.getTime(),
});
}
}
}
}
await walkPath(this.rootPath);
return fileNames;
}
async readPage(pageName: string): Promise<{ text: string; meta: PageMeta }> {
const localPath = path.join(pagesPath, pageName + ".md");
const s = await stat(localPath);
return {
text: await readFile(localPath, "utf8"),
meta: {
name: pageName,
lastModified: s.mtime.getTime(),
},
};
}
async writePage(pageName: string, text: string): Promise<PageMeta> {
let localPath = path.join(pagesPath, pageName + ".md");
// await pipeline(body, fs.createWriteStream(localPath));
await writeFile(localPath, text);
// console.log(`Wrote to ${localPath}`);
const s = await stat(localPath);
return {
name: pageName,
lastModified: s.mtime.getTime(),
};
}
async getPageMeta(pageName: string): Promise<PageMeta> {
let localPath = path.join(pagesPath, pageName + ".md");
const s = await stat(localPath);
return {
name: pageName,
lastModified: s.mtime.getTime(),
};
}
async deletePage(pageName: string) {
let localPath = path.join(pagesPath, pageName + ".md");
await unlink(localPath);
}
}

View File

@ -0,0 +1,253 @@
import fs from "fs";
import { stat } from "fs/promises";
import path from "path";
import { ChangeSet } from "@codemirror/state";
import { Update } from "@codemirror/collab";
import { Server } from "socket.io";
import { Cursor, cursorEffect } from "../../webapp/src/cursorEffect";
import { Socket } from "socket.io";
import { DiskStorage } from "./disk_storage";
import { PageMeta } from "./server";
import { Client, Page } from "./types";
import { safeRun } from "./util";
export class RealtimeStorage extends DiskStorage {
openPages = new Map<string, Page>();
private disconnectClient(client: Client, page: Page) {
page.clients.delete(client);
if (page.clients.size === 0) {
console.log("No more clients for", page.name, "flushing");
this.flushPageToDisk(page.name, page);
this.openPages.delete(page.name);
} else {
page.cursors.delete(client.socket.id);
this.broadcastCursors(page);
}
}
private broadcastCursors(page: Page) {
page.clients.forEach((client) => {
client.socket.emit("cursors", Object.fromEntries(page.cursors.entries()));
});
}
private flushPageToDisk(name: string, page: Page) {
super
.writePage(name, page.text.sliceString(0))
.then((meta) => {
console.log(`Wrote page ${name} to disk`);
page.meta = meta;
})
.catch((e) => {
console.log(`Could not write ${name} to disk:`, e);
});
}
// Override
async readPage(pageName: string): Promise<{ text: string; meta: PageMeta }> {
let page = this.openPages.get(pageName);
if (page) {
console.log("Serving page from memory", pageName);
return {
text: page.text.sliceString(0),
meta: page.meta,
};
} else {
return super.readPage(pageName);
}
}
async writePage(pageName: string, text: string): Promise<PageMeta> {
let page = this.openPages.get(pageName);
if (page) {
for (let client of page.clients) {
client.socket.emit("reload", pageName);
}
this.openPages.delete(pageName);
}
return super.writePage(pageName, text);
}
disconnectPageSocket(socket: Socket, pageName: string) {
let page = this.openPages.get(pageName);
if (page) {
for (let client of page.clients) {
if (client.socket === socket) {
this.disconnectClient(client, page);
}
}
}
}
constructor(rootPath: string, io: Server) {
super(rootPath);
// setInterval(() => {
// console.log("Currently open pages:", this.openPages.keys());
// }, 10000);
// Disk watcher
fs.watch(
rootPath,
{
recursive: true,
persistent: false,
},
(eventType, filename) => {
safeRun(async () => {
if (path.extname(filename) !== ".md") {
return;
}
let localPath = path.join(rootPath, filename);
let pageName = filename.substring(0, filename.length - 3);
let s = await stat(localPath);
// console.log("Edit in", pageName);
const openPage = this.openPages.get(pageName);
if (openPage) {
if (openPage.meta.lastModified < s.mtime.getTime()) {
console.log("Page changed on disk outside of editor, reloading");
this.openPages.delete(pageName);
for (let client of openPage.clients) {
client.socket.emit("reload", pageName);
}
}
}
});
}
);
io.on("connection", (socket) => {
console.log("Connected", socket.id);
let clientOpenPages = new Set<string>();
function onCall(eventName: string, cb: (...args: any[]) => Promise<any>) {
socket.on(eventName, (reqId: number, ...args) => {
cb(...args).then((result) => {
socket.emit(`${eventName}Resp${reqId}`, result);
});
});
}
onCall("openPage", async (pageName: string) => {
let page = this.openPages.get(pageName);
if (!page) {
try {
let { text, meta } = await super.readPage(pageName);
page = new Page(pageName, text, meta);
} catch (e) {
console.log("Creating new page", pageName);
page = new Page(pageName, "", { name: pageName, lastModified: 0 });
}
this.openPages.set(pageName, page);
}
page.clients.add(new Client(socket, page.version));
clientOpenPages.add(pageName);
console.log("Opened page", pageName);
this.broadcastCursors(page);
return page.toJSON();
});
socket.on("closePage", (pageName: string) => {
console.log("Closing page", pageName);
clientOpenPages.delete(pageName);
this.disconnectPageSocket(socket, pageName);
});
onCall(
"pushUpdates",
async (
pageName: string,
version: number,
updates: any[]
): Promise<boolean> => {
let page = this.openPages.get(pageName);
if (!page) {
console.error(
"Received updates for not open page",
pageName,
this.openPages.keys()
);
return;
}
if (version !== page.version) {
console.error("Invalid version", version, page.version);
return false;
} else {
console.log("Applying", updates.length, "updates");
let transformedUpdates = [];
let textChanged = false;
for (let update of updates) {
let changes = ChangeSet.fromJSON(update.changes);
let transformedUpdate = {
changes,
clientID: update.clientID,
effects: update.cursors?.map((c: Cursor) => {
page.cursors.set(c.userId, c);
return cursorEffect.of(c);
}),
};
page.updates.push(transformedUpdate);
transformedUpdates.push(transformedUpdate);
let oldText = page.text;
page.text = changes.apply(page.text);
if (oldText !== page.text) {
textChanged = true;
}
}
if (textChanged) {
if (page.saveTimer) {
clearTimeout(page.saveTimer);
}
page.saveTimer = setTimeout(() => {
this.flushPageToDisk(pageName, page);
}, 1000);
}
while (page.pending.length) {
page.pending.pop()!(transformedUpdates);
}
return true;
}
}
);
onCall(
"pullUpdates",
async (pageName: string, version: number): Promise<Update[]> => {
let page = this.openPages.get(pageName);
// console.log("Pulling updates for", pageName);
if (!page) {
console.error("Fetching updates for not open page");
return [];
}
// TODO: Optimize this
let oldestVersion = Infinity;
page.clients.forEach((client) => {
oldestVersion = Math.min(client.version, oldestVersion);
if (client.socket === socket) {
client.version = version;
}
});
page.flushUpdates(oldestVersion);
if (version < page.version) {
return page.updatesSince(version);
} else {
return new Promise((resolve) => {
page.pending.push(resolve);
});
}
}
);
socket.on("disconnect", () => {
console.log("Disconnected", socket.id);
clientOpenPages.forEach((pageName) => {
this.disconnectPageSocket(socket, pageName);
});
});
});
}
}

View File

@ -1,24 +1,12 @@
import bodyParser from "body-parser";
import cors from "cors"; import cors from "cors";
import express, { text } from "express"; import express from "express";
import fs from "fs"; import { readFile } from "fs/promises";
import { readdir, readFile, stat, unlink } from "fs/promises";
import path from "path";
import stream from "stream";
import { promisify } from "util";
import { debounce } from "lodash";
import { ChangeSet, Text } from "@codemirror/state";
import { Update } from "@codemirror/collab";
import http from "http"; import http from "http";
import { Server } from "socket.io"; import { Server } from "socket.io";
import stream from "stream";
import { cursorEffect } from "../../webapp/src/cursorEffect"; import { promisify } from "util";
import { RealtimeStorage } from "./realtime_storage";
function safeRun(fn: () => Promise<void>) {
fn().catch((e) => {
console.error(e);
});
}
const app = express(); const app = express();
const server = http.createServer(app); const server = http.createServer(app);
@ -31,324 +19,20 @@ const io = new Server(server, {
const port = 3000; const port = 3000;
const pipeline = promisify(stream.pipeline); const pipeline = promisify(stream.pipeline);
const pagesPath = "../pages"; export const pagesPath = "../pages";
const distDir = `${__dirname}/../../webapp/dist`; const distDir = `${__dirname}/../../webapp/dist`;
type PageMeta = { export type PageMeta = {
name: string; name: string;
lastModified: number; lastModified: number;
version?: number; version?: number;
}; };
class DiskFS {
rootPath: string;
constructor(rootPath: string) {
this.rootPath = rootPath;
}
async listPages(): Promise<PageMeta[]> {
let fileNames: PageMeta[] = [];
let _this = this;
async function walkPath(dir: string) {
let files = await readdir(dir);
for (let file of files) {
const fullPath = path.join(dir, file);
let s = await stat(fullPath);
if (s.isDirectory()) {
await walkPath(fullPath);
} else {
if (path.extname(file) === ".md") {
fileNames.push({
name: fullPath.substring(
_this.rootPath.length + 1,
fullPath.length - 3
),
lastModified: s.mtime.getTime(),
});
}
}
}
}
await walkPath(this.rootPath);
return fileNames;
}
async readPage(pageName: string): Promise<{ text: string; meta: PageMeta }> {
const localPath = path.join(pagesPath, pageName + ".md");
const s = await stat(localPath);
return {
text: await readFile(localPath, "utf8"),
meta: {
name: pageName,
lastModified: s.mtime.getTime(),
},
};
}
async writePage(pageName: string, body: any): Promise<PageMeta> {
let localPath = path.join(pagesPath, pageName + ".md");
await pipeline(body, fs.createWriteStream(localPath));
// console.log(`Wrote to ${localPath}`);
const s = await stat(localPath);
return {
name: pageName,
lastModified: s.mtime.getTime(),
};
}
async getPageMeta(pageName: string): Promise<PageMeta> {
let localPath = path.join(pagesPath, pageName + ".md");
const s = await stat(localPath);
return {
name: pageName,
lastModified: s.mtime.getTime(),
};
}
async deletePage(pageName: string) {
let localPath = path.join(pagesPath, pageName + ".md");
await unlink(localPath);
}
}
import { Socket } from "socket.io";
class Page {
text: Text;
updates: Update[];
sockets: Set<Socket>;
meta: PageMeta;
pending: ((value: any) => void)[] = [];
saveTimer: NodeJS.Timeout | undefined;
constructor(text: string, meta: PageMeta) {
this.updates = [];
this.text = Text.of(text.split("\n"));
this.meta = meta;
this.sockets = new Set<Socket>();
}
}
class RealtimeEditFS extends DiskFS {
openPages = new Map<string, Page>();
disconnectSocket(socket: Socket, pageName: string) {
let page = this.openPages.get(pageName);
if (page) {
page.sockets.delete(socket);
if (page.sockets.size === 0) {
console.log("No more sockets for", pageName, "flushing");
this.flushPageToDisk(pageName, page);
this.openPages.delete(pageName);
}
}
}
flushPageToDisk(name: string, page: Page) {
super
.writePage(name, page.text.sliceString(0))
.then((meta) => {
console.log(`Wrote page ${name} to disk`);
page.meta = meta;
})
.catch((e) => {
console.log(`Could not write ${name} to disk:`, e);
});
}
// Override
async readPage(pageName: string): Promise<{ text: string; meta: PageMeta }> {
let page = this.openPages.get(pageName);
if (page) {
console.log("Serving page from memory", pageName);
return {
text: page.text.sliceString(0),
meta: page.meta,
};
} else {
return super.readPage(pageName);
}
}
async writePage(pageName: string, body: any): Promise<PageMeta> {
let page = this.openPages.get(pageName);
if (page) {
for (let socket of page.sockets) {
socket.emit("reload", pageName);
}
this.openPages.delete(pageName);
}
return super.writePage(pageName, body);
}
constructor(rootPath: string, io: Server) {
super(rootPath);
setInterval(() => {
console.log("Currently open pages:", this.openPages.keys());
}, 10000);
// Disk watcher
fs.watch(
rootPath,
{
recursive: true,
persistent: false,
},
(eventType, filename) => {
safeRun(async () => {
if (path.extname(filename) !== ".md") {
return;
}
let localPath = path.join(rootPath, filename);
let pageName = filename.substring(0, filename.length - 3);
let s = await stat(localPath);
// console.log("Edit in", pageName);
const openPage = this.openPages.get(pageName);
if (openPage) {
if (openPage.meta.lastModified < s.mtime.getTime()) {
console.log("Page changed on disk outside of editor, reloading");
for (let socket of openPage.sockets) {
socket.emit("reload", pageName);
}
this.openPages.delete(pageName);
}
}
});
}
);
io.on("connection", (socket) => {
console.log("Connected", socket.id);
let socketOpenPages = new Set<string>();
function onCall(eventName: string, cb: (...args: any[]) => Promise<any>) {
socket.on(eventName, (reqId: number, ...args) => {
cb(...args).then((result) => {
socket.emit(`${eventName}Resp${reqId}`, result);
});
});
}
onCall("openPage", async (pageName: string) => {
let page = this.openPages.get(pageName);
if (!page) {
try {
let { text, meta } = await super.readPage(pageName);
page = new Page(text, meta);
} catch (e) {
// console.log(`Could not open ${pageName}:`, e);
// Page does not exist, let's create a new one
console.log("Creating new page", pageName);
page = new Page("", { name: pageName, lastModified: 0 });
}
this.openPages.set(pageName, page);
}
page.sockets.add(socket);
socketOpenPages.add(pageName);
console.log("Opened page", pageName);
return [page.updates.length, page.text.toJSON()];
});
socket.on("closePage", (pageName: string) => {
console.log("Closing page", pageName);
this.disconnectSocket(socket, pageName);
socketOpenPages.delete(pageName);
});
onCall(
"pushUpdates",
async (
pageName: string,
version: number,
updates: any[]
): Promise<boolean> => {
let page = this.openPages.get(pageName);
if (!page) {
console.error(
"Received updates for not open page",
pageName,
this.openPages.keys()
);
return;
}
if (version !== page.updates.length) {
console.error("Invalid version", version, page.updates.length);
return false;
} else {
console.log("Applying", updates.length, "updates");
let transformedUpdates = [];
for (let update of updates) {
let changes = ChangeSet.fromJSON(update.changes);
console.log("Got effect", update);
let transformedUpdate = {
changes,
clientID: update.clientID,
effects: update.cursors?.map((c) => {
return cursorEffect.of(c);
}),
};
page.updates.push(transformedUpdate);
transformedUpdates.push(transformedUpdate);
// TODO: save cursors locally as well
page.text = changes.apply(page.text);
}
if (page.saveTimer) {
clearTimeout(page.saveTimer);
}
page.saveTimer = setTimeout(() => {
this.flushPageToDisk(pageName, page);
}, 1000);
while (page.pending.length) {
page.pending.pop()!(transformedUpdates);
}
return true;
}
}
);
onCall(
"pullUpdates",
async (pageName: string, version: number): Promise<Update[]> => {
let page = this.openPages.get(pageName);
// console.log("Pulling updates for", pageName);
if (!page) {
console.error("Fetching updates for not open page");
return [];
}
if (version < page.updates.length) {
return page.updates.slice(version);
} else {
return new Promise((resolve) => {
page.pending.push(resolve);
});
}
}
);
socket.on("disconnect", () => {
console.log("Disconnected", socket.id);
socketOpenPages.forEach((page) => {
this.disconnectSocket(socket, page);
});
});
});
}
}
app.use("/", express.static(distDir)); app.use("/", express.static(distDir));
let fsRouter = express.Router(); let fsRouter = express.Router();
// let diskFS = new DiskFS(pagesPath); // let diskFS = new DiskFS(pagesPath);
let filesystem = new RealtimeEditFS(pagesPath, io); let filesystem = new RealtimeStorage(pagesPath, io);
// Page list // Page list
fsRouter.route("/").get(async (req, res) => { fsRouter.route("/").get(async (req, res) => {
@ -371,11 +55,11 @@ fsRouter
res.send(""); res.send("");
} }
}) })
.put(async (req, res) => { .put(bodyParser.text({ type: "*/*" }), async (req, res) => {
let reqPath = req.params[0]; let reqPath = req.params[0];
try { try {
let meta = await filesystem.writePage(reqPath, req); let meta = await filesystem.writePage(reqPath, req.body);
res.status(200); res.status(200);
res.header("Last-Modified", "" + meta.lastModified); res.header("Last-Modified", "" + meta.lastModified);
res.send("OK"); res.send("OK");

58
server/src/types.ts Normal file
View File

@ -0,0 +1,58 @@
import { Update } from "@codemirror/collab";
import { Text } from "@codemirror/state";
import { Socket } from "socket.io";
import { Cursor } from "../../webapp/src/cursorEffect";
import { PageMeta } from "./server";
export class Client {
constructor(public socket: Socket, public version: number) {}
}
export class Page {
versionOffset = 0;
updates: Update[] = [];
cursors = new Map<string, Cursor>();
clients = new Set<Client>();
pending: ((value: any) => void)[] = [];
text: Text;
meta: PageMeta;
saveTimer: NodeJS.Timeout | undefined;
name: string;
constructor(name: string, text: string, meta: PageMeta) {
this.name = name;
this.text = Text.of(text.split("\n"));
this.meta = meta;
}
updatesSince(version: number): Update[] {
return this.updates.slice(version - this.versionOffset);
}
get version(): number {
return this.updates.length + this.versionOffset;
}
flushUpdates(version: number) {
if (this.versionOffset > version) {
throw Error("This should never happen");
}
if (this.versionOffset === version) {
return;
}
this.updates = this.updates.slice(version - this.versionOffset);
this.versionOffset = version;
// console.log("Flushed updates, now got", this.updates.length, "updates");
}
toJSON() {
return {
text: this.text,
version: this.version,
cursors: Object.fromEntries(this.cursors.entries()),
};
}
}

5
server/src/util.ts Normal file
View File

@ -0,0 +1,5 @@
export function safeRun(fn: () => Promise<void>) {
fn().catch((e) => {
console.error(e);
});
}

View File

@ -670,11 +670,6 @@
"@types/qs" "*" "@types/qs" "*"
"@types/serve-static" "*" "@types/serve-static" "*"
"@types/lodash@^4.14.179":
version "4.14.179"
resolved "https://registry.yarnpkg.com/@types/lodash/-/lodash-4.14.179.tgz#490ec3288088c91295780237d2497a3aa9dfb5c5"
integrity sha512-uwc1x90yCKqGcIOAT6DwOSuxnrAbpkdPsUOZtwrXb4D/6wZs+6qG7QnIawDuZWg0sWpxl+ltIKCaLoMlna678w==
"@types/mime@^1": "@types/mime@^1":
version "1.3.2" version "1.3.2"
resolved "https://registry.yarnpkg.com/@types/mime/-/mime-1.3.2.tgz#93e25bf9ee75fe0fd80b594bc4feb0e862111b5a" resolved "https://registry.yarnpkg.com/@types/mime/-/mime-1.3.2.tgz#93e25bf9ee75fe0fd80b594bc4feb0e862111b5a"
@ -792,7 +787,7 @@ binary-extensions@^2.0.0:
resolved "https://registry.yarnpkg.com/binary-extensions/-/binary-extensions-2.2.0.tgz#75f502eeaf9ffde42fc98829645be4ea76bd9e2d" resolved "https://registry.yarnpkg.com/binary-extensions/-/binary-extensions-2.2.0.tgz#75f502eeaf9ffde42fc98829645be4ea76bd9e2d"
integrity sha512-jDctJ/IVQbZoJykoeHbhXpOlNBqGNcwXJKJog42E5HDPUwQTSdjCHdihjj0DlnheQ7blbT6dHOafNAiS8ooQKA== integrity sha512-jDctJ/IVQbZoJykoeHbhXpOlNBqGNcwXJKJog42E5HDPUwQTSdjCHdihjj0DlnheQ7blbT6dHOafNAiS8ooQKA==
body-parser@1.19.2: body-parser@1.19.2, body-parser@^1.19.2:
version "1.19.2" version "1.19.2"
resolved "https://registry.yarnpkg.com/body-parser/-/body-parser-1.19.2.tgz#4714ccd9c157d44797b8b5607d72c0b89952f26e" resolved "https://registry.yarnpkg.com/body-parser/-/body-parser-1.19.2.tgz#4714ccd9c157d44797b8b5607d72c0b89952f26e"
integrity sha512-SAAwOxgoCKMGs9uUAUFHygfLAyaniaoun6I8mFY9pRAJL9+Kec34aU+oIjDhTycub1jozEfEwx1W1IuOYxVSFw== integrity sha512-SAAwOxgoCKMGs9uUAUFHygfLAyaniaoun6I8mFY9pRAJL9+Kec34aU+oIjDhTycub1jozEfEwx1W1IuOYxVSFw==
@ -1740,11 +1735,6 @@ lodash.uniq@^4.5.0:
resolved "https://registry.yarnpkg.com/lodash.uniq/-/lodash.uniq-4.5.0.tgz#d0225373aeb652adc1bc82e4945339a842754773" resolved "https://registry.yarnpkg.com/lodash.uniq/-/lodash.uniq-4.5.0.tgz#d0225373aeb652adc1bc82e4945339a842754773"
integrity sha1-0CJTc662Uq3BvILklFM5qEJ1R3M= integrity sha1-0CJTc662Uq3BvILklFM5qEJ1R3M=
lodash@^4.17.21:
version "4.17.21"
resolved "https://registry.yarnpkg.com/lodash/-/lodash-4.17.21.tgz#679591c564c3bffaae8454cf0b3df370c3d6911c"
integrity sha512-v2kDEe57lecTulaDIuNTPy3Ry4gLGJ6Z1O3vE1krgXZNrsQ+LFTGHVxVjcXPs17LhbZVGedAJv8XZ1tvj5FvSg==
lowercase-keys@^1.0.0, lowercase-keys@^1.0.1: lowercase-keys@^1.0.0, lowercase-keys@^1.0.1:
version "1.0.1" version "1.0.1"
resolved "https://registry.yarnpkg.com/lowercase-keys/-/lowercase-keys-1.0.1.tgz#6f9e30b47084d971a7c820ff15a6c5167b74c26f" resolved "https://registry.yarnpkg.com/lowercase-keys/-/lowercase-keys-1.0.1.tgz#6f9e30b47084d971a7c820ff15a6c5167b74c26f"

View File

@ -3,7 +3,7 @@ import { HttpRemoteSpace } from "./space";
import { safeRun } from "./util"; import { safeRun } from "./util";
import { io } from "socket.io-client"; import { io } from "socket.io-client";
let socket = io("http://localhost:3000"); let socket = io(`http://${location.hostname}:3000`);
let editor = new Editor( let editor = new Editor(
new HttpRemoteSpace(`http://${location.hostname}:3000/fs`, socket), new HttpRemoteSpace(`http://${location.hostname}:3000/fs`, socket),

View File

@ -10,6 +10,7 @@ import {
receiveUpdates, receiveUpdates,
sendableUpdates, sendableUpdates,
} from "@codemirror/collab"; } from "@codemirror/collab";
import { RangeSetBuilder, Range } from "@codemirror/rangeset";
import { EditorState, StateEffect, StateField, Text } from "@codemirror/state"; import { EditorState, StateEffect, StateField, Text } from "@codemirror/state";
import { import {
Decoration, Decoration,
@ -19,81 +20,46 @@ import {
ViewUpdate, ViewUpdate,
WidgetType, WidgetType,
} from "@codemirror/view"; } from "@codemirror/view";
import { cursorEffect } from "./cursorEffect"; import { Cursor, cursorEffect } from "./cursorEffect";
import { HttpRemoteSpace } from "./space"; import { HttpRemoteSpace } from "./space";
const throttleInterval = 250;
const throttle = (func: () => void, limit: number) => {
let timer: any = null;
return function () {
if (!timer) {
timer = setTimeout(() => {
func();
timer = null;
}, limit);
}
};
};
//@ts-ignore
window.throttle = throttle;
export class Document { export class Document {
text: Text; text: Text;
version: number; version: number;
cursors: Map<string, Cursor>;
constructor(text: Text, version: number) { constructor(text: Text, version: number, cursors: Map<string, Cursor>) {
this.text = text; this.text = text;
this.version = version; this.version = version;
this.cursors = cursors;
} }
} }
let meId = "";
const cursorField = StateField.define<DecorationSet>({
create() {
return Decoration.none;
},
update(cursors, tr) {
cursors = cursors.map(tr.changes);
for (let e of tr.effects) {
if (e.is(cursorEffect)) {
const newCursorDecoration = Decoration.widget({
widget: new CursorWidget(e.value.userId, e.value.color, e.value.pos),
side: 1,
});
cursors = cursors.update({
filter: (from, to, d) => !d.eq(newCursorDecoration),
// add: [newCursorDecoration.range(e.value.pos)],
sort: true,
});
}
}
// console.log("New cursors", cursors.size);
return cursors;
},
provide: (f) => EditorView.decorations.from(f),
fromJSON(cursorJSONs) {
let cursors = [];
for (let cursorJSON of cursorJSONs) {
cursors.push(
Decoration.widget({
widget: new CursorWidget(
cursorJSON.userId,
cursorJSON.color,
cursorJSON.pos
),
side: 1,
}).range(cursorJSON.pos)
);
}
return Decoration.set(cursors);
},
toJSON(cursors) {
let cursor = cursors.iter();
let results = [];
while (cursor.value) {
results.push({ ...cursor.value.spec.widget });
cursor.next();
}
return results;
},
});
class CursorWidget extends WidgetType { class CursorWidget extends WidgetType {
userId: string; userId: string;
color: string; color: string;
pos: number;
constructor(userId: string, color: string, pos: number) { constructor(userId: string, color: string) {
super(); super();
this.userId = userId; this.userId = userId;
this.color = color; this.color = color;
this.pos = pos;
} }
eq(other: CursorWidget) { eq(other: CursorWidget) {
@ -104,9 +70,13 @@ class CursorWidget extends WidgetType {
let el = document.createElement("span"); let el = document.createElement("span");
el.className = "other-cursor"; el.className = "other-cursor";
el.style.backgroundColor = this.color; el.style.backgroundColor = this.color;
if (this.userId == meId) { // let nameSpanContainer = document.createElement("span");
el.style.display = "none"; // nameSpanContainer.className = "cursor-label-container";
} // let nameSpanLabel = document.createElement("label");
// nameSpanLabel.className = "cursor-label";
// nameSpanLabel.textContent = this.userId;
// nameSpanContainer.appendChild(nameSpanLabel);
// el.appendChild(nameSpanContainer);
return el; return el;
} }
} }
@ -114,28 +84,71 @@ class CursorWidget extends WidgetType {
export function collabExtension( export function collabExtension(
pageName: string, pageName: string,
clientID: string, clientID: string,
startVersion: number, doc: Document,
space: HttpRemoteSpace, space: HttpRemoteSpace,
reloadCallback: () => void reloadCallback: () => void
) { ) {
meId = clientID;
let plugin = ViewPlugin.fromClass( let plugin = ViewPlugin.fromClass(
class { class {
private pushing = false; private pushing = false;
private done = false; private done = false;
private failedPushes = 0; private failedPushes = 0;
decorations: DecorationSet;
private cursorPositions: Map<string, Cursor> = doc.cursors;
throttledPush: () => void;
buildDecorations(view: EditorView) {
let builder = new RangeSetBuilder<Decoration>();
let list = [];
for (let [userId, def] of this.cursorPositions) {
if (userId == clientID) {
continue;
}
list.push({
pos: def.pos,
widget: Decoration.widget({
widget: new CursorWidget(userId, def.color),
side: 1,
}),
});
}
list
.sort((a, b) => a.pos - b.pos)
.forEach((r) => {
builder.add(r.pos, r.pos, r.widget);
});
return builder.finish();
}
constructor(private view: EditorView) { constructor(private view: EditorView) {
if (pageName) { if (pageName) {
this.pull(); this.pull();
} }
this.decorations = this.buildDecorations(view);
this.throttledPush = throttle(() => this.push(), throttleInterval);
console.log("Created collabo plug");
space.addEventListener("cursors", this.updateCursors);
}
updateCursors(cursorEvent: any) {
this.cursorPositions = new Map();
console.log("Received new cursor snapshot", cursorEvent.detail, this);
for (let userId in cursorEvent.detail) {
this.cursorPositions.set(userId, cursorEvent.detail[userId]);
}
} }
update(update: ViewUpdate) { update(update: ViewUpdate) {
if (update.selectionSet) { if (update.selectionSet) {
let pos = update.state.selection.main.head; let pos = update.state.selection.main.head;
console.log("New pos", pos); // if (pos === 0) {
// return; // console.error("Warning: position reset? at 0");
// console.trace();
// }
setTimeout(() => { setTimeout(() => {
update.view.dispatch({ update.view.dispatch({
effects: [ effects: [
@ -144,17 +157,32 @@ export function collabExtension(
}); });
}); });
} }
let foundEffect = false; let foundCursorMoves = new Set<string>();
for (let tx of update.transactions) { for (let tx of update.transactions) {
if (tx.effects.some((e) => e.is(cursorEffect))) { let cursorMove = tx.effects.find((e) => e.is(cursorEffect));
foundEffect = true; if (cursorMove) {
foundCursorMoves.add(cursorMove.value.userId);
} }
} }
if (update.docChanged || foundEffect) this.push(); // Update cursors
for (let cursor of this.cursorPositions.values()) {
if (foundCursorMoves.has(cursor.userId)) {
// Already got a cursor update for this one, no need to manually map
continue;
}
update.transactions.forEach((tx) => {
cursor.pos = tx.changes.mapPos(cursor.pos);
});
}
this.decorations = this.buildDecorations(update.view);
if (update.docChanged || foundCursorMoves.size > 0) {
this.throttledPush();
}
} }
async push() { async push() {
let updates = sendableUpdates(this.view.state); let updates = sendableUpdates(this.view.state);
// TODO: compose multiple updates into one
if (this.pushing || !updates.length) return; if (this.pushing || !updates.length) return;
console.log("Updates", updates); console.log("Updates", updates);
this.pushing = true; this.pushing = true;
@ -178,7 +206,8 @@ export function collabExtension(
// Regardless of whether the push failed or new updates came in // Regardless of whether the push failed or new updates came in
// while it was running, try again if there's updates remaining // while it was running, try again if there's updates remaining
if (sendableUpdates(this.view.state).length) { if (sendableUpdates(this.view.state).length) {
setTimeout(() => this.push(), 100); // setTimeout(() => this.push(), 100);
this.throttledPush();
} }
} }
@ -187,26 +216,43 @@ export function collabExtension(
let version = getSyncedVersion(this.view.state); let version = getSyncedVersion(this.view.state);
let updates = await space.pullUpdates(pageName, version); let updates = await space.pullUpdates(pageName, version);
let d = receiveUpdates(this.view.state, updates); let d = receiveUpdates(this.view.state, updates);
console.log("Received", d); // Pull out cursor updates and update local state
for (let update of updates) {
if (update.effects) {
for (let effect of update.effects) {
if (effect.is(cursorEffect)) {
this.cursorPositions.set(effect.value.userId, {
userId: effect.value.userId,
pos: effect.value.pos,
color: effect.value.color,
});
}
}
}
}
this.view.dispatch(d); this.view.dispatch(d);
} }
} }
destroy() { destroy() {
this.done = true; this.done = true;
space.removeEventListener("cursors", this.updateCursors);
} }
},
{
decorations: (v) => v.decorations,
} }
); );
return [ return [
collab({ collab({
startVersion, startVersion: doc.version,
clientID, clientID,
sharedEffects: (tr) => { sharedEffects: (tr) => {
return tr.effects.filter((e) => e.is(cursorEffect)); return tr.effects.filter((e) => e.is(cursorEffect));
}, },
}), }),
cursorField, // cursorField,
plugin, plugin,
]; ];
} }

View File

@ -1,10 +1,11 @@
import { StateEffect } from "@codemirror/state"; import { StateEffect } from "@codemirror/state";
export type Cursor = {
export const cursorEffect = StateEffect.define<{
pos: number; pos: number;
userId: string; userId: string;
color: string; color: string;
}>({ };
export const cursorEffect = StateEffect.define<Cursor>({
map({ pos, userId, color }, changes) { map({ pos, userId, color }, changes) {
return { pos: changes.mapPos(pos), userId, color }; return { pos: changes.mapPos(pos), userId, color };
}, },

View File

@ -65,6 +65,7 @@ import { collabExtension } from "./collab";
import { Document } from "./collab"; import { Document } from "./collab";
import { EditorSelection } from "@codemirror/state"; import { EditorSelection } from "@codemirror/state";
import { Cursor } from "./cursorEffect";
class PageState { class PageState {
scrollTop: number; scrollTop: number;
@ -100,7 +101,10 @@ export class Editor implements AppEventDispatcher {
this.viewDispatch = () => {}; this.viewDispatch = () => {};
this.render(parent); this.render(parent);
this.editorView = new EditorView({ this.editorView = new EditorView({
state: this.createEditorState("", new Document(Text.of([""]), 0)), state: this.createEditorState(
"",
new Document(Text.of([""]), 0, new Map<string, Cursor>())
),
parent: document.getElementById("editor")!, parent: document.getElementById("editor")!,
}); });
this.pageNavigator = new PathPageNavigator(); this.pageNavigator = new PathPageNavigator();
@ -238,7 +242,7 @@ export class Editor implements AppEventDispatcher {
collabExtension( collabExtension(
pageName, pageName,
this.space.socket.id, this.space.socket.id,
doc.version, doc,
this.space, this.space,
this.reloadPage.bind(this) this.reloadPage.bind(this)
), ),
@ -435,6 +439,9 @@ export class Editor implements AppEventDispatcher {
if (!pageState) { if (!pageState) {
pageState = new PageState(0, editorState.selection); pageState = new PageState(0, editorState.selection);
this.openPages.set(pageName, pageState!); this.openPages.set(pageName, pageState!);
editorView.dispatch({
selection: { anchor: 0 },
});
} else { } else {
// Restore state // Restore state
console.log("Restoring selection state"); console.log("Restoring selection state");

View File

@ -134,7 +134,7 @@ const TagLink: MarkdownConfig = {
const WikiMarkdown = commonmark.configure([ const WikiMarkdown = commonmark.configure([
WikiLink, WikiLink,
AtMention, AtMention,
TagLink, // TagLink,
TaskList, TaskList,
UnmarkedUrl, UnmarkedUrl,
Comment, Comment,

View File

@ -4,7 +4,7 @@ import { Update } from "@codemirror/collab";
import { Transaction, Text, ChangeSet } from "@codemirror/state"; import { Transaction, Text, ChangeSet } from "@codemirror/state";
import { Document } from "./collab"; import { Document } from "./collab";
import { cursorEffect } from "./cursorEffect"; import { Cursor, cursorEffect } from "./cursorEffect";
export interface Space { export interface Space {
listPages(): Promise<PageMeta[]>; listPages(): Promise<PageMeta[]>;
@ -32,6 +32,10 @@ export class HttpRemoteSpace extends EventTarget implements Space {
socket.on("reload", (pageName: string) => { socket.on("reload", (pageName: string) => {
this.dispatchEvent(new CustomEvent("reload", { detail: pageName })); this.dispatchEvent(new CustomEvent("reload", { detail: pageName }));
}); });
socket.on("cursors", (cursors) => {
this.dispatchEvent(new CustomEvent("cursors", { detail: cursors }));
});
} }
private wsCall(eventName: string, ...args: any[]): Promise<any> { private wsCall(eventName: string, ...args: any[]): Promise<any> {
@ -68,7 +72,6 @@ export class HttpRemoteSpace extends EventTarget implements Space {
effects: u.effects?.map((e) => cursorEffect.of(e.value)), effects: u.effects?.map((e) => cursorEffect.of(e.value)),
clientID: u.clientID, clientID: u.clientID,
})); }));
console.log("Got updates", ups);
return ups; return ups;
} }
@ -85,8 +88,12 @@ export class HttpRemoteSpace extends EventTarget implements Space {
async openPage(name: string): Promise<Document> { async openPage(name: string): Promise<Document> {
this.reqId++; this.reqId++;
let [version, text] = await this.wsCall("openPage", name); let pageJSON = await this.wsCall("openPage", name);
return new Document(Text.of(text), version); let cursors = new Map<string, Cursor>();
for (let p in pageJSON.cursors) {
cursors.set(p, pageJSON.cursors[p]);
}
return new Document(Text.of(pageJSON.text), pageJSON.version, cursors);
} }
async closePage(name: string): Promise<void> { async closePage(name: string): Promise<void> {

View File

@ -11,11 +11,34 @@
.other-cursor { .other-cursor {
display: inline-block; display: inline-block;
width: 1px; width: 2px;
margin-right: -1px; margin-right: -2px;
height: 1em; height: 1em;
} }
.cursor-label-container {
// display: none;
position: relative;
top: 2ch;
float: left;
width: 120px;
height: 2.2ch;
margin: 0;
padding: 0;
overflow: hidden;
font-family: Arial, Helvetica, sans-serif;
color: #fff;
border: gray 1px solid;
background-color: purple;
// font-size: 0.5em;
}
.cursor-label-container label {
margin: 0;
padding: 0;
font-size: 0.7em;
}
.cm-selectionBackground { .cm-selectionBackground {
background-color: #d7e1f6 !important; background-color: #d7e1f6 !important;
} }