Source code for conda_workspaces.manifests.toml

"""Parser for conda.toml manifests and shared TOML helpers.

The ``CondaTomlParser`` handles ``conda.toml`` — the conda-native
manifest format for both workspace configuration and task definitions.

Public helpers for parsing shared TOML workspace tables are reused by
``pixi_toml.py`` and ``pyproject_toml.py``.
"""

from __future__ import annotations

import logging
from typing import TYPE_CHECKING

import tomlkit

from ..exceptions import TaskParseError, WorkspaceParseError
from ..models import (
    ArchiveConfig,
    Channel,
    Environment,
    Feature,
    MatchSpec,
    PyPIDependency,
)
from .base import ManifestParser
from .normalize import parse_tasks_and_targets

if TYPE_CHECKING:
    from pathlib import Path
    from typing import Any, ClassVar, NoReturn

    from tomlkit.items import InlineTable

    from ..models import Task, WorkspaceConfig

log = logging.getLogger(__name__)


class CondaTomlParser(ManifestParser):
    """Manifest parser for ``conda.toml`` files (workspace and tasks).

    ``conda.toml`` is the conda-native format.  Its structure mirrors
    ``pixi.toml``, but only ``[workspace]`` is used; there is no
    ``[project]`` fallback.
    """

    format_alias = "conda"
    filenames = ("conda.toml",)
    exporter_format = "conda-toml"

    def can_handle(self, path: Path) -> bool:
        """Return whether the file name of *path* is one of ``filenames``."""
        return path.name in self.filenames

    def has_workspace(self, path: Path) -> bool:
        """Return whether the manifest at *path* has a ``workspace`` key."""
        # NOTE(review): ``read_toml`` is not defined in this class; presumably
        # it is inherited from ``ManifestParser``.
        document = self.read_toml(str(path))
        return "workspace" in document

    def parse(self, path: Path) -> WorkspaceConfig:
        """Parse the workspace configuration stored at *path*.

        The work is delegated to ``PixiTomlParser``.  A
        ``WorkspaceParseError`` from the delegate propagates unchanged; any
        other exception is re-raised as ``WorkspaceParseError`` carrying
        *path* and the original message.  On success the returned config has
        ``manifest_path`` set to ``str(path)``.
        """
        # Imported lazily: pixi_toml imports this module, so a top-level
        # import here would be circular.
        from .pixi_toml import PixiTomlParser

        delegate = PixiTomlParser()
        try:
            config = delegate.parse(path)
        except WorkspaceParseError:
            raise
        except Exception as exc:
            raise WorkspaceParseError(path, str(exc)) from exc
        else:
            config.manifest_path = str(path)
            return config

    def has_tasks(self, path: Path) -> bool:
        """Return whether the manifest has a non-empty top-level ``tasks``."""
        document = self.read_toml(str(path))
        return bool(document.get("tasks"))

    def parse_tasks(self, path: Path) -> dict[str, Task]:
        """Load the manifest at *path* and return its task definitions.

        Any failure while reading or decoding the file is re-raised as
        ``TaskParseError``.  The decoded data is then handed to
        ``parse_tasks_and_targets``, whose errors are not wrapped here.
        """
        try:
            text = path.read_text(encoding="utf-8")
            data = tomlkit.loads(text).unwrap()
        except Exception as exc:
            raise TaskParseError(str(path), str(exc)) from exc
        return parse_tasks_and_targets(data)
