Source code for conda_tasks.graph
"""DAG resolution and topological sort for task dependencies."""
from __future__ import annotations
from collections import deque
from typing import TYPE_CHECKING
from .exceptions import CyclicDependencyError, TaskNotFoundError
if TYPE_CHECKING:
from .models import Task
[docs]
def resolve_execution_order(
target: str,
tasks: dict[str, Task],
*,
skip_deps: bool = False,
) -> list[str]:
"""Return task names in the order they should execute to run *target*.
Uses Kahn's algorithm for topological sort. Only the transitive
closure of *target*'s dependencies is included -- unrelated tasks
are omitted.
Raises ``TaskNotFoundError`` if *target* or any dependency is missing.
Raises ``CyclicDependencyError`` if the dependency graph has a cycle.
"""
if target not in tasks:
raise TaskNotFoundError(target, list(tasks.keys()))
if skip_deps:
return [target]
reachable = _collect_reachable(target, tasks)
return _topological_sort(reachable, tasks)
def _collect_reachable(target: str, tasks: dict[str, Task]) -> set[str]:
"""BFS to gather all tasks reachable via depends-on from *target*."""
visited: set[str] = set()
queue = deque([target])
while queue:
name = queue.popleft()
if name in visited:
continue
if name not in tasks:
raise TaskNotFoundError(name, list(tasks.keys()))
visited.add(name)
for dep in tasks[name].depends_on:
if dep.task not in visited:
queue.append(dep.task)
return visited
def _topological_sort(names: set[str], tasks: dict[str, Task]) -> list[str]:
"""Kahn's algorithm over the subset *names*."""
in_degree: dict[str, int] = {n: 0 for n in names}
adjacency: dict[str, list[str]] = {n: [] for n in names}
for name in names:
for dep in tasks[name].depends_on:
if dep.task in names:
adjacency[dep.task].append(name)
in_degree[name] += 1
queue: deque[str] = deque(sorted(n for n in names if in_degree[n] == 0))
order: list[str] = []
while queue:
node = queue.popleft()
order.append(node)
for successor in sorted(adjacency[node]):
in_degree[successor] -= 1
if in_degree[successor] == 0:
queue.append(successor)
if len(order) != len(names):
remaining = names - set(order)
cycle = _find_cycle(remaining, tasks)
raise CyclicDependencyError(cycle)
return order
def _find_cycle(names: set[str], tasks: dict[str, Task]) -> list[str]:
"""Find and return one cycle in the dependency graph as a path."""
visited: set[str] = set()
path: list[str] = []
on_stack: set[str] = set()
def dfs(node: str) -> list[str] | None:
visited.add(node)
on_stack.add(node)
path.append(node)
for dep in tasks[node].depends_on:
if dep.task not in names:
continue
if dep.task in on_stack:
idx = path.index(dep.task)
return path[idx:] + [dep.task]
if dep.task not in visited:
result = dfs(dep.task)
if result is not None:
return result
path.pop()
on_stack.discard(node)
return None
for name in sorted(names):
if name not in visited:
result = dfs(name)
if result is not None:
return result
return list(names)