move secret stores into clan_cli codebase

This commit is contained in:
lassulus
2024-01-17 18:00:30 +01:00
parent b5a12bc4ba
commit 1d4e533438
14 changed files with 135 additions and 250 deletions

View File

@@ -31,27 +31,10 @@
the directory on the deployment server where secrets are uploaded the directory on the deployment server where secrets are uploaded
''; '';
}; };
uploadSecrets = lib.mkOption {
type = lib.types.path;
description = ''
script to upload secrets to the deployment server
'';
default = "${pkgs.coreutils}/bin/true";
};
generateSecrets = lib.mkOption {
type = lib.types.path;
description = ''
script to generate secrets
'';
default = "${pkgs.coreutils}/bin/true";
};
secretsModule = lib.mkOption { secretsModule = lib.mkOption {
type = lib.types.path; type = lib.types.str;
default = "${pkgs.coreutils}/bin/true";
description = '' description = ''
the module that generates secrets. the python import path to the secrets module
A needs to define a python class SecretStore which implements the following methods:
set, get, exists
''; '';
}; };
secretsData = lib.mkOption { secretsData = lib.mkOption {
@@ -82,7 +65,7 @@
# optimization for faster secret generate/upload and machines update # optimization for faster secret generate/upload and machines update
config = { config = {
system.clan.deployment.data = { system.clan.deployment.data = {
inherit (config.system.clan) uploadSecrets generateSecrets secretsModule secretsData; inherit (config.system.clan) secretsModule secretsData;
inherit (config.clan.networking) deploymentAddress; inherit (config.clan.networking) deploymentAddress;
inherit (config.clanCore) secretsUploadDirectory; inherit (config.clanCore) secretsUploadDirectory;
}; };

View File

@@ -6,7 +6,6 @@
description = '' description = ''
method to store secrets method to store secrets
custom can be used to define a custom secret store. custom can be used to define a custom secret store.
one would have to define system.clan.generateSecrets and system.clan.uploadSecrets
''; '';
}; };

View File

@@ -1,4 +1,4 @@
{ config, lib, pkgs, ... }: { config, lib, ... }:
{ {
options.clan.password-store.targetDirectory = lib.mkOption { options.clan.password-store.targetDirectory = lib.mkOption {
type = lib.types.path; type = lib.types.path;
@@ -10,81 +10,7 @@
config = lib.mkIf (config.clanCore.secretStore == "password-store") { config = lib.mkIf (config.clanCore.secretStore == "password-store") {
clanCore.secretsDirectory = config.clan.password-store.targetDirectory; clanCore.secretsDirectory = config.clan.password-store.targetDirectory;
clanCore.secretsUploadDirectory = config.clan.password-store.targetDirectory; clanCore.secretsUploadDirectory = config.clan.password-store.targetDirectory;
system.clan.secretsModule = pkgs.writeText "pass.py" '' system.clan.secretsModule = "clan_cli.secrets.modules.password_store";
import os
import subprocess
from clan_cli.machines.machines import Machine
from pathlib import Path
class SecretStore:
def __init__(self, machine: Machine) -> None:
self.machine = machine
def set(self, service: str, name: str, value: str):
subprocess.run(
["${pkgs.pass}/bin/pass", "insert", "-m", f"machines/{self.machine.name}/{name}"],
input=value.encode("utf-8"),
check=True,
)
def get(self, service: str, name: str) -> str:
return subprocess.run(
["${pkgs.pass}/bin/pass", "show", f"machines/{self.machine.name}/{name}"],
check=True,
stdout=subprocess.PIPE,
text=True,
).stdout
def exists(self, service: str, name: str) -> bool:
password_store = os.environ.get("PASSWORD_STORE_DIR", f"{os.environ['HOME']}/.password-store")
secret_path = Path(password_store) / f"machines/{self.machine.name}/{name}.gpg"
print(f"checking {secret_path}")
return secret_path.exists()
def generate_hash(self) -> str:
password_store = os.environ.get("PASSWORD_STORE_DIR", f"{os.environ['HOME']}/.password-store")
hashes = []
hashes.append(
subprocess.run(
["${pkgs.git}/bin/git", "-C", password_store, "log", "-1", "--format=%H", f"machines/{self.machine.name}"],
stdout=subprocess.PIPE,
text=True,
).stdout.strip()
)
for symlink in Path(password_store).glob(f"machines/{self.machine.name}/**/*"):
if symlink.is_symlink():
hashes.append(
subprocess.run(
["${pkgs.git}/bin/git", "-C", password_store, "log", "-1", "--format=%H", symlink],
stdout=subprocess.PIPE,
text=True,
).stdout.strip()
)
# we sort the hashes to make sure that the order is always the same
hashes.sort()
return "\n".join(hashes)
def update_check(self):
local_hash = self.generate_hash()
remote_hash = self.machine.host.run(
["cat", "${config.clan.password-store.targetDirectory}/.pass_info"],
check=False,
stdout=subprocess.PIPE,
).stdout.strip()
if not remote_hash:
print("remote hash is empty")
return False
return local_hash == remote_hash
def upload(self, output_dir: Path, secrets: list[str, str]) -> None:
for service, secret in secrets:
(output_dir / secret).write_text(self.get(service, secret))
(output_dir / ".pass_info").write_text(self.generate_hash())
'';
}; };
} }

View File

@@ -25,9 +25,7 @@ in
config = lib.mkIf (config.clanCore.secretStore == "sops") { config = lib.mkIf (config.clanCore.secretStore == "sops") {
clanCore.secretsDirectory = "/run/secrets"; clanCore.secretsDirectory = "/run/secrets";
clanCore.secretsPrefix = config.clanCore.machineName + "-"; clanCore.secretsPrefix = config.clanCore.machineName + "-";
system.clan = lib.mkIf (config.clanCore.secrets != { }) { system.clan.secretsModule = "clan_cli.secrets.modules.sops";
secretsModule = ./sops/sops.py;
};
sops.secrets = builtins.mapAttrs sops.secrets = builtins.mapAttrs
(name: _: { (name: _: {
sopsFile = config.clanCore.clanDir + "/sops/secrets/${name}/secret"; sopsFile = config.clanCore.clanDir + "/sops/secrets/${name}/secret";

View File

@@ -1,93 +0,0 @@
{ config, lib, pkgs, ... }:
let
secretsDir = config.clanCore.clanDir + "/sops/secrets";
groupsDir = config.clanCore.clanDir + "/sops/groups";
# My symlink is in the nixos module detected as a directory also it works in the repl. Is this because of pure evaluation?
containsSymlink = path:
builtins.pathExists path && (builtins.readFileType path == "directory" || builtins.readFileType path == "symlink");
containsMachine = parent: name: type:
type == "directory" && containsSymlink "${parent}/${name}/machines/${config.clanCore.machineName}";
containsMachineOrGroups = name: type:
(containsMachine secretsDir name type) || lib.any (group: type == "directory" && containsSymlink "${secretsDir}/${name}/groups/${group}") groups;
filterDir = filter: dir:
lib.optionalAttrs (builtins.pathExists dir)
(lib.filterAttrs filter (builtins.readDir dir));
groups = builtins.attrNames (filterDir (containsMachine groupsDir) groupsDir);
secrets = filterDir containsMachineOrGroups secretsDir;
in
{
config = lib.mkIf (config.clanCore.secretStore == "sops") {
clanCore.secretsDirectory = "/run/secrets";
clanCore.secretsPrefix = config.clanCore.machineName + "-";
system.clan = lib.mkIf (config.clanCore.secrets != { }) {
secretsModule = pkgs.writeText "sops.py" ''
from pathlib import Path
from clan_cli.secrets.folders import sops_secrets_folder
from clan_cli.secrets.secrets import decrypt_secret, encrypt_secret, has_secret
from clan_cli.secrets.sops import generate_private_key
from clan_cli.secrets.machines import has_machine, add_machine
from clan_cli.machines.machines import Machine
class SecretStore:
def __init__(self, machine: Machine) -> None:
self.machine = machine
if has_machine(self.machine.flake_dir, self.machine.name):
return
priv_key, pub_key = generate_private_key()
encrypt_secret(
self.machine.flake_dir,
sops_secrets_folder(self.machine.flake_dir) / f"{self.machine.name}-age.key",
priv_key,
)
add_machine(self.machine.flake_dir, self.machine.name, pub_key, False)
def set(self, service: str, name: str, value: str):
encrypt_secret(
self.machine.flake_dir,
sops_secrets_folder(self.machine.flake_dir) / f"{self.machine.name}-{name}",
value,
add_machines=[self.machine.name],
)
def get(self, service: str, name: str) -> bytes:
# TODO: add support for getting a secret
pass
def exists(self, service: str, name: str) -> bool:
return has_secret(
self.machine.flake_dir,
f"{self.machine.name}-{name}",
)
def upload(self, output_dir: Path, secrets: list[str, str]) -> None:
key_name = f"{self.machine.name}-age.key"
if not has_secret(self.machine.flake_dir, key_name):
# skip uploading the secret, not managed by us
return
key = decrypt_secret(self.machine.flake_dir, key_name)
(output_dir / "key.txt").write_text(key)
'';
};
sops.secrets = builtins.mapAttrs
(name: _: {
sopsFile = config.clanCore.clanDir + "/sops/secrets/${name}/secret";
format = "binary";
})
secrets;
# To get proper error messages about missing secrets we need a dummy secret file that is always present
sops.defaultSopsFile = lib.mkIf config.sops.validateSopsFiles (lib.mkDefault (builtins.toString (pkgs.writeText "dummy.yaml" "")));
sops.age.keyFile = lib.mkIf (builtins.pathExists (config.clanCore.clanDir + "/sops/secrets/${config.clanCore.machineName}-age.key/secret"))
(lib.mkDefault "/var/lib/sops-nix/key.txt");
clanCore.secretsUploadDirectory = lib.mkDefault "/var/lib/sops-nix";
};
}

View File

@@ -140,8 +140,6 @@ in
toplevel = vmConfig.config.system.build.toplevel; toplevel = vmConfig.config.system.build.toplevel;
regInfo = (pkgs.closureInfo { rootPaths = vmConfig.config.virtualisation.additionalPaths; }); regInfo = (pkgs.closureInfo { rootPaths = vmConfig.config.virtualisation.additionalPaths; });
inherit (config.clan.virtualisation) memorySize cores graphics; inherit (config.clan.virtualisation) memorySize cores graphics;
generateSecrets = config.system.clan.generateSecrets;
uploadSecrets = config.system.clan.uploadSecrets;
}); });
}; };

View File

@@ -1,4 +1,5 @@
import argparse import argparse
import importlib
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from pathlib import Path
from tempfile import TemporaryDirectory from tempfile import TemporaryDirectory
@@ -10,6 +11,9 @@ from ..secrets.generate import generate_secrets
def install_nixos(machine: Machine, kexec: str | None = None) -> None: def install_nixos(machine: Machine, kexec: str | None = None) -> None:
secrets_module = importlib.import_module(machine.secrets_module)
secret_store = secrets_module.SecretStore(machine=machine)
h = machine.host h = machine.host
target_host = f"{h.user or 'root'}@{h.host}" target_host = f"{h.user or 'root'}@{h.host}"
@@ -25,7 +29,7 @@ def install_nixos(machine: Machine, kexec: str | None = None) -> None:
upload_dir = upload_dir[1:] upload_dir = upload_dir[1:]
upload_dir = tmpdir / upload_dir upload_dir = tmpdir / upload_dir
upload_dir.mkdir(parents=True) upload_dir.mkdir(parents=True)
machine.run_upload_secrets(upload_dir) secret_store.upload(upload_dir)
cmd = [ cmd = [
"nixos-anywhere", "nixos-anywhere",

View File

@@ -1,6 +1,4 @@
import json import json
import os
import sys
from pathlib import Path from pathlib import Path
from ..cmd import Log, run from ..cmd import Log, run
@@ -47,8 +45,6 @@ class Machine:
self.machine_data = machine_data self.machine_data = machine_data
self.deployment_address = self.machine_data["deploymentAddress"] self.deployment_address = self.machine_data["deploymentAddress"]
self.upload_secrets = self.machine_data["uploadSecrets"]
self.generate_secrets = self.machine_data["generateSecrets"]
self.secrets_module = self.machine_data["secretsModule"] self.secrets_module = self.machine_data["secretsModule"]
self.secrets_data = json.loads( self.secrets_data = json.loads(
Path(self.machine_data["secretsData"]).read_text() Path(self.machine_data["secretsData"]).read_text()
@@ -63,32 +59,6 @@ class Machine:
self.name, self.deployment_address, meta={"machine": self} self.name, self.deployment_address, meta={"machine": self}
) )
def run_upload_secrets(self, secrets_dir: Path) -> bool:
"""
Upload the secrets to the provided directory
@secrets_dir: the directory to store the secrets in
"""
env = os.environ.copy()
env["CLAN_DIR"] = str(self.flake_dir)
env["PYTHONPATH"] = str(
":".join(sys.path)
) # TODO do this in the clanCore module
env["SECRETS_DIR"] = str(secrets_dir)
print(f"uploading secrets... {self.upload_secrets}")
proc = run(
[self.upload_secrets],
env=env,
check=False,
)
if proc.returncode == 23:
print("no secrets to upload")
return False
elif proc.returncode != 0:
print("failed generate secrets directory")
exit(1)
return True
def eval_nix(self, attr: str, refresh: bool = False) -> str: def eval_nix(self, attr: str, refresh: bool = False) -> str:
""" """
eval a nix attribute of the machine eval a nix attribute of the machine

View File

@@ -1,9 +1,8 @@
import argparse import argparse
import importlib
import logging import logging
import os import os
import shutil import shutil
import types
from importlib.machinery import SourceFileLoader
from pathlib import Path from pathlib import Path
from tempfile import TemporaryDirectory from tempfile import TemporaryDirectory
@@ -17,10 +16,7 @@ log = logging.getLogger(__name__)
def generate_secrets(machine: Machine) -> None: def generate_secrets(machine: Machine) -> None:
# load secrets module from file secrets_module = importlib.import_module(machine.secrets_module)
loader = SourceFileLoader("secret_module", machine.secrets_module)
secrets_module = types.ModuleType(loader.name)
loader.exec_module(secrets_module)
secret_store = secrets_module.SecretStore(machine=machine) secret_store = secrets_module.SecretStore(machine=machine)
with TemporaryDirectory() as d: with TemporaryDirectory() as d:

View File

@@ -0,0 +1,106 @@
import os
import subprocess
from pathlib import Path
from clan_cli.machines.machines import Machine
from clan_cli.nix import nix_shell
class SecretStore:
def __init__(self, machine: Machine) -> None:
self.machine = machine
def set(self, service: str, name: str, value: str) -> None:
subprocess.run(
nix_shell(
["nixpkgs#pass"],
["pass", "insert", "-m", f"machines/{self.machine.name}/{name}"],
),
input=value.encode("utf-8"),
check=True,
)
def get(self, service: str, name: str) -> bytes:
return subprocess.run(
nix_shell(
["nixpkgs#pass"],
["pass", "show", f"machines/{self.machine.name}/{name}"],
),
check=True,
stdout=subprocess.PIPE,
).stdout
def exists(self, service: str, name: str) -> bool:
password_store = os.environ.get(
"PASSWORD_STORE_DIR", f"{os.environ['HOME']}/.password-store"
)
secret_path = Path(password_store) / f"machines/{self.machine.name}/{name}.gpg"
print(f"checking {secret_path}")
return secret_path.exists()
def generate_hash(self) -> bytes:
password_store = os.environ.get(
"PASSWORD_STORE_DIR", f"{os.environ['HOME']}/.password-store"
)
hashes = []
hashes.append(
subprocess.run(
nix_shell(
["nixpkgs#git"],
[
"git",
"-C",
password_store,
"log",
"-1",
"--format=%H",
f"machines/{self.machine.name}",
],
),
stdout=subprocess.PIPE,
).stdout.strip()
)
for symlink in Path(password_store).glob(f"machines/{self.machine.name}/**/*"):
if symlink.is_symlink():
hashes.append(
subprocess.run(
nix_shell(
["nixpkgs#git"],
[
"git",
"-C",
password_store,
"log",
"-1",
"--format=%H",
str(symlink),
],
),
stdout=subprocess.PIPE,
).stdout.strip()
)
# we sort the hashes to make sure that the order is always the same
hashes.sort()
return b"\n".join(hashes)
def update_check(self) -> bool:
local_hash = self.generate_hash()
remote_hash = self.machine.host.run(
# TODO get the path to the secrets from the machine
["cat", f"{self.machine.secrets_upload_directory}/.pass_info"],
check=False,
stdout=subprocess.PIPE,
).stdout.strip()
if not remote_hash:
print("remote hash is empty")
return False
return local_hash.decode() == remote_hash
def upload(self, output_dir: Path) -> None:
for service in self.machine.secrets_data:
for secret in self.machine.secrets_data[service]["secrets"]:
(output_dir / secret).write_bytes(self.get(service, secret))
(output_dir / ".pass_info").write_bytes(self.generate_hash())

View File

@@ -10,6 +10,13 @@ from clan_cli.secrets.sops import generate_private_key
class SecretStore: class SecretStore:
def __init__(self, machine: Machine) -> None: def __init__(self, machine: Machine) -> None:
self.machine = machine self.machine = machine
# no need to generate keys if we don't manage secrets
if not hasattr(self.machine, "secrets_data"):
return
if not self.machine.secrets_data:
return
if has_machine(self.machine.flake_dir, self.machine.name): if has_machine(self.machine.flake_dir, self.machine.name):
return return
priv_key, pub_key = generate_private_key() priv_key, pub_key = generate_private_key()
@@ -21,7 +28,7 @@ class SecretStore:
) )
add_machine(self.machine.flake_dir, self.machine.name, pub_key, False) add_machine(self.machine.flake_dir, self.machine.name, pub_key, False)
def set(self, _service: str, name: str, value: str): def set(self, _service: str, name: str, value: str) -> None:
encrypt_secret( encrypt_secret(
self.machine.flake_dir, self.machine.flake_dir,
sops_secrets_folder(self.machine.flake_dir) / f"{self.machine.name}-{name}", sops_secrets_folder(self.machine.flake_dir) / f"{self.machine.name}-{name}",
@@ -38,7 +45,7 @@ class SecretStore:
f"{self.machine.name}-{name}", f"{self.machine.name}-{name}",
) )
def upload(self, output_dir: Path, _secrets: list[str]) -> None: def upload(self, output_dir: Path) -> None:
key_name = f"{self.machine.name}-age.key" key_name = f"{self.machine.name}-age.key"
if not has_secret(self.machine.flake_dir, key_name): if not has_secret(self.machine.flake_dir, key_name):
# skip uploading the secret, not managed by us # skip uploading the secret, not managed by us

View File

@@ -1,7 +1,6 @@
import argparse import argparse
import importlib
import logging import logging
import types
from importlib.machinery import SourceFileLoader
from pathlib import Path from pathlib import Path
from tempfile import TemporaryDirectory from tempfile import TemporaryDirectory
@@ -13,10 +12,7 @@ log = logging.getLogger(__name__)
def upload_secrets(machine: Machine) -> None: def upload_secrets(machine: Machine) -> None:
# load secrets module from file secrets_module = importlib.import_module(machine.secrets_module)
loader = SourceFileLoader("secret_module", machine.secrets_module)
secrets_module = types.ModuleType(loader.name)
loader.exec_module(secrets_module)
secret_store = secrets_module.SecretStore(machine=machine) secret_store = secrets_module.SecretStore(machine=machine)
update_check = getattr(secret_store, "update_check", None) update_check = getattr(secret_store, "update_check", None)
@@ -25,11 +21,7 @@ def upload_secrets(machine: Machine) -> None:
log.info("Secrets already up to date") log.info("Secrets already up to date")
return return
with TemporaryDirectory() as tempdir: with TemporaryDirectory() as tempdir:
secrets = [] secret_store.upload(Path(tempdir))
for service in machine.secrets_data:
for secret in machine.secrets_data[service]["secrets"]:
secrets.append((service, secret))
secret_store.upload(Path(tempdir), secrets)
host = machine.host host = machine.host
ssh_cmd = host.ssh_cmd() ssh_cmd = host.ssh_cmd()

View File

@@ -4,6 +4,7 @@ import logging
import os import os
import sys import sys
import tempfile import tempfile
import importlib
from dataclasses import dataclass, field from dataclasses import dataclass, field
from pathlib import Path from pathlib import Path
from typing import IO from typing import IO
@@ -13,6 +14,7 @@ from ..dirs import module_root, specific_groot_dir, vm_state_dir
from ..errors import ClanError from ..errors import ClanError
from ..nix import nix_build, nix_config, nix_shell from ..nix import nix_build, nix_config, nix_shell
from .inspect import VmConfig, inspect_vm from .inspect import VmConfig, inspect_vm
from ..machines.machines import Machine
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@@ -132,11 +134,8 @@ def generate_secrets(
secrets_dir = tmpdir / "secrets" secrets_dir = tmpdir / "secrets"
secrets_dir.mkdir(exist_ok=True) secrets_dir.mkdir(exist_ok=True)
env = os.environ.copy() secrets_module = importlib.import_module(machine.secrets_module)
env["CLAN_DIR"] = str(vm.flake_url) secret_store = secrets_module.SecretStore(machine=machine)
env["PYTHONPATH"] = str(":".join(sys.path)) # TODO do this in the clanCore module
env["SECRETS_DIR"] = str(secrets_dir)
# Only generate secrets for local clans # Only generate secrets for local clans
if isinstance(vm.flake_url, Path) and vm.flake_url.is_dir(): if isinstance(vm.flake_url, Path) and vm.flake_url.is_dir():