def tasks_to_toml(tasks: dict[str, Task]) -> str:
    """Render *tasks* as the text of a ``conda.toml`` document.

    Every task becomes an entry of the ``[tasks]`` table, produced by
    ``CondaTomlParser.task_to_toml_inline``.  Per-platform overrides are
    written under ``[target.<platform>.tasks]``: an override that sets only
    ``cmd`` is written as a plain string, any other override as an inline
    table containing just the fields that are not ``None``.
    """

    def keep(value):
        # Field is written exactly as stored on the override.
        return value

    def to_toml_list(items):
        # Field holds objects that know how to serialize themselves.
        return [item.to_toml() for item in items]

    serializer = CondaTomlParser()
    document = tomlkit.document()

    base_tasks = tomlkit.table()
    for task_name, task in tasks.items():
        base_tasks.add(task_name, serializer.task_to_toml_inline(task))
    document.add("tasks", base_tasks)

    # platform -> task name -> rendered override (string or inline table)
    per_platform: dict[str, dict[str, str | InlineTable]] = {}
    for task_name, task in tasks.items():
        if not task.platforms:
            continue
        for platform, override in task.platforms.items():
            # (TOML key, stored value, converter) in the order keys are written.
            candidates = (
                ("cmd", override.cmd, keep),
                ("env", override.env, dict),
                ("cwd", override.cwd, keep),
                ("clean-env", override.clean_env, keep),
                ("inputs", override.inputs, list),
                ("outputs", override.outputs, list),
                ("args", override.args, to_toml_list),
                ("depends-on", override.depends_on, to_toml_list),
            )
            inline = tomlkit.inline_table()
            for key, value, convert in candidates:
                if value is not None:
                    inline.append(key, convert(value))

            only_cmd = len(inline) == 1 and "cmd" in inline
            rendered = str(inline["cmd"]) if only_cmd else inline
            per_platform.setdefault(platform, {})[task_name] = rendered

    for platform, overrides in per_platform.items():
        platform_table = tomlkit.table(is_super_table=True)
        platform_tasks = tomlkit.table()
        for task_name, rendered in overrides.items():
            platform_tasks.add(task_name, rendered)
        platform_table.add("tasks", platform_tasks)
        target_root = document.setdefault(
            "target", tomlkit.table(is_super_table=True)
        )
        target_root.add(platform, platform_table)

    return tomlkit.dumps(document)
def parse_archive_config(ws: dict[str, Any]) -> ArchiveConfig:
    """Build an ``ArchiveConfig`` from the ``archive`` entry of *ws*.

    Missing keys fall back to empty ``include`` / ``exclude`` tuples,
    ``"zst"`` compression and ``None`` as the compression level.
    """
    section = ws.get("archive", {})
    included = tuple(section.get("include", []))
    excluded = tuple(section.get("exclude", []))
    algorithm = section.get("compression", "zst")
    level = section.get("compression-level")
    return ArchiveConfig(
        include=included,
        exclude=excluded,
        compression=algorithm,
        compression_level=level,
    )
def parse_channels(raw: list[Any]) -> list[Channel]:
    """Convert a manifest ``channels`` list into ``Channel`` objects.

    String entries are used directly.  Dict entries contribute their
    ``channel`` value; a ``priority`` key on such an entry is reported at
    debug level and otherwise ignored.  Entries of any other type are
    skipped.
    """
    parsed: list[Channel] = []
    for entry in raw:
        if isinstance(entry, str):
            parsed.append(Channel(entry))
            continue
        if not isinstance(entry, dict):
            # Neither a plain name nor a table: nothing to add.
            continue
        if "priority" in entry:
            log.debug(
                "Channel priority is not supported by conda; "
                "ignoring priority=%s for channel '%s'",
                entry["priority"],
                entry["channel"],
            )
        parsed.append(Channel(entry["channel"]))
    return parsed
