Source code for conda_workspaces.parsers.toml

"""Parser for conda.toml manifests and shared TOML helpers.

The ``CondaTomlParser`` handles ``conda.toml`` — the conda-native
manifest format for both workspace configuration and task definitions.

Helper functions for parsing channels, dependencies, environments,
and target overrides are shared with ``pixi_toml.py`` and
``pyproject_toml.py``.
"""

from __future__ import annotations

import logging
from typing import TYPE_CHECKING

import tomlkit

from ..exceptions import TaskNotFoundError, TaskParseError, WorkspaceParseError
from ..models import (
    Channel,
    Environment,
    Feature,
    MatchSpec,
    PyPIDependency,
)
from .base import ManifestParser
from .normalize import parse_tasks_and_targets

if TYPE_CHECKING:
    from pathlib import Path
    from typing import Any

    from tomlkit.items import InlineTable

    from ..models import Task, WorkspaceConfig

log = logging.getLogger(__name__)


class CondaTomlParser(ManifestParser):
    """Manifest parser for ``conda.toml`` files (workspace and tasks).

    ``conda.toml`` is the conda-native format.  Its structure mirrors
    ``pixi.toml``, but only the ``[workspace]`` table is used; there is
    no ``[project]`` fallback.
    """

    filenames = ("conda.toml",)

    def can_handle(self, path: Path) -> bool:
        """Return True if the file name of *path* is listed in ``filenames``."""
        return path.name in self.filenames

    def has_workspace(self, path: Path) -> bool:
        """Return True if *path* exists, parses as TOML, and has a ``workspace`` key.

        Any error while reading or parsing the file yields False.
        """
        if path.exists():
            try:
                document = tomlkit.loads(path.read_text(encoding="utf-8"))
            except Exception:
                return False
            return "workspace" in document
        return False

    def parse(self, path: Path) -> WorkspaceConfig:
        """Parse *path* by delegating to ``PixiTomlParser``.

        The returned config has ``manifest_path`` set to ``str(path)``.

        Raises:
            WorkspaceParseError: re-raised unchanged when the delegate
                raises it; any other exception from the delegate is
                wrapped in one (with the original as ``__cause__``).
        """
        # Deferred import: pixi_toml imports this module, so a module-level
        # import here would be circular.
        from .pixi_toml import PixiTomlParser

        delegate = PixiTomlParser()
        try:
            workspace_config = delegate.parse(path)
        except Exception as exc:
            if isinstance(exc, WorkspaceParseError):
                raise
            raise WorkspaceParseError(path, str(exc)) from exc
        workspace_config.manifest_path = str(path)
        return workspace_config

    def has_tasks(self, path: Path) -> bool:
        """Return True if *path* exists, parses as TOML, and has a non-empty ``tasks`` entry.

        Any error while reading or parsing the file yields False.
        """
        if path.exists():
            try:
                document = tomlkit.loads(path.read_text(encoding="utf-8"))
            except Exception:
                return False
            return True if document.get("tasks") else False
        return False

    def parse_tasks(self, path: Path) -> dict[str, Task]:
        """Load *path* and return its tasks via ``parse_tasks_and_targets``.

        Raises:
            TaskParseError: if the file cannot be read, parsed, or
                unwrapped into plain Python data.
        """
        try:
            text = path.read_text(encoding="utf-8")
            plain = tomlkit.loads(text).unwrap()
        except Exception as exc:
            raise TaskParseError(str(path), str(exc)) from exc
        return parse_tasks_and_targets(plain)

    def add_task(self, path: Path, name: str, task: Task) -> None:
        """Write *task* under ``[tasks]`` as *name*, creating the file if needed.

        An existing entry with the same name is overwritten.
        """
        document = (
            tomlkit.loads(path.read_text(encoding="utf-8"))
            if path.exists()
            else tomlkit.document()
        )
        task_table = document.setdefault("tasks", tomlkit.table())
        task_table[name] = self.task_to_toml_inline(task)
        path.write_text(tomlkit.dumps(document), encoding="utf-8")

    def remove_task(self, path: Path, name: str) -> None:
        """Delete task *name* from *path*, then call ``remove_target_overrides``.

        Raises:
            TaskNotFoundError: if *name* is not present in ``[tasks]``; the
                error carries the list of task names that are defined.
        """
        document = tomlkit.loads(path.read_text(encoding="utf-8"))
        task_table = document.get("tasks", {})
        if name in task_table:
            del task_table[name]
        else:
            raise TaskNotFoundError(name, list(task_table.keys()))
        self.remove_target_overrides(document, name)
        path.write_text(tomlkit.dumps(document), encoding="utf-8")
