import type { IMemorySpaceAddress } from "../storage/interface.ts";
import { entityKey } from "./keys.ts";
import type { MaterializerIndexState } from "./materializers.ts";
import type { NodeRegistry, SchedulerNode } from "./node-record.ts";
import { readsOverlapWrites } from "./scheduling-writes.ts";
import type { SchedulerStaleness } from "./staleness.ts";
import type { TriggerIndexState } from "./trigger-index.ts";
import type {
  Action,
  DirtyDependencyTraceContext,
  ReactivityLog,
  SpaceScopeAndURI,
} from "./types.ts";

/**
 * Shared mutable state that the dependency-graph helpers in this module
 * operate on.
 *
 * Direction convention used throughout: `dependents` maps a writer to the
 * actions that read what it writes, and `reverseDependencies` maps a reader
 * to the writers it depends on.
 */
export interface DependencyGraphState {
  readonly triggerIndex: TriggerIndexState;
  /**
   * Actions that write to each entity, keyed by `entityKey(address)`.
   * NOTE(review): key type reconstructed as `SpaceScopeAndURI`; confirm it
   * matches the return type of `entityKey`.
   */
  readonly writersByEntity: Map<SpaceScopeAndURI, Set<Action>>;
  /**
   * Not read or written by the helpers in this module.
   * NOTE(review): value type reconstructed as `ReactivityLog`; confirm against
   * the owner of this map.
   */
  readonly dependencies: WeakMap<Action, ReactivityLog>;
  /** writer -> readers. */
  readonly dependents: WeakMap<Action, Set<Action>>;
  /** reader -> writers. */
  readonly reverseDependencies: WeakMap<Action, Set<Action>>;
  readonly staleness: SchedulerStaleness;
  readonly nodes: NodeRegistry;
  /**
   * Only `isMaterializer` is used by this module.
   * NOTE(review): the picked key list is reconstructed; confirm.
   */
  readonly materializerIndex: Pick<MaterializerIndexState, "isMaterializer">;
  readonly getSchedulingWrites: (
    action: Action,
  ) => readonly IMemorySpaceAddress[] | undefined;
  readonly isStale: (action: Action) => boolean;
  readonly queueExecution: () => void;
}

/** The subset of graph state needed to evaluate and propagate liveness. */
export type SchedulerLivenessState = Pick<
  DependencyGraphState,
  "nodes" | "reverseDependencies" | "materializerIndex"
>;

/**
 * Reports whether `node` is currently live.
 *
 * A node that is neither a registered effect nor a registered computation is
 * never live. A registered node is live when it is an effect, holds at least
 * one live ref, carries provisional demand, or is a materializer.
 */
export function isLive(
  state: SchedulerLivenessState,
  node: SchedulerNode,
): boolean {
  if (!isRegisteredNode(state, node)) return false;
  return state.nodes.isEffect(node.action) ||
    node.liveRefs > 0 ||
    node.provisionalDemand ||
    state.materializerIndex.isMaterializer(node.action);
}

/**
 * Propagates a liveness flip of `action` to the writers it reads from.
 *
 * @param wasLive Liveness of the node as observed by the caller *before* it
 *   mutated whatever may have changed liveness.
 *
 * No-op when the action has no node record or when liveness did not change.
 */
export function notifyNodeLivenessChange(
  state: SchedulerLivenessState,
  action: Action,
  wasLive: boolean,
): void {
  const node = state.nodes.get(action);
  if (!node) return;
  const nowLive = isLive(state, node);
  if (wasLive === nowLive) return;
  // Each propagation pass gets its own visited set.
  if (nowLive) {
    addLiveRefsFromWriters(state, node, new Set<Action>());
  } else {
    dropLiveRefsFromWriters(state, node, new Set<Action>());
  }
}

/**
 * Sets or clears provisional demand on `node` and propagates any resulting
 * liveness change upstream.
 *
 * @param passId Stored as `provisionalDemandPass` when demand is set; the
 *   field is reset to `undefined` when demand is cleared.
 */
