import { afterEach, beforeEach, describe, it } from "@std/testing/bdd"; import { expect } from "@std/expect"; import { Identity } from "@commontools/identity"; import { StorageManager } from "@commontools/runner/storage/cache.deno"; import type { IExtendedStorageTransaction, IStorageSubscription, StorageNotification, } from "../src/storage/interface.ts"; import type { MIME, URI } from "@commontools/memory/interface"; import * as Fact from "@commontools/memory/fact"; import * as Changes from "@commontools/memory/changes"; import { Runtime } from "../src/runtime.ts"; const signer = await Identity.fromPassphrase("test storage subscription"); const space = signer.did(); class Subscription implements IStorageSubscription { notifications: StorageNotification[] = []; next(notification: StorageNotification) { this.notifications.push(notification); return { done: false }; } get commits() { return this.notifications.filter((n) => n.type === "commit"); } get reverts() { return this.notifications.filter((n) => n.type === "revert"); } get loads() { return this.notifications.filter((n) => n.type === "load"); } get pulls() { return this.notifications.filter((n) => n.type === "pull"); } get integrates() { return this.notifications.filter((n) => n.type === "integrate"); } get resets() { return this.notifications.filter((n) => n.type === "reset"); } } describe("Storage Subscription", () => { let runtime: Runtime; let storageManager: ReturnType; let tx: IExtendedStorageTransaction; beforeEach(() => { storageManager = StorageManager.emulate({ as: signer }); runtime = new Runtime({ apiUrl: new URL(import.meta.url), storageManager, }); tx = runtime.edit(); }); afterEach(async () => { // Only commit if the transaction hasn't been committed yet const status = tx?.status(); if (tx && status?.status === "ready") { await tx.commit(); } await runtime?.dispose(); await storageManager?.close(); // Allow pending operations to complete await new Promise((resolve) => setTimeout(resolve, 1)); }); describe("commit notifications", () => { it("should receive commit notification when transaction is committed", async () => { const subscription = new Subscription(); runtime.storageManager.subscribe(subscription); // Use direct transaction operations like the original test const entityId = `of:test-commit-${Date.now()}` as URI; const value = { message: "Hello, world!" }; tx.write({ space, id: entityId, type: "application/json", path: [], }, { value }); await tx.commit(); expect(subscription.commits.length).toBeGreaterThanOrEqual(1); const commit = subscription.commits[subscription.commits.length - 1]; expect(commit.type).toBe("commit"); expect(commit.space).toBe(space); expect(commit.source).toBe(tx.tx); expect([...commit.changes].map((c) => c.after)).toContainEqual({ value, }); }); it("should include source transaction in commit notification", async () => { const subscription = new Subscription(); runtime.storageManager.subscribe(subscription); // Create a cell and write some data const cell = runtime.getCell<{ test: string }>( space, "test:source", undefined, tx, ); cell.set({ test: "data" }); await tx.commit(); expect(subscription.commits.length).toBeGreaterThanOrEqual(1); const commit = subscription.commits[0]; expect(commit.type).toBe("commit"); expect(commit.space).toBe(space); expect(commit.source).toBe(tx.tx); expect([...commit.changes].map((c) => c.after)).toContainEqual({ value: { test: "data" }, }); }); it("should handle multiple subscribers", async () => { const subscription1 = new Subscription(); const subscription2 = new Subscription(); runtime.storageManager.subscribe(subscription1); runtime.storageManager.subscribe(subscription2); // Create a cell and write some data const cell = runtime.getCell<{ value: number }>( space, "test:multi", undefined, tx, ); cell.set({ value: 42 }); await tx.commit(); // Both subscribers should receive the same notification expect(subscription1.commits.length).toBeGreaterThanOrEqual(1); expect(subscription2.commits.length).toBeGreaterThanOrEqual(1); const commit1 = subscription1.commits[0]; const commit2 = subscription2.commits[0]; expect(commit1.type).toBe("commit"); expect(commit1.space).toBe(space); expect(commit1.source).toBe(tx.tx); expect(commit2.type).toBe("commit"); expect(commit2.space).toBe(space); expect(commit2.source).toBe(tx.tx); // Both should have the same changes expect([...commit1.changes].map((c) => c.after)).toContainEqual({ value: { value: 42 }, }); expect([...commit2.changes].map((c) => c.after)).toContainEqual({ value: { value: 42 }, }); }); }); describe("revert notifications", () => { it("should receive revert notification on conflict", async () => { // Create memory from session before subscribing const memory = storageManager.session().mount(space); const entityId = `test:conflict-${Date.now()}` as URI; // Transact the memory so we have some fact in there const existingFact = Fact.assert({ the: "application/json", of: entityId, is: { version: 1 }, }); await memory.transact({ changes: Changes.from([existingFact]) }); // Get replica for the space and reset it (so it assumes it's empty) const { replica } = storageManager.open(space); (replica as any).reset(); // Now subscribe to notifications const subscription = new Subscription(); storageManager.subscribe(subscription); // Use storageManager.edit() to perform conflicting write and commit const tx = storageManager.edit(); tx.write({ space, id: entityId, type: "application/json", path: [], }, { version: 2 }); // Check that before commit provider.replica.get returns undefined const factAddress = { id: entityId, type: "application/json" as MIME }; expect(replica.get(factAddress)).toBeUndefined(); // Send commit (without await) and check optimistic update const commitPromise = tx.commit(); expect(replica.get(factAddress)?.is).toEqual({ version: 2 }); // After commit result is back, check final state const result = await commitPromise; expect(replica.get(factAddress)?.is).toEqual({ version: 1 }); // The commit should fail and generate a revert notification expect(result.ok).toBeFalsy(); expect(subscription.reverts.length).toBe(1); const revert = subscription.reverts[0]; expect(revert.type).toBe("revert"); expect(revert.space).toBe(space); expect(revert.reason).toBe(result.error); expect(revert.source).toBe(tx); const changes = [...revert.changes]; expect(changes.length).toBeGreaterThan(0); expect([...changes]).toContainEqual({ address: { id: entityId, type: "application/json", path: [] }, before: { version: 2 }, after: { version: 1 }, }); }); }); describe("load notifications", () => { it("should receive load notification when data is loaded from cache", async () => { // Subscribe to notifications const subscription = new Subscription(); runtime.storageManager.subscribe(subscription); // Get the replica and call load to trigger load notification const { replica } = storageManager.open(space); const entityId = `test:load-${Date.now()}` as URI; const factAddress = { the: "application/json", of: entityId }; await (replica as any).load([[factAddress, undefined]]); // Check for load notifications expect(subscription.loads.length).toBeGreaterThanOrEqual(1); const load = subscription.loads[0]; expect(load.type).toBe("load"); expect(load.space).toBe(space); // Note that because cache is disabled we will not be able to load anything // from cache so we can not test case where load gets something back. const changes = [...load.changes]; expect(changes.length).toBeGreaterThanOrEqual(0); }); }); describe("pull notifications", () => { it("should receive pull notification when data is pulled from remote", async () => { // Put something in the memory first const memory = storageManager.session().mount(space); const entityId = `test:pull-${Date.now()}` as URI; const fact = Fact.assert({ the: "application/json", of: entityId, is: { value: { data: "to be pulled" } }, }); await memory.transact({ changes: Changes.from([fact]) }); // Create subscription const subscription = new Subscription(); runtime.storageManager.subscribe(subscription); // Call pull on the replica const { replica } = storageManager.open(space); const factAddress = { id: entityId, type: "application/json" }; await (replica as any).pull([[factAddress, undefined]]); // Check for pull notifications expect(subscription.pulls.length).toBeGreaterThanOrEqual(1); const pull = subscription.pulls[0]; expect(pull.type).toBe("pull"); expect(pull.space).toBe(space); expect([...pull.changes].map((c) => c.after)).toContainEqual({ value: { data: "to be pulled" }, }); }); }); describe("integrate notifications", () => { it("should receive integrate notification when data is integrated", async () => { // Subscribe to notifications const subscription = new Subscription(); runtime.storageManager.subscribe(subscription); // Use Cell interface to write something and commit const cell1 = runtime.getCell<{ version: number }>( space, "test:integrate-1", undefined, tx, ); cell1.set({ version: 1 }); await tx.commit(); // Try another commit with different entity using a new transaction const tx2 = runtime.edit(); const cell2 = runtime.getCell<{ version: number }>( space, "test:integrate-2", undefined, tx2, ); cell2.set({ version: 2 }); await tx2.commit(); // When second commit is returned we should have integrate notification expect(subscription.integrates.length).toBeGreaterThan(0); const integrate = subscription.integrates[0]; expect(integrate.type).toBe("integrate"); expect(integrate.space).toBe(space); // Integrating upstream changes seem to have got broken due // to commits being reducted, so disabling this for now. // expect([...integrate.changes]).toBeGreaterThan(0); }); }); describe("reset notifications", () => { it("should receive reset notification when storage is reset", async () => { // Create a cell first const cell = runtime.getCell<{ data: string }>( space, "test:reset", undefined, tx, ); cell.set({ data: "to be reset" }); await tx.commit(); // Subscribe to notifications after commit const subscription = new Subscription(); runtime.storageManager.subscribe(subscription); // Get the provider and trigger a reset operation const { replica } = storageManager.open(space); // Trigger a reset operation (replica as any).reset(); await new Promise((resolve) => setTimeout(resolve, 10)); // Check for reset notifications expect(subscription.resets.length).toBeGreaterThan(0); const reset = subscription.resets[0]; expect(reset.type).toBe("reset"); expect(reset.space).toBe(space); }); }); describe("subscription lifecycle", () => { it("should stop receiving notifications when subscription returns done", async () => { let notificationCount = 0; const subscription: IStorageSubscription = { next(_notification) { notificationCount++; // Return done after first notification return { done: notificationCount >= 1 }; }, }; runtime.storageManager.subscribe(subscription); // First transaction const cell1 = runtime.getCell<{ count: number }>( space, "test:lifecycle-1", undefined, tx, ); cell1.set({ count: 1 }); await tx.commit(); await new Promise((resolve) => setTimeout(resolve, 10)); // Second transaction const tx2 = runtime.edit(); const cell2 = runtime.getCell<{ count: number }>( space, "test:lifecycle-2", undefined, tx2, ); cell2.set({ count: 2 }); await tx2.commit(); await new Promise((resolve) => setTimeout(resolve, 10)); // Should only have received one notification expect(notificationCount).toBe(1); }); it("should handle subscription errors gracefully", async () => { let errorThrown = false; const errorSubscription: IStorageSubscription = { next(_notification) { if (!errorThrown) { errorThrown = true; throw new Error("Subscription error"); } return { done: false }; }, }; // Subscribe with error-throwing subscription runtime.storageManager.subscribe(errorSubscription); // Also subscribe with a normal subscription to verify system continues const normalSubscription = new Subscription(); runtime.storageManager.subscribe(normalSubscription); const cell = runtime.getCell<{ test: boolean }>( space, "test:error-test", undefined, tx, ); cell.set({ test: true }); await tx.commit(); await new Promise((resolve) => setTimeout(resolve, 10)); // Error subscription should have been called but error handled expect(errorThrown).toBe(true); // Normal subscription should still work expect(normalSubscription.notifications.length).toBeGreaterThan(0); }); }); describe("source cell", () => { let runtime: Runtime; let storageManager: ReturnType; let tx: IExtendedStorageTransaction; beforeEach(() => { storageManager = StorageManager.emulate({ as: signer }); runtime = new Runtime({ apiUrl: new URL(import.meta.url), storageManager, }); tx = runtime.edit(); }); afterEach(async () => { await tx.commit(); await runtime?.dispose(); await storageManager?.close(); }); it("should receive source cell notification when source cell is set", async () => { const subscription = new Subscription(); runtime.storageManager.subscribe(subscription); const cell = runtime.getCell<{ test: string }>( space, "source-cell-test", undefined, tx, ); cell.setSourceCell(cell); await tx.commit(); expect(subscription.commits.length).toBeGreaterThanOrEqual(1); const commit = subscription.commits[0]; expect(commit.type).toBe("commit"); expect(commit.space).toBe(space); expect([...commit.changes].length).toBeGreaterThanOrEqual(1); const change = [...commit.changes][0]; // The actual value contains the full document where source field has the JSON string expect(change.after).toEqual({ source: JSON.parse(JSON.stringify(cell.entityId)), }); expect((change.before as any)?.source).toBeUndefined(); }); }); });