From 97a84e85388ef856cc08b7a2ee136071a334a877 Mon Sep 17 00:00:00 2001
From: Zef Hemel
Date: Thu, 10 Aug 2023 18:32:41 +0200
Subject: [PATCH] Fixes #115: By introducing MQ workers

---
 common/manifest.ts                            |   2 +
 .../spaces/indexeddb_space_primitives.test.ts |   2 +-
 import_map.json                               |   2 +-
 plug-api/mq.ts                                |   6 +
 plug-api/plugos-syscall/mod.ts                |   1 +
 plug-api/plugos-syscall/mq.ts                 |  17 ++
 plugos/hooks/mq.ts                            | 107 +++++++
 plugos/lib/mq.dexie.test.ts                   |  53 ++++
 plugos/lib/mq.dexie.ts                        | 275 ++++++++++++++++++
 plugos/lib/mq_util.ts                         |   7 +
 plugos/syscalls/mq.dexie.ts                   |  22 ++
 plugs/core/core.plug.yaml                     |   6 +
 plugs/core/page.ts                            |  31 +-
 web/client.ts                                 |  10 +
 web/client_system.ts                          |   8 +
 website/SilverBullet.md                       |   2 +-
 16 files changed, 544 insertions(+), 7 deletions(-)
 create mode 100644 plug-api/mq.ts
 create mode 100644 plug-api/plugos-syscall/mq.ts
 create mode 100644 plugos/hooks/mq.ts
 create mode 100644 plugos/lib/mq.dexie.test.ts
 create mode 100644 plugos/lib/mq.dexie.ts
 create mode 100644 plugos/lib/mq_util.ts
 create mode 100644 plugos/syscalls/mq.dexie.ts

diff --git a/common/manifest.ts b/common/manifest.ts
index 3aadcb2e..3dbb0c81 100644
--- a/common/manifest.ts
+++ b/common/manifest.ts
@@ -5,11 +5,13 @@ import { CommandHookT } from "../web/hooks/command.ts";
 import { SlashCommandHookT } from "../web/hooks/slash_command.ts";
 import { PlugNamespaceHookT } from "./hooks/plug_namespace.ts";
 import { CodeWidgetT } from "../web/hooks/code_widget.ts";
+import { MQHookT } from "../plugos/hooks/mq.ts";
 
 export type SilverBulletHooks =
   & CommandHookT
   & SlashCommandHookT
   & CronHookT
+  & MQHookT
   & EventHookT
   & CodeWidgetT
   & PlugNamespaceHookT;
diff --git a/common/spaces/indexeddb_space_primitives.test.ts b/common/spaces/indexeddb_space_primitives.test.ts
index 35d9aade..b09e4a4b 100644
--- a/common/spaces/indexeddb_space_primitives.test.ts
+++ b/common/spaces/indexeddb_space_primitives.test.ts
@@ -1,4 +1,4 @@
-import { indexedDB } from "https://deno.land/x/indexeddb@v1.1.0/ponyfill_memory.ts";
+import { indexedDB } from "https://deno.land/x/indexeddb@1.3.5/ponyfill_memory.ts";
 
 import { IndexedDBSpacePrimitives } from "./indexeddb_space_primitives.ts";
 import { assertEquals } from "../../test_deps.ts";
diff --git a/import_map.json b/import_map.json
index 696c2d1e..c1ecd6ae 100644
--- a/import_map.json
+++ b/import_map.json
@@ -20,6 +20,6 @@
     "preact": "https://esm.sh/preact@10.11.1",
     "$sb/": "./plug-api/",
     "handlebars": "https://esm.sh/handlebars@4.7.7?target=es2022",
-    "dexie": "https://esm.sh/dexie@3.2.2"
+    "dexie": "https://esm.sh/dexie@3.2.2?target=es2022"
   }
 }
