import { refer } from "merkle-reference/json"; import { getLogger } from "@commontools/utils/logger"; import { isObject, isRecord, type Mutable } from "@commontools/utils/types"; import { vdomSchema } from "./schemas.ts"; import { type Frame, isModule, isOpaqueRef, isRecipe, isStreamValue, type JSONSchema, type JSONValue, type Module, NAME, type NodeFactory, type Recipe, TYPE, UI, unsafe_materializeFactory, unsafe_originalRecipe, } from "./builder/types.ts"; import { popFrame, pushFrameFromCause, recipeFromFrame, } from "./builder/recipe.ts"; import { type Cell, isCell } from "./cell.ts"; import { type Action } from "./scheduler.ts"; import { diffAndUpdate } from "./data-updating.ts"; import { findAllWriteRedirectCells, unsafe_noteParentOnRecipes, unwrapOneLevelAndBindtoDoc, } from "./recipe-binding.ts"; import { resolveLink } from "./link-resolution.ts"; import { areNormalizedLinksSame, createSigilLinkFromParsedLink, isCellLink, isSigilLink, isWriteRedirectLink, type NormalizedFullLink, parseLink, } from "./link-utils.ts"; import { deepEqual } from "./path-utils.ts"; import { sendValueToBinding } from "./recipe-binding.ts"; import { type AddCancel, type Cancel, useCancelGroup } from "./cancel.ts"; import { LINK_V1_TAG, SigilLink } from "./sigil-types.ts"; import type { Runtime } from "./runtime.ts"; import type { IExtendedStorageTransaction, IStorageSubscription, MemorySpace, URI, } from "./storage/interface.ts"; import { ignoreReadForScheduling, markReadAsPotentialWrite, } from "./scheduler.ts"; import { FunctionCache } from "./function-cache.ts"; import { isRawBuiltinResult, type RawBuiltinReturnType } from "./module.ts"; import "./builtins/index.ts"; import { isCellResult } from "./query-result-proxy.ts"; const logger = getLogger("runner"); export class Runner { readonly cancels = new Map<`${MemorySpace}/${URI}`, Cancel>(); private allCancels = new Set(); private functionCache = new FunctionCache(); // Map whose key is the result cell's full key, and whose 
// values are the recipes as strings
  private resultRecipeCache = new Map<`${MemorySpace}/${URI}`, string>();

  constructor(readonly runtime: Runtime) {
    this.runtime.storageManager.subscribe(this.createStorageSubscription());
  }

  /**
   * Builds the subscription object that is registered with the storage
   * manager in the constructor.
   *
   * Its only job is to keep `resultRecipeCache` honest: whenever a JSON
   * document changes, the cached recipe string stored under that document's
   * `space/id` key is dropped, and on a "reset" notification every cached
   * entry belonging to the affected space is dropped.
   *
   * @returns The subscription; its `next` handler always answers
   *   `{ done: false }`.
   */
  private createStorageSubscription(): IStorageSubscription {
    return {
      next: (notification) => {
        if ("changes" in notification) {
          const { space } = notification;
          for (const { address } of notification.changes) {
            // Only JSON documents are tracked in the cache
            if (address.type !== "application/json") continue;
            this.resultRecipeCache.delete(`${space}/${address.id}`);
          }
        } else if (notification.type === "reset") {
          const spacePrefix = `${notification.space}/`;
          // Iterate over a snapshot of the keys, because entries are removed
          // from the map while we walk it
          for (const cacheKey of [...this.resultRecipeCache.keys()]) {
            if (cacheKey.startsWith(spacePrefix)) {
              this.resultRecipeCache.delete(cacheKey);
            }
          }
        }
        return { done: false };
      },
    };
  }

  /**
   * Prepare a charm for running by creating/updating its process and result
   * cells, registering the recipe, and applying defaults/arguments.
   * This does not schedule any nodes. Use start() to schedule execution.
   * If the charm is already running and the recipe changes, it will stop the
   * charm.
*/
  setup(
    tx: IExtendedStorageTransaction | undefined,
    recipeFactory: NodeFactory,
    argument: T,
    resultCell: Cell,
  ): Promise>;
  setup(
    tx: IExtendedStorageTransaction | undefined,
    recipe: Recipe | Module | undefined,
    argument: T,
    resultCell: Cell,
  ): Promise>;
  setup(
    providedTx: IExtendedStorageTransaction | undefined,
    recipeOrModule: Recipe | Module | undefined,
    argument: T,
    resultCell: Cell,
  ): Promise> {
    if (providedTx) {
      // Caller owns the transaction: do the work in it and resolve right away.
      this.setupInternal(providedTx, recipeOrModule, argument, resultCell);
      return Promise.resolve(resultCell);
    } else {
      // Ignore errors after retrying for now, as outside the tx, we'll see the
      // latest true value, it just lost the race against someone else changing
      // the recipe or argument. Correct action is anyhow similar to what would
      // have happened if the write succeeded and was immediately overwritten.
      return this.runtime.editWithRetry((tx) => {
        this.setupInternal(tx, recipeOrModule, argument, resultCell);
      }).then(() => resultCell);
    }
  }

  /**
   * Internal setup that returns whether scheduling is required.
   *
   * Steps, in order:
   *  1. Find the process cell (the result cell's source cell), creating and
   *     linking one if there is none yet.
   *  2. Determine the recipe: the one passed in, or the one whose id is stored
   *     under TYPE in the process cell. A module is wrapped in a single-node
   *     recipe. The recipe is registered and saved for the result cell's space.
   *  3. If the charm is already running (an entry exists in `this.cancels`),
   *     short-circuit when nothing changed, or only update the argument when
   *     the recipe is unchanged.
   *  4. Otherwise write TYPE, resultRef, internal state, spell link and
   *     argument to the process cell, and the bound recipe result to the
   *     result cell (keeping a previously stored NAME).
   *
   * @param providedTx - Transaction to use; a new one is opened via
   *   `runtime.edit()` when undefined (it is not committed here).
   * @param recipeOrModule - Recipe or module to set up, or undefined to reuse
   *   the recipe recorded in the process cell.
   * @param argument - Argument for the recipe; cell links are converted to
   *   write-redirect sigil links relative to the process cell.
   * @param resultCell - Cell that receives the result.
   * @returns `needsStart: false` when no recipe is available or the running
   *   charm was only updated; otherwise the resolved recipe and process cell
   *   with `needsStart: true`.
   * @throws Error if no recipe is passed and the stored recipe id is unknown
   *   to the recipe manager.
   */
  private setupInternal(
    providedTx: IExtendedStorageTransaction | undefined,
    recipeOrModule: Recipe | Module | undefined,
    argument: T,
    resultCell: Cell,
  ): {
    resultCell: Cell;
    recipe?: Recipe;
    processCell?: Cell;
    needsStart: boolean;
  } {
    const tx = providedTx ?? this.runtime.edit();

    type ProcessCellData = {
      [TYPE]: string;
      spell?: SigilLink;
      argument?: T;
      internal?: JSONValue;
      resultRef: SigilLink;
    };

    let processCell: Cell;

    // Reuse the existing process cell if the result cell already has one,
    // otherwise create it (caused by the result cell) and link it as source.
    const sourceCell = resultCell.withTx(tx).getSourceCell();
    if (sourceCell !== undefined) {
      processCell = sourceCell as Cell;
    } else {
      processCell = this.runtime.getCell(
        resultCell.space,
        resultCell, // Cause
        undefined,
        tx,
      );
      resultCell.withTx(tx).setSourceCell(processCell);
    }

    logger.debug("cell-info", () => [
      `resultCell: ${resultCell.getAsNormalizedFullLink().id}`,
      `processCell: ${
        resultCell.withTx(tx).getSourceCell()?.getAsNormalizedFullLink().id
      }`,
    ]);

    let recipeId: string | undefined;

    // Recipe id recorded by a previous setup, read without creating a
    // scheduling dependency.
    const previousRecipeId = processCell.withTx(tx).key(TYPE).getRaw({
      meta: ignoreReadForScheduling,
    });
    if (!recipeOrModule && previousRecipeId) {
      recipeId = previousRecipeId;
      recipeOrModule = this.runtime.recipeManager.recipeById(recipeId!);
      if (!recipeOrModule) throw new Error(`Unknown recipe: ${recipeId}`);
    } else if (!recipeOrModule) {
      console.warn(
        "No recipe provided and no recipe found in process doc. Not running.",
      );
      return { resultCell, needsStart: false };
    }

    let recipe: Recipe;

    // If this is a module, not a recipe, wrap it in a recipe that just runs,
    // passing arguments in unmodified and passing all results through as is
    if (isModule(recipeOrModule)) {
      const module = recipeOrModule as Module;
      recipeId ??= this.runtime.recipeManager.registerRecipe(module);

      recipe = {
        argumentSchema: module.argumentSchema ?? {},
        resultSchema: module.resultSchema ?? {},
        result: { $alias: { path: ["internal"] } },
        nodes: [
          {
            module,
            inputs: { $alias: { path: ["argument"] } },
            outputs: { $alias: { path: ["internal"] } },
          },
        ],
      } satisfies Recipe;
    } else {
      recipe = recipeOrModule as Recipe;
    }

    recipeId ??= this.runtime.recipeManager.registerRecipe(recipe);
    this.runtime.recipeManager.saveRecipe({
      recipeId,
      space: resultCell.space,
    }, tx);

    // If the bindings are a cell, doc or doc link, convert them to an alias
    if (isCellLink(argument)) {
      argument = createSigilLinkFromParsedLink(
        parseLink(argument),
        { base: processCell, includeSchema: true, overwrite: "redirect" },
      ) as T;
    }

    // "Running" means start()/startWithTx() registered a cancel for this cell.
    const key = this.getDocKey(resultCell);
    const alreadyRunning = this.cancels.has(key);

    if (alreadyRunning) {
      // If it's already running and no new recipe or argument are given,
      // we are just returning the result doc
      if (argument === undefined && recipeId === previousRecipeId) {
        return { resultCell, needsStart: false };
      }

      if (previousRecipeId === recipeId) {
        // If the recipe is the same, but argument is different, just update the
        // argument without stopping
        diffAndUpdate(
          this.runtime,
          tx,
          processCell.key("argument").getAsNormalizedFullLink(),
          argument,
          processCell.getAsNormalizedFullLink(),
        );
        return { resultCell, needsStart: false };
      }

      // Recipe changed - let the $TYPE sink detect the change and handle
      // stop() + start(). Don't call stop() here as it would cancel the sink.
    }

    // Walk the recipe's schema and extract all default values
    const defaults = extractDefaultValues(recipe.argumentSchema) as Partial;

    // Important to use DeepCopy here, as the resulting object will be modified!
    const previousInternal = processCell.key("internal").getRaw({
      meta: ignoreReadForScheduling,
    });
    // Later sources win: schema defaults < recipe.initial.internal < whatever
    // is already stored in the process cell.
    const internal: JSONValue = Object.assign(
      {},
      cellAwareDeepCopy(
        (defaults as unknown as { internal: JSONValue })?.internal,
      ),
      cellAwareDeepCopy(
        isRecord(recipe.initial) && isRecord(recipe.initial.internal)
          ? recipe.initial.internal
          : {},
      ),
      isRecord(previousInternal) ? previousInternal : {},
    );

    // Still necessary until we consistently use schema for defaults.
    // Only do it on first load.
    if (
      !processCell.key("argument").getRaw({ meta: ignoreReadForScheduling })
    ) {
      argument = mergeObjects(argument as any, defaults);
    }

    // Rewrite the process cell, keeping any fields it already has and
    // overriding TYPE, resultRef, internal and (if known) the spell link.
    processCell.withTx(tx).setRaw({
      ...processCell.getRaw({ meta: ignoreReadForScheduling }),
      [TYPE]: recipeId || "unknown",
      resultRef: (recipe.resultSchema !== undefined
        ? resultCell.asSchema(recipe.resultSchema).getAsLink({
          base: processCell,
          includeSchema: true,
        })
        : resultCell.getAsLink({
          base: processCell,
        })),
      internal,
      ...(recipeId !== undefined) ? { spell: getSpellLink(recipeId) } : {},
    });

    // NOTE(review): this is a truthiness check, so falsy arguments such as 0,
    // "" or false are not written here — confirm that is intended.
    if (argument) {
      diffAndUpdate(
        this.runtime,
        tx,
        processCell.key("argument").getAsNormalizedFullLink(),
        argument,
        processCell.getAsNormalizedFullLink(),
      );
    }

    // Send "query" to results to the result doc only on initial run or if
    // recipe changed. This preserves user modifications like renamed charms.
    let result = unwrapOneLevelAndBindtoDoc(
      recipe.result as R,
      processCell,
    );
    const previousResult = resultCell.withTx(tx).getRaw({
      meta: ignoreReadForScheduling,
    });
    // Keep a NAME that is already stored in the result cell.
    if (isRecord(previousResult) && previousResult[NAME]) {
      result = { ...result, [NAME]: previousResult[NAME] };
    }
    // Skip the write when the stored result is already identical.
    if (!deepEqual(result, previousResult)) {
      resultCell.withTx(tx).setRaw(result);
    }

    // [unsafe closures:] For recipes from closures, add a materialize factory
    if (recipe[unsafe_originalRecipe]) {
      recipe[unsafe_materializeFactory] =
        (tx: any) => (path: readonly PropertyKey[]) =>
          processCell.getAsQueryResult(path as PropertyKey[], tx);
    }

    // Discover and cache all JavaScript functions in the recipe before start
    this.discoverAndCacheFunctions(recipe, new Set());

    return { resultCell, recipe, processCell, needsStart: true };
  }

  /**
   * Start scheduling nodes for a previously set up charm.
   * If already started, this is a no-op.
   *
   * Returns a Promise that resolves to true on success, or rejects with an error.
* Runs synchronously when data is available (important for tests).
   *
   * @param resultCell - Result cell of a charm previously prepared by setup().
   * @returns Promise resolving to `true` once scheduling is in place, or when
   *   there is nothing to start.
   */
  start(resultCell: Cell): Promise<boolean> {
    const key = this.getDocKey(resultCell);
    if (this.cancels.has(key)) return Promise.resolve(true); // Already started

    // Sync result cell if empty, then continue
    if (resultCell.getRaw() === undefined) {
      return Promise.resolve(resultCell.sync()).then(() =>
        this.doStart(resultCell)
      );
    }

    // Data available - run synchronously
    return this.doStart(resultCell);
  }

  /**
   * Returns `recipeOrModule` as a recipe. A module is wrapped in a single-node
   * recipe that passes the argument through unmodified and exposes the
   * module's output (written to `internal`) as the result.
   */
  private wrapModuleAsRecipe(recipeOrModule: Recipe | Module): Recipe {
    if (!isModule(recipeOrModule)) return recipeOrModule as Recipe;

    const module = recipeOrModule as Module;
    return {
      argumentSchema: module.argumentSchema ?? {},
      resultSchema: module.resultSchema ?? {},
      result: { $alias: { path: ["internal"] } },
      nodes: [
        {
          module,
          inputs: { $alias: { path: ["argument"] } },
          outputs: { $alias: { path: ["internal"] } },
        },
      ],
    } satisfies Recipe;
  }

  /**
   * Undoes the bookkeeping of a start attempt that failed: cancels whatever was
   * scheduled so far and unregisters the cancel group, so that a later
   * start()/run() is not mistaken for "already started".
   */
  private abortStart(key: `${MemorySpace}/${URI}`, cancel: Cancel): void {
    try {
      cancel();
    } finally {
      // Only drop the map entry if it is still ours (it may have been replaced
      // by a newer start in the meantime).
      if (this.cancels.get(key) === cancel) this.cancels.delete(key);
      this.allCancels.delete(cancel);
    }
  }

  /**
   * Does the actual work of start(): registers a cancel group for the charm,
   * instantiates the nodes of the recipe named by the process cell's TYPE and
   * watches TYPE so that the nodes are replaced when the recipe changes.
   *
   * If the recipe is not known locally it is loaded asynchronously first. When
   * loading or instantiating the initial recipe fails, the cancel group is
   * unregistered again before the error is propagated (thrown synchronously on
   * the sync path, as a rejection on the async path).
   *
   * @param resultCell - Result cell of the charm to start.
   * @returns Promise resolving to `true`.
   */
  private doStart(resultCell: Cell): Promise<boolean> {
    const key = this.getDocKey(resultCell);

    // Check again after potential sync
    if (this.cancels.has(key)) return Promise.resolve(true);

    // Subpath cells (created via .key()) have nothing to start - the parent
    // charm is responsible for running. Just return success.
    const link = resultCell.getAsNormalizedFullLink();
    if (link.path.length > 0) {
      return Promise.resolve(true);
    }

    // No process cell means this is just data, not a charm. Nothing to start.
    const processCell = resultCell.getSourceCell();
    if (!processCell) {
      return Promise.resolve(true);
    }

    // Create cancel group early - before the $TYPE sink
    const [cancel, addCancel] = useCancelGroup();
    this.cancels.set(key, cancel);
    this.allCancels.add(cancel);

    // False once stop() removed (or a newer start replaced) our registration;
    // used to avoid scheduling nodes after an async recipe load.
    const isStillActive = () => this.cancels.get(key) === cancel;

    // Track recipe ID and node cancellation
    let currentRecipeId: string | undefined;
    let cancelNodes: Cancel | undefined;

    // Helper to instantiate nodes for a recipe
    const instantiateRecipe = (recipe: Recipe) => {
      // Create new cancel group for nodes
      const [nodeCancel, addNodeCancel] = useCancelGroup();
      cancelNodes = nodeCancel;
      addCancel(nodeCancel);

      // Instantiate nodes
      this.discoverAndCacheFunctions(recipe, new Set());
      const tx = this.runtime.edit();
      try {
        for (const node of recipe.nodes) {
          this.instantiateNode(
            tx,
            node.module,
            node.inputs,
            node.outputs,
            processCell,
            addNodeCancel,
            recipe,
          );
        }
      } finally {
        tx.commit();
      }
    };

    // Helper to set up the $TYPE watcher
    const setupTypeWatcher = () => {
      const typeCell = processCell.key(TYPE);
      addCancel(
        typeCell.sink((typeValue) => {
          const newRecipeId = typeValue as unknown as string | undefined;
          if (!newRecipeId) return;
          if (newRecipeId === currentRecipeId) return; // No change

          // Recipe changed - cancel previous nodes and re-instantiate
          cancelNodes?.();
          currentRecipeId = newRecipeId;

          const resolved = this.runtime.recipeManager.recipeById(newRecipeId);
          if (resolved) {
            instantiateRecipe(this.wrapModuleAsRecipe(resolved));
            return;
          }

          // Async load. Nobody awaits this promise, so failures must be
          // handled here rather than surfacing as unhandled rejections.
          this.runtime.recipeManager
            .loadRecipe(newRecipeId, resultCell.space)
            .then((loaded) => {
              // Skip if the recipe changed again or the charm was stopped
              // while the recipe was loading.
              if (currentRecipeId !== newRecipeId || !isStillActive()) return;
              instantiateRecipe(this.wrapModuleAsRecipe(loaded));
            })
            .catch((error: unknown) => {
              logger.error(
                "recipe-load-error",
                `Failed to load recipe ${newRecipeId}`,
                error,
              );
            });
        }),
      );
    };

    // Get initial recipe ID
    const initialRecipeId = processCell.key(TYPE).getRaw({
      meta: ignoreReadForScheduling,
    }) as string | undefined;

    if (!initialRecipeId) {
      // No recipe yet - set up watcher to handle when $TYPE is set
      setupTypeWatcher();
      return Promise.resolve(true);
    }

    // Try sync lookup first for initial recipe
    const initialResolved = this.runtime.recipeManager.recipeById(
      initialRecipeId,
    );

    if (!initialResolved) {
      // Async load, then instantiate
      return this.runtime.recipeManager
        .loadRecipe(initialRecipeId, resultCell.space)
        .then((loaded) => {
          // Stopped while loading: nothing left to schedule.
          if (!isStillActive()) return true;
          currentRecipeId = initialRecipeId;
          instantiateRecipe(this.wrapModuleAsRecipe(loaded));
          setupTypeWatcher();
          return true;
        })
        .catch((error: unknown) => {
          this.abortStart(key, cancel);
          throw error;
        });
    }

    // Sync path - instantiate immediately
    try {
      currentRecipeId = initialRecipeId;
      instantiateRecipe(this.wrapModuleAsRecipe(initialResolved));
      setupTypeWatcher();
    } catch (error) {
      this.abortStart(key, cancel);
      throw error;
    }
    return Promise.resolve(true);
  }

  /**
   * Synchronous variant of start() used by run()/runSynced(): instantiates the
   * recipe's nodes inside the given transaction and watches TYPE for later
   * recipe changes. No-op if the charm is already started.
   *
   * If instantiating the initial recipe throws, the cancel group registered for
   * the charm is removed again before the error is rethrown.
   *
   * @param tx - Transaction used for the initial instantiation; it is not
   *   committed here.
   * @param resultCell - Result cell of the charm.
   * @param givenRecipe - Already resolved recipe; when omitted the recipe is
   *   looked up by the id stored under TYPE.
   * @throws Error if the recipe id stored in the process cell is unknown.
   */
  private startWithTx(
    tx: IExtendedStorageTransaction,
    resultCell: Cell,
    givenRecipe?: Recipe,
  ): void {
    const key = this.getDocKey(resultCell);
    if (this.cancels.has(key)) return; // Already started

    const processCell = resultCell.withTx(tx).getSourceCell();
    if (!processCell) {
      console.warn("Cannot start: process cell missing. Did you call setup()?");
      return;
    }

    // Create cancel group early - before the $TYPE sink
    const [cancel, addCancel] = useCancelGroup();
    this.cancels.set(key, cancel);
    this.allCancels.add(cancel);

    // Track recipe ID and node cancellation
    let currentRecipeId: string | undefined;
    let cancelNodes: Cancel | undefined;

    // Helper to instantiate nodes for a recipe
    const instantiateRecipe = (
      recipe: Recipe,
      useTx: IExtendedStorageTransaction,
      commitTx: boolean,
    ) => {
      // Create new cancel group for nodes
      const [nodeCancel, addNodeCancel] = useCancelGroup();
      cancelNodes = nodeCancel;
      addCancel(nodeCancel);

      // Instantiate nodes
      this.discoverAndCacheFunctions(recipe, new Set());
      try {
        for (const node of recipe.nodes) {
          this.instantiateNode(
            useTx,
            node.module,
            node.inputs,
            node.outputs,
            processCell,
            addNodeCancel,
            recipe,
          );
        }
      } finally {
        if (commitTx) useTx.commit();
      }
    };

    try {
      // Get initial recipe ID and do initial setup
      const initialRecipeId = processCell.withTx(tx).key(TYPE).getRaw({
        meta: ignoreReadForScheduling,
      }) as string | undefined;

      if (initialRecipeId) {
        currentRecipeId = initialRecipeId;
        const initialRecipe = givenRecipe ?? (() => {
          const resolved = this.runtime.recipeManager.recipeById(
            initialRecipeId,
          );
          if (!resolved) throw new Error(`Unknown recipe: ${initialRecipeId}`);
          return this.wrapModuleAsRecipe(resolved);
        })();
        instantiateRecipe(initialRecipe, tx, false);
      }

      // Watch $TYPE for future changes
      const typeCell = processCell.key(TYPE);
      addCancel(
        typeCell.sink((newRecipeId) => {
          if (!newRecipeId) return; // No recipe yet
          if (newRecipeId === currentRecipeId) return; // No change

          // Recipe changed - cancel previous nodes and re-instantiate
          cancelNodes?.();
          currentRecipeId = newRecipeId;

          const resolved = this.runtime.recipeManager.recipeById(newRecipeId);
          if (!resolved) throw new Error(`Unknown recipe: ${newRecipeId}`);
          const recipe = this.wrapModuleAsRecipe(resolved);
          instantiateRecipe(recipe, this.runtime.edit(), true);
        }),
      );
    } catch (error) {
      // Don't leave a half-started charm registered as running.
      this.abortStart(key, cancel);
      throw error;
    }
  }

  /**
   * Run a recipe.
   *
   * resultCell is required and should have an id. processCell is created if not
   * already set.
*
   * If no recipe is provided, the previous one is used, and the recipe is
   * started if it isn't already started.
   *
   * If no argument is provided, the previous one is used, and the recipe is
   * started if it isn't already running.
   *
   * If the charm is already running and only the argument changed, the
   * argument is updated in place without restarting. If the recipe changed,
   * the new recipe id is written to the process cell and the running charm's
   * $TYPE watcher swaps the nodes.
   *
   * @param tx - Transaction to use. When undefined, a new transaction is
   *   opened and committed before returning.
   * @param recipeFactory - Function that takes the argument and returns a
   *   recipe.
   * @param argument - The argument to pass to the recipe. Can be static data
   *   and/or cell references, including cell value proxies, docs and regular
   *   cells.
   * @param resultCell - Cell to run the recipe off.
   * @returns The result cell.
   */
  run<T, R>(
    tx: IExtendedStorageTransaction | undefined,
    recipeFactory: NodeFactory<T, R>,
    argument: T,
    resultCell: Cell<R>,
  ): Cell<R>;
  run<T, R = any>(
    tx: IExtendedStorageTransaction | undefined,
    recipe: Recipe | Module | undefined,
    argument: T,
    resultCell: Cell<R>,
  ): Cell<R>;
  run<T, R = any>(
    providedTx: IExtendedStorageTransaction | undefined,
    recipeOrModule: Recipe | Module | undefined,
    argument: T,
    resultCell: Cell<R>,
  ): Cell<R> {
    const tx = providedTx ?? this.runtime.edit();

    const { needsStart, recipe } = this.setupInternal(
      tx,
      recipeOrModule,
      argument,
      resultCell,
    );

    if (needsStart) {
      this.startWithTx(tx, resultCell, recipe);
    }

    // Only commit transactions we opened ourselves.
    if (!providedTx) tx.commit();

    return resultCell;
  }

  /**
   * Like run(), but first syncs the result cell and the cells the recipe and
   * its inputs refer to.
   *
   * If the result cell carries a transaction whose status is "ready", that
   * transaction is used and left uncommitted (the caller manages it).
   * Otherwise setup runs through `runtime.editWithRetry` and the start runs in
   * a fresh transaction that is committed here; setup and commit errors are
   * logged, not thrown.
   *
   * @param resultCell - Cell to run the recipe off.
   * @param recipe - Recipe or module to run.
   * @param inputs - Argument for the recipe; links found inside are synced.
   * @returns The result cell, viewed through `recipe.resultSchema` if the
   *   recipe has one.
   */
  async runSynced(
    resultCell: Cell,
    recipe: Recipe | Module,
    inputs?: any,
  ) {
    await resultCell.sync();

    const synced = await this.syncCellsForRunningRecipe(
      resultCell,
      recipe,
      inputs,
    );

    // Run the recipe.
    //
    // If the result cell has a transaction attached, and it is still open,
    // we'll use it for all reads and writes as it might be a pending read.
    //
    // TODO(seefeld): There is currently likely a race condition with the
    // scheduler if the transaction isn't committed before the first functions
    // run. Though most likely the worst case is just extra invocations.
    const givenTx = resultCell.tx?.status().status === "ready" && resultCell.tx;
    let setupRes: ReturnType<typeof this.setupInternal> | undefined;
    if (givenTx) {
      // If tx is given, i.e. result cell was part of a tx that is still open,
      // caller manages retries
      setupRes = this.setupInternal(
        givenTx,
        recipe,
        inputs,
        resultCell.withTx(givenTx),
      );
    } else {
      const { error } = await this.runtime.editWithRetry((tx) => {
        setupRes = this.setupInternal(
          tx,
          recipe,
          inputs,
          resultCell.withTx(tx),
        );
      });
      if (error) {
        logger.error("recipe-setup-error", "Error setting up recipe", error);
        // The write did not go through, so don't start based on it.
        setupRes = undefined;
      }
    }

    // If a new recipe was specified, make sure to sync any new cells
    if (recipe || !synced) {
      await this.syncCellsForRunningRecipe(resultCell, recipe);
    }

    if (setupRes?.needsStart) {
      const tx = givenTx || this.runtime.edit();
      this.startWithTx(tx, resultCell.withTx(tx), setupRes.recipe);
      if (!givenTx) {
        // Should be unnecessary as the start itself is read-only
        // TODO(seefeld): Enforce this by adding a read-only flag for tx
        const { error: commitError } = await tx.commit();
        if (commitError) {
          logger.error(
            "tx-commit-error",
            () => [
              "Error committing transaction",
              "\nError:",
              JSON.stringify(commitError, null, 2),
              commitError.name === "ConflictError"
                ? [
                  "\nConflict details:",
                  JSON.stringify(commitError.conflict, null, 2),
                  "\nTransaction:",
                  JSON.stringify(commitError.transaction, null, 2),
                ]
                : [],
            ],
          );
        }
      }
    }

    return recipe?.resultSchema
      ? resultCell.asSchema(recipe.resultSchema)
      : resultCell;
  }

  /**
   * Key under which a charm is tracked in `cancels`: `<space>/<id>` of the
   * cell's normalized link (the path is not part of the key).
   */
  private getDocKey(cell: Cell): `${MemorySpace}/${URI}` {
    const { space, id } = cell.getAsNormalizedFullLink();
    return `${space}/${id}`;
  }

  /**
   * Syncs the cells a recipe run is going to touch: every link mentioned in
   * `inputs`, the process cell, the cells behind all node input/output
   * redirects, the result (through the result schema) and, if the recipe
   * result has a UI that the result schema doesn't cover, the UI.
   *
   * @param resultCell - Result cell of the charm.
   * @param recipe - Recipe or module about to run.
   * @param inputs - Argument value to scan for links.
   * @returns `false` if there is no process cell yet or `recipe` is a module
   *   (nodes are not walked in that case), `true` otherwise.
   */
  private async syncCellsForRunningRecipe(
    resultCell: Cell,
    recipe: Module | Recipe,
    inputs?: any,
  ): Promise<boolean> {
    const seen = new Set<unknown>();
    const promises = new Set<Promise<unknown>>();

    // Recursively walk `value`, syncing every link found; `seen` guards
    // against cycles and repeated values.
    const syncAllMentionedCells = (value: any) => {
      if (seen.has(value)) return;
      seen.add(value);

      const link = parseLink(value, resultCell);

      if (link) {
        const maybePromise = this.runtime.getCellFromLink(link).sync();
        if (maybePromise instanceof Promise) promises.add(maybePromise);
      } else if (isRecord(value)) {
        for (const key in value) syncAllMentionedCells(value[key]);
      }
    };

    syncAllMentionedCells(inputs);
    await Promise.all(promises);

    // TODO(@ubik2): Move this to a more general method in schema.ts or cfc.ts
    const processCellSchema: any = {
      type: "object",
      properties: {
        [TYPE]: { type: "string" },
        argument: recipe.argumentSchema ?? true,
      },
      required: [TYPE],
    };

    if (
      isRecord(processCellSchema) && "properties" in processCellSchema &&
      isObject(recipe.argumentSchema)
    ) {
      // extract $defs and definitions and remove them from argumentSchema
      const { $defs, definitions, ...rest } = recipe.argumentSchema;
      (processCellSchema as any).properties.argument = rest ?? true;
      if (isRecord($defs)) {
        (processCellSchema as any).$defs = $defs;
      }
      if (isRecord(definitions)) {
        (processCellSchema as any).definitions = definitions;
      }
    }

    const sourceCell = resultCell.getSourceCell(processCellSchema);
    if (!sourceCell) return false;

    await sourceCell.sync();

    // We could support this by replicating what happens in runner, but since
    // we're calling this again when returning false, this is good enough for now.
    if (isModule(recipe)) return false;

    const cells: Cell[] = [];

    // Sync all the inputs and outputs of the recipe nodes.
    for (const node of recipe.nodes) {
      const inputs = findAllWriteRedirectCells(node.inputs, sourceCell);
      const outputs = findAllWriteRedirectCells(node.outputs, sourceCell);

      // TODO(seefeld): This ignores schemas provided by modules, so it might
      // still fetch a lot.
      [...inputs, ...outputs].forEach((link) => {
        cells.push(this.runtime.getCellFromLink(link));
      });
    }

    // Sync all the previously computed results.
    if (recipe.resultSchema !== undefined) {
      cells.push(resultCell.asSchema(recipe.resultSchema));
    }

    // If the result has a UI and it wasn't already included in the result
    // schema, sync it as well. This prevents the UI from flashing, because it's
    // first locally computed, then conflicts on write and only then properly
    // received from the server.
    if (
      isRecord(recipe.result) && recipe.result[UI] &&
      (!isRecord(recipe.resultSchema) || !recipe.resultSchema.properties?.[UI])
    ) {
      cells.push(resultCell.key(UI).asSchema(vdomSchema));
    }

    await Promise.all(cells.map((c) => c.sync()));

    return true;
  }

  /**
   * Stop a recipe. This will cancel the recipe and all its children.
   *
   * The charm's cancel function is also removed from the set used by
   * stopAll(), so stopped charms are not retained or cancelled again later.
   *
   * TODO: This isn't a good strategy, as other instances might depend on behavior
   * provided here, even if the user might no longer care about e.g. the UI here.
   * A better strategy would be to schedule based on effects and unregister the
   * effects driving execution, e.g. the UI.
   *
   * @param resultCell - The result doc or cell to stop.
   */
  stop(resultCell: Cell): void {
    const key = this.getDocKey(resultCell);
    const cancel = this.cancels.get(key);
    if (cancel) {
      cancel();
      this.allCancels.delete(cancel);
    }
    this.cancels.delete(key);
  }

  /**
   * Cancels every charm started through this runner and forgets all of them,
   * so that any of them can be started again afterwards. Errors thrown by
   * individual cancel functions are logged and do not stop the sweep.
   */
  stopAll(): void {
    // Cancel all tracked operations. Iterate over a snapshot, since cancel
    // callbacks may call stop(), which mutates the set.
    for (const cancel of [...this.allCancels]) {
      try {
        cancel();
      } catch (error) {
        console.warn("Error canceling operation:", error);
      }
    }
    this.allCancels.clear();

    // Nothing is running anymore; without this, start() would still treat the
    // cancelled charms as "already started".
    this.cancels.clear();

    // Clear the result recipe cache as well, since the actions have been
    // canceled
    this.resultRecipeCache.clear();
  }

  /**
   * Discover and cache JavaScript functions from a recipe.
* Every node of the recipe is visited: its module is examined for cacheable
   * functions, and its inputs are searched for further recipes or modules.
   *
   * @param recipe The recipe to discover functions from
   * @param seen Objects already visited during this traversal; a recipe found
   *   in it is skipped
   */
  private discoverAndCacheFunctions(
    recipe: Recipe,
    seen: Set,
  ): void {
    const alreadyVisited = seen.has(recipe);
    if (alreadyVisited) return;
    seen.add(recipe);

    for (const { module: nodeModule, inputs: nodeInputs } of recipe.nodes) {
      this.discoverAndCacheFunctionsFromModule(nodeModule, seen);

      // Nested recipes can also arrive as inputs (e.g., in map operations)
      this.discoverAndCacheFunctionsFromValue(nodeInputs, seen);
    }
  }

  /**
   * Discover and cache functions from a single module.
   *
   * - "javascript": an implementation that is already a function object is
   *   stored in the function cache, unless the module has an entry already.
   * - "recipe": the nested recipe is traversed.
   * - "ref": the referenced module is fetched from the module registry and
   *   processed; a failed lookup is logged as a warning and otherwise ignored.
   *
   * @param module The module to process
   * @param seen Objects already visited during this traversal
   */
  private discoverAndCacheFunctionsFromModule(
    module: Module,
    seen: Set,
  ): void {
    if (seen.has(module)) return;
    seen.add(module);

    if (!isModule(module)) return;

    const kind = module.type;

    if (kind === "javascript") {
      // Only implementations that are function objects get cached here
      const cacheable = typeof module.implementation === "function" &&
        !this.functionCache.has(module);
      if (cacheable) {
        this.functionCache.set(module, module.implementation);
      }
    } else if (kind === "recipe") {
      // Descend into the nested recipe
      if (isRecipe(module.implementation)) {
        this.discoverAndCacheFunctions(module.implementation, seen);
      }
    } else if (kind === "ref") {
      // Look the referenced module up and process it in turn
      try {
        const target = this.runtime.moduleRegistry.getModule(
          module.implementation as string,
        );
        this.discoverAndCacheFunctionsFromModule(target, seen);
      } catch (error) {
        console.warn(
          `Failed to resolve module reference for implementation "${module.implementation}":`,
          error,
        );
      }
    }
  }

  /**
   * Discover and cache functions from a value that might contain recipes.
   * This handles cases where recipes are passed as inputs (e.g., to map operations).
* * @param value The value to search for recipes */ private discoverAndCacheFunctionsFromValue( value: JSONValue, seen: Set, ): void { if (isRecipe(value)) { this.discoverAndCacheFunctions(value, seen); return; } if (isModule(value)) { this.discoverAndCacheFunctionsFromModule(value, seen); return; } if ( !isRecord(value) || isCell(value) || isCellResult(value) ) { return; } if (seen.has(value)) return; seen.add(value); // Recursively search in objects and arrays if (Array.isArray(value)) { for (const item of value as JSONValue[]) { this.discoverAndCacheFunctionsFromValue(item, seen); } return; } for (const key in value as Record) { this.discoverAndCacheFunctionsFromValue( value[key] as JSONValue, seen, ); } } private instantiateNode( tx: IExtendedStorageTransaction, module: Module, inputBindings: JSONValue, outputBindings: JSONValue, processCell: Cell, addCancel: AddCancel, recipe: Recipe, ) { if (isModule(module)) { switch (module.type) { case "ref": this.instantiateNode( tx, this.runtime.moduleRegistry.getModule( module.implementation as string, ), inputBindings, outputBindings, processCell, addCancel, recipe, ); break; case "javascript": this.instantiateJavaScriptNode( tx, module, inputBindings, outputBindings, processCell, addCancel, recipe, ); break; case "raw": this.instantiateRawNode( tx, module, inputBindings, outputBindings, processCell, addCancel, recipe, ); break; case "passthrough": this.instantiatePassthroughNode( tx, module, inputBindings, outputBindings, processCell, addCancel, recipe, ); break; case "recipe": this.instantiateRecipeNode( tx, module, inputBindings, outputBindings, processCell, addCancel, recipe, ); break; default: throw new Error(`Unknown module type: ${module.type}`); } } else if (isWriteRedirectLink(module)) { // TODO(seefeld): Implement, a dynamic node } else { throw new Error(`Unknown module: ${JSON.stringify(module)}`); } } private instantiateJavaScriptNode( tx: IExtendedStorageTransaction, module: Module, inputBindings: 
JSONValue, outputBindings: JSONValue, processCell: Cell, addCancel: AddCancel, recipe: Recipe, ) { const inputs = unwrapOneLevelAndBindtoDoc( inputBindings, processCell, ); const reads = findAllWriteRedirectCells(inputs, processCell); const outputs = unwrapOneLevelAndBindtoDoc(outputBindings, processCell); const writes = findAllWriteRedirectCells(outputs, processCell); let fn: (inputs: any) => any; if (typeof module.implementation === "string") { // Try to get from cache first const cached = this.functionCache.get(module); if (cached) { fn = cached as (inputs: any) => any; } else { // Fall back to evaluating and cache it fn = this.runtime.harness.getInvocation(module.implementation) as ( inputs: any, ) => any; this.functionCache.set(module, fn); } } else { fn = module.implementation as (inputs: any) => any; } // Prefer .src (backup) over .name since name can be finicky const name = (fn as { src?: string; name?: string }).src || fn.name; if (module.wrapper && module.wrapper in moduleWrappers) { fn = moduleWrappers[module.wrapper](fn); } // Check if any of the read cells is a stream alias let streamLink: NormalizedFullLink | undefined = undefined; if (isRecord(inputs)) { for (const key in inputs) { let value = inputs[key]; while (isWriteRedirectLink(value)) { const maybeStreamLink = resolveLink( this.runtime, tx, parseLink(value, processCell), "writeRedirect", ); value = tx.readValueOrThrow(maybeStreamLink); } if (isStreamValue(value)) { streamLink = parseLink(inputs[key], processCell); break; } } } if (streamLink) { // Helper to merge event into inputs const mergeEventIntoInputs = (event: any) => { const eventInputs = { ...(inputs as Record) }; for (const key in eventInputs) { if (isWriteRedirectLink(eventInputs[key])) { const eventLink = parseLink(eventInputs[key], processCell); if (areNormalizedLinksSame(eventLink, streamLink)) { eventInputs[key] = event; } } } return eventInputs; }; // Register as event handler for the stream const handler = (tx: 
IExtendedStorageTransaction, event: any) => { // TODO(seefeld): Scheduler has to create the transaction instead if (event.preventDefault) event.preventDefault(); const eventInputs = mergeEventIntoInputs(event); const cause = { ...(inputs as Record) }; for (const key in cause) { if (isWriteRedirectLink(cause[key])) { const eventLink = parseLink(cause[key], processCell); if (areNormalizedLinksSame(eventLink, streamLink)) { cause[key] = crypto.randomUUID(); } } } const frame = pushFrameFromCause( cause, { unsafe_binding: { recipe, materialize: (path: readonly PropertyKey[]) => processCell.getAsQueryResult(path), space: processCell.space, tx, }, inHandler: true, runtime: this.runtime, space: processCell.space, tx, }, ); try { const inputsCell = this.runtime.getImmutableCell( processCell.space, eventInputs, undefined, tx, ); const argument = module.argumentSchema ? inputsCell.asSchema(module.argumentSchema).get() : inputsCell.getAsQueryResult([], tx); const result = fn(argument); const postRun = (result: any) => { if (containsOpaqueRef(result) || frame.opaqueRefs.size > 0) { const resultRecipe = recipeFromFrame( "event handler result", undefined, () => result, ); const resultCell = this.run( tx, resultRecipe, undefined, this.runtime.getCell( processCell.space, { resultFor: cause }, undefined, tx, ), ); const rawResult = tx.readValueOrThrow( resultCell.getAsNormalizedFullLink(), { meta: ignoreReadForScheduling }, ); const resultRedirects = findAllWriteRedirectCells( rawResult, processCell, ); // Create effect that re-runs when inputs change // (nothing else would read from it, otherwise) const readResultAction: Action = (tx) => resultRedirects.forEach((link) => tx.readValueOrThrow(link)); if (name) { Object.defineProperty(readResultAction, "name", { value: `readResult:${name}`, configurable: true, }); // Also set .src as backup (name can be finicky) (readResultAction as Action & { src?: string }).src = `readResult:${name}`; } const cancel = 
this.runtime.scheduler.subscribe( readResultAction, readResultAction, { isEffect: true }, ); addCancel(() => { cancel(); this.stop(resultCell); }); } return result; }; if (result instanceof Promise) { return result.then(postRun); } else { return postRun(result); } } catch (error) { (error as Error & { frame?: Frame }).frame = frame; throw error; } finally { popFrame(frame); } }; if (name) { Object.defineProperty(handler, "name", { value: `handler:${name}`, configurable: true, }); } const wrappedHandler = Object.assign(handler, { reads, writes, module, recipe, }); // Create callback to populate dependencies for pull mode scheduling. // This reads all cells the handler will access (from the argument schema and event). const populateDependencies = module.argumentSchema ? (depTx: IExtendedStorageTransaction, event: any) => { // Merge event into inputs the same way the handler does const eventInputs = mergeEventIntoInputs(event); const inputsCell = this.runtime.getImmutableCell( processCell.space, eventInputs, undefined, depTx, ); // Use traverseCells to read into all nested Cell objects (including event) inputsCell.asSchema(module.argumentSchema!).get({ traverseCells: true, }); } : undefined; addCancel( this.runtime.scheduler.addEventHandler( wrappedHandler, streamLink, populateDependencies, ), ); } else { if (isRecord(inputs) && "$event" in inputs) { throw new Error( "Handler used as lift, because $stream: true was overwritten", ); } // Schedule the action to run when the inputs change const inputsCell = this.runtime.getImmutableCell( processCell.space, inputs, undefined, tx, ); // Cache the result cell, so we don't regenerate it // This will break if we altered the process cell to point to a // different result, so don't do that. 
let previousResultCell: Cell | undefined; const action: Action = (tx: IExtendedStorageTransaction) => { const frame = pushFrameFromCause( { inputs, outputs, fn: fn.toString() }, { unsafe_binding: { recipe, materialize: (path: readonly PropertyKey[]) => processCell.getAsQueryResult(path, tx), space: processCell.space, tx, }, inHandler: false, runtime: this.runtime, space: processCell.space, tx, }, ); try { const argument = module.argumentSchema ? inputsCell.asSchema(module.argumentSchema).withTx(tx).get() : inputsCell.getAsQueryResult([], tx); const result = fn(argument); const postRun = (result: any) => { if (containsOpaqueRef(result) || frame.opaqueRefs.size > 0) { const resultRecipe = recipeFromFrame( "action result", undefined, () => result, ); const resultCell = previousResultCell ?? this.runtime.getCell( processCell.space, { resultFor: { inputs, outputs, fn: fn.toString() } }, undefined, tx, ); // If nothing changed, don't rerun the recipe const resultRecipeAsString = JSON.stringify(resultRecipe); const previousResultRecipeAsString = this.resultRecipeCache.get( `${resultCell.space}/${resultCell.sourceURI}`, ); if (previousResultRecipeAsString === resultRecipeAsString) { return; } this.resultRecipeCache.set( `${resultCell.space}/${resultCell.sourceURI}`, resultRecipeAsString, ); this.run( tx, resultRecipe, undefined, resultCell, ); addCancel(() => this.stop(resultCell)); if (!previousResultCell) { previousResultCell = resultCell; sendValueToBinding( tx, processCell, outputs, resultCell.getAsLink({ base: processCell }), ); } } else { sendValueToBinding( tx, processCell, outputs, result, ); } return result; }; if (result instanceof Promise) { return result.then(postRun); } else { return postRun(result); } } catch (error) { (error as Error & { frame?: Frame }).frame = frame; throw error; } finally { popFrame(frame); } }; if (name) { Object.defineProperty(action, "name", { value: `action:${name}`, configurable: true, }); // Also set .src as backup (name can be 
// finicky)
        (action as Action & { src?: string }).src = `action:${name}`;
      }

      const wrappedAction = Object.assign(action, {
        reads,
        writes,
        module,
        recipe,
      });

      // Create populateDependencies callback to discover what cells the action reads
      // and writes. Both are needed for pull-based scheduling:
      // - reads: to know when to re-run the action (input dependencies)
      // - writes: so collectDirtyDependencies() can find this computation when
      //   an effect needs its outputs
      const populateDependencies = (depTx: IExtendedStorageTransaction) => {
        // Capture read dependencies - use the pre-computed reads list
        // Note: We DON'T run fn(depTx) here because that would execute
        // user code with side effects during dependency discovery
        if (module.argumentSchema !== undefined) {
          const inputsCell = this.runtime.getImmutableCell(
            processCell.space,
            inputs,
            undefined,
            depTx,
          );
          inputsCell.asSchema(module.argumentSchema!).get({
            traverseCells: true,
          });
        } else {
          for (const read of reads) {
            this.runtime.getCellFromLink(read, undefined, depTx)?.get();
          }
        }

        // Capture write dependencies by marking outputs as potential writes
        for (const output of writes) {
          // Reading with markReadAsPotentialWrite registers this as a write dependency
          this.runtime.getCellFromLink(output, undefined, depTx)?.getRaw({
            meta: markReadAsPotentialWrite,
          });
        }
      };

      addCancel(
        this.runtime.scheduler.subscribe(wrappedAction, populateDependencies),
      );
    }
  }

  /**
   * Instantiates a node whose module implementation is a raw function.
   *
   * Binds the input and output bindings to `processCell`, calls
   * `module.implementation` to obtain an action (either directly or wrapped
   * in a RawBuiltinResult), names that action for debugging and subscribes it
   * to the scheduler together with a `populateDependencies` callback.
   *
   * @param tx - Transaction used to create the immutable inputs cell
   * @param module - Module whose `implementation` must be a function
   * @param inputBindings - Input bindings, resolved relative to `processCell`
   * @param outputBindings - Output bindings, resolved relative to `processCell`
   * @param processCell - Process cell the bindings are bound to
   * @param addCancel - Registers the scheduler unsubscribe callback
   * @param recipe - Recipe noted as parent on closure recipes in the inputs
   * @throws Error if `module.implementation` is not a function
   */
  private instantiateRawNode(
    tx: IExtendedStorageTransaction,
    module: Module,
    inputBindings: JSONValue,
    outputBindings: JSONValue,
    processCell: Cell,
    addCancel: AddCancel,
    recipe: Recipe,
  ) {
    if (typeof module.implementation !== "function") {
      throw new Error(
        `Raw module is not a function, got: ${module.implementation}`,
      );
    }

    const mappedInputBindings = unwrapOneLevelAndBindtoDoc(
      inputBindings,
      processCell,
    );
    const mappedOutputBindings = unwrapOneLevelAndBindtoDoc(
      outputBindings,
      processCell,
    );

    // For `map` and future other node types that take closures, we need to
    // note the parent recipe on the closure recipes.
    unsafe_noteParentOnRecipes(recipe, mappedInputBindings);

    const inputCells = findAllWriteRedirectCells(
      mappedInputBindings,
      processCell,
    );
    // outputCells tracks what cells this action writes to. This is needed for
    // pull-based scheduling so collectDirtyDependencies() can find computations
    // that write to cells being read by effects.
    const outputCells = findAllWriteRedirectCells(
      mappedOutputBindings,
      processCell,
    );

    const inputsCell = this.runtime.getImmutableCell(
      processCell.space,
      mappedInputBindings,
      undefined,
      tx,
    );

    const builtinResult: RawBuiltinReturnType = module.implementation(
      inputsCell,
      // Callback the builtin uses to publish a result to the output bindings.
      (tx: IExtendedStorageTransaction, result: any) => {
        sendValueToBinding(
          tx,
          processCell,
          mappedOutputBindings,
          result,
        );
      },
      addCancel,
      { inputs: inputsCell, parents: processCell.entityId },
      processCell,
      this.runtime,
    );

    // Handle both legacy (just Action) and new (RawBuiltinResult) return formats
    const action = isRawBuiltinResult(builtinResult)
      ? builtinResult.action
      : builtinResult;
    const builtinIsEffect = isRawBuiltinResult(builtinResult)
      ? builtinResult.isEffect
      : undefined;
    const builtinPopulateDependencies = isRawBuiltinResult(builtinResult)
      ? builtinResult.populateDependencies
      : undefined;

    // Name the raw action for debugging - use the implementation's name, or
    // fall back to "anonymous" when it has none.
    const impl = module.implementation as (...args: unknown[]) => Action;
    const rawName = `raw:${impl.name || "anonymous"}`;
    Object.defineProperty(action, "name", {
      value: rawName,
      configurable: true,
    });
    // Also set .src, mirroring the name.
    (action as Action & { src?: string }).src = rawName;

    // Create populateDependencies callback.
    // If builtin provides custom reads, use that; otherwise read all inputs.
    // Always register output writes so collectDirtyDependencies() can find this
    // computation when an effect needs its outputs.
    const populateDependencies = (depTx: IExtendedStorageTransaction) => {
      // Capture read dependencies - use custom if provided, otherwise read all inputs
      if (builtinPopulateDependencies) {
        if (typeof builtinPopulateDependencies === "function") {
          builtinPopulateDependencies(depTx);
        } else {
          // Not a function, so it is an object carrying a `reads` list
          // (a ReactivityLog per the original note): replay each recorded
          // read on depTx so it is registered as a dependency.
          for (const read of builtinPopulateDependencies.reads) {
            depTx.readOrThrow(read);
          }
        }
      } else {
        // Default: read all inputs
        for (const input of inputCells) {
          this.runtime.getCellFromLink(input, undefined, depTx)?.get();
        }
      }

      // Always capture write dependencies by marking outputs as potential writes
      for (const output of outputCells) {
        // Reading with markReadAsPotentialWrite registers this as a write dependency
        this.runtime.getCellFromLink(output, undefined, depTx)?.getRaw({
          meta: markReadAsPotentialWrite,
        });
      }
    };

    // isEffect can come from module options or from the builtin result
    const isEffect = module.isEffect ?? builtinIsEffect;

    addCancel(
      this.runtime.scheduler.subscribe(action, populateDependencies, {
        isEffect,
      }),
    );
  }

  /**
   * Instantiates a passthrough node: binds the input bindings to
   * `processCell` and sends them unchanged to the output bindings. Nothing is
   * scheduled; the unused parameters keep the signature parallel to the other
   * instantiate*Node methods.
   *
   * @param tx - Transaction used for the write to the output bindings
   * @param inputBindings - Input bindings, resolved relative to `processCell`
   * @param outputBindings - Where the bound inputs are sent
   * @param processCell - Process cell the bindings are bound to
   */
  private instantiatePassthroughNode(
    tx: IExtendedStorageTransaction,
    _module: Module,
    inputBindings: JSONValue,
    outputBindings: JSONValue,
    processCell: Cell,
    _addCancel: AddCancel,
    _recipe: Recipe,
  ) {
    const inputs = unwrapOneLevelAndBindtoDoc(inputBindings, processCell);

    sendValueToBinding(tx, processCell, outputBindings, inputs);
  }

  /**
   * Instantiates a node whose module implementation is itself a recipe.
   *
   * Picks (or creates) the result cell, runs the nested recipe into it via
   * `this.run`, links the result to the output bindings when a new cell was
   * created, and registers a cancel that stops the nested recipe.
   *
   * @param tx - Transaction used for cell creation, running and writes
   * @param module - Module whose `implementation` must be a recipe
   * @param inputBindings - Input bindings, resolved relative to `processCell`
   * @param outputBindings - Output bindings, or a direct link to the result cell
   * @param processCell - Process cell the bindings are bound to
   * @param addCancel - Registers the callback that stops the nested recipe
   * @throws Error if `module.implementation` is not a recipe
   */
  private instantiateRecipeNode(
    tx: IExtendedStorageTransaction,
    module: Module,
    inputBindings: JSONValue,
    outputBindings: JSONValue,
    processCell: Cell,
    addCancel: AddCancel,
    _recipe: Recipe,
  ) {
    if (!isRecipe(module.implementation)) throw new Error(`Invalid recipe`);
    const recipeImpl = unwrapOneLevelAndBindtoDoc(
      module.implementation,
      processCell,
    );
    const inputs = unwrapOneLevelAndBindtoDoc(inputBindings, processCell);

    // If output bindings is a link to a non-redirect cell,
    // use that instead of creating a new cell.
    let resultCell;
    let sendToBindings: boolean;
    if (isSigilLink(outputBindings) && !isWriteRedirectLink(outputBindings)) {
      resultCell = this.runtime.getCellFromLink(
        parseLink(outputBindings, processCell),
        recipeImpl.resultSchema,
        tx,
      );
      sendToBindings = false;
    } else {
      // Otherwise create a result cell whose cause is derived from the recipe,
      // the parent process cell and both bindings.
      resultCell = this.runtime.getCell(
        processCell.space,
        {
          recipe: module.implementation,
          parent: processCell.entityId,
          inputBindings,
          outputBindings,
        },
        recipeImpl.resultSchema,
        tx,
      );
      sendToBindings = true;
    }

    this.run(tx, recipeImpl, inputs, resultCell);

    if (sendToBindings) {
      sendValueToBinding(
        tx,
        processCell,
        outputBindings,
        resultCell.getAsLink({ base: processCell }),
      );
    }

    // TODO(seefeld): Make sure to not cancel after a recipe is elevated to a
    // charm, e.g. via navigateTo. Nothing is cancelling right now, so leaving
    // this as TODO.
    addCancel(() => this.stop(resultCell));
  }
}

