import * as Access from "./access.ts"; import { ACL, AsyncResult, Await, CauseString, CloseResult, Commit, ConnectionError, ConsumerCommandInvocation, ConsumerInvocationFor, ConsumerResultFor, EnhancedCommit, Fact, FactAddress, Invocation, InvocationURL, MemorySession, MemorySpace, Proto, Protocol, ProviderCommand, ProviderCommandFor, ProviderSession, Query, QueryError, Reference, Result, Revision, SchemaQuery, Selection, Subscribe, Subscriber, Transaction, UCAN, } from "./interface.ts"; import * as SelectionBuilder from "./selection.ts"; import * as Memory from "./memory.ts"; import { fromString as causeFromString, refer } from "./reference.ts"; import { redactCommitData, selectFact, type Session as SpaceSession, } from "./space.ts"; import { evaluateDocumentLinks } from "./space-schema.ts"; import * as Subscription from "./subscription.ts"; import * as FactModule from "./fact.ts"; import { setRevision } from "@commontools/memory/selection"; import { getLogger } from "@commontools/utils/logger"; import { ACL_TYPE, isACL } from "./acl.ts"; import { MapSet } from "@commontools/runner/traverse"; import { deepEqual } from "@commontools/runner"; import type { SchemaPathSelector } from "./consumer.ts"; const logger = getLogger("memory-provider", { enabled: false, level: "info", }); export * as Error from "./error.ts"; export * from "./interface.ts"; export * as Memory from "./memory.ts"; export * as Space from "./space.ts"; export * as Subscription from "./subscription.ts"; export * from "./util.ts"; // Convenient shorthand so I don't need this long type for this string type JobId = InvocationURL>>; export type Options = Memory.Options; export const open = async ( options: Options, ): AsyncResult, ConnectionError> => { const result = await Memory.open(options); if (result.error) { return result; } return { ok: new MemoryProvider(result.ok) }; }; /** * Creates an ephemeral memory provider. It does not persist anything * and it's primary use is in testing. 
*/ export const emulate = (options: Memory.ServiceOptions): Provider => new MemoryProvider(Memory.emulate(options)); export const create = (memory: MemorySession): Provider => new MemoryProvider(memory); export interface Provider { fetch(request: Request): Promise; session(): ProviderSession; invoke( ucan: UCAN>, ): Await>; close(): CloseResult; } interface Session { memory: MemorySession; } class MemoryProvider< Space extends MemorySpace, MemoryProtocol extends Protocol, > implements Provider { sessions: Set> = new Set(); #localSession: MemoryProviderSession | null = null; constructor(public memory: MemorySession) {} invoke( ucan: UCAN>, ): Await> { let session = this.#localSession; if (!session) { session = new MemoryProviderSession(this.memory, null); } return session.invoke( ucan as unknown as UCAN>, ); } fetch(request: Request) { return fetch(this, request); } session(): ProviderSession { const session = new MemoryProviderSession( this.memory, this.sessions, ); this.sessions.add(session); return session; } async close() { const promises = []; for (const session of this.sessions) { promises.push(session.close()); } await Promise.all(promises); return this.memory.close(); } } export class SchemaSubscription { constructor( public invocation: SchemaQuery, // True if this is a wildcard query (of: "_") that can't use incremental updates public isWildcardQuery: boolean = false, ) {} } class MemoryProviderSession< Space extends MemorySpace, MemoryProtocol extends Protocol, > implements ProviderSession, Subscriber { readable: ReadableStream>; writable: WritableStream>>; controller: | ReadableStreamDefaultController> | undefined; channels: Map>, Set> = new Map(); schemaChannels: Map = new Map(); // Mapping from fact key to since value of the last fact sent to the client lastRevision: Map = new Map(); // Shared schema tracker for all subscriptions - tracks which docs were scanned with which schemas sharedSchemaTracker: MapSet = new MapSet( deepEqual, ); constructor( 
public memory: MemorySession,
    // Owning registry; null for the provider's private local session.
    public sessions: null | Set<MemoryProviderSession<Space, MemoryProtocol>>,
  ) {
    // Outbound half: provider commands pushed to the client.
    this.readable = new ReadableStream<ProviderCommand<MemoryProtocol>>({
      start: (controller) => {
        try {
          return this.open(controller);
        } catch (error) {
          logger.error(
            "stream-error",
            () => ["ReadableStream start error:", error],
          );
          throw error;
        }
      },
      cancel: (reason) => {
        try {
          return this.cancel();
        } catch (error) {
          logger.error(
            "stream-error",
            () => ["ReadableStream cancel error:", error, "Reason:", reason],
          );
          throw error;
        }
      },
    });
    // Inbound half: client invocations arriving over the wire.
    this.writable = new WritableStream<
      UCAN<ConsumerCommandInvocation<MemoryProtocol>>
    >({
      write: async (command) => {
        try {
          await this.invoke(
            command as UCAN<ConsumerCommandInvocation<MemoryProtocol>>,
          );
        } catch (error) {
          logger.error(
            "stream-error",
            () => ["WritableStream write error:", error],
          );
          logger.error(
            "stream-error",
            () => ["Failed command:", JSON.stringify(command)],
          );
          throw error;
        }
      },
      abort: async (reason) => {
        try {
          logger.debug(
            "stream-abort",
            () => ["WritableStream abort called with reason:", reason],
          );
          await this.close();
        } catch (error) {
          logger.error(
            "stream-error",
            () => ["WritableStream abort error:", error],
          );
          throw error;
        }
      },
      close: async () => {
        try {
          logger.debug("stream-close", () => ["WritableStream close called"]);
          await this.close();
        } catch (error) {
          logger.error(
            "stream-error",
            () => ["WritableStream close error:", error],
          );
          throw error;
        }
      },
    });
  }

  /** Pushes a provider command to the client (no-op once the stream closed). */
  perform(
    command: ProviderCommandFor<Proto, MemoryProtocol>,
  ) {
    this.controller?.enqueue(command as ProviderCommand<MemoryProtocol>);
    return { ok: {} };
  }

  open(
    controller: ReadableStreamDefaultController<
      ProviderCommand<MemoryProtocol>
    >,
  ) {
    this.controller = controller;
  }

  cancel() {
    const promise = this.writable.close();
    this.dispose();
    return promise;
  }

  close() {
    this.controller?.close();
    this.dispose();
    return { ok: {} };
  }

  dispose() {
    // Drop the memory subscription and detach from the owning provider.
    this.memory.unsubscribe(this);
    this.controller = undefined;
    this.sessions?.delete(this);
    this.sessions = null;
  }

  /**
   * Authorizes and dispatches a single consumer invocation; results and
   * errors are reported back through perform() as task/return commands.
   */
  async invoke(
    { invocation, authorization }: UCAN<
      ConsumerCommandInvocation<MemoryProtocol>
    >,
  ) {
    const acl = await this.getAcl(invocation.sub);
    const { error } = await Access.claim(
      invocation,
      authorization,
      this.memory.serviceDid(),
      acl,
    );
    if
(error) {
      logger.error(
        "auth-error",
        () => [
          "Authorization error:",
          error,
          ", failed invocation:",
          invocation,
        ],
      );
      // Authorization failed: report the error as the job's return value.
      return this.perform({
        the: "task/return",
        of: `job:${refer(invocation)}` as InvocationURL<
          Reference<ConsumerCommandInvocation<MemoryProtocol>>
        >,
        is: { error },
      });
    }
    // Job id derived from the invocation's content hash.
    const of = `job:${refer(invocation)}` as InvocationURL<
      Reference<ConsumerCommandInvocation<MemoryProtocol>>
    >;
    switch (invocation.cmd) {
      case "/memory/query": {
        return this.perform({
          the: "task/return",
          of,
          is: (await this.memory.query(invocation)) as Result<
            Selection<Space>,
            QueryError
          >,
        });
      }
      case "/memory/graph/query": {
        // Use querySchemaWithTracker when subscribing to capture the schemaTracker
        // for incremental updates on subsequent commits
        if (invocation.args.subscribe) {
          // Pass existing sharedSchemaTracker to enable early termination when
          // traversing into docs that are already tracked by other subscriptions
          const trackerResult = await Memory.querySchemaWithTracker(
            this.memory as Memory.Memory,
            invocation,
            this.sharedSchemaTracker,
          );
          if ("error" in trackerResult) {
            return this.perform({
              the: "task/return",
              of,
              is: trackerResult,
            });
          }
          const { selection } = trackerResult.ok;
          this.addSchemaSubscription(of, invocation, selection);
          this.memory.subscribe(this);
          // Filter out any known results
          if (invocation.args.excludeSent) {
            const space = invocation.sub;
            const factSelection = selection[space];
            const factVersions = [...FactModule.iterate(factSelection)];
            selection[space] = this.toSelection(
              this.filterKnownFacts(factVersions),
            );
          }
          return this.perform({
            the: "task/return",
            of,
            is: { ok: selection },
          });
        }
        // Non-subscribing queries use the regular querySchema
        const result = await this.memory.querySchema(invocation);
        // Filter out any known results
        if (result.ok !== undefined && invocation.args.excludeSent) {
          const space = invocation.sub;
          const factSelection = result.ok[space];
          const factVersions = [...FactModule.iterate(factSelection)];
          result.ok[space] = this.toSelection(
            this.filterKnownFacts(factVersions),
          );
        }
        return this.perform({
          the: "task/return",
          of,
is: result, }); } case "/memory/transact": { logger.info( "server-transact", () => [ "Received transaction:", `space: ${invocation.sub}`, `changes:`, JSON.stringify(invocation.args.changes, null, 2), ], ); const result = await this.memory.transact(invocation); if (result.error) { logger.warn( "server-transact-error", () => [ "Transaction failed:", JSON.stringify(result.error, null, 2), ], ); } return this.perform({ the: "task/return", of, is: result, }); } case "/memory/query/subscribe": { const selector = ("select") in invocation.args ? invocation.args.select : invocation.args.selectSchema; this.channels.set( of, new Set( Subscription.channels(invocation.sub, selector), ), ); return this.memory.subscribe(this); } case "/memory/query/unsubscribe": { this.channels.delete(of); if (this.channels.size === 0) { this.memory.unsubscribe(this); } // End subscription call this.perform({ the: "task/return", of: invocation.args.source, is: { ok: {} }, }); // End unsubscribe call return this.perform({ the: "task/return", of, is: { ok: {} }, }); } default: { return { error: new RangeError( `Unknown command ${(invocation as Invocation).cmd}`, ), }; } } } async commit(commit: Commit) { // We should really only have one item, but it's technically legal to have // multiple transactions in the same commit, so iterate for ( const item of SelectionBuilder.iterate<{ is: Memory.CommitData }>(commit) ) { // We need to remove any classified results from our commit. // The schema subscription has a classification claim, but these don't. 
const redactedData = redactCommitData(item.value.is); if (Subscription.isTransactionReadOnly(redactedData.transaction)) { continue; } // First, check to see if any of our schema queries need to be notified // Any queries that lack access are skipped (with a console log) const schemaFacts = await this.getSchemaSubscriptionMatches( redactedData.transaction, ); // Send commits with revisions to commit log subscriptions // The client's startSynchronization() reads revisions to update its heap const commitJobIds: InvocationURL>[] = []; for (const [id, channels] of this.channels) { if (Subscription.match(redactedData.transaction, channels)) { commitJobIds.push(id); } } if (commitJobIds.length > 0) { // The client has a subscription to the space's commit log const enhancedCommit: EnhancedCommit = { commit: { [item.of]: { [item.the]: { [item.cause]: { is: redactedData } } }, } as Commit, revisions: this.filterKnownFacts(schemaFacts), }; for (const id of commitJobIds) { // this is sent to a standard subscription (application/commit+json) this.perform({ the: "task/effect", of: id, is: enhancedCommit, }); } } } return { ok: {} }; } private filterKnownFacts( factVersions: Revision[], ): Revision[] { // Filter out any known results const newFactsList = []; for (const fact of factVersions) { const factKey = this.toKey(fact); const previous = this.lastRevision.get(factKey); if (previous === undefined || previous < fact.since) { this.lastRevision.set(factKey, fact.since); newFactsList.push(fact); } } return newFactsList; } private toSelection(factVersions: Revision[]) { const selection: Memory.OfTheCause< { is?: Memory.JSONValue; since: number } > = {}; for (const fact of factVersions) { setRevision( selection, fact.of, fact.the, fact.cause.toString() as CauseString, { is: fact.is, since: fact.since, }, ); } return selection; } private formatAddress( space: Space, fv: Readonly, ) { return Subscription.formatAddress({ at: space, the: fv.the, of: fv.of }); } private toKey(fv: 
Readonly) { return `${fv.of}/${fv.the}`; } private addSchemaSubscription( of: JobId, invocation: SchemaQuery, _result: Selection, ) { // Check if this is a wildcard query (of: "_") // Wildcard queries can't benefit from incremental updates via schemaTracker const isWildcardQuery = this.isWildcardQuery(invocation); const subscription = new SchemaSubscription( invocation, isWildcardQuery, ); this.schemaChannels.set(of, subscription); // Note: lastRevision is updated by filterKnownFacts when facts are sent } /** * Check if a schema query contains any wildcard selectors (of: "_"). * Wildcard queries match based on type rather than specific document IDs. */ private isWildcardQuery( invocation: SchemaQuery, ): boolean { const selectSchema = invocation.args.selectSchema; for (const of of Object.keys(selectSchema)) { if (of === "_") return true; } return false; } /** * For wildcard queries, find changed docs that match the type pattern. * Returns affected docs with the schema from the wildcard selector. */ private findAffectedDocsForWildcard( changedDocs: Set, invocation: SchemaQuery, ): Array<{ docKey: string; schemas: Set }> { const affected: Array< { docKey: string; schemas: Set } > = []; const selectSchema = invocation.args.selectSchema; // Get the wildcard selector's type patterns const wildcardSelector = selectSchema["_"]; if (!wildcardSelector) return affected; // Build a map of type -> schemas for matching const typeSchemas = new Map>(); for (const [the, causes] of Object.entries(wildcardSelector)) { const schemas = new Set(); for (const schema of Object.values(causes)) { schemas.add(schema as SchemaPathSelector); } if (schemas.size > 0) { typeSchemas.set(the, schemas); } } // Match changed docs against type patterns for (const docKey of changedDocs) { const slashIndex = docKey.indexOf("/"); if (slashIndex === -1) continue; const docType = docKey.slice(slashIndex + 1); // Check if this type matches a wildcard pattern const schemas = typeSchemas.get(docType) ?? 
typeSchemas.get("_"); if (schemas && schemas.size > 0) { affected.push({ docKey, schemas: new Set(schemas) }); } } return affected; } /** * Incrementally find schema subscription matches after a transaction. * * For wildcard queries (of: "_"): Match changed docs against type pattern. * For specific document queries: Use schemaTracker to find affected docs. * * Returns all facts that match any subscription's criteria. */ private async getSchemaSubscriptionMatches( transaction: Transaction, ): Promise[]> { const space = transaction.sub; // Early exit if no schema subscriptions if (this.schemaChannels.size === 0) { return []; } // Extract changed document keys from transaction const changedDocs = this.extractChangedDocKeys(transaction); if (changedDocs.size === 0) { return []; } // Get access to the space session for evaluating documents const mountResult = await Memory.mount( this.memory as Memory.Memory, space, ); if (mountResult.error) { throw new Error(`Failed to mount space ${space}: ${mountResult.error}`); } const spaceSession = mountResult.ok as unknown as SpaceSession; // Find affected docs using the shared schemaTracker (for non-wildcard) const sharedAffectedDocs = this.findAffectedDocs( changedDocs, this.sharedSchemaTracker, ); // Process shared affected docs const { newFacts } = this.processIncrementalUpdate( spaceSession, sharedAffectedDocs, space, ); // Add facts from wildcard subscriptions for (const [_jobId, subscription] of this.schemaChannels) { if (subscription.isWildcardQuery) { const wildcardDocs = this.findAffectedDocsForWildcard( changedDocs, subscription.invocation, ); if (wildcardDocs.length > 0) { const wildcardResult = this.processIncrementalUpdate( spaceSession, wildcardDocs, space, ); for (const [key, fact] of wildcardResult.newFacts) { newFacts.set(key, fact); } } } } return [...newFacts.values()]; } /** * Extract document keys (id/type format) from a transaction's changes. 
*/ private extractChangedDocKeys( transaction: Transaction, ): Set { const changedDocs = new Set(); for (const fact of SelectionBuilder.iterate(transaction.args.changes)) { if (fact.value !== true) { // Format matches what schemaTracker uses: "id/type" (from BaseObjectManager.toKey) changedDocs.add(`${fact.of}/${fact.the}`); } } return changedDocs; } /** * Find docs in changedDocs that are tracked by the subscription's schemaTracker. * Returns list of (docKey, schemas) pairs. */ private findAffectedDocs( changedDocs: Set, schemaTracker: MapSet, ): Array<{ docKey: string; schemas: Set }> { const affected: Array< { docKey: string; schemas: Set } > = []; for (const docKey of changedDocs) { const schemas = schemaTracker.get(docKey); if (schemas && schemas.size > 0) { affected.push({ docKey, schemas: new Set(schemas) }); } } return affected; } /** * Process incremental update given affected docs. * Re-evaluates each affected doc with its schemas and follows new links. * Uses the shared schemaTracker to track discovered links. */ private processIncrementalUpdate( spaceSession: SpaceSession, affectedDocs: Array<{ docKey: string; schemas: Set }>, space: Space, ): { newFacts: Map> } { const newFacts = new Map>(); // Note: classification is not used here since we're processing across all subscriptions // TODO(ubik2,seefeld): Make this a per-session classification const classification = undefined; // Collect all unique docKeys that need to be fetched // Start with the initially affected docs const docsToFetch = new Set(affectedDocs.map((d) => d.docKey)); // First pass: Remove all affected (docKey, schema) pairs from schemaTracker. // This allows the traverser to re-traverse them and discover any new links. // We do this in a separate pass so that if doc A links to doc B, and both // are affected, we don't re-traverse B while evaluating A only to remove // and re-evaluate B again. 
const toReEvaluate: Array< { docId: string; docType: string; schema: SchemaPathSelector } > = []; for (const { docKey, schemas } of affectedDocs) { const { docId, docType } = this.parseDocKey(docKey); if (docId === null) continue; for (const schema of schemas) { this.sharedSchemaTracker.deleteValue(docKey, schema); toReEvaluate.push({ docId, docType, schema }); } } // Second pass: Re-evaluate each affected doc with its schema // evaluateDocumentLinks does a full traversal and finds all linked documents for (const { docId, docType, schema } of toReEvaluate) { const result = evaluateDocumentLinks( spaceSession, { id: docId, type: docType }, schema, classification, this.sharedSchemaTracker, ); // Collect newly discovered docs to fetch if (result !== null) { for (const { docKey: newDocKey } of result.newLinks) { docsToFetch.add(newDocKey); } } } // Fetch each unique doc once and add to results for (const docKey of docsToFetch) { const { docId, docType } = this.parseDocKey(docKey); if (docId === null) continue; const fact = selectFact(spaceSession, { of: docId as `${string}:${string}`, the: docType as `${string}/${string}`, }); if (!fact || fact.is === undefined) { // Document doesn't exist yet - skip continue; } const address = this.formatAddress(space, fact); newFacts.set(address, { of: fact.of, the: fact.the, cause: causeFromString(fact.cause), is: fact.is, since: fact.since, }); } return { newFacts }; } /** Parse docKey (format "id/type") back to id and type */ private parseDocKey( docKey: string, ): { docId: string | null; docType: string } { // Note: type can contain slashes (e.g., "application/json"), so we split on the FIRST slash // The id is always in the form "of:HASH" which doesn't contain slashes const slashIndex = docKey.indexOf("/"); if (slashIndex === -1) { return { docId: null, docType: "" }; } return { docId: docKey.slice(0, slashIndex), docType: docKey.slice(slashIndex + 1), }; } private async getAcl(space: MemorySpace): Promise { try { const result = 
await Memory.mount(this.memory as Memory.Memory, space);
      if (result.error) {
        logger.warn(
          "acl-mount-error",
          () => ["Failed to mount space for ACL lookup:", result.error],
        );
        return undefined;
      }
      const spaceSession = result.ok as unknown as {
        subject: MemorySpace;
        store: any;
      };
      const aclFact = selectFact(spaceSession, {
        the: ACL_TYPE,
        of: space,
      });
      if (
        !aclFact || !aclFact.is || typeof aclFact.is !== "object" ||
        !("value" in aclFact.is)
      ) {
        return undefined;
      }
      if (isACL(aclFact.is.value)) {
        return aclFact.is.value;
      } else {
        logger.warn(
          "acl-format-error",
          () => ["Invalid ACL format in space", space, ":", aclFact.is],
        );
        return undefined;
      }
    } catch (error) {
      logger.error("acl-error", () => ["Error retrieving ACL:", error]);
      return undefined;
    }
  }
}

/** Closes the underlying memory session. */
export const close = ({ memory }: Session) => memory.close();

/**
 * Minimal HTTP front-end: PATCH transacts, POST queries, anything else is
 * 501 Not Implemented.
 */
export const fetch = async (session: Session, request: Request) => {
  if (request.method === "PATCH") {
    return await patch(session, request);
  } else if (request.method === "POST") {
    return await post(session, request);
  } else {
    return new Response(null, { status: 501 });
  }
};

/**
 * Applies a JSON-encoded transaction. 200 on success, 409 on conflict,
 * 503 on other transaction failures, 400 on an unparseable body.
 */
export const patch = async (session: Session, request: Request) => {
  try {
    const transaction = await request.json() as Transaction;
    const result = await session.memory.transact(transaction);
    const body = JSON.stringify(result);
    const status = result.ok
      ? 200
      : result.error.name === "ConflictError"
      ? 409
      : 503;
    return new Response(body, {
      status,
      headers: {
        "Content-Type": "application/json",
      },
    });
  } catch (cause) {
    const error = cause as Partial<Error>;
    return new Response(
      JSON.stringify({
        error: {
          name: error?.name ?? "Error",
          message: error?.message ?? "Unable to parse request body",
          stack: error?.stack ??
"", }, }), { status: 400, headers: { "Content-Type": "application/json", }, }, ); } }; export const post = async (session: Session, request: Request) => { try { const selector = await request.json() as Query; const result = await session.memory.query(selector); const body = JSON.stringify(result); const status = result.ok ? 200 : 404; return new Response(body, { status, headers: { "Content-Type": "application/json", }, }); } catch (cause) { const error = cause as Partial; return new Response( JSON.stringify({ error: { name: error?.name ?? "Error", message: error?.message ?? "Unable to parse request body", stack: error?.stack ?? "", }, }), { status: 400, headers: { "Content-Type": "application/json", }, }, ); } };