Merge pull request 'vars/keygen: fix + cleanup tests' (#2201) from DavHau/clan-core:DavHau-vars-migration into main

This commit is contained in:
clan-bot
2024-10-04 15:32:31 +00:00
7 changed files with 31 additions and 57 deletions

View File

@@ -6,6 +6,7 @@ from clan_cli.clan_uri import FlakeId
from clan_cli.errors import ClanError
from clan_cli.secrets.key import generate_key
from clan_cli.secrets.users import add_user
from clan_cli.vars.secret_modules.sops import SecretStore as SopsSecretStore
log = logging.getLogger(__name__)
@@ -16,9 +17,12 @@ def keygen(user: str | None, flake: FlakeId, force: bool) -> None:
if not user:
msg = "No user provided and $USER is not set. Please provide a user via --user."
raise ClanError(msg)
pub_key = generate_key()
pub_key = SopsSecretStore.maybe_get_admin_public_key()
if not pub_key:
pub_key = generate_key()
# TODO set flake_dir=flake.path / "vars"
add_user(
flake_dir=flake.path / "vars",
flake_dir=flake.path,
name=user,
key=pub_key,
force=force,

View File

@@ -99,13 +99,15 @@ class SecretStore(SecretStoreBase):
with os.fdopen(fd, "w") as f:
json.dump({"publickey": publickey, "type": "age"}, f, indent=2)
def default_admin_key_path(self) -> Path:
@staticmethod
def default_admin_key_path() -> Path:
raw_path = os.environ.get("SOPS_AGE_KEY_FILE")
if raw_path:
return Path(raw_path)
return user_config_dir() / "sops" / "age" / "keys.txt"
def get_public_key(self, privkey: str) -> str:
@staticmethod
def get_public_key(privkey: str) -> str:
cmd = nix_shell(["nixpkgs#age"], ["age-keygen", "-y"])
try:
res = subprocess.run(
@@ -116,19 +118,20 @@ class SecretStore(SecretStoreBase):
raise ClanError(msg) from e
return res.stdout.strip()
def maybe_get_admin_public_key(self) -> str | None:
@classmethod
def maybe_get_admin_public_key(cls: type["SecretStore"]) -> str | None:
key = os.environ.get("SOPS_AGE_KEY")
if key:
return self.get_public_key(key)
path = self.default_admin_key_path()
return cls.get_public_key(key)
path = cls.default_admin_key_path()
if path.exists():
return self.get_public_key(path.read_text())
return cls.get_public_key(path.read_text())
return None
# TODO: get rid of `clan secrets generate` dependency
def admin_key(self) -> SopsKey:
pub_key = self.maybe_get_admin_public_key()
pub_key = SecretStore.maybe_get_admin_public_key()
if not pub_key:
raise MissingKeyError
return self.ensure_user_or_machine(pub_key)

View File

@@ -21,13 +21,12 @@ class SopsSetup:
self.user = os.environ.get("USER", "user")
cli.run(
[
"secrets",
"users",
"add",
"vars",
"keygen",
"--flake",
str(flake_path),
"--user",
self.user,
self.keys[0].pubkey,
]
)

View File

@@ -11,7 +11,6 @@ from typing import NamedTuple
import pytest
from clan_cli.dirs import nixpkgs_source
from fixture_error import FixtureError
from helpers import cli
from root import CLAN_CORE
log = logging.getLogger(__name__)
@@ -51,9 +50,6 @@ class FlakeForTest(NamedTuple):
path: Path
from age_keys import KEYS, KeyPair
def set_machine_settings(
flake: Path,
machine_name: str,
@@ -67,8 +63,6 @@ def set_machine_settings(
def generate_flake(
temporary_home: Path,
flake_template: Path,
monkeypatch: pytest.MonkeyPatch,
sops_key: KeyPair = KEYS[0],
substitutions: dict[str, str] | None = None,
# define the machines directly including their config
machine_configs: dict[str, dict] | None = None,
@@ -146,20 +140,6 @@ def generate_flake(
sp.run(["git", "config", "user.name", "clan-tool"], cwd=flake, check=True)
sp.run(["git", "config", "user.email", "clan@example.com"], cwd=flake, check=True)
sp.run(["git", "commit", "-a", "-m", "Initial commit"], cwd=flake, check=True)
monkeypatch.setenv("SOPS_AGE_KEY", sops_key.privkey)
cli.run(
[
"secrets",
"users",
"add",
"user1",
sops_key.pubkey,
"--flake",
str(flake),
"--debug",
]
)
return FlakeForTest(flake)

View File

@@ -90,7 +90,6 @@ def test_generate_public_var(
temporary_home,
flake_template=CLAN_CORE / "templates" / "minimal",
machine_configs={"my_machine": config},
monkeypatch=monkeypatch,
)
monkeypatch.chdir(flake.path)
machine = Machine(name="my_machine", flake=FlakeId(str(flake.path)))
@@ -130,7 +129,6 @@ def test_generate_secret_var_sops(
temporary_home,
flake_template=CLAN_CORE / "templates" / "minimal",
machine_configs={"my_machine": config},
monkeypatch=monkeypatch,
)
monkeypatch.chdir(flake.path)
sops_setup.init()
@@ -172,7 +170,6 @@ def test_generate_secret_var_sops_with_default_group(
temporary_home,
flake_template=CLAN_CORE / "templates" / "minimal",
machine_configs={"my_machine": config},
monkeypatch=monkeypatch,
)
monkeypatch.chdir(flake.path)
sops_setup.init()
@@ -210,7 +207,6 @@ def test_generated_shared_secret_sops(
temporary_home,
flake_template=CLAN_CORE / "templates" / "minimal",
machine_configs={"machine1": m1_config, "machine2": m2_config},
monkeypatch=monkeypatch,
)
monkeypatch.chdir(flake.path)
sops_setup.init()
@@ -253,7 +249,6 @@ def test_generate_secret_var_password_store(
temporary_home,
flake_template=CLAN_CORE / "templates" / "minimal",
machine_configs={"my_machine": config},
monkeypatch=monkeypatch,
)
monkeypatch.chdir(flake.path)
gnupghome = temporary_home / "gpg"
@@ -324,7 +319,6 @@ def test_generate_secret_for_multiple_machines(
temporary_home,
flake_template=CLAN_CORE / "templates" / "minimal",
machine_configs={"machine1": machine1_config, "machine2": machine2_config},
monkeypatch=monkeypatch,
)
monkeypatch.chdir(flake.path)
sops_setup.init()
@@ -370,7 +364,6 @@ def test_dependant_generators(
temporary_home,
flake_template=CLAN_CORE / "templates" / "minimal",
machine_configs={"my_machine": config},
monkeypatch=monkeypatch,
)
monkeypatch.chdir(flake.path)
cli.run(["vars", "generate", "--flake", str(flake.path), "my_machine"])
@@ -410,7 +403,6 @@ def test_prompt(
temporary_home,
flake_template=CLAN_CORE / "templates" / "minimal",
machine_configs={"my_machine": config},
monkeypatch=monkeypatch,
)
monkeypatch.chdir(flake.path)
monkeypatch.setattr("sys.stdin", StringIO(input_value))
@@ -449,7 +441,6 @@ def test_share_flag(
temporary_home,
flake_template=CLAN_CORE / "templates" / "minimal",
machine_configs={"my_machine": config},
monkeypatch=monkeypatch,
)
monkeypatch.chdir(flake.path)
sops_setup.init()
@@ -500,7 +491,6 @@ def test_prompt_create_file(
temporary_home,
flake_template=CLAN_CORE / "templates" / "minimal",
machine_configs={"my_machine": config},
monkeypatch=monkeypatch,
)
monkeypatch.chdir(flake.path)
sops_setup.init()
@@ -529,7 +519,6 @@ def test_api_get_prompts(
temporary_home,
flake_template=CLAN_CORE / "templates" / "minimal",
machine_configs={"my_machine": config},
monkeypatch=monkeypatch,
)
monkeypatch.chdir(flake.path)
monkeypatch.setattr("sys.stdin", StringIO("input1"))
@@ -558,7 +547,6 @@ def test_api_set_prompts(
temporary_home,
flake_template=CLAN_CORE / "templates" / "minimal",
machine_configs={"my_machine": config},
monkeypatch=monkeypatch,
)
monkeypatch.chdir(flake.path)
machine = Machine(name="my_machine", flake=FlakeId(str(flake.path)))
@@ -590,6 +578,7 @@ def test_api_set_prompts(
def test_commit_message(
monkeypatch: pytest.MonkeyPatch,
temporary_home: Path,
sops_setup: SopsSetup,
) -> None:
config = nested_dict()
my_generator = config["clan"]["core"]["vars"]["generators"]["my_generator"]
@@ -604,9 +593,9 @@ def test_commit_message(
temporary_home,
flake_template=CLAN_CORE / "templates" / "minimal",
machine_configs={"my_machine": config},
monkeypatch=monkeypatch,
)
monkeypatch.chdir(flake.path)
sops_setup.init()
cli.run(
[
"vars",
@@ -662,7 +651,6 @@ def test_default_value(
temporary_home,
flake_template=CLAN_CORE / "templates" / "minimal",
machine_configs={"my_machine": config},
monkeypatch=monkeypatch,
)
monkeypatch.chdir(flake.path)
# ensure evaluating the default value works without generating the value
@@ -692,6 +680,7 @@ def test_stdout_of_generate(
monkeypatch: pytest.MonkeyPatch,
temporary_home: Path,
capture_output: CaptureOutput,
sops_setup: SopsSetup,
) -> None:
config = nested_dict()
my_generator = config["clan"]["core"]["vars"]["generators"]["my_generator"]
@@ -706,9 +695,9 @@ def test_stdout_of_generate(
temporary_home,
flake_template=CLAN_CORE / "templates" / "minimal",
machine_configs={"my_machine": config},
monkeypatch=monkeypatch,
)
monkeypatch.chdir(flake.path)
sops_setup.init()
from clan_cli.vars.generate import generate_vars_for_machine
with capture_output as output:
@@ -769,6 +758,7 @@ def test_stdout_of_generate(
def test_migration_skip(
monkeypatch: pytest.MonkeyPatch,
temporary_home: Path,
sops_setup: SopsSetup,
) -> None:
config = nested_dict()
my_service = config["clan"]["core"]["facts"]["services"]["my_service"]
@@ -783,9 +773,9 @@ def test_migration_skip(
temporary_home,
flake_template=CLAN_CORE / "templates" / "minimal",
machine_configs={"my_machine": config},
monkeypatch=monkeypatch,
)
monkeypatch.chdir(flake.path)
sops_setup.init()
cli.run(["facts", "generate", "--flake", str(flake.path), "my_machine"])
cli.run(["vars", "generate", "--flake", str(flake.path), "my_machine"])
in_repo_store = in_repo.FactStore(
@@ -799,6 +789,7 @@ def test_migration_skip(
def test_migration(
monkeypatch: pytest.MonkeyPatch,
temporary_home: Path,
sops_setup: SopsSetup,
) -> None:
config = nested_dict()
my_service = config["clan"]["core"]["facts"]["services"]["my_service"]
@@ -812,9 +803,9 @@ def test_migration(
temporary_home,
flake_template=CLAN_CORE / "templates" / "minimal",
machine_configs={"my_machine": config},
monkeypatch=monkeypatch,
)
monkeypatch.chdir(flake.path)
sops_setup.init()
cli.run(["facts", "generate", "--flake", str(flake.path), "my_machine"])
cli.run(["vars", "generate", "--flake", str(flake.path), "my_machine"])
in_repo_store = in_repo.FactStore(
@@ -845,10 +836,9 @@ def test_fails_when_files_are_left_from_other_backend(
temporary_home,
flake_template=CLAN_CORE / "templates" / "minimal",
machine_configs={"my_machine": config},
monkeypatch=monkeypatch,
)
sops_setup.init()
monkeypatch.chdir(flake.path)
sops_setup.init()
for generator in ["my_secret_generator", "my_value_generator"]:
generate_vars_for_machine(
Machine(name="my_machine", flake=FlakeId(str(flake.path))),
@@ -876,13 +866,13 @@ def test_keygen(
monkeypatch.chdir(temporary_home)
cli.run(["vars", "keygen", "--flake", str(temporary_home), "--user", "user"])
# check public key exists
assert (temporary_home / "vars" / "sops" / "users" / "user").is_dir()
assert (temporary_home / "sops" / "users" / "user").is_dir()
# check private key exists
assert (temporary_home / ".config" / "sops" / "age" / "keys.txt").is_file()
# it should still work, even if the keys already exist
import shutil
shutil.rmtree(temporary_home / "vars" / "sops" / "users" / "user")
shutil.rmtree(temporary_home / "sops" / "users" / "user")
cli.run(["vars", "keygen", "--flake", str(temporary_home), "--user", "user"])
# check public key exists
assert (temporary_home / "vars" / "sops" / "users" / "user").is_dir()
assert (temporary_home / "sops" / "users" / "user").is_dir()

View File

@@ -60,7 +60,6 @@ def test_vm_deployment(
temporary_home,
flake_template=CLAN_CORE / "templates" / "minimal",
machine_configs={"m1_machine": machine1_config, "m2_machine": machine2_config},
monkeypatch=monkeypatch,
)
monkeypatch.chdir(flake.path)
sops_setup.init()

View File

@@ -82,7 +82,6 @@ def test_vm_persistence(
temporary_home,
flake_template=CLAN_CORE / "templates" / "minimal",
machine_configs=config,
monkeypatch=monkeypatch,
)
monkeypatch.chdir(flake.path)