/**
 * Takes a recipe id and returns a sigil link with the corresponding entity.
 *
 * The entity id is `of:` followed by the merkle reference of
 * `{ causal: { recipeId, type: "recipe" } }`.
 *
 * @param recipeId - Id of the recipe
 * @returns A sigil link whose only field is the derived entity id
 */
function getSpellLink(recipeId: string): SigilLink {
  const id = refer({ causal: { recipeId, type: "recipe" } }).toJSON()["/"];
  return { "/": { [LINK_V1_TAG]: { id: `of:${id}` } } };
}

/**
 * Reports whether `value` is an opaque ref or contains one anywhere in its
 * nested records/arrays. Cell links are treated as leaves and never searched.
 *
 * @param value - Value to inspect
 * @returns true if an opaque ref was found
 */
function containsOpaqueRef(value: unknown): boolean {
  if (isOpaqueRef(value)) return true;
  if (isCellLink(value)) return false;
  if (isRecord(value)) {
    return Object.values(value).some(containsOpaqueRef);
  }
  return false;
}

/**
 * Deep-copies records and arrays, but returns cell links and literal values
 * as-is (cell links are not cloned or descended into).
 *
 * @param value - Value to copy
 * @returns A mutable copy with the same shape as `value`
 */
export function cellAwareDeepCopy<T>(value: T): Mutable<T> {
  if (isCellLink(value)) return value as Mutable<T>;
  if (isRecord(value)) {
    return Array.isArray(value)
      ? value.map(cellAwareDeepCopy) as unknown as Mutable<T>
      : Object.fromEntries(
        Object.entries(value).map((
          [key, entry],
        ) => [key, cellAwareDeepCopy(entry)]),
      ) as unknown as Mutable<T>;
    // Literal value:
  } else return value as Mutable<T>;
}