export function setNodeProvisionalDemand(
  state: SchedulerLivenessState,
  node: SchedulerNode,
  provisionalDemand: boolean,
  passId?: number,
): void {
  // Capture liveness before mutating so the flip can be detected.
  const wasLive = isLive(state, node);
  node.provisionalDemand = provisionalDemand;
  node.provisionalDemandPass = provisionalDemand ? passId : undefined;
  notifyNodeLivenessChange(state, node.action, wasLive);
}

/**
 * Buckets `reads` by `entityKey(read)`, preserving input order within each
 * bucket.
 */
export function groupReadsByEntity(
  reads: readonly IMemorySpaceAddress[],
): Map<SpaceScopeAndURI, IMemorySpaceAddress[]> {
  const readsByEntity = new Map<SpaceScopeAndURI, IMemorySpaceAddress[]>();
  for (const read of reads) {
    const entity = entityKey(read);
    let entityReads = readsByEntity.get(entity);
    if (!entityReads) {
      entityReads = [];
      readsByEntity.set(entity, entityReads);
    }
    entityReads.push(read);
  }
  return readsByEntity;
}

/**
 * Depth-first search over writer -> reader edges: returns true when `to` is
 * reachable from `from` (an action always reaches itself).
 *
 * @param visited Cycle guard; mutated during the search. A `from` that is
 *   already in `visited` (and is not `to`) is treated as unreachable.
 */
export function hasDependentPath(
  dependentsByAction: WeakMap<Action, Set<Action>>,
  from: Action,
  to: Action,
  visited = new Set<Action>(),
): boolean {
  if (from === to) return true;
  if (visited.has(from)) return false;
  visited.add(from);
  const dependents = dependentsByAction.get(from);
  if (!dependents) return false;
  for (const dependent of dependents) {
    if (hasDependentPath(dependentsByAction, dependent, to, visited)) {
      return true;
    }
  }
  return false;
}

/**
 * Collects the non-effect actions whose scheduling writes overlap at least
 * one read (or shallow read) recorded in `log`.
 *
 * When `state.trace` is present its counters are updated:
 * `logReadCount`/`logShallowReadCount` by the log sizes,
 * `writerCandidateCount` once per (read, non-effect writer) pair examined,
 * and `writerOverlapCount` once per distinct overlapping writer.
 */
export function collectDirectWritersForLog(state: {
  readonly writersByEntity: Map<SpaceScopeAndURI, Set<Action>>;
  readonly effects: ReadonlySet<Action>;
  readonly getSchedulingWrites: (
    action: Action,
  ) => readonly IMemorySpaceAddress[] | undefined;
  readonly trace?: DirtyDependencyTraceContext;
}, log: ReactivityLog): Set<Action> {
  const directWriters = new Set<Action>();
  if (state.trace) {
    state.trace.logReadCount += log.reads.length;
    state.trace.logShallowReadCount += log.shallowReads.length;
  }

  // Shared scan for both read kinds; `shallow` selects which argument slot of
  // readsOverlapWrites the single read is passed in.
  const scan = (
    reads: readonly IMemorySpaceAddress[],
    shallow: boolean,
  ): void => {
    for (const read of reads) {
      const writers = state.writersByEntity.get(entityKey(read));
      if (!writers) continue;
      for (const writer of writers) {
        if (state.effects.has(writer)) continue;
        if (state.trace) state.trace.writerCandidateCount++;
        const writes = state.getSchedulingWrites(writer) ?? [];
        const overlaps = shallow
          ? readsOverlapWrites([], [read], writes)
          : readsOverlapWrites([read], [], writes);
        if (!overlaps) continue;
        // Count each overlapping writer only the first time it is found.
        if (state.trace && !directWriters.has(writer)) {
          state.trace.writerOverlapCount++;
        }
        directWriters.add(writer);
      }
    }
  };

  scan(log.reads, false);
  scan(log.shallowReads, true);
  return directWriters;
}