def tasks_to_toml(tasks: dict[str, Task]) -> str:
    """Render *tasks* as the text of a ``conda.toml`` document.

    The document gets a ``[tasks]`` table with one entry per task, then a
    ``[target.<platform>.tasks]`` table for each platform that appears in
    any ``task.platforms`` mapping.  An override that sets nothing but
    ``cmd`` is written as a plain string rather than an inline table.
    """

    def _items_to_toml(items):
        # Overrides' args / depends-on entries serialise themselves.
        return [entry.to_toml() for entry in items]

    # (TOML key, attribute on the override, converter or None for as-is),
    # in the order the keys are written.
    field_plan = (
        ("cmd", "cmd", None),
        ("env", "env", dict),
        ("cwd", "cwd", None),
        ("clean-env", "clean_env", None),
        ("inputs", "inputs", list),
        ("outputs", "outputs", list),
        ("args", "args", _items_to_toml),
        ("depends-on", "depends_on", _items_to_toml),
    )

    serializer = CondaTomlParser()
    document = tomlkit.document()

    base_table = tomlkit.table()
    for task_name, task in tasks.items():
        base_table.add(task_name, serializer.task_to_toml_inline(task))
    document.add("tasks", base_table)

    # platform -> {task name -> bare command string or inline table}
    by_platform: dict[str, dict[str, str | InlineTable]] = {}
    for task_name, task in tasks.items():
        for platform, override in (task.platforms or {}).items():
            inline = tomlkit.inline_table()
            for key, attr, convert in field_plan:
                value = getattr(override, attr)
                if value is None:
                    continue
                inline.append(key, value if convert is None else convert(value))

            cmd_only = len(inline) == 1 and "cmd" in inline
            entry = str(inline["cmd"]) if cmd_only else inline
            by_platform.setdefault(platform, {})[task_name] = entry

    for platform, overrides in by_platform.items():
        platform_table = tomlkit.table(is_super_table=True)
        platform_tasks = tomlkit.table()
        for override_name, override_value in overrides.items():
            platform_tasks.add(override_name, override_value)
        platform_table.add("tasks", platform_tasks)

        target_root = document.setdefault(
            "target", tomlkit.table(is_super_table=True)
        )
        target_root.add(platform, platform_table)

    return tomlkit.dumps(document)
def _parse_channels(raw: list[Any]) -> list[Channel]:
    """Parse a channels list, handling both strings and dicts.

    String entries are wrapped in ``Channel`` directly.  Dict entries are
    read through their ``"channel"`` key; a ``"priority"`` key, when
    present, is only reported at debug level.  Entries of any other type
    are skipped.

    Raises:
        KeyError: if a dict entry has no ``"channel"`` key.
    """
    channels: list[Channel] = []
    for item in raw:
        if isinstance(item, str):
            channels.append(Channel(item))
        elif isinstance(item, dict):
            if "priority" in item:
                log.debug(
                    "Channel priority is not supported by conda; "
                    "ignoring priority=%s for channel '%s'",
                    item["priority"],
                    item["channel"],
                )
            channels.append(Channel(item["channel"]))
    return channels


def _parse_conda_deps(raw: dict[str, Any]) -> dict[str, MatchSpec]:
    """Parse conda dependency specs into MatchSpec objects.

    Each value in *raw* may be:

    - a string, used as the version part: ``"name <spec>"``;
    - a dict with optional ``version`` and ``build`` keys, rendered as
      ``"name [version] [build]"``;
    - anything else, formatted into ``"name <value>"``.
    """
    deps: dict[str, MatchSpec] = {}
    for name, spec in raw.items():
        if isinstance(spec, str):
            deps[name] = MatchSpec(f"{name} {spec}".strip())
        elif isinstance(spec, dict):
            version = spec.get("version", "")
            build = spec.get("build", "")
            if build and not version:
                # The spec string is positional (name, version, build).
                # Without a version placeholder the build pattern would
                # land in the version slot.
                version = "*"
            parts = [name]
            if version:
                parts.append(version)
            if build:
                parts.append(build)
            deps[name] = MatchSpec(" ".join(parts))
        else:
            deps[name] = MatchSpec(f"{name} {spec}")
    return deps


