diff --git a/lib/data/mq.datastore.test.ts b/lib/data/mq.datastore.test.ts index bdac6e7d..96098325 100644 --- a/lib/data/mq.datastore.test.ts +++ b/lib/data/mq.datastore.test.ts @@ -3,67 +3,73 @@ import { assertEquals } from "$lib/test_deps.ts"; import { DenoKvPrimitives } from "./deno_kv_primitives.ts"; import { DataStore } from "./datastore.ts"; import { PrefixedKvPrimitives } from "./prefixed_kv_primitives.ts"; -import { sleep } from "$lib/async.ts"; +import { FakeTime } from "$std/testing/time.ts"; +import type { MQMessage } from "$type/types.ts"; -Deno.test("DataStore MQ", { - sanitizeResources: false, - sanitizeOps: false, -}, async () => { +Deno.test("DataStore MQ", async () => { + const time = new FakeTime(); const tmpFile = await Deno.makeTempFile(); const db = new DenoKvPrimitives(await Deno.openKv(tmpFile)); - const mq = new DataStoreMQ( - new DataStore(new PrefixedKvPrimitives(db, ["mq"])), - ); - await mq.send("test", "Hello World"); - let messages = await mq.poll("test", 10); - assertEquals(messages.length, 1); - await mq.ack("test", messages[0].id); - assertEquals([], await mq.poll("test", 10)); - await mq.send("test", "Hello World"); - messages = await mq.poll("test", 10); - assertEquals(messages.length, 1); - assertEquals([], await mq.poll("test", 10)); - await sleep(20); - await mq.requeueTimeouts(10); - messages = await mq.poll("test", 10); - const stats = await mq.getAllQueueStats(); - assertEquals(stats["test"].processing, 1); - assertEquals(messages.length, 1); - assertEquals(messages[0].retries, 1); - await sleep(20); - await mq.requeueTimeouts(10, 1); - assertEquals((await mq.fetchDLQMessages()).length, 1); + try { + const mq = new DataStoreMQ( + new DataStore(new PrefixedKvPrimitives(db, ["mq"])), + ); - let receivedMessage = false; - const unsubscribe = mq.subscribe("test123", {}, async (messages) => { + let messages: MQMessage[]; + + // Send and ack + await mq.send("test", "Hello World"); + messages = await mq.poll("test", 10); assertEquals(messages.length, 1); - receivedMessage = true; - console.log("RECEIVED TEH EMSSSAGE"); - await mq.ack("test123", messages[0].id); - }); - await mq.send("test123", "Hello World"); - console.log("After send"); - // Give time to process the message - await sleep(10); - console.log("After sleep"); - assertEquals(receivedMessage, true); - unsubscribe(); + await mq.ack("test", messages[0].id); + assertEquals([], await mq.poll("test", 10)); - // Batch send - await mq.batchSend("test", ["Hello", "World"]); - const messageBatch1 = await mq.poll("test", 1); - assertEquals(messageBatch1.length, 1); - assertEquals(messageBatch1[0].body, "Hello"); - const messageBatch2 = await mq.poll("test", 1); - assertEquals(messageBatch2.length, 1); - assertEquals(messageBatch2[0].body, "World"); + // Timeout + await mq.send("test", "Hello World"); + messages = await mq.poll("test", 10); + assertEquals(messages.length, 1); + assertEquals([], await mq.poll("test", 10)); + await time.tickAsync(20); + await mq.requeueTimeouts(10); + messages = await mq.poll("test", 10); + const stats = await mq.getAllQueueStats(); + assertEquals(stats["test"].processing, 1); + assertEquals(messages.length, 1); + assertEquals(messages[0].retries, 1); - await mq.batchAck("test", [messageBatch1[0].id, messageBatch2[0].id]); - assertEquals(await mq.fetchProcessingMessages(), []); - // Give time to close the db - await sleep(20); + // Max retries + await time.tickAsync(20); + await mq.requeueTimeouts(10, 1); + assertEquals((await mq.fetchDLQMessages()).length, 1); - db.close(); - await Deno.remove(tmpFile); + // Subscribe + let receivedMessage = false; + const unsubscribe = mq.subscribe("test123", {}, async (messages) => { + assertEquals(messages.length, 1); + receivedMessage = true; + await mq.ack("test123", messages[0].id); + }); + await mq.send("test123", "Hello World"); + while ((await mq.getQueueStats("test123")).queued > 0) { + // Wait for send to be processed + } + assertEquals(receivedMessage, true); + unsubscribe(); + + // Batch send and ack + await mq.batchSend("test", ["Hello", "World"]); + const messageBatch1 = await mq.poll("test", 1); + assertEquals(messageBatch1.length, 1); + assertEquals(messageBatch1[0].body, "Hello"); + const messageBatch2 = await mq.poll("test", 1); + assertEquals(messageBatch2.length, 1); + assertEquals(messageBatch2[0].body, "World"); + await mq.batchAck("test", [messageBatch1[0].id, messageBatch2[0].id]); + assertEquals(await mq.fetchProcessingMessages(), []); + } finally { + db.close(); + await Deno.remove(tmpFile); + await time.restore(); + } });