Working with graphs¶
Use std.graph when relationships are part of the program model. A graph is a better fit than ad hoc dict and list structures when callers need stable node ids, edge queries, traversal, topological ordering, or DAG cycle rejection.
Choose a graph type¶
| Need | Use |
|---|---|
| Ordinary directed relationships with at most one edge from one node to another | DiGraph[T] |
| Dependency plans, build steps, or staged pipelines that must stay acyclic | Dag[T] |
| Parallel relationships between the same source and target nodes | MultiDiGraph[T] |
Use a builtin dict or list when the relationships are temporary implementation details. Use std.graph when the relationship structure should be visible to callers or tests.
Build a directed graph¶
Add payload-carrying nodes first, then connect node ids with directed edges:
from std.graph import DiGraph, NodeId
mut g = DiGraph[str]()
scan: NodeId = g.add_node(payload="scan_users")
filter: NodeId = g.add_node(payload="filter_active")
join: NodeId = g.add_node(payload="join_orders")
g.add_edge(from_=scan, to=filter)
g.add_edge(from_=filter, to=join)
add_node(payload=...) returns the node id. add_edge(from_=..., to=...) connects existing node ids, and both endpoints must belong to the same graph.
Carry structured payloads¶
Payloads can be model, class, enum, newtype, or other ordinary Incan values:
from std.graph import DiGraph, NodeId
pub model Step:
pub name: str
pub timeout_ms: int
def build_step_graph() -> DiGraph[Step]:
mut g = DiGraph[Step]()
fetch = Step(name="fetch", timeout_ms=5000)
parse = Step(name="parse", timeout_ms=2000)
n_fetch: NodeId = g.add_node(payload=fetch)
n_parse: NodeId = g.add_node(payload=parse)
g.add_edge(from_=n_fetch, to=n_parse)
return g
def step_name_at(g: DiGraph[Step], nid: NodeId) -> str:
match g.node_payload(nid):
case Ok(step):
return step.name
case Err(_):
return ""
The node id is the graph identity. If two nodes should represent the same logical object, decide that in the domain model by reusing one node id or adding separate nodes with separate payload values.
Remove nodes and edges¶
Removing a node also removes every incoming and outgoing edge attached to that node:
from std.graph import DiGraph, GraphError, NodeId
mut g = DiGraph[str]()
first: NodeId = g.add_node(payload="first")
second: NodeId = g.add_node(payload="second")
g.add_edge(from_=first, to=second)
removed: Result[None, GraphError] = g.remove_node(first)
assert not g.contains_node(first)
assert not g.contains_edge(first, second)
DiGraph[T] and Dag[T] remove simple edges by endpoint pair:
removed_edge: Result[None, GraphError] = g.remove_edge(first, second)
Traverse reachable nodes¶
Use breadth-first traversal when edge distance from the start node matters. Use depth-first preorder when each node should appear before the nodes reached by continuing down that path.
from std.graph import DiGraph, NodeId
def collect_bfs(g: DiGraph[str], start: NodeId) -> list[NodeId]:
match g.bfs_nodes(start):
case Ok(nodes):
return nodes
case Err(_):
return []
def collect_dfs(g: DiGraph[str], start: NodeId) -> list[NodeId]:
match g.dfs_preorder_nodes(start):
case Ok(nodes):
return nodes
case Err(_):
return []
Traversal methods visit each reachable node at most once, even when cycles exist. Dag[T] traversal follows the same order contracts with the extra guarantee that the graph value itself is acyclic.
Schedule dependency work with Dag¶
Use topological order when every edge points from an earlier requirement to a later dependent:
from std.graph import Dag, NodeId
mut plan = Dag[str]()
fetch: NodeId = plan.add_node(payload="fetch")
parse: NodeId = plan.add_node(payload="parse")
plan.add_edge(from_=fetch, to=parse)
order: list[NodeId] = plan.topological_order()
Dag[T] rejects cycle-creating edges when they are inserted, so topological order is the normal scheduling view. DiGraph[T] and MultiDiGraph[T] can contain cycles, so their topological_order() methods return Result[list[NodeId], GraphError].
Keep parallel edges separate¶
Use MultiDiGraph[T] when multiple relationships can connect the same node pair and each one needs separate identity:
from std.graph import EdgeId, GraphError, MultiDiGraph, NodeId
mut g = MultiDiGraph[str]()
read: NodeId = g.add_node(payload="read")
write: NodeId = g.add_node(payload="write")
data: Result[EdgeId, GraphError] = g.add_edge(read, write)
control: Result[EdgeId, GraphError] = g.add_edge(read, write)
edges: Result[list[EdgeId], GraphError] = g.edges_between(read, write)
edges_between(from_=..., to=...) returns every active edge id from the source node to the destination node. edge_endpoints(edge) returns the source and destination node ids for one edge. If relationships need labels or attributes, store that data in your own payload model and keep the graph as the structural index.
Store graphs as data¶
Graphs compose with ordinary Incan models:
from std.graph import Dag, NodeId
pub model PipelinePlan:
pub deps: Dag[str]
pub tip: NodeId
def root_names(plan: PipelinePlan) -> list[str]:
mut names: list[str] = []
for nid in plan.deps.roots():
match plan.deps.node_payload(nid):
case Ok(name):
names.append(name)
case Err(_):
pass
return names
This keeps graph ownership explicit. It also keeps tests isolated because each test can construct only the graph it needs.