diff --git a/plug-api/mq.ts b/plug-api/mq.ts
new file mode 100644
index 00000000..ae35b7d6
--- /dev/null
+++ b/plug-api/mq.ts
@@ -0,0 +1,6 @@
+export type Message = {
+  id: string;
+  queue: string;
+  body: any;
+  retries?: number;
+};
diff --git a/plug-api/plugos-syscall/mod.ts b/plug-api/plugos-syscall/mod.ts
index 3a8ffd21..afd52523 100644
--- a/plug-api/plugos-syscall/mod.ts
+++ b/plug-api/plugos-syscall/mod.ts
@@ -3,4 +3,5 @@ export * as events from "./event.ts";
 export * as shell from "./shell.ts";
 export * as store from "./store.ts";
 export * as YAML from "./yaml.ts";
+export * as mq from "./mq.ts";
 export * from "./syscall.ts";
diff --git a/plug-api/plugos-syscall/mq.ts b/plug-api/plugos-syscall/mq.ts
new file mode 100644
index 00000000..f4d7544f
--- /dev/null
+++ b/plug-api/plugos-syscall/mq.ts
@@ -0,0 +1,17 @@
+import { syscall } from "$sb/plugos-syscall/syscall.ts";
+
+export function send(queue: string, body: any) {
+  return syscall("mq.send", queue, body);
+}
+
+export function batchSend(queue: string, bodies: any[]) {
+  return syscall("mq.batchSend", queue, bodies);
+}
+
+export function ack(queue: string, id: string) {
+  return syscall("mq.ack", queue, id);
+}
+
+export function batchAck(queue: string, ids: string[]) {
+  return syscall("mq.batchAck", queue, ids);
+}
diff --git a/plugos/hooks/mq.ts b/plugos/hooks/mq.ts
new file mode 100644
index 00000000..700fcd23
--- /dev/null
+++ b/plugos/hooks/mq.ts
@@ -0,0 +1,107 @@
+import { Hook, Manifest } from "../types.ts";
+import { System } from "../system.ts";
+import { DexieMQ } from "../lib/mq.dexie.ts";
+import { fullQueueName } from "../lib/mq_util.ts";
+import { Message } from "$sb/mq.ts";
+
+type MQSubscription = {
+  queue: string;
+  batchSize?: number;
+  autoAck?: boolean;
+};
+
+export type MQHookT = {
+  mqSubscriptions?: MQSubscription[];
+};
+
+export class MQHook implements Hook<MQHookT> {
+  subscriptions: (() => void)[] = [];
+
+  constructor(private system: System<MQHookT>, readonly mq: DexieMQ) {
+  }
+
+  apply(system: System<MQHookT>): void {
+    this.system = system;
+    system.on({
+      plugLoaded: () => {
+        this.reloadQueues();
+      },
+      plugUnloaded: () => {
+        this.reloadQueues();
+      },
+    });
+
+    this.reloadQueues();
+  }
+
+  stop() {
+    // console.log("Unsubscribing from all queues");
+    this.subscriptions.forEach((sub) => sub());
+    this.subscriptions = [];
+  }
+
+  reloadQueues() {
+    this.stop();
+    for (const plug of this.system.loadedPlugs.values()) {
+      if (!plug.manifest) {
+        continue;
+      }
+      for (
+        const [name, functionDef] of Object.entries(
+          plug.manifest.functions,
+        )
+      ) {
+        if (!functionDef.mqSubscriptions) {
+          continue;
+        }
+        const subscriptions = functionDef.mqSubscriptions;
+        for (const subscriptionDef of subscriptions) {
+          const queue = fullQueueName(plug.name!, subscriptionDef.queue);
+          // console.log("Subscribing to queue", queue);
+          this.subscriptions.push(
+            this.mq.subscribe(
+              queue,
+              {
+                batchSize: subscriptionDef.batchSize,
+              },
+              async (messages: Message[]) => {
+                try {
+                  await plug.invoke(name, [messages]);
+                  if (subscriptionDef.autoAck) {
+                    await this.mq.batchAck(queue, messages.map((m) => m.id));
+                  }
+                } catch (e: any) {
+                  console.error(
+                    "Execution of mqSubscription for queue",
+                    queue,
+                    "invoking",
+                    name,
+                    "with messages",
+                    messages,
+                    "failed:",
+                    e,
+                  );
+                }
+              },
+            ),
+          );
+        }
+      }
+    }
+  }
+
+  validateManifest(manifest: Manifest<MQHookT>): string[] {
+    const errors: string[] = [];
+    for (const functionDef of Object.values(manifest.functions)) {
+      if (!functionDef.mqSubscriptions) {
+        continue;
+      }
+      for (const subscriptionDef of functionDef.mqSubscriptions) {
+        if (!subscriptionDef.queue) {
+          errors.push("Missing queue name for mqSubscription");
+        }
+      }
+    }
+    return errors;
+  }
+}
diff --git a/plugos/lib/mq.dexie.test.ts b/plugos/lib/mq.dexie.test.ts
new file mode 100644
index 00000000..fd852a20
--- /dev/null
+++ b/plugos/lib/mq.dexie.test.ts
@@ -0,0 +1,53 @@
+import { IDBKeyRange, indexedDB } from "https://esm.sh/fake-indexeddb@4.0.2";
+import { DexieMQ } from "./mq.dexie.ts";
+import { assertEquals } from "../../test_deps.ts";
+import { sleep } from "../../common/async_util.ts";
+
+Deno.test("Dexie MQ", async () => {
+  const mq = new DexieMQ("test", indexedDB, IDBKeyRange);
+  await mq.send("test", "Hello World");
+  let messages = await mq.poll("test", 10);
+  assertEquals(messages.length, 1);
+  await mq.ack("test", messages[0].id);
+  assertEquals([], await mq.poll("test", 10));
+  await mq.send("test", "Hello World");
+  messages = await mq.poll("test", 10);
+  assertEquals(messages.length, 1);
+  assertEquals([], await mq.poll("test", 10));
+  await sleep(20);
+  await mq.requeueTimeouts(10);
+  messages = await mq.poll("test", 10);
+  const stats = await mq.getAllQueueStats();
+  assertEquals(stats["test"].processing, 1);
+  assertEquals(messages.length, 1);
+  assertEquals(messages[0].retries, 1);
+  await sleep(20);
+  await mq.requeueTimeouts(10, 1);
+  assertEquals((await mq.fetchDLQMessages()).length, 1);
+
+  let receivedMessage = false;
+  const unsubscribe = mq.subscribe("test123", {}, async (messages) => {
+    assertEquals(messages.length, 1);
+    await mq.ack("test123", messages[0].id);
+    receivedMessage = true;
+  });
+  mq.send("test123", "Hello World");
+  // Give time to process the message
+  await sleep(1);
+  assertEquals(receivedMessage, true);
+  unsubscribe();
+
+  // Batch send
+  await mq.batchSend("test", ["Hello", "World"]);
+  const messageBatch1 = await mq.poll("test", 1);
+  assertEquals(messageBatch1.length, 1);
+  assertEquals(messageBatch1[0].body, "Hello");
+  const messageBatch2 = await mq.poll("test", 1);
+  assertEquals(messageBatch2.length, 1);
+  assertEquals(messageBatch2[0].body, "World");
+
+  await mq.batchAck("test", [messageBatch1[0].id, messageBatch2[0].id]);
+  assertEquals(await mq.fetchProcessingMessages(), []);
+  // Give time to close the db
+  await sleep(20);
+});
diff --git a/plugos/lib/mq.dexie.ts b/plugos/lib/mq.dexie.ts
new file mode 100644
index 00000000..1334f6b7
--- /dev/null
+++ b/plugos/lib/mq.dexie.ts
@@ -0,0 +1,275 @@
+import Dexie, { Table } from "dexie";
+import { Message } from "$sb/mq.ts";
+
+export type ProcessingMessage = Message & {
+  ts: number;
+};
+
+export type SubscribeOptions = {
+  batchSize?: number;
+  pollInterval?: number;
+};
+
+export type QueueStats = {
+  queued: number;
+  processing: number;
+  dlq: number;
+};
+
+export class DexieMQ {
+  db: Dexie;
+  queued: Table<Message, [string, string]>;
+  processing: Table<ProcessingMessage, [string, string]>;
+  dlq: Table<ProcessingMessage, [string, string]>;
+
+  // queue -> set of run() functions
+  localSubscriptions = new Map<string, Set<() => void>>();
+
+  constructor(
+    dbName: string,
+    indexedDB?: any,
+    IDBKeyRange?: any,
+  ) {
+    this.db = new Dexie(dbName, {
+      indexedDB,
+      IDBKeyRange,
+    });
+    this.db.version(1).stores({
+      queued: "[queue+id], queue, id",
+      processing: "[queue+id], queue, id, ts",
+      dlq: "[queue+id], queue, id",
+    });
+    this.queued = this.db.table("queued");
+    this.processing = this.db.table("processing");
+    this.dlq = this.db.table("dlq");
+  }
+
+  // Internal sequencer for messages, only really necessary when batch sending tons of messages within a millisecond
+  seq = 0;
+
+  async batchSend(queue: string, bodies: any[]) {
+    const messages = bodies.map((body) => ({
+      id: `${Date.now()}-${String(++this.seq).padStart(6, "0")}`,
+      queue,
+      body,
+    }));
+
+    await this.queued.bulkAdd(messages);
+
+    // See if we can immediately process the message with a local subscription
+    const localSubscriptions = this.localSubscriptions.get(queue);
+    if (localSubscriptions) {
+      for (const run of localSubscriptions) {
+        run();
+      }
+    }
+  }
+
+  send(queue: string, body: any) {
+    return this.batchSend(queue, [body]);
+  }
+
+  poll(queue: string, maxItems: number): Promise<Message[]> {
+    return this.db.transaction(
+      "rw",
+      [this.queued, this.processing],
+      async (tx) => {
+        const messages =
+          (await tx.table("queued").where({ queue })
+            .sortBy("id")).slice(0, maxItems);
+        const ids: [string, string][] = messages.map((m) => [queue, m.id]);
+        await tx.table("queued").bulkDelete(ids);
+        await tx.table("processing")
+          .bulkPut(
+            messages.map((m) => ({
+              ...m,
+              ts: Date.now(),
+            })),
+          );
+        return messages;
+      },
+    );
+  }
+
+  /**
+   * @param queue
+   * @param batchSize
+   * @param callback
+   * @returns a function to be called to unsubscribe
+   */
+  subscribe(
+    queue: string,
+    options: SubscribeOptions,
+    callback: (messages: Message[]) => Promise<void> | void,
+  ): () => void {
+    let running = true;
+    let timeout: number | undefined;
+    const batchSize = options.batchSize || 1;
+    const run = async () => {
+      try {
+        if (!running) {
+          return;
+        }
+        const messages = await this.poll(queue, batchSize);
+        if (messages.length > 0) {
+          await callback(messages);
+        }
+        // If we got exactly the batch size, there might be more messages
+        if (messages.length === batchSize) {
+          await run();
+        }
+        if (timeout) {
+          clearTimeout(timeout);
+        }
+        timeout = setTimeout(run, options.pollInterval || 5000);
+      } catch (e: any) {
+        console.error("Error in MQ subscription handler", e);
+      }
+    };
+
+    // Register as a local subscription handler
+    const localSubscriptions = this.localSubscriptions.get(queue);
+    if (!localSubscriptions) {
+      this.localSubscriptions.set(queue, new Set([run]));
+    } else {
+      localSubscriptions.add(run);
+    }
+
+    // Run the first time (which will schedule subsequent polling intervals)
+    run();
+
+    // And return an unsubscribe function
+    return () => {
+      running = false;
+      if (timeout) {
+        clearTimeout(timeout);
+      }
+      // Remove the subscription from localSubscriptions
+      const queueSubscriptions = this.localSubscriptions.get(queue);
+      if (queueSubscriptions) {
+        queueSubscriptions.delete(run);
+      }
+    };
+  }
+
+  ack(queue: string, id: string) {
+    return this.batchAck(queue, [id]);
+  }
+
+  async batchAck(queue: string, ids: string[]) {
+    await this.processing.bulkDelete(ids.map((id) => [queue, id]));
+  }
+
+  async requeueTimeouts(timeout: number, maxRetries?: number) {
+    const now = Date.now();
+    const messages = await this.processing.where("ts").below(now - timeout)
+      .toArray();
+    const ids: [string, string][] = messages.map((m) => [m.queue, m.id]);
+    await this.db.transaction(
+      "rw",
+      [this.queued, this.processing, this.dlq],
+      async (tx) => {
+        await tx.table("processing").bulkDelete(ids);
+        const requeuedMessages: ProcessingMessage[] = [];
+        const dlqMessages: ProcessingMessage[] = [];
+        for (const m of messages) {
+          const retries = (m.retries || 0) + 1;
+          if (maxRetries && retries > maxRetries) {
+            console.warn(
+              "[mq]",
+              "Message exceeded max retries, moving to DLQ",
+              m,
+            );
+            dlqMessages.push({
+              queue: m.queue,
+              id: m.id,
+              body: m.body,
+              ts: Date.now(),
+              retries,
+            });
+          } else {
+            console.info("[mq]", "Message ack timed out, requeueing", m);
+            requeuedMessages.push({
+              ...m,
+              retries,
+            });
+          }
+        }
+        await tx.table("queued").bulkPut(requeuedMessages);
+        await tx.table("dlq").bulkPut(dlqMessages);
+      },
+    );
+  }
+
+  fetchDLQMessages(): Promise<ProcessingMessage[]> {
+    return this.dlq.toArray();
+  }
+
+  fetchProcessingMessages(): Promise<ProcessingMessage[]> {
+    return this.processing.toArray();
+  }
+
+  flushDLQ(): Promise<void> {
+    return this.dlq.clear();
+  }
+
+  getQueueStats(queue: string): Promise<QueueStats> {
+    return this.db.transaction(
+      "r",
+      [this.queued, this.processing, this.dlq],
+      async (tx) => {
+        const queued = await tx.table("queued").where({ queue }).count();
+        const processing = await tx.table("processing").where({ queue })
+          .count();
+        const dlq = await tx.table("dlq").where({ queue }).count();
+        return {
+          queued,
+          processing,
+          dlq,
+        };
+      },
+    );
+  }
+
+  async getAllQueueStats(): Promise<Record<string, QueueStats>> {
+    const allStatus: Record<string, QueueStats> = {};
+    await this.db.transaction(
+      "r",
+      [this.queued, this.processing, this.dlq],
+      async (tx) => {
+        for (const item of await tx.table("queued").toArray()) {
+          if (!allStatus[item.queue]) {
+            allStatus[item.queue] = {
+              queued: 0,
+              processing: 0,
+              dlq: 0,
+            };
+          }
+          allStatus[item.queue].queued++;
+        }
+        for (const item of await tx.table("processing").toArray()) {
+          if (!allStatus[item.queue]) {
+            allStatus[item.queue] = {
+              queued: 0,
+              processing: 0,
+              dlq: 0,
+            };
+          }
+          allStatus[item.queue].processing++;
+        }
+        for (const item of await tx.table("dlq").toArray()) {
+          if (!allStatus[item.queue]) {
+            allStatus[item.queue] = {
+              queued: 0,
+              processing: 0,
+              dlq: 0,
+            };
+          }
+          allStatus[item.queue].dlq++;
+        }
+      },
+    );
+
+    return allStatus;
+  }
+}
diff --git a/plugos/lib/mq_util.ts b/plugos/lib/mq_util.ts
new file mode 100644
index 00000000..6fbe3b70
--- /dev/null
+++ b/plugos/lib/mq_util.ts
@@ -0,0 +1,7 @@
+// Adds a plug name to a queue name if it doesn't already have one.
+export function fullQueueName(plugName: string, queueName: string) {
+  if (queueName.includes(".")) {
+    return queueName;
+  }
+  return plugName + "." + queueName;
+}
diff --git a/plugos/syscalls/mq.dexie.ts b/plugos/syscalls/mq.dexie.ts
new file mode 100644
index 00000000..d303cbb4
--- /dev/null
+++ b/plugos/syscalls/mq.dexie.ts
@@ -0,0 +1,22 @@
+import { SysCallMapping } from "../system.ts";
+import { DexieMQ } from "../lib/mq.dexie.ts";
+import { fullQueueName } from "../lib/mq_util.ts";
+
+export function mqSyscalls(
+  mq: DexieMQ,
+): SysCallMapping {
+  return {
+    "mq.send": (ctx, queue: string, body: any) => {
+      return mq.send(fullQueueName(ctx.plug.name!, queue), body);
+    },
+    "mq.batchSend": (ctx, queue: string, bodies: any[]) => {
+      return mq.batchSend(fullQueueName(ctx.plug.name!, queue), bodies);
+    },
+    "mq.ack": (ctx, queue: string, id: string) => {
+      return mq.ack(fullQueueName(ctx.plug.name!, queue), id);
+    },
+    "mq.batchAck": (ctx, queue: string, ids: string[]) => {
+      return mq.batchAck(fullQueueName(ctx.plug.name!, queue), ids);
+    },
+  };
+}
diff --git a/plugs/core/core.plug.yaml b/plugs/core/core.plug.yaml
index 67f16021..275d82a1 100644
--- a/plugs/core/core.plug.yaml
+++ b/plugs/core/core.plug.yaml
@@ -52,6 +52,12 @@ functions:
     path: "./page.ts:reindexCommand"
     command:
       name: "Space: Reindex"
