refactor secrets & facts -> secret_facts & public_facts

This commit is contained in:
lassulus
2024-03-23 05:05:31 +01:00
parent ddc28f53df
commit f16667e25a
26 changed files with 116 additions and 154 deletions

View File

@@ -2,7 +2,9 @@
import argparse
from .check import register_check_parser
from .generate import register_generate_parser
from .list import register_list_parser
from .upload import register_upload_parser
# takes a (sub)parser and configures it
@@ -19,3 +21,11 @@ def register_parser(parser: argparse.ArgumentParser) -> None:
list_parser = subparser.add_parser("list", help="list all facts")
register_list_parser(list_parser)
parser_generate = subparser.add_parser(
"generate", help="generate secrets for machines if they don't exist yet"
)
register_generate_parser(parser_generate)
parser_upload = subparser.add_parser("upload", help="upload secrets for machines")
register_upload_parser(parser_upload)

View File

@@ -7,32 +7,55 @@ from ..machines.machines import Machine
log = logging.getLogger(__name__)
def check_facts(machine: Machine) -> bool:
facts_module = importlib.import_module(machine.facts_module)
fact_store = facts_module.FactStore(machine=machine)
def check_secrets(machine: Machine, service: None | str = None) -> bool:
secret_facts_module = importlib.import_module(machine.secret_facts_module)
secret_facts_store = secret_facts_module.SecretStore(machine=machine)
public_facts_module = importlib.import_module(machine.public_facts_module)
public_facts_store = public_facts_module.FactStore(machine=machine)
existing_facts = fact_store.get_all()
missing_facts = []
for service in machine.secrets_data:
for fact in machine.secrets_data[service]["facts"]:
if fact not in existing_facts.get(service, {}):
log.info(f"Fact {fact} for service {service} is missing")
missing_facts.append((service, fact))
missing_secret_facts = []
missing_public_facts = []
if service:
services = [service]
else:
services = list(machine.secrets_data.keys())
for service in services:
for secret_fact in machine.secrets_data[service]["secrets"]:
if isinstance(secret_fact, str):
secret_name = secret_fact
else:
secret_name = secret_fact["name"]
if not secret_facts_store.exists(service, secret_name):
log.info(f"Secret fact {secret_fact} for service {service} is missing")
missing_secret_facts.append((service, secret_name))
if missing_facts:
for public_fact in machine.secrets_data[service]["facts"]:
if not public_facts_store.exists(service, public_fact):
log.info(f"public Fact {public_fact} for service {service} is missing")
missing_public_facts.append((service, public_fact))
log.debug(f"missing_secret_facts: {missing_secret_facts}")
log.debug(f"missing_public_facts: {missing_public_facts}")
if missing_secret_facts or missing_public_facts:
return False
return True
def check_command(args: argparse.Namespace) -> None:
machine = Machine(name=args.machine, flake=args.flake)
if check_facts(machine):
print("All facts are present")
machine = Machine(
name=args.machine,
flake=args.flake,
)
check_secrets(machine, service=args.service)
def register_check_parser(parser: argparse.ArgumentParser) -> None:
parser.add_argument(
"machine",
help="The machine to check facts for",
help="The machine to check secrets for",
)
parser.add_argument(
"--service",
help="the service to check",
)
parser.set_defaults(func=check_command)

View File