def _parse_pypi_deps(raw: dict[str, Any]) -> dict[str, PyPIDependency]:
    """Parse PyPI dependency specs.

    A string value becomes the dependency's ``spec``.  A dict value is
    read for ``version`` (as ``spec``), ``extras``, ``path``,
    ``editable``, ``git`` and ``url``.  Any other value is stringified
    and used as ``spec``.
    """
    deps: dict[str, PyPIDependency] = {}
    for name, spec in raw.items():
        if isinstance(spec, str):
            deps[name] = PyPIDependency(name=name, spec=spec)
        elif isinstance(spec, dict):
            extras = spec.get("extras", [])
            deps[name] = PyPIDependency(
                name=name,
                spec=spec.get("version", ""),
                extras=tuple(extras) if extras else (),
                path=spec.get("path"),
                editable=spec.get("editable", False),
                git=spec.get("git"),
                url=spec.get("url"),
            )
        else:
            deps[name] = PyPIDependency(name=name, spec=str(spec))
    return deps


def _parse_environment(name: str, raw: Any, path: Path) -> Environment:
    """Parse a single environment entry.

    Environments can be specified as:

    - A list of feature names: ``env = ["feat1", "feat2"]``
    - A dict with keys: ``env = {features = [...]}``

    Raises:
        WorkspaceParseError: if *raw* is neither a list nor a dict.
    """
    if isinstance(raw, list):
        # Copy into a plain list, matching what the dict form stores.
        return Environment(name=name, features=list(raw))
    if isinstance(raw, dict):
        return Environment(
            name=name,
            features=list(raw.get("features", [])),
            no_default_feature=raw.get("no-default-feature", False),
        )
    raise WorkspaceParseError(
        path,
        f"Invalid environment definition for '{name}': "
        f"expected list or dict, got {type(raw).__name__}",
    )


def _parse_target_overrides(target_data: dict[str, Any], feature: Feature) -> None:
    """Parse ``[target.<platform>]`` dep overrides into a feature.

    For every platform in *target_data*, non-empty ``dependencies`` and
    ``pypi-dependencies`` tables are stored on *feature* under that
    platform.  *feature* is modified in place.
    """
    for platform, tdata in target_data.items():
        conda = _parse_conda_deps(tdata.get("dependencies", {}))
        if conda:
            feature.target_conda_dependencies[platform] = conda
        pypi = _parse_pypi_deps(tdata.get("pypi-dependencies", {}))
        if pypi:
            feature.target_pypi_dependencies[platform] = pypi


def _parse_feature(name: str, feat_data: dict[str, Any]) -> Feature:
    """Parse a single ``[feature.<name>]`` table into a Feature.

    Shared by ``PixiTomlParser`` and ``PyprojectTomlParser`` — the
    per-feature logic is identical once the data dict is resolved.

    Reads ``dependencies``, ``pypi-dependencies``, ``channels``,
    ``platforms``, ``system-requirements``, ``activation`` and
    ``target`` from *feat_data*; every key is optional.
    """
    feature = Feature(name=name)
    feature.conda_dependencies = _parse_conda_deps(feat_data.get("dependencies", {}))
    feature.pypi_dependencies = _parse_pypi_deps(feat_data.get("pypi-dependencies", {}))
    feature.channels = _parse_channels(feat_data.get("channels", []))
    feature.platforms = list(feat_data.get("platforms", []))

    sysreq = feat_data.get("system-requirements", {})
    if sysreq:
        # Values are stored as strings regardless of their TOML type.
        feature.system_requirements = {k: str(v) for k, v in sysreq.items()}

    activation = feat_data.get("activation", {})
    if activation:
        feature.activation_scripts = list(activation.get("scripts", []))
        feature.activation_env = dict(activation.get("env", {}))

    _parse_target_overrides(feat_data.get("target", {}), feature)
    return feature


def _parse_features_and_envs(
    source: dict[str, Any],
    config: WorkspaceConfig,
    path: Path,
) -> None:
    """Parse features and environments from *source* into *config*.

    Adds the default feature (from top-level deps/activation/system-reqs),
    all named features, and all environments.  When *source* defines no
    environments, a single default environment is added instead.

    Shared by ``PixiTomlParser`` and ``PyprojectTomlParser``.
    """
    config.features[Feature.DEFAULT_NAME] = _parse_feature(Feature.DEFAULT_NAME, source)

    for feat_name, feat_data in source.get("feature", {}).items():
        config.features[feat_name] = _parse_feature(feat_name, feat_data)

    envs_data = source.get("environments", {})
    if envs_data:
        for env_name, env_val in envs_data.items():
            config.environments[env_name] = _parse_environment(env_name, env_val, path)
    else:
        config.environments[Environment.DEFAULT_NAME] = Environment(
            name=Environment.DEFAULT_NAME
        )