132 lines
4.5 KiB
Python
132 lines
4.5 KiB
Python
import argparse
|
|
import logging
|
|
|
|
from clan_cli.completions import add_dynamic_completer, complete_machines
|
|
from clan_cli.errors import ClanError
|
|
from clan_cli.machines.machines import Machine
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
from typing import TYPE_CHECKING
|
|
|
|
if TYPE_CHECKING:
|
|
from .generate import Var
|
|
|
|
|
|
class VarStatus:
    """Result container for a vars check.

    Stores the four lists it is constructed with, unchanged:

    * ``missing_secret_vars``: secret vars not found in the secret store.
    * ``missing_public_vars``: public vars not found in the public store.
    * ``unfixed_secret_vars``: secret vars that exist but need an update.
    * ``invalid_generators``: names of generators whose invalidation hash
      is outdated.
    """

    def __init__(
        self,
        missing_secret_vars: list["Var"],
        missing_public_vars: list["Var"],
        unfixed_secret_vars: list["Var"],
        invalid_generators: list[str],
    ) -> None:
        # Vars that are absent from their store.
        self.missing_secret_vars = missing_secret_vars
        self.missing_public_vars = missing_public_vars

        # Secret vars that are present but flagged as needing an update.
        self.unfixed_secret_vars = unfixed_secret_vars

        # Generator names (plain strings), not Var objects.
        self.invalid_generators = invalid_generators
|
|
|
|
|
|
def vars_status(machine: Machine, generator_name: None | str = None) -> VarStatus:
    """Collect every var problem found for ``machine``.

    Walks the machine's vars generators (or only the one named
    ``generator_name`` when given) and, for each file of each generator:

    * records secret files that do not exist in the secret store,
    * records existing secret files for which the store's ``health_check``
      returns a truthy message,
    * records public files that do not exist in the public store.

    After a generator's files are processed, the generator's name is
    recorded if either store reports its invalidation hash as not valid.
    Each finding is also reported through ``machine.info``.

    Raises:
        ClanError: ``generator_name`` was given but no generator of the
            machine has that name.
    """
    missing_secret_vars = []
    missing_public_vars = []
    # Secret vars that exist but need to be updated
    # (e.g. re-encryption because new users were added).
    unfixed_secret_vars = []
    invalid_generators = []

    generators = machine.vars_generators()
    if generator_name:
        # Narrow the check to the first generator with a matching name.
        selected = next(
            (
                candidate
                for candidate in generators
                if generator_name == candidate.name
            ),
            None,
        )
        if selected is None:
            not_found = (
                f"Generator '{generator_name}' not found in machine {machine.name}"
            )
            raise ClanError(not_found)
        generators = [selected]

    for generator in generators:
        generator.machine(machine)

        for file in generator.files:
            # Attach the backing store and owning generator before checking.
            file.store(
                machine.secret_vars_store if file.secret else machine.public_vars_store
            )
            file.generator(generator)

            if not file.secret:
                if not machine.public_vars_store.exists(generator, file.name):
                    machine.info(
                        f"Public var '{file.name}' for service '{generator.name}' in machine {machine.name} is missing."
                    )
                    missing_public_vars.append(file)
                continue

            if not machine.secret_vars_store.exists(generator, file.name):
                machine.info(
                    f"Secret var '{file.name}' for service '{generator.name}' in machine {machine.name} is missing."
                )
                missing_secret_vars.append(file)
                continue

            # The secret exists; ask the store whether it still needs fixing.
            msg = machine.secret_vars_store.health_check(
                generator=generator,
                file_name=file.name,
            )
            if msg:
                machine.info(
                    f"Secret var '{file.name}' for service '{generator.name}' in machine {machine.name} needs update: {msg}"
                )
                unfixed_secret_vars.append(file)

        # The invalidation hash has to be valid in both stores; the public
        # store is only consulted when the secret store says it is valid.
        hash_up_to_date = machine.secret_vars_store.hash_is_valid(
            generator
        ) and machine.public_vars_store.hash_is_valid(generator)
        if not hash_up_to_date:
            invalid_generators.append(generator.name)
            machine.info(
                f"Generator '{generator.name}' in machine {machine.name} has outdated invalidation hash."
            )

    machine.debug(f"missing_secret_vars: {missing_secret_vars}")
    machine.debug(f"missing_public_vars: {missing_public_vars}")
    machine.debug(f"unfixed_secret_vars: {unfixed_secret_vars}")
    machine.debug(f"invalid_generators: {invalid_generators}")

    return VarStatus(
        missing_secret_vars=missing_secret_vars,
        missing_public_vars=missing_public_vars,
        unfixed_secret_vars=unfixed_secret_vars,
        invalid_generators=invalid_generators,
    )
|
|
|
|
|
|
def check_vars(machine: Machine, generator_name: None | str = None) -> bool:
    """Return ``True`` only when ``vars_status`` reports no problems at all.

    The result is ``False`` as soon as any of the four status lists
    (missing secret vars, missing public vars, unfixed secret vars,
    invalid generators) is non-empty.
    """
    status = vars_status(machine, generator_name=generator_name)
    problem_lists = (
        status.missing_secret_vars,
        status.missing_public_vars,
        status.unfixed_secret_vars,
        status.invalid_generators,
    )
    return not any(problem_lists)
|
|
|
|
|
|
def check_command(args: argparse.Namespace) -> None:
    """Run the vars check for the machine given on the command line.

    Builds a ``Machine`` from ``args.machine`` and ``args.flake`` and checks
    it, restricted to ``args.generator`` when that is set.

    Raises:
        SystemExit: with status 1 when the check does not pass.
    """
    machine = Machine(name=args.machine, flake=args.flake)
    if check_vars(machine, generator_name=args.generator):
        return
    raise SystemExit(1)
|
|
|
|
|
|
def register_check_parser(parser: argparse.ArgumentParser) -> None:
    """Configure ``parser`` for the check command.

    Adds the positional ``machine`` argument (with dynamic machine-name
    completion), the optional ``--generator``/``-g`` argument, and sets
    ``check_command`` as the parser's ``func`` default.
    """
    # add_argument returns the created action; the completer is attached to it.
    machine_action = parser.add_argument(
        "machine", help="The machine to check secrets for"
    )
    add_dynamic_completer(machine_action, complete_machines)

    parser.add_argument("--generator", "-g", help="the generator to check")
    parser.set_defaults(func=check_command)
|