Source code for conda_project.utils

# Copyright (C) 2022-2024 Anaconda, Inc
# SPDX-License-Identifier: BSD-3-Clause

# Copyright (C) 2022 Anaconda, Inc
# SPDX-License-Identifier: BSD-3-Clause

import itertools
import os
import platform
import sys
import threading
import time
from collections import ChainMap
from collections.abc import Generator
from contextlib import contextmanager
from inspect import Traceback
from itertools import groupby
from pathlib import Path
from subprocess import Popen
from typing import Callable, Dict, List, NoReturn, Optional, Type, Union

import shellingham
from dotenv import dotenv_values

from .exceptions import CondaProjectError


@contextmanager
def env_variable(key: str, value: str) -> Generator:
    """Temporarily set environment variable in a context manager.

    The previous state of ``key`` is restored when the ``with`` block exits,
    including when the block exits by raising an exception.

    Args:
        key: Name of the environment variable to set.
        value: Value assigned to the variable for the duration of the block.
    """
    old = os.environ.get(key, None)
    os.environ[key] = value
    try:
        yield
    finally:
        # Restore in ``finally`` so an exception raised inside the ``with``
        # body cannot leak the temporary value into the rest of the process.
        if old is None:
            os.environ.pop(key, None)
        else:
            os.environ[key] = old
class Spinner:
    """Multithreaded CLI spinner context manager

    The animation is drawn on stdout by a background thread, started by
    :meth:`start` (or on entering the ``with`` block) and ended by
    :meth:`stop` (or on leaving the block).

    Attributes:
        prefix: Text to display at the start of the line

    Args:
        prefix: Text to display at the start of the line
    """

    def __init__(self, prefix: str):
        self.prefix = prefix
        # Set by stop() to tell the drawing thread to finish.
        self._event = threading.Event()
        self._thread = threading.Thread(target=self._spin)

    def _spin(self) -> None:
        """Redraw the spinner line until the stop event is set."""
        spinner = itertools.cycle(["◜", "◠", "◝", "◞", "◡", "◟"])
        while not self._event.is_set():
            sys.stdout.write("\r")
            sys.stdout.write("\033[K")  # ANSI "erase to end of line"
            sys.stdout.write(f"{self.prefix}: {next(spinner)} ")
            sys.stdout.flush()
            # Wait on the event rather than sleeping so stop() is honored
            # immediately instead of after the remainder of the frame delay.
            self._event.wait(0.10)

    def start(self) -> None:
        """Start the background thread that draws the spinner."""
        self._thread.start()

    def stop(self) -> None:
        """Stop the spinner thread and replace the line with ``<prefix>: done``."""
        self._event.set()
        self._thread.join()
        sys.stdout.write("\r")
        sys.stdout.write("\033[K")
        sys.stdout.write(f"{self.prefix}: done\n")
        sys.stdout.flush()

    def __enter__(self) -> "Spinner":
        self.start()
        # Return the instance so ``with Spinner(...) as spinner`` binds it.
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        exc_tb: Optional[Traceback],
    ) -> None:
        # Returns None, so an exception from the with-body is not suppressed.
        self.stop()
def find_file(directory: Path, options: tuple) -> Optional[Path]:
    """Locate one file in ``directory`` among several candidate names.

    Args:
        directory: Directory in which to look.
        options: Candidate file names, checked in order.

    Returns:
        The resolved path of the single candidate that exists, or None when
        none of them exist.

    Raises:
        CondaProjectError: if more than one of the candidates exists.
    """
    candidates = (directory / name for name in options)
    matches = [candidate.resolve() for candidate in candidates if candidate.exists()]

    if not matches:
        return None

    if len(matches) == 1:
        return matches[0]

    _found_files = "\n".join(str(match) for match in matches)
    raise CondaProjectError(
        f"Multiple variants of the same file were found.\n{_found_files}\nConsider using one of them."
    )
def merge_dicts(*dicts):
    """Combine mappings into a new dict; later arguments win on key clashes.

    Keys appear in the order they are first seen across the arguments, and
    none of the input mappings is modified.
    """
    combined = {}
    for mapping in dicts:
        combined.update(mapping)
    return combined
