Merge pull request 'clan-cli secrets: add secret_store as python class' (#733) from lassulus-HEAD into main

This commit is contained in:
clan-bot
2024-01-24 14:52:14 +00:00
25 changed files with 479 additions and 496 deletions

View File

@@ -27,11 +27,12 @@ in
]; ];
}; };
}; };
test_backup_client = { pkgs, lib, ... }: test_backup_client = { pkgs, lib, config, ... }:
let let
dependencies = [ dependencies = [
self self
pkgs.stdenv.drvPath pkgs.stdenv.drvPath
clan.clanInternals.machines.x86_64-linux.test_backup_client.config.system.clan.deployment.file
] ++ builtins.map (i: i.outPath) (builtins.attrValues self.inputs); ] ++ builtins.map (i: i.outPath) (builtins.attrValues self.inputs);
closureInfo = pkgs.closureInfo { rootPaths = dependencies; }; closureInfo = pkgs.closureInfo { rootPaths = dependencies; };
in in

View File

@@ -41,8 +41,9 @@ in
dependencies = [ dependencies = [
self self
pkgs.stdenv.drvPath pkgs.stdenv.drvPath
self.nixosConfigurations.test_install_machine.config.system.build.toplevel clan.clanInternals.machines.x86_64-linux.test_install_machine.config.system.build.toplevel
self.nixosConfigurations.test_install_machine.config.system.build.diskoScript clan.clanInternals.machines.x86_64-linux.test_install_machine.config.system.build.diskoScript
clan.clanInternals.machines.x86_64-linux.test_install_machine.config.system.clan.deployment.file
pkgs.nixos-anywhere pkgs.nixos-anywhere
] ++ builtins.map (i: i.outPath) (builtins.attrValues self.inputs); ] ++ builtins.map (i: i.outPath) (builtins.attrValues self.inputs);
closureInfo = pkgs.closureInfo { rootPaths = dependencies; }; closureInfo = pkgs.closureInfo { rootPaths = dependencies; };

View File

@@ -31,19 +31,24 @@
the directory on the deployment server where secrets are uploaded the directory on the deployment server where secrets are uploaded
''; '';
}; };
uploadSecrets = lib.mkOption { secretsModule = lib.mkOption {
type = lib.types.path; type = lib.types.str;
description = '' description = ''
script to upload secrets to the deployment server the python import path to the secrets module
''; '';
default = "${pkgs.coreutils}/bin/true";
}; };
generateSecrets = lib.mkOption { secretsData = lib.mkOption {
type = lib.types.path; type = lib.types.path;
description = '' description = ''
script to generate secrets secret data as json for the generator
''; '';
default = "${pkgs.coreutils}/bin/true"; default = pkgs.writers.writeJSON "secrets.json" (lib.mapAttrs
(_name: secret: {
secrets = builtins.attrNames secret.secrets;
facts = lib.mapAttrs (_: secret: secret.path) secret.facts;
generator = secret.generator.finalScript;
})
config.clanCore.secrets);
}; };
vm.create = lib.mkOption { vm.create = lib.mkOption {
type = lib.types.path; type = lib.types.path;
@@ -60,7 +65,7 @@
# optimization for faster secret generate/upload and machines update # optimization for faster secret generate/upload and machines update
config = { config = {
system.clan.deployment.data = { system.clan.deployment.data = {
inherit (config.system.clan) uploadSecrets generateSecrets; inherit (config.system.clan) secretsModule secretsData;
inherit (config.clan.networking) deploymentAddress; inherit (config.clan.networking) deploymentAddress;
inherit (config.clanCore) secretsUploadDirectory; inherit (config.clanCore) secretsUploadDirectory;
}; };

View File

@@ -6,7 +6,6 @@
description = '' description = ''
method to store secrets method to store secrets
custom can be used to define a custom secret store. custom can be used to define a custom secret store.
one would have to define system.clan.generateSecrets and system.clan.uploadSecrets
''; '';
}; };
@@ -71,6 +70,7 @@
internal = true; internal = true;
default = '' default = ''
export PATH="${lib.makeBinPath config.path}" export PATH="${lib.makeBinPath config.path}"
set -efu -o pipefail
${config.script} ${config.script}
''; '';
}; };

View File

