From b644801e7bed90e9273342481097489172b498cd Mon Sep 17 00:00:00 2001 From: Zef Hemel Date: Thu, 11 Jan 2024 20:59:11 +0100 Subject: [PATCH] Optimizations --- cli/plug_run.test.ts | 43 ------- cli/plug_run.ts | 69 ----------- cli/plug_test.ts | 7 -- cli/test.plug.yaml | 6 - cmd/plug_run.ts | 40 ------- common/space_index.ts | 6 +- plugos/hooks/mq.ts | 11 +- plugos/lib/mq.datastore.ts | 7 +- plugos/syscalls/datastore.ts | 4 +- plugos/syscalls/transport.ts | 20 ---- plugs/index/api.ts | 24 ++-- plugs/index/builtins.ts | 33 +++--- server/http_server.ts | 56 ++------- server/instance.ts | 61 ++-------- server/rpc.ts | 38 ++++++ server/server_system.ts | 198 -------------------------------- silverbullet.ts | 10 -- web/client.ts | 49 ++++---- web/client_system.ts | 14 +-- web/remote_datastore.ts | 29 ++--- web/syscalls/datastore.proxy.ts | 101 +++++++++++++--- web/syscalls/system.ts | 1 - web/syscalls/util.ts | 36 ------ 23 files changed, 237 insertions(+), 626 deletions(-) delete mode 100644 cli/plug_run.test.ts delete mode 100644 cli/plug_run.ts delete mode 100644 cli/plug_test.ts delete mode 100644 cli/test.plug.yaml delete mode 100644 cmd/plug_run.ts delete mode 100644 plugos/syscalls/transport.ts delete mode 100644 server/server_system.ts delete mode 100644 web/syscalls/util.ts diff --git a/cli/plug_run.test.ts b/cli/plug_run.test.ts deleted file mode 100644 index 1d14ef90..00000000 --- a/cli/plug_run.test.ts +++ /dev/null @@ -1,43 +0,0 @@ -import { AssetBundle } from "../plugos/asset_bundle/bundle.ts"; -import { compileManifest } from "../plugos/compile.ts"; -import { esbuild } from "../plugos/deps.ts"; -import { runPlug } from "./plug_run.ts"; -import assets from "../dist/plug_asset_bundle.json" assert { - type: "json", -}; -import { assertEquals } from "../test_deps.ts"; -import { path } from "../common/deps.ts"; - -Deno.test("Test plug run", { - sanitizeResources: false, - sanitizeOps: false, -}, async () => { - // const tempDir = await Deno.makeTempDir(); - const tempDbFile = await Deno.makeTempFile({ suffix: ".db" }); - - const assetBundle = new AssetBundle(assets); - - const testFolder = path.dirname(new URL(import.meta.url).pathname); - const testSpaceFolder = path.join(testFolder, "test_space"); - - const plugFolder = path.join(testSpaceFolder, "_plug"); - await Deno.mkdir(plugFolder, { recursive: true }); - - await compileManifest( - path.join(testFolder, "test.plug.yaml"), - plugFolder, - ); - assertEquals( - await runPlug( - testSpaceFolder, - "test.run", - [], - assetBundle, - ), - "Hello", - ); - - // await Deno.remove(tempDir, { recursive: true }); - esbuild.stop(); - await Deno.remove(tempDbFile); -}); diff --git a/cli/plug_run.ts b/cli/plug_run.ts deleted file mode 100644 index efe4cddb..00000000 --- a/cli/plug_run.ts +++ /dev/null @@ -1,69 +0,0 @@ -import { DiskSpacePrimitives } from "../common/spaces/disk_space_primitives.ts"; -import { AssetBundle } from "../plugos/asset_bundle/bundle.ts"; - -import { Application } from "../server/deps.ts"; -import { sleep } from "$sb/lib/async.ts"; -import { ServerSystem } from "../server/server_system.ts"; -import { AssetBundlePlugSpacePrimitives } from "../common/spaces/asset_bundle_space_primitives.ts"; -import { determineDatabaseBackend } from "../server/db_backend.ts"; -import { EndpointHook } from "../plugos/hooks/endpoint.ts"; -import { determineShellBackend } from "../server/shell_backend.ts"; - -export async function runPlug( - spacePath: string, - functionName: string | undefined, - args: string[] = [], - builtinAssetBundle: AssetBundle, - httpServerPort = 3123, - httpHostname = "127.0.0.1", -) { - const serverController = new AbortController(); - const app = new Application(); - - const dbBackend = await determineDatabaseBackend(spacePath); - - if (!dbBackend) { - console.error("Cannot run plugs in databaseless mode."); - return; - } - - const endpointHook = new EndpointHook("/_/"); - - const serverSystem = new ServerSystem( - new AssetBundlePlugSpacePrimitives( - new DiskSpacePrimitives(spacePath), - builtinAssetBundle, - ), - dbBackend, - determineShellBackend(spacePath), - ); - await serverSystem.init(true); - app.use((context, next) => { - return endpointHook.handleRequest(serverSystem.system!, context, next); - }); - - app.listen({ - hostname: httpHostname, - port: httpServerPort, - signal: serverController.signal, - }); - - if (functionName) { - const [plugName, funcName] = functionName.split("."); - - const plug = serverSystem.system.loadedPlugs.get(plugName); - if (!plug) { - throw new Error(`Plug ${plugName} not found`); - } - const result = await plug.invoke(funcName, args); - await serverSystem.close(); - serverSystem.kvPrimitives.close(); - serverController.abort(); - return result; - } else { - console.log("Running in server mode, use Ctrl-c to stop"); - while (true) { - await sleep(1000); - } - } -} diff --git a/cli/plug_test.ts b/cli/plug_test.ts deleted file mode 100644 index f5de9980..00000000 --- a/cli/plug_test.ts +++ /dev/null @@ -1,7 +0,0 @@ -import { datastore } from "$sb/syscalls.ts"; - -export async function run() { - console.log("Hello from plug_test.ts"); - await datastore.set(["plug_test"], "Hello"); - return "Hello"; -} diff --git a/cli/test.plug.yaml b/cli/test.plug.yaml deleted file mode 100644 index 1f7a0680..00000000 --- a/cli/test.plug.yaml +++ /dev/null @@ -1,6 +0,0 @@ -name: test -requiredPermissions: -- shell -functions: - run: - path: plug_test.ts:run diff --git a/cmd/plug_run.ts b/cmd/plug_run.ts deleted file mode 100644 index 2bb191df..00000000 --- a/cmd/plug_run.ts +++ /dev/null @@ -1,40 +0,0 @@ -import { runPlug } from "../cli/plug_run.ts"; -import { path } from "../common/deps.ts"; -import assets from "../dist/plug_asset_bundle.json" assert { - type: "json", -}; -import { AssetBundle } from "../plugos/asset_bundle/bundle.ts"; - -export async function plugRunCommand( - { - hostname, - port, - }: { - hostname?: string; - port?: number; - }, - spacePath: string, - functionName: string | undefined, - ...args: string[] -) { - spacePath = path.resolve(spacePath); - console.log("Space path", spacePath); - console.log("Function to run:", functionName, "with arguments", args); - try { - const result = await runPlug( - spacePath, - functionName, - args, - new AssetBundle(assets), - port, - hostname, - ); - if (result) { - console.log("Output", result); - } - Deno.exit(0); - } catch (e: any) { - console.error(e.message); - Deno.exit(1); - } -} diff --git a/common/space_index.ts b/common/space_index.ts index 233d9d2b..0ee89f11 100644 --- a/common/space_index.ts +++ b/common/space_index.ts @@ -1,4 +1,4 @@ -import { DataStore } from "../plugos/lib/datastore.ts"; +import { IDataStore } from "../plugos/lib/datastore.ts"; import { System } from "../plugos/system.ts"; const indexVersionKey = ["$indexVersion"]; @@ -8,7 +8,7 @@ const desiredIndexVersion = 2; let indexOngoing = false; -export async function ensureSpaceIndex(ds: DataStore, system: System) { +export async function ensureSpaceIndex(ds: IDataStore, system: System) { const currentIndexVersion = await ds.get(indexVersionKey); console.info("Current space index version", currentIndexVersion); @@ -25,6 +25,6 @@ export async function ensureSpaceIndex(ds: DataStore, system: System) { } } -export async function markFullSpaceIndexComplete(ds: DataStore) { +export async function markFullSpaceIndexComplete(ds: IDataStore) { await ds.set(indexVersionKey, desiredIndexVersion); } diff --git a/plugos/hooks/mq.ts b/plugos/hooks/mq.ts index 08d83095..02564a05 100644 --- a/plugos/hooks/mq.ts +++ b/plugos/hooks/mq.ts @@ -3,6 +3,7 @@ import { System } from "../system.ts"; import { fullQueueName } from "../lib/mq_util.ts"; import { MQMessage } from "$sb/types.ts"; import { MessageQueue } from "../lib/mq.ts"; +import { throttle } from "$sb/lib/async.ts"; type MQSubscription = { queue: string; @@ -24,14 +25,14 @@ export class MQHook implements Hook { this.system = system; system.on({ plugLoaded: () => { - this.reloadQueues(); + this.throttledReloadQueues(); }, plugUnloaded: () => { - this.reloadQueues(); + this.throttledReloadQueues(); }, }); - this.reloadQueues(); + this.throttledReloadQueues(); } stop() { @@ -40,6 +41,10 @@ export class MQHook implements Hook { this.subscriptions = []; } + throttledReloadQueues = throttle(() => { + this.reloadQueues(); + }, 1000); + reloadQueues() { this.stop(); for (const plug of this.system.loadedPlugs.values()) { diff --git a/plugos/lib/mq.datastore.ts b/plugos/lib/mq.datastore.ts index 681b82b5..b49cb3d7 100644 --- a/plugos/lib/mq.datastore.ts +++ b/plugos/lib/mq.datastore.ts @@ -32,9 +32,10 @@ export class DataStoreMQ implements MessageQueue { }; }); - if (messages.length > 0) { - await this.ds.batchSet(messages); + if (messages.length === 0) { + return; } + await this.ds.batchSet(messages); // See if we can immediately process the message with a local subscription const localSubscriptions = this.localSubscriptions.get(queue); @@ -50,6 +51,8 @@ export class DataStoreMQ implements MessageQueue { } async poll(queue: string, maxItems: number): Promise { + // console.log("Polling", queue, maxItems); + // console.trace(); // Note: this is not happening in a transactional way, so we may get duplicate message delivery // Retrieve a batch of messages const messages = await this.ds.query({ diff --git a/plugos/syscalls/datastore.ts b/plugos/syscalls/datastore.ts index 43455792..8fd6d0db 100644 --- a/plugos/syscalls/datastore.ts +++ b/plugos/syscalls/datastore.ts @@ -1,5 +1,5 @@ import { KV, KvKey, KvQuery } from "$sb/types.ts"; -import type { DataStore } from "../lib/datastore.ts"; +import type { DataStore, IDataStore } from "../lib/datastore.ts"; import type { SyscallContext, SysCallMapping } from "../system.ts"; /** @@ -8,7 +8,7 @@ import type { SyscallContext, SysCallMapping } from "../system.ts"; * @param prefix prefix to scope all keys to to which the plug name will be appended */ export function dataStoreSyscalls( - ds: DataStore, + ds: IDataStore, prefix: KvKey = ["ds"], ): SysCallMapping { return { diff --git a/plugos/syscalls/transport.ts b/plugos/syscalls/transport.ts deleted file mode 100644 index eaf525b4..00000000 --- a/plugos/syscalls/transport.ts +++ /dev/null @@ -1,20 +0,0 @@ -import { SyscallContext, SysCallMapping } from "../system.ts"; - -export function proxySyscalls( - names: string[], - transportCall: ( - ctx: SyscallContext, - name: string, - ...args: any[] - ) => Promise, -): SysCallMapping { - const syscalls: SysCallMapping = {}; - - for (const name of names) { - syscalls[name] = (ctx, ...args: any[]) => { - return transportCall(ctx, name, ...args); - }; - } - - return syscalls; -} diff --git a/plugs/index/api.ts b/plugs/index/api.ts index 6cad9b38..af8441da 100644 --- a/plugs/index/api.ts +++ b/plugs/index/api.ts @@ -64,7 +64,7 @@ export async function clearIndex(): Promise { /** * Indexes entities in the data store */ -export async function indexObjects( +export function indexObjects( page: string, objects: ObjectValue[], ): Promise { @@ -127,14 +127,12 @@ export async function indexObjects( } } if (allAttributes.size > 0) { - await indexObjects( - page, - [...allAttributes].map(([key, value]) => { - const [tagName, name] = key.split(":"); - const attributeType = value.startsWith("!") - ? value.substring(1) - : value; - return { + [...allAttributes].forEach(([key, value]) => { + const [tagName, name] = key.split(":"); + const attributeType = value.startsWith("!") ? value.substring(1) : value; + kvs.push({ + key: ["attribute", cleanKey(key, page)], + value: { ref: key, tag: "attribute", tagName, @@ -142,12 +140,14 @@ export async function indexObjects( attributeType, readOnly: value.startsWith("!"), page, - }; - }), - ); + } as T, + }); + }); } if (kvs.length > 0) { return batchSet(page, kvs); + } else { + return Promise.resolve(); } } diff --git a/plugs/index/builtins.ts b/plugs/index/builtins.ts index b7f0f9ac..0674079b 100644 --- a/plugs/index/builtins.ts +++ b/plugs/index/builtins.ts @@ -90,31 +90,28 @@ export const builtins: Record> = { export async function loadBuiltinsIntoIndex() { console.log("Loading builtins attributes into index"); - const allTags: ObjectValue[] = []; + const allObjects: ObjectValue[] = []; for (const [tagName, attributes] of Object.entries(builtins)) { - allTags.push({ + allObjects.push({ ref: tagName, tag: "tag", name: tagName, page: builtinPseudoPage, parent: "builtin", }); - await indexObjects( - builtinPseudoPage, - Object.entries(attributes).map(([name, attributeType]) => { - return { - ref: `${tagName}:${name}`, - tag: "attribute", - tagName, - name, - attributeType: attributeType.startsWith("!") - ? attributeType.substring(1) - : attributeType, - readOnly: attributeType.startsWith("!"), - page: builtinPseudoPage, - }; - }), + allObjects.push( + ...Object.entries(attributes).map(([name, attributeType]) => ({ + ref: `${tagName}:${name}`, + tag: "attribute", + tagName, + name, + attributeType: attributeType.startsWith("!") + ? attributeType.substring(1) + : attributeType, + readOnly: attributeType.startsWith("!"), + page: builtinPseudoPage, + })), ); } - await indexObjects(builtinPseudoPage, allTags); + await indexObjects(builtinPseudoPage, allObjects); } diff --git a/server/http_server.ts b/server/http_server.ts index 0cdb837d..fc26d7e2 100644 --- a/server/http_server.ts +++ b/server/http_server.ts @@ -8,7 +8,12 @@ import { } from "./deps.ts"; import { AssetBundle } from "../plugos/asset_bundle/bundle.ts"; import { FileMeta } from "$sb/types.ts"; -import { ShellRequest, SyscallRequest, SyscallResponse } from "./rpc.ts"; +import { + handleRpc, + ShellRequest, + SyscallRequest, + SyscallResponse, +} from "./rpc.ts"; import { determineShellBackend } from "./shell_backend.ts"; import { SpaceServer, SpaceServerConfig } from "./instance.ts"; import { KvPrimitives } from "../plugos/lib/kv_primitives.ts"; @@ -132,13 +137,6 @@ export class HttpServer { // Serve static files (javascript, css, html) this.app.use(this.serveStatic.bind(this)); - const endpointHook = new EndpointHook("/_/"); - - this.app.use(async (context, next) => { - const spaceServer = await this.ensureSpaceServer(context.request); - return endpointHook.handleRequest(spaceServer.system!, context, next); - }); - this.addAuth(this.app); const fsRouter = this.addFsRoutes(); this.app.use(fsRouter.routes()); @@ -395,45 +393,9 @@ export class HttpServer { limit: 100 * 1024 * 1024, }).value; try { - if (operation === "shell") { - const shellCommand: ShellRequest = body; - const shellResponse = await spaceServer.shellBackend.handle( - shellCommand, - ); - response.headers.set("Content-Type", "application/json"); - response.body = JSON.stringify(shellResponse); - return; - } else { - // Syscall - if (spaceServer.syncOnly) { - response.headers.set("Content-Type", "text/plain"); - response.status = 400; - response.body = "Unknown operation"; - return; - } - const [plugName, syscall] = operation.split("/"); - const args: any[] = body; - try { - // console.log("Now invoking", operation, "with", args); - const result = await spaceServer.system!.localSyscall( - plugName, - syscall, - args, - ); - response.headers.set("Content-type", "application/json"); - response.status = 200; - response.body = JSON.stringify({ - result: result, - } as SyscallResponse); - } catch (e: any) { - response.headers.set("Content-type", "application/json"); - response.status = 500; - response.body = JSON.stringify({ - error: e.message, - } as SyscallResponse); - } - return; - } + const resp = await handleRpc(spaceServer, operation, body); + response.headers.set("Content-Type", "application/json"); + response.body = JSON.stringify({ r: resp }); } catch (e: any) { console.log("Error", e); response.status = 500; diff --git a/server/instance.ts b/server/instance.ts index 8b0bdbb6..054d595e 100644 --- a/server/instance.ts +++ b/server/instance.ts @@ -1,15 +1,14 @@ -import { SilverBulletHooks } from "../common/manifest.ts"; import { AssetBundlePlugSpacePrimitives } from "../common/spaces/asset_bundle_space_primitives.ts"; import { FilteredSpacePrimitives } from "../common/spaces/filtered_space_primitives.ts"; import { SpacePrimitives } from "../common/spaces/space_primitives.ts"; import { ensureSettingsAndIndex } from "../common/util.ts"; import { AssetBundle } from "../plugos/asset_bundle/bundle.ts"; +import { DataStore } from "../plugos/lib/datastore.ts"; import { KvPrimitives } from "../plugos/lib/kv_primitives.ts"; -import { System } from "../plugos/system.ts"; +import { PrefixedKvPrimitives } from "../plugos/lib/prefixed_kv_primitives.ts"; import { BuiltinSettings } from "../web/types.ts"; import { JWTIssuer } from "./crypto.ts"; import { gitIgnoreCompiler } from "./deps.ts"; -import { ServerSystem } from "./server_system.ts"; import { ShellBackend } from "./shell_backend.ts"; import { determineStorageBackend } from "./storage_backend.ts"; @@ -31,16 +30,14 @@ export class SpaceServer { authToken?: string; hostname: string; - private settings?: BuiltinSettings; + // private settings?: BuiltinSettings; spacePrimitives!: SpacePrimitives; jwtIssuer: JWTIssuer; - // Only set when syncOnly == false - private serverSystem?: ServerSystem; - system?: System; clientEncryption: boolean; syncOnly: boolean; + ds: DataStore; constructor( config: SpaceServerConfig, @@ -60,38 +57,15 @@ export class SpaceServer { } this.jwtIssuer = new JWTIssuer(kvPrimitives); + this.ds = new DataStore(new PrefixedKvPrimitives(kvPrimitives, ["ds"])); } async init() { - let fileFilterFn: (s: string) => boolean = () => true; - - this.spacePrimitives = new FilteredSpacePrimitives( - new AssetBundlePlugSpacePrimitives( - await determineStorageBackend(this.kvPrimitives, this.pagesPath), - this.plugAssetBundle, - ), - (meta) => fileFilterFn(meta.name), - async () => { - await this.reloadSettings(); - if (typeof this.settings?.spaceIgnore === "string") { - fileFilterFn = gitIgnoreCompiler(this.settings.spaceIgnore).accepts; - } else { - fileFilterFn = () => true; - } - }, + this.spacePrimitives = new AssetBundlePlugSpacePrimitives( + await determineStorageBackend(this.kvPrimitives, this.pagesPath), + this.plugAssetBundle, ); - // system = undefined in databaseless mode (no PlugOS instance on the server and no DB) - if (!this.syncOnly) { - // Enable server-side processing - const serverSystem = new ServerSystem( - this.spacePrimitives, - this.kvPrimitives, - this.shellBackend, - ); - this.serverSystem = serverSystem; - } - if (this.auth) { // Initialize JWT issuer await this.jwtIssuer.init( @@ -99,25 +73,6 @@ export class SpaceServer { ); } - if (this.serverSystem) { - await this.serverSystem.init(); - this.system = this.serverSystem.system; - // Swap in the space primitives from the server system - this.spacePrimitives = this.serverSystem.spacePrimitives; - } - - await this.reloadSettings(); console.log("Booted server with hostname", this.hostname); } - - async reloadSettings() { - if (!this.clientEncryption) { - // Only attempt this when the space is not encrypted - this.settings = await ensureSettingsAndIndex(this.spacePrimitives); - } else { - this.settings = { - indexPage: "index", - }; - } - } } diff --git a/server/rpc.ts b/server/rpc.ts index c52321b0..7e8311d8 100644 --- a/server/rpc.ts +++ b/server/rpc.ts @@ -1,3 +1,7 @@ +import { shell } from "$sb/syscalls.ts"; +import { KV, KvKey, KvQuery } from "$sb/types.ts"; +import { SpaceServer } from "./instance.ts"; + export type ShellRequest = { cmd: string; args: string[]; @@ -19,3 +23,37 @@ export type SyscallResponse = { result?: any; error?: string; }; + +export async function handleRpc( + spaceServer: SpaceServer, + name: string, + body: any, +): Promise { + switch (name) { + case "shell": { + const shellCommand: ShellRequest = body; + const shellResponse = await spaceServer.shellBackend.handle( + shellCommand, + ); + return shellResponse; + } + case "datastore.batchGet": { + const [keys]: [KvKey[]] = body; + return spaceServer.ds.batchGet(keys); + } + case "datastore.batchSet": { + const [entries]: [KV[]] = body; + return spaceServer.ds.batchSet(entries); + } + case "datastore.batchDelete": { + const [keys]: [KvKey[]] = body; + return spaceServer.ds.batchDelete(keys); + } + case "datastore.query": { + const [query]: [KvQuery] = body; + return spaceServer.ds.query(query); + } + default: + throw new Error(`Unknown rpc ${name}`); + } +} diff --git a/server/server_system.ts b/server/server_system.ts deleted file mode 100644 index 7905a362..00000000 --- a/server/server_system.ts +++ /dev/null @@ -1,198 +0,0 @@ -import { PlugNamespaceHook } from "../common/hooks/plug_namespace.ts"; -import { SilverBulletHooks } from "../common/manifest.ts"; -import { loadMarkdownExtensions } from "../common/markdown_parser/markdown_ext.ts"; -import buildMarkdown from "../common/markdown_parser/parser.ts"; -import { EventedSpacePrimitives } from "../common/spaces/evented_space_primitives.ts"; -import { PlugSpacePrimitives } from "../common/spaces/plug_space_primitives.ts"; -import { createSandbox } from "../plugos/environments/webworker_sandbox.ts"; -import { CronHook } from "../plugos/hooks/cron.ts"; -import { EventHook } from "../plugos/hooks/event.ts"; -import { MQHook } from "../plugos/hooks/mq.ts"; -import assetSyscalls from "../plugos/syscalls/asset.ts"; -import { eventSyscalls } from "../plugos/syscalls/event.ts"; -import { mqSyscalls } from "../plugos/syscalls/mq.ts"; -import { System } from "../plugos/system.ts"; -import { Space } from "../web/space.ts"; -import { debugSyscalls } from "../web/syscalls/debug.ts"; -import { markdownSyscalls } from "../common/syscalls/markdown.ts"; -import { spaceSyscalls } from "./syscalls/space.ts"; -import { systemSyscalls } from "../web/syscalls/system.ts"; -import { yamlSyscalls } from "../common/syscalls/yaml.ts"; -import { sandboxFetchSyscalls } from "../plugos/syscalls/fetch.ts"; -import { shellSyscalls } from "./syscalls/shell.ts"; -import { SpacePrimitives } from "../common/spaces/space_primitives.ts"; -import { base64EncodedDataUrl } from "../plugos/asset_bundle/base64.ts"; -import { Plug } from "../plugos/plug.ts"; -import { DataStore } from "../plugos/lib/datastore.ts"; -import { dataStoreSyscalls } from "../plugos/syscalls/datastore.ts"; -import { DataStoreMQ } from "../plugos/lib/mq.datastore.ts"; -import { languageSyscalls } from "../common/syscalls/language.ts"; -import { handlebarsSyscalls } from "../common/syscalls/handlebars.ts"; -import { codeWidgetSyscalls } from "../web/syscalls/code_widget.ts"; -import { CodeWidgetHook } from "../web/hooks/code_widget.ts"; -import { KVPrimitivesManifestCache } from "../plugos/manifest_cache.ts"; -import { KvPrimitives } from "../plugos/lib/kv_primitives.ts"; -import { ShellBackend } from "./shell_backend.ts"; -import { ensureSpaceIndex } from "../common/space_index.ts"; - -const fileListInterval = 30 * 1000; // 30s - -const plugNameExtractRegex = /([^/]+)\.plug\.js$/; - -export class ServerSystem { - system!: System; - public spacePrimitives!: SpacePrimitives; - // denoKv!: Deno.Kv; - listInterval?: number; - ds!: DataStore; - - constructor( - private baseSpacePrimitives: SpacePrimitives, - readonly kvPrimitives: KvPrimitives, - private shellBackend: ShellBackend, - ) { - } - - // Always needs to be invoked right after construction - async init(awaitIndex = false) { - this.ds = new DataStore(this.kvPrimitives); - - this.system = new System( - "server", - { - manifestCache: new KVPrimitivesManifestCache( - this.kvPrimitives, - "manifest", - ), - plugFlushTimeout: 5 * 60 * 1000, // 5 minutes - }, - ); - - // Event hook - const eventHook = new EventHook(); - this.system.addHook(eventHook); - - // Cron hook - const cronHook = new CronHook(this.system); - this.system.addHook(cronHook); - - const mq = new DataStoreMQ(this.ds); - - setInterval(() => { - // Timeout after 5s, retries 3 times, otherwise drops the message (no DLQ) - mq.requeueTimeouts(5000, 3, true).catch(console.error); - }, 20000); // Look to requeue every 20s - - const plugNamespaceHook = new PlugNamespaceHook(); - this.system.addHook(plugNamespaceHook); - - this.system.addHook(new MQHook(this.system, mq)); - - const codeWidgetHook = new CodeWidgetHook(); - - this.system.addHook(codeWidgetHook); - - this.spacePrimitives = new EventedSpacePrimitives( - new PlugSpacePrimitives( - this.baseSpacePrimitives, - plugNamespaceHook, - ), - eventHook, - ); - const space = new Space(this.spacePrimitives, eventHook); - - // Add syscalls - this.system.registerSyscalls( - [], - eventSyscalls(eventHook), - spaceSyscalls(space), - assetSyscalls(this.system), - yamlSyscalls(), - systemSyscalls(this.system), - mqSyscalls(mq), - languageSyscalls(), - handlebarsSyscalls(), - dataStoreSyscalls(this.ds), - debugSyscalls(), - codeWidgetSyscalls(codeWidgetHook), - markdownSyscalls(buildMarkdown([])), // Will later be replaced with markdown extensions - ); - - // Syscalls that require some additional permissions - this.system.registerSyscalls( - ["fetch"], - sandboxFetchSyscalls(), - ); - - this.system.registerSyscalls( - ["shell"], - shellSyscalls(this.shellBackend), - ); - - await this.loadPlugs(); - - // Load markdown syscalls based on all new syntax (if any) - this.system.registerSyscalls( - [], - markdownSyscalls(buildMarkdown(loadMarkdownExtensions(this.system))), - ); - - this.listInterval = setInterval(() => { - space.updatePageList().catch(console.error); - }, fileListInterval); - - eventHook.addLocalListener("file:changed", (path, localChange) => { - (async () => { - if (!localChange && path.endsWith(".md")) { - const pageName = path.slice(0, -3); - const data = await this.spacePrimitives.readFile(path); - console.log("Outside page change: reindexing", pageName); - // Change made outside of editor, trigger reindex - await eventHook.dispatchEvent("page:index_text", { - name: pageName, - text: new TextDecoder().decode(data.data), - }); - } - - if (path.startsWith("_plug/") && path.endsWith(".plug.js")) { - console.log("Plug updated, reloading:", path); - this.system.unload(path); - await this.loadPlugFromSpace(path); - } - })().catch(console.error); - }); - - // Ensure a valid index - const indexPromise = ensureSpaceIndex(this.ds, this.system); - if (awaitIndex) { - await indexPromise; - } - - await eventHook.dispatchEvent("system:ready"); - } - - async loadPlugs() { - for (const { name } of await this.spacePrimitives.fetchFileList()) { - if (plugNameExtractRegex.test(name)) { - await this.loadPlugFromSpace(name); - } - } - } - - async loadPlugFromSpace(path: string): Promise> { - const { meta, data } = await this.spacePrimitives.readFile(path); - const plugName = path.match(plugNameExtractRegex)![1]; - return this.system.load( - // Base64 encoding this to support `deno compile` mode - new URL(base64EncodedDataUrl("application/javascript", data)), - plugName, - meta.lastModified, - createSandbox, - ); - } - - async close() { - clearInterval(this.listInterval); - await this.system.unloadAll(); - } -} diff --git a/silverbullet.ts b/silverbullet.ts index 35b37249..16be8cdd 100755 --- a/silverbullet.ts +++ b/silverbullet.ts @@ -7,7 +7,6 @@ import { upgradeCommand } from "./cmd/upgrade.ts"; import { versionCommand } from "./cmd/version.ts"; import { serveCommand } from "./cmd/server.ts"; import { plugCompileCommand } from "./cmd/plug_compile.ts"; -import { plugRunCommand } from "./cmd/plug_run.ts"; import { syncCommand } from "./cmd/sync.ts"; await new Command() @@ -72,15 +71,6 @@ await new Command() .option("--importmap ", "Path to import map file to use") .option("--runtimeUrl ", "URL to worker_runtime.ts to use") .action(plugCompileCommand) - // plug:run - .command("plug:run", "Run a PlugOS function from the CLI") - .arguments(" [function] [...args:string]") - .option( - "--hostname, -L ", - "Hostname or address to listen on", - ) - .option("-p, --port ", "Port to listen on") - .action(plugRunCommand) // upgrade .command("upgrade", "Upgrade SilverBullet") .action(upgradeCommand) diff --git a/web/client.ts b/web/client.ts index ad15ce3d..774cc1ca 100644 --- a/web/client.ts +++ b/web/client.ts @@ -109,12 +109,12 @@ export class Client { ui!: MainUI; openPages!: OpenPages; - stateDataStore!: DataStore; - mq!: DataStoreMQ; // Used by the "wiki link" highlighter to check if a page exists public allKnownPages = new Set(); - remoteDataStore!: IDataStore; + clientDS!: DataStore; + mq!: DataStoreMQ; + ds!: IDataStore; constructor( private parent: Element, @@ -140,15 +140,15 @@ export class Client { `${this.dbPrefix}_state`, ); await stateKvPrimitives.init(); - this.stateDataStore = new DataStore(stateKvPrimitives); + this.clientDS = new DataStore(stateKvPrimitives); - // Only used in online mode - this.remoteDataStore = new RemoteDataStore(this.httpSpacePrimitives); + // In sync mode, reuse the clientDS, otherwise talk to a remote data store (over HTTP) + this.ds = this.syncMode + ? this.clientDS + : new RemoteDataStore(this.httpSpacePrimitives); // Setup message queue - this.mq = new DataStoreMQ( - this.syncMode ? this.stateDataStore : this.remoteDataStore, - ); + this.mq = new DataStoreMQ(this.ds); setInterval(() => { // Timeout after 5s, retries 3 times, otherwise drops the message (no DLQ) @@ -162,7 +162,8 @@ export class Client { this.system = new ClientSystem( this, this.mq, - this.stateDataStore, + this.clientDS, + this.ds, this.eventHook, ); @@ -172,7 +173,7 @@ export class Client { ? new SyncService( localSpacePrimitives, this.plugSpaceRemotePrimitives, - this.stateDataStore, + this.clientDS, this.eventHook, (path) => { // TODO: At some point we should remove the data.db exception here @@ -252,6 +253,12 @@ export class Client { private async initSync() { this.syncService.start(); + if (!this.syncMode) { + ensureSpaceIndex(this.ds, this.system.system).catch( + console.error, + ); + } + // We're still booting, if a initial sync has already been completed we know this is the initial sync const initialSync = !await this.syncService.hasInitialSyncCompleted(); @@ -270,12 +277,12 @@ export class Client { // A full sync just completed if (!initialSync) { // If this was NOT the initial sync let's check if we need to perform a space reindex - ensureSpaceIndex(this.stateDataStore, this.system.system).catch( + ensureSpaceIndex(this.ds, this.system.system).catch( console.error, ); } else { // This was the initial sync, let's mark a full index as completed - await markFullSpaceIndexComplete(this.stateDataStore); + await markFullSpaceIndexComplete(this.ds); } } if (operations) { @@ -368,14 +375,14 @@ export class Client { scrollIntoView: true, }); } - await this.stateDataStore.set(["client", "lastOpenedPage"], pageName); + await this.clientDS.set(["client", "lastOpenedPage"], pageName); }, ); if (location.hash === "#boot") { (async () => { // Cold start PWA load - const lastPage = await this.stateDataStore.get([ + const lastPage = await this.clientDS.get([ "client", "lastOpenedPage", ]); @@ -420,7 +427,7 @@ export class Client { } await encryptedSpacePrimitives.setup(password); // this.stateDataStore.set(["encryptionKey"], password); - await this.stateDataStore.set( + await this.ds.set( ["spaceSalt"], encryptedSpacePrimitives.spaceSalt, ); @@ -432,7 +439,7 @@ export class Client { "Offline, will assume encryption space is initialized, fetching salt from data store", ); await encryptedSpacePrimitives.init( - await this.stateDataStore.get(["spaceSalt"]), + await this.ds.get(["spaceSalt"]), ); } } @@ -442,7 +449,7 @@ export class Client { await encryptedSpacePrimitives.login( prompt("Password")!, ); - await this.stateDataStore.set( + await this.ds.set( ["spaceSalt"], encryptedSpacePrimitives.spaceSalt, ); @@ -1042,7 +1049,7 @@ export class Client { async loadCaches() { const [widgetHeightCache, widgetCache] = await this - .stateDataStore.batchGet([[ + .clientDS.batchGet([[ "cache", "widgetHeight", ], ["cache", "widgets"]]); @@ -1051,7 +1058,7 @@ export class Client { } debouncedWidgetHeightCacheFlush = throttle(() => { - this.stateDataStore.set( + this.clientDS.set( ["cache", "widgetHeight"], this.widgetHeightCache.toJSON(), ) @@ -1070,7 +1077,7 @@ export class Client { } debouncedWidgetCacheFlush = throttle(() => { - this.stateDataStore.set(["cache", "widgets"], this.widgetCache.toJSON()) + this.clientDS.set(["cache", "widgets"], this.widgetCache.toJSON()) .catch( console.error, ); diff --git a/web/client_system.ts b/web/client_system.ts index 05fbeb78..426c1e09 100644 --- a/web/client_system.ts +++ b/web/client_system.ts @@ -29,9 +29,8 @@ import { } from "../common/markdown_parser/markdown_ext.ts"; import { MQHook } from "../plugos/hooks/mq.ts"; import { mqSyscalls } from "../plugos/syscalls/mq.ts"; -import { dataStoreProxySyscalls } from "./syscalls/datastore.proxy.ts"; import { dataStoreSyscalls } from "../plugos/syscalls/datastore.ts"; -import { DataStore } from "../plugos/lib/datastore.ts"; +import { DataStore, IDataStore } from "../plugos/lib/datastore.ts"; import { MessageQueue } from "../plugos/lib/mq.ts"; import { languageSyscalls } from "../common/syscalls/language.ts"; import { handlebarsSyscalls } from "../common/syscalls/handlebars.ts"; @@ -56,7 +55,8 @@ export class ClientSystem { constructor( private client: Client, private mq: MessageQueue, - private ds: DataStore, + private clientDs: DataStore, + private dataStore: IDataStore, private eventHook: EventHook, ) { // Only set environment to "client" when running in thin client mode, otherwise we run everything locally (hybrid) @@ -65,7 +65,7 @@ export class ClientSystem { undefined, { manifestCache: new KVPrimitivesManifestCache( - ds.kv, + clientDs.kv, "manifest", ), }, @@ -165,12 +165,10 @@ export class ClientSystem { clientCodeWidgetSyscalls(), languageSyscalls(), mqSyscalls(this.mq), - this.client.syncMode - ? dataStoreSyscalls(this.ds) - : dataStoreProxySyscalls(this.client), + dataStoreSyscalls(this.dataStore), debugSyscalls(), syncSyscalls(this.client), - clientStoreSyscalls(this.ds), + clientStoreSyscalls(this.clientDs), ); // Syscalls that require some additional permissions diff --git a/web/remote_datastore.ts b/web/remote_datastore.ts index 26a1f606..2e0f1680 100644 --- a/web/remote_datastore.ts +++ b/web/remote_datastore.ts @@ -1,7 +1,7 @@ import { HttpSpacePrimitives } from "../common/spaces/http_space_primitives.ts"; import { KV, KvKey, KvQuery } from "$sb/types.ts"; -import { proxySyscall } from "./syscalls/util.ts"; import { IDataStore } from "../plugos/lib/datastore.ts"; +import { rpcCall } from "./syscalls/datastore.proxy.ts"; // implements DataStore "interface" export class RemoteDataStore implements IDataStore { @@ -10,45 +10,46 @@ export class RemoteDataStore implements IDataStore { private proxy( name: string, - args: any[], + ...args: any[] ) { - return proxySyscall( - { plug: { name: "index" } } as any, + // console.trace(); + return rpcCall( this.httpPrimitives, name, - args, + ...args, ); } - get(key: KvKey): Promise { - return this.proxy("datastore.get", [key]); + async get(key: KvKey): Promise { + const results = await this.batchGet([key]); + return results[0]; } batchGet(keys: KvKey[]): Promise<(T | null)[]> { - return this.proxy("datastore.batchGet", [keys]); + return this.proxy("datastore.batchGet", keys); } set(key: KvKey, value: any): Promise { - return this.proxy("datastore.set", [key, value]); + return this.batchSet([{ key, value }]); } batchSet(entries: KV[]): Promise { - return this.proxy("datastore.batchSet", [entries]); + return this.proxy("datastore.batchSet", entries); } delete(key: KvKey): Promise { - return this.proxy("datastore.delete", [key]); + return this.batchDelete([key]); } batchDelete(keys: KvKey[]): Promise { - return this.proxy("datastore.batchDelete", [keys]); + return this.proxy("datastore.batchDelete", keys); } query(query: KvQuery): Promise[]> { - return this.proxy("datastore.query", [query]); + return this.proxy("datastore.query", query); } queryDelete(query: KvQuery): Promise { - return this.proxy("datastore.queryDelete", [query]); + return this.proxy("datastore.queryDelete", query); } } diff --git a/web/syscalls/datastore.proxy.ts b/web/syscalls/datastore.proxy.ts index 0e65f6c2..860e37ad 100644 --- a/web/syscalls/datastore.proxy.ts +++ b/web/syscalls/datastore.proxy.ts @@ -1,15 +1,90 @@ -import type { SysCallMapping } from "../../plugos/system.ts"; -import type { Client } from "../client.ts"; -import { proxySyscalls } from "./util.ts"; +import { HttpSpacePrimitives } from "../../common/spaces/http_space_primitives.ts"; +import { KV, KvKey, KvQuery } from "$sb/types.ts"; +import type { SyscallContext, SysCallMapping } from "../../plugos/system.ts"; -export function dataStoreProxySyscalls(client: Client): SysCallMapping { - return proxySyscalls(client, [ - "datastore.delete", - "datastore.set", - "datastore.batchSet", - "datastore.batchDelete", - "datastore.batchGet", - "datastore.get", - "datastore.query", - ]); +export function dataStoreProxySyscalls( + httpSpacePrimitives: HttpSpacePrimitives, +): SysCallMapping { + return { + "datastore.delete": (ctx, key: KvKey) => { + return rpcCall(httpSpacePrimitives, "datastore.batchDelete", [ + addPrefix(ctx, key), + ]); + }, + "datastore.batchDelete": (ctx, keys: KvKey[]) => { + return rpcCall( + httpSpacePrimitives, + "datastore.batchDelete", + keys.map((key) => addPrefix(ctx, key)), + ); + }, + "datastore.set": (ctx, key: KvKey, value: any) => { + return rpcCall(httpSpacePrimitives, "datastore.batchSet", [ + { key: addPrefix(ctx, key), value: value }, + ]); + }, + "datastore.batchSet": (ctx, entries: KV[]) => { + return rpcCall( + httpSpacePrimitives, + "datastore.batchSet", + entries.map(({ key, value }) => ({ key: addPrefix(ctx, key), value })), + ); + }, + "datastore.get": async (ctx, key: KvKey) => { + const [result] = await rpcCall( + httpSpacePrimitives, + "datastore.batchGet", + [ + addPrefix(ctx, key), + ], + ); + return result; + }, + "datastore.batchGet": (ctx, keys: KvKey[]) => { + return rpcCall( + httpSpacePrimitives, + "datastore.batchGet", + keys.map((key) => addPrefix(ctx, key)), + ); + }, + "datastore.query": async (ctx, query: KvQuery) => { + const results: KV[] = await rpcCall( + httpSpacePrimitives, + "datastore.query", + { ...query, prefix: addPrefix(ctx, query.prefix || []) }, + ); + return results.map(({ key, value }) => ({ + key: stripPrefix(key), + value, + })); + }, + }; + + function addPrefix(ctx: SyscallContext, key: KvKey): KvKey { + return [ctx.plug.name, ...key]; + } + + function stripPrefix(key: KvKey): KvKey { + return key.slice(1); + } +} + +export async function rpcCall( + httpSpacePrimitives: HttpSpacePrimitives, + name: string, + ...args: any[] +): Promise { + const resp = await httpSpacePrimitives.authenticatedFetch( + `${httpSpacePrimitives.url}/.rpc/${name}`, + { + method: "POST", + body: JSON.stringify(args), + }, + ); + if (!resp.ok) { + const errorText = await resp.text(); + console.error("Remote rpc error", errorText); + throw new Error(errorText); + } + return (await resp.json()).r; } diff --git a/web/syscalls/system.ts b/web/syscalls/system.ts index fff3826b..46985201 100644 --- a/web/syscalls/system.ts +++ b/web/syscalls/system.ts @@ -2,7 +2,6 @@ import type { Plug } from "../../plugos/plug.ts"; import { SysCallMapping, System } from "../../plugos/system.ts"; import type { Client } from "../client.ts"; import { CommandDef } from "../hooks/command.ts"; -import { proxySyscall } from "./util.ts"; export function systemSyscalls( system: System, diff --git a/web/syscalls/util.ts b/web/syscalls/util.ts deleted file mode 100644 index 0d3d0c4e..00000000 --- a/web/syscalls/util.ts +++ /dev/null @@ -1,36 +0,0 @@ -import { HttpSpacePrimitives } from "../../common/spaces/http_space_primitives.ts"; -import { SyscallContext, SysCallMapping } from "../../plugos/system.ts"; -import { SyscallResponse } from "../../server/rpc.ts"; -import { Client } from "../client.ts"; - -export function proxySyscalls(client: Client, names: string[]): SysCallMapping { - const syscalls: SysCallMapping = {}; - for (const name of names) { - syscalls[name] = (ctx, ...args: any[]) => { - return proxySyscall(ctx, client.httpSpacePrimitives, name, args); - }; - } - return syscalls; -} - -export async function proxySyscall( - ctx: SyscallContext, - httpSpacePrimitives: HttpSpacePrimitives, - name: string, - args: any[], -): Promise { - const resp = await httpSpacePrimitives.authenticatedFetch( - `${httpSpacePrimitives.url}/.rpc/${ctx.plug.name}/${name}`, - { - method: "POST", - body: JSON.stringify(args), - }, - ); - const result: SyscallResponse = await resp.json(); - if (result.error) { - console.error("Remote syscall error", result.error); - throw new Error(result.error); - } else { - return result.result; - } -}