Now all communication happens over sockets

pull/3/head
Zef Hemel 2022-03-11 11:49:42 +01:00
parent 5c5e232034
commit da4bf4a9ab
20 changed files with 467 additions and 430 deletions

View File

@ -1,78 +1,16 @@
import { Manifest } from "./types";
import { WebworkerSandbox } from "./worker_sandbox";
interface SysCallMapping {
// TODO: Better typing
[key: string]: any;
}
export class FunctionWorker {
private worker: Worker;
private inited: Promise<any>;
private initCallback: any;
private invokeResolve?: (result?: any) => void;
private invokeReject?: (reason?: any) => void;
private plug: Plug<any>;
constructor(plug: Plug<any>, name: string, code: string) {
// let worker = window.Worker;
this.worker = new Worker(new URL("function_worker.ts", import.meta.url), {
type: "module",
});
this.worker.onmessage = this.onmessage.bind(this);
this.worker.postMessage({
type: "boot",
name: name,
code: code,
});
this.inited = new Promise((resolve) => {
this.initCallback = resolve;
});
this.plug = plug;
}
async onmessage(evt: MessageEvent) {
let data = evt.data;
if (!data) return;
switch (data.type) {
case "inited":
this.initCallback();
break;
case "syscall":
let result = await this.plug.system.syscall(data.name, data.args);
this.worker.postMessage({
type: "syscall-response",
id: data.id,
data: result,
});
break;
case "result":
this.invokeResolve!(data.result);
break;
case "error":
this.invokeReject!(data.reason);
break;
default:
console.error("Unknown message type", data);
}
}
async invoke(args: Array<any>): Promise<any> {
await this.inited;
this.worker.postMessage({
type: "invoke",
args: args,
});
return new Promise((resolve, reject) => {
this.invokeResolve = resolve;
this.invokeReject = reject;
});
}
stop() {
this.worker.terminate();
}
export interface Sandbox {
isLoaded(name: string): boolean;
load(name: string, code: string): Promise<void>;
invoke(name: string, args: any[]): Promise<any>;
stop(): void;
}
export interface PlugLoader<HookT> {
@ -81,12 +19,13 @@ export interface PlugLoader<HookT> {
export class Plug<HookT> {
system: System<HookT>;
private runningFunctions: Map<string, FunctionWorker>;
// private runningFunctions: Map<string, FunctionWorker>;
functionWorker: WebworkerSandbox;
public manifest?: Manifest<HookT>;
constructor(system: System<HookT>, name: string) {
this.system = system;
this.runningFunctions = new Map<string, FunctionWorker>();
this.functionWorker = new WebworkerSandbox(this);
}
async load(manifest: Manifest<HookT>) {
@ -95,16 +34,13 @@ export class Plug<HookT> {
}
async invoke(name: string, args: Array<any>): Promise<any> {
let worker = this.runningFunctions.get(name);
if (!worker) {
worker = new FunctionWorker(
this,
if (!this.functionWorker.isLoaded(name)) {
await this.functionWorker.load(
name,
this.manifest!.functions[name].code!
);
this.runningFunctions.set(name, worker);
}
return await worker.invoke(args);
return await this.functionWorker.invoke(name, args);
}
async dispatchEvent(name: string, data?: any): Promise<any[]> {
@ -122,13 +58,7 @@ export class Plug<HookT> {
}
async stop() {
for (const [functionname, worker] of Object.entries(
this.runningFunctions
)) {
console.log(`Stopping ${functionname}`);
worker.stop();
}
this.runningFunctions = new Map<string, FunctionWorker>();
this.functionWorker.stop();
}
}
@ -141,7 +71,7 @@ export class System<HookT> {
this.registeredSyscalls = {};
}
registerSyscalls(...registrationObjects: Array<SysCallMapping>) {
registerSyscalls(...registrationObjects: SysCallMapping[]) {
for (const registrationObject of registrationObjects) {
for (let p in registrationObject) {
this.registeredSyscalls[p] = registrationObject[p];

7
plugbox/src/sandbox.html Normal file
View File

@ -0,0 +1,7 @@
<html>
<body>
<script type="module">
import "./function_worker";
</script>
</body>
</html>

View File

@ -1,11 +1,13 @@
declare global {
function syscall(id: string, name: string, args: any[]): Promise<any>;
function syscall(id: number, name: string, args: any[]): Promise<any>;
}
import { ControllerMessage, WorkerMessage, WorkerMessageType } from "./types";
import { safeRun } from "./util";
let func: Function | null = null;
let pendingRequests = new Map<string, (result: unknown) => void>();
self.syscall = async (id: string, name: string, args: any[]) => {
let loadedFunctions = new Map<string, Function>();
let pendingRequests = new Map<number, (result: unknown) => void>();
self.syscall = async (id: number, name: string, args: any[]) => {
return await new Promise((resolve, reject) => {
pendingRequests.set(id, resolve);
self.postMessage({
@ -38,51 +40,56 @@ function wrapScript(code: string): string {
return fn["default"].apply(null, arguments);`;
}
self.addEventListener("message", (event) => {
self.addEventListener("message", (event: { data: WorkerMessage }) => {
safeRun(async () => {
let messageEvent = event;
let data = messageEvent.data;
switch (data.type) {
case "boot":
case "load":
console.log("Booting", data.name);
func = new Function(wrapScript(data.code));
loadedFunctions.set(data.name!, new Function(wrapScript(data.code!)));
self.postMessage({
type: "inited",
});
name: data.name,
} as ControllerMessage);
break;
case "invoke":
if (!func) {
throw new Error("No function loaded");
let fn = loadedFunctions.get(data.name!);
if (!fn) {
throw new Error(`Function not loaded: ${data.name}`);
}
try {
let result = await Promise.resolve(func(...(data.args || [])));
let result = await Promise.resolve(fn(...(data.args || [])));
self.postMessage({
type: "result",
id: data.id,
result: result,
});
} as ControllerMessage);
} catch (e: any) {
self.postMessage({
type: "error",
id: data.id,
reason: e.message,
});
} as ControllerMessage);
throw e;
}
break;
case "syscall-response":
let id = data.id;
const lookup = pendingRequests.get(id);
let syscallId = data.id!;
const lookup = pendingRequests.get(syscallId);
if (!lookup) {
console.log(
"Current outstanding requests",
pendingRequests,
"looking up",
id
syscallId
);
throw Error("Invalid request id");
}
pendingRequests.delete(id);
pendingRequests.delete(syscallId);
lookup(data.data);
break;
}
});
});

View File

@ -2,6 +2,28 @@ export type EventHook = {
events: { [key: string]: string[] };
};
export type WorkerMessageType = "load" | "invoke" | "syscall-response";
export type WorkerMessage = {
type: WorkerMessageType;
id?: number;
name?: string;
code?: string;
args?: any[];
data?: any;
};
export type ControllerMessageType = "inited" | "result" | "error" | "syscall";
export type ControllerMessage = {
type: ControllerMessageType;
id?: number;
name?: string;
reason?: string;
args?: any[];
result: any;
};
export interface Manifest<HookT> {
hooks: HookT & EventHook;
functions: {

View File

@ -0,0 +1,86 @@
import { ControllerMessage, WorkerMessage } from "./types";
import { Plug, Sandbox } from "./runtime";
export class WebworkerSandbox implements Sandbox {
private worker: Worker;
private reqId = 0;
private outstandingInits = new Map<string, () => void>();
private outstandingInvocations = new Map<
number,
{ resolve: (result: any) => void; reject: (e: any) => void }
>();
private loadedFunctions = new Set<string>();
constructor(readonly plug: Plug<any>) {
this.worker = new Worker(new URL("sandbox_worker.ts", import.meta.url), {
type: "module",
});
this.worker.onmessage = this.onmessage.bind(this);
}
isLoaded(name: string) {
return this.loadedFunctions.has(name);
}
async load(name: string, code: string): Promise<void> {
this.worker.postMessage({
type: "load",
name: name,
code: code,
} as WorkerMessage);
return new Promise((resolve) => {
this.loadedFunctions.add(name);
this.outstandingInits.set(name, resolve);
});
}
async onmessage(evt: { data: ControllerMessage }) {
let data = evt.data;
if (!data) return;
switch (data.type) {
case "inited":
let initCb = this.outstandingInits.get(data.name!);
initCb && initCb();
this.outstandingInits.delete(data.name!);
break;
case "syscall":
let result = await this.plug.system.syscall(data.name!, data.args!);
this.worker.postMessage({
type: "syscall-response",
id: data.id,
data: result,
} as WorkerMessage);
break;
case "result":
let resultCb = this.outstandingInvocations.get(data.id!);
resultCb && resultCb.resolve(data.result);
break;
case "error":
let errCb = this.outstandingInvocations.get(data.result.id!);
errCb && errCb.reject(data.reason);
break;
default:
console.error("Unknown message type", data);
}
}
async invoke(name: string, args: any[]): Promise<any> {
this.reqId++;
this.worker.postMessage({
type: "invoke",
id: this.reqId,
name,
args,
});
return new Promise((resolve, reject) => {
this.outstandingInvocations.set(this.reqId, { resolve, reject });
});
}
stop() {
this.worker.terminate();
}
}

View File

@ -1,9 +1,9 @@
declare global {
function syscall(id: string, name: string, args: any[]): Promise<any>;
function syscall(id: number, name: string, args: any[]): Promise<any>;
}
export async function syscall(name: string, ...args: any[]): Promise<any> {
let reqId = "" + Math.floor(Math.random() * 1000000);
let reqId = Math.floor(Math.random() * 1000000);
// console.log("Syscall", name, reqId);
return await self.syscall(reqId, name, args);
// return new Promise((resolve, reject) => {

View File

@ -25,8 +25,6 @@ export async function deletePage() {
await syscall("editor.navigate", "start");
console.log("Deleting page from space");
await syscall("space.deletePage", pageName);
console.log("Reloading page list");
await syscall("space.reloadPageList");
}
export async function renamePage() {
@ -50,8 +48,6 @@ export async function renamePage() {
await syscall("space.writePage", newName, text);
console.log("Deleting page from space");
await syscall("space.deletePage", oldName);
console.log("Reloading page list");
await syscall("space.reloadPageList");
console.log("Navigating to new page");
await syscall("editor.navigate", newName);
@ -63,6 +59,7 @@ export async function renamePage() {
for (let pageToUpdate of pageToUpdateSet) {
console.log("Now going to update links in", pageToUpdate);
let { text } = await syscall("space.readPage", pageToUpdate);
console.log("Received text", text);
if (!text) {
// Page likely does not exist, but at least we can skip it
continue;

4
plugs/yarn.lock Normal file
View File

@ -0,0 +1,4 @@
# THIS IS AN AUTOGENERATED FILE. DO NOT EDIT THIS FILE DIRECTLY.
# yarn lockfile v1

View File

@ -8,118 +8,28 @@ import { Cursor, cursorEffect } from "../../webapp/src/cursorEffect";
import { Socket } from "socket.io";
import { DiskStorage } from "./disk_storage";
import { PageMeta } from "./server";
import { Client, Page } from "./types";
import { ClientPageState, Page } from "./types";
import { safeRun } from "./util";
export class RealtimeStorage extends DiskStorage {
export class SocketAPI {
openPages = new Map<string, Page>();
private disconnectClient(client: Client, page: Page) {
page.clients.delete(client);
if (page.clients.size === 0) {
console.log("No more clients for", page.name, "flushing");
this.flushPageToDisk(page.name, page);
this.openPages.delete(page.name);
} else {
page.cursors.delete(client.socket.id);
this.broadcastCursors(page);
}
}
private broadcastCursors(page: Page) {
page.clients.forEach((client) => {
client.socket.emit("cursors", Object.fromEntries(page.cursors.entries()));
});
}
private flushPageToDisk(name: string, page: Page) {
super
.writePage(name, page.text.sliceString(0))
.then((meta) => {
console.log(`Wrote page ${name} to disk`);
page.meta = meta;
})
.catch((e) => {
console.log(`Could not write ${name} to disk:`, e);
});
}
// Override
async readPage(pageName: string): Promise<{ text: string; meta: PageMeta }> {
let page = this.openPages.get(pageName);
if (page) {
console.log("Serving page from memory", pageName);
return {
text: page.text.sliceString(0),
meta: page.meta,
};
} else {
return super.readPage(pageName);
}
}
async writePage(pageName: string, text: string): Promise<PageMeta> {
let page = this.openPages.get(pageName);
if (page) {
for (let client of page.clients) {
client.socket.emit("reload", pageName);
}
this.openPages.delete(pageName);
}
return super.writePage(pageName, text);
}
disconnectPageSocket(socket: Socket, pageName: string) {
let page = this.openPages.get(pageName);
if (page) {
for (let client of page.clients) {
if (client.socket === socket) {
this.disconnectClient(client, page);
}
}
}
}
connectedSockets: Set<Socket> = new Set();
pageStore: DiskStorage;
constructor(rootPath: string, io: Server) {
super(rootPath);
// setInterval(() => {
// console.log("Currently open pages:", this.openPages.keys());
// }, 10000);
// Disk watcher
fs.watch(
rootPath,
{
recursive: true,
persistent: false,
},
(eventType, filename) => {
safeRun(async () => {
if (path.extname(filename) !== ".md") {
return;
}
let localPath = path.join(rootPath, filename);
let pageName = filename.substring(0, filename.length - 3);
let s = await stat(localPath);
// console.log("Edit in", pageName);
const openPage = this.openPages.get(pageName);
if (openPage) {
if (openPage.meta.lastModified < s.mtime.getTime()) {
console.log("Page changed on disk outside of editor, reloading");
this.openPages.delete(pageName);
for (let client of openPage.clients) {
client.socket.emit("reload", pageName);
}
}
}
});
}
);
this.pageStore = new DiskStorage(rootPath);
this.fileWatcher(rootPath);
io.on("connection", (socket) => {
console.log("Connected", socket.id);
let clientOpenPages = new Set<string>();
this.connectedSockets.add(socket);
const socketOpenPages = new Set<string>();
socket.on("disconnect", () => {
console.log("Disconnected", socket.id);
socketOpenPages.forEach(disconnectPageSocket);
this.connectedSockets.delete(socket);
});
function onCall(eventName: string, cb: (...args: any[]) => Promise<any>) {
socket.on(eventName, (reqId: number, ...args) => {
@ -129,11 +39,23 @@ export class RealtimeStorage extends DiskStorage {
});
}
const _this = this;
function disconnectPageSocket(pageName: string) {
let page = _this.openPages.get(pageName);
if (page) {
for (let client of page.clientStates) {
if (client.socket === socket) {
_this.disconnectClient(client, page);
}
}
}
}
onCall("openPage", async (pageName: string) => {
let page = this.openPages.get(pageName);
if (!page) {
try {
let { text, meta } = await super.readPage(pageName);
let { text, meta } = await this.pageStore.readPage(pageName);
page = new Page(pageName, text, meta);
} catch (e) {
console.log("Creating new page", pageName);
@ -141,8 +63,8 @@ export class RealtimeStorage extends DiskStorage {
}
this.openPages.set(pageName, page);
}
page.clients.add(new Client(socket, page.version));
clientOpenPages.add(pageName);
page.clientStates.add(new ClientPageState(socket, page.version));
socketOpenPages.add(pageName);
console.log("Opened page", pageName);
this.broadcastCursors(page);
return page.toJSON();
@ -150,8 +72,8 @@ export class RealtimeStorage extends DiskStorage {
socket.on("closePage", (pageName: string) => {
console.log("Closing page", pageName);
clientOpenPages.delete(pageName);
this.disconnectPageSocket(socket, pageName);
socketOpenPages.delete(pageName);
disconnectPageSocket(pageName);
});
onCall(
@ -169,13 +91,13 @@ export class RealtimeStorage extends DiskStorage {
pageName,
this.openPages.keys()
);
return;
return false;
}
if (version !== page.version) {
console.error("Invalid version", version, page.version);
return false;
} else {
console.log("Applying", updates.length, "updates");
console.log("Applying", updates.length, "updates to", pageName);
let transformedUpdates = [];
let textChanged = false;
for (let update of updates) {
@ -225,7 +147,7 @@ export class RealtimeStorage extends DiskStorage {
}
// TODO: Optimize this
let oldestVersion = Infinity;
page.clients.forEach((client) => {
page.clientStates.forEach((client) => {
oldestVersion = Math.min(client.version, oldestVersion);
if (client.socket === socket) {
client.version = version;
@ -242,12 +164,139 @@ export class RealtimeStorage extends DiskStorage {
}
);
socket.on("disconnect", () => {
console.log("Disconnected", socket.id);
clientOpenPages.forEach((pageName) => {
this.disconnectPageSocket(socket, pageName);
});
onCall(
"readPage",
async (pageName: string): Promise<{ text: string; meta: PageMeta }> => {
let page = this.openPages.get(pageName);
if (page) {
console.log("Serving page from memory", pageName);
return {
text: page.text.sliceString(0),
meta: page.meta,
};
} else {
return this.pageStore.readPage(pageName);
}
}
);
onCall("writePage", async (pageName: string, text: string) => {
let page = this.openPages.get(pageName);
if (page) {
for (let client of page.clientStates) {
client.socket.emit("reloadPage", pageName);
}
this.openPages.delete(pageName);
}
return this.pageStore.writePage(pageName, text);
});
onCall("deletePage", async (pageName: string) => {
this.openPages.delete(pageName);
socketOpenPages.delete(pageName);
// Cascading of this to all connected clients will be handled by file watcher
return this.pageStore.deletePage(pageName);
});
onCall("listPages", async (): Promise<PageMeta[]> => {
return this.pageStore.listPages();
});
onCall("getPageMeta", async (pageName: string): Promise<PageMeta> => {
let page = this.openPages.get(pageName);
if (page) {
return page.meta;
}
return this.pageStore.getPageMeta(pageName);
});
});
}
private disconnectClient(client: ClientPageState, page: Page) {
page.clientStates.delete(client);
if (page.clientStates.size === 0) {
console.log("No more clients for", page.name, "flushing");
this.flushPageToDisk(page.name, page);
this.openPages.delete(page.name);
} else {
page.cursors.delete(client.socket.id);
this.broadcastCursors(page);
}
}
private broadcastCursors(page: Page) {
page.clientStates.forEach((client) => {
client.socket.emit(
"cursorSnapshot",
page.name,
Object.fromEntries(page.cursors.entries())
);
});
}
private flushPageToDisk(name: string, page: Page) {
safeRun(async () => {
let meta = await this.pageStore.writePage(name, page.text.sliceString(0));
console.log(`Wrote page ${name} to disk`);
page.meta = meta;
});
}
private fileWatcher(rootPath: string) {
fs.watch(
rootPath,
{
recursive: true,
persistent: false,
},
(eventType, filename) => {
safeRun(async () => {
if (path.extname(filename) !== ".md") {
return;
}
let localPath = path.join(rootPath, filename);
let pageName = filename.substring(0, filename.length - 3);
// console.log("Edit in", pageName, eventType);
let modifiedTime = 0;
try {
let s = await stat(localPath);
modifiedTime = s.mtime.getTime();
} catch (e) {
// File was deleted
console.log("Deleted", pageName);
for (let socket of this.connectedSockets) {
socket.emit("pageDeleted", pageName);
}
return;
}
const openPage = this.openPages.get(pageName);
if (openPage) {
if (openPage.meta.lastModified < modifiedTime) {
console.log("Page changed on disk outside of editor, reloading");
this.openPages.delete(pageName);
const meta = {
name: pageName,
lastModified: modifiedTime,
} as PageMeta;
for (let client of openPage.clientStates) {
client.socket.emit("pageChanged", meta);
}
}
}
if (eventType === "rename") {
// This most likely means a new file was created, let's push new file listings to all connected sockets
console.log(
"New file created, broadcasting to all connected sockets"
);
for (let socket of this.connectedSockets) {
socket.emit("pageCreated", {
name: pageName,
lastModified: modifiedTime,
} as PageMeta);
}
}
});
}
);
}
}

View File

@ -1,12 +1,8 @@
import bodyParser from "body-parser";
import cors from "cors";
import express from "express";
import { readFile } from "fs/promises";
import http from "http";
import { Server } from "socket.io";
import stream from "stream";
import { promisify } from "util";
import { RealtimeStorage } from "./realtime_storage";
import { SocketAPI } from "./api";
const app = express();
const server = http.createServer(app);
@ -18,7 +14,6 @@ const io = new Server(server, {
});
const port = 3000;
const pipeline = promisify(stream.pipeline);
export const pagesPath = "../pages";
const distDir = `${__dirname}/../../webapp/dist`;
@ -29,80 +24,7 @@ export type PageMeta = {
};
app.use("/", express.static(distDir));
let fsRouter = express.Router();
// let diskFS = new DiskFS(pagesPath);
let filesystem = new RealtimeStorage(pagesPath, io);
// Page list
fsRouter.route("/").get(async (req, res) => {
res.json(await filesystem.listPages());
});
fsRouter
.route(/\/(.+)/)
.get(async (req, res) => {
let reqPath = req.params[0];
console.log("Getting", reqPath);
try {
let { text, meta } = await filesystem.readPage(reqPath);
res.status(200);
res.header("Last-Modified", "" + meta.lastModified);
res.header("Content-Type", "text/markdown");
res.send(text);
} catch (e) {
res.status(200);
res.send("");
}
})
.put(bodyParser.text({ type: "*/*" }), async (req, res) => {
let reqPath = req.params[0];
try {
let meta = await filesystem.writePage(reqPath, req.body);
res.status(200);
res.header("Last-Modified", "" + meta.lastModified);
res.send("OK");
} catch (err) {
res.status(500);
res.send("Write failed");
console.error("Pipeline failed", err);
}
})
.options(async (req, res) => {
let reqPath = req.params[0];
try {
const meta = await filesystem.getPageMeta(reqPath);
res.status(200);
res.header("Last-Modified", "" + meta.lastModified);
res.header("Content-Type", "text/markdown");
res.send("");
} catch (e) {
res.status(200);
res.send("");
}
})
.delete(async (req, res) => {
let reqPath = req.params[0];
try {
await filesystem.deletePage(reqPath);
res.status(200);
res.send("OK");
} catch (e) {
console.error("Error deleting file", reqPath, e);
res.status(500);
res.send("OK");
}
});
app.use(
"/fs",
cors({
methods: "GET,HEAD,PUT,OPTIONS,POST,DELETE",
preflightContinue: true,
}),
fsRouter
);
let filesystem = new SocketAPI(pagesPath, io);
// Fallback, serve index.html
let cachedIndex: string | undefined = undefined;
@ -113,7 +35,6 @@ app.get("/*", async (req, res) => {
res.status(200).header("Content-Type", "text/html").send(cachedIndex);
});
//sup
server.listen(port, () => {
console.log(`Server istening on port ${port}`);
});

View File

@ -4,7 +4,7 @@ import { Socket } from "socket.io";
import { Cursor } from "../../webapp/src/cursorEffect";
import { PageMeta } from "./server";
export class Client {
export class ClientPageState {
constructor(public socket: Socket, public version: number) {}
}
@ -12,7 +12,7 @@ export class Page {
versionOffset = 0;
updates: Update[] = [];
cursors = new Map<string, Cursor>();
clients = new Set<Client>();
clientStates = new Set<ClientPageState>();
pending: ((value: any) => void)[] = [];

View File

@ -1,12 +1,12 @@
import { Editor } from "./editor";
import { HttpRemoteSpace } from "./space";
import { RealtimeSpace } from "./space";
import { safeRun } from "./util";
import { io } from "socket.io-client";
let socket = io(`http://${location.hostname}:3000`);
let editor = new Editor(
new HttpRemoteSpace(`http://${location.hostname}:3000/fs`, socket),
new RealtimeSpace(socket),
document.getElementById("root")!
);

View File

@ -10,8 +10,8 @@ import {
receiveUpdates,
sendableUpdates,
} from "@codemirror/collab";
import { RangeSetBuilder, Range } from "@codemirror/rangeset";
import { EditorState, StateEffect, StateField, Text } from "@codemirror/state";
import { RangeSetBuilder } from "@codemirror/rangeset";
import { Text } from "@codemirror/state";
import {
Decoration,
DecorationSet,
@ -21,7 +21,7 @@ import {
WidgetType,
} from "@codemirror/view";
import { Cursor, cursorEffect } from "./cursorEffect";
import { HttpRemoteSpace } from "./space";
import { RealtimeSpace, SpaceEventHandlers } from "./space";
const throttleInterval = 250;
@ -85,7 +85,7 @@ export function collabExtension(
pageName: string,
clientID: string,
doc: Document,
space: HttpRemoteSpace,
space: RealtimeSpace,
reloadCallback: () => void
) {
let plugin = ViewPlugin.fromClass(
@ -95,7 +95,15 @@ export function collabExtension(
private failedPushes = 0;
decorations: DecorationSet;
private cursorPositions: Map<string, Cursor> = doc.cursors;
throttledPush: () => void;
throttledPush = throttle(() => this.push(), throttleInterval);
eventHandlers: Partial<SpaceEventHandlers> = {
cursorSnapshot: (pageName, cursors) => {
console.log("Received new cursor snapshot", cursors);
this.cursorPositions = new Map(Object.entries(cursors));
},
};
buildDecorations(view: EditorView) {
let builder = new RangeSetBuilder<Decoration>();
@ -128,18 +136,7 @@ export function collabExtension(
this.pull();
}
this.decorations = this.buildDecorations(view);
this.throttledPush = throttle(() => this.push(), throttleInterval);
console.log("Created collabo plug");
space.addEventListener("cursors", this.updateCursors);
}
updateCursors(cursorEvent: any) {
this.cursorPositions = new Map();
console.log("Received new cursor snapshot", cursorEvent.detail, this);
for (let userId in cursorEvent.detail) {
this.cursorPositions.set(userId, cursorEvent.detail[userId]);
}
space.on(this.eventHandlers);
}
update(update: ViewUpdate) {
@ -190,7 +187,7 @@ export function collabExtension(
let success = await space.pushUpdates(pageName, version, updates);
this.pushing = false;
if (!success) {
if (!success && !this.done) {
this.failedPushes++;
if (this.failedPushes > 10) {
// Not sure if 10 is a good number, but YOLO
@ -198,14 +195,16 @@ export function collabExtension(
reloadCallback();
return this.destroy();
}
console.log("Push failed temporarily, but will try again");
console.log(
`Push for page ${pageName} failed temporarily, but will try again`
);
} else {
this.failedPushes = 0;
}
// Regardless of whether the push failed or new updates came in
// while it was running, try again if there's updates remaining
if (sendableUpdates(this.view.state).length) {
if (!this.done && sendableUpdates(this.view.state).length) {
// setTimeout(() => this.push(), 100);
this.throttledPush();
}
@ -236,7 +235,7 @@ export function collabExtension(
destroy() {
this.done = true;
space.removeEventListener("cursors", this.updateCursors);
space.off(this.eventHandlers);
}
},
{
@ -252,7 +251,6 @@ export function collabExtension(
return tr.effects.filter((e) => e.is(cursorEffect));
},
}),
// cursorField,
plugin,
];
}

View File

@ -100,7 +100,7 @@ export function FilterList({
ref={searchBoxRef}
onChange={filter}
onKeyDown={(e: React.KeyboardEvent) => {
console.log("Key up", e.key);
// console.log("Key up", e.key);
if (onKeyPress) {
onKeyPress(e.key, text);
}

View File

@ -7,7 +7,7 @@ export function PageNavigator({
onNavigate,
currentPage,
}: {
allPages: PageMeta[];
allPages: Set<PageMeta>;
onNavigate: (page: string | undefined) => void;
currentPage?: string;
}) {
@ -17,10 +17,10 @@ export function PageNavigator({
continue;
}
// Order by last modified date in descending order
let orderId = -pageMeta.lastModified.getTime();
let orderId = -pageMeta.lastModified;
// Unless it was opened and is still in memory
if (pageMeta.lastOpened) {
orderId = -pageMeta.lastOpened.getTime();
orderId = -pageMeta.lastOpened;
}
options.push({
...pageMeta,

View File

@ -45,7 +45,7 @@ import { slashCommandRegexp } from "./types";
import reducer from "./reducer";
import { smartQuoteKeymap } from "./smart_quotes";
import { HttpRemoteSpace } from "./space";
import { RealtimeSpace } from "./space";
import customMarkdownStyle from "./style";
import dbSyscalls from "./syscalls/db.localstorage";
import editorSyscalls from "./syscalls/editor.browser";
@ -84,7 +84,7 @@ export class Editor implements AppEventDispatcher {
viewState: AppViewState;
viewDispatch: React.Dispatch<Action>;
openPages: Map<string, PageState>;
space: HttpRemoteSpace;
space: RealtimeSpace;
editorCommands: Map<string, AppCommand>;
plugs: Plug<NuggetHook>[];
indexer: Indexer;
@ -92,7 +92,7 @@ export class Editor implements AppEventDispatcher {
pageNavigator: IPageNavigator;
indexCurrentPageDebounced: () => any;
constructor(space: HttpRemoteSpace, parent: Element) {
constructor(space: RealtimeSpace, parent: Element) {
this.editorCommands = new Map();
this.openPages = new Map();
this.plugs = [];
@ -114,7 +114,7 @@ export class Editor implements AppEventDispatcher {
}
async init() {
await this.loadPageList();
// await this.loadPageList();
await this.loadPlugs();
this.focus();
@ -127,8 +127,10 @@ export class Editor implements AppEventDispatcher {
if (this.currentPage) {
let pageState = this.openPages.get(this.currentPage)!;
pageState.selection = this.editorView!.state.selection;
pageState.scrollTop = this.editorView!.scrollDOM.scrollTop;
if (pageState) {
pageState.selection = this.editorView!.state.selection;
pageState.scrollTop = this.editorView!.scrollDOM.scrollTop;
}
this.space.closePage(this.currentPage);
}
@ -136,19 +138,25 @@ export class Editor implements AppEventDispatcher {
await this.loadPage(pageName);
});
this.space.addEventListener("connect", () => {
if (this.currentPage) {
console.log("Connected to socket, fetch fresh?");
this.reloadPage();
}
});
this.space.addEventListener("reload", (e) => {
let pageName = (e as CustomEvent).detail;
if (this.currentPage === pageName) {
console.log("Was told to reload the page");
this.reloadPage();
}
this.space.on({
connect: () => {
if (this.currentPage) {
console.log("Connected to socket, fetch fresh?");
this.reloadPage();
}
},
pageChanged: (meta) => {
if (this.currentPage === meta.name) {
console.log("page changed on disk, reloading");
this.reloadPage();
}
},
pageListUpdated: (pages) => {
this.viewDispatch({
type: "pages-listed",
pages: pages,
});
},
});
if (this.pageNavigator.getCurrentPage() === "") {
@ -411,14 +419,6 @@ export class Editor implements AppEventDispatcher {
}
}
async loadPageList() {
let pagesMeta = await this.space.listPages();
this.viewDispatch({
type: "pages-listed",
pages: pagesMeta,
});
}
focus() {
this.editorView!.focus();
}

View File

@ -9,10 +9,12 @@ export default function reducer(
case "page-loaded":
return {
...state,
allPages: state.allPages.map((pageMeta) =>
pageMeta.name === action.name
? { ...pageMeta, lastOpened: new Date() }
: pageMeta
allPages: new Set(
[...state.allPages].map((pageMeta) =>
pageMeta.name === action.name
? { ...pageMeta, lastOpened: Date.now() }
: pageMeta
)
),
currentPage: action.name,
};

View File

@ -14,27 +14,81 @@ export interface Space {
getPageMeta(name: string): Promise<PageMeta>;
}
export class HttpRemoteSpace extends EventTarget implements Space {
url: string;
export type SpaceEventHandlers = {
connect: () => void;
cursorSnapshot: (
pageName: string,
cursors: { [key: string]: Cursor }
) => void;
pageCreated: (meta: PageMeta) => void;
pageChanged: (meta: PageMeta) => void;
pageDeleted: (name: string) => void;
pageListUpdated: (pages: Set<PageMeta>) => void;
};
abstract class EventEmitter<HandlerT> {
private handlers: Partial<HandlerT>[] = [];
on(handlers: Partial<HandlerT>) {
this.handlers.push(handlers);
}
off(handlers: Partial<HandlerT>) {
this.handlers = this.handlers.filter((h) => h !== handlers);
}
emit(eventName: keyof HandlerT, ...args: any[]) {
for (let handler of this.handlers) {
let fn: any = handler[eventName];
if (fn) {
fn(...args);
}
}
}
}
export class RealtimeSpace
extends EventEmitter<SpaceEventHandlers>
implements Space
{
socket: Socket;
reqId = 0;
allPages = new Set<PageMeta>();
constructor(url: string, socket: Socket) {
constructor(socket: Socket) {
super();
this.url = url;
this.socket = socket;
socket.on("connect", () => {
console.log("connected to socket");
this.dispatchEvent(new Event("connect"));
[
"connect",
"cursorSnapshot",
"pageCreated",
"pageChanged",
"pageDeleted",
].forEach((eventName) => {
socket.on(eventName, (...args) => {
this.emit(eventName as keyof SpaceEventHandlers, ...args);
});
});
socket.on("reload", (pageName: string) => {
this.dispatchEvent(new CustomEvent("reload", { detail: pageName }));
this.wsCall("listPages").then((pages) => {
this.allPages = new Set(pages);
this.emit("pageListUpdated", this.allPages);
});
socket.on("cursors", (cursors) => {
this.dispatchEvent(new CustomEvent("cursors", { detail: cursors }));
this.on({
pageCreated: (meta) => {
this.allPages.add(meta);
console.log("New page created", meta);
this.emit("pageListUpdated", this.allPages);
},
pageDeleted: (name) => {
console.log("Page delete", name);
this.allPages.forEach((meta) => {
if (name === meta.name) {
this.allPages.delete(meta);
}
});
this.emit("pageListUpdated", this.allPages);
},
});
}
@ -76,14 +130,7 @@ export class HttpRemoteSpace extends EventTarget implements Space {
}
async listPages(): Promise<PageMeta[]> {
let req = await fetch(this.url, {
method: "GET",
});
return (await req.json()).map((meta: any) => ({
name: meta.name,
lastModified: new Date(meta.lastModified),
}));
return Array.from(this.allPages);
}
async openPage(name: string): Promise<Document> {
@ -101,47 +148,18 @@ export class HttpRemoteSpace extends EventTarget implements Space {
}
async readPage(name: string): Promise<{ text: string; meta: PageMeta }> {
let req = await fetch(`${this.url}/${name}`, {
method: "GET",
});
return {
text: await req.text(),
meta: {
lastModified: new Date(+req.headers.get("Last-Modified")!),
name: name,
},
};
return this.wsCall("readPage", name);
}
async writePage(name: string, text: string): Promise<PageMeta> {
let req = await fetch(`${this.url}/${name}`, {
method: "PUT",
body: text,
});
// 201 (Created) means a new page was created
return {
lastModified: new Date(+req.headers.get("Last-Modified")!),
name: name,
created: req.status === 201,
};
return this.wsCall("writePage", name, text);
}
async deletePage(name: string): Promise<void> {
let req = await fetch(`${this.url}/${name}`, {
method: "DELETE",
});
if (req.status !== 200) {
throw Error(`Failed to delete page: ${req.statusText}`);
}
return this.wsCall("deletePage", name);
}
async getPageMeta(name: string): Promise<PageMeta> {
let req = await fetch(`${this.url}/${name}`, {
method: "OPTIONS",
});
return {
name: name,
lastModified: new Date(+req.headers.get("Last-Modified")!),
};
return this.wsCall("deletePage", name);
}
}

View File

@ -3,10 +3,7 @@ import { PageMeta } from "../types";
export default (editor: Editor) => ({
"space.listPages": (): PageMeta[] => {
return editor.viewState.allPages;
},
"space.reloadPageList": async () => {
await editor.loadPageList();
return [...editor.viewState.allPages];
},
"space.reindex": async () => {
await editor.indexer.reindexSpace(editor.space, editor);

View File

@ -10,10 +10,9 @@ export type Manifest = plugbox.Manifest<NuggetHook>;
export type PageMeta = {
name: string;
lastModified: Date;
lastModified: number;
version?: number;
created?: boolean;
lastOpened?: Date;
lastOpened?: number;
};
export type AppCommand = {
@ -40,20 +39,20 @@ export type AppViewState = {
currentPage?: string;
showPageNavigator: boolean;
showCommandPalette: boolean;
allPages: PageMeta[];
allPages: Set<PageMeta>;
commands: Map<string, AppCommand>;
};
export const initialViewState: AppViewState = {
showPageNavigator: false,
showCommandPalette: false,
allPages: [],
allPages: new Set(),
commands: new Map(),
};
export type Action =
| { type: "page-loaded"; name: string }
| { type: "pages-listed"; pages: PageMeta[] }
| { type: "pages-listed"; pages: Set<PageMeta> }
| { type: "start-navigate" }
| { type: "stop-navigate" }
| { type: "update-commands"; commands: Map<string, AppCommand> }