class WorkspaceDependencyResolver:
    """Turn conda dependency tables into ``MatchSpec`` objects.

    ``[workspace.dependencies]`` acts as a root-level pool.  An entry in a
    regular dependency table opts in to that pool with
    ``{ workspace = true }``; once parsing is done, callers only ever see
    concrete ``MatchSpec`` objects.
    """

    # TOML key -> ``MatchSpec`` keyword argument.  Both dashed and underscored
    # spellings are accepted where listed.
    spec_field_aliases: ClassVar[dict[str, str]] = {
        "version": "version",
        "build": "build",
        "build-number": "build_number",
        "build_number": "build_number",
        "channel": "channel",
        "subdir": "subdir",
        "md5": "md5",
        "sha256": "sha256",
        "url": "url",
        "fn": "fn",
        "file-name": "fn",
        "license": "license",
        "license-family": "license_family",
        "license_family": "license_family",
        "features": "features",
        "track-features": "track_features",
        "track_features": "track_features",
    }

    # Keys that mark a source-package dependency; rejected on inheritance.
    source_spec_fields: ClassVar[set[str]] = {
        "branch",
        "extras",
        "flags",
        "git",
        "path",
        "rev",
        "subdirectory",
        "tag",
    }

    def __init__(
        self,
        *,
        workspace_dependencies: dict[str, Any] | None = None,
        path: Path | None = None,
    ) -> None:
        """Store the raw workspace pool and parse it eagerly.

        *path* is only used to give context to errors raised by ``error``.
        The pool itself is parsed with inheritance disabled.
        """
        self.workspace_dependencies_raw = workspace_dependencies or {}
        # ``path`` must be set before parsing: ``error`` reads it.
        self.path = path
        self.workspace_dependencies = self.parse_dependency_table(
            self.workspace_dependencies_raw,
            allow_inheritance=False,
            table_name="[workspace.dependencies]",
        )

    def parse_dependency_table(
        self,
        raw: dict[str, Any],
        *,
        allow_inheritance: bool = True,
        table_name: str = "[dependencies]",
    ) -> dict[str, MatchSpec]:
        """Parse every entry of *raw* and return them keyed by name."""
        return {
            dep_name: self.parse_dependency(
                dep_name,
                dep_spec,
                allow_inheritance=allow_inheritance,
                table_name=table_name,
            )
            for dep_name, dep_spec in raw.items()
        }

    def parse_dependency(
        self,
        name: str,
        spec: Any,
        *,
        allow_inheritance: bool,
        table_name: str,
    ) -> MatchSpec:
        """Parse a single dependency entry into a ``MatchSpec``.

        Non-dict specs are formatted as ``"<name> <spec>"``.  Dict specs
        without a ``workspace`` key are built from their recognised fields.
        Dict specs with ``workspace`` are validated, then merged on top of
        the entry of the same name in the raw workspace pool; validation
        failures are reported through ``error``.
        """
        if not isinstance(spec, dict):
            text = f"{name} {spec}"
            # Only string specs are stripped (an empty string leaves a
            # trailing space behind).
            return MatchSpec(text.strip() if isinstance(spec, str) else text)

        if "workspace" not in spec:
            plain_fields = self.spec_fields(name, spec, strict_unsupported=False)
            return self.match_spec_from_fields(name, plain_fields)

        if not allow_inheritance:
            self.error(f"{table_name}.{name} cannot use `workspace = true`.")

        flag = spec["workspace"]
        if flag is not True:
            self.error(
                f"{table_name}.{name} sets `workspace = {flag!r}`; "
                "`workspace` can only be true."
            )
        if "version" in spec:
            self.error(
                f"{table_name}.{name} cannot set both `workspace = true` and `version`."
            )
        if name not in self.workspace_dependencies_raw:
            self.error(
                f"{table_name}.{name} inherits from [workspace.dependencies], "
                f"but no workspace dependency named '{name}' exists."
            )

        inherited = self.workspace_dependencies_raw[name]
        self.reject_source_fields(name, inherited, "[workspace.dependencies]")
        self.reject_source_fields(name, spec, table_name)

        local_only = {key: value for key, value in spec.items() if key != "workspace"}
        merged = dict(self.spec_fields(name, inherited, strict_unsupported=True))
        # Fields set next to ``workspace = true`` win over the pool entry.
        merged.update(self.spec_fields(name, local_only, strict_unsupported=True))
        return self.match_spec_from_fields(name, merged)

    def spec_fields(
        self,
        name: str,
        spec: Any,
        *,
        strict_unsupported: bool,
    ) -> dict[str, Any]:
        """Translate one TOML dependency value into ``MatchSpec`` keywords.

        A non-dict value becomes ``{"version": ...}``.  For a dict, keys are
        mapped through ``spec_field_aliases``; unknown keys are dropped, or
        reported through ``error`` when *strict_unsupported* is true
        (``workspace`` is never counted as unknown).  ``None`` and empty
        string values are skipped and lists are converted to tuples.
        """
        if not isinstance(spec, dict):
            return {"version": spec if isinstance(spec, str) else str(spec)}

        aliases = self.spec_field_aliases
        unknown = {key for key in spec if key not in aliases and key != "workspace"}
        if strict_unsupported and unknown:
            listing = ", ".join(sorted(unknown))
            self.error(
                f"Conda dependency '{name}' uses unsupported field(s): {listing}."
            )

        translated: dict[str, Any] = {}
        for toml_key, value in spec.items():
            keyword = aliases.get(toml_key)
            if keyword is None or value is None or value == "":
                continue
            translated[keyword] = tuple(value) if isinstance(value, list) else value
        return translated

    def reject_source_fields(self, name: str, spec: Any, table_name: str) -> None:
        """Report an error if *spec* is a dict using source-package fields.

        Called on both sides of an inherited dependency.
        """
        if not isinstance(spec, dict):
            return
        offending = sorted(self.source_spec_fields.intersection(spec))
        if offending:
            listing = ", ".join(offending)
            self.error(
                f"{table_name}.{name} uses source dependency field(s) unsupported by "
                f"conda-workspaces inheritance: {listing}."
            )

    def match_spec_from_fields(self, name: str, fields: dict[str, Any]) -> MatchSpec:
        """Build ``MatchSpec(name=name, **fields)``.

        Any exception from the constructor is reported through ``error`` so
        that the message names the dependency.
        """
        try:
            return MatchSpec(name=name, **fields)
        except Exception as exc:
            self.error(f"Invalid conda dependency '{name}': {exc}")

    def error(self, message: str) -> NoReturn:
        """Raise *message* as an exception; never returns.

        Raises ``WorkspaceParseError`` when a manifest path is known and
        ``ValueError`` otherwise.
        """
        if self.path is None:
            raise ValueError(message)
        raise WorkspaceParseError(self.path, message)
