"""Archive creation and extraction for conda workspaces.
Provides functions for collecting workspace files, creating tar archives
(gzip or zstandard), extracting with path traversal protection, bundling
conda packages for offline use, and inspecting archive contents.
"""
from __future__ import annotations
import fnmatch
import hashlib
import importlib
import shutil
import subprocess
import tarfile
import tempfile
from contextlib import contextmanager
from dataclasses import dataclass
from os.path import expanduser
from pathlib import Path, PurePosixPath, PureWindowsPath
from typing import TYPE_CHECKING
from urllib.parse import urlsplit
from conda_lockfiles.load_yaml import load_yaml
from .exceptions import (
ArchiveError,
ArchiveHashMismatchError,
ArchivePathTraversalError,
)
from .paths import has_absolute_path_syntax, is_path_segment, parse_relative_posix_path
if TYPE_CHECKING:
from collections.abc import Callable, Iterator
from typing import Any
from .context import WorkspaceContext
from .models import ArchiveConfig
from .receipts import ArchiveReceipt
# Suffixes are tried in order by ``WorkspaceArchive.default_target`` and the
# first match is stripped from the archive filename.
# NOTE(review): ".tar.zst" is listed before the longer ".tar.zstd", so the
# "longest first" wording below is not literally true. It is harmless here
# because no entry is a suffix of another entry.
ARCHIVE_SUFFIXES: tuple[str, ...] = (
    ".tar.zst",
    ".tar.zstd",
    ".tar.gz",
    ".tgz",
    ".tar.bz2",
)
"""Recognised archive filename suffixes, longest first."""
# Intersected with the archive's member names by ``inspect_archive`` to
# compute its ``has_manifest`` flag.
MANIFEST_FILENAMES = {"conda.toml", "pixi.toml", "pyproject.toml"}
"""Filenames recognised as workspace manifests inside an archive."""
# Used both to validate lockfile package URLs (``url_to_filename``) and to
# find bundled packages under ``packages/``.
CONDA_PACKAGE_SUFFIXES: tuple[str, ...] = (".conda", ".tar.bz2")
"""Recognised conda package archive suffixes."""
# Any member whose type is not listed here is rejected by
# ``validate_tar_member`` (regular files, directories, and links only).
ALLOWED_TAR_TYPES: frozenset[bytes] = frozenset(
    {
        tarfile.REGTYPE,
        tarfile.AREGTYPE,
        tarfile.DIRTYPE,
        tarfile.SYMTYPE,
        tarfile.LNKTYPE,
    }
)
"""Tar member types accepted during extraction."""
# Compared against workspace-relative POSIX paths by
# ``is_excluded_by_builtins``.
BUILTIN_EXCLUDE_DIRS: frozenset[str] = frozenset(
    {
        ".git",
        ".conda/envs",
        ".pixi",
        "__pycache__",
    }
)
"""Directories excluded from archives regardless of user configuration."""
# Glob patterns evaluated with ``fnmatch`` by ``matches_patterns`` against a
# relative path and each of its leading directory prefixes, so a pattern that
# matches a directory also excludes everything below that directory.
BUILTIN_SENSITIVE_EXCLUDE_PATTERNS: tuple[str, ...] = (
    ".env",
    "*/.env",
    ".env.*",
    "*/.env.*",
    ".aws",
    "*/.aws",
    ".azure",
    "*/.azure",
    ".config/gcloud",
    "*/.config/gcloud",
    ".docker",
    "*/.docker",
    ".gnupg",
    "*/.gnupg",
    ".kube",
    "*/.kube",
    ".ssh",
    "*/.ssh",
    ".terraform",
    "*/.terraform",
    ".condarc",
    "*/.condarc",
    ".git-credentials",
    "*/.git-credentials",
    ".netrc",
    "*/.netrc",
    ".npmrc",
    "*/.npmrc",
    ".pypirc",
    "*/.pypirc",
    "id_dsa",
    "*/id_dsa",
    "id_ecdsa",
    "*/id_ecdsa",
    "id_ed25519",
    "*/id_ed25519",
    "id_rsa",
    "*/id_rsa",
    "kubeconfig",
    "*/kubeconfig",
    "*.kubeconfig",
    "*.key",
    "*.keystore",
    "*.jks",
    "*.p12",
    "*.pem",
    "*.pfx",
    "*.secret",
    "*.secrets",
    "*.tfstate",
    "*.tfstate.*",
    "secrets",
    "*/secrets",
    "secrets.*",
    "*/secrets.*",
)
"""Common credential material excluded from archives by default."""
# Checked before the sensitive patterns in ``is_excluded_by_builtins``, so
# these names are kept even though ".env.*" would otherwise match them.
BUILTIN_SENSITIVE_EXCLUDE_EXCEPTIONS: tuple[str, ...] = (
    ".env.dist",
    "*/.env.dist",
    ".env.example",
    "*/.env.example",
    ".env.sample",
    "*/.env.sample",
    ".env.template",
    "*/.env.template",
)
"""Documented dotenv examples that are safe to keep in archives."""
[docs]
@dataclass(frozen=True)
class WorkspaceArchiveInstallResult:
    """Result returned by :meth:`WorkspaceArchive.install`."""

    # Directory the archive was extracted into (taken from the extract result).
    target: Path
    # Environment name passed to ``install``; ``None`` when none was selected.
    environment: str | None
    # Path the environment was installed at: the ``--prefix`` value, or that
    # prefix re-rooted under ``--dest``. ``None`` when no prefix was given.
    install_prefix: Path | None
    # Final runtime prefix handed to the install handler as the prefix
    # override; only set when it differs from ``install_prefix``.
    runtime_prefix: str | None
    # Receipt path reported by the extract step, if any.
    receipt_path: Path | None
    # Verification flag reported by the extract step.
    verified: bool
    # Archive metadata reported by the extract step.
    info: dict[str, object]
    # Value returned by the install handler.
    return_code: int = 0
    # Number of packages copied into the package cache during extraction.
    primed_packages: int = 0
    # Whether the extract step reported skipping cache priming.
    cache_priming_skipped: bool = False
    # Files under ``install_prefix`` that still contain the install prefix as
    # bytes (filled in only when a runtime prefix override was used).
    prefix_reference_matches: tuple[Path, ...] = ()
    # True when more matches existed than the scan limit allowed reporting.
    prefix_reference_matches_truncated: bool = False
