move secret stores into clan_cli codebase

This commit is contained in:
lassulus
2024-01-17 18:00:30 +01:00
parent 8338944062
commit aee0ee4d5e
14 changed files with 135 additions and 250 deletions

View File

@@ -1,9 +1,8 @@
import argparse
import importlib
import logging
import os
import shutil
import types
from importlib.machinery import SourceFileLoader
from pathlib import Path
from tempfile import TemporaryDirectory
@@ -17,10 +16,7 @@ log = logging.getLogger(__name__)
def generate_secrets(machine: Machine) -> None:
# load secrets module from file
loader = SourceFileLoader("secret_module", machine.secrets_module)
secrets_module = types.ModuleType(loader.name)
loader.exec_module(secrets_module)
secrets_module = importlib.import_module(machine.secrets_module)
secret_store = secrets_module.SecretStore(machine=machine)
with TemporaryDirectory() as d:

View File

@@ -0,0 +1,106 @@
import os
import subprocess
from pathlib import Path
from clan_cli.machines.machines import Machine
from clan_cli.nix import nix_shell
class SecretStore:
def __init__(self, machine: Machine) -> None:
self.machine = machine
def set(self, service: str, name: str, value: str) -> None:
subprocess.run(
nix_shell(
["nixpkgs#pass"],
["pass", "insert", "-m", f"machines/{self.machine.name}/{name}"],
),
input=value.encode("utf-8"),
check=True,
)
def get(self, service: str, name: str) -> bytes:
return subprocess.run(
nix_shell(
["nixpkgs#pass"],
["pass", "show", f"machines/{self.machine.name}/{name}"],
),
check=True,
stdout=subprocess.PIPE,
).stdout
def exists(self, service: str, name: str) -> bool:
password_store = os.environ.get(
"PASSWORD_STORE_DIR", f"{os.environ['HOME']}/.password-store"
)
secret_path = Path(password_store) / f"machines/{self.machine.name}/{name}.gpg"
print(f"checking {secret_path}")
return secret_path.exists()
def generate_hash(self) -> bytes:
password_store = os.environ.get(
"PASSWORD_STORE_DIR", f"{os.environ['HOME']}/.password-store"
)
hashes = []
hashes.append(
subprocess.run(
nix_shell(
["nixpkgs#git"],
[
"git",
"-C",
password_store,
"log",
"-1",
"--format=%H",
f"machines/{self.machine.name}",
],
),
stdout=subprocess.PIPE,
).stdout.strip()
)
for symlink in Path(password_store).glob(f"machines/{self.machine.name}/**/*"):
if symlink.is_symlink():
hashes.append(
subprocess.run(
nix_shell(
["nixpkgs#git"],
[
"git",
"-C",
password_store,
"log",
"-1",
"--format=%H",
str(symlink),
],
),
stdout=subprocess.PIPE,
).stdout.strip()
)
# we sort the hashes to make sure that the order is always the same
hashes.sort()
return b"\n".join(hashes)
def update_check(self) -> bool:
local_hash = self.generate_hash()
remote_hash = self.machine.host.run(
# TODO get the path to the secrets from the machine
["cat", f"{self.machine.secrets_upload_directory}/.pass_info"],
check=False,
stdout=subprocess.PIPE,
).stdout.strip()
if not remote_hash:
print("remote hash is empty")
return False
return local_hash.decode() == remote_hash
def upload(self, output_dir: Path) -> None:
for service in self.machine.secrets_data:
for secret in self.machine.secrets_data[service]["secrets"]:
(output_dir / secret).write_bytes(self.get(service, secret))
(output_dir / ".pass_info").write_bytes(self.generate_hash())

View File

@@ -0,0 +1,54 @@
from pathlib import Path
from clan_cli.machines.machines import Machine
from clan_cli.secrets.folders import sops_secrets_folder
from clan_cli.secrets.machines import add_machine, has_machine
from clan_cli.secrets.secrets import decrypt_secret, encrypt_secret, has_secret
from clan_cli.secrets.sops import generate_private_key
class SecretStore:
def __init__(self, machine: Machine) -> None:
self.machine = machine
# no need to generate keys if we don't manage secrets
if not hasattr(self.machine, "secrets_data"):
return
if not self.machine.secrets_data:
return
if has_machine(self.machine.flake_dir, self.machine.name):
return
priv_key, pub_key = generate_private_key()
encrypt_secret(
self.machine.flake_dir,
sops_secrets_folder(self.machine.flake_dir)
/ f"{self.machine.name}-age.key",
priv_key,
)
add_machine(self.machine.flake_dir, self.machine.name, pub_key, False)
def set(self, _service: str, name: str, value: str) -> None:
encrypt_secret(
self.machine.flake_dir,
sops_secrets_folder(self.machine.flake_dir) / f"{self.machine.name}-{name}",
value,
add_machines=[self.machine.name],
)
def get(self, _service: str, _name: str) -> bytes:
raise NotImplementedError()
def exists(self, _service: str, name: str) -> bool:
return has_secret(
self.machine.flake_dir,
f"{self.machine.name}-{name}",
)
def upload(self, output_dir: Path) -> None:
key_name = f"{self.machine.name}-age.key"
if not has_secret(self.machine.flake_dir, key_name):
# skip uploading the secret, not managed by us
return
key = decrypt_secret(self.machine.flake_dir, key_name)
(output_dir / "key.txt").write_text(key)

View File

@@ -1,7 +1,6 @@
import argparse
import importlib
import logging
import types
from importlib.machinery import SourceFileLoader
from pathlib import Path
from tempfile import TemporaryDirectory
@@ -13,10 +12,7 @@ log = logging.getLogger(__name__)
def upload_secrets(machine: Machine) -> None:
# load secrets module from file
loader = SourceFileLoader("secret_module", machine.secrets_module)
secrets_module = types.ModuleType(loader.name)
loader.exec_module(secrets_module)
secrets_module = importlib.import_module(machine.secrets_module)
secret_store = secrets_module.SecretStore(machine=machine)
update_check = getattr(secret_store, "update_check", None)
@@ -25,11 +21,7 @@ def upload_secrets(machine: Machine) -> None:
log.info("Secrets already up to date")
return
with TemporaryDirectory() as tempdir:
secrets = []
for service in machine.secrets_data:
for secret in machine.secrets_data[service]["secrets"]:
secrets.append((service, secret))
secret_store.upload(Path(tempdir), secrets)
secret_store.upload(Path(tempdir))
host = machine.host
ssh_cmd = host.ssh_cmd()