def parse_pypi_dependencies(raw: dict[str, Any]) -> dict[str, PyPIDependency]:
    """Convert a ``pypi-dependencies`` table into ``PyPIDependency`` objects.

    A string value is taken as the version spec.  A dict value may supply
    ``version``, ``extras``, ``path``, ``editable``, ``git`` and ``url``.
    Any other value is stringified and used as the spec.
    """
    parsed: dict[str, PyPIDependency] = {}
    for name, spec in raw.items():
        if isinstance(spec, dict):
            extras = spec.get("extras", [])
            dependency = PyPIDependency(
                name=name,
                spec=spec.get("version", ""),
                extras=tuple(extras) if extras else (),
                path=spec.get("path"),
                editable=spec.get("editable", False),
                git=spec.get("git"),
                url=spec.get("url"),
            )
        elif isinstance(spec, str):
            dependency = PyPIDependency(name=name, spec=spec)
        else:
            dependency = PyPIDependency(name=name, spec=str(spec))
        parsed[name] = dependency
    return parsed
def parse_environment(name: str, raw: Any, path: Path) -> Environment:
    """Parse a single environment entry.

    Environments can be specified as:

    - A list of feature names: ``env = ["feat1", "feat2"]``
    - A dict with keys: ``env = {features = [...]}``; the optional
      ``no-default-feature`` key defaults to ``False``.

    In both forms the feature names are copied into a new list, so the
    returned ``Environment`` never shares a list object with the parsed
    manifest data.

    Raises ``WorkspaceParseError`` (carrying *path*) when *raw* is neither
    a list nor a dict.
    """
    if isinstance(raw, list):
        # Copy, as the dict form below does, rather than aliasing the
        # caller's list.
        return Environment(name=name, features=list(raw))
    if isinstance(raw, dict):
        return Environment(
            name=name,
            features=list(raw.get("features", [])),
            no_default_feature=raw.get("no-default-feature", False),
        )
    raise WorkspaceParseError(
        path,
        f"Invalid environment definition for '{name}': "
        f"expected list or dict, got {type(raw).__name__}",
    )