/**
 * Computes the set of other actions whose scheduling writes overlap the reads
 * recorded in `log` for `action`. `action` itself is never included.
 */
export function collectReverseDependenciesForLog(
  state: {
    readonly writersByEntity: Map<SpaceScopeAndURI, Set<Action>>;
    readonly getSchedulingWrites: (
      action: Action,
    ) => readonly IMemorySpaceAddress[] | undefined;
  },
  action: Action,
  log: ReactivityLog,
): Set<Action> {
  const dependencies = new Set<Action>();

  // Group reads by entity for efficient writer lookups.
  const readsByEntity = groupReadsByEntity(log.reads);
  const nonRecursiveByEntity = groupReadsByEntity(log.shallowReads);
  const allEntities = new Set<SpaceScopeAndURI>([
    ...readsByEntity.keys(),
    ...nonRecursiveByEntity.keys(),
  ]);

  // For each entity we read from, find actions that write to it.
  for (const entity of allEntities) {
    const writers = state.writersByEntity.get(entity);
    if (!writers) continue;
    const entityReads = readsByEntity.get(entity) ?? [];
    const entityNonRecursiveReads = nonRecursiveByEntity.get(entity) ?? [];
    for (const otherAction of writers) {
      if (otherAction === action) continue;
      // Already matched via another entity; skip the overlap check.
      if (dependencies.has(otherAction)) continue;
      const otherWrites = state.getSchedulingWrites(otherAction) ?? [];
      if (
        readsOverlapWrites(
          entityReads,
          entityNonRecursiveReads,
          otherWrites,
        )
      ) {
        dependencies.add(otherAction);
      }
    }
  }
  return dependencies;
}

/**
 * Replaces the writer edges of `action` with those implied by `log`.
 *
 * All previous writer -> `action` edges are unregistered first, then the new
 * set is computed and registered. `reverseDependencies` for `action` is set
 * to the freshly computed set afterwards, even when that set is empty.
 */
export function updateDependentEdgesForLog(
  state: DependencyGraphState,
  action: Action,
  log: ReactivityLog,
): void {
  const previousDependencies = state.reverseDependencies.get(action);
  if (previousDependencies) {
    // Iterate over a copy: unregisterDependentEdge mutates this very set.
    for (const dependency of [...previousDependencies]) {
      unregisterDependentEdge(state, dependency, action);
    }
    state.reverseDependencies.delete(action);
  }
  const newDependencies = collectReverseDependenciesForLog(
    state,
    action,
    log,
  );
  for (const dependency of newDependencies) {
    registerDependentEdge(state, dependency, action);
  }
  state.reverseDependencies.set(action, newDependencies);
}

/**
 * Records that `dependent` reads something `writer` writes.
 *
 * Self-edges are ignored. Both directions (`dependents` and
 * `reverseDependencies`) are updated. When the edge is new:
 * - if `dependent` is live, a live ref is added to `writer`;
 * - if `writer` is stale, it is recorded as a stale upstream of `dependent`,
 *   and execution is queued when `writer` is a live computation.
 */
export function registerDependentEdge(
  state: DependencyGraphState,
  writer: Action,
  dependent: Action,
): void {
  if (writer === dependent) return;

  let dependents = state.dependents.get(writer);
  if (!dependents) {
    dependents = new Set<Action>();
    state.dependents.set(writer, dependents);
  }
  const alreadyDependent = dependents.has(dependent);
  dependents.add(dependent);

  let reverse = state.reverseDependencies.get(dependent);
  if (!reverse) {
    reverse = new Set<Action>();
    state.reverseDependencies.set(dependent, reverse);
  }
  reverse.add(writer);

  if (!alreadyDependent) {
    const dependentRecord = state.nodes.get(dependent);
    if (dependentRecord && isLive(state, dependentRecord)) {
      addLiveRef(state, writer, new Set<Action>());
    }
  }

  if (!alreadyDependent && state.isStale(writer)) {
    state.staleness.addStaleUpstream(writer, dependent);
    const writerRecord = state.nodes.get(writer);
    if (
      writerRecord?.kind === "computation" &&
      isLive(state, writerRecord)
    ) {
      state.queueExecution();
    }
  }
}

