# Source code for conda_workspaces.graph

"""DAG resolution and topological sort for task dependencies."""

from __future__ import annotations

from collections import deque
from graphlib import CycleError, TopologicalSorter
from typing import TYPE_CHECKING

from .exceptions import CyclicDependencyError, TaskNotFoundError

if TYPE_CHECKING:
    from .models import Task


def resolve_execution_order(
    target: str,
    tasks: dict[str, Task],
    *,
    skip_deps: bool = False,
) -> list[str]:
    """Compute the ordered list of task names to run for *target*.

    The ordering is produced with :class:`graphlib.TopologicalSorter`
    and covers only *target* plus everything it transitively depends
    on; tasks unrelated to *target* never appear in the result.

    When *skip_deps* is true the dependency graph is not consulted at
    all and the result is just ``[target]``.

    Raises ``TaskNotFoundError`` if *target* or any dependency is missing.
    Raises ``CyclicDependencyError`` if the dependency graph has a cycle.
    """
    # Validate the entry point first, even when dependencies are skipped.
    if target not in tasks:
        raise TaskNotFoundError(target, [*tasks.keys()])

    if not skip_deps:
        closure = _collect_reachable(target, tasks)
        return _topological_sort(closure, tasks)

    # Dependencies were explicitly waived: run the target on its own.
    return [target]
def _collect_reachable(target: str, tasks: dict[str, Task]) -> set[str]:
    """Return *target* plus every task it reaches through depends-on edges.

    The walk is breadth-first, starting at *target*. Raises
    ``TaskNotFoundError`` as soon as a dequeued name is absent from *tasks*.
    """
    seen: set[str] = set()
    pending = deque((target,))
    while pending:
        current = pending.popleft()
        # A name may be queued more than once; only its first visit counts.
        if current not in seen:
            if current not in tasks:
                raise TaskNotFoundError(current, [*tasks.keys()])
            seen.add(current)
            pending.extend(
                edge.task
                for edge in tasks[current].depends_on
                if edge.task not in seen
            )
    return seen


def _topological_sort(names: set[str], tasks: dict[str, Task]) -> list[str]:
    """Order the tasks in *names* with :class:`graphlib.TopologicalSorter`.

    Dependency edges that point outside *names* are dropped. Nodes and
    their dependency lists are both sorted before being handed to the
    sorter, so the insertion order does not depend on set iteration order.

    Raises ``CyclicDependencyError`` when the sorter reports a cycle; the
    error carries the cycle from the ``CycleError`` arguments when present,
    otherwise the sorted contents of *names*.
    """
    graph: dict[str, list[str]] = {}
    for node in sorted(names):
        # Keep only edges whose other end is inside the requested subset.
        graph[node] = sorted(
            edge.task for edge in tasks[node].depends_on if edge.task in names
        )

    try:
        return [*TopologicalSorter(graph).static_order()]
    except CycleError as exc:
        if len(exc.args) > 1:
            cycle = list(exc.args[1])
        else:
            cycle = sorted(names)
        raise CyclicDependencyError(cycle) from exc