[docs]
@dataclass(frozen=True)
class WorkspaceArchive:
    """High-level API for creating, extracting, and installing archives.

    *path* is stored absolute (``~`` expanded, then resolved). *receipt* is a
    ``--receipt [PATH]`` style value: ``None``/``False`` for no receipt,
    ``True`` for the default receipt location, or an explicit path.
    """

    path: Path
    receipt: bool | str | Path | None = None

    def __init__(self, path: str | Path, receipt: bool | str | Path | None = None):
        """Normalise *path* and store *receipt* unchanged."""
        # The dataclass is frozen, so attributes must be set through
        # ``object.__setattr__``. ``@dataclass`` keeps this hand-written
        # ``__init__`` rather than generating its own.
        object.__setattr__(self, "path", Path(path).expanduser().resolve())
        object.__setattr__(self, "receipt", receipt)

    @classmethod
    def create(
        cls,
        *,
        workspace: str | Path | None = None,
        output: str | Path | None = None,
        lock: bool = False,
        bundle: bool = False,
        exclude: tuple[str, ...] = (),
        receipt: bool | str | Path | None = None,
    ) -> WorkspaceArchive:
        """Create an archive for *workspace* and return its handle.

        When *lock* is true the lockfile is regenerated first. When *bundle*
        is true the packages referenced by the lockfile are located in the
        conda package caches, hash-verified, and added to the archive.
        *exclude* patterns are appended to the manifest's archive excludes.
        When a receipt is requested it is written after the archive exists.

        Raises :class:`ArchiveError` when bundling is requested but no
        lockfile is present.
        """
        from .context import WorkspaceContext
        from .lockfile import generate_lockfile, lockfile_path
        from .manifests import detect_and_parse
        from .models import ArchiveConfig

        _, config = detect_and_parse(workspace)
        ctx = WorkspaceContext(config)
        if lock:
            from .resolver import resolve_all_environments

            resolved_envs = resolve_all_environments(config, ctx.platform)
            generate_lockfile(ctx, resolved_envs, config=config)
        archive_config = ArchiveConfig(
            include=config.archive.include,
            exclude=config.archive.exclude + tuple(exclude),
            compression=config.archive.compression,
            compression_level=config.archive.compression_level,
        )
        output_path = cls.default_output_path(ctx, output)
        archive = cls(output_path, receipt=receipt)
        receipt_path = archive.receipt_path
        manifest_path = Path(config.manifest_path)
        lock_path = lockfile_path(ctx)
        if receipt_path is not None:
            # Validate receipt inputs before any archive bytes are written.
            # NOTE(review): ``validate_receipt_inputs`` is not defined in the
            # visible part of this class - confirm where it lives.
            archive.validate_receipt_inputs(
                root=ctx.root,
                output=output_path,
                archive_config=archive_config,
                manifest_path=manifest_path,
                lockfile_path=lock_path,
                receipt_path=receipt_path,
            )
        bundle_packages = None
        if bundle:
            from conda.base.context import context as conda_context

            if not lock_path.is_file():
                raise ArchiveError(
                    "Cannot bundle packages: no conda.lock found.",
                    hints=["Run 'conda workspace lock' first."],
                )
            cache_dirs = [Path(d) for d in conda_context.pkgs_dirs]
            bundle_packages = collect_bundle_packages(lock_path, cache_dirs)
            # Refuse to bundle cache entries that do not match the lockfile.
            verify_package_hashes(bundle_packages, lock_path)
        archive_path = create_archive(
            ctx.root,
            output_path,
            archive_config,
            bundle_packages=bundle_packages,
        )
        if receipt_path is not None:
            receipt_obj = cls.build_receipt(
                ctx=ctx,
                archive_path=archive_path,
                archive_config=archive_config,
                manifest_path=manifest_path,
                lockfile_path=lock_path,
                options={
                    "bundle": bundle,
                    "lock": lock,
                    "include": list(archive_config.include),
                    "exclude": list(archive_config.exclude),
                    "compressionLevel": archive_config.compression_level,
                },
            )
            receipt_obj.write(receipt_path)
        return cls(archive_path, receipt=receipt_path)

    @staticmethod
    def default_output_path(ctx: WorkspaceContext, output: str | Path | None) -> Path:
        """Return the explicit or workspace-name-derived output path.

        An explicit *output* is returned with ``~`` expanded, matching what
        ``__init__`` does for ``path``; otherwise the archive written by
        ``create`` and the receipt path derived from ``path`` could end up in
        different directories. Without *output*, the file is placed in the
        workspace root and named after the workspace, with an extension
        chosen from the configured compression (``.tar.zst`` by default).

        Raises :class:`ArchiveError` when the workspace name is not a single
        path segment.
        """
        if output is not None:
            return Path(output).expanduser()
        name = ctx.config.name or ctx.root.name
        if not is_path_segment(name):
            raise ArchiveError(
                "Workspace name cannot be used as a default archive filename.",
                hints=[
                    "Use a simple workspace name without path separators,",
                    "or pass -o/--output to choose the archive path explicitly.",
                ],
            )
        ext = {"zst": ".tar.zst", "gz": ".tar.gz", "bz2": ".tar.bz2"}.get(
            ctx.config.archive.compression,
            ".tar.zst",
        )
        return ctx.root / f"{name}{ext}"

    @staticmethod
    def build_receipt(
        *,
        ctx: WorkspaceContext,
        archive_path: Path,
        archive_config: ArchiveConfig,
        manifest_path: Path,
        lockfile_path: Path,
        options: dict[str, object],
    ) -> ArchiveReceipt:
        """Build the external receipt for a newly created archive."""
        from .receipts import ArchiveReceipt

        return ArchiveReceipt.build(
            root=ctx.root,
            archive_path=archive_path,
            archive_config=archive_config,
            manifest_path=manifest_path,
            lockfile_path=lockfile_path,
            environment_prefixes=receipt_environment_prefixes(
                config_environments=list(ctx.config.environments),
                ctx_root=ctx.root,
                env_prefix=ctx.env_prefix,
            ),
            options=options,
        )

    @property
    def receipt_path(self) -> Path | None:
        """Return the configured external receipt path, if any."""
        return resolve_receipt_path(self.path, self.receipt)

    def default_target(self, cwd: str | Path | None = None) -> Path:
        """Return the default extraction target derived from the archive name.

        The first matching entry of ``ARCHIVE_SUFFIXES`` is stripped from the
        archive filename and the remainder is joined to *cwd* (or to the
        current working directory when *cwd* is ``None``).
        """
        stem = self.path.name
        for suffix in ARCHIVE_SUFFIXES:
            if stem.endswith(suffix):
                stem = stem[: -len(suffix)]
                break
        return Path.cwd() / stem if cwd is None else Path(cwd) / stem

    def inspect(self) -> dict[str, object]:
        """Return archive metadata without extracting it."""
        archive_path = self.require_existing_archive()
        return inspect_archive(archive_path)

    def verify(self) -> ArchiveReceipt:
        """Verify the archive against its external receipt.

        Raises :class:`ArchiveError` when no receipt is configured or the
        archive file does not exist.
        """
        receipt_path = self.receipt_path
        if receipt_path is None:
            raise ArchiveError("--receipt is required to verify an archive.")
        from .receipts import ArchiveReceipt

        receipt = ArchiveReceipt.load(receipt_path)
        receipt.verify_archive(self.require_existing_archive())
        return receipt

    def install(
        self,
        *,
        target: str | Path | None = None,
        environment: str | None = None,
        prefix: str | Path | None = None,
        dest: str | Path | None = None,
        require_sha256: bool = False,
        prime_cache: bool = True,
        package_cache: str | Path | None = None,
        install_handler: Callable[[Path, str | None, Path | None, str | None], int]
        | None = None,
    ) -> WorkspaceArchiveInstallResult:
        """Extract the archive and install environments from its lockfile.

        *prefix* is the final runtime prefix and requires *environment*.
        *dest* is a staging root and requires *prefix*: the environment is
        installed at ``dest / <prefix without its root>`` while *prefix* is
        passed to the handler as the runtime prefix override. *target*,
        *require_sha256*, *prime_cache* and *package_cache* are forwarded to
        the extract step. *install_handler* replaces
        :meth:`install_from_lockfile` and is called with
        ``(extracted_target, environment, install_prefix, runtime_prefix)``.

        Raises :class:`ArchiveError` for inconsistent option combinations or
        a non-absolute *prefix*; these checks run before anything is
        extracted.
        """
        final_prefix = str(prefix) if prefix is not None else None
        if final_prefix is not None and not environment:
            raise ArchiveError(
                "--prefix requires an explicit environment.",
                hints=["Pass -e/--environment with --prefix."],
            )
        if dest is not None and final_prefix is None:
            raise ArchiveError(
                "--dest requires --prefix.",
                hints=[
                    "Pass --prefix to declare the final runtime prefix for"
                    " the selected environment.",
                ],
            )
        if final_prefix is not None:
            final_prefix = expanduser(final_prefix)
            if not is_absolute_runtime_prefix(final_prefix):
                raise ArchiveError(
                    "--prefix must be an absolute path.",
                    hints=["Pass an absolute runtime prefix such as /opt/runtime."],
                )
        # NOTE(review): ``extract`` is not defined in the visible part of
        # this class - confirm where it lives.
        extract_result = self.extract(
            target=target,
            require_sha256=require_sha256,
            prime_cache=prime_cache,
            package_cache=package_cache,
        )
        install_prefix = Path(final_prefix) if final_prefix is not None else None
        runtime_prefix = None
        if final_prefix is not None:
            if dest is not None:
                # Re-root the runtime prefix below the staging directory.
                dest_path = Path(dest).expanduser().resolve()
                install_prefix = dest_path / runtime_prefix_relative_path(final_prefix)
                runtime_prefix = final_prefix
            elif str(install_prefix) != final_prefix:
                # The host path normalised differently from the requested
                # string, so keep the requested string as the override.
                runtime_prefix = final_prefix
        handler = install_handler or self.install_from_lockfile
        return_code = handler(
            extract_result.target,
            environment,
            install_prefix,
            runtime_prefix,
        )
        prefix_matches: tuple[Path, ...] = ()
        prefix_matches_truncated = False
        if (
            return_code == 0
            and install_prefix is not None
            and runtime_prefix is not None
        ):
            # Report files that still embed the install location although a
            # different runtime prefix was requested.
            matches, prefix_matches_truncated = scan_prefix_references(
                install_prefix,
                install_prefix,
            )
            prefix_matches = tuple(matches)
        return WorkspaceArchiveInstallResult(
            target=extract_result.target,
            environment=environment,
            install_prefix=install_prefix,
            runtime_prefix=runtime_prefix,
            receipt_path=extract_result.receipt_path,
            verified=extract_result.verified,
            info=extract_result.info,
            return_code=return_code,
            primed_packages=extract_result.primed_packages,
            cache_priming_skipped=extract_result.cache_priming_skipped,
            prefix_reference_matches=prefix_matches,
            prefix_reference_matches_truncated=prefix_matches_truncated,
        )

    @staticmethod
    def install_from_lockfile(
        workspace: Path,
        environment: str | None,
        prefix: Path | None,
        target_prefix_override: str | None,
    ) -> int:
        """Install workspace environments from ``conda.lock`` without the CLI.

        With *environment* set, only that environment is installed, using
        *prefix* and *target_prefix_override*. Otherwise every environment of
        the workspace is installed at its default location and the two
        prefix arguments are not used. Always returns ``0``; failures
        surface as exceptions from the underlying installer.
        """
        from .context import WorkspaceContext
        from .lockfile import install_from_lockfile
        from .manifests import detect_and_parse

        _, config = detect_and_parse(workspace)
        ctx = WorkspaceContext(config)
        if environment is not None:
            install_from_lockfile(
                ctx,
                environment,
                prefix=prefix,
                target_prefix_override=target_prefix_override,
            )
            return 0
        for name in config.environments:
            install_from_lockfile(ctx, name)
        return 0

    def require_existing_archive(self) -> Path:
        """Return *path* after verifying that it points to an archive file.

        Raises :class:`ArchiveError` when the path is not an existing file.
        """
        if not self.path.is_file():
            raise ArchiveError(f"Archive not found: {self.path}")
        return self.path
