More cleanup
parent
b644801e7b
commit
f366a2fd63
|
@ -4,7 +4,7 @@ import { System } from "../plugos/system.ts";
|
||||||
const indexVersionKey = ["$indexVersion"];
|
const indexVersionKey = ["$indexVersion"];
|
||||||
|
|
||||||
// Bump this one every time a full reinxex is needed
|
// Bump this one every time a full reinxex is needed
|
||||||
const desiredIndexVersion = 2;
|
const desiredIndexVersion = 1;
|
||||||
|
|
||||||
let indexOngoing = false;
|
let indexOngoing = false;
|
||||||
|
|
||||||
|
|
|
@ -51,8 +51,6 @@ export class DataStoreMQ implements MessageQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
async poll(queue: string, maxItems: number): Promise<MQMessage[]> {
|
async poll(queue: string, maxItems: number): Promise<MQMessage[]> {
|
||||||
// console.log("Polling", queue, maxItems);
|
|
||||||
// console.trace();
|
|
||||||
// Note: this is not happening in a transactional way, so we may get duplicate message delivery
|
// Note: this is not happening in a transactional way, so we may get duplicate message delivery
|
||||||
// Retrieve a batch of messages
|
// Retrieve a batch of messages
|
||||||
const messages = await this.ds.query<MQMessage>({
|
const messages = await this.ds.query<MQMessage>({
|
||||||
|
@ -109,7 +107,7 @@ export class DataStoreMQ implements MessageQueue {
|
||||||
if (timeout) {
|
if (timeout) {
|
||||||
clearTimeout(timeout);
|
clearTimeout(timeout);
|
||||||
}
|
}
|
||||||
timeout = setTimeout(run, options.pollInterval || 5000);
|
// timeout = setTimeout(run, options.pollInterval || 5000);
|
||||||
} catch (e: any) {
|
} catch (e: any) {
|
||||||
console.error("Error in MQ subscription handler", e);
|
console.error("Error in MQ subscription handler", e);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
import { KV, KvKey, KvQuery } from "$sb/types.ts";
|
import { KV, KvKey, KvQuery } from "$sb/types.ts";
|
||||||
import type { DataStore, IDataStore } from "../lib/datastore.ts";
|
import type { IDataStore } from "../lib/datastore.ts";
|
||||||
import type { SyscallContext, SysCallMapping } from "../system.ts";
|
import type { SyscallContext, SysCallMapping } from "../system.ts";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -9,7 +9,6 @@ import type { SyscallContext, SysCallMapping } from "../system.ts";
|
||||||
*/
|
*/
|
||||||
export function dataStoreSyscalls(
|
export function dataStoreSyscalls(
|
||||||
ds: IDataStore,
|
ds: IDataStore,
|
||||||
prefix: KvKey = ["ds"],
|
|
||||||
): SysCallMapping {
|
): SysCallMapping {
|
||||||
return {
|
return {
|
||||||
"datastore.delete": (ctx, key: KvKey) => {
|
"datastore.delete": (ctx, key: KvKey) => {
|
||||||
|
@ -66,10 +65,10 @@ export function dataStoreSyscalls(
|
||||||
};
|
};
|
||||||
|
|
||||||
function applyPrefix(ctx: SyscallContext, key?: KvKey): KvKey {
|
function applyPrefix(ctx: SyscallContext, key?: KvKey): KvKey {
|
||||||
return [...prefix, ctx.plug.name!, ...(key ? key : [])];
|
return [ctx.plug.name!, ...(key ? key : [])];
|
||||||
}
|
}
|
||||||
|
|
||||||
function stripPrefix(key: KvKey): KvKey {
|
function stripPrefix(key: KvKey): KvKey {
|
||||||
return key.slice(prefix.length + 1);
|
return key.slice(1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,6 +12,9 @@ export function dataStoreProxySyscalls(
|
||||||
]);
|
]);
|
||||||
},
|
},
|
||||||
"datastore.batchDelete": (ctx, keys: KvKey[]) => {
|
"datastore.batchDelete": (ctx, keys: KvKey[]) => {
|
||||||
|
if (keys.length === 0) {
|
||||||
|
return Promise.resolve();
|
||||||
|
}
|
||||||
return rpcCall(
|
return rpcCall(
|
||||||
httpSpacePrimitives,
|
httpSpacePrimitives,
|
||||||
"datastore.batchDelete",
|
"datastore.batchDelete",
|
||||||
|
@ -24,6 +27,9 @@ export function dataStoreProxySyscalls(
|
||||||
]);
|
]);
|
||||||
},
|
},
|
||||||
"datastore.batchSet": (ctx, entries: KV[]) => {
|
"datastore.batchSet": (ctx, entries: KV[]) => {
|
||||||
|
if (entries.length === 0) {
|
||||||
|
return Promise.resolve();
|
||||||
|
}
|
||||||
return rpcCall(
|
return rpcCall(
|
||||||
httpSpacePrimitives,
|
httpSpacePrimitives,
|
||||||
"datastore.batchSet",
|
"datastore.batchSet",
|
||||||
|
@ -41,6 +47,9 @@ export function dataStoreProxySyscalls(
|
||||||
return result;
|
return result;
|
||||||
},
|
},
|
||||||
"datastore.batchGet": (ctx, keys: KvKey[]) => {
|
"datastore.batchGet": (ctx, keys: KvKey[]) => {
|
||||||
|
if (keys.length === 0) {
|
||||||
|
return Promise.resolve([]);
|
||||||
|
}
|
||||||
return rpcCall(
|
return rpcCall(
|
||||||
httpSpacePrimitives,
|
httpSpacePrimitives,
|
||||||
"datastore.batchGet",
|
"datastore.batchGet",
|
||||||
|
|
Loading…
Reference in New Issue