import type { Abilities, Authorization, AuthorizationError, Await, CauseString, Clock, Command, ConnectionError, ConsumerCommandFor, ConsumerCommandInvocation, ConsumerEffectFor, ConsumerInvocationFor, ConsumerResultFor, DID, EnhancedCommit, Fact, FactSelection, InferOf, Invocation, InvocationURL, JSONValue, MemorySpace, MIME, OfTheCause, Proto, Protocol, ProviderChannel, ProviderCommand, Query, QueryArgs, QueryError, Reference, Result, Revision, SchemaPathSelector, SchemaQuery, SchemaQueryArgs, SchemaSelector, Seconds, Select, SelectAll, Selection, Selector, Signer, Transaction, TransactionResult, UCAN, URI, UTCUnixTimestampInSeconds, } from "./interface.ts"; import { fromJSON, refer } from "./reference.ts"; import * as Socket from "./socket.ts"; import { getSelectorRevision, iterate, setEmptyObj, setRevision, } from "./selection.ts"; import * as FactModule from "./fact.ts"; import * as Access from "./access.ts"; import * as Subscription from "./subscription.ts"; import { toStringStream } from "./ucan.ts"; import { fromStringStream } from "./receipt.ts"; import * as Settings from "./settings.ts"; export * from "./interface.ts"; import { toRevision } from "./commit.ts"; import { SchemaNone } from "./schema.ts"; import { getLogger } from "@commontools/utils/logger"; const logger = getLogger("memory-consumer", { enabled: true, level: "info", }); export const connect = ({ address, as, clock, ttl, }: { address: URL; as: Signer; clock?: Clock; ttl?: Seconds; }) => { const { readable, writable } = Socket.from( new WebSocket(address), ); const invocations = toStringStream(); invocations.readable.pipeTo(writable); return open({ as, clock, ttl, session: { writable: invocations.writable, readable: readable.pipeThrough(fromStringStream()), }, }); }; export const open = ({ as, session, clock, ttl, }: { as: Signer; session: ProviderChannel; clock?: Clock; ttl?: Seconds; }) => { const consumer = create({ as, clock, ttl }); session.readable.pipeThrough(consumer).pipeTo( session.writable as WritableStream, ); return consumer; }; export const create = ( { as, clock, ttl }: { as: Signer; clock?: Clock; ttl?: Seconds }, ) => new MemoryConsumerSession(as, clock, ttl); class MemoryConsumerSession< Space extends MemorySpace, MemoryProtocol extends Protocol, > extends TransformStream< ProviderCommand, UCAN> > implements MemoryConsumer { controller: | TransformStreamDefaultController< UCAN> > | undefined; invocations: Map< InvocationURL>, Job, MemoryProtocol> > = new Map(); // Promises that are resolved when the message is at the front of the queue private sendQueue: PromiseWithResolvers[] = []; constructor( public as: Signer, public clock: Clock = Settings.clock, public ttl: Seconds = Settings.ttl, ) { let controller: | undefined | TransformStreamDefaultController< UCAN> >; super({ start: (control) => { controller = control as typeof this.controller; }, transform: (command) => { try { return this.receive(command as ProviderCommand); } catch (error) { logger.error( "stream-error", () => ["TransformStream transform error:", error], ); logger.error( "stream-error", () => ["Failed command:", JSON.stringify(command)], ); throw error; } }, flush: () => { try { return this.close(); } catch (error) { logger.error( "stream-error", () => ["TransformStream flush error:", error], ); throw error; } }, }); this.controller = controller; } send(command: UCAN>) { this.controller?.enqueue(command); } receive(command: ProviderCommand) { const id = command.of; if (command.the === "task/return") { const invocation = this.invocations.get(id); if ( invocation !== undefined && !invocation.return(command.is as NonNullable) ) { this.invocations.delete(id); } } // If it is an effect it can be for one specific subscription, yet we may // have other subscriptions that will be affected. // We can't just send one message over, since the client needs to know // about which extra objects are needed for that specific subscription. // There's a chance we'll send the same object over more than once because // of this (in particular, this is almost guaranteed by the cache that // maintains a subscription to every object in the cache). // For now, I think this is the best approach, but we can use the since // fields to remove these later. else if (command.the === "task/effect") { const invocation = this.invocations.get(id); invocation?.perform(command.is); } } invoke( command: ConsumerCommandFor, ) { const invocation = ConsumerInvocation.create( this.as.did(), command as Command>, this.clock.now(), this.ttl, ); this.execute(invocation); return invocation; } getInvocation( command: ConsumerCommandFor, ) { const invocation = ConsumerInvocation.create( this.as.did(), command as Command>, this.clock.now(), this.ttl, ); return invocation; } async execute( invocation: ConsumerInvocation, ) { const queueEntry = Promise.withResolvers(); // put it in the queue immediately -- messages are sent in the order // they are executed, regardless of authorization timing this.sendQueue.push(queueEntry); const authorizationResult = await Access.authorize([ invocation.refer(), ], this.as); // If we're not at the front of the queue, wait until we are if (queueEntry !== this.sendQueue[0]) { await queueEntry.promise; // We can be resolved, then rejected (noop) via cancel. if (this.sendQueue.length === 0 || queueEntry !== this.sendQueue[0]) { throw new Error("session cancel"); } } try { this.executeAuthorized(authorizationResult, invocation); } finally { // if that throws, we still want to unblock the queue this.sendQueue.shift(); if (this.sendQueue.length > 0) { this.sendQueue[0].resolve(); } } } private executeAuthorized< Ability extends string, Access extends Reference[], >( authorizationResult: Result, Error>, invocation: ConsumerInvocation, ) { const { error, ok: authorization } = authorizationResult; if (error) { invocation.return({ error }); } else { const url = invocation.toURL(); const pending = this.invocations.get(url); if (pending) { invocation.return( pending.promise as unknown as ConsumerResultFor< Ability, MemoryProtocol >, ); } else { this.invocations.set( url, invocation as unknown as Job, ); this.send( { invocation: invocation.source, authorization } as unknown as UCAN< ConsumerCommandInvocation >, ); } } } close() { this.cancel(); this.controller?.terminate(); } cancel() { for (const queueEntry of [...this.sendQueue]) { queueEntry.reject(new Error("session cancel")); } this.sendQueue = []; } abort(invocation: InvocationHandle) { this.invocations.delete(invocation.toURL()); } mount( space: Subject, ): MemorySpaceConsumerSession { return new MemorySpaceConsumerSession( space, this as unknown as MemoryConsumerSession>, ); } } export interface MemorySession { as: Signer; mount(space: Subject): MemorySpaceSession; } export interface MemoryConsumer extends MemorySession, TransformStream< ProviderCommand, UCAN> > { as: Signer; } export interface MemorySpaceSession { as: Signer; transact(source: Transaction["args"]): TransactionResult; query( source: Query["args"] | SchemaQuery["args"], ): QueryView>; } export type { QueryView }; class MemorySpaceConsumerSession implements MemorySpaceSession { as: Signer; constructor( public space: Space, public session: MemoryConsumerSession>, ) { this.as = session.as; } transact(source: Transaction["args"]) { return this.session.invoke({ cmd: "/memory/transact", sub: this.space, args: source, }); } query( source: Query["args"] | SchemaQuery["args"], ): QueryView> { const selectSchema = ("select" in source) ? MemorySpaceConsumerSession.asSelectSchema(source) : source; const query = this.session.invoke({ cmd: "/memory/graph/query" as const, sub: this.space, args: selectSchema, }); return QueryView.create(this.session, query); } private static asSelectSchema(queryArg: QueryArgs): SchemaQueryArgs { const selectSchema: Select< URI, Select> > = {}; for (const [of, attributes] of Object.entries(queryArg.select)) { const entityEntry: Select> = {}; selectSchema[of as URI | SelectAll] = entityEntry; let attrEntries = Object.entries(attributes); // A Selector may not have a "the", but SchemaSelector needs all three levels if (attrEntries.length === 0) { attrEntries = [["_", {}]]; } for (const [the, causes] of attrEntries) { const attributeEntry: Select = {}; entityEntry[the as MIME | SelectAll] = attributeEntry; // A Selector may not have a cause, but SchemaSelector needs all three levels let causeEntries = Object.entries(causes); if (causeEntries.length === 0) { causeEntries = [["_", {}]]; } for (const [cause, selector] of causeEntries) { const causeEntry: SchemaPathSelector = { path: [], schemaContext: SchemaNone, ...selector.is ? { is: selector.is } : {}, }; attributeEntry[cause as CauseString | SelectAll] = causeEntry; } } } return { selectSchema: selectSchema, ...queryArg.since ? { since: queryArg.since } : {}, }; } } interface InvocationHandle { toURL(): InvocationURL; } interface Job { promise: Promise>; // Return false to remove listener return(input: Await>): boolean; perform(effect: ConsumerEffectFor): void; } class ConsumerInvocation { promise: Promise>; return: (input: ConsumerResultFor) => boolean; source: ConsumerInvocationFor; #reference: Reference; static create( as: DID, { cmd, sub, args, nonce }: Command>, time: UTCUnixTimestampInSeconds, ttl: Seconds, ) { return new this({ cmd, sub, args, iss: as, prf: [], iat: time, exp: time + ttl, ...(nonce ? { nonce } : undefined), } as ConsumerInvocationFor); } constructor(source: ConsumerInvocationFor) { // JSON.parse(JSON.stringify) is used to strip `undefined` values and ensure consistent serialization this.source = JSON.parse(JSON.stringify(source)); this.#reference = refer(this.source); let receive; this.promise = new Promise>( (resolve) => (receive = resolve), ); this.return = receive as typeof receive & NonNullable; } refer() { return this.#reference; } toURL() { return `job:${this.refer()}` as InvocationURL; } then( onResolve: ( value: ConsumerResultFor, ) => T | PromiseLike, onReject: (reason: unknown) => X | Promise, ) { return this.promise.then(onResolve, onReject); } get sub() { return this.source.sub; } get meta() { return this.source.meta; } get args() { return this.source.args; } perform(_effect: ConsumerEffectFor) {} } class QueryView< Space extends MemorySpace, MemoryProtocol extends Protocol, > { static create< Space extends MemorySpace, MemoryProtocol extends Protocol, >( session: MemoryConsumerSession, invocation: | ConsumerInvocation<"/memory/query", MemoryProtocol> | ConsumerInvocation<"/memory/graph/query", MemoryProtocol>, ): QueryView { const view: QueryView = new QueryView( session, invocation, // FIXME: typing // deno-lint-ignore no-explicit-any invocation.promise.then((result: any) => { if (result.error) { return result; } else { view.selection = result.ok as Selection>; return { ok: view }; } }), ); return view; } selection: Selection; selector: Selector | SchemaSelector; constructor( public session: MemoryConsumerSession, public invocation: | ConsumerInvocation<"/memory/query", MemoryProtocol> | ConsumerInvocation<"/memory/graph/query", MemoryProtocol>, public promise: Promise< Result< QueryView, QueryError | AuthorizationError | ConnectionError > >, ) { invocation.perform = ( effect: | ConsumerEffectFor<"/memory/query", MemoryProtocol> | ConsumerEffectFor<"/memory/graph/query", MemoryProtocol>, ) => this.perform(effect as Selection); this.selection = { [this.space]: {} } as Selection>; this.selector = ("select" in this.invocation.args) ? (this.invocation.args as { select?: Selector }).select as Selector : (this.invocation.args as { selectSchema?: Selector }) .selectSchema as SchemaSelector; } return(selection: Selection>) { this.selection = selection; return !("subscribe" in this.selector && this.selector.subscribe === true); } perform(effect: Selection) { const differential = effect[this.space]; this.integrate(differential); return { ok: effect }; } then( onResolve: ( value: Result< QueryView, QueryError | AuthorizationError | ConnectionError >, ) => T | PromiseLike, onReject: (reason: unknown) => X | Promise, ) { return this.promise.then(onResolve, onReject); } get space(): Space { return this.invocation.sub as MemorySpace as Space; } integrate(differential: FactSelection) { const selection = this.selection[this.space]; for (const change of iterate(differential)) { setRevision(selection, change.of, change.the, change.cause, change.value); } } get facts(): Revision[] { return [...FactModule.iterate(this.selection[this.space])]; } // Get the facts returned by the query, together with the associated // SchemaPathSelector used to query get schemaFacts(): [Revision, SchemaPathSelector | undefined][] { return this.facts.map((fact) => [fact, this.getSchemaPathSelector(fact)]); } subscribe() { const subscription = new QuerySubscriptionInvocation(this); this.session.execute(subscription); return subscription; } // Get the schema context used to fetch the specified fact. // If the fact was included from another fact, it will not have a schemaContext. getSchemaPathSelector(fact: Revision): SchemaPathSelector | undefined { const factSelector = this.selector as SchemaSelector; return getSelectorRevision(factSelector, fact.of, fact.the); } } class QuerySubscriptionInvocation< Space extends MemorySpace, MemoryProtocol extends Protocol, > extends ConsumerInvocation<"/memory/query/subscribe", MemoryProtocol> { readable: ReadableStream>; controller: | undefined | ReadableStreamDefaultController>; patterns: { the?: MIME; of?: URI; cause?: CauseString }[]; selection: Selection; constructor(public query: QueryView) { super({ ...query.invocation.source, cmd: "/memory/query/subscribe", }); this.readable = new ReadableStream>({ start: (controller) => this.open(controller), cancel: () => this.close().then(), }); this.selection = query.selection; this.patterns = [...Subscription.fromSelector(this.selector)]; } get space() { return this.query.space; } get selector() { return this.query.selector; } open(controller: ReadableStreamDefaultController>) { this.controller = controller; } async close() { this.controller = undefined; this.query.session.abort(this); const unsubscribe = this.query.session.invoke({ cmd: "/memory/query/unsubscribe", sub: this.sub, args: { source: this.toURL() }, }); await unsubscribe; } // This function is called for both subscriptions to the commit log as well as subscriptions // to individual docs. override perform(commit: EnhancedCommit) { const selection = this.selection[this.space]; // Here we will collect subset of changes that match the query. const differential: OfTheCause<{ is?: JSONValue; since: number }> = {}; const fact = toRevision(commit.commit); const { the, of, is } = fact; const cause = fact.cause.toString() as CauseString; const { transaction, since } = is; const matchCommit = this.patterns.some((pattern) => (!pattern.of || pattern.of === of) && (!pattern.the || pattern.the === the) && (!pattern.cause || pattern.cause === cause) ); if (matchCommit) { // Update the main application/commit+json record for the space setRevision(differential, of, the, cause, { is, since }); } for (const [k1, attributes] of Object.entries(transaction.args.changes)) { const of = k1 as URI; for (const [k2, changes] of Object.entries(attributes)) { const the = k2 as MIME; const causeEntries = Object.entries(changes); if (causeEntries.length === 0) { // A classified object will not have a cause/change pair const matchDoc = this.patterns.some((pattern) => (!pattern.of || pattern.of === of) && (!pattern.the || pattern.the === the) && !pattern.cause ); if (matchDoc) { setEmptyObj(differential, of, the); } } else { const [[k3, change]] = causeEntries; const cause = k3 as CauseString; if (change !== true) { const state = Object.entries( selection?.[of]?.[the] ?? {}, ); const [current] = state.length > 0 ? state[0] : []; if (cause !== current) { const matchDoc = this.patterns.some((pattern) => (!pattern.of || pattern.of === of) && (!pattern.the || pattern.the === the) && (!pattern.cause || pattern.cause === cause) ); if (matchDoc) { const value = change.is ? { is: change.is, since: since } : { since: since }; setRevision(differential, of, the, cause, value); } } } } } } if (Object.keys(differential).length !== 0) { this.query.integrate(differential); } this.integrate(commit); // This is a bit strange, but the revisions in here aren't proper // They've lost their Reference methods, so recreate them commit.revisions.forEach((item) => { item.cause = fromJSON(JSON.parse(JSON.stringify(item.cause))); }); return { ok: {} }; } integrate(commit: EnhancedCommit) { this.controller?.enqueue(commit); } getReader() { return this.readable.getReader(); } async *[Symbol.asyncIterator]() { const reader = this.getReader(); try { while (true) { const next = await reader.read(); if (next.done) { break; } else { yield next.value; } } } finally { reader.releaseLock(); } } }