Revert "vars: refactor - copy logic to sops secret module"
This reverts commit 83d850dac4.
This commit is contained in:
@@ -5,8 +5,8 @@ import os
|
||||
from clan_cli.clan_uri import FlakeId
|
||||
from clan_cli.errors import ClanError
|
||||
from clan_cli.secrets.key import generate_key
|
||||
from clan_cli.secrets.sops import maybe_get_admin_public_key
|
||||
from clan_cli.secrets.users import add_user
|
||||
from clan_cli.vars.secret_modules.sops import SecretStore as SopsSecretStore
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
@@ -17,7 +17,7 @@ def keygen(user: str | None, flake: FlakeId, force: bool) -> None:
|
||||
if not user:
|
||||
msg = "No user provided and $USER is not set. Please provide a user via --user."
|
||||
raise ClanError(msg)
|
||||
pub_key = SopsSecretStore.maybe_get_admin_public_key()
|
||||
pub_key = maybe_get_admin_public_key()
|
||||
if not pub_key:
|
||||
pub_key = generate_key()
|
||||
# TODO set flake_dir=flake.path / "vars"
|
||||
|
||||
@@ -1,32 +1,13 @@
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import IO
|
||||
|
||||
from clan_cli.dirs import user_config_dir
|
||||
from clan_cli.errors import ClanError
|
||||
from clan_cli.git import commit_files
|
||||
from clan_cli.machines.machines import Machine
|
||||
from clan_cli.nix import nix_shell
|
||||
from clan_cli.secrets.folders import sops_machines_folder, sops_secrets_folder
|
||||
from clan_cli.secrets.machines import add_secret
|
||||
from clan_cli.secrets.secrets import (
|
||||
allow_member,
|
||||
collect_keys_for_path,
|
||||
decrypt_secret,
|
||||
groups_folder,
|
||||
has_secret,
|
||||
machines_folder,
|
||||
users_folder,
|
||||
)
|
||||
from clan_cli.secrets.sops import (
|
||||
decrypt_file,
|
||||
encrypt_file,
|
||||
generate_private_key,
|
||||
read_key,
|
||||
)
|
||||
from clan_cli.secrets.machines import add_machine, add_secret, has_machine
|
||||
from clan_cli.secrets.secrets import decrypt_secret, encrypt_secret, has_secret
|
||||
from clan_cli.secrets.sops import generate_private_key
|
||||
|
||||
from . import SecretStoreBase
|
||||
|
||||
@@ -59,171 +40,21 @@ class SecretStore(SecretStoreBase):
|
||||
if not has_secrets:
|
||||
return
|
||||
|
||||
# exit early if the machine already exists
|
||||
if (self.sops_dir / "machines" / self.machine.name / "key.json").exists():
|
||||
if has_machine(self.machine.flake_dir, self.machine.name):
|
||||
return
|
||||
|
||||
priv_key, pub_key = generate_private_key()
|
||||
self.encrypt_secret(
|
||||
self.sops_dir / "secrets" / f"{self.machine.name}-age.key",
|
||||
encrypt_secret(
|
||||
self.machine.flake_dir,
|
||||
sops_secrets_folder(self.machine.flake_dir)
|
||||
/ f"{self.machine.name}-age.key",
|
||||
priv_key,
|
||||
)
|
||||
self.add_machine(self.machine.name, pub_key)
|
||||
|
||||
@property
|
||||
def sops_dir(self) -> Path:
|
||||
return self.machine.flake_dir / "sops"
|
||||
add_machine(self.machine.flake_dir, self.machine.name, pub_key, False)
|
||||
|
||||
@property
|
||||
def store_name(self) -> str:
|
||||
return "sops"
|
||||
|
||||
def add_machine(self, machine: str, pubkey: str) -> None:
|
||||
machine_path = self.sops_dir / "machines" / machine
|
||||
self.write_key(machine_path, pubkey)
|
||||
paths = [machine_path]
|
||||
commit_files(
|
||||
paths,
|
||||
self.machine.flake_dir,
|
||||
f"Add machine {machine} to secrets",
|
||||
)
|
||||
|
||||
def write_key(self, machine_path: Path, publickey: str) -> None:
|
||||
machine_path.mkdir(parents=True, exist_ok=True)
|
||||
try:
|
||||
flags = os.O_CREAT | os.O_WRONLY | os.O_TRUNC | os.O_EXCL
|
||||
fd = os.open(machine_path / "key.json", flags)
|
||||
except FileExistsError as e:
|
||||
msg = f"{machine_path.name} already exists in {machine_path}"
|
||||
raise ClanError(msg) from e
|
||||
with os.fdopen(fd, "w") as f:
|
||||
json.dump({"publickey": publickey, "type": "age"}, f, indent=2)
|
||||
|
||||
@staticmethod
|
||||
def default_admin_key_path() -> Path:
|
||||
raw_path = os.environ.get("SOPS_AGE_KEY_FILE")
|
||||
if raw_path:
|
||||
return Path(raw_path)
|
||||
return user_config_dir() / "sops" / "age" / "keys.txt"
|
||||
|
||||
@staticmethod
|
||||
def get_public_key(privkey: str) -> str:
|
||||
cmd = nix_shell(["nixpkgs#age"], ["age-keygen", "-y"])
|
||||
try:
|
||||
res = subprocess.run(
|
||||
cmd, input=privkey, stdout=subprocess.PIPE, text=True, check=True
|
||||
)
|
||||
except subprocess.CalledProcessError as e:
|
||||
msg = "Failed to get public key for age private key. Is the key malformed?"
|
||||
raise ClanError(msg) from e
|
||||
return res.stdout.strip()
|
||||
|
||||
@classmethod
|
||||
def maybe_get_admin_public_key(cls: type["SecretStore"]) -> str | None:
|
||||
key = os.environ.get("SOPS_AGE_KEY")
|
||||
if key:
|
||||
return cls.get_public_key(key)
|
||||
path = cls.default_admin_key_path()
|
||||
if path.exists():
|
||||
return cls.get_public_key(path.read_text())
|
||||
|
||||
return None
|
||||
|
||||
# TODO: get rid of `clan secrets generate` dependency
|
||||
def admin_key(self) -> SopsKey:
|
||||
pub_key = SecretStore.maybe_get_admin_public_key()
|
||||
if not pub_key:
|
||||
raise MissingKeyError
|
||||
return self.ensure_user_or_machine(pub_key)
|
||||
|
||||
# TODO: find alternative to `clan secrets users add`
|
||||
def ensure_user_or_machine(self, pub_key: str) -> SopsKey:
|
||||
key = self.maybe_get_user(pub_key)
|
||||
if not key:
|
||||
raise MissingKeyError
|
||||
return key
|
||||
|
||||
def maybe_get_user(self, pub_key: str) -> SopsKey | None:
|
||||
key = SopsKey(pub_key, username="")
|
||||
folders = [self.sops_dir / "users"]
|
||||
|
||||
for folder in folders:
|
||||
if folder.exists():
|
||||
for user in folder.iterdir():
|
||||
if not (user / "key.json").exists():
|
||||
continue
|
||||
if read_key(user) == pub_key:
|
||||
key.username = user.name
|
||||
return key
|
||||
|
||||
return None
|
||||
|
||||
def encrypt_secret(
|
||||
self,
|
||||
secret_path: Path,
|
||||
value: IO[str] | str | bytes | None,
|
||||
add_machines: list[str] | None = None,
|
||||
add_groups: list[str] | None = None,
|
||||
git_commit: bool = True,
|
||||
) -> None:
|
||||
if add_groups is None:
|
||||
add_groups = []
|
||||
if add_machines is None:
|
||||
add_machines = []
|
||||
key = self.admin_key()
|
||||
recipient_keys = set()
|
||||
|
||||
files_to_commit = []
|
||||
for machine in add_machines:
|
||||
files_to_commit.extend(
|
||||
allow_member(
|
||||
machines_folder(secret_path),
|
||||
self.sops_dir / "machines",
|
||||
machine,
|
||||
False,
|
||||
)
|
||||
)
|
||||
|
||||
for group in add_groups:
|
||||
files_to_commit.extend(
|
||||
allow_member(
|
||||
groups_folder(secret_path),
|
||||
self.sops_dir / "groups",
|
||||
group,
|
||||
False,
|
||||
)
|
||||
)
|
||||
|
||||
recipient_keys = collect_keys_for_path(secret_path)
|
||||
|
||||
if key.pubkey not in recipient_keys:
|
||||
recipient_keys.add(key.pubkey)
|
||||
files_to_commit.extend(
|
||||
allow_member(
|
||||
users_folder(secret_path),
|
||||
self.sops_dir / "users",
|
||||
key.username,
|
||||
False,
|
||||
)
|
||||
)
|
||||
|
||||
secret_path = secret_path / "secret"
|
||||
encrypt_file(secret_path, value, sorted(recipient_keys))
|
||||
files_to_commit.append(secret_path)
|
||||
if git_commit:
|
||||
commit_files(
|
||||
files_to_commit,
|
||||
self.machine.flake_dir,
|
||||
f"Update secret {secret_path.parent.name}",
|
||||
)
|
||||
|
||||
def decrypt_secret(self, secret_path: Path) -> str:
|
||||
path = secret_path / "secret"
|
||||
if not path.exists():
|
||||
msg = f"Secret '{secret_path!s}' does not exist"
|
||||
raise ClanError(msg)
|
||||
return decrypt_file(path)
|
||||
|
||||
def machine_has_access(
|
||||
self, generator_name: str, secret_name: str, shared: bool
|
||||
) -> bool:
|
||||
@@ -262,7 +93,8 @@ class SecretStore(SecretStoreBase):
|
||||
add_secret(self.machine.flake_dir, self.machine.name, secret_folder)
|
||||
else:
|
||||
# initialize the secret
|
||||
self.encrypt_secret(
|
||||
encrypt_secret(
|
||||
self.machine.flake_dir,
|
||||
secret_folder,
|
||||
value,
|
||||
add_machines=[self.machine.name] if deployed else [],
|
||||
|
||||
Reference in New Issue
Block a user