+  processIndexQueue:
+    path: ./page.ts:processIndexQueue
+    mqSubscriptions:
+      - queue: indexQueue
+        batchSize: 10
+        autoAck: true
   reindexSpace:
     path: "./page.ts:reindexSpace"
   deletePage:
diff --git a/plugs/core/page.ts b/plugs/core/page.ts
index 4c4990d9..b91ece1c 100644
--- a/plugs/core/page.ts
+++ b/plugs/core/page.ts
@@ -10,10 +10,11 @@ import {
   space,
 } from "$sb/silverbullet-syscall/mod.ts";
 
-import { events } from "$sb/plugos-syscall/mod.ts";
+import { events, mq } from "$sb/plugos-syscall/mod.ts";
 
 import { applyQuery } from "$sb/lib/query.ts";
 import { invokeFunction } from "$sb/silverbullet-syscall/system.ts";
+import type { Message } from "$sb/mq.ts";
 
 // Key space:
 // meta: => metaJson
@@ -82,9 +83,17 @@ export async function newPageCommand() {
 }
 
 export async function reindexCommand() {
-  await editor.flashNotification("Reindexing...");
-  await reindexSpace();
-  await editor.flashNotification("Reindexing done");
+  await editor.flashNotification("Scheduling full reindex...");
+  console.log("Clearing page index...");
+  await index.clearPageIndex();
+  // Executed this way to not have to embed the search plug code here
+  await invokeFunction("client", "search.clearIndex");
+  const pages = await space.listPages();
+
+  await mq.batchSend("indexQueue", pages.map((page) => page.name));
+
console.log("Indexing queued!"); + // await editor.flashNotification("Reindexing done"); } // Completion @@ -128,6 +137,20 @@ export async function reindexSpace() { console.log("Indexing completed!"); } +export async function processIndexQueue(messages: Message[]) { + // console.log("Processing batch of", messages.length, "pages to index"); + for (const message of messages) { + const name: string = message.body; + console.log(`Indexing page ${name}`); + const text = await space.readPage(name); + const parsed = await markdown.parseMarkdown(text); + await events.dispatchEvent("page:index", { + name, + tree: parsed, + }); + } +} + export async function clearPageIndex(page: string) { // console.log("Clearing page index for page", page); await index.clearPageIndexForPage(page); diff --git a/web/client.ts b/web/client.ts index c3e0bdd6..03853baa 100644 --- a/web/client.ts +++ b/web/client.ts @@ -33,6 +33,7 @@ import { ClientSystem } from "./client_system.ts"; import { createEditorState } from "./editor_state.ts"; import { OpenPages } from "./open_pages.ts"; import { MainUI } from "./editor_ui.tsx"; +import { DexieMQ } from "../plugos/lib/mq.dexie.ts"; const frontMatterRegex = /^---\n(([^\n]|\n)*?)---\n/; const autoSaveInterval = 1000; @@ -75,6 +76,7 @@ export class Client { syncService: SyncService; settings!: BuiltinSettings; kvStore: DexieKVStore; + mq: DexieMQ; // Event bus used to communicate between components eventHook: EventHook; @@ -94,6 +96,13 @@ export class Client { globalThis.indexedDB, ); + this.mq = new DexieMQ(`${this.dbPrefix}_mq`, indexedDB, IDBKeyRange); + + setInterval(() => { + // Timeout after 5s + this.mq.requeueTimeouts(5000, 3).catch(console.error); + }, 20000); // Look to requeue every 20s + // Event hook this.eventHook = new EventHook(); @@ -101,6 +110,7 @@ export class Client { this.system = new ClientSystem( this, this.kvStore, + this.mq, this.dbPrefix, this.eventHook, ); diff --git a/web/client_system.ts b/web/client_system.ts index ae2211d6..93dfd3b7 100644 --- a/web/client_system.ts +++ b/web/client_system.ts @@ -30,6 +30,9 @@ import { loadMarkdownExtensions, MDExt, } from "../common/markdown_parser/markdown_ext.ts"; +import { DexieMQ } from "../plugos/lib/mq.dexie.ts"; +import { MQHook } from "../plugos/hooks/mq.ts"; +import { mqSyscalls } from "../plugos/syscalls/mq.dexie.ts"; export class ClientSystem { system: System = new System("client"); @@ -44,6 +47,7 @@ export class ClientSystem { constructor( private editor: Client, private kvStore: DexieKVStore, + private mq: DexieMQ, private dbPrefix: string, private eventHook: EventHook, ) { @@ -66,6 +70,9 @@ export class ClientSystem { this.codeWidgetHook = new CodeWidgetHook(); this.system.addHook(this.codeWidgetHook); + // MQ hook + this.system.addHook(new MQHook(this.system, this.mq)); + // Command hook this.commandHook = new CommandHook(); this.commandHook.on({ @@ -115,6 +122,7 @@ export class ClientSystem { markdownSyscalls(buildMarkdown(this.mdExtensions)), assetSyscalls(this.system), yamlSyscalls(), + mqSyscalls(this.mq), storeCalls, this.indexSyscalls, debugSyscalls(), diff --git a/website/SilverBullet.md b/website/SilverBullet.md index 09dfa56f..369710ff 100644 --- a/website/SilverBullet.md +++ b/website/SilverBullet.md @@ -2,7 +2,7 @@ SilverBullet is an extensible, [open source](https://github.com/silverbulletmd/s You’ve been told there is _no such thing_ as a [silver bullet](https://en.wikipedia.org/wiki/Silver_bullet). You were told wrong. 
-Before we get to the nitty gritty, some _quick links_ for the impatient reader: [[Install]], [[Manual]], [[CHANGELOG]], [Roadmap](https://github.com/orgs/silverbulletmd/projects/2/views/1), [Issues](https://github.com/silverbulletmd/silverbullet/issues), [Discussions](https://github.com/silverbulletmd/silverbullet/discussions), [Mastodon](https://hachyderm.io/@silverbullet), [Discord](https://discord.gg/EvXbFucTxn), [Docker Hub](https://hub.docker.com/r/zefhemel/silverbullet).
+Before we get to the nitty gritty, some _quick links_ for the impatient reader: [[Install]], [[Manual]], [[CHANGELOG]], [Roadmap](https://github.com/orgs/silverbulletmd/projects/2/views/1), [Issues](https://github.com/silverbulletmd/silverbullet/issues), [Discussions](https://github.com/silverbulletmd/silverbullet/discussions), [Mastodon](https://fosstodon.org/@silverbulletmd), [Discord](https://discord.gg/EvXbFucTxn), [Docker Hub](https://hub.docker.com/r/zefhemel/silverbullet).
 
 Now that we got that out of the way let’s have a look at some of SilverBullet’s features.