/**
 * Extracts default values from a JSON schema object.
/**
 * Extracts default values from a JSON schema object.
 *
 * For a schema with `type: "object"` and a `properties` map, the result starts
 * from a copy of the schema's own `default` (when that is a record) and is
 * then overlaid with the defaults extracted recursively from each property.
 * For any other schema the `default` is returned directly.
 *
 * @param schema - The JSON schema to extract defaults from
 * @returns An object containing the default values, or undefined if none found
 */
export function extractDefaultValues(
  schema: JSONSchema,
): JSONValue | undefined {
  // Boolean (or otherwise non-object) schemas carry no defaults.
  if (typeof schema !== "object" || schema === null) return undefined;

  if (
    schema.type === "object" && schema.properties && isObject(schema.properties)
  ) {
    // Ignore the schema.default if it's not an object, since it's not a valid
    // default value for an object.
    const obj = cellAwareDeepCopy(
      isRecord(schema.default) ? schema.default : {},
    );
    for (const [propKey, propSchema] of Object.entries(schema.properties)) {
      const value = extractDefaultValues(propSchema);
      if (value !== undefined) {
        // Property-level defaults overwrite the object-level default's entry.
        (obj as Record<string, unknown>)[propKey] = value;
      }
    }

    // An empty object means no default was found anywhere.
    return Object.entries(obj).length > 0 ? obj : undefined;
  }

  return schema.default;
}

