242 lines
7.7 KiB
TypeScript
242 lines
7.7 KiB
TypeScript
import { getAvailablePortSync } from "https://deno.land/x/port@1.0.0/mod.ts";
|
|
import { nanoid } from "https://esm.sh/nanoid@4.0.0";
|
|
import { race, timeout } from "../common/async_util.ts";
|
|
import { Application } from "./deps.ts";
|
|
import { SpacePrimitives } from "../common/spaces/space_primitives.ts";
|
|
import { collabPingInterval } from "../plugs/collab/constants.ts";
|
|
import { Hocuspocus } from "./deps.ts";
|
|
|
|
type CollabPage = {
|
|
clients: Map<string, number>; // clientId -> lastPing
|
|
collabId?: string;
|
|
// The currently elected provider of the initial document
|
|
masterClientId: string;
|
|
};
|
|
|
|
export class CollabServer {
|
|
clients: Map<string, { openPage: string; lastUpdate: number }> = new Map(); // clientId -> openPage
|
|
pages: Map<string, CollabPage> = new Map();
|
|
yCollabServer?: Hocuspocus;
|
|
|
|
constructor(private spacePrimitives: SpacePrimitives) {
|
|
}
|
|
|
|
start() {
|
|
setInterval(() => {
|
|
this.cleanup(3 * collabPingInterval);
|
|
}, collabPingInterval);
|
|
}
|
|
|
|
updatePresence(
|
|
clientId: string,
|
|
currentPage: string,
|
|
): { collabId?: string } {
|
|
let client = this.clients.get(clientId);
|
|
if (!client) {
|
|
client = { openPage: "", lastUpdate: 0 };
|
|
this.clients.set(clientId, client);
|
|
}
|
|
client.lastUpdate = Date.now();
|
|
if (currentPage !== client.openPage) {
|
|
// Client switched pages
|
|
// Update last page record
|
|
const lastCollabPage = this.pages.get(client.openPage);
|
|
if (lastCollabPage) {
|
|
lastCollabPage.clients.delete(clientId);
|
|
if (lastCollabPage.clients.size === 1) {
|
|
delete lastCollabPage.collabId;
|
|
}
|
|
|
|
if (lastCollabPage.clients.size === 0) {
|
|
this.pages.delete(client.openPage);
|
|
} else {
|
|
// Elect a new master client
|
|
if (lastCollabPage.masterClientId === clientId) {
|
|
// Any is fine, really
|
|
lastCollabPage.masterClientId =
|
|
[...lastCollabPage.clients.keys()][0];
|
|
}
|
|
}
|
|
}
|
|
// Ok, let's update our records now
|
|
client.openPage = currentPage;
|
|
}
|
|
|
|
// Update new page
|
|
let nextCollabPage = this.pages.get(currentPage);
|
|
if (!nextCollabPage) {
|
|
// Newly opened page (no other clients on this page right now)
|
|
nextCollabPage = {
|
|
clients: new Map(),
|
|
masterClientId: clientId,
|
|
};
|
|
this.pages.set(currentPage, nextCollabPage);
|
|
}
|
|
// Register last ping from us
|
|
nextCollabPage.clients.set(clientId, Date.now());
|
|
|
|
if (nextCollabPage.clients.size > 1 && !nextCollabPage.collabId) {
|
|
// Create a new collabId
|
|
nextCollabPage.collabId = nanoid();
|
|
}
|
|
// console.log("State", this.pages);
|
|
if (nextCollabPage.collabId) {
|
|
// We will now expose this collabId, except when we're just starting this session
|
|
// in which case we'll wait for the original client to publish the document
|
|
const existingyCollabSession = this.yCollabServer?.documents.get(
|
|
buildCollabId(nextCollabPage.collabId, `${currentPage}.md`),
|
|
);
|
|
if (existingyCollabSession) {
|
|
// console.log("Found an existing collab session already, let's join!");
|
|
return { collabId: nextCollabPage.collabId };
|
|
} else if (clientId === nextCollabPage.masterClientId) {
|
|
// console.log("We're the master, so we should connect");
|
|
return { collabId: nextCollabPage.collabId };
|
|
} else {
|
|
// We're not the first client, so we need to wait for the first client to connect
|
|
// console.log("We're not the master, so we should wait");
|
|
return {};
|
|
}
|
|
} else {
|
|
return {};
|
|
}
|
|
}
|
|
|
|
cleanup(timeout: number) {
|
|
// Clean up pages and their clients that haven't pinged for some time
|
|
for (const [pageName, page] of this.pages) {
|
|
for (const [clientId, lastPing] of page.clients) {
|
|
if (Date.now() - lastPing > timeout) {
|
|
// Eject client
|
|
page.clients.delete(clientId);
|
|
// Elect a new master client
|
|
if (page.masterClientId === clientId && page.clients.size > 0) {
|
|
page.masterClientId = [...page.clients.keys()][0];
|
|
}
|
|
}
|
|
}
|
|
if (page.clients.size === 1) {
|
|
// If there's only one client left, we don't need to keep the collabId around anymore
|
|
delete page.collabId;
|
|
}
|
|
if (page.clients.size === 0) {
|
|
// And if we have no clients left, well...
|
|
this.pages.delete(pageName);
|
|
}
|
|
}
|
|
|
|
for (const [clientId, { lastUpdate }] of this.clients) {
|
|
if (Date.now() - lastUpdate > timeout) {
|
|
// Eject client
|
|
this.clients.delete(clientId);
|
|
}
|
|
}
|
|
}
|
|
|
|
route(app: Application) {
|
|
// The way this works is that we spin up a separate WS server locally and then proxy requests to it
|
|
// This is the only way I could get Hocuspocus to work with Deno
|
|
const internalPort = getAvailablePortSync();
|
|
this.yCollabServer = new Hocuspocus({
|
|
port: internalPort,
|
|
address: "127.0.0.1",
|
|
quiet: true,
|
|
onStoreDocument: async (data) => {
|
|
const [_, path] = splitCollabId(data.documentName);
|
|
const text = data.document.getText("codemirror").toString();
|
|
console.log(
|
|
"[Hocuspocus]",
|
|
"Persisting",
|
|
path,
|
|
"to space on server",
|
|
);
|
|
const meta = await this.spacePrimitives.writeFile(
|
|
path,
|
|
new TextEncoder().encode(text),
|
|
);
|
|
// Broadcast new persisted lastModified date
|
|
data.document.broadcastStateless(
|
|
JSON.stringify({
|
|
type: "persisted",
|
|
path,
|
|
lastModified: meta.lastModified,
|
|
}),
|
|
);
|
|
return;
|
|
},
|
|
onDisconnect: (client) => {
|
|
console.log("[Hocuspocus]", "Client disconnected", client.clientsCount);
|
|
if (client.clientsCount === 0) {
|
|
console.log(
|
|
"[Hocuspocus]",
|
|
"Last client disconnected from",
|
|
client.documentName,
|
|
"purging from memory",
|
|
);
|
|
this.yCollabServer!.documents.delete(client.documentName);
|
|
}
|
|
return Promise.resolve();
|
|
},
|
|
});
|
|
|
|
this.yCollabServer.listen();
|
|
|
|
app.use((ctx) => {
|
|
// if (ctx.request.url.pathname === "/.ws") {
|
|
// const sock = ctx.upgrade();
|
|
// sock.onmessage = (e) => {
|
|
// console.log("WS: Got message", e.data);
|
|
// };
|
|
// }
|
|
// Websocket proxy to hocuspocus
|
|
if (ctx.request.url.pathname === "/.ws-collab") {
|
|
const sock = ctx.upgrade();
|
|
|
|
const ws = new WebSocket(`ws://localhost:${internalPort}`);
|
|
const wsReady = race([
|
|
new Promise<void>((resolve) => {
|
|
ws.onopen = () => {
|
|
resolve();
|
|
};
|
|
}),
|
|
timeout(1000),
|
|
]).catch(() => {
|
|
console.error("Timeout waiting for collab to open websocket");
|
|
sock.close();
|
|
});
|
|
sock.onmessage = (e) => {
|
|
// console.log("Got message", e);
|
|
wsReady.then(() => ws.send(e.data)).catch(console.error);
|
|
};
|
|
sock.onclose = () => {
|
|
if (ws.OPEN) {
|
|
ws.close();
|
|
}
|
|
};
|
|
ws.onmessage = (e) => {
|
|
if (sock.OPEN) {
|
|
sock.send(e.data);
|
|
} else {
|
|
console.error("Got message from websocket but socket is not open");
|
|
}
|
|
};
|
|
ws.onclose = () => {
|
|
if (sock.OPEN) {
|
|
sock.close();
|
|
}
|
|
};
|
|
}
|
|
});
|
|
}
|
|
}
|
|
|
|
function splitCollabId(documentName: string): [string, string] {
|
|
const [collabId, ...pathPieces] = documentName.split("/");
|
|
const path = pathPieces.join("/");
|
|
return [collabId, path];
|
|
}
|
|
|
|
function buildCollabId(collabId: string, path: string): string {
|
|
return `${collabId}/${path}`;
|
|
}
|