def is_absolute_runtime_prefix(prefix: str) -> bool:
    """Tell whether *prefix* uses absolute path syntax.

    The decision is delegated to ``has_absolute_path_syntax`` so that POSIX
    and Windows style prefixes are judged by the same rule.
    """
    is_absolute = has_absolute_path_syntax(prefix)
    return is_absolute
def runtime_prefix_relative_path(prefix: str) -> Path:
    """Strip the root (or drive/UNC anchor) from *prefix*.

    The prefix is read as a POSIX path when that makes it absolute and as a
    Windows path otherwise. The remaining components are rebuilt as a host
    ``Path`` so the result can be joined below a staging directory.
    """
    as_posix = PurePosixPath(prefix)
    pure = as_posix if as_posix.is_absolute() else PureWindowsPath(prefix)
    tail = pure.relative_to(pure.anchor)
    return Path(*tail.parts)
def file_contains_bytes(
    path: Path, needle: bytes, *, chunk_size: int = 1024 * 1024
) -> bool:
    """Stream *path* in chunks and report whether *needle* occurs in it.

    An empty *needle* never matches, and a file that cannot be opened or
    read counts as "not found" rather than raising.
    """
    if not needle:
        return False
    # Bytes carried over between reads so a match that straddles two chunks
    # is still seen; one byte less than the needle is enough.
    keep = len(needle) - 1
    tail = b""
    try:
        with path.open("rb") as stream:
            while True:
                block = stream.read(chunk_size)
                if not block:
                    break
                window = tail + block
                if window.find(needle) != -1:
                    return True
                tail = window[-keep:] if keep else b""
    except OSError:
        return False
    return False