@@ -1,7 +1,4 @@
{ config, lib, pkgs, ... }: { config, lib, ... }:
let
passwordstoreDir = "\${PASSWORD_STORE_DIR:-$HOME/.password-store}";
in
{ {
options.clan.password-store.targetDirectory = lib.mkOption { options.clan.password-store.targetDirectory = lib.mkOption {
type = lib.types.path; type = lib.types.path;
@@ -13,104 +10,7 @@ in
config = lib.mkIf (config.clanCore.secretStore == "password-store") { config = lib.mkIf (config.clanCore.secretStore == "password-store") {
clanCore.secretsDirectory = config.clan.password-store.targetDirectory; clanCore.secretsDirectory = config.clan.password-store.targetDirectory;
clanCore.secretsUploadDirectory = config.clan.password-store.targetDirectory; clanCore.secretsUploadDirectory = config.clan.password-store.targetDirectory;
system.clan.generateSecrets = lib.mkIf (config.clanCore.secrets != { }) ( system.clan.secretsModule = "clan_cli.secrets.modules.password_store";
pkgs.writeScript "generate-secrets" ''
#!/bin/sh
set -efu
test -d "$CLAN_DIR"
PATH=${lib.makeBinPath [
pkgs.pass
]}:$PATH
# TODO maybe initialize password store if it doesn't exist yet
${lib.foldlAttrs (acc: n: v: ''
${acc}
# ${n}
# if any of the secrets are missing, we regenerate all connected facts/secrets
(if ! (${lib.concatMapStringsSep " && " (x: "test -e ${passwordstoreDir}/machines/${config.clanCore.machineName}/${x.name}.gpg >/dev/null") (lib.attrValues v.secrets)}); then
tmpdir=$(mktemp -d)
trap "rm -rf $tmpdir" EXIT
cd $tmpdir
facts=$(mktemp -d)
trap "rm -rf $facts" EXIT
secrets=$(mktemp -d)
trap "rm -rf $secrets" EXIT
( ${v.generator.finalScript} )
${lib.concatMapStrings (fact: ''
mkdir -p "$CLAN_DIR"/"$(dirname ${fact.path})"
cp "$facts"/${fact.name} "$CLAN_DIR"/${fact.path}
'') (lib.attrValues v.facts)}
${lib.concatMapStrings (secret: ''
cat "$secrets"/${secret.name} | pass insert -m machines/${config.clanCore.machineName}/${secret.name}
'') (lib.attrValues v.secrets)}
fi)
'') "" config.clanCore.secrets}
''
);
system.clan.uploadSecrets = pkgs.writeScript "upload-secrets" ''
#!/bin/sh
set -efu
umask 0077
PATH=${lib.makeBinPath [
pkgs.pass
pkgs.git
pkgs.findutils
pkgs.rsync
]}:$PATH:${lib.getBin pkgs.openssh}
if test -e ${passwordstoreDir}/.git; then
local_pass_info=$(
git -C ${passwordstoreDir} log -1 --format=%H machines/${config.clanCore.machineName}
# we append a hash for every symlink, otherwise we would miss updates on
# files where the symlink points to
find ${passwordstoreDir}/machines/${config.clanCore.machineName} -type l \
-exec realpath {} + |
sort |
xargs -r -n 1 git -C ${passwordstoreDir} log -1 --format=%H
)
remote_pass_info=$(ssh ${config.clan.networking.deploymentAddress} -- ${lib.escapeShellArg ''
cat ${config.clan.password-store.targetDirectory}/.pass_info || :
''} || :)
if test "$local_pass_info" = "$remote_pass_info"; then
echo secrets already match
exit 23
fi
fi
find ${passwordstoreDir}/machines/${config.clanCore.machineName} -type f -follow ! -name .gpg-id |
while read -r gpg_path; do
rel_name=''${gpg_path#${passwordstoreDir}}
rel_name=''${rel_name%.gpg}
pass_date=$(
if test -e ${passwordstoreDir}/.git; then
git -C ${passwordstoreDir} log -1 --format=%aI "$gpg_path"
fi
)
pass_name=$rel_name
tmp_path="$SECRETS_DIR"/$(basename $rel_name)
mkdir -p "$(dirname "$tmp_path")"
pass show "$pass_name" > "$tmp_path"
if [ -n "$pass_date" ]; then
touch -d "$pass_date" "$tmp_path"
fi
done
if test -n "''${local_pass_info-}"; then
echo "$local_pass_info" > "$SECRETS_DIR"/.pass_info
fi
'';
}; };
} }

View File

@@ -25,33 +25,7 @@ in
config = lib.mkIf (config.clanCore.secretStore == "sops") { config = lib.mkIf (config.clanCore.secretStore == "sops") {
clanCore.secretsDirectory = "/run/secrets"; clanCore.secretsDirectory = "/run/secrets";
clanCore.secretsPrefix = config.clanCore.machineName + "-"; clanCore.secretsPrefix = config.clanCore.machineName + "-";
system.clan = lib.mkIf (config.clanCore.secrets != { }) { system.clan.secretsModule = "clan_cli.secrets.modules.sops";
generateSecrets = pkgs.writeScript "generate-secrets" ''
#!${pkgs.python3}/bin/python
import json
import sys
from clan_cli.secrets.sops_generate import generate_secrets_from_nix
args = json.loads(${builtins.toJSON (builtins.toJSON {
machine_name = config.clanCore.machineName;
secret_submodules = lib.mapAttrs (_name: secret: {
secrets = builtins.attrNames secret.secrets;
facts = lib.mapAttrs (_: secret: secret.path) secret.facts;
generator = secret.generator.finalScript;
}) config.clanCore.secrets;
})})
generate_secrets_from_nix(**args)
'';
uploadSecrets = pkgs.writeScript "upload-secrets" ''
#!${pkgs.python3}/bin/python
import json
import sys
from clan_cli.secrets.sops_generate import upload_age_key_from_nix
# the second toJSON is needed to escape the string for the python
args = json.loads(${builtins.toJSON (builtins.toJSON { machine_name = config.clanCore.machineName; })})
upload_age_key_from_nix(**args)
'';
};
sops.secrets = builtins.mapAttrs sops.secrets = builtins.mapAttrs
(name: _: { (name: _: {
sopsFile = config.clanCore.clanDir + "/sops/secrets/${name}/secret"; sopsFile = config.clanCore.clanDir + "/sops/secrets/${name}/secret";

View File

@@ -140,8 +140,6 @@ in
toplevel = vmConfig.config.system.build.toplevel; toplevel = vmConfig.config.system.build.toplevel;
regInfo = (pkgs.closureInfo { rootPaths = vmConfig.config.virtualisation.additionalPaths; }); regInfo = (pkgs.closureInfo { rootPaths = vmConfig.config.virtualisation.additionalPaths; });
inherit (config.clan.virtualisation) memorySize cores graphics; inherit (config.clan.virtualisation) memorySize cores graphics;
generateSecrets = config.system.clan.generateSecrets;
uploadSecrets = config.system.clan.uploadSecrets;
}); });
}; };

View File

@@ -57,21 +57,15 @@ def create_parser(prog: str | None = None) -> argparse.ArgumentParser:
default=[], default=[],
) )
def flake_path(arg: str) -> Path: def flake_path(arg: str) -> str | Path:
flake_dir = Path(arg).resolve() flake_dir = Path(arg).resolve()
if not flake_dir.exists(): if flake_dir.exists() and flake_dir.is_dir():
raise argparse.ArgumentTypeError( return flake_dir
f"flake directory {flake_dir} does not exist" return arg
)
if not flake_dir.is_dir():
raise argparse.ArgumentTypeError(
f"flake directory {flake_dir} is not a directory"
)
return flake_dir
parser.add_argument( parser.add_argument(
"--flake", "--flake",
help="path to the flake where the clan resides in", help="path to the flake where the clan resides in, can be a remote flake or local",
default=get_clan_flake_toplevel(), default=get_clan_flake_toplevel(),
type=flake_path, type=flake_path,
) )

View File

@@ -1,14 +1,16 @@
import argparse import argparse
import json import json
import logging
from ..errors import ClanError from ..errors import ClanError
from ..machines.machines import Machine from ..machines.machines import Machine
log = logging.getLogger(__name__)
def create_backup(machine: Machine, provider: str | None = None) -> None: def create_backup(machine: Machine, provider: str | None = None) -> None:
backup_scripts = json.loads( log.info(f"creating backup for {machine.name}")
machine.eval_nix(f"nixosConfigurations.{machine.name}.config.clanCore.backups") backup_scripts = json.loads(machine.eval_nix("config.clanCore.backups"))
)
if provider is None: if provider is None:
for provider in backup_scripts["providers"]: for provider in backup_scripts["providers"]:
proc = machine.host.run( proc = machine.host.run(
@@ -31,7 +33,7 @@ def create_backup(machine: Machine, provider: str | None = None) -> None:
def create_command(args: argparse.Namespace) -> None: def create_command(args: argparse.Namespace) -> None:
machine = Machine(name=args.machine, flake_dir=args.flake) machine = Machine(name=args.machine, flake=args.flake)
create_backup(machine=machine, provider=args.provider) create_backup(machine=machine, provider=args.provider)

View File

@@ -18,9 +18,7 @@ class Backup:
def list_provider(machine: Machine, provider: str) -> list[Backup]: def list_provider(machine: Machine, provider: str) -> list[Backup]:
results = [] results = []
backup_metadata = json.loads( backup_metadata = json.loads(machine.eval_nix("config.clanCore.backups"))
machine.eval_nix(f"nixosConfigurations.{machine.name}.config.clanCore.backups")
)
proc = machine.host.run( proc = machine.host.run(
["bash", "-c", backup_metadata["providers"][provider]["list"]], ["bash", "-c", backup_metadata["providers"][provider]["list"]],
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
@@ -45,9 +43,7 @@ def list_provider(machine: Machine, provider: str) -> list[Backup]:
def list_backups(machine: Machine, provider: str | None = None) -> list[Backup]: def list_backups(machine: Machine, provider: str | None = None) -> list[Backup]:
backup_metadata = json.loads( backup_metadata = json.loads(machine.eval_nix("config.clanCore.backups"))
machine.eval_nix(f"nixosConfigurations.{machine.name}.config.clanCore.backups")
)
results = [] results = []
if provider is None: if provider is None:
for _provider in backup_metadata["providers"]: for _provider in backup_metadata["providers"]:
@@ -60,7 +56,7 @@ def list_backups(machine: Machine, provider: str | None = None) -> list[Backup]:
def list_command(args: argparse.Namespace) -> None: def list_command(args: argparse.Namespace) -> None:
machine = Machine(name=args.machine, flake_dir=args.flake) machine = Machine(name=args.machine, flake=args.flake)
backups = list_backups(machine=machine, provider=args.provider) backups = list_backups(machine=machine, provider=args.provider)
print(backups) print(backups)

View File

@@ -11,12 +11,8 @@ from .list import Backup, list_backups
def restore_service( def restore_service(
machine: Machine, backup: Backup, provider: str, service: str machine: Machine, backup: Backup, provider: str, service: str
) -> None: ) -> None:
backup_metadata = json.loads( backup_metadata = json.loads(machine.eval_nix("config.clanCore.backups"))
machine.eval_nix(f"nixosConfigurations.{machine.name}.config.clanCore.backups") backup_folders = json.loads(machine.eval_nix("config.clanCore.state"))
)
backup_folders = json.loads(
machine.eval_nix(f"nixosConfigurations.{machine.name}.config.clanCore.state")
)
folders = backup_folders[service]["folders"] folders = backup_folders[service]["folders"]
env = os.environ.copy() env = os.environ.copy()
env["ARCHIVE_ID"] = backup.archive_id env["ARCHIVE_ID"] = backup.archive_id
@@ -77,11 +73,7 @@ def restore_backup(
if service is None: if service is None:
for backup in backups: for backup in backups:
if backup.archive_id == archive_id: if backup.archive_id == archive_id:
backup_folders = json.loads( backup_folders = json.loads(machine.eval_nix("config.clanCore.state"))
machine.eval_nix(
f"nixosConfigurations.{machine.name}.config.clanCore.state"
)
)
for _service in backup_folders: for _service in backup_folders:
restore_service(machine, backup, provider, _service) restore_service(machine, backup, provider, _service)
else: else:
@@ -91,7 +83,7 @@ def restore_backup(
def restore_command(args: argparse.Namespace) -> None: def restore_command(args: argparse.Namespace) -> None:
machine = Machine(name=args.machine, flake_dir=args.flake) machine = Machine(name=args.machine, flake=args.flake)
backups = list_backups(machine=machine, provider=args.provider) backups = list_backups(machine=machine, provider=args.provider)
restore_backup( restore_backup(
machine=machine, machine=machine,

View File

@@ -47,7 +47,7 @@ def user_gcroot_dir() -> Path:
return p return p
def specific_groot_dir(*, clan_name: str, flake_url: str) -> Path: def machine_gcroot(*, clan_name: str, flake_url: str) -> Path:
# Always build icon so that we can symlink it to the gcroot # Always build icon so that we can symlink it to the gcroot
gcroot_dir = user_gcroot_dir() gcroot_dir = user_gcroot_dir()
clan_gcroot = gcroot_dir / clan_key_safe(clan_name, flake_url) clan_gcroot = gcroot_dir / clan_key_safe(clan_name, flake_url)

View File

@@ -3,9 +3,10 @@ from dataclasses import dataclass
from pathlib import Path from pathlib import Path
from ..cmd import run from ..cmd import run
from ..dirs import specific_groot_dir from ..dirs import machine_gcroot
from ..errors import ClanError from ..errors import ClanError
from ..machines.list import list_machines from ..machines.list import list_machines
from ..machines.machines import Machine
from ..nix import nix_build, nix_config, nix_eval, nix_metadata from ..nix import nix_build, nix_config, nix_eval, nix_metadata
from ..vms.inspect import VmConfig, inspect_vm from ..vms.inspect import VmConfig, inspect_vm
@@ -29,23 +30,24 @@ def run_cmd(cmd: list[str]) -> str:
return proc.stdout.strip() return proc.stdout.strip()
def inspect_flake(flake_url: str | Path, flake_attr: str) -> FlakeConfig: def inspect_flake(flake_url: str | Path, machine_name: str) -> FlakeConfig:
config = nix_config() config = nix_config()
system = config["system"] system = config["system"]
# Check if the machine exists # Check if the machine exists
machines = list_machines(flake_url) machines = list_machines(flake_url)
if flake_attr not in machines: if machine_name not in machines:
raise ClanError( raise ClanError(
f"Machine {flake_attr} not found in {flake_url}. Available machines: {', '.join(machines)}" f"Machine {machine_name} not found in {flake_url}. Available machines: {', '.join(machines)}"
) )
vm = inspect_vm(flake_url, flake_attr) machine = Machine(machine_name, flake_url)
vm = inspect_vm(machine)
# Get the cLAN name # Get the cLAN name
cmd = nix_eval( cmd = nix_eval(
[ [
f'{flake_url}#clanInternals.machines."{system}"."{flake_attr}".config.clanCore.clanName' f'{flake_url}#clanInternals.machines."{system}"."{machine_name}".config.clanCore.clanName'
] ]
) )
res = run_cmd(cmd) res = run_cmd(cmd)
@@ -54,7 +56,7 @@ def inspect_flake(flake_url: str | Path, flake_attr: str) -> FlakeConfig:
# Get the clan icon path # Get the clan icon path
cmd = nix_eval( cmd = nix_eval(
[ [
f'{flake_url}#clanInternals.machines."{system}"."{flake_attr}".config.clanCore.clanIcon' f'{flake_url}#clanInternals.machines."{system}"."{machine_name}".config.clanCore.clanIcon'
] ]
) )
res = run_cmd(cmd) res = run_cmd(cmd)
@@ -67,10 +69,9 @@ def inspect_flake(flake_url: str | Path, flake_attr: str) -> FlakeConfig:
cmd = nix_build( cmd = nix_build(
[ [
f'{flake_url}#clanInternals.machines."{system}"."{flake_attr}".config.clanCore.clanIcon' f'{flake_url}#clanInternals.machines."{system}"."{machine_name}".config.clanCore.clanIcon'
], ],
specific_groot_dir(clan_name=clan_name, flake_url=str(flake_url)) machine_gcroot(clan_name=clan_name, flake_url=str(flake_url)) / "clanIcon",
/ "clanIcon",
) )
run_cmd(cmd) run_cmd(cmd)
@@ -81,7 +82,7 @@ def inspect_flake(flake_url: str | Path, flake_attr: str) -> FlakeConfig:
vm=vm, vm=vm,
flake_url=flake_url, flake_url=flake_url,
clan_name=clan_name, clan_name=clan_name,
flake_attr=flake_attr, flake_attr=machine_name,
nar_hash=meta["locked"]["narHash"], nar_hash=meta["locked"]["narHash"],
icon=icon_path, icon=icon_path,
description=meta.get("description"), description=meta.get("description"),
@@ -102,7 +103,7 @@ def inspect_command(args: argparse.Namespace) -> None:
flake=args.flake or Path.cwd(), flake=args.flake or Path.cwd(),
) )
res = inspect_flake( res = inspect_flake(
flake_url=inspect_options.flake, flake_attr=inspect_options.machine flake_url=inspect_options.flake, machine_name=inspect_options.machine
) )
print("cLAN name:", res.clan_name) print("cLAN name:", res.clan_name)
print("Icon:", res.icon) print("Icon:", res.icon)

View File

@@ -1,4 +1,6 @@
import argparse import argparse
import importlib
import logging
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from pathlib import Path
from tempfile import TemporaryDirectory from tempfile import TemporaryDirectory
@@ -8,10 +10,20 @@ from ..machines.machines import Machine
from ..nix import nix_shell from ..nix import nix_shell
from ..secrets.generate import generate_secrets from ..secrets.generate import generate_secrets
log = logging.getLogger(__name__)
def install_nixos(machine: Machine, kexec: str | None = None) -> None: def install_nixos(machine: Machine, kexec: str | None = None) -> None:
log.info(f"deployment address1: {machine.deployment_info['deploymentAddress']}")
secrets_module = importlib.import_module(machine.secrets_module)
log.info(f"installing {machine.name}")
log.info(f"using secret store: {secrets_module.SecretStore}")
secret_store = secrets_module.SecretStore(machine=machine)
h = machine.host h = machine.host
log.info(f"deployment address2: {machine.deployment_info['deploymentAddress']}")
target_host = f"{h.user or 'root'}@{h.host}" target_host = f"{h.user or 'root'}@{h.host}"
log.info(f"target host: {target_host}")
flake_attr = h.meta.get("flake_attr", "") flake_attr = h.meta.get("flake_attr", "")
@@ -19,18 +31,18 @@ def install_nixos(machine: Machine, kexec: str | None = None) -> None:
with TemporaryDirectory() as tmpdir_: with TemporaryDirectory() as tmpdir_:
tmpdir = Path(tmpdir_) tmpdir = Path(tmpdir_)
upload_dir = machine.secrets_upload_directory upload_dir_ = machine.secrets_upload_directory
if upload_dir.startswith("/"): if upload_dir_.startswith("/"):
upload_dir = upload_dir[1:] upload_dir_ = upload_dir_[1:]
upload_dir = tmpdir / upload_dir upload_dir = tmpdir / upload_dir_
upload_dir.mkdir(parents=True) upload_dir.mkdir(parents=True)
machine.run_upload_secrets(upload_dir) secret_store.upload(upload_dir)
cmd = [ cmd = [
"nixos-anywhere", "nixos-anywhere",
"-f", "-f",
f"{machine.flake_dir}#{flake_attr}", f"{machine.flake}#{flake_attr}",
"-t", "-t",
"--no-reboot", "--no-reboot",
"--extra-files", "--extra-files",
@@ -64,8 +76,11 @@ def install_command(args: argparse.Namespace) -> None:
target_host=args.target_host, target_host=args.target_host,
kexec=args.kexec, kexec=args.kexec,
) )
machine = Machine(opts.machine, flake_dir=opts.flake) machine = Machine(opts.machine, flake=opts.flake)
machine.deployment_address = opts.target_host machine.get_deployment_info()
machine.deployment_info["deploymentAddress"] = opts.target_host
log.info(f"target host: {opts.target_host}")
log.info(f"deployment address: {machine.deployment_info['deploymentAddress']}")
install_nixos(machine, kexec=opts.kexec) install_nixos(machine, kexec=opts.kexec)

View File

@@ -1,36 +1,20 @@
import json import json
import os import logging
import sys
from pathlib import Path from pathlib import Path
from ..cmd import Log, run from ..cmd import run
from ..nix import nix_build, nix_config, nix_eval from ..nix import nix_build, nix_config, nix_eval
from ..ssh import Host, parse_deployment_address from ..ssh import Host, parse_deployment_address
log = logging.getLogger(__name__)
def build_machine_data(machine_name: str, clan_dir: Path) -> dict:
config = nix_config()
system = config["system"]
proc = run(
nix_build(
[
f'{clan_dir}#clanInternals.machines."{system}"."{machine_name}".config.system.clan.deployment.file'
]
),
log=Log.BOTH,
error_msg="failed to build machine data",
)
return json.loads(Path(proc.stdout.strip()).read_text())
class Machine: class Machine:
def __init__( def __init__(
self, self,
name: str, name: str,
flake_dir: Path, flake: Path | str,
machine_data: dict | None = None, deployment_info: dict | None = None,
) -> None: ) -> None:
""" """
Creates a Machine Creates a Machine
@@ -38,64 +22,93 @@ class Machine:
@clan_dir: the directory of the clan, optional, if not set it will be determined from the current working directory @clan_dir: the directory of the clan, optional, if not set it will be determined from the current working directory
@machine_json: can be optionally used to skip evaluation of the machine, location of the json file with machine data @machine_json: can be optionally used to skip evaluation of the machine, location of the json file with machine data
""" """
self.name = name self.name: str = name
self.flake_dir = flake_dir self.flake: str | Path = flake
if machine_data is None:
self.machine_data = build_machine_data(name, self.flake_dir)
else:
self.machine_data = machine_data
self.deployment_address = self.machine_data["deploymentAddress"]
self.upload_secrets = self.machine_data["uploadSecrets"]
self.generate_secrets = self.machine_data["generateSecrets"]
self.secrets_upload_directory = self.machine_data["secretsUploadDirectory"]
self.eval_cache: dict[str, str] = {} self.eval_cache: dict[str, str] = {}
self.build_cache: dict[str, Path] = {} self.build_cache: dict[str, Path] = {}
if deployment_info is not None:
self.deployment_info = deployment_info
def get_deployment_info(self) -> None:
self.deployment_info = json.loads(
self.build_nix("config.system.clan.deployment.file").read_text()
)
@property
def deployment_address(self) -> str:
if not hasattr(self, "deployment_info"):
self.get_deployment_info()
return self.deployment_info["deploymentAddress"]
@property
def secrets_module(self) -> str:
if not hasattr(self, "deployment_info"):
self.get_deployment_info()
return self.deployment_info["secretsModule"]
@property
def secrets_data(self) -> dict:
if not hasattr(self, "deployment_info"):
self.get_deployment_info()
if self.deployment_info["secretsData"]:
try:
return json.loads(Path(self.deployment_info["secretsData"]).read_text())
except json.JSONDecodeError:
log.error(
f"Failed to parse secretsData for machine {self.name} as json"
)
return {}
return {}
@property
def secrets_upload_directory(self) -> str:
if not hasattr(self, "deployment_info"):
self.get_deployment_info()
return self.deployment_info["secretsUploadDirectory"]
@property
def flake_dir(self) -> Path:
if isinstance(self.flake, Path):
return self.flake
if hasattr(self, "flake_path"):
return Path(self.flake_path)
print(nix_eval([f"{self.flake}"]))
self.flake_path = run(nix_eval([f"{self.flake}"])).stdout.strip()
return Path(self.flake_path)
@property @property
def host(self) -> Host: def host(self) -> Host:
return parse_deployment_address( return parse_deployment_address(
self.name, self.deployment_address, meta={"machine": self} self.name, self.deployment_address, meta={"machine": self}
) )
def run_upload_secrets(self, secrets_dir: Path) -> bool:
"""
Upload the secrets to the provided directory
@secrets_dir: the directory to store the secrets in
"""
env = os.environ.copy()
env["CLAN_DIR"] = str(self.flake_dir)
env["PYTHONPATH"] = str(
":".join(sys.path)
) # TODO do this in the clanCore module
env["SECRETS_DIR"] = str(secrets_dir)
print(f"uploading secrets... {self.upload_secrets}")
proc = run(
[self.upload_secrets],
env=env,
check=False,
)
if proc.returncode == 23:
print("no secrets to upload")
return False
elif proc.returncode != 0:
print("failed generate secrets directory")
exit(1)
return True
def eval_nix(self, attr: str, refresh: bool = False) -> str: def eval_nix(self, attr: str, refresh: bool = False) -> str:
""" """
eval a nix attribute of the machine eval a nix attribute of the machine
@attr: the attribute to get @attr: the attribute to get
""" """
config = nix_config()
system = config["system"]
attr = f'clanInternals.machines."{system}".{self.name}.{attr}'
if attr in self.eval_cache and not refresh: if attr in self.eval_cache and not refresh:
return self.eval_cache[attr] return self.eval_cache[attr]
output = run( if isinstance(self.flake, Path):
nix_eval([f"path:{self.flake_dir}#{attr}"]), if (self.flake / ".git").exists():
).stdout.strip() flake = f"git+file://{self.flake}"
else:
flake = f"path:{self.flake}"
else:
flake = self.flake
log.info(f"evaluating {flake}#{attr}")
output = run(nix_eval([f"{flake}#{attr}"])).stdout.strip()
self.eval_cache[attr] = output self.eval_cache[attr] = output
return output return output
@@ -104,10 +117,21 @@ class Machine:
build a nix attribute of the machine build a nix attribute of the machine
@attr: the attribute to get @attr: the attribute to get
""" """
config = nix_config()
system = config["system"]
attr = f'clanInternals.machines."{system}".{self.name}.{attr}'
if attr in self.build_cache and not refresh: if attr in self.build_cache and not refresh:
return self.build_cache[attr] return self.build_cache[attr]
outpath = run(
nix_build([f"path:{self.flake_dir}#{attr}"]), if isinstance(self.flake, Path):
).stdout.strip() flake = f"path:{self.flake}"
else:
flake = self.flake
log.info(f"building {flake}#{attr}")
outpath = run(nix_build([f"{flake}#{attr}"])).stdout.strip()
self.build_cache[attr] = Path(outpath) self.build_cache[attr] = Path(outpath)
return Path(outpath) return Path(outpath)

View File

@@ -94,7 +94,7 @@ def get_all_machines(clan_dir: Path) -> HostGroup:
machine_data["deploymentAddress"], machine_data["deploymentAddress"],
meta={ meta={
"machine": Machine( "machine": Machine(
name=name, flake_dir=clan_dir, machine_data=machine_data name=name, flake=clan_dir, deployment_info=machine_data
) )
}, },
) )
@@ -105,7 +105,7 @@ def get_all_machines(clan_dir: Path) -> HostGroup:
def get_selected_machines(machine_names: list[str], flake_dir: Path) -> HostGroup: def get_selected_machines(machine_names: list[str], flake_dir: Path) -> HostGroup:
hosts = [] hosts = []
for name in machine_names: for name in machine_names:
machine = Machine(name=name, flake_dir=flake_dir) machine = Machine(name=name, flake=flake_dir)
hosts.append(machine.host) hosts.append(machine.host)
return HostGroup(hosts) return HostGroup(hosts)
@@ -115,8 +115,8 @@ def update(args: argparse.Namespace) -> None:
if args.flake is None: if args.flake is None:
raise ClanError("Could not find clan flake toplevel directory") raise ClanError("Could not find clan flake toplevel directory")
if len(args.machines) == 1 and args.target_host is not None: if len(args.machines) == 1 and args.target_host is not None:
machine = Machine(name=args.machines[0], flake_dir=args.flake) machine = Machine(name=args.machines[0], flake=args.flake)
machine.deployment_address = args.target_host machine.deployment_info["deploymentAddress"] = args.target_host
host = parse_deployment_address( host = parse_deployment_address(
args.machines[0], args.machines[0],
args.target_host, args.target_host,

View File

@@ -1,33 +1,80 @@
import argparse import argparse
import importlib
import logging import logging
import os import os
import sys import shutil
from pathlib import Path
from tempfile import TemporaryDirectory
from clan_cli.cmd import Log, run from clan_cli.cmd import run
from ..errors import ClanError
from ..machines.machines import Machine from ..machines.machines import Machine
from ..nix import nix_shell
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
def generate_secrets(machine: Machine) -> None: def generate_secrets(machine: Machine) -> None:
env = os.environ.copy() secrets_module = importlib.import_module(machine.secrets_module)
env["CLAN_DIR"] = str(machine.flake_dir) secret_store = secrets_module.SecretStore(machine=machine)
env["PYTHONPATH"] = ":".join(sys.path) # TODO do this in the clanCore module
print(f"generating secrets... {machine.generate_secrets}") with TemporaryDirectory() as d:
run( for service in machine.secrets_data:
[machine.generate_secrets], print(service)
env=env, tmpdir = Path(d) / service
error_msg="failed to generate secrets", # check if all secrets exist and generate them if at least one is missing
log=Log.BOTH, needs_regeneration = any(
) not secret_store.exists(service, secret)
for secret in machine.secrets_data[service]["secrets"]
) or any(
not (machine.flake / fact).exists()
for fact in machine.secrets_data[service]["facts"].values()
)
for fact in machine.secrets_data[service]["facts"].values():
if not (machine.flake / fact).exists():
print(f"fact {fact} is missing")
if needs_regeneration:
env = os.environ.copy()
facts_dir = tmpdir / "facts"
facts_dir.mkdir(parents=True)
env["facts"] = str(facts_dir)
secrets_dir = tmpdir / "secrets"
secrets_dir.mkdir(parents=True)
env["secrets"] = str(secrets_dir)
# TODO use bubblewrap here
cmd = nix_shell(
["nixpkgs#bash"],
["bash", "-c", machine.secrets_data[service]["generator"]],
)
run(
cmd,
env=env,
)
# store secrets
for secret in machine.secrets_data[service]["secrets"]:
secret_file = secrets_dir / secret
if not secret_file.is_file():
msg = f"did not generate a file for '{secret}' when running the following command:\n"
msg += machine.secrets_data[service]["generator"]
raise ClanError(msg)
secret_store.set(service, secret, secret_file.read_text())
# store facts
for name, fact_path in machine.secrets_data[service]["facts"].items():
fact_file = facts_dir / name
if not fact_file.is_file():
msg = f"did not generate a file for '{name}' when running the following command:\n"
msg += machine.secrets_data[service]["generator"]
raise ClanError(msg)
fact_path = machine.flake / fact_path
fact_path.parent.mkdir(parents=True, exist_ok=True)
shutil.copyfile(fact_file, fact_path)
print("successfully generated secrets") print("successfully generated secrets")
def generate_command(args: argparse.Namespace) -> None: def generate_command(args: argparse.Namespace) -> None:
machine = Machine(name=args.machine, flake_dir=args.flake) machine = Machine(name=args.machine, flake=args.flake)
generate_secrets(machine) generate_secrets(machine)

View File

@@ -0,0 +1,106 @@
import os
import subprocess
from pathlib import Path
from clan_cli.machines.machines import Machine
from clan_cli.nix import nix_shell
class SecretStore:
def __init__(self, machine: Machine) -> None:
self.machine = machine
def set(self, service: str, name: str, value: str) -> None:
subprocess.run(
nix_shell(
["nixpkgs#pass"],
["pass", "insert", "-m", f"machines/{self.machine.name}/{name}"],
),
input=value.encode("utf-8"),
check=True,
)
def get(self, service: str, name: str) -> bytes:
return subprocess.run(
nix_shell(
["nixpkgs#pass"],
["pass", "show", f"machines/{self.machine.name}/{name}"],
),
check=True,
stdout=subprocess.PIPE,
).stdout
def exists(self, service: str, name: str) -> bool:
password_store = os.environ.get(
"PASSWORD_STORE_DIR", f"{os.environ['HOME']}/.password-store"
)
secret_path = Path(password_store) / f"machines/{self.machine.name}/{name}.gpg"
print(f"checking {secret_path}")
return secret_path.exists()
def generate_hash(self) -> bytes:
password_store = os.environ.get(
"PASSWORD_STORE_DIR", f"{os.environ['HOME']}/.password-store"
)
hashes = []
hashes.append(
subprocess.run(
nix_shell(
["nixpkgs#git"],
[
"git",
"-C",
password_store,
"log",
"-1",
"--format=%H",
f"machines/{self.machine.name}",
],
),
stdout=subprocess.PIPE,
).stdout.strip()
)
for symlink in Path(password_store).glob(f"machines/{self.machine.name}/**/*"):
if symlink.is_symlink():
hashes.append(
subprocess.run(
nix_shell(
["nixpkgs#git"],
[
"git",
"-C",
password_store,
"log",
"-1",
"--format=%H",
str(symlink),
],
),
stdout=subprocess.PIPE,
).stdout.strip()
)
# we sort the hashes to make sure that the order is always the same
hashes.sort()
return b"\n".join(hashes)
def update_check(self) -> bool:
local_hash = self.generate_hash()
remote_hash = self.machine.host.run(
# TODO get the path to the secrets from the machine
["cat", f"{self.machine.secrets_upload_directory}/.pass_info"],
check=False,
stdout=subprocess.PIPE,
).stdout.strip()
if not remote_hash:
print("remote hash is empty")
return False
return local_hash.decode() == remote_hash
def upload(self, output_dir: Path) -> None:
for service in self.machine.secrets_data:
for secret in self.machine.secrets_data[service]["secrets"]:
(output_dir / secret).write_bytes(self.get(service, secret))
(output_dir / ".pass_info").write_bytes(self.generate_hash())

View File

@@ -0,0 +1,54 @@
from pathlib import Path
from clan_cli.machines.machines import Machine
from clan_cli.secrets.folders import sops_secrets_folder
from clan_cli.secrets.machines import add_machine, has_machine
from clan_cli.secrets.secrets import decrypt_secret, encrypt_secret, has_secret
from clan_cli.secrets.sops import generate_private_key
class SecretStore:
def __init__(self, machine: Machine) -> None:
self.machine = machine
# no need to generate keys if we don't manage secrets
if not hasattr(self.machine, "secrets_data"):
return
if not self.machine.secrets_data:
return
if has_machine(self.machine.flake_dir, self.machine.name):
return
priv_key, pub_key = generate_private_key()
encrypt_secret(
self.machine.flake_dir,
sops_secrets_folder(self.machine.flake_dir)
/ f"{self.machine.name}-age.key",
priv_key,
)
add_machine(self.machine.flake_dir, self.machine.name, pub_key, False)
def set(self, _service: str, name: str, value: str) -> None:
encrypt_secret(
self.machine.flake_dir,
sops_secrets_folder(self.machine.flake_dir) / f"{self.machine.name}-{name}",
value,
add_machines=[self.machine.name],
)
def get(self, _service: str, _name: str) -> bytes:
raise NotImplementedError()
def exists(self, _service: str, name: str) -> bool:
return has_secret(
self.machine.flake_dir,
f"{self.machine.name}-{name}",
)
def upload(self, output_dir: Path) -> None:
key_name = f"{self.machine.name}-age.key"
if not has_secret(self.machine.flake_dir, key_name):
# skip uploading the secret, not managed by us
return
key = decrypt_secret(self.machine.flake_dir, key_name)
(output_dir / "key.txt").write_text(key)

View File

@@ -1,128 +0,0 @@
import logging
import os
import shlex
import shutil
import sys
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any
from clan_cli.cmd import Log, run
from clan_cli.nix import nix_shell
from ..errors import ClanError
from .folders import sops_secrets_folder
from .machines import add_machine, has_machine
from .secrets import decrypt_secret, encrypt_secret, has_secret
from .sops import generate_private_key
log = logging.getLogger(__name__)
def generate_host_key(flake_dir: Path, machine_name: str) -> None:
if has_machine(flake_dir, machine_name):
return
priv_key, pub_key = generate_private_key()
encrypt_secret(
flake_dir,
sops_secrets_folder(flake_dir) / f"{machine_name}-age.key",
priv_key,
)
add_machine(flake_dir, machine_name, pub_key, False)
def generate_secrets_group(
flake_dir: Path,
secret_group: str,
machine_name: str,
tempdir: Path,
secret_options: dict[str, Any],
) -> None:
clan_dir = flake_dir
secrets = secret_options["secrets"]
needs_regeneration = any(
not has_secret(flake_dir, f"{machine_name}-{name}") for name in secrets
) or any(
not (flake_dir / fact).exists() for fact in secret_options["facts"].values()
)
generator = secret_options["generator"]
subdir = tempdir / secret_group
if needs_regeneration:
facts_dir = subdir / "facts"
facts_dir.mkdir(parents=True)
secrets_dir = subdir / "secrets"
secrets_dir.mkdir(parents=True)
text = f"""
set -euo pipefail
export facts={shlex.quote(str(facts_dir))}
export secrets={shlex.quote(str(secrets_dir))}
{generator}
"""
cmd = nix_shell(["nixpkgs#bash"], ["bash", "-c", text])
run(cmd, log=Log.BOTH)
for name in secrets:
secret_file = secrets_dir / name
if not secret_file.is_file():
msg = f"did not generate a file for '{name}' when running the following command:\n"
msg += text
raise ClanError(msg)
encrypt_secret(
flake_dir,
sops_secrets_folder(flake_dir) / f"{machine_name}-{name}",
secret_file.read_text(),
add_machines=[machine_name],
)
for name, fact_path in secret_options["facts"].items():
fact_file = facts_dir / name
if not fact_file.is_file():
msg = f"did not generate a file for '{name}' when running the following command:\n"
msg += text
raise ClanError(msg)
fact_path = clan_dir / fact_path
fact_path.parent.mkdir(parents=True, exist_ok=True)
shutil.copyfile(fact_file, fact_path)
# this is called by the sops.nix clan core module
def generate_secrets_from_nix(
machine_name: str,
secret_submodules: dict[str, Any],
) -> None:
flake_dir = Path(os.environ["CLAN_DIR"])
generate_host_key(flake_dir, machine_name)
errors = {}
log.debug("Generating secrets for machine %s and flake %s", machine_name, flake_dir)
with TemporaryDirectory() as d:
# if any of the secrets are missing, we regenerate all connected facts/secrets
for secret_group, secret_options in secret_submodules.items():
try:
generate_secrets_group(
flake_dir, secret_group, machine_name, Path(d), secret_options
)
except ClanError as e:
errors[secret_group] = e
for secret_group, error in errors.items():
print(f"failed to generate secrets for {machine_name}/{secret_group}:")
print(error, file=sys.stderr)
if len(errors) > 0:
sys.exit(1)
# this is called by the sops.nix clan core module
def upload_age_key_from_nix(
machine_name: str,
) -> None:
flake_dir = Path(os.environ["CLAN_DIR"])
log.debug("Uploading secrets for machine %s and flake %s", machine_name, flake_dir)
secret_name = f"{machine_name}-age.key"
if not has_secret(
flake_dir, secret_name
): # skip uploading the secret, not managed by us
return
secret = decrypt_secret(flake_dir, secret_name)
secrets_dir = Path(os.environ["SECRETS_DIR"])
(secrets_dir / "key.txt").write_text(secret)

View File

@@ -1,4 +1,5 @@
import argparse import argparse
import importlib
import logging import logging
from pathlib import Path from pathlib import Path
from tempfile import TemporaryDirectory from tempfile import TemporaryDirectory
@@ -11,33 +12,38 @@ log = logging.getLogger(__name__)
def upload_secrets(machine: Machine) -> None: def upload_secrets(machine: Machine) -> None:
with TemporaryDirectory() as tempdir_: secrets_module = importlib.import_module(machine.secrets_module)
tempdir = Path(tempdir_) secret_store = secrets_module.SecretStore(machine=machine)
should_upload = machine.run_upload_secrets(tempdir)
if should_upload: update_check = getattr(secret_store, "update_check", None)
host = machine.host if callable(update_check):
if update_check():
log.info("Secrets already up to date")
return
with TemporaryDirectory() as tempdir:
secret_store.upload(Path(tempdir))
host = machine.host
ssh_cmd = host.ssh_cmd() ssh_cmd = host.ssh_cmd()
run( run(
nix_shell( nix_shell(
["nixpkgs#rsync"], ["nixpkgs#rsync"],
[ [
"rsync", "rsync",
"-e", "-e",
" ".join(["ssh"] + ssh_cmd[2:]), " ".join(["ssh"] + ssh_cmd[2:]),
"-az", "-az",
"--delete", "--delete",
f"{tempdir!s}/", f"{tempdir!s}/",
f"{host.user}@{host.host}:{machine.secrets_upload_directory}/", f"{host.user}@{host.host}:{machine.secrets_upload_directory}/",
], ],
), ),
log=Log.BOTH, log=Log.BOTH,
) )
def upload_command(args: argparse.Namespace) -> None: def upload_command(args: argparse.Namespace) -> None:
machine = Machine(name=args.machine, flake_dir=args.flake) machine = Machine(name=args.machine, flake=args.flake)
upload_secrets(machine) upload_secrets(machine)

View File

@@ -3,15 +3,14 @@ import json
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from pathlib import Path
from ..cmd import run from ..machines.machines import Machine
from ..nix import nix_config, nix_eval
@dataclass @dataclass
class VmConfig: class VmConfig:
clan_name: str machine_name: str
flake_url: str | Path flake_url: str | Path
flake_attr: str clan_name: str
cores: int cores: int
memory_size: int memory_size: int
@@ -19,21 +18,9 @@ class VmConfig:
wayland: bool = False wayland: bool = False
def inspect_vm(flake_url: str | Path, flake_attr: str) -> VmConfig: def inspect_vm(machine: Machine) -> VmConfig:
config = nix_config() data = json.loads(machine.eval_nix("config.clanCore.vm.inspect"))
system = config["system"] return VmConfig(machine_name=machine.name, flake_url=machine.flake, **data)
cmd = nix_eval(
[
f'{flake_url}#clanInternals.machines."{system}"."{flake_attr}".config.clanCore.vm.inspect',
"--refresh",
]
)
proc = run(cmd)
data = json.loads(proc.stdout)
return VmConfig(flake_url=flake_url, flake_attr=flake_attr, **data)
@dataclass @dataclass
@@ -47,9 +34,9 @@ def inspect_command(args: argparse.Namespace) -> None:
machine=args.machine, machine=args.machine,
flake=args.flake or Path.cwd(), flake=args.flake or Path.cwd(),
) )
res = inspect_vm(
flake_url=inspect_options.flake, flake_attr=inspect_options.machine machine = Machine(inspect_options.machine, inspect_options.flake)
) res = inspect_vm(machine)
print("Cores:", res.cores) print("Cores:", res.cores)
print("Memory size:", res.memory_size) print("Memory size:", res.memory_size)
print("Graphics:", res.graphics) print("Graphics:", res.graphics)

View File

@@ -1,17 +1,19 @@
import argparse import argparse
import importlib
import json import json
import logging import logging
import os import os
import sys
import tempfile import tempfile
from dataclasses import dataclass, field from dataclasses import dataclass, field
from pathlib import Path from pathlib import Path
from typing import IO from typing import IO
from ..cmd import Log, run from ..cmd import Log, run
from ..dirs import module_root, specific_groot_dir, vm_state_dir from ..dirs import machine_gcroot, module_root, vm_state_dir
from ..errors import ClanError from ..errors import ClanError
from ..machines.machines import Machine
from ..nix import nix_build, nix_config, nix_shell from ..nix import nix_build, nix_config, nix_shell
from ..secrets.generate import generate_secrets
from .inspect import VmConfig, inspect_vm from .inspect import VmConfig, inspect_vm
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@@ -73,7 +75,7 @@ def qemu_command(
# fmt: off # fmt: off
command = [ command = [
"qemu-kvm", "qemu-kvm",
"-name", vm.flake_attr, "-name", vm.machine_name,
"-m", f'{nixos_config["memorySize"]}M', "-m", f'{nixos_config["memorySize"]}M',
"-smp", str(nixos_config["cores"]), "-smp", str(nixos_config["cores"]),
"-cpu", "max", "-cpu", "max",
@@ -102,56 +104,48 @@ def qemu_command(
return command return command
def get_vm_create_info(vm: VmConfig, nix_options: list[str]) -> dict[str, str]: # TODO move this to the Machines class
def get_vm_create_info(
machine: Machine, vm: VmConfig, nix_options: list[str]
) -> dict[str, str]:
config = nix_config() config = nix_config()
system = config["system"] system = config["system"]
clan_dir = vm.flake_url clan_dir = machine.flake
machine = vm.flake_attr
cmd = nix_build( cmd = nix_build(
[ [
f'{clan_dir}#clanInternals.machines."{system}"."{machine}".config.system.clan.vm.create', f'{clan_dir}#clanInternals.machines."{system}"."{machine.name}".config.system.clan.vm.create',
*nix_options, *nix_options,
], ],
specific_groot_dir(clan_name=vm.clan_name, flake_url=str(vm.flake_url)) machine_gcroot(clan_name=vm.clan_name, flake_url=str(vm.flake_url))
/ f"vm-{machine}", / f"vm-{machine.name}",
)
proc = run(
cmd, log=Log.BOTH, error_msg=f"Could not build vm config for {machine.name}"
) )
proc = run(cmd, log=Log.BOTH, error_msg=f"Could not build vm config for {machine}")
try: try:
return json.loads(Path(proc.stdout.strip()).read_text()) return json.loads(Path(proc.stdout.strip()).read_text())
except json.JSONDecodeError as e: except json.JSONDecodeError as e:
raise ClanError(f"Failed to parse vm config: {e}") raise ClanError(f"Failed to parse vm config: {e}")
def generate_secrets( def get_secrets(
vm: VmConfig, machine: Machine,
nixos_config: dict[str, str],
tmpdir: Path, tmpdir: Path,
log_fd: IO[str] | None,
) -> Path: ) -> Path:
secrets_dir = tmpdir / "secrets" secrets_dir = tmpdir / "secrets"
secrets_dir.mkdir(exist_ok=True) secrets_dir.mkdir(exist_ok=True)
env = os.environ.copy() secrets_module = importlib.import_module(machine.secrets_module)
env["CLAN_DIR"] = str(vm.flake_url) secret_store = secrets_module.SecretStore(machine=machine)
env["PYTHONPATH"] = str(":".join(sys.path)) # TODO do this in the clanCore module
env["SECRETS_DIR"] = str(secrets_dir)
# Only generate secrets for local clans # Only generate secrets for local clans
if isinstance(vm.flake_url, Path) and vm.flake_url.is_dir(): if isinstance(machine.flake, Path) and machine.flake.is_dir():
if Path(vm.flake_url).is_dir(): generate_secrets(machine)
run([nixos_config["generateSecrets"], vm.clan_name], env=env) else:
else: log.warning("won't generate secrets for non local clan")
log.warning("won't generate secrets for non local clan")
cmd = [nixos_config["uploadSecrets"]] secret_store.upload(secrets_dir)
run(
cmd,
env=env,
log=Log.BOTH,
error_msg=f"Could not upload secrets for {vm.flake_attr}",
)
return secrets_dir return secrets_dir
@@ -192,26 +186,28 @@ def prepare_disk(tmpdir: Path, log_fd: IO[str] | None) -> Path:
def run_vm( def run_vm(
vm: VmConfig, nix_options: list[str] = [], log_fd: IO[str] | None = None vm: VmConfig,
nix_options: list[str] = [],
log_fd: IO[str] | None = None,
) -> None: ) -> None:
""" """
log_fd can be used to stream the output of all commands to a UI log_fd can be used to stream the output of all commands to a UI
""" """
machine = vm.flake_attr machine = Machine(vm.machine_name, vm.flake_url)
log.debug(f"Creating VM for {machine}") log.debug(f"Creating VM for {machine}")
# TODO: We should get this from the vm argument # TODO: We should get this from the vm argument
nixos_config = get_vm_create_info(vm, nix_options) nixos_config = get_vm_create_info(machine, vm, nix_options)
with tempfile.TemporaryDirectory() as tmpdir_: with tempfile.TemporaryDirectory() as tmpdir_:
tmpdir = Path(tmpdir_) tmpdir = Path(tmpdir_)
xchg_dir = tmpdir / "xchg" xchg_dir = tmpdir / "xchg"
xchg_dir.mkdir(exist_ok=True) xchg_dir.mkdir(exist_ok=True)
secrets_dir = generate_secrets(vm, nixos_config, tmpdir, log_fd) secrets_dir = get_secrets(machine, tmpdir)
disk_img = prepare_disk(tmpdir, log_fd) disk_img = prepare_disk(tmpdir, log_fd)
state_dir = vm_state_dir(vm.clan_name, str(vm.flake_url), machine) state_dir = vm_state_dir(vm.clan_name, str(machine.flake), machine.name)
state_dir.mkdir(parents=True, exist_ok=True) state_dir.mkdir(parents=True, exist_ok=True)
qemu_cmd = qemu_command( qemu_cmd = qemu_command(
@@ -244,7 +240,6 @@ def run_vm(
@dataclass @dataclass
class RunOptions: class RunOptions:
machine: str machine: str
flake_url: str | None
flake: Path flake: Path
nix_options: list[str] = field(default_factory=list) nix_options: list[str] = field(default_factory=list)
wayland: bool = False wayland: bool = False
@@ -253,14 +248,14 @@ class RunOptions:
def run_command(args: argparse.Namespace) -> None: def run_command(args: argparse.Namespace) -> None:
run_options = RunOptions( run_options = RunOptions(
machine=args.machine, machine=args.machine,
flake_url=args.flake_url, flake=args.flake,
flake=args.flake or Path.cwd(),
nix_options=args.option, nix_options=args.option,
wayland=args.wayland, wayland=args.wayland,
) )
flake_url = run_options.flake_url or run_options.flake machine = Machine(run_options.machine, run_options.flake)
vm = inspect_vm(flake_url=flake_url, flake_attr=run_options.machine)
vm = inspect_vm(machine=machine)
# TODO: allow to set this in the config # TODO: allow to set this in the config
vm.wayland = run_options.wayland vm.wayland = run_options.wayland

View File

@@ -52,13 +52,16 @@ def test_run(
def test_vm_persistence( def test_vm_persistence(
monkeypatch: pytest.MonkeyPatch, monkeypatch: pytest.MonkeyPatch,
temporary_home: Path, temporary_home: Path,
age_keys: list["KeyPair"],
) -> None: ) -> None:
monkeypatch.setenv("SOPS_AGE_KEY", age_keys[0].privkey)
flake = generate_flake( flake = generate_flake(
temporary_home, temporary_home,
flake_template=CLAN_CORE / "templates" / "new-clan", flake_template=CLAN_CORE / "templates" / "new-clan",
substitutions=dict( substitutions={
__CHANGE_ME__="_test_vm_persistence", "__CHANGE_ME__": "_test_vm_persistence",
), "git+https://git.clan.lol/clan/clan-core": "path://" + str(CLAN_CORE),
},
machine_configs=dict( machine_configs=dict(
my_machine=dict( my_machine=dict(
clanCore=dict(state=dict(my_state=dict(folders=["/var/my-state"]))), clanCore=dict(state=dict(my_state=dict(folders=["/var/my-state"]))),
@@ -90,7 +93,17 @@ def test_vm_persistence(
), ),
) )
monkeypatch.chdir(flake.path) monkeypatch.chdir(flake.path)
Cli().run(["vms", "run", "my_machine"]) cli = Cli()
cli.run(
[
"secrets",
"users",
"add",
"user1",
age_keys[0].pubkey,
]
)
cli.run(["vms", "run", "my_machine"])
test_file = ( test_file = (
vm_state_dir("_test_vm_persistence", str(flake.path), "my_machine") vm_state_dir("_test_vm_persistence", str(flake.path), "my_machine")
/ "var" / "var"