def prepare_variables(project_directory: Path, *variable_dicts) -> Dict[str, str]:
    """Build the environment mapping used to run a command.

    The given variable dicts (``None`` entries count as empty), the values
    from ``<project_directory>/.env`` and ``os.environ`` are merged in that
    order, with later sources overriding earlier ones.

    Raises:
        CondaProjectError: if any variable is still ``None`` after merging.
    """
    layers = [layer if layer is not None else {} for layer in variable_dicts]
    dotenv_layer = dotenv_values(project_directory / ".env")
    env = merge_dicts(*layers, dotenv_layer, os.environ)

    unset = [name for name, value in env.items() if value is None]
    if not unset:
        return env

    listing = "\n".join(unset)
    raise CondaProjectError(
        "The following variables do not have a default value and values\n"
        "were not provided in the .env file or set on the command line"
        f" when executing 'conda project run':\n{listing}"
    )
def is_windows():
    """Return True when ``platform.system()`` reports ``"Windows"``."""
    system_name = platform.system()
    return system_name == "Windows"
def execvped(
    file: Union[Path, str], args: List[str], env: Dict[str, str], cwd: Union[Path, str]
) -> NoReturn:
    """A cross-platform os.execvpe - like executor

    The goal is to be able to launch a command in a working directory,
    with environment variables and exit to the shell with the return code
    of the command and ensure that on error the previous working directory
    is restored.

    The "d" in the function name refers to the requirement that the working
    directory (cwd) flag be used.
    """
    # Push out any buffered output before the process exits or is replaced.
    sys.stdout.flush()
    sys.stderr.flush()

    if is_windows():
        # Run the command as a child process and exit with its return code.
        child = Popen(args=[file, *args], env=env, cwd=cwd)
        sys.exit(child.wait())

    previous_dir = Path.cwd()
    try:
        os.chdir(cwd)
        os.execvpe(file, args, env)
    finally:
        # Reached only when chdir or execvpe raised.
        os.chdir(previous_dir)
def detect_shell():
    """Return ``(shell_name, shell_path)`` for the current shell.

    ``shellingham.detect_shell()`` is tried first. If it fails, both values
    fall back to ``$SHELL`` (default ``/bin/sh``) when ``os.name`` is
    ``"posix"``, or to ``%COMSPEC%`` (default ``cmd.exe``) when it is ``"nt"``.

    Raises:
        RuntimeError: if detection fails and ``os.name`` is neither
            ``"posix"`` nor ``"nt"``.
    """
    # os.name -> (environment variable to consult, default when it is unset)
    fallbacks = {
        "posix": ("SHELL", "/bin/sh"),
        "nt": ("COMSPEC", "cmd.exe"),
    }

    try:
        shell_name, shell_path = shellingham.detect_shell()
    except shellingham.ShellDetectionFailure:
        if os.name not in fallbacks:
            raise RuntimeError(
                "Could not determine an appropriate shell to activate for your OS."
            )
        variable, default = fallbacks[os.name]
        shell_path = os.environ.get(variable, default)
        shell_name = shell_path

    return shell_name, shell_path
def dedupe_list_of_dicts(data: list, key: Callable, keep: Callable) -> list:
    """Filter groups of items that share the same key.

    Items are sorted and grouped by ``key``. A group with a single item is
    kept as is; in a group with several items only those for which ``keep``
    returns a truthy value are retained.

    Args:
        data: Items to process.
        key: Function computing the grouping (and sorting) key of an item.
        keep: Predicate applied to the members of multi-item groups.

    Returns:
        A new list of the retained items, ordered by ``key``.
    """
    result = []
    ordered = sorted(data, key=key)
    for _, group in groupby(ordered, key=key):
        members = list(group)
        if len(members) <= 1:
            result += members
            continue
        result += [member for member in members if keep(member)]
    return result
def get_envs_paths() -> List[Path]:
    """Return the paths listed in ``CONDA_PROJECT_ENVS_PATH``.

    The variable is split on ``os.pathsep`` and environment-variable
    references in each entry are expanded. An unset or empty variable
    yields an empty list.
    """
    raw_value = os.environ.get("CONDA_PROJECT_ENVS_PATH", "")
    if not raw_value:
        return []
    return [Path(os.path.expandvars(entry)) for entry in raw_value.split(os.pathsep)]