def scan_prefix_references(
    root: Path,
    prefix: Path,
    *,
    limit: int = 10,
) -> tuple[list[Path], bool]:
    """List regular files under *root* whose bytes still mention *prefix*.

    Symlinks and non-files are skipped. Returns ``(matches, truncated)``:
    scanning stops as soon as more than *limit* files match, in which case
    only the first *limit* are returned and ``truncated`` is ``True``.
    A *root* that is not a directory yields ``([], False)``.
    """
    if not root.is_dir():
        return [], False
    wanted = str(prefix).encode()
    found: list[Path] = []
    for candidate in root.rglob("*"):
        is_regular = candidate.is_file() and not candidate.is_symlink()
        if is_regular and file_contains_bytes(candidate, wanted):
            found.append(candidate)
            # One match beyond the limit proves the listing is incomplete.
            if len(found) > limit:
                return found[:limit], True
    return found, False
def resolve_receipt_path(archive_path: Path, receipt: object) -> Path | None:
    """Turn a ``--receipt [PATH]`` style value into a concrete path.

    ``None``/``False`` mean "no receipt", ``True`` selects the default
    receipt location for *archive_path*, a ``Path`` is returned as is and a
    string is wrapped in ``Path``. Anything else raises
    :class:`ArchiveError`.
    """
    if receipt in (None, False):
        return None
    if receipt is True:
        from .receipts import ArchiveReceipt

        return ArchiveReceipt.default_path(archive_path)
    if not isinstance(receipt, (Path, str)):
        raise ArchiveError("Invalid --receipt value.")
    return receipt if isinstance(receipt, Path) else Path(receipt)
