improve mq.datastore test (#751)

pull/753/head
Maks 2024-02-25 08:45:18 +01:00 committed by GitHub
parent 3844718139
commit 481387d235
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 60 additions and 54 deletions

View File

@ -3,54 +3,61 @@ import { assertEquals } from "$lib/test_deps.ts";
import { DenoKvPrimitives } from "./deno_kv_primitives.ts"; import { DenoKvPrimitives } from "./deno_kv_primitives.ts";
import { DataStore } from "./datastore.ts"; import { DataStore } from "./datastore.ts";
import { PrefixedKvPrimitives } from "./prefixed_kv_primitives.ts"; import { PrefixedKvPrimitives } from "./prefixed_kv_primitives.ts";
import { sleep } from "$lib/async.ts"; import { FakeTime } from "$std/testing/time.ts";
import type { MQMessage } from "$type/types.ts";
Deno.test("DataStore MQ", { Deno.test("DataStore MQ", async () => {
sanitizeResources: false, const time = new FakeTime();
sanitizeOps: false,
}, async () => {
const tmpFile = await Deno.makeTempFile(); const tmpFile = await Deno.makeTempFile();
const db = new DenoKvPrimitives(await Deno.openKv(tmpFile)); const db = new DenoKvPrimitives(await Deno.openKv(tmpFile));
try {
const mq = new DataStoreMQ( const mq = new DataStoreMQ(
new DataStore(new PrefixedKvPrimitives(db, ["mq"])), new DataStore(new PrefixedKvPrimitives(db, ["mq"])),
); );
let messages: MQMessage[];
// Send and ack
await mq.send("test", "Hello World"); await mq.send("test", "Hello World");
let messages = await mq.poll("test", 10); messages = await mq.poll("test", 10);
assertEquals(messages.length, 1); assertEquals(messages.length, 1);
await mq.ack("test", messages[0].id); await mq.ack("test", messages[0].id);
assertEquals([], await mq.poll("test", 10)); assertEquals([], await mq.poll("test", 10));
// Timeout
await mq.send("test", "Hello World"); await mq.send("test", "Hello World");
messages = await mq.poll("test", 10); messages = await mq.poll("test", 10);
assertEquals(messages.length, 1); assertEquals(messages.length, 1);
assertEquals([], await mq.poll("test", 10)); assertEquals([], await mq.poll("test", 10));
await sleep(20); await time.tickAsync(20);
await mq.requeueTimeouts(10); await mq.requeueTimeouts(10);
messages = await mq.poll("test", 10); messages = await mq.poll("test", 10);
const stats = await mq.getAllQueueStats(); const stats = await mq.getAllQueueStats();
assertEquals(stats["test"].processing, 1); assertEquals(stats["test"].processing, 1);
assertEquals(messages.length, 1); assertEquals(messages.length, 1);
assertEquals(messages[0].retries, 1); assertEquals(messages[0].retries, 1);
await sleep(20);
// Max retries
await time.tickAsync(20);
await mq.requeueTimeouts(10, 1); await mq.requeueTimeouts(10, 1);
assertEquals((await mq.fetchDLQMessages()).length, 1); assertEquals((await mq.fetchDLQMessages()).length, 1);
// Subscribe
let receivedMessage = false; let receivedMessage = false;
const unsubscribe = mq.subscribe("test123", {}, async (messages) => { const unsubscribe = mq.subscribe("test123", {}, async (messages) => {
assertEquals(messages.length, 1); assertEquals(messages.length, 1);
receivedMessage = true; receivedMessage = true;
console.log("RECEIVED TEH EMSSSAGE");
await mq.ack("test123", messages[0].id); await mq.ack("test123", messages[0].id);
}); });
await mq.send("test123", "Hello World"); await mq.send("test123", "Hello World");
console.log("After send"); while ((await mq.getQueueStats("test123")).queued > 0) {
// Give time to process the message // Wait for send to be processed
await sleep(10); }
console.log("After sleep");
assertEquals(receivedMessage, true); assertEquals(receivedMessage, true);
unsubscribe(); unsubscribe();
// Batch send // Batch send and ack
await mq.batchSend("test", ["Hello", "World"]); await mq.batchSend("test", ["Hello", "World"]);
const messageBatch1 = await mq.poll("test", 1); const messageBatch1 = await mq.poll("test", 1);
assertEquals(messageBatch1.length, 1); assertEquals(messageBatch1.length, 1);
@ -58,12 +65,11 @@ Deno.test("DataStore MQ", {
const messageBatch2 = await mq.poll("test", 1); const messageBatch2 = await mq.poll("test", 1);
assertEquals(messageBatch2.length, 1); assertEquals(messageBatch2.length, 1);
assertEquals(messageBatch2[0].body, "World"); assertEquals(messageBatch2[0].body, "World");
await mq.batchAck("test", [messageBatch1[0].id, messageBatch2[0].id]); await mq.batchAck("test", [messageBatch1[0].id, messageBatch2[0].id]);
assertEquals(await mq.fetchProcessingMessages(), []); assertEquals(await mq.fetchProcessingMessages(), []);
// Give time to close the db } finally {
await sleep(20);
db.close(); db.close();
await Deno.remove(tmpFile); await Deno.remove(tmpFile);
await time.restore();
}
}); });