refactor(vars): move migration logic to extra file

This commit is contained in:
DavHau
2025-05-03 14:10:57 +07:00
parent d0e3c3a76c
commit 6e9382b942
2 changed files with 137 additions and 124 deletions

View File

@@ -19,6 +19,7 @@ from clan_cli.git import commit_files
from clan_cli.machines.inventory import get_all_machines, get_selected_machines from clan_cli.machines.inventory import get_all_machines, get_selected_machines
from clan_cli.nix import nix_config, nix_shell, nix_test_store from clan_cli.nix import nix_config, nix_shell, nix_test_store
from clan_cli.vars._types import StoreBase from clan_cli.vars._types import StoreBase
from clan_cli.vars.migration import _check_can_migrate, _migrate_files
from .check import check_vars from .check import check_vars
from .graph import ( from .graph import (
@@ -308,130 +309,6 @@ def get_closure(
return minimal_closure([generator_name], generators) return minimal_closure([generator_name], generators)
def _migration_file_exists(
machine: "Machine",
generator: Generator,
fact_name: str,
) -> bool:
for file in generator.files:
if file.name == fact_name:
break
else:
msg = f"Could not find file {fact_name} in generator {generator.name}"
raise ClanError(msg)
is_secret = file.secret
if is_secret:
if machine.secret_facts_store.exists(generator.name, fact_name):
return True
machine.debug(
f"Cannot migrate fact {fact_name} for service {generator.name}, as it does not exist in the secret fact store"
)
if not is_secret:
if machine.public_facts_store.exists(generator.name, fact_name):
return True
machine.debug(
f"Cannot migrate fact {fact_name} for service {generator.name}, as it does not exist in the public fact store"
)
return False
def _migrate_file(
machine: "Machine",
generator: Generator,
var_name: str,
service_name: str,
fact_name: str,
) -> list[Path]:
for file in generator.files:
if file.name == var_name:
break
else:
msg = f"Could not find file {fact_name} in generator {generator.name}"
raise ClanError(msg)
paths = []
if file.secret:
old_value = machine.secret_facts_store.get(service_name, fact_name)
maybe_path = machine.secret_vars_store.set(
generator, file, old_value, is_migration=True
)
if maybe_path:
paths.append(maybe_path)
else:
old_value = machine.public_facts_store.get(service_name, fact_name)
maybe_path = machine.public_vars_store.set(
generator, file, old_value, is_migration=True
)
if maybe_path:
paths.append(maybe_path)
return paths
def _migrate_files(
machine: "Machine",
generator: Generator,
) -> None:
not_found = []
files_to_commit = []
for file in generator.files:
if _migration_file_exists(machine, generator, file.name):
assert generator.migrate_fact is not None
files_to_commit += _migrate_file(
machine, generator, file.name, generator.migrate_fact, file.name
)
else:
not_found.append(file.name)
if len(not_found) > 0:
msg = f"Could not migrate the following files for generator {generator.name}, as no fact or secret exists with the same name: {not_found}"
raise ClanError(msg)
commit_files(
files_to_commit,
machine.flake_dir,
f"migrated facts to vars for generator {generator.name} for machine {machine.name}",
)
def _check_can_migrate(
machine: "Machine",
generator: Generator,
) -> bool:
service_name = generator.migrate_fact
if not service_name:
return False
# ensure that none of the generated vars already exist in the store
all_files_missing = True
all_files_present = True
for file in generator.files:
if file.secret:
if machine.secret_vars_store.exists(generator, file.name):
all_files_missing = False
else:
all_files_present = False
else:
if machine.public_vars_store.exists(generator, file.name):
all_files_missing = False
else:
all_files_present = False
if not all_files_present and not all_files_missing:
msg = f"Cannot migrate facts for generator {generator.name} as some files already exist in the store"
raise ClanError(msg)
if all_files_present:
# all files already migrated, no need to run migration again
return False
# ensure that all files can be migrated (exists in the corresponding fact store)
return bool(
all(
_migration_file_exists(machine, generator, file.name)
for file in generator.files
)
)
def generate_vars_for_machine( def generate_vars_for_machine(
machine: "Machine", machine: "Machine",
generator_name: str | None, generator_name: str | None,

View File

@@ -0,0 +1,136 @@
import logging
from pathlib import Path
from typing import TYPE_CHECKING
from clan_cli.errors import ClanError
from clan_cli.git import commit_files
log = logging.getLogger(__name__)
if TYPE_CHECKING:
from clan_cli.machines.machines import Machine
from clan_cli.vars.generate import Generator
def _migration_file_exists(
machine: "Machine",
generator: "Generator",
fact_name: str,
) -> bool:
for file in generator.files:
if file.name == fact_name:
break
else:
msg = f"Could not find file {fact_name} in generator {generator.name}"
raise ClanError(msg)
is_secret = file.secret
if is_secret:
if machine.secret_facts_store.exists(generator.name, fact_name):
return True
machine.debug(
f"Cannot migrate fact {fact_name} for service {generator.name}, as it does not exist in the secret fact store"
)
if not is_secret:
if machine.public_facts_store.exists(generator.name, fact_name):
return True
machine.debug(
f"Cannot migrate fact {fact_name} for service {generator.name}, as it does not exist in the public fact store"
)
return False
def _migrate_file(
machine: "Machine",
generator: "Generator",
var_name: str,
service_name: str,
fact_name: str,
) -> list[Path]:
for file in generator.files:
if file.name == var_name:
break
else:
msg = f"Could not find file {fact_name} in generator {generator.name}"
raise ClanError(msg)
paths = []
if file.secret:
old_value = machine.secret_facts_store.get(service_name, fact_name)
maybe_path = machine.secret_vars_store.set(
generator, file, old_value, is_migration=True
)
if maybe_path:
paths.append(maybe_path)
else:
old_value = machine.public_facts_store.get(service_name, fact_name)
maybe_path = machine.public_vars_store.set(
generator, file, old_value, is_migration=True
)
if maybe_path:
paths.append(maybe_path)
return paths
def _migrate_files(
machine: "Machine",
generator: "Generator",
) -> None:
not_found = []
files_to_commit = []
for file in generator.files:
if _migration_file_exists(machine, generator, file.name):
assert generator.migrate_fact is not None
files_to_commit += _migrate_file(
machine, generator, file.name, generator.migrate_fact, file.name
)
else:
not_found.append(file.name)
if len(not_found) > 0:
msg = f"Could not migrate the following files for generator {generator.name}, as no fact or secret exists with the same name: {not_found}"
raise ClanError(msg)
commit_files(
files_to_commit,
machine.flake_dir,
f"migrated facts to vars for generator {generator.name} for machine {machine.name}",
)
def _check_can_migrate(
machine: "Machine",
generator: "Generator",
) -> bool:
service_name = generator.migrate_fact
if not service_name:
return False
# ensure that none of the generated vars already exist in the store
all_files_missing = True
all_files_present = True
for file in generator.files:
if file.secret:
if machine.secret_vars_store.exists(generator, file.name):
all_files_missing = False
else:
all_files_present = False
else:
if machine.public_vars_store.exists(generator, file.name):
all_files_missing = False
else:
all_files_present = False
if not all_files_present and not all_files_missing:
msg = f"Cannot migrate facts for generator {generator.name} as some files already exist in the store"
raise ClanError(msg)
if all_files_present:
# all files already migrated, no need to run migration again
return False
# ensure that all files can be migrated (exists in the corresponding fact store)
return bool(
all(
_migration_file_exists(machine, generator, file.name)
for file in generator.files
)
)