@@ -0,0 +1,173 @@
import argparse
import importlib
import logging
import os
import subprocess
from collections.abc import Callable
from pathlib import Path
from tempfile import TemporaryDirectory
from clan_cli.cmd import run
from ..errors import ClanError
from ..git import commit_files
from ..machines.machines import Machine
from ..nix import nix_shell
from .check import check_secrets
from .public_modules import FactStoreBase
from .secret_modules import SecretStoreBase
log = logging.getLogger(__name__)
def read_multiline_input(prompt: str = "Finish with Ctrl-D") -> str:
"""
Read multi-line input from stdin.
"""
print(prompt, flush=True)
proc = subprocess.run(["cat"], stdout=subprocess.PIPE, text=True)
return proc.stdout
def generate_service_facts(
machine: Machine,
service: str,
secret_facts_store: SecretStoreBase,
public_facts_store: FactStoreBase,
tmpdir: Path,
prompt: Callable[[str], str],
) -> None:
service_dir = tmpdir / service
# check if all secrets exist and generate them if at least one is missing
needs_regeneration = not check_secrets(machine, service=service)
log.debug(f"{service} needs_regeneration: {needs_regeneration}")
if needs_regeneration:
if not isinstance(machine.flake, Path):
msg = f"flake is not a Path: {machine.flake}"
msg += "fact/secret generation is only supported for local flakes"
env = os.environ.copy()
facts_dir = service_dir / "facts"
facts_dir.mkdir(parents=True)
env["facts"] = str(facts_dir)
secrets_dir = service_dir / "secrets"
secrets_dir.mkdir(parents=True)
env["secrets"] = str(secrets_dir)
# compatibility for old outputs.nix users
if isinstance(machine.secrets_data[service]["generator"], str):
generator = machine.secrets_data[service]["generator"]
else:
generator = machine.secrets_data[service]["generator"]["finalScript"]
if machine.secrets_data[service]["generator"]["prompt"]:
prompt_value = prompt(
machine.secrets_data[service]["generator"]["prompt"]
)
env["prompt_value"] = prompt_value
# fmt: off
cmd = nix_shell(
[
"nixpkgs#bash",
"nixpkgs#bubblewrap",
],
[
"bwrap",
"--ro-bind", "/nix/store", "/nix/store",
"--tmpfs", "/usr/lib/systemd",
"--dev", "/dev",
"--bind", str(facts_dir), str(facts_dir),
"--bind", str(secrets_dir), str(secrets_dir),
"--unshare-all",
"--unshare-user",
"--uid", "1000",
"--",
"bash", "-c", generator
],
)
# fmt: on
run(
cmd,
env=env,
)
files_to_commit = []
# store secrets
for secret in machine.secrets_data[service]["secrets"]:
if isinstance(secret, str):
# TODO: This is the old NixOS module, can be dropped everyone has updated.
secret_name = secret
groups = []
else:
secret_name = secret["name"]
groups = secret.get("groups", [])
secret_file = secrets_dir / secret_name
if not secret_file.is_file():
msg = f"did not generate a file for '{secret_name}' when running the following command:\n"
msg += generator
raise ClanError(msg)
secret_path = secret_facts_store.set(
service, secret_name, secret_file.read_bytes(), groups
)
if secret_path:
files_to_commit.append(secret_path)
# store facts
for name in machine.secrets_data[service]["facts"]:
fact_file = facts_dir / name
if not fact_file.is_file():
msg = f"did not generate a file for '{name}' when running the following command:\n"
msg += machine.secrets_data[service]["generator"]
raise ClanError(msg)
fact_file = public_facts_store.set(service, name, fact_file.read_bytes())
if fact_file:
files_to_commit.append(fact_file)
commit_files(
files_to_commit,
machine.flake_dir,
f"Update facts/secrets for service {service} in machine {machine.name}",
)
def generate_facts(
machine: Machine,
prompt: None | Callable[[str], str] = None,
) -> None:
secret_facts_module = importlib.import_module(machine.secret_facts_module)
secret_facts_store = secret_facts_module.SecretStore(machine=machine)
public_facts_module = importlib.import_module(machine.public_facts_module)
public_facts_store = public_facts_module.FactStore(machine=machine)
if prompt is None:
def prompt_func(text: str) -> str:
print(f"{text}: ")
return read_multiline_input()
prompt = prompt_func
with TemporaryDirectory() as tmp:
tmpdir = Path(tmp)
for service in machine.secrets_data:
generate_service_facts(
machine=machine,
service=service,
secret_facts_store=secret_facts_store,
public_facts_store=public_facts_store,
tmpdir=tmpdir,
prompt=prompt,
)
print("successfully generated secrets")
def generate_command(args: argparse.Namespace) -> None:
machine = Machine(name=args.machine, flake=args.flake)
generate_facts(machine)
def register_generate_parser(parser: argparse.ArgumentParser) -> None:
parser.add_argument(
"machine",
help="The machine to generate facts for",
)
parser.set_defaults(func=generate_command)

View File

