HUGE refactore, once again.

pull/3/head
Zef Hemel 2022-03-15 14:03:00 +01:00
parent 9d41c9e3d6
commit 97b9d3a3e0
19 changed files with 530 additions and 453 deletions

View File

@ -1,5 +1,5 @@
import { Editor } from "../../webapp/src/editor";
import { HttpRemoteSpace } from "../../webapp/src/space";
import { Space } from "../../webapp/src/space";
declare namespace window {
var ReactNativeWebView: any;
@ -32,17 +32,17 @@ console.error = (...args) => {
);
};
try {
let editor = new Editor(
new HttpRemoteSpace(`http://192.168.2.22:3000/fs`, null),
document.getElementById("root")!
);
console.log("Initing editor");
safeRun(async () => {
await editor.loadPageList();
await editor.loadPlugs();
editor.focus();
console.log("Inited", editor.viewState);
});
// let editor = new Editor(
// new Space(`http://192.168.2.22:3000/fs`, null),
// document.getElementById("root")!
// );
// console.log("Initing editor");
// safeRun(async () => {
// await editor.loadPageList();
// await editor.loadPlugs();
// editor.focus();
// console.log("Inited", editor.viewState);
// });
} catch (e: any) {
console.error("Got an error", e.message);
}

View File

@ -67,7 +67,7 @@ async function run() {
.parse();
let generatedManifest = await bundle(args._[0], !!args.debug);
writeFile(args._[1], JSON.stringify(generatedManifest, null, 2));
await writeFile(args._[1], JSON.stringify(generatedManifest, null, 2));
}
run().catch((e) => {

View File

@ -20,7 +20,6 @@ export class NodeSandbox implements Sandbox {
constructor(readonly system: System<any>, workerScript: string) {
this.worker = new Worker(workerScript);
this.worker.on("message", this.onmessage.bind(this));
}

View File

@ -42,6 +42,6 @@ test("Run a Node sandbox", async () => {
for (let i = 0; i < 100; i++) {
expect(await plug.invoke("addNumbersSyscall", [10, i])).toBe(10 + i);
}
console.log(plug.sandbox);
// console.log(plug.sandbox);
await system.stop();
});

View File

@ -31,7 +31,6 @@ export class Plug<HookT> {
if (!this.sandbox.isLoaded(name)) {
await this.sandbox.load(name, this.manifest!.functions[name].code!);
}
console.log("Loaded", name);
return await this.sandbox.invoke(name, args);
}

View File

@ -1,5 +1,5 @@
import { ControllerMessage, WorkerMessage } from "./types";
import { Plug, Sandbox, System } from "./runtime";
import { Sandbox, System } from "./runtime";
export class WebworkerSandbox implements Sandbox {
private worker: Worker;

View File

@ -1,7 +1,9 @@
.PHONY: core
BUILD=../plugbox/bin/plugbox-bundle.mjs
core: *
core: core/*
${BUILD} --debug core/core.plug.json ../webapp/src/generated/core.plug.json
watch: *
ls -d core/* | entr make
ls -d core/* | entr make

View File

@ -25,13 +25,13 @@ async function navigate(syntaxNode: any) {
}
export async function linkNavigate() {
navigate(await syscall("editor.getSyntaxNodeUnderCursor"));
await navigate(await syscall("editor.getSyntaxNodeUnderCursor"));
}
export async function clickNavigate(event: ClickEvent) {
if (event.ctrlKey || event.metaKey) {
let syntaxNode = await syscall("editor.getSyntaxNodeAtPos", event.pos);
navigate(syntaxNode);
await navigate(syntaxNode);
}
}
@ -48,7 +48,7 @@ export async function pageComplete() {
return {
from: prefix.from,
options: allPages
.filter((page) => page.name.startsWith(prefix.text))
.filter((page: any) => page.name.startsWith(prefix.text))
.map((pageMeta: any) => ({
label: pageMeta.name,
type: "page",

View File

View File

@ -8,17 +8,16 @@ import * as path from "path";
import * as fs from "fs";
describe("Server test", () => {
let io,
let io: Server,
socketServer: SocketServer,
cleaner,
clientSocket,
clientSocket: any,
reqId = 0;
const tmpDir = path.join(__dirname, "test");
function wsCall(eventName: string, ...args: any[]): Promise<any> {
return new Promise((resolve, reject) => {
reqId++;
clientSocket.once(`${eventName}Resp${reqId}`, (err, result) => {
clientSocket.once(`${eventName}Resp${reqId}`, (err: any, result: any) => {
if (err) {
reject(err);
} else {
@ -41,6 +40,7 @@ describe("Server test", () => {
clientSocket = new Client(`http://localhost:${port}`);
socketServer = new SocketServer(tmpDir, io);
clientSocket.on("connect", done);
await socketServer.init();
});
});
@ -52,40 +52,45 @@ describe("Server test", () => {
});
test("List pages", async () => {
let pages = await wsCall("listPages");
console.log(pages);
let pages = await wsCall("page.listPages");
expect(pages.length).toBe(1);
await wsCall("page.writePage", "test2.md", "This is another test");
let pages2 = await wsCall("page.listPages");
expect(pages2.length).toBe(2);
await wsCall("page.deletePage", "test2.md");
let pages3 = await wsCall("page.listPages");
expect(pages3.length).toBe(1);
});
test("Index operations", async () => {
await wsCall("index:clearPageIndexForPage", "test");
await wsCall("index:set", "test", "testkey", "value");
expect(await wsCall("index:get", "test", "testkey")).toBe("value");
await wsCall("index:delete", "test", "testkey");
expect(await wsCall("index:get", "test", "testkey")).toBe(null);
await wsCall("index:set", "test", "unrelated", 10);
await wsCall("index:set", "test", "unrelated", 12);
await wsCall("index:set", "test2", "complicated", {
await wsCall("index.clearPageIndexForPage", "test");
await wsCall("index.set", "test", "testkey", "value");
expect(await wsCall("index.get", "test", "testkey")).toBe("value");
await wsCall("index.delete", "test", "testkey");
expect(await wsCall("index.get", "test", "testkey")).toBe(null);
await wsCall("index.set", "test", "unrelated", 10);
await wsCall("index.set", "test", "unrelated", 12);
await wsCall("index.set", "test2", "complicated", {
name: "Bla",
age: 123123,
});
await wsCall("index:set", "test", "complicated", { name: "Bla", age: 100 });
await wsCall("index:set", "test", "complicated2", {
await wsCall("index.set", "test", "complicated", { name: "Bla", age: 100 });
await wsCall("index.set", "test", "complicated2", {
name: "Bla",
age: 101,
});
expect(await wsCall("index:get", "test", "complicated")).toStrictEqual({
expect(await wsCall("index.get", "test", "complicated")).toStrictEqual({
name: "Bla",
age: 100,
});
let result = await wsCall("index:scanPrefixForPage", "test", "compli");
let result = await wsCall("index.scanPrefixForPage", "test", "compli");
expect(result.length).toBe(2);
let result2 = await wsCall("index:scanPrefixGlobal", "compli");
let result2 = await wsCall("index.scanPrefixGlobal", "compli");
expect(result2.length).toBe(3);
await wsCall("index:deletePrefixForPage", "test", "compli");
let result3 = await wsCall("index:scanPrefixForPage", "test", "compli");
await wsCall("index.deletePrefixForPage", "test", "compli");
let result3 = await wsCall("index.scanPrefixForPage", "test", "compli");
expect(result3.length).toBe(0);
let result4 = await wsCall("index:scanPrefixGlobal", "compli");
let result4 = await wsCall("index.scanPrefixGlobal", "compli");
expect(result4.length).toBe(1);
});
});

View File

@ -1,302 +1,45 @@
import { stat } from "fs/promises";
import { ChangeSet } from "@codemirror/state";
import { Update } from "@codemirror/collab";
import { Server } from "socket.io";
import { Cursor, cursorEffect } from "../../webapp/src/cursorEffect";
import { Socket } from "socket.io";
import { DiskStorage } from "./disk_storage";
import { ClientPageState, Page, PageMeta } from "./types";
import { safeRun } from "./util";
import * as fs from "fs";
import { Server, Socket } from "socket.io";
import { Page } from "./types";
import * as path from "path";
import knex, { Knex } from "knex";
import { IndexApi } from "./index_api";
import { PageApi } from "./page_api";
type IndexItem = {
page: string;
key: string;
value: any;
};
class ClientConnection {
export class ClientConnection {
openPages = new Set<string>();
constructor(readonly sock: Socket) {}
}
export interface ApiProvider {
init(): Promise<void>;
api(): Object;
}
export class SocketServer {
rootPath: string;
serverSock: Server;
openPages = new Map<string, Page>();
connectedSockets = new Set<Socket>();
pageStore: DiskStorage;
db: Knex;
serverSocket: Server;
private apis = new Map<string, ApiProvider>();
api = {
openPage: async (clientConn: ClientConnection, pageName: string) => {
let page = this.openPages.get(pageName);
if (!page) {
try {
let { text, meta } = await this.pageStore.readPage(pageName);
page = new Page(pageName, text, meta);
} catch (e) {
console.log("Creating new page", pageName);
page = new Page(pageName, "", { name: pageName, lastModified: 0 });
}
this.openPages.set(pageName, page);
}
page.clientStates.add(new ClientPageState(clientConn.sock, page.version));
clientConn.openPages.add(pageName);
console.log("Opened page", pageName);
this.broadcastCursors(page);
return page.toJSON();
},
pushUpdates: async (
clientConn: ClientConnection,
pageName: string,
version: number,
updates: any[]
): Promise<boolean> => {
let page = this.openPages.get(pageName);
if (!page) {
console.error(
"Received updates for not open page",
pageName,
this.openPages.keys()
);
return false;
}
if (version !== page.version) {
console.error("Invalid version", version, page.version);
return false;
} else {
console.log("Applying", updates.length, "updates to", pageName);
let transformedUpdates = [];
let textChanged = false;
for (let update of updates) {
let changes = ChangeSet.fromJSON(update.changes);
let transformedUpdate = {
changes,
clientID: update.clientID,
effects: update.cursors?.map((c: Cursor) => {
page.cursors.set(c.userId, c);
return cursorEffect.of(c);
}),
};
page.updates.push(transformedUpdate);
transformedUpdates.push(transformedUpdate);
let oldText = page.text;
page.text = changes.apply(page.text);
if (oldText !== page.text) {
textChanged = true;
}
}
if (textChanged) {
if (page.saveTimer) {
clearTimeout(page.saveTimer);
}
page.saveTimer = setTimeout(() => {
this.flushPageToDisk(pageName, page);
}, 1000);
}
while (page.pending.length) {
page.pending.pop()!(transformedUpdates);
}
return true;
}
},
pullUpdates: async (
clientConn: ClientConnection,
pageName: string,
version: number
): Promise<Update[]> => {
let page = this.openPages.get(pageName);
// console.log("Pulling updates for", pageName);
if (!page) {
console.error("Fetching updates for not open page");
return [];
}
// TODO: Optimize this
let oldestVersion = Infinity;
page.clientStates.forEach((client) => {
oldestVersion = Math.min(client.version, oldestVersion);
if (client.socket === clientConn.sock) {
client.version = version;
}
});
page.flushUpdates(oldestVersion);
if (version < page.version) {
return page.updatesSince(version);
} else {
return new Promise((resolve) => {
page.pending.push(resolve);
});
}
},
readPage: async (
clientConn: ClientConnection,
pageName: string
): Promise<{ text: string; meta: PageMeta }> => {
let page = this.openPages.get(pageName);
if (page) {
console.log("Serving page from memory", pageName);
return {
text: page.text.sliceString(0),
meta: page.meta,
};
} else {
return this.pageStore.readPage(pageName);
}
},
writePage: async (
clientConn: ClientConnection,
pageName: string,
text: string
) => {
let page = this.openPages.get(pageName);
if (page) {
for (let client of page.clientStates) {
client.socket.emit("reloadPage", pageName);
}
this.openPages.delete(pageName);
}
return this.pageStore.writePage(pageName, text);
},
deletePage: async (clientConn: ClientConnection, pageName: string) => {
this.openPages.delete(pageName);
clientConn.openPages.delete(pageName);
// Cascading of this to all connected clients will be handled by file watcher
return this.pageStore.deletePage(pageName);
},
listPages: async (clientConn: ClientConnection): Promise<PageMeta[]> => {
return this.pageStore.listPages();
},
getPageMeta: async (
clientConn: ClientConnection,
pageName: string
): Promise<PageMeta> => {
let page = this.openPages.get(pageName);
if (page) {
return page.meta;
}
return this.pageStore.getPageMeta(pageName);
},
"index:clearPageIndexForPage": async (
clientConn: ClientConnection,
page: string
) => {
await this.db<IndexItem>("page_index").where({ page }).del();
},
"index:set": async (
clientConn: ClientConnection,
page: string,
key: string,
value: any
) => {
let changed = await this.db<IndexItem>("page_index")
.where({ page, key })
.update("value", JSON.stringify(value));
if (changed === 0) {
await this.db<IndexItem>("page_index").insert({
page,
key,
value: JSON.stringify(value),
});
}
},
"index:get": async (
clientConn: ClientConnection,
page: string,
key: string
) => {
let result = await this.db<IndexItem>("page_index")
.where({ page, key })
.select("value");
if (result.length) {
return JSON.parse(result[0].value);
} else {
return null;
}
},
"index:delete": async (
clientConn: ClientConnection,
page: string,
key: string
) => {
await this.db<IndexItem>("page_index").where({ page, key }).del();
},
"index:scanPrefixForPage": async (
clientConn: ClientConnection,
page: string,
prefix: string
) => {
return (
await this.db<IndexItem>("page_index")
.where({ page })
.andWhereLike("key", `${prefix}%`)
.select("page", "key", "value")
).map(({ page, key, value }) => ({
page,
key,
value: JSON.parse(value),
}));
},
"index:scanPrefixGlobal": async (
clientConn: ClientConnection,
prefix: string
) => {
return (
await this.db<IndexItem>("page_index")
.andWhereLike("key", `${prefix}%`)
.select("page", "key", "value")
).map(({ page, key, value }) => ({
page,
key,
value: JSON.parse(value),
}));
},
"index:deletePrefixForPage": async (
clientConn: ClientConnection,
page: string,
prefix: string
) => {
return await this.db<IndexItem>("page_index")
.where({ page })
.andWhereLike("key", `${prefix}%`)
.del();
},
"index:clearPageIndex": async (clientConn: ClientConnection) => {
return await this.db<IndexItem>("page_index").del();
},
};
async registerApi(name: string, apiProvider: ApiProvider) {
await apiProvider.init();
this.apis.set(name, apiProvider);
}
constructor(rootPath: string, serverSocket: Server) {
this.rootPath = path.resolve(rootPath);
this.serverSocket = serverSocket;
this.pageStore = new DiskStorage(this.rootPath);
}
this.db = knex({
client: "better-sqlite3",
connection: {
filename: path.join(rootPath, "data.db"),
},
useNullAsDefault: true,
});
this.initDb();
public async init() {
await this.registerApi("index", new IndexApi(this.rootPath));
await this.registerApi(
"page",
new PageApi(this.rootPath, this.connectedSockets)
);
serverSocket.on("connection", (socket) => {
this.serverSocket.on("connection", (socket) => {
const clientConn = new ClientConnection(socket);
// const socketOpenPages = new Set<string>();
console.log("Connected", socket.id);
this.connectedSockets.add(socket);
@ -333,121 +76,26 @@ export class SocketServer {
if (page) {
for (let client of page.clientStates) {
if (client.socket === socket) {
this.disconnectClient(client, page);
(this.apis.get("page")! as PageApi).disconnectClient(
client,
page
);
}
}
}
};
Object.entries(this.api).forEach(([eventName, cb]) => {
onCall(eventName, (...args: any[]): any => {
return cb.call(this, clientConn, ...args);
});
});
});
}
async initDb() {
if (!(await this.db.schema.hasTable("page_index"))) {
await this.db.schema.createTable("page_index", (table) => {
table.string("page");
table.string("key");
table.text("value");
table.primary(["page", "key"]);
});
console.log("Created table page_index");
}
}
disconnectClient(client: ClientPageState, page: Page) {
page.clientStates.delete(client);
if (page.clientStates.size === 0) {
console.log("No more clients for", page.name, "flushing");
this.flushPageToDisk(page.name, page);
this.openPages.delete(page.name);
} else {
page.cursors.delete(client.socket.id);
this.broadcastCursors(page);
}
}
broadcastCursors(page: Page) {
page.clientStates.forEach((client) => {
client.socket.emit(
"cursorSnapshot",
page.name,
Object.fromEntries(page.cursors.entries())
);
});
}
flushPageToDisk(name: string, page: Page) {
safeRun(async () => {
let meta = await this.pageStore.writePage(name, page.text.sliceString(0));
console.log(`Wrote page ${name} to disk`);
page.meta = meta;
});
}
fileWatcher() {
fs.watch(
this.rootPath,
{
recursive: true,
persistent: false,
},
(eventType, filename) => {
safeRun(async () => {
if (!filename.endsWith(".md")) {
return;
}
let localPath = path.join(this.rootPath, filename);
let pageName = filename.substring(0, filename.length - 3);
// console.log("Edit in", pageName, eventType);
let modifiedTime = 0;
try {
let s = await stat(localPath);
modifiedTime = s.mtime.getTime();
} catch (e) {
// File was deleted
console.log("Deleted", pageName);
for (let socket of this.connectedSockets) {
socket.emit("pageDeleted", pageName);
}
return;
}
const openPage = this.openPages.get(pageName);
if (openPage) {
if (openPage.meta.lastModified < modifiedTime) {
console.log("Page changed on disk outside of editor, reloading");
this.openPages.delete(pageName);
const meta = {
name: pageName,
lastModified: modifiedTime,
} as PageMeta;
for (let client of openPage.clientStates) {
client.socket.emit("pageChanged", meta);
}
}
}
if (eventType === "rename") {
// This most likely means a new file was created, let's push new file listings to all connected sockets
console.log(
"New file created, broadcasting to all connected sockets",
pageName
);
for (let socket of this.connectedSockets) {
socket.emit("pageCreated", {
name: pageName,
lastModified: modifiedTime,
} as PageMeta);
}
}
for (let [apiName, apiProvider] of this.apis) {
Object.entries(apiProvider.api()).forEach(([eventName, cb]) => {
onCall(`${apiName}.${eventName}`, (...args: any[]): any => {
// @ts-ignore
return cb(clientConn, ...args);
});
});
}
);
});
}
close() {
this.db.destroy();
(this.apis.get("index")! as IndexApi).db.destroy();
}
}

123
server/src/index_api.ts Normal file
View File

@ -0,0 +1,123 @@
import { ApiProvider, ClientConnection } from "./api";
import knex, { Knex } from "knex";
import path from "path";
type IndexItem = {
page: string;
key: string;
value: any;
};
export class IndexApi implements ApiProvider {
db: Knex;
constructor(rootPath: string) {
this.db = knex({
client: "better-sqlite3",
connection: {
filename: path.join(rootPath, "data.db"),
},
useNullAsDefault: true,
});
}
async init() {
if (!(await this.db.schema.hasTable("page_index"))) {
await this.db.schema.createTable("page_index", (table) => {
table.string("page");
table.string("key");
table.text("value");
table.primary(["page", "key"]);
});
console.log("Created table page_index");
}
}
api() {
return {
clearPageIndexForPage: async (
clientConn: ClientConnection,
page: string
) => {
await this.db<IndexItem>("page_index").where({ page }).del();
},
set: async (
clientConn: ClientConnection,
page: string,
key: string,
value: any
) => {
let changed = await this.db<IndexItem>("page_index")
.where({ page, key })
.update("value", JSON.stringify(value));
if (changed === 0) {
await this.db<IndexItem>("page_index").insert({
page,
key,
value: JSON.stringify(value),
});
}
},
get: async (clientConn: ClientConnection, page: string, key: string) => {
let result = await this.db<IndexItem>("page_index")
.where({ page, key })
.select("value");
if (result.length) {
return JSON.parse(result[0].value);
} else {
return null;
}
},
delete: async (
clientConn: ClientConnection,
page: string,
key: string
) => {
await this.db<IndexItem>("page_index").where({ page, key }).del();
},
scanPrefixForPage: async (
clientConn: ClientConnection,
page: string,
prefix: string
) => {
return (
await this.db<IndexItem>("page_index")
.where({ page })
.andWhereLike("key", `${prefix}%`)
.select("page", "key", "value")
).map(({ page, key, value }) => ({
page,
key,
value: JSON.parse(value),
}));
},
scanPrefixGlobal: async (
clientConn: ClientConnection,
prefix: string
) => {
return (
await this.db<IndexItem>("page_index")
.andWhereLike("key", `${prefix}%`)
.select("page", "key", "value")
).map(({ page, key, value }) => ({
page,
key,
value: JSON.parse(value),
}));
},
deletePrefixForPage: async (
clientConn: ClientConnection,
page: string,
prefix: string
) => {
return this.db<IndexItem>("page_index")
.where({ page })
.andWhereLike("key", `${prefix}%`)
.del();
},
clearPageIndex: async (clientConn: ClientConnection) => {
return this.db<IndexItem>("page_index").del();
},
};
}
}

281
server/src/page_api.ts Normal file
View File

@ -0,0 +1,281 @@
import { ClientPageState, Page, PageMeta } from "./types";
import { ChangeSet } from "@codemirror/state";
import { Update } from "@codemirror/collab";
import { ApiProvider, ClientConnection } from "./api";
import { Socket } from "socket.io";
import { DiskStorage } from "./disk_storage";
import { safeRun } from "./util";
import fs from "fs";
import path from "path";
import { stat } from "fs/promises";
import { Cursor, cursorEffect } from "../../webapp/src/cursorEffect";
export class PageApi implements ApiProvider {
openPages = new Map<string, Page>();
pageStore: DiskStorage;
rootPath: string;
connectedSockets: Set<Socket>;
constructor(rootPath: string, connectedSockets: Set<Socket>) {
this.pageStore = new DiskStorage(rootPath);
this.rootPath = rootPath;
this.connectedSockets = connectedSockets;
}
async init(): Promise<void> {
return;
}
broadcastCursors(page: Page) {
page.clientStates.forEach((client) => {
client.socket.emit(
"cursorSnapshot",
page.name,
Object.fromEntries(page.cursors.entries())
);
});
}
flushPageToDisk(name: string, page: Page) {
safeRun(async () => {
let meta = await this.pageStore.writePage(name, page.text.sliceString(0));
console.log(`Wrote page ${name} to disk`);
page.meta = meta;
});
}
disconnectClient(client: ClientPageState, page: Page) {
page.clientStates.delete(client);
if (page.clientStates.size === 0) {
console.log("No more clients for", page.name, "flushing");
this.flushPageToDisk(page.name, page);
this.openPages.delete(page.name);
} else {
page.cursors.delete(client.socket.id);
this.broadcastCursors(page);
}
}
fileWatcher() {
fs.watch(
this.rootPath,
{
recursive: true,
persistent: false,
},
(eventType, filename) => {
safeRun(async () => {
if (!filename.endsWith(".md")) {
return;
}
let localPath = path.join(this.rootPath, filename);
let pageName = filename.substring(0, filename.length - 3);
// console.log("Edit in", pageName, eventType);
let modifiedTime = 0;
try {
let s = await stat(localPath);
modifiedTime = s.mtime.getTime();
} catch (e) {
// File was deleted
console.log("Deleted", pageName);
for (let socket of this.connectedSockets) {
socket.emit("pageDeleted", pageName);
}
return;
}
const openPage = this.openPages.get(pageName);
if (openPage) {
if (openPage.meta.lastModified < modifiedTime) {
console.log("Page changed on disk outside of editor, reloading");
this.openPages.delete(pageName);
const meta = {
name: pageName,
lastModified: modifiedTime,
} as PageMeta;
for (let client of openPage.clientStates) {
client.socket.emit("pageChanged", meta);
}
}
}
if (eventType === "rename") {
// This most likely means a new file was created, let's push new file listings to all connected sockets
console.log(
"New file created, broadcasting to all connected sockets",
pageName
);
for (let socket of this.connectedSockets) {
socket.emit("pageCreated", {
name: pageName,
lastModified: modifiedTime,
} as PageMeta);
}
}
});
}
);
}
api() {
return {
openPage: async (clientConn: ClientConnection, pageName: string) => {
let page = this.openPages.get(pageName);
if (!page) {
try {
let { text, meta } = await this.pageStore.readPage(pageName);
page = new Page(pageName, text, meta);
} catch (e) {
console.log("Creating new page", pageName);
page = new Page(pageName, "", { name: pageName, lastModified: 0 });
}
this.openPages.set(pageName, page);
}
page.clientStates.add(
new ClientPageState(clientConn.sock, page.version)
);
clientConn.openPages.add(pageName);
console.log("Opened page", pageName);
this.broadcastCursors(page);
return page.toJSON();
},
pushUpdates: async (
clientConn: ClientConnection,
pageName: string,
version: number,
updates: any[]
): Promise<boolean> => {
let page = this.openPages.get(pageName);
if (!page) {
console.error(
"Received updates for not open page",
pageName,
this.openPages.keys()
);
return false;
}
if (version !== page.version) {
console.error("Invalid version", version, page.version);
return false;
} else {
console.log("Applying", updates.length, "updates to", pageName);
let transformedUpdates = [];
let textChanged = false;
for (let update of updates) {
let changes = ChangeSet.fromJSON(update.changes);
let transformedUpdate = {
changes,
clientID: update.clientID,
effects: update.cursors?.map((c: Cursor) => {
page!.cursors.set(c.userId, c);
return cursorEffect.of(c);
}),
};
page.updates.push(transformedUpdate);
transformedUpdates.push(transformedUpdate);
let oldText = page.text;
page.text = changes.apply(page.text);
if (oldText !== page.text) {
textChanged = true;
}
}
if (textChanged) {
if (page.saveTimer) {
clearTimeout(page.saveTimer);
}
page.saveTimer = setTimeout(() => {
this.flushPageToDisk(pageName, page!);
}, 1000);
}
while (page.pending.length) {
page.pending.pop()!(transformedUpdates);
}
return true;
}
},
pullUpdates: async (
clientConn: ClientConnection,
pageName: string,
version: number
): Promise<Update[]> => {
let page = this.openPages.get(pageName);
// console.log("Pulling updates for", pageName);
if (!page) {
console.error("Fetching updates for not open page");
return [];
}
// TODO: Optimize this
let oldestVersion = Infinity;
page.clientStates.forEach((client) => {
oldestVersion = Math.min(client.version, oldestVersion);
if (client.socket === clientConn.sock) {
client.version = version;
}
});
page.flushUpdates(oldestVersion);
if (version < page.version) {
return page.updatesSince(version);
} else {
return new Promise((resolve) => {
page!.pending.push(resolve);
});
}
},
readPage: async (
clientConn: ClientConnection,
pageName: string
): Promise<{ text: string; meta: PageMeta }> => {
let page = this.openPages.get(pageName);
if (page) {
console.log("Serving page from memory", pageName);
return {
text: page.text.sliceString(0),
meta: page.meta,
};
} else {
return this.pageStore.readPage(pageName);
}
},
writePage: async (
clientConn: ClientConnection,
pageName: string,
text: string
) => {
let page = this.openPages.get(pageName);
if (page) {
for (let client of page.clientStates) {
client.socket.emit("reloadPage", pageName);
}
this.openPages.delete(pageName);
}
return this.pageStore.writePage(pageName, text);
},
deletePage: async (clientConn: ClientConnection, pageName: string) => {
this.openPages.delete(pageName);
clientConn.openPages.delete(pageName);
// Cascading of this to all connected clients will be handled by file watcher
return this.pageStore.deletePage(pageName);
},
listPages: async (clientConn: ClientConnection): Promise<PageMeta[]> => {
return this.pageStore.listPages();
},
getPageMeta: async (
clientConn: ClientConnection,
pageName: string
): Promise<PageMeta> => {
let page = this.openPages.get(pageName);
if (page) {
return page.meta;
}
return this.pageStore.getPageMeta(pageName);
},
};
}
}

View File

@ -30,6 +30,7 @@ const distDir = `${__dirname}/../../webapp/dist`;
app.use("/", express.static(distDir));
let socketServer = new SocketServer(args._[0] as string, io);
socketServer.init();
// Fallback, serve index.html
let cachedIndex: string | undefined = undefined;

10
tsconfig.json Normal file
View File

@ -0,0 +1,10 @@
{
"compilerOptions": {
"target": "esnext",
"strict": true,
"moduleResolution": "node",
"module": "esnext",
"esModuleInterop": true,
"allowSyntheticDefaultImports": true
}
}

View File

@ -28,6 +28,7 @@
"os-browserify": "^0.3.0",
"parcel": "^2.3.2",
"path-browserify": "^1.0.1",
"prettier": "^2.5.1",
"querystring-es3": "^0.2.1",
"stream-browserify": "^3.0.0",
"tty-browserify": "^0.0.1",

View File

@ -392,7 +392,7 @@ export class Editor implements AppEventDispatcher {
},
});
safeRun(async () => {
def.run(null);
await def.run(null);
});
},
});
@ -445,7 +445,7 @@ export class Editor implements AppEventDispatcher {
pageState.scrollTop = this.editorView!.scrollDOM.scrollTop;
}
this.space.closePage(this.currentPage);
await this.space.closePage(this.currentPage);
}
// Fetch next page to open

View File

@ -1,10 +1,10 @@
import { PageMeta } from "./types";
import { Socket } from "socket.io-client";
import { Update } from "@codemirror/collab";
import { Transaction, Text, ChangeSet } from "@codemirror/state";
import { ChangeSet, Text, Transaction } from "@codemirror/state";
import { CollabEvents, CollabDocument } from "./collab";
import { Cursor, cursorEffect } from "./cursorEffect";
import { CollabDocument, CollabEvents } from "./collab";
import { cursorEffect } from "./cursorEffect";
import { EventEmitter } from "./event";
export type SpaceEvents = {
@ -40,7 +40,7 @@ export class Space extends EventEmitter<SpaceEvents> {
this.emit(eventName as keyof SpaceEvents, ...args);
});
});
this.wsCall("listPages").then((pages) => {
this.wsCall("page.listPages").then((pages) => {
this.allPages = new Set(pages);
this.emit("pageListUpdated", this.allPages);
});
@ -87,7 +87,7 @@ export class Space extends EventEmitter<SpaceEvents> {
changes: u.changes.toJSON(),
cursors: u.effects?.map((e) => e.value),
}));
return this.wsCall("pushUpdates", pageName, version, updates);
return this.wsCall("page.pushUpdates", pageName, version, updates);
}
return false;
}
@ -96,13 +96,16 @@ export class Space extends EventEmitter<SpaceEvents> {
pageName: string,
version: number
): Promise<readonly Update[]> {
let updates: Update[] = await this.wsCall("pullUpdates", pageName, version);
let ups = updates.map((u) => ({
let updates: Update[] = await this.wsCall(
"page.pullUpdates",
pageName,
version
);
return updates.map((u) => ({
changes: ChangeSet.fromJSON(u.changes),
effects: u.effects?.map((e) => cursorEffect.of(e.value)),
clientID: u.clientID,
}));
return ups;
}
async listPages(): Promise<PageMeta[]> {
@ -111,7 +114,7 @@ export class Space extends EventEmitter<SpaceEvents> {
async openPage(name: string): Promise<CollabDocument> {
this.reqId++;
let pageJSON = await this.wsCall("openPage", name);
let pageJSON = await this.wsCall("page.openPage", name);
return new CollabDocument(
Text.of(pageJSON.text),
@ -121,27 +124,27 @@ export class Space extends EventEmitter<SpaceEvents> {
}
async closePage(name: string): Promise<void> {
this.socket.emit("closePage", name);
this.socket.emit("page.closePage", name);
}
async readPage(name: string): Promise<{ text: string; meta: PageMeta }> {
return this.wsCall("readPage", name);
return this.wsCall("page.readPage", name);
}
async writePage(name: string, text: string): Promise<PageMeta> {
return this.wsCall("writePage", name, text);
return this.wsCall("page.writePage", name, text);
}
async deletePage(name: string): Promise<void> {
return this.wsCall("deletePage", name);
return this.wsCall("page.deletePage", name);
}
async getPageMeta(name: string): Promise<PageMeta> {
return this.wsCall("deletePage", name);
return this.wsCall("page.getPageMeta", name);
}
async indexSet(pageName: string, key: string, value: any) {
await this.wsCall("index:set", pageName, key, value);
await this.wsCall("index.set", pageName, key, value);
}
async indexBatchSet(pageName: string, kvs: KV[]) {
@ -152,27 +155,27 @@ export class Space extends EventEmitter<SpaceEvents> {
}
async indexGet(pageName: string, key: string): Promise<any | null> {
return await this.wsCall("index:get", pageName, key);
return await this.wsCall("index.get", pageName, key);
}
async indexScanPrefixForPage(
pageName: string,
keyPrefix: string
): Promise<{ key: string; value: any }[]> {
return await this.wsCall("index:scanPrefixForPage", pageName, keyPrefix);
return await this.wsCall("index.scanPrefixForPage", pageName, keyPrefix);
}
async indexScanPrefixGlobal(
keyPrefix: string
): Promise<{ key: string; value: any }[]> {
return await this.wsCall("index:scanPrefixGlobal", keyPrefix);
return await this.wsCall("index.scanPrefixGlobal", keyPrefix);
}
async indexDeletePrefixForPage(pageName: string, keyPrefix: string) {
await this.wsCall("index:deletePrefixForPage", keyPrefix);
await this.wsCall("index.deletePrefixForPage", keyPrefix);
}
async indexDelete(pageName: string, key: string) {
await this.wsCall("index:delete", pageName, key);
await this.wsCall("index.delete", pageName, key);
}
}

View File

@ -2587,6 +2587,11 @@ posthtml@^0.16.4, posthtml@^0.16.5:
posthtml-parser "^0.10.0"
posthtml-render "^3.0.0"
prettier@^2.5.1:
version "2.5.1"
resolved "https://registry.yarnpkg.com/prettier/-/prettier-2.5.1.tgz#fff75fa9d519c54cf0fce328c1017d94546bc56a"
integrity sha512-vBZcPRUR5MZJwoyi3ZoyQlc1rXeEck8KgeC9AwwOn+exuxLxq5toTRDTSaVrXHxelDMHy9zlicw8u66yxoSUFg==
pretty-format@^27.0.0, pretty-format@^27.5.1:
version "27.5.1"
resolved "https://registry.yarnpkg.com/pretty-format/-/pretty-format-27.5.1.tgz#2181879fdea51a7a5851fb39d920faa63f01d88e"