def receipt_environment_prefixes(
    *,
    config_environments: list[str],
    ctx_root: Path,
    env_prefix: Callable[[str], Path],
) -> dict[str, str]:
    """Map each environment name to the prefix string stored in a receipt.

    Prefixes located below *ctx_root* are recorded relative to it; all other
    prefixes are recorded as given. Both forms use POSIX separators.
    """

    def recorded(location: Path) -> str:
        try:
            return location.relative_to(ctx_root).as_posix()
        except ValueError:
            # Not below the workspace root: keep the path unchanged.
            return location.as_posix()

    return {name: recorded(env_prefix(name)) for name in config_environments}
def extract_verified_archive(
    archive_path: Path,
    target: Path,
    receipt: ArchiveReceipt,
    *,
    require_sha256: bool = False,
) -> Path:
    """Extract to a staging directory, verify, then move into *target*.

    The archive is unpacked into a temporary sibling of *target*, checked
    with ``receipt.verify_extracted`` and only then renamed to *target*, so
    *target* never holds unverified content. The staging directory is
    removed if any step fails. Returns the resolved *target*.

    Raises :class:`ArchiveError` (via ``ensure_extract_target_empty``) when
    *target* is a symlink, a non-directory, or a non-empty directory, and
    re-raises whatever extraction or verification raises.
    """
    # Check the caller's path before resolving it, so a symlink target is
    # still recognised as a symlink.
    ensure_extract_target_empty(target)
    target = target.resolve()
    target.parent.mkdir(parents=True, exist_ok=True)
    # Stage next to the target (same parent directory) so the final step is
    # a plain rename.
    staged = Path(tempfile.mkdtemp(prefix=f".{target.name}.verify-", dir=target.parent))
    try:
        extract_archive(archive_path, staged)
        receipt.verify_extracted(staged, require_sha256=require_sha256)
        # An existing target was verified empty above; remove it so the
        # staged directory can take its place.
        if target.exists():
            target.rmdir()
        staged.rename(target)
    except BaseException:
        # Also clean up on KeyboardInterrupt/SystemExit, then propagate.
        shutil.rmtree(staged, ignore_errors=True)
        raise
    return target
def parse_relative_archive_path(
    path: str,
    *,
    allow_parent: bool = False,
) -> PurePosixPath:
    """Validate *path* as a relative POSIX archive path and return it parsed.

    Tar member names and receipt paths always use POSIX separators, whatever
    the host OS. Routing both through this helper means extraction and
    receipt verification reject the same ambiguous syntax, while each caller
    maps the ``ValueError`` raised here onto its own domain error.
    """
    try:
        parsed = parse_relative_posix_path(
            path,
            allow_parent=allow_parent,
            require_canonical=True,
        )
    except ValueError as err:
        message = f"Invalid relative archive path: {path!r}"
        raise ValueError(message) from err
    return parsed
def is_git_repo(root: Path) -> bool:
    """Report whether git considers *root* part of a working tree.

    Returns ``False`` when the probe cannot be started (``FileNotFoundError``
    from launching the process) or when git does not answer ``true``.
    """
    command = ["git", "rev-parse", "--is-inside-work-tree"]
    try:
        probe = subprocess.run(command, cwd=root, capture_output=True, text=True)
    except FileNotFoundError:
        return False
    if probe.returncode != 0:
        return False
    return probe.stdout.strip() == "true"
def git_tracked_files(root: Path) -> list[Path]:
    """Return absolute paths for all git-tracked files under *root*.

    Runs ``git ls-files -z`` in *root* and keeps the entries that exist as
    files on disk. Raises ``subprocess.CalledProcessError`` when git exits
    with an error.
    """
    import os

    # Capture raw bytes instead of using text=True: git prints file names as
    # bytes, and decoding them with the locale codec in strict mode either
    # fails on undecodable names or produces names that no longer match the
    # files on disk. os.fsdecode applies the same rules Python uses for
    # file names.
    result = subprocess.run(
        ["git", "ls-files", "-z"],
        cwd=root,
        capture_output=True,
        check=True,
    )
    paths = []
    for raw_entry in result.stdout.split(b"\0"):
        if not raw_entry:
            continue
        full = root / os.fsdecode(raw_entry)
        # The index can list files that are missing from the working tree.
        if full.is_file():
            paths.append(full)
    return paths
def is_excluded_by_builtins(rel_path: str) -> bool:
    """Return True if *rel_path* is excluded by the builtin rules.

    *rel_path* is a workspace-relative POSIX path. It is excluded when it is,
    or lies below, one of ``BUILTIN_EXCLUDE_DIRS`` at any depth (so nested
    ``__pycache__`` directories are dropped as well as a top-level one), or
    when it matches a sensitive pattern and is not one of the documented
    dotenv exceptions.
    """
    # Wrapping both sides in "/" turns the test into a whole-component
    # match: "src/__pycache__/x.pyc" is excluded, ".gitignore" is not.
    anchored = f"/{rel_path}/"
    if any(f"/{excluded}/" in anchored for excluded in BUILTIN_EXCLUDE_DIRS):
        return True
    # Exceptions win over the sensitive patterns (".env.example" is kept).
    if matches_patterns(rel_path, BUILTIN_SENSITIVE_EXCLUDE_EXCEPTIONS):
        return False
    return matches_patterns(rel_path, BUILTIN_SENSITIVE_EXCLUDE_PATTERNS)
