// silverbullet/lib/data/mq.datastore.test.ts

import { DataStoreMQ } from "./mq.datastore.ts";
import { assertEquals } from "$lib/test_deps.ts";
import { DenoKvPrimitives } from "./deno_kv_primitives.ts";
import { DataStore } from "./datastore.ts";
2023-12-11 19:36:35 +08:00
import { PrefixedKvPrimitives } from "./prefixed_kv_primitives.ts";
2024-02-25 15:45:18 +08:00
import { FakeTime } from "$std/testing/time.ts";
2024-02-29 22:23:05 +08:00
import type { MQMessage } from "../../plug-api/types.ts";
2023-08-11 00:32:41 +08:00
2024-02-25 15:45:18 +08:00
/**
 * Exercises `DataStoreMQ` against a temporary Deno KV database.
 *
 * Scenarios, in order: send/ack, timeout + requeue, dead-lettering after the
 * retry limit, subscription delivery, and batch send/ack.
 *
 * The clock is replaced by `FakeTime` so the timeout scenarios can advance time
 * with `tickAsync` instead of sleeping. Every resource is released in its own
 * `finally`, in reverse order of acquisition, so a failure while acquiring or
 * releasing one resource cannot leave the fake clock installed or the
 * temporary file behind.
 */
Deno.test("DataStore MQ", async () => {
  const time = new FakeTime();
  try {
    const tmpFile = await Deno.makeTempFile();
    try {
      const db = new DenoKvPrimitives(await Deno.openKv(tmpFile));
      try {
        const mq = new DataStoreMQ(
          new DataStore(new PrefixedKvPrimitives(db, ["mq"])),
        );

        let messages: MQMessage[];

        // Send and ack: an acknowledged message must not be delivered again.
        await mq.send("test", "Hello World");
        messages = await mq.poll("test", 10);
        assertEquals(messages.length, 1);
        await mq.ack("test", messages[0].id);
        assertEquals([], await mq.poll("test", 10));

        // Timeout: a polled-but-unacked message is not visible to a second
        // poll until requeueTimeouts() has run after the clock has advanced.
        await mq.send("test", "Hello World");
        messages = await mq.poll("test", 10);
        assertEquals(messages.length, 1);
        assertEquals([], await mq.poll("test", 10));
        await time.tickAsync(20);
        // NOTE(review): the argument is presumably the timeout in ms — confirm
        // against DataStoreMQ.requeueTimeouts.
        await mq.requeueTimeouts(10);
        messages = await mq.poll("test", 10);
        const stats = await mq.getAllQueueStats();
        assertEquals(stats["test"].processing, 1);
        assertEquals(messages.length, 1);
        assertEquals(messages[0].retries, 1);

        // Max retries: the message has already been retried once, so the next
        // timeout pass is expected to put it in the dead-letter queue.
        await time.tickAsync(20);
        // NOTE(review): the second argument is presumably the retry limit.
        await mq.requeueTimeouts(10, 1);
        assertEquals((await mq.fetchDLQMessages()).length, 1);

        // Subscribe
        let receivedMessage = false;
        const unsubscribe = mq.subscribe("test123", {}, async (messages) => {
          assertEquals(messages.length, 1);
          receivedMessage = true;
          await mq.ack("test123", messages[0].id);
        });
        try {
          await mq.send("test123", "Hello World");
          while ((await mq.getQueueStats("test123")).queued > 0) {
            // Wait for send to be processed
          }
          assertEquals(receivedMessage, true);
        } finally {
          // Always detach the subscriber, even when an assertion above fails.
          unsubscribe();
        }

        // Batch send and ack: messages come back one per poll, in send order.
        await mq.batchSend("test", ["Hello", "World"]);
        const messageBatch1 = await mq.poll("test", 1);
        assertEquals(messageBatch1.length, 1);
        assertEquals(messageBatch1[0].body, "Hello");
        const messageBatch2 = await mq.poll("test", 1);
        assertEquals(messageBatch2.length, 1);
        assertEquals(messageBatch2[0].body, "World");
        await mq.batchAck("test", [messageBatch1[0].id, messageBatch2[0].id]);
        assertEquals(await mq.fetchProcessingMessages(), []);
      } finally {
        db.close();
      }
    } finally {
      await Deno.remove(tmpFile);
    }
  } finally {
    // restore() is synchronous; it must run even if the cleanup above throws,
    // otherwise the faked clock would remain installed.
    time.restore();
  }
});