diff --git a/pkgs/clan-cli/clan_cli/vars/secret_modules/password_store.py b/pkgs/clan-cli/clan_cli/vars/secret_modules/password_store.py index 071729681..2358d838e 100644 --- a/pkgs/clan-cli/clan_cli/vars/secret_modules/password_store.py +++ b/pkgs/clan-cli/clan_cli/vars/secret_modules/password_store.py @@ -1,6 +1,7 @@ import io import logging import tarfile +import subprocess from collections.abc import Iterable from pathlib import Path from tempfile import TemporaryDirectory @@ -8,7 +9,6 @@ from tempfile import TemporaryDirectory from clan_cli.ssh.upload import upload from clan_cli.vars._types import StoreBase from clan_cli.vars.generate import Generator, Var -from clan_lib.cmd import CmdOut, Log, RunOpts, run from clan_lib.machines.machines import Machine from clan_lib.ssh.remote import Remote @@ -33,13 +33,11 @@ class SecretStore(StoreBase): def store_dir(self) -> Path: """Get the password store directory, cached after first access.""" if self._store_dir is None: - result = self._run_pass( - "git", "rev-parse", "--show-toplevel", options=RunOpts(check=False) - ) + result = self._run_pass("git", "rev-parse", "--show-toplevel", check=False) if result.returncode != 0: msg = "Password store must be a git repository" raise ValueError(msg) - self._store_dir = Path(result.stdout.strip()) + self._store_dir = Path(result.stdout.strip().decode()) return self._store_dir @property @@ -79,9 +77,20 @@ class SecretStore(StoreBase): def entry_dir(self, generator: Generator, name: str) -> Path: return Path(self.entry_prefix) / self.rel_dir(generator, name) - def _run_pass(self, *args: str, options: RunOpts | None = None) -> CmdOut: + def _run_pass( + self, *args: str, input: bytes | None = None, check: bool = True + ) -> subprocess.CompletedProcess[bytes]: cmd = [self._pass_command, *args] - return run(cmd, options) + # We need bytes support here, so we can not use clan cmd. + # If you change this to run( add bytes support to it first! + # otherwise we mangle binary secrets (which is annoying to debug) + return subprocess.run( + cmd, + input=input, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + check=check, + ) def _set( self, @@ -90,12 +99,12 @@ class SecretStore(StoreBase): value: bytes, ) -> Path | None: pass_call = ["insert", "-m", str(self.entry_dir(generator, var.name))] - self._run_pass(*pass_call, options=RunOpts(input=value, check=True)) + self._run_pass(*pass_call, input=value, check=True) return None # we manage the files outside of the git repo def get(self, generator: Generator, name: str) -> bytes: pass_name = str(self.entry_dir(generator, name)) - return self._run_pass("show", pass_name).stdout.encode() + return self._run_pass("show", pass_name).stdout def exists(self, generator: Generator, name: str) -> bool: pass_name = str(self.entry_dir(generator, name)) @@ -106,33 +115,22 @@ class SecretStore(StoreBase): def delete(self, generator: Generator, name: str) -> Iterable[Path]: pass_name = str(self.entry_dir(generator, name)) - self._run_pass("rm", "--force", pass_name, options=RunOpts(check=True)) + self._run_pass("rm", "--force", pass_name, check=True) return [] def delete_store(self) -> Iterable[Path]: machine_dir = Path(self.entry_prefix) / "per-machine" / self.machine.name # Check if the directory exists in the password store before trying to delete - result = self._run_pass("ls", str(machine_dir), options=RunOpts(check=False)) + result = self._run_pass("ls", str(machine_dir), check=False) if result.returncode == 0: - self._run_pass( - "rm", - "--force", - "--recursive", - str(machine_dir), - options=RunOpts(check=True), - ) + self._run_pass("rm", "--force", "--recursive", str(machine_dir), check=True) return [] def generate_hash(self) -> bytes: result = self._run_pass( - "git", - "log", - "-1", - "--format=%H", - self.entry_prefix, - options=RunOpts(check=False), + "git", "log", "-1", "--format=%H", self.entry_prefix, check=False ) - git_hash = result.stdout.strip().encode() + git_hash = result.stdout.strip() if not git_hash: return b"" @@ -155,6 +153,8 @@ class SecretStore(StoreBase): if not local_hash: return True + from clan_lib.cmd import RunOpts, Log + remote_hash = host.run( [ "cat", @@ -166,7 +166,7 @@ class SecretStore(StoreBase): if not remote_hash: return True - return local_hash.decode() != remote_hash + return local_hash != remote_hash.encode() def populate_dir(self, output_dir: Path, phases: list[str]) -> None: from clan_cli.vars.generate import Generator