def matches_patterns(rel_path: str, patterns: tuple[str, ...]) -> bool:
    """Check *rel_path* and each of its leading directories against *patterns*.

    Every prefix of the ``/``-separated path ("a", "a/b", "a/b/c", ...) is
    tested with ``fnmatch``, so a pattern that matches a directory also
    matches everything stored below it. The last prefix is the path itself.
    """
    segments = rel_path.split("/")
    prefixes = ["/".join(segments[:end]) for end in range(1, len(segments) + 1)]
    return any(
        fnmatch.fnmatch(candidate, glob)
        for glob in patterns
        for candidate in prefixes
    )
def collect_archive_files(
    root: Path,
    archive_config: ArchiveConfig,
) -> list[Path]:
    """Gather the workspace files that belong in an archive, sorted.

    Inside a git working tree the candidates are the tracked files; elsewhere
    every file below *root* is a candidate. A candidate is kept when it is
    not excluded by the builtin rules, matches the include patterns (if any
    are configured) and matches none of the exclude patterns.
    """
    if is_git_repo(root):
        candidates = git_tracked_files(root)
    else:
        candidates = [entry for entry in root.rglob("*") if entry.is_file()]

    def keep(candidate: Path) -> bool:
        rel = candidate.relative_to(root).as_posix()
        if is_excluded_by_builtins(rel):
            return False
        wanted = archive_config.include
        if wanted and not matches_patterns(rel, wanted):
            return False
        return not matches_patterns(rel, archive_config.exclude)

    return sorted(filter(keep, candidates))
def detect_compression(output: Path) -> str:
    """Map an archive filename to its compression key.

    Returns ``"zst"``, ``"gz"`` or ``"bz2"`` according to the filename
    suffix; unrecognised names fall back to ``"zst"``.
    """
    filename = output.name
    suffix_table = (
        ((".tar.zst", ".tar.zstd"), "zst"),
        ((".tar.gz", ".tgz"), "gz"),
        ((".tar.bz2",), "bz2"),
    )
    for suffixes, compression in suffix_table:
        if filename.endswith(suffixes):
            return compression
    return "zst"
def tarfile_supports_zstd() -> bool:
    """Tell whether the running ``tarfile`` registers a ``zst`` opener."""
    registered_openers = tarfile.TarFile.OPEN_METH
    return "zst" in registered_openers
def zstd_module() -> Any:
    """Import and return a zstd implementation.

    ``compression.zstd`` is tried first, then ``backports.zstd``. Raises
    :class:`ArchiveError` when neither can be imported.
    """
    for candidate in ("compression.zstd", "backports.zstd"):
        try:
            module = importlib.import_module(candidate)
        except ImportError:
            continue
        return module
    raise ArchiveError(
        "Zstandard archive support is not available.",
        hints=[
            "Install backports.zstd for Python versions before 3.14,",
            "or choose an archive name ending in .tar.gz or .tar.bz2.",
        ],
    )
@contextmanager
def open_tar_for_write(
    output: Path, compression: str, compression_level: int | None
) -> Iterator[tarfile.TarFile]:
    """Open a tar archive for writing, optionally setting compression level.

    *compression* is ``"zst"``, ``"gz"`` or ``"bz2"``. When ``tarfile`` has
    no zstd support, the stream is compressed through the module returned by
    ``zstd_module`` and a plain tar is written into it. A *compression_level*
    of ``None`` leaves the codec default in place.
    """
    if compression == "zst" and not tarfile_supports_zstd():
        with zstd_module().open(output, "wb", level=compression_level) as compressed:
            with tarfile.open(fileobj=compressed, mode="w:") as tf:
                yield tf
        return
    mode = f"w:{compression}"
    kwargs = {}
    if compression_level is not None:
        # tarfile names the level keyword per codec: "w:gz"/"w:bz2" take
        # ``compresslevel`` while the native "w:zst" opener takes ``level``
        # and would forward an unknown ``compresslevel`` to TarFile, which
        # raises TypeError.
        level_keyword = "level" if compression == "zst" else "compresslevel"
        kwargs[level_keyword] = compression_level
    with tarfile.open(output, mode, **kwargs) as tf:  # ty: ignore[no-matching-overload]
        yield tf
def create_archive(
    root: Path,
    output: Path,
    archive_config: ArchiveConfig,
    *,
    bundle_packages: list[Path] | None = None,
) -> Path:
    """Write a tar archive of the workspace at *root* and return its path.

    *output* is resolved and its parent directories are created. The
    compression format is inferred from the output filename. The output file
    itself is never added to the archive. When *bundle_packages* is given and
    non-empty, those conda package archives are stored under ``packages/``.
    """
    destination = output.resolve()
    destination.parent.mkdir(parents=True, exist_ok=True)
    members = [
        candidate
        for candidate in collect_archive_files(root, archive_config)
        # The archive may live inside the workspace; skip it.
        if candidate.resolve() != destination
    ]
    level = archive_config.compression_level
    with open_tar_for_write(destination, detect_compression(destination), level) as tar:
        add_files_to_tar(tar, root, members)
        if bundle_packages:
            add_packages_to_tar(tar, bundle_packages)
    return destination
def add_files_to_tar(tf: tarfile.TarFile, root: Path, files: list[Path]) -> None:
    """Store each of *files* in *tf* under its POSIX path relative to *root*."""
    for source in files:
        relative = source.relative_to(root)
        tf.add(str(source), arcname=relative.as_posix())
def add_packages_to_tar(tf: tarfile.TarFile, packages: list[Path]) -> None:
    """Store each package file in *tf* as ``packages/<filename>``."""
    for package in packages:
        tf.add(str(package), arcname="packages/" + package.name)