@@ -8,9 +8,10 @@ from ..machines.machines import Machine
log = logging.getLogger(__name__)
# TODO get also secret facts
def get_all_facts(machine: Machine) -> dict:
facts_module = importlib.import_module(machine.facts_module)
fact_store = facts_module.FactStore(machine=machine)
public_facts_module = importlib.import_module(machine.public_facts_module)
public_facts_store = public_facts_module.FactStore(machine=machine)
# for service in machine.secrets_data:
# facts[service] = {}
@@ -20,7 +21,7 @@ def get_all_facts(machine: Machine) -> dict:
# facts[service][fact] = fact_content.decode()
# else:
# log.error(f"Fact {fact} for service {service} is missing")
return fact_store.get_all()
return public_facts_store.get_all()
def get_command(args: argparse.Namespace) -> None:

View File

@@ -0,0 +1,31 @@
from abc import ABC, abstractmethod
from pathlib import Path
from clan_cli.machines.machines import Machine
class SecretStoreBase(ABC):
@abstractmethod
def __init__(self, machine: Machine) -> None:
pass
@abstractmethod
def set(
self, service: str, name: str, value: bytes, groups: list[str]
) -> Path | None:
pass
@abstractmethod
def get(self, service: str, name: str) -> bytes:
pass
@abstractmethod
def exists(self, service: str, name: str) -> bool:
pass
def update_check(self) -> bool:
return False
@abstractmethod
def upload(self, output_dir: Path) -> None:
pass

View File

@@ -0,0 +1,117 @@
import os
import subprocess
from pathlib import Path
from clan_cli.machines.machines import Machine
from clan_cli.nix import nix_shell
from . import SecretStoreBase
class SecretStore(SecretStoreBase):
def __init__(self, machine: Machine) -> None:
self.machine = machine
def set(
self, service: str, name: str, value: bytes, groups: list[str]
) -> Path | None:
subprocess.run(
nix_shell(
["nixpkgs#pass"],
["pass", "insert", "-m", f"machines/{self.machine.name}/{name}"],
),
input=value,
check=True,
)
return None # we manage the files outside of the git repo
def get(self, service: str, name: str) -> bytes:
return subprocess.run(
nix_shell(
["nixpkgs#pass"],
["pass", "show", f"machines/{self.machine.name}/{name}"],
),
check=True,
stdout=subprocess.PIPE,
).stdout
def exists(self, service: str, name: str) -> bool:
password_store = os.environ.get(
"PASSWORD_STORE_DIR", f"{os.environ['HOME']}/.password-store"
)
secret_path = Path(password_store) / f"machines/{self.machine.name}/{name}.gpg"
return secret_path.exists()
def generate_hash(self) -> bytes:
password_store = os.environ.get(
"PASSWORD_STORE_DIR", f"{os.environ['HOME']}/.password-store"
)
hashes = []
hashes.append(
subprocess.run(
nix_shell(
["nixpkgs#git"],
[
"git",
"-C",
password_store,
"log",
"-1",
"--format=%H",
f"machines/{self.machine.name}",
],
),
stdout=subprocess.PIPE,
).stdout.strip()
)
for symlink in Path(password_store).glob(f"machines/{self.machine.name}/**/*"):
if symlink.is_symlink():
hashes.append(
subprocess.run(
nix_shell(
["nixpkgs#git"],
[
"git",
"-C",
password_store,
"log",
"-1",
"--format=%H",
str(symlink),
],
),
stdout=subprocess.PIPE,
).stdout.strip()
)
# we sort the hashes to make sure that the order is always the same
hashes.sort()
return b"\n".join(hashes)
# FIXME: add this when we switch to python3.12
# @override
def update_check(self) -> bool:
local_hash = self.generate_hash()
remote_hash = self.machine.target_host.run(
# TODO get the path to the secrets from the machine
["cat", f"{self.machine.secrets_upload_directory}/.pass_info"],
check=False,
stdout=subprocess.PIPE,
).stdout.strip()
if not remote_hash:
print("remote hash is empty")
return False
return local_hash.decode() == remote_hash
def upload(self, output_dir: Path) -> None:
for service in self.machine.secrets_data:
for secret in self.machine.secrets_data[service]["secrets"]:
if isinstance(secret, dict):
secret_name = secret["name"]
else:
# TODO: drop old format soon
secret_name = secret
(output_dir / secret_name).write_bytes(self.get(service, secret_name))
(output_dir / ".pass_info").write_bytes(self.generate_hash())

View File

