Merge pull request 'vars password-store: fix secret mangling due to string encoding' (#4227) from pass-fix-bytes into main

Reviewed-on: https://git.clan.lol/clan/clan-core/pulls/4227
This commit is contained in:
lassulus
2025-07-07 00:50:58 +00:00

View File

@@ -1,6 +1,7 @@
import io
import logging
import tarfile
import subprocess
from collections.abc import Iterable
from pathlib import Path
from tempfile import TemporaryDirectory
@@ -8,7 +9,6 @@ from tempfile import TemporaryDirectory
from clan_cli.ssh.upload import upload
from clan_cli.vars._types import StoreBase
from clan_cli.vars.generate import Generator, Var
from clan_lib.cmd import CmdOut, Log, RunOpts, run
from clan_lib.machines.machines import Machine
from clan_lib.ssh.remote import Remote
@@ -33,13 +33,11 @@ class SecretStore(StoreBase):
def store_dir(self) -> Path:
"""Get the password store directory, cached after first access."""
if self._store_dir is None:
result = self._run_pass(
"git", "rev-parse", "--show-toplevel", options=RunOpts(check=False)
)
result = self._run_pass("git", "rev-parse", "--show-toplevel", check=False)
if result.returncode != 0:
msg = "Password store must be a git repository"
raise ValueError(msg)
self._store_dir = Path(result.stdout.strip())
self._store_dir = Path(result.stdout.strip().decode())
return self._store_dir
@property
@@ -79,9 +77,20 @@ class SecretStore(StoreBase):
def entry_dir(self, generator: Generator, name: str) -> Path:
return Path(self.entry_prefix) / self.rel_dir(generator, name)
def _run_pass(self, *args: str, options: RunOpts | None = None) -> CmdOut:
def _run_pass(
self, *args: str, input: bytes | None = None, check: bool = True
) -> subprocess.CompletedProcess[bytes]:
cmd = [self._pass_command, *args]
return run(cmd, options)
# We need bytes support here, so we can not use clan cmd.
# If you change this to run( add bytes support to it first!
# otherwise we mangle binary secrets (which is annoying to debug)
return subprocess.run(
cmd,
input=input,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=check,
)
def _set(
self,
@@ -90,12 +99,12 @@ class SecretStore(StoreBase):
value: bytes,
) -> Path | None:
pass_call = ["insert", "-m", str(self.entry_dir(generator, var.name))]
self._run_pass(*pass_call, options=RunOpts(input=value, check=True))
self._run_pass(*pass_call, input=value, check=True)
return None # we manage the files outside of the git repo
def get(self, generator: Generator, name: str) -> bytes:
pass_name = str(self.entry_dir(generator, name))
return self._run_pass("show", pass_name).stdout.encode()
return self._run_pass("show", pass_name).stdout
def exists(self, generator: Generator, name: str) -> bool:
pass_name = str(self.entry_dir(generator, name))
@@ -106,33 +115,22 @@ class SecretStore(StoreBase):
def delete(self, generator: Generator, name: str) -> Iterable[Path]:
pass_name = str(self.entry_dir(generator, name))
self._run_pass("rm", "--force", pass_name, options=RunOpts(check=True))
self._run_pass("rm", "--force", pass_name, check=True)
return []
def delete_store(self) -> Iterable[Path]:
machine_dir = Path(self.entry_prefix) / "per-machine" / self.machine.name
# Check if the directory exists in the password store before trying to delete
result = self._run_pass("ls", str(machine_dir), options=RunOpts(check=False))
result = self._run_pass("ls", str(machine_dir), check=False)
if result.returncode == 0:
self._run_pass(
"rm",
"--force",
"--recursive",
str(machine_dir),
options=RunOpts(check=True),
)
self._run_pass("rm", "--force", "--recursive", str(machine_dir), check=True)
return []
def generate_hash(self) -> bytes:
result = self._run_pass(
"git",
"log",
"-1",
"--format=%H",
self.entry_prefix,
options=RunOpts(check=False),
"git", "log", "-1", "--format=%H", self.entry_prefix, check=False
)
git_hash = result.stdout.strip().encode()
git_hash = result.stdout.strip()
if not git_hash:
return b""
@@ -155,6 +153,8 @@ class SecretStore(StoreBase):
if not local_hash:
return True
from clan_lib.cmd import RunOpts, Log
remote_hash = host.run(
[
"cat",
@@ -166,7 +166,7 @@ class SecretStore(StoreBase):
if not remote_hash:
return True
return local_hash.decode() != remote_hash
return local_hash != remote_hash.encode()
def populate_dir(self, output_dir: Path, phases: list[str]) -> None:
from clan_cli.vars.generate import Generator