def validate_tar_member(member: tarfile.TarInfo, target: Path) -> None:
    """Raise :class:`ArchivePathTraversalError` if *member* escapes *target*.

    Checks for disallowed file types (device nodes, FIFOs, etc.),
    absolute paths, ``..`` components, and link targets. Symbolic link
    targets are resolved relative to the directory containing the link;
    hard link targets are resolved relative to the archive root, which is
    how tar defines them and where ``tarfile`` looks for them on extraction.
    """
    if member.type not in ALLOWED_TAR_TYPES:
        raise ArchivePathTraversalError(member.name)
    try:
        member_path = parse_relative_archive_path(member.name)
    except ValueError:
        raise ArchivePathTraversalError(member.name) from None
    target_root = target.resolve()
    try:
        target.joinpath(*member_path.parts).resolve().relative_to(target_root)
    except ValueError:
        raise ArchivePathTraversalError(member.name) from None
    if not (member.issym() or member.islnk()):
        return
    try:
        link_target = parse_relative_archive_path(
            member.linkname,
            allow_parent=True,
        )
    except ValueError:
        raise ArchivePathTraversalError(member.name) from None
    # Resolving a hard link against the member's own directory would accept
    # names such as "a/b/link -> ../../outside", which the extractor then
    # follows from the extraction root to a file outside the target.
    link_base = () if member.islnk() else member_path.parent.parts
    try:
        resolved_link = target.joinpath(*link_base, *link_target.parts).resolve()
        resolved_link.relative_to(target_root)
    except ValueError:
        raise ArchivePathTraversalError(member.name) from None
@contextmanager
def open_tar(archive_path: Path) -> Iterator[tarfile.TarFile]:
    """Open *archive_path* for reading and yield the ``TarFile``.

    The compression is inferred from the filename. Zstandard archives are
    decompressed through ``zstd_module`` when ``tarfile`` cannot read them
    itself.
    """
    compression = detect_compression(archive_path)
    needs_fallback = compression == "zst" and not tarfile_supports_zstd()
    if not needs_fallback:
        with tarfile.open(  # ty: ignore[no-matching-overload]
            archive_path, f"r:{compression}"
        ) as tf:
            yield tf
        return
    with zstd_module().open(archive_path, "rb") as compressed, tarfile.open(
        fileobj=compressed, mode="r:"
    ) as tf:
        yield tf
def ensure_extract_target_empty(target: Path) -> None:
    """Refuse extraction targets that are symlinks, files, or non-empty.

    A missing target and an existing empty directory are both accepted.
    Raises :class:`ArchiveError` in every other case, including when the
    directory cannot be listed.
    """
    if target.is_symlink():
        raise ArchiveError("Cannot extract archive into an existing symlink target.")
    if not target.exists():
        return
    if not target.is_dir():
        raise ArchiveError(
            "Cannot extract archive into an existing non-directory target."
        )
    try:
        # One entry is enough to know the directory is not empty.
        first_entry = next(target.iterdir(), None)
    except OSError as exc:
        raise ArchiveError(
            f"Cannot inspect target before archive extraction: {target}"
        ) from exc
    if first_entry is None:
        return
    raise ArchiveError(
        "Cannot extract archive into a non-empty target.",
        hints=["Choose an empty target directory or remove existing files first."],
    )
def extract_archive(archive_path: Path, target: Path) -> Path:
    """Unpack *archive_path* into *target* and return the resolved target.

    The target must be missing or empty. All members are validated with
    ``validate_tar_member`` before anything is written, so one bad member
    rejects the whole archive. Where ``tarfile`` offers extraction filters,
    ``filter="data"`` is applied as a second line of defence.
    """
    ensure_extract_target_empty(target)
    destination = target.resolve()
    destination.mkdir(parents=True, exist_ok=True)
    extract_options = {"filter": "data"} if hasattr(tarfile, "data_filter") else {}
    with open_tar(archive_path) as tf:
        members = tf.getmembers()
        for entry in members:
            validate_tar_member(entry, destination)
        tf.extractall(path=destination, members=members, **extract_options)
    return destination
def parse_lockfile_packages(lockfile_path: Path) -> list[dict]:
    """Parse the ``packages`` list from a conda lockfile.

    Returns an empty list when the lockfile has no ``packages`` key, when the
    key is null or empty, or when the document itself is empty.
    """
    data = load_yaml(lockfile_path)
    # NOTE(review): presumably an empty YAML document loads as None; guard it
    # so callers get "no packages" instead of an AttributeError on ``.get``.
    if not data:
        return []
    return data.get("packages") or []
def url_to_filename(url: str) -> str:
    """Return the conda package filename that *url* points at.

    The last component of the URL path must end in one of
    ``CONDA_PACKAGE_SUFFIXES``; otherwise :class:`ArchiveError` is raised.
    """
    url_path = urlsplit(url).path
    filename = Path(url_path).name
    if filename and filename.endswith(CONDA_PACKAGE_SUFFIXES):
        return filename
    raise ArchiveError(
        f"Cannot determine conda package filename from URL: {url}",
        hints=[
            "Expected package URLs to end in .conda or .tar.bz2.",
            "Regenerate conda.lock and retry the archive command.",
        ],
    )
