WIP: vars: copy python code from facts
This commit is contained in:
132
pkgs/clan-cli/clan_cli/vars/__init__.py
Normal file
132
pkgs/clan-cli/clan_cli/vars/__init__.py
Normal file
@@ -0,0 +1,132 @@
|
||||
# !/usr/bin/env python3
|
||||
import argparse
|
||||
|
||||
from ..hyperlink import help_hyperlink
|
||||
from .check import register_check_parser
|
||||
from .generate import register_generate_parser
|
||||
from .list import register_list_parser
|
||||
from .upload import register_upload_parser
|
||||
|
||||
|
||||
# takes a (sub)parser and configures it
|
||||
def register_parser(parser: argparse.ArgumentParser) -> None:
|
||||
subparser = parser.add_subparsers(
|
||||
title="command",
|
||||
description="the command to run",
|
||||
help="the command to run",
|
||||
required=True,
|
||||
)
|
||||
|
||||
check_parser = subparser.add_parser(
|
||||
"check",
|
||||
help="check if facts are up to date",
|
||||
epilog=(
|
||||
f"""
|
||||
This subcommand allows checking if all facts are up to date.
|
||||
|
||||
Examples:
|
||||
|
||||
$ clan facts check [MACHINE]
|
||||
Will check facts for the specified machine.
|
||||
|
||||
|
||||
For more detailed information, visit: {help_hyperlink("secrets", "https://docs.clan.lol/getting-started/secrets")}
|
||||
"""
|
||||
),
|
||||
formatter_class=argparse.RawTextHelpFormatter,
|
||||
)
|
||||
register_check_parser(check_parser)
|
||||
|
||||
list_parser = subparser.add_parser(
|
||||
"list",
|
||||
help="list all facts",
|
||||
epilog=(
|
||||
f"""
|
||||
This subcommand allows listing all public facts for a specific machine.
|
||||
|
||||
The resulting list will be a json string with the name of the fact as its key
|
||||
and the fact itself as it's value.
|
||||
|
||||
This is how an example output might look like:
|
||||
```
|
||||
\u007b
|
||||
"[FACT_NAME]": "[FACT]"
|
||||
\u007d
|
||||
```
|
||||
|
||||
Examples:
|
||||
|
||||
$ clan facts list [MACHINE]
|
||||
Will list facts for the specified machine.
|
||||
|
||||
|
||||
For more detailed information, visit: {help_hyperlink("secrets", "https://docs.clan.lol/getting-started/secrets")}
|
||||
"""
|
||||
),
|
||||
formatter_class=argparse.RawTextHelpFormatter,
|
||||
)
|
||||
register_list_parser(list_parser)
|
||||
|
||||
parser_generate = subparser.add_parser(
|
||||
"generate",
|
||||
help="generate public and secret facts for machines",
|
||||
epilog=(
|
||||
f"""
|
||||
This subcommand allows control of the generation of facts.
|
||||
Often this function will be invoked automatically on deploying machines,
|
||||
but there are situations the user may want to have more granular control,
|
||||
especially for the regeneration of certain services.
|
||||
|
||||
A service is an included clan-module that implements facts generation functionality.
|
||||
For example the zerotier module will generate private and public facts.
|
||||
In this case the public fact will be the resulting zerotier-ip of the machine.
|
||||
The secret fact will be the zerotier-identity-secret, which is used by zerotier
|
||||
to prove the machine has control of the zerotier-ip.
|
||||
|
||||
|
||||
Examples:
|
||||
|
||||
$ clan facts generate
|
||||
Will generate facts for all machines.
|
||||
|
||||
$ clan facts generate [MACHINE]
|
||||
Will generate facts for the specified machine.
|
||||
|
||||
$ clan facts generate [MACHINE] --service [SERVICE]
|
||||
Will generate facts for the specified machine for the specified service.
|
||||
|
||||
$ clan facts generate --service [SERVICE] --regenerate
|
||||
Will regenerate facts, if they are already generated for a specific service.
|
||||
This is especially useful for resetting certain passwords while leaving the rest
|
||||
of the facts for a machine in place.
|
||||
|
||||
For more detailed information, visit: {help_hyperlink("secrets", "https://docs.clan.lol/getting-started/secrets")}
|
||||
"""
|
||||
),
|
||||
formatter_class=argparse.RawTextHelpFormatter,
|
||||
)
|
||||
register_generate_parser(parser_generate)
|
||||
|
||||
parser_upload = subparser.add_parser(
|
||||
"upload",
|
||||
help="upload secrets for machines",
|
||||
epilog=(
|
||||
f"""
|
||||
This subcommand allows uploading secrets to remote machines.
|
||||
|
||||
If using sops as a secret backend it will upload the private key to the machine.
|
||||
If using password store it uploads all the secrets you manage to the machine.
|
||||
|
||||
The default backend is sops.
|
||||
|
||||
Examples:
|
||||
|
||||
$ clan facts upload [MACHINE]
|
||||
Will upload secrets to a specific machine.
|
||||
|
||||
For more detailed information, visit: {help_hyperlink("secrets", "https://docs.clan.lol/getting-started/secrets")}
|
||||
"""
|
||||
),
|
||||
formatter_class=argparse.RawTextHelpFormatter,
|
||||
)
|
||||
register_upload_parser(parser_upload)
|
||||
68
pkgs/clan-cli/clan_cli/vars/check.py
Normal file
68
pkgs/clan-cli/clan_cli/vars/check.py
Normal file
@@ -0,0 +1,68 @@
|
||||
import argparse
|
||||
import importlib
|
||||
import logging
|
||||
|
||||
from ..completions import add_dynamic_completer, complete_machines
|
||||
from ..machines.machines import Machine
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def check_secrets(machine: Machine, service: None | str = None) -> bool:
|
||||
secret_facts_module = importlib.import_module(machine.secret_facts_module)
|
||||
secret_facts_store = secret_facts_module.SecretStore(machine=machine)
|
||||
public_facts_module = importlib.import_module(machine.public_facts_module)
|
||||
public_facts_store = public_facts_module.FactStore(machine=machine)
|
||||
|
||||
missing_secret_facts = []
|
||||
missing_public_facts = []
|
||||
if service:
|
||||
services = [service]
|
||||
else:
|
||||
services = list(machine.facts_data.keys())
|
||||
for service in services:
|
||||
for secret_fact in machine.facts_data[service]["secret"]:
|
||||
if isinstance(secret_fact, str):
|
||||
secret_name = secret_fact
|
||||
else:
|
||||
secret_name = secret_fact["name"]
|
||||
if not secret_facts_store.exists(service, secret_name):
|
||||
log.info(
|
||||
f"Secret fact '{secret_fact}' for service '{service}' in machine {machine.name} is missing."
|
||||
)
|
||||
missing_secret_facts.append((service, secret_name))
|
||||
|
||||
for public_fact in machine.facts_data[service]["public"]:
|
||||
if not public_facts_store.exists(service, public_fact):
|
||||
log.info(
|
||||
f"Public fact '{public_fact}' for service '{service}' in machine {machine.name} is missing."
|
||||
)
|
||||
missing_public_facts.append((service, public_fact))
|
||||
|
||||
log.debug(f"missing_secret_facts: {missing_secret_facts}")
|
||||
log.debug(f"missing_public_facts: {missing_public_facts}")
|
||||
if missing_secret_facts or missing_public_facts:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def check_command(args: argparse.Namespace) -> None:
|
||||
machine = Machine(
|
||||
name=args.machine,
|
||||
flake=args.flake,
|
||||
)
|
||||
check_secrets(machine, service=args.service)
|
||||
|
||||
|
||||
def register_check_parser(parser: argparse.ArgumentParser) -> None:
|
||||
machines_parser = parser.add_argument(
|
||||
"machine",
|
||||
help="The machine to check secrets for",
|
||||
)
|
||||
add_dynamic_completer(machines_parser, complete_machines)
|
||||
|
||||
parser.add_argument(
|
||||
"--service",
|
||||
help="the service to check",
|
||||
)
|
||||
parser.set_defaults(func=check_command)
|
||||
257
pkgs/clan-cli/clan_cli/vars/generate.py
Normal file
257
pkgs/clan-cli/clan_cli/vars/generate.py
Normal file
@@ -0,0 +1,257 @@
|
||||
import argparse
|
||||
import importlib
|
||||
import logging
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
from collections.abc import Callable
|
||||
from pathlib import Path
|
||||
from tempfile import TemporaryDirectory
|
||||
|
||||
from clan_cli.cmd import run
|
||||
|
||||
from ..completions import (
|
||||
add_dynamic_completer,
|
||||
complete_machines,
|
||||
complete_services_for_machine,
|
||||
)
|
||||
from ..errors import ClanError
|
||||
from ..git import commit_files
|
||||
from ..machines.inventory import get_all_machines, get_selected_machines
|
||||
from ..machines.machines import Machine
|
||||
from ..nix import nix_shell
|
||||
from .check import check_secrets
|
||||
from .public_modules import FactStoreBase
|
||||
from .secret_modules import SecretStoreBase
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def read_multiline_input(prompt: str = "Finish with Ctrl-D") -> str:
|
||||
"""
|
||||
Read multi-line input from stdin.
|
||||
"""
|
||||
print(prompt, flush=True)
|
||||
proc = subprocess.run(["cat"], stdout=subprocess.PIPE, text=True)
|
||||
log.info("Input received. Processing...")
|
||||
return proc.stdout
|
||||
|
||||
|
||||
def bubblewrap_cmd(generator: str, facts_dir: Path, secrets_dir: Path) -> list[str]:
|
||||
# fmt: off
|
||||
return nix_shell(
|
||||
[
|
||||
"nixpkgs#bash",
|
||||
"nixpkgs#bubblewrap",
|
||||
],
|
||||
[
|
||||
"bwrap",
|
||||
"--ro-bind", "/nix/store", "/nix/store",
|
||||
"--tmpfs", "/usr/lib/systemd",
|
||||
"--dev", "/dev",
|
||||
"--bind", str(facts_dir), str(facts_dir),
|
||||
"--bind", str(secrets_dir), str(secrets_dir),
|
||||
"--unshare-all",
|
||||
"--unshare-user",
|
||||
"--uid", "1000",
|
||||
"--",
|
||||
"bash", "-c", generator
|
||||
],
|
||||
)
|
||||
# fmt: on
|
||||
|
||||
|
||||
def generate_service_facts(
|
||||
machine: Machine,
|
||||
service: str,
|
||||
regenerate: bool,
|
||||
secret_facts_store: SecretStoreBase,
|
||||
public_facts_store: FactStoreBase,
|
||||
tmpdir: Path,
|
||||
prompt: Callable[[str], str],
|
||||
) -> bool:
|
||||
service_dir = tmpdir / service
|
||||
# check if all secrets exist and generate them if at least one is missing
|
||||
needs_regeneration = not check_secrets(machine, service=service)
|
||||
log.debug(f"{service} needs_regeneration: {needs_regeneration}")
|
||||
if not (needs_regeneration or regenerate):
|
||||
return False
|
||||
if not isinstance(machine.flake, Path):
|
||||
msg = f"flake is not a Path: {machine.flake}"
|
||||
msg += "fact/secret generation is only supported for local flakes"
|
||||
|
||||
env = os.environ.copy()
|
||||
facts_dir = service_dir / "facts"
|
||||
facts_dir.mkdir(parents=True)
|
||||
env["facts"] = str(facts_dir)
|
||||
secrets_dir = service_dir / "secrets"
|
||||
secrets_dir.mkdir(parents=True)
|
||||
env["secrets"] = str(secrets_dir)
|
||||
# compatibility for old outputs.nix users
|
||||
if isinstance(machine.facts_data[service]["generator"], str):
|
||||
generator = machine.facts_data[service]["generator"]
|
||||
else:
|
||||
generator = machine.facts_data[service]["generator"]["finalScript"]
|
||||
if machine.facts_data[service]["generator"]["prompt"]:
|
||||
prompt_value = prompt(machine.facts_data[service]["generator"]["prompt"])
|
||||
env["prompt_value"] = prompt_value
|
||||
if sys.platform == "linux":
|
||||
cmd = bubblewrap_cmd(generator, facts_dir, secrets_dir)
|
||||
else:
|
||||
cmd = ["bash", "-c", generator]
|
||||
run(
|
||||
cmd,
|
||||
env=env,
|
||||
)
|
||||
files_to_commit = []
|
||||
# store secrets
|
||||
for secret in machine.facts_data[service]["secret"]:
|
||||
if isinstance(secret, str):
|
||||
# TODO: This is the old NixOS module, can be dropped everyone has updated.
|
||||
secret_name = secret
|
||||
groups = []
|
||||
else:
|
||||
secret_name = secret["name"]
|
||||
groups = secret.get("groups", [])
|
||||
|
||||
secret_file = secrets_dir / secret_name
|
||||
if not secret_file.is_file():
|
||||
msg = f"did not generate a file for '{secret_name}' when running the following command:\n"
|
||||
msg += generator
|
||||
raise ClanError(msg)
|
||||
secret_path = secret_facts_store.set(
|
||||
service, secret_name, secret_file.read_bytes(), groups
|
||||
)
|
||||
if secret_path:
|
||||
files_to_commit.append(secret_path)
|
||||
|
||||
# store facts
|
||||
for name in machine.facts_data[service]["public"]:
|
||||
fact_file = facts_dir / name
|
||||
if not fact_file.is_file():
|
||||
msg = f"did not generate a file for '{name}' when running the following command:\n"
|
||||
msg += machine.facts_data[service]["generator"]
|
||||
raise ClanError(msg)
|
||||
fact_file = public_facts_store.set(service, name, fact_file.read_bytes())
|
||||
if fact_file:
|
||||
files_to_commit.append(fact_file)
|
||||
commit_files(
|
||||
files_to_commit,
|
||||
machine.flake_dir,
|
||||
f"Update facts/secrets for service {service} in machine {machine.name}",
|
||||
)
|
||||
return True
|
||||
|
||||
|
||||
def prompt_func(text: str) -> str:
|
||||
print(f"{text}: ")
|
||||
return read_multiline_input()
|
||||
|
||||
|
||||
def _generate_facts_for_machine(
|
||||
machine: Machine,
|
||||
service: str | None,
|
||||
regenerate: bool,
|
||||
tmpdir: Path,
|
||||
prompt: Callable[[str], str] = prompt_func,
|
||||
) -> bool:
|
||||
local_temp = tmpdir / machine.name
|
||||
local_temp.mkdir()
|
||||
secret_facts_module = importlib.import_module(machine.secret_facts_module)
|
||||
secret_facts_store = secret_facts_module.SecretStore(machine=machine)
|
||||
|
||||
public_facts_module = importlib.import_module(machine.public_facts_module)
|
||||
public_facts_store = public_facts_module.FactStore(machine=machine)
|
||||
|
||||
machine_updated = False
|
||||
|
||||
if service and service not in machine.facts_data:
|
||||
services = list(machine.facts_data.keys())
|
||||
raise ClanError(
|
||||
f"Could not find service with name: {service}. The following services are available: {services}"
|
||||
)
|
||||
|
||||
if service:
|
||||
machine_service_facts = {service: machine.facts_data[service]}
|
||||
else:
|
||||
machine_service_facts = machine.facts_data
|
||||
|
||||
for service in machine_service_facts:
|
||||
machine_updated |= generate_service_facts(
|
||||
machine=machine,
|
||||
service=service,
|
||||
regenerate=regenerate,
|
||||
secret_facts_store=secret_facts_store,
|
||||
public_facts_store=public_facts_store,
|
||||
tmpdir=local_temp,
|
||||
prompt=prompt,
|
||||
)
|
||||
if machine_updated:
|
||||
# flush caches to make sure the new secrets are available in evaluation
|
||||
machine.flush_caches()
|
||||
return machine_updated
|
||||
|
||||
|
||||
def generate_facts(
|
||||
machines: list[Machine],
|
||||
service: str | None,
|
||||
regenerate: bool,
|
||||
prompt: Callable[[str], str] = prompt_func,
|
||||
) -> bool:
|
||||
was_regenerated = False
|
||||
with TemporaryDirectory() as tmp:
|
||||
tmpdir = Path(tmp)
|
||||
|
||||
for machine in machines:
|
||||
errors = 0
|
||||
try:
|
||||
was_regenerated |= _generate_facts_for_machine(
|
||||
machine, service, regenerate, tmpdir, prompt
|
||||
)
|
||||
except Exception as exc:
|
||||
log.error(f"Failed to generate facts for {machine.name}: {exc}")
|
||||
errors += 1
|
||||
if errors > 0:
|
||||
raise ClanError(
|
||||
f"Failed to generate facts for {errors} hosts. Check the logs above"
|
||||
)
|
||||
|
||||
if not was_regenerated:
|
||||
print("All secrets and facts are already up to date")
|
||||
return was_regenerated
|
||||
|
||||
|
||||
def generate_command(args: argparse.Namespace) -> None:
|
||||
if len(args.machines) == 0:
|
||||
machines = get_all_machines(args.flake, args.option)
|
||||
else:
|
||||
machines = get_selected_machines(args.flake, args.option, args.machines)
|
||||
generate_facts(machines, args.service, args.regenerate)
|
||||
|
||||
|
||||
def register_generate_parser(parser: argparse.ArgumentParser) -> None:
|
||||
machines_parser = parser.add_argument(
|
||||
"machines",
|
||||
type=str,
|
||||
help="machine to generate facts for. if empty, generate facts for all machines",
|
||||
nargs="*",
|
||||
default=[],
|
||||
)
|
||||
add_dynamic_completer(machines_parser, complete_machines)
|
||||
|
||||
service_parser = parser.add_argument(
|
||||
"--service",
|
||||
type=str,
|
||||
help="service to generate facts for, if empty, generate facts for every service",
|
||||
default=None,
|
||||
)
|
||||
add_dynamic_completer(service_parser, complete_services_for_machine)
|
||||
|
||||
parser.add_argument(
|
||||
"--regenerate",
|
||||
type=bool,
|
||||
action=argparse.BooleanOptionalAction,
|
||||
help="whether to regenerate facts for the specified machine",
|
||||
default=None,
|
||||
)
|
||||
parser.set_defaults(func=generate_command)
|
||||
47
pkgs/clan-cli/clan_cli/vars/list.py
Normal file
47
pkgs/clan-cli/clan_cli/vars/list.py
Normal file
@@ -0,0 +1,47 @@
|
||||
import argparse
|
||||
import importlib
|
||||
import json
|
||||
import logging
|
||||
|
||||
from ..completions import add_dynamic_completer, complete_machines
|
||||
from ..machines.machines import Machine
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# TODO get also secret facts
|
||||
def get_all_facts(machine: Machine) -> dict:
|
||||
public_facts_module = importlib.import_module(machine.public_facts_module)
|
||||
public_facts_store = public_facts_module.FactStore(machine=machine)
|
||||
|
||||
# for service in machine.secrets_data:
|
||||
# facts[service] = {}
|
||||
# for fact in machine.secrets_data[service]["facts"]:
|
||||
# fact_content = fact_store.get(service, fact)
|
||||
# if fact_content:
|
||||
# facts[service][fact] = fact_content.decode()
|
||||
# else:
|
||||
# log.error(f"Fact {fact} for service {service} is missing")
|
||||
return public_facts_store.get_all()
|
||||
|
||||
|
||||
def get_command(args: argparse.Namespace) -> None:
|
||||
machine = Machine(name=args.machine, flake=args.flake)
|
||||
|
||||
# the raw_facts are bytestrings making them not json serializable
|
||||
raw_facts = get_all_facts(machine)
|
||||
facts = dict()
|
||||
for key in raw_facts["TODO"]:
|
||||
facts[key] = raw_facts["TODO"][key].decode("utf8")
|
||||
|
||||
print(json.dumps(facts, indent=4))
|
||||
|
||||
|
||||
def register_list_parser(parser: argparse.ArgumentParser) -> None:
|
||||
machines_parser = parser.add_argument(
|
||||
"machine",
|
||||
help="The machine to print facts for",
|
||||
)
|
||||
add_dynamic_completer(machines_parser, complete_machines)
|
||||
|
||||
parser.set_defaults(func=get_command)
|
||||
28
pkgs/clan-cli/clan_cli/vars/public_modules/__init__.py
Normal file
28
pkgs/clan-cli/clan_cli/vars/public_modules/__init__.py
Normal file
@@ -0,0 +1,28 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from pathlib import Path
|
||||
|
||||
from clan_cli.machines.machines import Machine
|
||||
|
||||
|
||||
class FactStoreBase(ABC):
|
||||
@abstractmethod
|
||||
def __init__(self, machine: Machine) -> None:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def exists(self, service: str, name: str) -> bool:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def set(self, service: str, name: str, value: bytes) -> Path | None:
|
||||
pass
|
||||
|
||||
# get a single fact
|
||||
@abstractmethod
|
||||
def get(self, service: str, name: str) -> bytes:
|
||||
pass
|
||||
|
||||
# get all facts
|
||||
@abstractmethod
|
||||
def get_all(self) -> dict[str, dict[str, bytes]]:
|
||||
pass
|
||||
49
pkgs/clan-cli/clan_cli/vars/public_modules/in_repo.py
Normal file
49
pkgs/clan-cli/clan_cli/vars/public_modules/in_repo.py
Normal file
@@ -0,0 +1,49 @@
|
||||
from pathlib import Path
|
||||
|
||||
from clan_cli.errors import ClanError
|
||||
from clan_cli.machines.machines import Machine
|
||||
|
||||
from . import FactStoreBase
|
||||
|
||||
|
||||
class FactStore(FactStoreBase):
|
||||
def __init__(self, machine: Machine) -> None:
|
||||
self.machine = machine
|
||||
self.works_remotely = False
|
||||
|
||||
def set(self, service: str, name: str, value: bytes) -> Path | None:
|
||||
if isinstance(self.machine.flake, Path):
|
||||
fact_path = (
|
||||
self.machine.flake / "machines" / self.machine.name / "facts" / name
|
||||
)
|
||||
fact_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
fact_path.touch()
|
||||
fact_path.write_bytes(value)
|
||||
return fact_path
|
||||
else:
|
||||
raise ClanError(
|
||||
f"in_flake fact storage is only supported for local flakes: {self.machine.flake}"
|
||||
)
|
||||
|
||||
def exists(self, service: str, name: str) -> bool:
|
||||
fact_path = (
|
||||
self.machine.flake_dir / "machines" / self.machine.name / "facts" / name
|
||||
)
|
||||
return fact_path.exists()
|
||||
|
||||
# get a single fact
|
||||
def get(self, service: str, name: str) -> bytes:
|
||||
fact_path = (
|
||||
self.machine.flake_dir / "machines" / self.machine.name / "facts" / name
|
||||
)
|
||||
return fact_path.read_bytes()
|
||||
|
||||
# get all facts
|
||||
def get_all(self) -> dict[str, dict[str, bytes]]:
|
||||
facts_folder = self.machine.flake_dir / "machines" / self.machine.name / "facts"
|
||||
facts: dict[str, dict[str, bytes]] = {}
|
||||
facts["TODO"] = {}
|
||||
if facts_folder.exists():
|
||||
for fact_path in facts_folder.iterdir():
|
||||
facts["TODO"][fact_path.name] = fact_path.read_bytes()
|
||||
return facts
|
||||
46
pkgs/clan-cli/clan_cli/vars/public_modules/vm.py
Normal file
46
pkgs/clan-cli/clan_cli/vars/public_modules/vm.py
Normal file
@@ -0,0 +1,46 @@
|
||||
import logging
|
||||
from pathlib import Path
|
||||
|
||||
from clan_cli.dirs import vm_state_dir
|
||||
from clan_cli.errors import ClanError
|
||||
from clan_cli.machines.machines import Machine
|
||||
|
||||
from . import FactStoreBase
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class FactStore(FactStoreBase):
|
||||
def __init__(self, machine: Machine) -> None:
|
||||
self.machine = machine
|
||||
self.works_remotely = False
|
||||
self.dir = vm_state_dir(str(machine.flake), machine.name) / "facts"
|
||||
log.debug(f"FactStore initialized with dir {self.dir}")
|
||||
|
||||
def exists(self, service: str, name: str) -> bool:
|
||||
fact_path = self.dir / service / name
|
||||
return fact_path.exists()
|
||||
|
||||
def set(self, service: str, name: str, value: bytes) -> Path | None:
|
||||
fact_path = self.dir / service / name
|
||||
fact_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
fact_path.write_bytes(value)
|
||||
return None
|
||||
|
||||
# get a single fact
|
||||
def get(self, service: str, name: str) -> bytes:
|
||||
fact_path = self.dir / service / name
|
||||
if fact_path.exists():
|
||||
return fact_path.read_bytes()
|
||||
raise ClanError(f"Fact {name} for service {service} not found")
|
||||
|
||||
# get all facts
|
||||
def get_all(self) -> dict[str, dict[str, bytes]]:
|
||||
facts: dict[str, dict[str, bytes]] = {}
|
||||
if self.dir.exists():
|
||||
for service in self.dir.iterdir():
|
||||
facts[service.name] = {}
|
||||
for fact in service.iterdir():
|
||||
facts[service.name][fact.name] = fact.read_bytes()
|
||||
|
||||
return facts
|
||||
31
pkgs/clan-cli/clan_cli/vars/secret_modules/__init__.py
Normal file
31
pkgs/clan-cli/clan_cli/vars/secret_modules/__init__.py
Normal file
@@ -0,0 +1,31 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from pathlib import Path
|
||||
|
||||
from clan_cli.machines.machines import Machine
|
||||
|
||||
|
||||
class SecretStoreBase(ABC):
|
||||
@abstractmethod
|
||||
def __init__(self, machine: Machine) -> None:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def set(
|
||||
self, service: str, name: str, value: bytes, groups: list[str]
|
||||
) -> Path | None:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def get(self, service: str, name: str) -> bytes:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def exists(self, service: str, name: str) -> bool:
|
||||
pass
|
||||
|
||||
def update_check(self) -> bool:
|
||||
return False
|
||||
|
||||
@abstractmethod
|
||||
def upload(self, output_dir: Path) -> None:
|
||||
pass
|
||||
117
pkgs/clan-cli/clan_cli/vars/secret_modules/password_store.py
Normal file
117
pkgs/clan-cli/clan_cli/vars/secret_modules/password_store.py
Normal file
@@ -0,0 +1,117 @@
|
||||
import os
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
from clan_cli.machines.machines import Machine
|
||||
from clan_cli.nix import nix_shell
|
||||
|
||||
from . import SecretStoreBase
|
||||
|
||||
|
||||
class SecretStore(SecretStoreBase):
|
||||
def __init__(self, machine: Machine) -> None:
|
||||
self.machine = machine
|
||||
|
||||
def set(
|
||||
self, service: str, name: str, value: bytes, groups: list[str]
|
||||
) -> Path | None:
|
||||
subprocess.run(
|
||||
nix_shell(
|
||||
["nixpkgs#pass"],
|
||||
["pass", "insert", "-m", f"machines/{self.machine.name}/{name}"],
|
||||
),
|
||||
input=value,
|
||||
check=True,
|
||||
)
|
||||
return None # we manage the files outside of the git repo
|
||||
|
||||
def get(self, service: str, name: str) -> bytes:
|
||||
return subprocess.run(
|
||||
nix_shell(
|
||||
["nixpkgs#pass"],
|
||||
["pass", "show", f"machines/{self.machine.name}/{name}"],
|
||||
),
|
||||
check=True,
|
||||
stdout=subprocess.PIPE,
|
||||
).stdout
|
||||
|
||||
def exists(self, service: str, name: str) -> bool:
|
||||
password_store = os.environ.get(
|
||||
"PASSWORD_STORE_DIR", f"{os.environ['HOME']}/.password-store"
|
||||
)
|
||||
secret_path = Path(password_store) / f"machines/{self.machine.name}/{name}.gpg"
|
||||
return secret_path.exists()
|
||||
|
||||
def generate_hash(self) -> bytes:
|
||||
password_store = os.environ.get(
|
||||
"PASSWORD_STORE_DIR", f"{os.environ['HOME']}/.password-store"
|
||||
)
|
||||
hashes = []
|
||||
hashes.append(
|
||||
subprocess.run(
|
||||
nix_shell(
|
||||
["nixpkgs#git"],
|
||||
[
|
||||
"git",
|
||||
"-C",
|
||||
password_store,
|
||||
"log",
|
||||
"-1",
|
||||
"--format=%H",
|
||||
f"machines/{self.machine.name}",
|
||||
],
|
||||
),
|
||||
stdout=subprocess.PIPE,
|
||||
).stdout.strip()
|
||||
)
|
||||
for symlink in Path(password_store).glob(f"machines/{self.machine.name}/**/*"):
|
||||
if symlink.is_symlink():
|
||||
hashes.append(
|
||||
subprocess.run(
|
||||
nix_shell(
|
||||
["nixpkgs#git"],
|
||||
[
|
||||
"git",
|
||||
"-C",
|
||||
password_store,
|
||||
"log",
|
||||
"-1",
|
||||
"--format=%H",
|
||||
str(symlink),
|
||||
],
|
||||
),
|
||||
stdout=subprocess.PIPE,
|
||||
).stdout.strip()
|
||||
)
|
||||
|
||||
# we sort the hashes to make sure that the order is always the same
|
||||
hashes.sort()
|
||||
return b"\n".join(hashes)
|
||||
|
||||
# FIXME: add this when we switch to python3.12
|
||||
# @override
|
||||
def update_check(self) -> bool:
|
||||
local_hash = self.generate_hash()
|
||||
remote_hash = self.machine.target_host.run(
|
||||
# TODO get the path to the secrets from the machine
|
||||
["cat", f"{self.machine.secrets_upload_directory}/.pass_info"],
|
||||
check=False,
|
||||
stdout=subprocess.PIPE,
|
||||
).stdout.strip()
|
||||
|
||||
if not remote_hash:
|
||||
print("remote hash is empty")
|
||||
return False
|
||||
|
||||
return local_hash.decode() == remote_hash
|
||||
|
||||
def upload(self, output_dir: Path) -> None:
|
||||
for service in self.machine.facts_data:
|
||||
for secret in self.machine.facts_data[service]["secret"]:
|
||||
if isinstance(secret, dict):
|
||||
secret_name = secret["name"]
|
||||
else:
|
||||
# TODO: drop old format soon
|
||||
secret_name = secret
|
||||
(output_dir / secret_name).write_bytes(self.get(service, secret_name))
|
||||
(output_dir / ".pass_info").write_bytes(self.generate_hash())
|
||||
66
pkgs/clan-cli/clan_cli/vars/secret_modules/sops.py
Normal file
66
pkgs/clan-cli/clan_cli/vars/secret_modules/sops.py
Normal file
@@ -0,0 +1,66 @@
|
||||
from pathlib import Path
|
||||
|
||||
from clan_cli.machines.machines import Machine
|
||||
from clan_cli.secrets.folders import sops_secrets_folder
|
||||
from clan_cli.secrets.machines import add_machine, has_machine
|
||||
from clan_cli.secrets.secrets import decrypt_secret, encrypt_secret, has_secret
|
||||
from clan_cli.secrets.sops import generate_private_key
|
||||
|
||||
from . import SecretStoreBase
|
||||
|
||||
|
||||
class SecretStore(SecretStoreBase):
|
||||
def __init__(self, machine: Machine) -> None:
|
||||
self.machine = machine
|
||||
|
||||
# no need to generate keys if we don't manage secrets
|
||||
if not hasattr(self.machine, "facts_data"):
|
||||
return
|
||||
|
||||
if not self.machine.facts_data:
|
||||
return
|
||||
|
||||
if has_machine(self.machine.flake_dir, self.machine.name):
|
||||
return
|
||||
priv_key, pub_key = generate_private_key()
|
||||
encrypt_secret(
|
||||
self.machine.flake_dir,
|
||||
sops_secrets_folder(self.machine.flake_dir)
|
||||
/ f"{self.machine.name}-age.key",
|
||||
priv_key,
|
||||
)
|
||||
add_machine(self.machine.flake_dir, self.machine.name, pub_key, False)
|
||||
|
||||
def set(
|
||||
self, service: str, name: str, value: bytes, groups: list[str]
|
||||
) -> Path | None:
|
||||
path = (
|
||||
sops_secrets_folder(self.machine.flake_dir) / f"{self.machine.name}-{name}"
|
||||
)
|
||||
encrypt_secret(
|
||||
self.machine.flake_dir,
|
||||
path,
|
||||
value,
|
||||
add_machines=[self.machine.name],
|
||||
add_groups=groups,
|
||||
)
|
||||
return path
|
||||
|
||||
def get(self, service: str, name: str) -> bytes:
|
||||
return decrypt_secret(
|
||||
self.machine.flake_dir, f"{self.machine.name}-{name}"
|
||||
).encode("utf-8")
|
||||
|
||||
def exists(self, service: str, name: str) -> bool:
|
||||
return has_secret(
|
||||
self.machine.flake_dir,
|
||||
f"{self.machine.name}-{name}",
|
||||
)
|
||||
|
||||
def upload(self, output_dir: Path) -> None:
|
||||
key_name = f"{self.machine.name}-age.key"
|
||||
if not has_secret(self.machine.flake_dir, key_name):
|
||||
# skip uploading the secret, not managed by us
|
||||
return
|
||||
key = decrypt_secret(self.machine.flake_dir, key_name)
|
||||
(output_dir / "key.txt").write_text(key)
|
||||
35
pkgs/clan-cli/clan_cli/vars/secret_modules/vm.py
Normal file
35
pkgs/clan-cli/clan_cli/vars/secret_modules/vm.py
Normal file
@@ -0,0 +1,35 @@
|
||||
import os
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
|
||||
from clan_cli.dirs import vm_state_dir
|
||||
from clan_cli.machines.machines import Machine
|
||||
|
||||
from . import SecretStoreBase
|
||||
|
||||
|
||||
class SecretStore(SecretStoreBase):
|
||||
def __init__(self, machine: Machine) -> None:
|
||||
self.machine = machine
|
||||
self.dir = vm_state_dir(str(machine.flake), machine.name) / "secrets"
|
||||
self.dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
def set(
|
||||
self, service: str, name: str, value: bytes, groups: list[str]
|
||||
) -> Path | None:
|
||||
secret_file = self.dir / service / name
|
||||
secret_file.parent.mkdir(parents=True, exist_ok=True)
|
||||
secret_file.write_bytes(value)
|
||||
return None # we manage the files outside of the git repo
|
||||
|
||||
def get(self, service: str, name: str) -> bytes:
|
||||
secret_file = self.dir / service / name
|
||||
return secret_file.read_bytes()
|
||||
|
||||
def exists(self, service: str, name: str) -> bool:
|
||||
return (self.dir / service / name).exists()
|
||||
|
||||
def upload(self, output_dir: Path) -> None:
|
||||
if os.path.exists(output_dir):
|
||||
shutil.rmtree(output_dir)
|
||||
shutil.copytree(self.dir, output_dir)
|
||||
58
pkgs/clan-cli/clan_cli/vars/upload.py
Normal file
58
pkgs/clan-cli/clan_cli/vars/upload.py
Normal file
@@ -0,0 +1,58 @@
|
||||
import argparse
|
||||
import importlib
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from tempfile import TemporaryDirectory
|
||||
|
||||
from ..cmd import Log, run
|
||||
from ..completions import add_dynamic_completer, complete_machines
|
||||
from ..machines.machines import Machine
|
||||
from ..nix import nix_shell
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def upload_secrets(machine: Machine) -> None:
|
||||
secret_facts_module = importlib.import_module(machine.secret_facts_module)
|
||||
secret_facts_store = secret_facts_module.SecretStore(machine=machine)
|
||||
|
||||
if secret_facts_store.update_check():
|
||||
log.info("Secrets already up to date")
|
||||
return
|
||||
with TemporaryDirectory() as tempdir:
|
||||
secret_facts_store.upload(Path(tempdir))
|
||||
host = machine.target_host
|
||||
|
||||
ssh_cmd = host.ssh_cmd()
|
||||
run(
|
||||
nix_shell(
|
||||
["nixpkgs#rsync"],
|
||||
[
|
||||
"rsync",
|
||||
"-e",
|
||||
" ".join(["ssh"] + ssh_cmd[2:]),
|
||||
"-az",
|
||||
"--delete",
|
||||
"--chown=root:root",
|
||||
"--chmod=D700,F600",
|
||||
f"{tempdir!s}/",
|
||||
f"{host.user}@{host.host}:{machine.secrets_upload_directory}/",
|
||||
],
|
||||
),
|
||||
log=Log.BOTH,
|
||||
)
|
||||
|
||||
|
||||
def upload_command(args: argparse.Namespace) -> None:
|
||||
machine = Machine(name=args.machine, flake=args.flake)
|
||||
upload_secrets(machine)
|
||||
|
||||
|
||||
def register_upload_parser(parser: argparse.ArgumentParser) -> None:
|
||||
machines_parser = parser.add_argument(
|
||||
"machine",
|
||||
help="The machine to upload secrets to",
|
||||
)
|
||||
add_dynamic_completer(machines_parser, complete_machines)
|
||||
|
||||
parser.set_defaults(func=upload_command)
|
||||
Reference in New Issue
Block a user