Optimizations

pull/628/head
Zef Hemel 2024-01-11 20:59:11 +01:00
parent f577661128
commit b644801e7b
23 changed files with 237 additions and 626 deletions

View File

@ -1,43 +0,0 @@
import { AssetBundle } from "../plugos/asset_bundle/bundle.ts";
import { compileManifest } from "../plugos/compile.ts";
import { esbuild } from "../plugos/deps.ts";
import { runPlug } from "./plug_run.ts";
import assets from "../dist/plug_asset_bundle.json" assert {
type: "json",
};
import { assertEquals } from "../test_deps.ts";
import { path } from "../common/deps.ts";
Deno.test("Test plug run", {
sanitizeResources: false,
sanitizeOps: false,
}, async () => {
// const tempDir = await Deno.makeTempDir();
const tempDbFile = await Deno.makeTempFile({ suffix: ".db" });
const assetBundle = new AssetBundle(assets);
const testFolder = path.dirname(new URL(import.meta.url).pathname);
const testSpaceFolder = path.join(testFolder, "test_space");
const plugFolder = path.join(testSpaceFolder, "_plug");
await Deno.mkdir(plugFolder, { recursive: true });
await compileManifest(
path.join(testFolder, "test.plug.yaml"),
plugFolder,
);
assertEquals(
await runPlug(
testSpaceFolder,
"test.run",
[],
assetBundle,
),
"Hello",
);
// await Deno.remove(tempDir, { recursive: true });
esbuild.stop();
await Deno.remove(tempDbFile);
});

View File

@ -1,69 +0,0 @@
import { DiskSpacePrimitives } from "../common/spaces/disk_space_primitives.ts";
import { AssetBundle } from "../plugos/asset_bundle/bundle.ts";
import { Application } from "../server/deps.ts";
import { sleep } from "$sb/lib/async.ts";
import { ServerSystem } from "../server/server_system.ts";
import { AssetBundlePlugSpacePrimitives } from "../common/spaces/asset_bundle_space_primitives.ts";
import { determineDatabaseBackend } from "../server/db_backend.ts";
import { EndpointHook } from "../plugos/hooks/endpoint.ts";
import { determineShellBackend } from "../server/shell_backend.ts";
export async function runPlug(
spacePath: string,
functionName: string | undefined,
args: string[] = [],
builtinAssetBundle: AssetBundle,
httpServerPort = 3123,
httpHostname = "127.0.0.1",
) {
const serverController = new AbortController();
const app = new Application();
const dbBackend = await determineDatabaseBackend(spacePath);
if (!dbBackend) {
console.error("Cannot run plugs in databaseless mode.");
return;
}
const endpointHook = new EndpointHook("/_/");
const serverSystem = new ServerSystem(
new AssetBundlePlugSpacePrimitives(
new DiskSpacePrimitives(spacePath),
builtinAssetBundle,
),
dbBackend,
determineShellBackend(spacePath),
);
await serverSystem.init(true);
app.use((context, next) => {
return endpointHook.handleRequest(serverSystem.system!, context, next);
});
app.listen({
hostname: httpHostname,
port: httpServerPort,
signal: serverController.signal,
});
if (functionName) {
const [plugName, funcName] = functionName.split(".");
const plug = serverSystem.system.loadedPlugs.get(plugName);
if (!plug) {
throw new Error(`Plug ${plugName} not found`);
}
const result = await plug.invoke(funcName, args);
await serverSystem.close();
serverSystem.kvPrimitives.close();
serverController.abort();
return result;
} else {
console.log("Running in server mode, use Ctrl-c to stop");
while (true) {
await sleep(1000);
}
}
}

View File

@ -1,7 +0,0 @@
import { datastore } from "$sb/syscalls.ts";
export async function run() {
console.log("Hello from plug_test.ts");
await datastore.set(["plug_test"], "Hello");
return "Hello";
}

View File

@ -1,6 +0,0 @@
name: test
requiredPermissions:
- shell
functions:
run:
path: plug_test.ts:run

View File

@ -1,40 +0,0 @@
import { runPlug } from "../cli/plug_run.ts";
import { path } from "../common/deps.ts";
import assets from "../dist/plug_asset_bundle.json" assert {
type: "json",
};
import { AssetBundle } from "../plugos/asset_bundle/bundle.ts";
export async function plugRunCommand(
{
hostname,
port,
}: {
hostname?: string;
port?: number;
},
spacePath: string,
functionName: string | undefined,
...args: string[]
) {
spacePath = path.resolve(spacePath);
console.log("Space path", spacePath);
console.log("Function to run:", functionName, "with arguments", args);
try {
const result = await runPlug(
spacePath,
functionName,
args,
new AssetBundle(assets),
port,
hostname,
);
if (result) {
console.log("Output", result);
}
Deno.exit(0);
} catch (e: any) {
console.error(e.message);
Deno.exit(1);
}
}

View File

@ -1,4 +1,4 @@
import { DataStore } from "../plugos/lib/datastore.ts"; import { IDataStore } from "../plugos/lib/datastore.ts";
import { System } from "../plugos/system.ts"; import { System } from "../plugos/system.ts";
const indexVersionKey = ["$indexVersion"]; const indexVersionKey = ["$indexVersion"];
@ -8,7 +8,7 @@ const desiredIndexVersion = 2;
let indexOngoing = false; let indexOngoing = false;
export async function ensureSpaceIndex(ds: DataStore, system: System<any>) { export async function ensureSpaceIndex(ds: IDataStore, system: System<any>) {
const currentIndexVersion = await ds.get(indexVersionKey); const currentIndexVersion = await ds.get(indexVersionKey);
console.info("Current space index version", currentIndexVersion); console.info("Current space index version", currentIndexVersion);
@ -25,6 +25,6 @@ export async function ensureSpaceIndex(ds: DataStore, system: System<any>) {
} }
} }
export async function markFullSpaceIndexComplete(ds: DataStore) { export async function markFullSpaceIndexComplete(ds: IDataStore) {
await ds.set(indexVersionKey, desiredIndexVersion); await ds.set(indexVersionKey, desiredIndexVersion);
} }

