import type { Result, SchemaPathSelector, Unit, URI, } from "@commontools/memory/interface"; import type { Cancel } from "../cancel.ts"; import { IStorageProvider, StorageValue } from "./interface.ts"; export type { Result, Unit }; export abstract class BaseStorageProvider implements IStorageProvider { protected subscribers = new Map void>>(); protected waitingForSync = new Map>(); protected waitingForSyncResolvers = new Map void>(); abstract send( batch: { uri: URI; value: StorageValue }[], ): Promise< { ok: object; error?: undefined } | { ok?: undefined; error: Error } >; abstract sync( uri: URI, selector?: SchemaPathSelector, ): Promise>; // TODO(@ubik2) //): Promise>, Error>>; abstract synced(): Promise; abstract get(uri: URI): StorageValue | undefined; sink(uri: URI, callback: (value: StorageValue) => void): Cancel { if (!this.subscribers.has(uri)) { this.subscribers.set(uri, new Set<(value: StorageValue) => void>()); } const listeners = this.subscribers.get(uri)!; listeners.add(callback); return () => { listeners.delete(callback); if (listeners.size === 0) this.subscribers.delete(uri); }; } protected notifySubscribers(key: string, value: StorageValue): void { const listeners = this.subscribers.get(key); if (this.waitingForSync.has(key) && listeners && listeners.size > 0) { throw new Error( "Subscribers are expected to only start after first sync.", ); } this.resolveWaitingForSync(key); if (listeners) { for (const listener of listeners) listener(value); } } protected waitForSync(key: string): Promise { if (!this.waitingForSync.has(key)) { this.waitingForSync.set( key, new Promise((r) => this.waitingForSyncResolvers.set(key, r)), ); } return this.waitingForSync.get(key)!; } protected resolveWaitingForSync(key: string): void { const resolver = this.waitingForSyncResolvers.get(key); if (resolver) { resolver(); this.waitingForSync.delete(key); } } abstract destroy(): Promise; abstract getReplica(): string | undefined; }