WIP: clan-cli secrets: add secret_store as python class
This commit is contained in:
@@ -27,29 +27,55 @@ in
|
||||
clanCore.secretsPrefix = config.clanCore.machineName + "-";
|
||||
system.clan = lib.mkIf (config.clanCore.secrets != { }) {
|
||||
|
||||
generateSecrets = pkgs.writeScript "generate-secrets" ''
|
||||
#!${pkgs.python3}/bin/python
|
||||
import json
|
||||
import sys
|
||||
from clan_cli.secrets.sops_generate import generate_secrets_from_nix
|
||||
args = json.loads(${builtins.toJSON (builtins.toJSON {
|
||||
machine_name = config.clanCore.machineName;
|
||||
secret_submodules = lib.mapAttrs (_name: secret: {
|
||||
secrets = builtins.attrNames secret.secrets;
|
||||
facts = lib.mapAttrs (_: secret: secret.path) secret.facts;
|
||||
generator = secret.generator.finalScript;
|
||||
}) config.clanCore.secrets;
|
||||
})})
|
||||
generate_secrets_from_nix(**args)
|
||||
'';
|
||||
uploadSecrets = pkgs.writeScript "upload-secrets" ''
|
||||
#!${pkgs.python3}/bin/python
|
||||
import json
|
||||
import sys
|
||||
from clan_cli.secrets.sops_generate import upload_age_key_from_nix
|
||||
# the second toJSON is needed to escape the string for the python
|
||||
args = json.loads(${builtins.toJSON (builtins.toJSON { machine_name = config.clanCore.machineName; })})
|
||||
upload_age_key_from_nix(**args)
|
||||
secretsModule = pkgs.writeText "sops.py" ''
|
||||
from pathlib import Path
|
||||
|
||||
from clan_cli.secrets.folders import sops_secrets_folder
|
||||
from clan_cli.secrets.secrets import decrypt_secret, encrypt_secret, has_secret
|
||||
from clan_cli.secrets.sops import generate_private_key
|
||||
from clan_cli.secrets.machines import has_machine, add_machine
|
||||
from clan_cli.machines.machines import Machine
|
||||
|
||||
|
||||
class SecretStore:
|
||||
def __init__(self, machine: Machine) -> None:
|
||||
self.machine = machine
|
||||
if has_machine(self.machine.flake_dir, self.machine.name):
|
||||
return
|
||||
priv_key, pub_key = generate_private_key()
|
||||
encrypt_secret(
|
||||
self.machine.flake_dir,
|
||||
sops_secrets_folder(self.machine.flake_dir) / f"{self.machine.name}-age.key",
|
||||
priv_key,
|
||||
)
|
||||
add_machine(self.machine.flake_dir, self.machine.name, pub_key, False)
|
||||
|
||||
def set(self, service: str, name: str, value: str):
|
||||
encrypt_secret(
|
||||
self.machine.flake_dir,
|
||||
sops_secrets_folder(self.machine.flake_dir) / f"{self.machine.name}-{name}",
|
||||
value,
|
||||
add_machines=[self.machine.name],
|
||||
)
|
||||
|
||||
def get(self, service: str, name: str) -> bytes:
|
||||
# TODO: add support for getting a secret
|
||||
pass
|
||||
|
||||
def exists(self, service: str, name: str) -> bool:
|
||||
return has_secret(
|
||||
self.machine.flake_dir,
|
||||
f"{self.machine.name}-{name}",
|
||||
)
|
||||
|
||||
def upload(self, output_dir: Path, secrets: list[str, str]) -> None:
|
||||
key_name = f"{self.machine.name}-age.key"
|
||||
if not has_secret(self.machine.flake_dir, key_name):
|
||||
# skip uploading the secret, not managed by us
|
||||
return
|
||||
key = decrypt_secret(self.machine.flake_dir, key_name)
|
||||
|
||||
(output_dir / "key.txt").write_text(key)
|
||||
'';
|
||||
};
|
||||
sops.secrets = builtins.mapAttrs
|
||||
|
||||
Reference in New Issue
Block a user