def parse_target_overrides(
    target_data: dict[str, Any],
    feature: Feature,
    resolver: WorkspaceDependencyResolver | None = None,
) -> None:
    """Copy ``[target.<platform>]`` dependency overrides onto *feature*.

    For each platform, non-empty conda and PyPI dependency tables are stored
    in ``feature.target_conda_dependencies`` and
    ``feature.target_pypi_dependencies``.  A ``system-requirements`` key in
    a target table is reported through ``resolver.error``.  When no resolver
    is supplied a fresh ``WorkspaceDependencyResolver`` is used.
    """
    if not resolver:
        resolver = WorkspaceDependencyResolver()

    for platform, overrides in target_data.items():
        if "system-requirements" in overrides:
            resolver.error(
                f"[target.{platform}.system-requirements] is not supported; "
                "use rich [workspace].platforms entries or "
                "[feature.<name>.system-requirements]."
            )

        conda_deps = resolver.parse_dependency_table(
            overrides.get("dependencies", {}),
            table_name=f"[target.{platform}.dependencies]",
        )
        if conda_deps:
            feature.target_conda_dependencies[platform] = conda_deps

        pypi_deps = parse_pypi_dependencies(overrides.get("pypi-dependencies", {}))
        if pypi_deps:
            feature.target_pypi_dependencies[platform] = pypi_deps
def parse_feature(
    name: str,
    feat_data: dict[str, Any],
    parser: ManifestParser,
    resolver: WorkspaceDependencyResolver | None = None,
) -> Feature:
    """Build a ``Feature`` called *name* from the table *feat_data*.

    Shared by ``PixiTomlParser`` and ``PyprojectTomlParser``: once the data
    dict has been located, the per-feature logic is the same for both.

    Reads ``dependencies``, ``pypi-dependencies``, ``channels``,
    ``platforms``, ``system-requirements``, ``activation`` and ``target``.
    System requirements are parsed by
    ``parser.parse_system_requirements``.  When no resolver is supplied a
    fresh ``WorkspaceDependencyResolver`` is used.
    """
    if not resolver:
        resolver = WorkspaceDependencyResolver()

    feature = Feature(name=name)

    # The default feature's dependencies live at the top level of the
    # manifest, so error messages name that table instead.
    if name == Feature.DEFAULT_NAME:
        deps_table = "[dependencies]"
    else:
        deps_table = f"[feature.{name}.dependencies]"

    feature.conda_dependencies = resolver.parse_dependency_table(
        feat_data.get("dependencies", {}),
        table_name=deps_table,
    )
    feature.pypi_dependencies = parse_pypi_dependencies(
        feat_data.get("pypi-dependencies", {})
    )
    feature.channels = parse_channels(feat_data.get("channels", []))
    feature.platforms = list(feat_data.get("platforms", []))

    requirements = feat_data.get("system-requirements", {})
    if requirements:
        feature.system_requirements = parser.parse_system_requirements(requirements)

    activation_table = feat_data.get("activation", {})
    if activation_table:
        feature.activation_scripts = list(activation_table.get("scripts", []))
        feature.activation_env = dict(activation_table.get("env", {}))

    parse_target_overrides(feat_data.get("target", {}), feature, resolver)
    return feature
def parse_features_and_envs(
    source: dict[str, Any],
    config: WorkspaceConfig,
    path: Path,
    parser: ManifestParser,
) -> None:
    """Populate *config* with the features and environments in *source*.

    Shared by ``PixiTomlParser`` and ``PyprojectTomlParser``.

    The default feature is parsed from *source* itself (top-level
    dependencies, activation, system requirements); each entry of
    ``feature`` becomes a named feature.  Every entry of ``environments`` is
    parsed; when that table is missing or empty, a single default
    environment is registered instead.  The resolved
    ``[workspace.dependencies]`` pool is stored on
    ``config.workspace_dependencies``.
    """
    workspace_table = source.get("workspace", {})
    resolver = WorkspaceDependencyResolver(
        workspace_dependencies=workspace_table.get("dependencies", {}),
        path=path,
    )
    config.workspace_dependencies = resolver.workspace_dependencies

    default_name = Feature.DEFAULT_NAME
    config.features[default_name] = parse_feature(
        default_name, source, parser, resolver
    )
    for feature_name, feature_table in source.get("feature", {}).items():
        config.features[feature_name] = parse_feature(
            feature_name, feature_table, parser, resolver
        )

    environments = source.get("environments", {})
    if not environments:
        config.environments[Environment.DEFAULT_NAME] = Environment(
            name=Environment.DEFAULT_NAME
        )
        return
    for env_name, env_raw in environments.items():
        config.environments[env_name] = parse_environment(env_name, env_raw, path)