@@ -0,0 +1,63 @@
from pathlib import Path
from clan_cli.machines.machines import Machine
from clan_cli.secrets.folders import sops_secrets_folder
from clan_cli.secrets.machines import add_machine, has_machine
from clan_cli.secrets.secrets import decrypt_secret, encrypt_secret, has_secret
from clan_cli.secrets.sops import generate_private_key
from . import SecretStoreBase
class SecretStore(SecretStoreBase):
def __init__(self, machine: Machine) -> None:
self.machine = machine
# no need to generate keys if we don't manage secrets
if not hasattr(self.machine, "secrets_data"):
return
if not self.machine.secrets_data:
return
if has_machine(self.machine.flake_dir, self.machine.name):
return
priv_key, pub_key = generate_private_key()
encrypt_secret(
self.machine.flake_dir,
sops_secrets_folder(self.machine.flake_dir)
/ f"{self.machine.name}-age.key",
priv_key,
)
add_machine(self.machine.flake_dir, self.machine.name, pub_key, False)
def set(
self, service: str, name: str, value: bytes, groups: list[str]
) -> Path | None:
path = (
sops_secrets_folder(self.machine.flake_dir) / f"{self.machine.name}-{name}"
)
encrypt_secret(
self.machine.flake_dir,
path,
value,
add_machines=[self.machine.name],
add_groups=groups,
)
return path
def get(self, service: str, name: str) -> bytes:
raise NotImplementedError()
def exists(self, service: str, name: str) -> bool:
return has_secret(
self.machine.flake_dir,
f"{self.machine.name}-{name}",
)
def upload(self, output_dir: Path) -> None:
key_name = f"{self.machine.name}-age.key"
if not has_secret(self.machine.flake_dir, key_name):
# skip uploading the secret, not managed by us
return
key = decrypt_secret(self.machine.flake_dir, key_name)
(output_dir / "key.txt").write_text(key)

View File

@@ -0,0 +1,35 @@
import os
import shutil
from pathlib import Path
from clan_cli.dirs import vm_state_dir
from clan_cli.machines.machines import Machine
from . import SecretStoreBase
class SecretStore(SecretStoreBase):
def __init__(self, machine: Machine) -> None:
self.machine = machine
self.dir = vm_state_dir(str(machine.flake), machine.name) / "secrets"
self.dir.mkdir(parents=True, exist_ok=True)
def set(
self, service: str, name: str, value: bytes, groups: list[str]
) -> Path | None:
secret_file = self.dir / service / name
secret_file.parent.mkdir(parents=True, exist_ok=True)
secret_file.write_bytes(value)
return None # we manage the files outside of the git repo
def get(self, service: str, name: str) -> bytes:
secret_file = self.dir / service / name
return secret_file.read_bytes()
def exists(self, service: str, name: str) -> bool:
return (self.dir / service / name).exists()
def upload(self, output_dir: Path) -> None:
if os.path.exists(output_dir):
shutil.rmtree(output_dir)
shutil.copytree(self.dir, output_dir)

View File

@@ -0,0 +1,53 @@
import argparse
import importlib
import logging
from pathlib import Path
from tempfile import TemporaryDirectory
from ..cmd import Log, run
from ..machines.machines import Machine
from ..nix import nix_shell
log = logging.getLogger(__name__)
def upload_secrets(machine: Machine) -> None:
secret_facts_module = importlib.import_module(machine.secret_facts_module)
secret_facts_store = secret_facts_module.SecretStore(machine=machine)
if secret_facts_store.update_check():
log.info("Secrets already up to date")
return
with TemporaryDirectory() as tempdir:
secret_facts_store.upload(Path(tempdir))
host = machine.target_host
ssh_cmd = host.ssh_cmd()
run(
nix_shell(
["nixpkgs#rsync"],
[
"rsync",
"-e",
" ".join(["ssh"] + ssh_cmd[2:]),
"-az",
"--delete",
f"{tempdir!s}/",
f"{host.user}@{host.host}:{machine.secrets_upload_directory}/",
],
),
log=Log.BOTH,
)
def upload_command(args: argparse.Namespace) -> None:
machine = Machine(name=args.machine, flake=args.flake)
upload_secrets(machine)
def register_upload_parser(parser: argparse.ArgumentParser) -> None:
parser.add_argument(
"machine",
help="The machine to upload secrets to",
)
parser.set_defaults(func=upload_command)