/**
 * Registers a dependent edge from `writer` to every action that the trigger
 * index reports as a reader of any address in `writes`, excluding `writer`
 * itself.
 */
export function registerDependentsForWriterSurface(
  state: DependencyGraphState,
  writer: Action,
  writes: readonly IMemorySpaceAddress[],
): void {
  // Deduplicate readers across all written addresses before registering.
  const readers = new Set<Action>();
  for (const write of writes) {
    for (const action of state.triggerIndex.collectReadersForWrite(write)) {
      readers.add(action);
    }
  }
  readers.delete(writer);
  for (const action of readers) {
    registerDependentEdge(state, writer, action);
  }
}

/**
 * Removes the `writer` -> `dependent` edge from both direction maps, deleting
 * map entries whose sets become empty.
 *
 * When the edge actually existed in `dependents`, the writer loses a live ref
 * if the dependent was live, and the stale-upstream record for the pair is
 * removed.
 */
export function unregisterDependentEdge(
  state: DependencyGraphState,
  writer: Action,
  dependent: Action,
): void {
  // Evaluate the dependent's liveness before touching any edge sets.
  const dependentRecord = state.nodes.get(dependent);
  const dependentWasLive = dependentRecord
    ? isLive(state, dependentRecord)
    : false;

  const dependents = state.dependents.get(writer);
  const hadDependent = dependents?.delete(dependent) ?? false;
  if (dependents && dependents.size === 0) {
    state.dependents.delete(writer);
  }

  const reverse = state.reverseDependencies.get(dependent);
  reverse?.delete(writer);
  if (reverse && reverse.size === 0) {
    state.reverseDependencies.delete(dependent);
  }

  if (hadDependent) {
    if (dependentWasLive) {
      dropLiveRef(state, writer, new Set<Action>());
    }
    state.staleness.removeStaleUpstream(writer, dependent);
  }
}

// Spec §5.2: refcount deltas propagate upstream with a visited-set cycle
// guard — each node is updated AT MOST ONCE per propagation pass. Guarding
// the increment itself (not just the recursion) keeps a cycle's back edge
// from double-counting its origin, which would leave the cycle live forever
// once its only root unsubscribes. Caveat (recorded in PROGRESS.md): this
// per-pass dedup undercounts multi-path (diamond) graphs relative to
// per-edge accounting when an individual edge is later unregistered while
// its reader stays live.
/**
 * Adds one live ref to `action` and, if that makes it live, continues the
 * propagation to the writers it reads from.
 *
 * @param visited Actions already updated in this propagation pass; an action
 *   in this set is skipped entirely.
 */
function addLiveRef(
  state: SchedulerLivenessState,
  action: Action,
  visited: Set<Action>,
): void {
  if (visited.has(action)) return;
  visited.add(action);
  const node = state.nodes.get(action);
  if (!node || !isRegisteredNode(state, node)) return;
  const wasLive = isLive(state, node);
  node.liveRefs++;
  // Only a not-live -> live transition needs to reach further upstream.
  if (!wasLive && isLive(state, node)) {
    addLiveRefsFromWriters(state, node, visited);
  }
}

/**
 * Removes one live ref from `action` and, if that makes it no longer live,
 * continues the propagation to the writers it reads from.
 *
 * Does nothing when the action is unknown, unregistered, or already at zero
 * live refs (the count never goes negative).
 *
 * @param visited Actions already updated in this propagation pass; an action
 *   in this set is skipped entirely.
 */