View File

@ -3,6 +3,7 @@ import { System } from "../system.ts";
import { fullQueueName } from "../lib/mq_util.ts"; import { fullQueueName } from "../lib/mq_util.ts";
import { MQMessage } from "$sb/types.ts"; import { MQMessage } from "$sb/types.ts";
import { MessageQueue } from "../lib/mq.ts"; import { MessageQueue } from "../lib/mq.ts";
import { throttle } from "$sb/lib/async.ts";
type MQSubscription = { type MQSubscription = {
queue: string; queue: string;
@ -24,14 +25,14 @@ export class MQHook implements Hook<MQHookT> {
this.system = system; this.system = system;
system.on({ system.on({
plugLoaded: () => { plugLoaded: () => {
this.reloadQueues(); this.throttledReloadQueues();
}, },
plugUnloaded: () => { plugUnloaded: () => {
this.reloadQueues(); this.throttledReloadQueues();
}, },
}); });
this.reloadQueues(); this.throttledReloadQueues();
} }
stop() { stop() {
@ -40,6 +41,10 @@ export class MQHook implements Hook<MQHookT> {
this.subscriptions = []; this.subscriptions = [];
} }
throttledReloadQueues = throttle(() => {
this.reloadQueues();
}, 1000);
reloadQueues() { reloadQueues() {
this.stop(); this.stop();
for (const plug of this.system.loadedPlugs.values()) { for (const plug of this.system.loadedPlugs.values()) {

View File

@ -32,9 +32,10 @@ export class DataStoreMQ implements MessageQueue {
}; };
}); });
if (messages.length > 0) { if (messages.length === 0) {
await this.ds.batchSet(messages); return;
} }
await this.ds.batchSet(messages);
// See if we can immediately process the message with a local subscription // See if we can immediately process the message with a local subscription
const localSubscriptions = this.localSubscriptions.get(queue); const localSubscriptions = this.localSubscriptions.get(queue);
@ -50,6 +51,8 @@ export class DataStoreMQ implements MessageQueue {
} }
async poll(queue: string, maxItems: number): Promise<MQMessage[]> { async poll(queue: string, maxItems: number): Promise<MQMessage[]> {
// console.log("Polling", queue, maxItems);
// console.trace();
// Note: this is not happening in a transactional way, so we may get duplicate message delivery // Note: this is not happening in a transactional way, so we may get duplicate message delivery
// Retrieve a batch of messages // Retrieve a batch of messages
const messages = await this.ds.query<MQMessage>({ const messages = await this.ds.query<MQMessage>({

View File

@ -1,5 +1,5 @@
import { KV, KvKey, KvQuery } from "$sb/types.ts"; import { KV, KvKey, KvQuery } from "$sb/types.ts";
import type { DataStore } from "../lib/datastore.ts"; import type { DataStore, IDataStore } from "../lib/datastore.ts";
import type { SyscallContext, SysCallMapping } from "../system.ts"; import type { SyscallContext, SysCallMapping } from "../system.ts";
/** /**
@ -8,7 +8,7 @@ import type { SyscallContext, SysCallMapping } from "../system.ts";
* @param prefix prefix to scope all keys to to which the plug name will be appended * @param prefix prefix to scope all keys to to which the plug name will be appended
*/ */
export function dataStoreSyscalls( export function dataStoreSyscalls(
ds: DataStore, ds: IDataStore,
prefix: KvKey = ["ds"], prefix: KvKey = ["ds"],
): SysCallMapping { ): SysCallMapping {
return { return {

View File

@ -1,20 +0,0 @@
import { SyscallContext, SysCallMapping } from "../system.ts";
export function proxySyscalls(
names: string[],
transportCall: (
ctx: SyscallContext,
name: string,
...args: any[]
) => Promise<any>,
): SysCallMapping {
const syscalls: SysCallMapping = {};
for (const name of names) {
syscalls[name] = (ctx, ...args: any[]) => {
return transportCall(ctx, name, ...args);
};
}
return syscalls;
}

View File

@ -64,7 +64,7 @@ export async function clearIndex(): Promise<void> {
/** /**
* Indexes entities in the data store * Indexes entities in the data store
*/ */
export async function indexObjects<T>( export function indexObjects<T>(
page: string, page: string,
objects: ObjectValue<T>[], objects: ObjectValue<T>[],
): Promise<void> { ): Promise<void> {
@ -127,14 +127,12 @@ export async function indexObjects<T>(
} }
} }
if (allAttributes.size > 0) { if (allAttributes.size > 0) {
await indexObjects<AttributeObject>( [...allAttributes].forEach(([key, value]) => {
page,
[...allAttributes].map(([key, value]) => {
const [tagName, name] = key.split(":"); const [tagName, name] = key.split(":");
const attributeType = value.startsWith("!") const attributeType = value.startsWith("!") ? value.substring(1) : value;
? value.substring(1) kvs.push({
: value; key: ["attribute", cleanKey(key, page)],
return { value: {
ref: key, ref: key,
tag: "attribute", tag: "attribute",
tagName, tagName,
@ -142,12 +140,14 @@ export async function indexObjects<T>(
attributeType, attributeType,
readOnly: value.startsWith("!"), readOnly: value.startsWith("!"),
page, page,
}; } as T,
}), });
); });
} }
if (kvs.length > 0) { if (kvs.length > 0) {
return batchSet(page, kvs); return batchSet(page, kvs);
} else {
return Promise.resolve();
} }
} }

View File

@ -90,19 +90,17 @@ export const builtins: Record<string, Record<string, string>> = {
export async function loadBuiltinsIntoIndex() { export async function loadBuiltinsIntoIndex() {
console.log("Loading builtins attributes into index"); console.log("Loading builtins attributes into index");
const allTags: ObjectValue<TagObject>[] = []; const allObjects: ObjectValue<any>[] = [];
for (const [tagName, attributes] of Object.entries(builtins)) { for (const [tagName, attributes] of Object.entries(builtins)) {
allTags.push({ allObjects.push({
ref: tagName, ref: tagName,
tag: "tag", tag: "tag",
name: tagName, name: tagName,
page: builtinPseudoPage, page: builtinPseudoPage,
parent: "builtin", parent: "builtin",
}); });
await indexObjects<AttributeObject>( allObjects.push(
builtinPseudoPage, ...Object.entries(attributes).map(([name, attributeType]) => ({
Object.entries(attributes).map(([name, attributeType]) => {
return {
ref: `${tagName}:${name}`, ref: `${tagName}:${name}`,
tag: "attribute", tag: "attribute",
tagName, tagName,
@ -112,9 +110,8 @@ export async function loadBuiltinsIntoIndex() {
: attributeType, : attributeType,
readOnly: attributeType.startsWith("!"), readOnly: attributeType.startsWith("!"),
page: builtinPseudoPage, page: builtinPseudoPage,
}; })),
}),
); );
} }
await indexObjects(builtinPseudoPage, allTags); await indexObjects(builtinPseudoPage, allObjects);
} }

View File

@ -8,7 +8,12 @@ import {
} from "./deps.ts"; } from "./deps.ts";
import { AssetBundle } from "../plugos/asset_bundle/bundle.ts"; import { AssetBundle } from "../plugos/asset_bundle/bundle.ts";
import { FileMeta } from "$sb/types.ts"; import { FileMeta } from "$sb/types.ts";
import { ShellRequest, SyscallRequest, SyscallResponse } from "./rpc.ts"; import {
handleRpc,
ShellRequest,
SyscallRequest,
SyscallResponse,
} from "./rpc.ts";
import { determineShellBackend } from "./shell_backend.ts"; import { determineShellBackend } from "./shell_backend.ts";
import { SpaceServer, SpaceServerConfig } from "./instance.ts"; import { SpaceServer, SpaceServerConfig } from "./instance.ts";
import { KvPrimitives } from "../plugos/lib/kv_primitives.ts"; import { KvPrimitives } from "../plugos/lib/kv_primitives.ts";
@ -132,13 +137,6 @@ export class HttpServer {
// Serve static files (javascript, css, html) // Serve static files (javascript, css, html)
this.app.use(this.serveStatic.bind(this)); this.app.use(this.serveStatic.bind(this));
const endpointHook = new EndpointHook("/_/");
this.app.use(async (context, next) => {
const spaceServer = await this.ensureSpaceServer(context.request);
return endpointHook.handleRequest(spaceServer.system!, context, next);
});
this.addAuth(this.app); this.addAuth(this.app);
const fsRouter = this.addFsRoutes(); const fsRouter = this.addFsRoutes();
this.app.use(fsRouter.routes()); this.app.use(fsRouter.routes());
@ -395,45 +393,9 @@ export class HttpServer {
limit: 100 * 1024 * 1024, limit: 100 * 1024 * 1024,
}).value; }).value;
try { try {
if (operation === "shell") { const resp = await handleRpc(spaceServer, operation, body);
const shellCommand: ShellRequest = body;
const shellResponse = await spaceServer.shellBackend.handle(
shellCommand,
);
response.headers.set("Content-Type", "application/json"); response.headers.set("Content-Type", "application/json");
response.body = JSON.stringify(shellResponse); response.body = JSON.stringify({ r: resp });
return;
} else {
// Syscall
if (spaceServer.syncOnly) {
response.headers.set("Content-Type", "text/plain");
response.status = 400;
response.body = "Unknown operation";
return;
}
const [plugName, syscall] = operation.split("/");
const args: any[] = body;
try {
// console.log("Now invoking", operation, "with", args);
const result = await spaceServer.system!.localSyscall(
plugName,
syscall,
args,
);
response.headers.set("Content-type", "application/json");
response.status = 200;
response.body = JSON.stringify({
result: result,
} as SyscallResponse);
} catch (e: any) {
response.headers.set("Content-type", "application/json");
response.status = 500;
response.body = JSON.stringify({
error: e.message,
} as SyscallResponse);
}
return;
}
} catch (e: any) { } catch (e: any) {
console.log("Error", e); console.log("Error", e);
response.status = 500; response.status = 500;

View File

@ -1,15 +1,14 @@
import { SilverBulletHooks } from "../common/manifest.ts";
import { AssetBundlePlugSpacePrimitives } from "../common/spaces/asset_bundle_space_primitives.ts"; import { AssetBundlePlugSpacePrimitives } from "../common/spaces/asset_bundle_space_primitives.ts";
import { FilteredSpacePrimitives } from "../common/spaces/filtered_space_primitives.ts"; import { FilteredSpacePrimitives } from "../common/spaces/filtered_space_primitives.ts";
import { SpacePrimitives } from "../common/spaces/space_primitives.ts"; import { SpacePrimitives } from "../common/spaces/space_primitives.ts";
import { ensureSettingsAndIndex } from "../common/util.ts"; import { ensureSettingsAndIndex } from "../common/util.ts";
import { AssetBundle } from "../plugos/asset_bundle/bundle.ts"; import { AssetBundle } from "../plugos/asset_bundle/bundle.ts";
import { DataStore } from "../plugos/lib/datastore.ts";
import { KvPrimitives } from "../plugos/lib/kv_primitives.ts"; import { KvPrimitives } from "../plugos/lib/kv_primitives.ts";
import { System } from "../plugos/system.ts"; import { PrefixedKvPrimitives } from "../plugos/lib/prefixed_kv_primitives.ts";
import { BuiltinSettings } from "../web/types.ts"; import { BuiltinSettings } from "../web/types.ts";
import { JWTIssuer } from "./crypto.ts"; import { JWTIssuer } from "./crypto.ts";
import { gitIgnoreCompiler } from "./deps.ts"; import { gitIgnoreCompiler } from "./deps.ts";
import { ServerSystem } from "./server_system.ts";
import { ShellBackend } from "./shell_backend.ts"; import { ShellBackend } from "./shell_backend.ts";
import { determineStorageBackend } from "./storage_backend.ts"; import { determineStorageBackend } from "./storage_backend.ts";
@ -31,16 +30,14 @@ export class SpaceServer {
authToken?: string; authToken?: string;
hostname: string; hostname: string;
private settings?: BuiltinSettings; // private settings?: BuiltinSettings;
spacePrimitives!: SpacePrimitives; spacePrimitives!: SpacePrimitives;
jwtIssuer: JWTIssuer; jwtIssuer: JWTIssuer;
// Only set when syncOnly == false
private serverSystem?: ServerSystem;
system?: System<SilverBulletHooks>;
clientEncryption: boolean; clientEncryption: boolean;
syncOnly: boolean; syncOnly: boolean;
ds: DataStore;
constructor( constructor(
config: SpaceServerConfig, config: SpaceServerConfig,
@ -60,38 +57,15 @@ export class SpaceServer {
} }
this.jwtIssuer = new JWTIssuer(kvPrimitives); this.jwtIssuer = new JWTIssuer(kvPrimitives);
this.ds = new DataStore(new PrefixedKvPrimitives(kvPrimitives, ["ds"]));
} }
async init() { async init() {
let fileFilterFn: (s: string) => boolean = () => true; this.spacePrimitives = new AssetBundlePlugSpacePrimitives(
this.spacePrimitives = new FilteredSpacePrimitives(
new AssetBundlePlugSpacePrimitives(
await determineStorageBackend(this.kvPrimitives, this.pagesPath), await determineStorageBackend(this.kvPrimitives, this.pagesPath),
this.plugAssetBundle, this.plugAssetBundle,
),
(meta) => fileFilterFn(meta.name),
async () => {
await this.reloadSettings();
if (typeof this.settings?.spaceIgnore === "string") {
fileFilterFn = gitIgnoreCompiler(this.settings.spaceIgnore).accepts;
} else {
fileFilterFn = () => true;
}
},
); );
// system = undefined in databaseless mode (no PlugOS instance on the server and no DB)
if (!this.syncOnly) {
// Enable server-side processing
const serverSystem = new ServerSystem(
this.spacePrimitives,
this.kvPrimitives,
this.shellBackend,
);
this.serverSystem = serverSystem;
}
if (this.auth) { if (this.auth) {
// Initialize JWT issuer // Initialize JWT issuer
await this.jwtIssuer.init( await this.jwtIssuer.init(
@ -99,25 +73,6 @@ export class SpaceServer {
); );
} }
if (this.serverSystem) {
await this.serverSystem.init();
this.system = this.serverSystem.system;
// Swap in the space primitives from the server system
this.spacePrimitives = this.serverSystem.spacePrimitives;
}
await this.reloadSettings();
console.log("Booted server with hostname", this.hostname); console.log("Booted server with hostname", this.hostname);
} }
async reloadSettings() {
if (!this.clientEncryption) {
// Only attempt this when the space is not encrypted
this.settings = await ensureSettingsAndIndex(this.spacePrimitives);
} else {
this.settings = {
indexPage: "index",
};
}
}
} }

View File

@ -1,3 +1,7 @@
import { shell } from "$sb/syscalls.ts";
import { KV, KvKey, KvQuery } from "$sb/types.ts";
import { SpaceServer } from "./instance.ts";
export type ShellRequest = { export type ShellRequest = {
cmd: string; cmd: string;
args: string[]; args: string[];
@ -19,3 +23,37 @@ export type SyscallResponse = {
result?: any; result?: any;
error?: string; error?: string;
}; };
export async function handleRpc(
spaceServer: SpaceServer,
name: string,
body: any,
): Promise<any> {
switch (name) {
case "shell": {
const shellCommand: ShellRequest = body;
const shellResponse = await spaceServer.shellBackend.handle(
shellCommand,
);
return shellResponse;
}
case "datastore.batchGet": {
const [keys]: [KvKey[]] = body;
return spaceServer.ds.batchGet(keys);
}
case "datastore.batchSet": {
const [entries]: [KV[]] = body;
return spaceServer.ds.batchSet(entries);
}
case "datastore.batchDelete": {
const [keys]: [KvKey[]] = body;
return spaceServer.ds.batchDelete(keys);
}
case "datastore.query": {
const [query]: [KvQuery] = body;
return spaceServer.ds.query(query);
}
default:
throw new Error(`Unknown rpc ${name}`);
}
}

View File

@ -1,198 +0,0 @@
import { PlugNamespaceHook } from "../common/hooks/plug_namespace.ts";
import { SilverBulletHooks } from "../common/manifest.ts";
import { loadMarkdownExtensions } from "../common/markdown_parser/markdown_ext.ts";
import buildMarkdown from "../common/markdown_parser/parser.ts";
import { EventedSpacePrimitives } from "../common/spaces/evented_space_primitives.ts";
import { PlugSpacePrimitives } from "../common/spaces/plug_space_primitives.ts";
import { createSandbox } from "../plugos/environments/webworker_sandbox.ts";
import { CronHook } from "../plugos/hooks/cron.ts";
import { EventHook } from "../plugos/hooks/event.ts";
import { MQHook } from "../plugos/hooks/mq.ts";
import assetSyscalls from "../plugos/syscalls/asset.ts";
import { eventSyscalls } from "../plugos/syscalls/event.ts";
import { mqSyscalls } from "../plugos/syscalls/mq.ts";
import { System } from "../plugos/system.ts";
import { Space } from "../web/space.ts";
import { debugSyscalls } from "../web/syscalls/debug.ts";
import { markdownSyscalls } from "../common/syscalls/markdown.ts";
import { spaceSyscalls } from "./syscalls/space.ts";
import { systemSyscalls } from "../web/syscalls/system.ts";
import { yamlSyscalls } from "../common/syscalls/yaml.ts";
import { sandboxFetchSyscalls } from "../plugos/syscalls/fetch.ts";
import { shellSyscalls } from "./syscalls/shell.ts";
import { SpacePrimitives } from "../common/spaces/space_primitives.ts";
import { base64EncodedDataUrl } from "../plugos/asset_bundle/base64.ts";
import { Plug } from "../plugos/plug.ts";
import { DataStore } from "../plugos/lib/datastore.ts";
import { dataStoreSyscalls } from "../plugos/syscalls/datastore.ts";
import { DataStoreMQ } from "../plugos/lib/mq.datastore.ts";
import { languageSyscalls } from "../common/syscalls/language.ts";
import { handlebarsSyscalls } from "../common/syscalls/handlebars.ts";
import { codeWidgetSyscalls } from "../web/syscalls/code_widget.ts";
import { CodeWidgetHook } from "../web/hooks/code_widget.ts";
import { KVPrimitivesManifestCache } from "../plugos/manifest_cache.ts";
import { KvPrimitives } from "../plugos/lib/kv_primitives.ts";
import { ShellBackend } from "./shell_backend.ts";
import { ensureSpaceIndex } from "../common/space_index.ts";
const fileListInterval = 30 * 1000; // 30s
const plugNameExtractRegex = /([^/]+)\.plug\.js$/;
export class ServerSystem {
system!: System<SilverBulletHooks>;
public spacePrimitives!: SpacePrimitives;
// denoKv!: Deno.Kv;
listInterval?: number;
ds!: DataStore;
constructor(
private baseSpacePrimitives: SpacePrimitives,
readonly kvPrimitives: KvPrimitives,
private shellBackend: ShellBackend,
) {
}
// Always needs to be invoked right after construction
async init(awaitIndex = false) {
this.ds = new DataStore(this.kvPrimitives);
this.system = new System(
"server",
{
manifestCache: new KVPrimitivesManifestCache(
this.kvPrimitives,
"manifest",
),
plugFlushTimeout: 5 * 60 * 1000, // 5 minutes
},
);
// Event hook
const eventHook = new EventHook();
this.system.addHook(eventHook);
// Cron hook
const cronHook = new CronHook(this.system);
this.system.addHook(cronHook);
const mq = new DataStoreMQ(this.ds);
setInterval(() => {
// Timeout after 5s, retries 3 times, otherwise drops the message (no DLQ)
mq.requeueTimeouts(5000, 3, true).catch(console.error);
}, 20000); // Look to requeue every 20s
const plugNamespaceHook = new PlugNamespaceHook();
this.system.addHook(plugNamespaceHook);
this.system.addHook(new MQHook(this.system, mq));
const codeWidgetHook = new CodeWidgetHook();
this.system.addHook(codeWidgetHook);
this.spacePrimitives = new EventedSpacePrimitives(
new PlugSpacePrimitives(
this.baseSpacePrimitives,
plugNamespaceHook,
),
eventHook,
);
const space = new Space(this.spacePrimitives, eventHook);
// Add syscalls
this.system.registerSyscalls(
[],
eventSyscalls(eventHook),
spaceSyscalls(space),
assetSyscalls(this.system),
yamlSyscalls(),
systemSyscalls(this.system),
mqSyscalls(mq),
languageSyscalls(),
handlebarsSyscalls(),
dataStoreSyscalls(this.ds),
debugSyscalls(),
codeWidgetSyscalls(codeWidgetHook),
markdownSyscalls(buildMarkdown([])), // Will later be replaced with markdown extensions
);
// Syscalls that require some additional permissions
this.system.registerSyscalls(
["fetch"],
sandboxFetchSyscalls(),
);
this.system.registerSyscalls(
["shell"],
shellSyscalls(this.shellBackend),
);
await this.loadPlugs();
// Load markdown syscalls based on all new syntax (if any)
this.system.registerSyscalls(
[],
markdownSyscalls(buildMarkdown(loadMarkdownExtensions(this.system))),
);
this.listInterval = setInterval(() => {
space.updatePageList().catch(console.error);
}, fileListInterval);
eventHook.addLocalListener("file:changed", (path, localChange) => {
(async () => {
if (!localChange && path.endsWith(".md")) {
const pageName = path.slice(0, -3);
const data = await this.spacePrimitives.readFile(path);
console.log("Outside page change: reindexing", pageName);
// Change made outside of editor, trigger reindex
await eventHook.dispatchEvent("page:index_text", {
name: pageName,
text: new TextDecoder().decode(data.data),
});
}
if (path.startsWith("_plug/") && path.endsWith(".plug.js")) {
console.log("Plug updated, reloading:", path);
this.system.unload(path);
await this.loadPlugFromSpace(path);
}
})().catch(console.error);
});
// Ensure a valid index
const indexPromise = ensureSpaceIndex(this.ds, this.system);
if (awaitIndex) {
await indexPromise;
}
await eventHook.dispatchEvent("system:ready");
}
async loadPlugs() {
for (const { name } of await this.spacePrimitives.fetchFileList()) {
if (plugNameExtractRegex.test(name)) {
await this.loadPlugFromSpace(name);
}
}
}
async loadPlugFromSpace(path: string): Promise<Plug<SilverBulletHooks>> {
const { meta, data } = await this.spacePrimitives.readFile(path);
const plugName = path.match(plugNameExtractRegex)![1];
return this.system.load(
// Base64 encoding this to support `deno compile` mode
new URL(base64EncodedDataUrl("application/javascript", data)),
plugName,
meta.lastModified,
createSandbox,
);
}
async close() {
clearInterval(this.listInterval);
await this.system.unloadAll();
}
}

View File

@ -7,7 +7,6 @@ import { upgradeCommand } from "./cmd/upgrade.ts";
import { versionCommand } from "./cmd/version.ts"; import { versionCommand } from "./cmd/version.ts";
import { serveCommand } from "./cmd/server.ts"; import { serveCommand } from "./cmd/server.ts";
import { plugCompileCommand } from "./cmd/plug_compile.ts"; import { plugCompileCommand } from "./cmd/plug_compile.ts";
import { plugRunCommand } from "./cmd/plug_run.ts";
import { syncCommand } from "./cmd/sync.ts"; import { syncCommand } from "./cmd/sync.ts";
await new Command() await new Command()
@ -72,15 +71,6 @@ await new Command()
.option("--importmap <path:string>", "Path to import map file to use") .option("--importmap <path:string>", "Path to import map file to use")
.option("--runtimeUrl <url:string>", "URL to worker_runtime.ts to use") .option("--runtimeUrl <url:string>", "URL to worker_runtime.ts to use")
.action(plugCompileCommand) .action(plugCompileCommand)
// plug:run
.command("plug:run", "Run a PlugOS function from the CLI")
.arguments("<spacePath> [function] [...args:string]")
.option(
"--hostname, -L <hostname:string>",
"Hostname or address to listen on",
)
.option("-p, --port <port:number>", "Port to listen on")
.action(plugRunCommand)
// upgrade // upgrade
.command("upgrade", "Upgrade SilverBullet") .command("upgrade", "Upgrade SilverBullet")
.action(upgradeCommand) .action(upgradeCommand)

View File

@ -109,12 +109,12 @@ export class Client {
ui!: MainUI; ui!: MainUI;
openPages!: OpenPages; openPages!: OpenPages;
stateDataStore!: DataStore;
mq!: DataStoreMQ;
// Used by the "wiki link" highlighter to check if a page exists // Used by the "wiki link" highlighter to check if a page exists
public allKnownPages = new Set<string>(); public allKnownPages = new Set<string>();
remoteDataStore!: IDataStore; clientDS!: DataStore;
mq!: DataStoreMQ;
ds!: IDataStore;
constructor( constructor(
private parent: Element, private parent: Element,
@ -140,15 +140,15 @@ export class Client {
`${this.dbPrefix}_state`, `${this.dbPrefix}_state`,
); );
await stateKvPrimitives.init(); await stateKvPrimitives.init();
this.stateDataStore = new DataStore(stateKvPrimitives); this.clientDS = new DataStore(stateKvPrimitives);
// Only used in online mode // In sync mode, reuse the clientDS, otherwise talk to a remote data store (over HTTP)
this.remoteDataStore = new RemoteDataStore(this.httpSpacePrimitives); this.ds = this.syncMode
? this.clientDS
: new RemoteDataStore(this.httpSpacePrimitives);
// Setup message queue // Setup message queue
this.mq = new DataStoreMQ( this.mq = new DataStoreMQ(this.ds);
this.syncMode ? this.stateDataStore : this.remoteDataStore,
);
setInterval(() => { setInterval(() => {
// Timeout after 5s, retries 3 times, otherwise drops the message (no DLQ) // Timeout after 5s, retries 3 times, otherwise drops the message (no DLQ)
@ -162,7 +162,8 @@ export class Client {
this.system = new ClientSystem( this.system = new ClientSystem(
this, this,
this.mq, this.mq,
this.stateDataStore, this.clientDS,
this.ds,
this.eventHook, this.eventHook,
); );
@ -172,7 +173,7 @@ export class Client {
? new SyncService( ? new SyncService(
localSpacePrimitives, localSpacePrimitives,
this.plugSpaceRemotePrimitives, this.plugSpaceRemotePrimitives,
this.stateDataStore, this.clientDS,
this.eventHook, this.eventHook,
(path) => { (path) => {
// TODO: At some point we should remove the data.db exception here // TODO: At some point we should remove the data.db exception here
@ -252,6 +253,12 @@ export class Client {
private async initSync() { private async initSync() {
this.syncService.start(); this.syncService.start();
if (!this.syncMode) {
ensureSpaceIndex(this.ds, this.system.system).catch(
console.error,
);
}
// We're still booting, if a initial sync has already been completed we know this is the initial sync // We're still booting, if a initial sync has already been completed we know this is the initial sync
const initialSync = !await this.syncService.hasInitialSyncCompleted(); const initialSync = !await this.syncService.hasInitialSyncCompleted();
@ -270,12 +277,12 @@ export class Client {
// A full sync just completed // A full sync just completed
if (!initialSync) { if (!initialSync) {
// If this was NOT the initial sync let's check if we need to perform a space reindex // If this was NOT the initial sync let's check if we need to perform a space reindex
ensureSpaceIndex(this.stateDataStore, this.system.system).catch( ensureSpaceIndex(this.ds, this.system.system).catch(
console.error, console.error,
); );
} else { } else {
// This was the initial sync, let's mark a full index as completed // This was the initial sync, let's mark a full index as completed
await markFullSpaceIndexComplete(this.stateDataStore); await markFullSpaceIndexComplete(this.ds);
} }
} }
if (operations) { if (operations) {
@ -368,14 +375,14 @@ export class Client {
scrollIntoView: true, scrollIntoView: true,
}); });
} }
await this.stateDataStore.set(["client", "lastOpenedPage"], pageName); await this.clientDS.set(["client", "lastOpenedPage"], pageName);
}, },
); );
if (location.hash === "#boot") { if (location.hash === "#boot") {
(async () => { (async () => {
// Cold start PWA load // Cold start PWA load
const lastPage = await this.stateDataStore.get([ const lastPage = await this.clientDS.get([
"client", "client",
"lastOpenedPage", "lastOpenedPage",
]); ]);
@ -420,7 +427,7 @@ export class Client {
} }
await encryptedSpacePrimitives.setup(password); await encryptedSpacePrimitives.setup(password);
// this.stateDataStore.set(["encryptionKey"], password); // this.stateDataStore.set(["encryptionKey"], password);
await this.stateDataStore.set( await this.ds.set(
["spaceSalt"], ["spaceSalt"],
encryptedSpacePrimitives.spaceSalt, encryptedSpacePrimitives.spaceSalt,
); );
@ -432,7 +439,7 @@ export class Client {
"Offline, will assume encryption space is initialized, fetching salt from data store", "Offline, will assume encryption space is initialized, fetching salt from data store",
); );
await encryptedSpacePrimitives.init( await encryptedSpacePrimitives.init(
await this.stateDataStore.get(["spaceSalt"]), await this.ds.get(["spaceSalt"]),
); );
} }
} }
@ -442,7 +449,7 @@ export class Client {
await encryptedSpacePrimitives.login( await encryptedSpacePrimitives.login(
prompt("Password")!, prompt("Password")!,
); );
await this.stateDataStore.set( await this.ds.set(
["spaceSalt"], ["spaceSalt"],
encryptedSpacePrimitives.spaceSalt, encryptedSpacePrimitives.spaceSalt,
); );
@ -1042,7 +1049,7 @@ export class Client {
async loadCaches() { async loadCaches() {
const [widgetHeightCache, widgetCache] = await this const [widgetHeightCache, widgetCache] = await this
.stateDataStore.batchGet([[ .clientDS.batchGet([[
"cache", "cache",
"widgetHeight", "widgetHeight",
], ["cache", "widgets"]]); ], ["cache", "widgets"]]);
@ -1051,7 +1058,7 @@ export class Client {
} }
debouncedWidgetHeightCacheFlush = throttle(() => { debouncedWidgetHeightCacheFlush = throttle(() => {
this.stateDataStore.set( this.clientDS.set(
["cache", "widgetHeight"], ["cache", "widgetHeight"],
this.widgetHeightCache.toJSON(), this.widgetHeightCache.toJSON(),
) )
@ -1070,7 +1077,7 @@ export class Client {
} }
debouncedWidgetCacheFlush = throttle(() => { debouncedWidgetCacheFlush = throttle(() => {
this.stateDataStore.set(["cache", "widgets"], this.widgetCache.toJSON()) this.clientDS.set(["cache", "widgets"], this.widgetCache.toJSON())
.catch( .catch(
console.error, console.error,
); );

View File

@ -29,9 +29,8 @@ import {
} from "../common/markdown_parser/markdown_ext.ts"; } from "../common/markdown_parser/markdown_ext.ts";
import { MQHook } from "../plugos/hooks/mq.ts"; import { MQHook } from "../plugos/hooks/mq.ts";
import { mqSyscalls } from "../plugos/syscalls/mq.ts"; import { mqSyscalls } from "../plugos/syscalls/mq.ts";
import { dataStoreProxySyscalls } from "./syscalls/datastore.proxy.ts";
import { dataStoreSyscalls } from "../plugos/syscalls/datastore.ts"; import { dataStoreSyscalls } from "../plugos/syscalls/datastore.ts";
import { DataStore } from "../plugos/lib/datastore.ts"; import { DataStore, IDataStore } from "../plugos/lib/datastore.ts";
import { MessageQueue } from "../plugos/lib/mq.ts"; import { MessageQueue } from "../plugos/lib/mq.ts";
import { languageSyscalls } from "../common/syscalls/language.ts"; import { languageSyscalls } from "../common/syscalls/language.ts";
import { handlebarsSyscalls } from "../common/syscalls/handlebars.ts"; import { handlebarsSyscalls } from "../common/syscalls/handlebars.ts";
@ -56,7 +55,8 @@ export class ClientSystem {
constructor( constructor(
private client: Client, private client: Client,
private mq: MessageQueue, private mq: MessageQueue,
private ds: DataStore, private clientDs: DataStore,
private dataStore: IDataStore,
private eventHook: EventHook, private eventHook: EventHook,
) { ) {
// Only set environment to "client" when running in thin client mode, otherwise we run everything locally (hybrid) // Only set environment to "client" when running in thin client mode, otherwise we run everything locally (hybrid)
@ -65,7 +65,7 @@ export class ClientSystem {
undefined, undefined,
{ {
manifestCache: new KVPrimitivesManifestCache<SilverBulletHooks>( manifestCache: new KVPrimitivesManifestCache<SilverBulletHooks>(
ds.kv, clientDs.kv,
"manifest", "manifest",
), ),
}, },
@ -165,12 +165,10 @@ export class ClientSystem {
clientCodeWidgetSyscalls(), clientCodeWidgetSyscalls(),
languageSyscalls(), languageSyscalls(),
mqSyscalls(this.mq), mqSyscalls(this.mq),
this.client.syncMode dataStoreSyscalls(this.dataStore),
? dataStoreSyscalls(this.ds)
: dataStoreProxySyscalls(this.client),
debugSyscalls(), debugSyscalls(),
syncSyscalls(this.client), syncSyscalls(this.client),
clientStoreSyscalls(this.ds), clientStoreSyscalls(this.clientDs),
); );
// Syscalls that require some additional permissions // Syscalls that require some additional permissions

View File

@ -1,7 +1,7 @@
import { HttpSpacePrimitives } from "../common/spaces/http_space_primitives.ts"; import { HttpSpacePrimitives } from "../common/spaces/http_space_primitives.ts";
import { KV, KvKey, KvQuery } from "$sb/types.ts"; import { KV, KvKey, KvQuery } from "$sb/types.ts";
import { proxySyscall } from "./syscalls/util.ts";
import { IDataStore } from "../plugos/lib/datastore.ts"; import { IDataStore } from "../plugos/lib/datastore.ts";
import { rpcCall } from "./syscalls/datastore.proxy.ts";
// implements DataStore "interface" // implements DataStore "interface"
export class RemoteDataStore implements IDataStore { export class RemoteDataStore implements IDataStore {
@ -10,45 +10,46 @@ export class RemoteDataStore implements IDataStore {
private proxy( private proxy(
name: string, name: string,
args: any[], ...args: any[]
) { ) {
return proxySyscall( // console.trace();
{ plug: { name: "index" } } as any, return rpcCall(
this.httpPrimitives, this.httpPrimitives,
name, name,
args, ...args,
); );
} }
get<T = any>(key: KvKey): Promise<T | null> { async get<T = any>(key: KvKey): Promise<T | null> {
return this.proxy("datastore.get", [key]); const results = await this.batchGet([key]);
return results[0];
} }
batchGet<T = any>(keys: KvKey[]): Promise<(T | null)[]> { batchGet<T = any>(keys: KvKey[]): Promise<(T | null)[]> {
return this.proxy("datastore.batchGet", [keys]); return this.proxy("datastore.batchGet", keys);
} }
set(key: KvKey, value: any): Promise<void> { set(key: KvKey, value: any): Promise<void> {
return this.proxy("datastore.set", [key, value]); return this.batchSet([{ key, value }]);
} }
batchSet<T = any>(entries: KV<T>[]): Promise<void> { batchSet<T = any>(entries: KV<T>[]): Promise<void> {
return this.proxy("datastore.batchSet", [entries]); return this.proxy("datastore.batchSet", entries);
} }
delete(key: KvKey): Promise<void> { delete(key: KvKey): Promise<void> {
return this.proxy("datastore.delete", [key]); return this.batchDelete([key]);
} }
batchDelete(keys: KvKey[]): Promise<void> { batchDelete(keys: KvKey[]): Promise<void> {
return this.proxy("datastore.batchDelete", [keys]); return this.proxy("datastore.batchDelete", keys);
} }
query<T = any>(query: KvQuery): Promise<KV<T>[]> { query<T = any>(query: KvQuery): Promise<KV<T>[]> {
return this.proxy("datastore.query", [query]); return this.proxy("datastore.query", query);
} }
queryDelete(query: KvQuery): Promise<void> { queryDelete(query: KvQuery): Promise<void> {
return this.proxy("datastore.queryDelete", [query]); return this.proxy("datastore.queryDelete", query);
} }
} }

View File

@ -1,15 +1,90 @@
import type { SysCallMapping } from "../../plugos/system.ts"; import { HttpSpacePrimitives } from "../../common/spaces/http_space_primitives.ts";
import type { Client } from "../client.ts"; import { KV, KvKey, KvQuery } from "$sb/types.ts";
import { proxySyscalls } from "./util.ts"; import type { SyscallContext, SysCallMapping } from "../../plugos/system.ts";
export function dataStoreProxySyscalls(client: Client): SysCallMapping { export function dataStoreProxySyscalls(
return proxySyscalls(client, [ httpSpacePrimitives: HttpSpacePrimitives,
"datastore.delete", ): SysCallMapping {
"datastore.set", return {
"datastore.batchSet", "datastore.delete": (ctx, key: KvKey) => {
"datastore.batchDelete", return rpcCall(httpSpacePrimitives, "datastore.batchDelete", [
"datastore.batchGet", addPrefix(ctx, key),
"datastore.get",
"datastore.query",
]); ]);
},
"datastore.batchDelete": (ctx, keys: KvKey[]) => {
return rpcCall(
httpSpacePrimitives,
"datastore.batchDelete",
keys.map((key) => addPrefix(ctx, key)),
);
},
"datastore.set": (ctx, key: KvKey, value: any) => {
return rpcCall(httpSpacePrimitives, "datastore.batchSet", [
{ key: addPrefix(ctx, key), value: value },
]);
},
"datastore.batchSet": (ctx, entries: KV[]) => {
return rpcCall(
httpSpacePrimitives,
"datastore.batchSet",
entries.map(({ key, value }) => ({ key: addPrefix(ctx, key), value })),
);
},
"datastore.get": async (ctx, key: KvKey) => {
const [result] = await rpcCall(
httpSpacePrimitives,
"datastore.batchGet",
[
addPrefix(ctx, key),
],
);
return result;
},
"datastore.batchGet": (ctx, keys: KvKey[]) => {
return rpcCall(
httpSpacePrimitives,
"datastore.batchGet",
keys.map((key) => addPrefix(ctx, key)),
);
},
"datastore.query": async (ctx, query: KvQuery) => {
const results: KV[] = await rpcCall(
httpSpacePrimitives,
"datastore.query",
{ ...query, prefix: addPrefix(ctx, query.prefix || []) },
);
return results.map(({ key, value }) => ({
key: stripPrefix(key),
value,
}));
},
};
function addPrefix(ctx: SyscallContext, key: KvKey): KvKey {
return [ctx.plug.name, ...key];
}
function stripPrefix(key: KvKey): KvKey {
return key.slice(1);
}
}
export async function rpcCall(
httpSpacePrimitives: HttpSpacePrimitives,
name: string,
...args: any[]
): Promise<any> {
const resp = await httpSpacePrimitives.authenticatedFetch(
`${httpSpacePrimitives.url}/.rpc/${name}`,
{
method: "POST",
body: JSON.stringify(args),
},
);
if (!resp.ok) {
const errorText = await resp.text();
console.error("Remote rpc error", errorText);
throw new Error(errorText);
}
return (await resp.json()).r;
} }

View File

@ -2,7 +2,6 @@ import type { Plug } from "../../plugos/plug.ts";
import { SysCallMapping, System } from "../../plugos/system.ts"; import { SysCallMapping, System } from "../../plugos/system.ts";
import type { Client } from "../client.ts"; import type { Client } from "../client.ts";
import { CommandDef } from "../hooks/command.ts"; import { CommandDef } from "../hooks/command.ts";
import { proxySyscall } from "./util.ts";
export function systemSyscalls( export function systemSyscalls(
system: System<any>, system: System<any>,

View File

@ -1,36 +0,0 @@
import { HttpSpacePrimitives } from "../../common/spaces/http_space_primitives.ts";
import { SyscallContext, SysCallMapping } from "../../plugos/system.ts";
import { SyscallResponse } from "../../server/rpc.ts";
import { Client } from "../client.ts";
export function proxySyscalls(client: Client, names: string[]): SysCallMapping {
const syscalls: SysCallMapping = {};
for (const name of names) {
syscalls[name] = (ctx, ...args: any[]) => {
return proxySyscall(ctx, client.httpSpacePrimitives, name, args);
};
}
return syscalls;
}
export async function proxySyscall(
ctx: SyscallContext,
httpSpacePrimitives: HttpSpacePrimitives,
name: string,
args: any[],
): Promise<any> {
const resp = await httpSpacePrimitives.authenticatedFetch(
`${httpSpacePrimitives.url}/.rpc/${ctx.plug.name}/${name}`,
{
method: "POST",
body: JSON.stringify(args),
},
);
const result: SyscallResponse = await resp.json();
if (result.error) {
console.error("Remote syscall error", result.error);
throw new Error(result.error);
} else {
return result.result;
}
}