/**
 * Merges objects into a single object, preferring values from earlier
 * objects. Recursively calls itself for nested objects, passing on the values
 * found under each matching property.
 *
 * `undefined` arguments are ignored. A property whose value is `undefined`
 * in every object is omitted from the result.
 *
 * @param objects - Objects to merge
 * @returns The merged object; `{}` if no (defined) objects were provided, or
 *   the sole defined argument as-is if there is only one
 */
export function mergeObjects<T>(
  ...objects: (Partial<T> | undefined)[]
): T {
  const defined = objects.filter((obj) => obj !== undefined);
  if (defined.length === 0) return {} as T;
  if (defined.length === 1) return defined[0] as T;

  const seen = new Set<string>();
  const result: Record<string, unknown> = {};

  for (const obj of defined) {
    // A literal, an array or a cell link can't be merged key by key, so the
    // first such value encountered is returned as-is. Note that this also
    // short-circuits when it follows plain objects: whatever was merged so
    // far is discarded in favor of that value.
    if (!isObject(obj) || isCellLink(obj)) {
      return obj as T;
    }

    // Then merge objects, only passing those on that have any values.
    for (const key of Object.keys(obj)) {
      if (seen.has(key)) continue;
      seen.add(key);

      // Collect this key's value from every argument, in precedence order.
      const candidates = defined
        .map((other) =>
          (other as Record<string, unknown>)?.[key] as T[keyof T] | undefined
        )
        .filter((value) => value !== undefined);

      // Nothing defined anywhere for this key: skip it rather than letting
      // the recursion's empty-input fallback materialize a spurious `{}`.
      if (candidates.length === 0) continue;

      result[key] = mergeObjects<T[keyof T]>(...candidates);
    }
  }

  return result as T;
}

/**
 * Wrappers that adapt a module function so it can be called with a single
 * props object.
 *
 * - `handler`: calls `fn` with `props.$event` and `props.$ctx` as its two
 *   positional arguments.
 */
const moduleWrappers = {
  handler: (fn: (event: any, ...props: any[]) => any) => (props: any) =>
    fn(props.$event, props.$ctx),
};