def collect_bundle_packages(
    lockfile_path: Path,
    cache_dirs: list[Path],
) -> list[Path]:
    """Find every package named in the lockfile inside the local caches.

    Entries without a URL are skipped. A filename that appears again is
    accepted only when both entries carry the same SHA256. The result is
    sorted by filename.

    Raises :class:`ArchiveError` on a filename collision or when a package is
    missing from all of *cache_dirs*.
    """
    located: list[Path] = []
    fingerprints: dict[str, str | None] = {}
    for entry in parse_lockfile_packages(lockfile_path):
        url = entry.get("conda") or entry.get("url", "")
        if not url:
            continue
        filename = url_to_filename(url)
        sha256 = entry.get("sha256")
        fingerprint = None if sha256 is None else str(sha256)
        if filename in fingerprints:
            earlier = fingerprints[filename]
            both_known = earlier is not None and fingerprint is not None
            if not (both_known and earlier == fingerprint):
                raise ArchiveError(
                    f"Package filename collision in lockfile: {filename}",
                    hints=[
                        "The archive bundle stores package archives by filename.",
                        "Regenerate the lockfile or remove one of the colliding"
                        " packages before bundling.",
                    ],
                )
            # Same file listed twice: it is already collected.
            continue
        fingerprints[filename] = fingerprint
        # The first cache directory that holds the file wins.
        for cache_dir in cache_dirs:
            candidate = cache_dir / filename
            if candidate.is_file():
                located.append(candidate)
                break
        else:
            raise ArchiveError(
                f"Package '{filename}' not found in cache.",
                hints=[
                    "Run 'conda workspace install' to populate the package cache,",
                    "then retry the archive command.",
                ],
            )
    located.sort(key=lambda package: package.name)
    return located
def build_hash_index(lockfile_path: Path) -> dict[str, str]:
    """Map package filenames to the SHA256 recorded in the lockfile.

    Entries without a URL or without a ``sha256`` value are left out; a later
    entry for the same filename replaces an earlier one.
    """
    located = (
        (entry.get("conda") or entry.get("url", ""), entry.get("sha256"))
        for entry in parse_lockfile_packages(lockfile_path)
    )
    return {
        url_to_filename(url): str(digest)
        for url, digest in located
        if url and digest is not None
    }
def file_sha256(path: Path) -> str:
    """Hash *path* with SHA-256 in 1 MiB reads and return the hex digest."""
    hasher = hashlib.sha256()
    with path.open("rb") as stream:
        while block := stream.read(1024 * 1024):
            hasher.update(block)
    return hasher.hexdigest()
def verify_package_hashes(
    packages: list[Path],
    lockfile_path: Path,
) -> None:
    """Check each package file against the SHA256 recorded in the lockfile.

    Raises :class:`ArchiveError` for a package that has no hash in the
    lockfile and :class:`ArchiveHashMismatchError` for the first package
    whose content does not match.
    """
    recorded = build_hash_index(lockfile_path)
    for package in packages:
        filename = package.name
        wanted = recorded.get(filename)
        if wanted:
            computed = file_sha256(package)
            if computed == wanted:
                continue
            raise ArchiveHashMismatchError(
                filename, expected=wanted, actual=computed
            )
        raise ArchiveError(
            f"Cannot verify bundled package '{filename}'.",
            hints=[
                "No SHA256 entry for this package was found in conda.lock.",
                "Regenerate conda.lock with a current conda-workspaces version"
                " before bundling or priming package caches.",
            ],
        )
def prime_package_cache(
    extracted_dir: Path,
    cache_dir: Path,
    *,
    verified: bool = False,
) -> int:
    """Copy bundled packages of an extracted archive into *cache_dir*.

    Returns ``0`` when the archive has no ``packages/`` directory or it holds
    no conda packages. Otherwise the archive must have been *verified* by an
    external integrity record and must contain ``conda.lock``; the package
    hashes are checked against that lockfile before anything is copied.
    Files already present in the cache are left alone.

    Returns the number of packages copied. Raises :class:`ArchiveError` when
    packages exist but the archive is unverified or has no lockfile.
    """
    packages_dir = extracted_dir / "packages"
    if not packages_dir.is_dir():
        return 0
    bundled: list[Path] = []
    for suffix in CONDA_PACKAGE_SUFFIXES:
        bundled.extend(packages_dir.glob(f"*{suffix}"))
    bundled.sort()
    if not bundled:
        return 0
    if not verified:
        raise ArchiveError(
            "Cannot prime package cache from unverified archive packages.",
            hints=[
                "Verify the archive with an external receipt before cache priming.",
                "Use 'conda workspace unarchive --receipt ...' or extract without"
                " cache priming.",
            ],
        )
    lockfile = extracted_dir / "conda.lock"
    if not lockfile.is_file():
        raise ArchiveError(
            "Cannot prime package cache: bundled packages require conda.lock.",
            hints=[
                "Extract the archive without cache priming using --no-install,",
                "or rebuild the archive with its lockfile included.",
            ],
        )
    verify_package_hashes(bundled, lockfile)
    cache_dir.mkdir(parents=True, exist_ok=True)
    copied = 0
    for package in bundled:
        destination = cache_dir / package.name
        if destination.exists():
            continue
        shutil.copy2(package, destination)
        copied += 1
    return copied
def inspect_archive(archive_path: Path) -> dict[str, object]:
    """Summarise an archive's contents from its member names alone.

    The result reports whether a workspace manifest and ``conda.lock`` are
    present at the archive root, and how many conda packages are stored
    under ``packages/``.
    """
    with open_tar(archive_path) as tf:
        member_names = set(tf.getnames())
    package_count = sum(
        1
        for name in member_names
        if name.startswith("packages/") and name.endswith(CONDA_PACKAGE_SUFFIXES)
    )
    return {
        "has_manifest": not member_names.isdisjoint(MANIFEST_FILENAMES),
        "has_lockfile": "conda.lock" in member_names,
        "has_packages": package_count > 0,
        "package_count": package_count,
    }