function dropLiveRef(
  state: SchedulerLivenessState,
  action: Action,
  visited: Set<Action>,
): void {
  if (visited.has(action)) return;
  visited.add(action);
  const node = state.nodes.get(action);
  if (!node || !isRegisteredNode(state, node) || node.liveRefs === 0) {
    return;
  }
  const wasLive = isLive(state, node);
  node.liveRefs--;
  // Only a live -> not-live transition needs to reach further upstream.
  if (wasLive && !isLive(state, node)) {
    dropLiveRefsFromWriters(state, node, visited);
  }
}

/** Adds a live ref to every writer that `node` reads from. */
function addLiveRefsFromWriters(
  state: SchedulerLivenessState,
  node: SchedulerNode,
  visited: Set<Action>,
): void {
  updateLiveRefsFromWriters(state, node, visited, addLiveRef);
}

/** Drops a live ref from every writer that `node` reads from. */
function dropLiveRefsFromWriters(
  state: SchedulerLivenessState,
  node: SchedulerNode,
  visited: Set<Action>,
): void {
  updateLiveRefsFromWriters(state, node, visited, dropLiveRef);
}

/**
 * Applies `update` to each writer recorded for `node.action` in
 * `reverseDependencies`, after marking `node.action` itself as visited.
 */
function updateLiveRefsFromWriters(
  state: SchedulerLivenessState,
  node: SchedulerNode,
  visited: Set<Action>,
  update: (
    state: SchedulerLivenessState,
    action: Action,
    visited: Set<Action>,
  ) => void,
): void {
  // Direction convention: `dependents` is writer -> readers, and
  // `reverseDependencies` is reader -> writers. Liveness therefore propagates
  // from a live reader upstream through `reverseDependencies`. Mark the
  // origin so cyclic back edges cannot update it again this pass; the
  // per-node guard lives in addLiveRef/dropLiveRef.
  visited.add(node.action);
  const writers = state.reverseDependencies.get(node.action);
  if (!writers) return;
  for (const writer of writers) {
    update(state, writer, visited);
  }
}

/**
 * Reports whether the node registry knows `node.action` as either an effect
 * or a computation.
 */
function isRegisteredNode(
  state: SchedulerLivenessState,
  node: SchedulerNode,
): boolean {
  return state.nodes.isEffect(node.action) ||
    state.nodes.isComputation(node.action);
}

/**
 * Conservatively decides whether any action still awaiting dependency
 * collection could affect what `action` reads.
 *
 * Returns false immediately when there are no reads at all. Pending actions
 * that are `action` itself, effects, or throttled are ignored. For each
 * remaining pending action the answer is true when:
 * - its scheduling writes are unknown or empty (nothing can be ruled out),
 * - a dependent path leads from it to `action`, or
 * - its writes overlap the given reads.
 */
export function pendingDependencyCollectionMightAffect(
  state: {
    readonly pendingDependencyCollection: ReadonlySet<Action>;
    readonly effects: ReadonlySet<Action>;
    readonly isThrottled: (action: Action) => boolean;
    readonly getSchedulingWrites: (
      action: Action,
    ) => readonly IMemorySpaceAddress[] | undefined;
    readonly hasDependentPath: (from: Action, to: Action) => boolean;
  },
  action: Action,
  reads: readonly IMemorySpaceAddress[],
  shallowReads: readonly IMemorySpaceAddress[],
): boolean {
  if (reads.length === 0 && shallowReads.length === 0) return false;

  for (const pendingAction of state.pendingDependencyCollection) {
    if (pendingAction === action) continue;
    if (state.effects.has(pendingAction)) continue;
    if (state.isThrottled(pendingAction)) continue;

    const writes = state.getSchedulingWrites(pendingAction);
    if (!writes || writes.length === 0) return true;
    if (state.hasDependentPath(pendingAction, action)) return true;
    if (readsOverlapWrites(reads, shallowReads, writes)) {
      return true;
    }
  }
  return false;
}