Collab work

deno-express
Zef Hemel 2023-05-29 17:05:20 +02:00
parent ef7bc0beae
commit b5a8cd7d1b
12 changed files with 382 additions and 35 deletions

1
.gitignore vendored
View File

@ -9,3 +9,4 @@ website_build
deno.lock
fly.toml
env.sh
node_modules

View File

@ -13,12 +13,23 @@ await esbuild.build({
sourcemap: false,
minify: false,
plugins: [
// ESBuild plugin to make npm modules external
{
name: "npm-external",
setup(build: any) {
build.onResolve({ filter: /^npm:/ }, (args: any) => {
return {
path: args.path,
external: true,
};
});
},
},
{
name: "json",
setup: (build) =>
build.onLoad({ filter: /\.json$/ }, () => ({ loader: "json" })),
},
...denoPlugins({
importMapURL: new URL("./import_map.json", import.meta.url)
.toString(),

View File

@ -7,3 +7,7 @@ export function start(serverUrl: string, token: string, username: string) {
export function stop() {
return syscall("collab.stop");
}
export function ping(clientId: string, currentPage: string) {
return syscall("collab.ping", clientId, currentPage);
}

View File

@ -110,6 +110,7 @@ export async function detectPage() {
console.error("Error parsing YAML", e);
}
}
await ping();
}
export function shareNoop() {
@ -160,3 +161,48 @@ export function writeFileCollab(name: string): FileMeta {
perm: "rw",
};
}
const clientId = nanoid();
let currentCollabId: string | undefined;
const localCollabServer = location.protocol === "http:"
? `ws://${location.host}/.ws-collab`
: `wss://${location.host}/.ws-collab`;
async function ping() {
try {
const currentPage = await editor.getCurrentPage();
const { collabId } = await collab.ping(
clientId,
currentPage,
);
console.log("Collab ID", collabId);
if (!collabId && currentCollabId) {
// Stop collab
console.log("Stopping collab");
// editor.flashNotification("Closing real-time collaboration mode.");
currentCollabId = undefined;
await collab.stop();
} else if (collabId && collabId !== currentCollabId) {
// Start collab
console.log("Starting collab");
editor.flashNotification("Opening page in real-time collaboration mode.");
currentCollabId = collabId;
await collab.start(
localCollabServer,
`${collabId}/${currentPage}`,
"you",
);
}
} catch (e: any) {
// console.error("Ping error", e);
if (e.message.includes("Failed to fetch") && currentCollabId) {
console.log("Offline, stopping collab");
currentCollabId = undefined;
await collab.stop();
}
}
}
setInterval(() => {
ping().catch(console.error);
}, 5000);

39
server/collab.test.ts Normal file
View File

@ -0,0 +1,39 @@
import { assert, assertEquals } from "../test_deps.ts";
import { CollabServer } from "./collab.ts";
Deno.test("Collab server", async () => {
const collabServer = new CollabServer(null as any);
console.log("Client 1 joins page 1");
assertEquals(collabServer.ping("client1", "page1"), {});
assertEquals(collabServer.clients.size, 1);
assertEquals(collabServer.pages.size, 1);
console.log("Client 1 joins page 2");
assertEquals(collabServer.ping("client1", "page2"), {});
assertEquals(collabServer.clients.size, 1);
assertEquals(collabServer.pages.size, 1);
console.log("Client 2 joins to page 2, collab id created");
const collabId = collabServer.ping("client2", "page2").collabId;
assertEquals(collabServer.clients.size, 2);
assert(collabId !== undefined);
console.log("Client 2 moves to page 1, collab id destroyed");
assertEquals(collabServer.ping("client2", "page1"), {});
assertEquals(collabServer.ping("client1", "page2"), {});
console.log("Going to cleanup, which should have no effect");
collabServer.cleanup(50);
assertEquals(collabServer.clients.size, 2);
collabServer.ping("client2", "page2");
console.log("Going to sleep 20ms");
await sleep(20);
console.log("Then client 1 pings, but client 2 does not");
assertEquals(collabServer.ping("client1", "page2"), {});
await sleep(20);
console.log("Going to cleanup, which should clean client 2");
collabServer.cleanup(35);
assertEquals(collabServer.clients.size, 1);
assertEquals(collabServer.pages.get("page2")!.collabId, undefined);
console.log(collabServer);
});
function sleep(ms: number) {
return new Promise((resolve) => setTimeout(resolve, ms));
}

191
server/collab.ts Normal file
View File

@ -0,0 +1,191 @@
import { Hocuspocus } from "npm:@hocuspocus/server@2.0.6";
import { getAvailablePortSync } from "https://deno.land/x/port@1.0.0/mod.ts";
import { nanoid } from "https://esm.sh/nanoid@4.0.0";
import { race, timeout } from "../common/async_util.ts";
import { Application } from "./deps.ts";
import { SpacePrimitives } from "../common/spaces/space_primitives.ts";
type CollabPage = {
clients: Set<string>; // clientIds
collabId?: string;
};
const pingInterval = 5000;
export class CollabServer {
clients: Map<string, { openPage: string; lastPing: number }> = new Map();
pages: Map<string, CollabPage> = new Map();
constructor(private spacePrimitives: SpacePrimitives) {
}
start() {
setInterval(() => {
this.cleanup(3 * pingInterval);
}, pingInterval);
}
ping(clientId: string, currentPage: string): { collabId?: string } {
let clientState = this.clients.get(clientId);
let collabId: string | undefined;
if (!clientState) {
clientState = {
openPage: "",
lastPing: Date.now(),
};
} else {
clientState.lastPing = Date.now();
}
if (currentPage !== clientState.openPage) {
// Client switched pages
// Update last page record
const lastCollabPage = this.pages.get(clientState.openPage);
if (lastCollabPage) {
lastCollabPage.clients.delete(clientId);
if (lastCollabPage.clients.size === 0) {
// Cleanup
this.pages.delete(clientState.openPage);
} else {
if (lastCollabPage.clients.size === 1) {
delete lastCollabPage.collabId;
}
this.pages.set(clientState.openPage, lastCollabPage);
}
}
// Update new page
let nextCollabPage = this.pages.get(currentPage);
if (!nextCollabPage) {
nextCollabPage = {
clients: new Set(),
};
}
nextCollabPage.clients.add(clientId);
if (nextCollabPage.clients.size === 2) {
// Create a new collabId
nextCollabPage.collabId = nanoid();
}
clientState.openPage = currentPage;
this.pages.set(currentPage, nextCollabPage);
collabId = nextCollabPage.collabId;
} else {
// Page didn't change
collabId = this.pages.get(currentPage)?.collabId;
}
this.clients.set(clientId, clientState);
if (collabId) {
return { collabId };
} else {
return {};
}
}
cleanup(timeout: number) {
// Clean up clients that haven't pinged for some time
for (const [clientId, clientState] of this.clients) {
if (Date.now() - clientState.lastPing > timeout) {
console.log("[Collab]", "Ejecting client", clientId);
this.clients.delete(clientId);
const collabPage = this.pages.get(clientState.openPage);
if (collabPage) {
collabPage.clients.delete(clientId);
if (collabPage.clients.size === 0) {
this.pages.delete(clientState.openPage);
} else {
if (collabPage.clients.size === 1) {
delete collabPage.collabId;
}
this.pages.set(clientState.openPage, collabPage);
}
}
}
}
}
route(app: Application) {
// The way this works is that we spin up a separate WS server locally and then proxy requests to it
// This is the only way I could get Hocuspocus to work with Deno
const internalPort = getAvailablePortSync();
const hocuspocus = new Hocuspocus({
port: internalPort,
address: "localhost",
quiet: true,
onLoadDocument: async (doc) => {
console.log("[Hocuspocus]", "Requesting doc load", doc.documentName);
const pageName = doc.documentName.split("/").slice(1).join("/");
try {
const yText = doc.document.getText("codemirror");
const { data } = await this.spacePrimitives.readFile(
`${pageName}.md`,
);
yText.insert(0, new TextDecoder().decode(data));
console.log("[Hocuspocus]", "Loaded document from space");
return doc.document;
} catch (e) {
console.error("Error loading doc", e);
}
},
onDisconnect: (client) => {
console.log("[Hocuspocus]", "Client disconnected", client.clientsCount);
if (client.clientsCount === 0) {
console.log(
"[Hocuspocus]",
"Last client disconnected from",
client.documentName,
"purging from memory",
);
hocuspocus.documents.delete(client.documentName);
}
return Promise.resolve();
},
});
hocuspocus.listen();
app.use((ctx) => {
if (ctx.request.url.pathname === "/.ws") {
const sock = ctx.upgrade();
sock.onmessage = (e) => {
console.log("WS: Got message", e.data);
};
}
// Websocket proxy to hocuspocus
if (ctx.request.url.pathname === "/.ws-collab") {
const sock = ctx.upgrade();
const ws = new WebSocket(`ws://localhost:${internalPort}`);
const wsReady = race([
new Promise<void>((resolve) => {
ws.onopen = () => {
resolve();
};
}),
timeout(1000),
]).catch(() => {
console.error("Timeout waiting for collab to open websocket");
sock.close();
});
sock.onmessage = (e) => {
// console.log("Got message", e);
wsReady.then(() => ws.send(e.data)).catch(console.error);
};
sock.onclose = () => {
if (ws.OPEN) {
ws.close();
}
};
ws.onmessage = (e) => {
if (sock.OPEN) {
sock.send(e.data);
} else {
console.error("Got message from websocket but socket is not open");
}
};
ws.onclose = () => {
if (sock.OPEN) {
sock.close();
}
};
}
});
}
}

View File

@ -7,6 +7,7 @@ import { performLocalFetch } from "../common/proxy_fetch.ts";
import { BuiltinSettings } from "../web/types.ts";
import { gitIgnoreCompiler } from "./deps.ts";
import { FilteredSpacePrimitives } from "../common/spaces/filtered_space_primitives.ts";
import { CollabServer } from "./collab.ts";
export type ServerOptions = {
hostname: string;
@ -29,6 +30,7 @@ export class HttpServer {
clientAssetBundle: AssetBundle;
settings?: BuiltinSettings;
spacePrimitives: SpacePrimitives;
collab: CollabServer;
constructor(
spacePrimitives: SpacePrimitives,
@ -62,6 +64,8 @@ export class HttpServer {
}
},
);
this.collab = new CollabServer(this.spacePrimitives);
this.collab.start();
}
// Replaces some template variables in index.html in a rather ad-hoc manner, but YOLO
@ -123,7 +127,8 @@ export class HttpServer {
this.app.use(({ request, response }, next) => {
if (
!request.url.pathname.startsWith("/.fs") &&
request.url.pathname !== "/.auth"
request.url.pathname !== "/.auth" &&
!request.url.pathname.startsWith("/.ws")
) {
response.headers.set("Content-type", "text/html");
response.body = this.renderIndexHtml();
@ -138,6 +143,8 @@ export class HttpServer {
this.app.use(fsRouter.routes());
this.app.use(fsRouter.allowedMethods());
this.collab.route(this.app);
this.abortController = new AbortController();
const listenOptions: any = {
hostname: this.hostname,
@ -273,6 +280,15 @@ export class HttpServer {
});
return;
}
case "ping": {
response.headers.set("Content-Type", "application/json");
// console.log("Got ping", body);
response.body = JSON.stringify(
this.collab.ping(body.clientId, body.page),
);
// console.log(this.collab);
return;
}
default:
response.headers.set("Content-Type", "text/plain");
response.status = 400;

View File

@ -1,4 +1,4 @@
import { Extension, WebsocketProvider, Y, yCollab } from "../deps.ts";
import { Extension, HocuspocusProvider, Y, yCollab } from "../deps.ts";
const userColors = [
{ color: "#30bced", light: "#30bced33" },
@ -12,27 +12,27 @@ const userColors = [
];
export class CollabState {
ydoc: Y.Doc;
collabProvider: WebsocketProvider;
ytext: Y.Text;
yundoManager: Y.UndoManager;
public ytext: Y.Text;
private collabProvider: HocuspocusProvider;
private yundoManager: Y.UndoManager;
constructor(serverUrl: string, token: string, username: string) {
this.ydoc = new Y.Doc();
this.collabProvider = new WebsocketProvider(
serverUrl,
token,
this.ydoc,
);
constructor(serverUrl: string, name: string, username: string) {
this.collabProvider = new HocuspocusProvider({
url: serverUrl,
name: name,
});
this.collabProvider.on("status", (e: any) => {
console.log("Collab status change", e);
});
this.collabProvider.on("sync", (e: any) => {
console.log("Sync status", e);
});
// this.collabProvider.on("sync", (e: any) => {
// console.log("Sync status", e);
// });
// this.collabProvider.on("synced", (e: any) => {
// console.log("Synced status", e);
// });
this.ytext = this.ydoc.getText("codemirror");
this.ytext = this.collabProvider.document.getText("codemirror");
this.yundoManager = new Y.UndoManager(this.ytext);
const randomColor =
@ -46,6 +46,7 @@ export class CollabState {
}
stop() {
this.collabProvider.disconnect();
this.collabProvider.destroy();
}

View File

@ -21,7 +21,7 @@ export {
yCollab,
yUndoManagerKeymap,
} from "https://esm.sh/y-codemirror.next@0.3.2?external=yjs,@codemirror/state,@codemirror/commands,@codemirror/history,@codemirror/view";
export { WebsocketProvider } from "https://esm.sh/y-websocket@1.4.5?external=yjs";
export { HocuspocusProvider } from "https://esm.sh/@hocuspocus/provider@2.0.6?external=yjs,ws";
// Vim mode
export {

View File

@ -389,7 +389,8 @@ export class Editor {
this.space.on({
pageChanged: (meta) => {
if (this.currentPage === meta.name) {
// Only reload when watching the current page (to avoid reloading when switching pages and in collab mode)
if (this.space.watchInterval && this.currentPage === meta.name) {
console.log("Page changed elsewhere, reloading");
this.flashNotification("Page changed elsewhere, reloading");
this.reloadPage();
@ -1144,8 +1145,7 @@ export class Editor {
await this.save(true);
// And stop the collab session
if (this.collabState) {
this.collabState.stop();
this.collabState = undefined;
this.stopCollab();
}
}
}
@ -1226,10 +1226,18 @@ export class Editor {
if (pageState) {
// Restore state
editorView.scrollDOM.scrollTop = pageState!.scrollTop;
editorView.dispatch({
selection: pageState.selection,
scrollIntoView: true,
});
try {
editorView.dispatch({
selection: pageState.selection,
scrollIntoView: true,
});
} catch {
// This is fine, just go to the top
editorView.dispatch({
selection: { anchor: 0 },
scrollIntoView: true,
});
}
} else {
editorView.scrollDOM.scrollTop = 0;
editorView.dispatch({
@ -1508,12 +1516,24 @@ export class Editor {
}
const initialText = this.editorView!.state.sliceDoc();
this.collabState = new CollabState(serverUrl, token, username);
this.collabState.collabProvider.once("sync", (synced: boolean) => {
if (this.collabState?.ytext.toString() === "") {
console.log("Synced value is empty, putting back original text");
this.collabState?.ytext.insert(0, initialText);
}
});
// this.collabState.collabProvider.on("synced", () => {
// if (this.collabState?.ytext.toString() === "") {
// console.error("Synced value is empty, putting back original text");
// this.collabState?.ytext.insert(0, initialText);
// }
// });
this.rebuildEditorState();
// Don't watch for local changes in this mode
this.space.unwatch();
}
stopCollab() {
if (this.collabState) {
this.collabState.stop();
this.collabState = undefined;
this.rebuildEditorState();
}
// Start file watching again
this.space.watch();
}
}

View File

@ -51,7 +51,7 @@ export class Space extends EventEmitter<SpaceEvents> {
super();
this.kvStore.get("imageHeightCache").then((cache) => {
if (cache) {
console.log("Loaded image height cache from KV store", cache);
// console.log("Loaded image height cache from KV store", cache);
this.imageHeightCache = cache;
}
});

View File

@ -14,7 +14,25 @@ export function collabSyscalls(editor: Editor): SysCallMapping {
"collab.stop": (
_ctx,
) => {
editor.collabState?.stop();
editor.stopCollab();
},
"collab.ping": async (
_ctx,
clientId: string,
currentPage: string,
) => {
const resp = await editor.remoteSpacePrimitives.authenticatedFetch(
editor.remoteSpacePrimitives.url,
{
method: "POST",
body: JSON.stringify({
operation: "ping",
clientId,
page: currentPage,
}),
},
);
return resp.json();
},
};
}