cli/update: refactor machine selection logic into 'get_machines_for_update'

This commit is contained in:
Johannes Kirschbauer
2025-07-06 12:27:28 +02:00
parent a6b8ca06ab
commit 3baa43fd87
2 changed files with 290 additions and 108 deletions

View File

@@ -4,7 +4,9 @@ import sys
from clan_lib.async_run import AsyncContext, AsyncOpts, AsyncRuntime
from clan_lib.errors import ClanError
from clan_lib.machines.list import list_full_machines, query_machines_by_tags
from clan_lib.flake.flake import Flake
from clan_lib.machines.actions import list_machines
from clan_lib.machines.list import instantiate_inventory_to_machines
from clan_lib.machines.machines import Machine
from clan_lib.machines.suggestions import validate_machine_names
from clan_lib.machines.update import deploy_machine
@@ -16,128 +18,141 @@ from clan_cli.completions import (
complete_machines,
complete_tags,
)
from clan_cli.host_key_check import add_host_key_check_arg
log = logging.getLogger(__name__)
def requires_explicit_update(m: Machine) -> bool:
try:
if m.select("config.clan.deployment.requireExplicitUpdate"):
return False
except Exception:
pass
try:
# check if the machine has a target host set
m.target_host # noqa: B018
except ClanError:
return False
return True
def get_machines_for_update(
flake: Flake,
explicit_names: list[str],
filter_tags: list[str],
) -> list[Machine]:
all_machines = list_machines(flake)
machines_with_tags = list_machines(flake, {"filter": {"tags": filter_tags}})
if filter_tags and not machines_with_tags:
msg = f"No machines found with tags: {' AND '.join(filter_tags)}"
raise ClanError(msg)
# Implicit update all machines / with tags
# Using tags is not an explizit update
if not explicit_names:
machines_to_update = list(
filter(
requires_explicit_update,
instantiate_inventory_to_machines(flake, machines_with_tags).values(),
)
)
# all machines that are in the clan but not included in the update list
machine_names_to_update = [m.name for m in machines_to_update]
ignored_machines = {
m_name for m_name in all_machines if m_name not in machine_names_to_update
}
if not machines_to_update and ignored_machines:
print(
"WARNING: No machines to update.\n"
"The following defined machines were ignored because they\n"
"- Require explicit update (see 'requireExplicitUpdate')\n",
file=sys.stderr,
)
for m in ignored_machines:
print(m, file=sys.stderr)
return machines_to_update
# Else: Explicit update
machines_to_update = []
valid_names = validate_machine_names(explicit_names, flake)
for name in valid_names:
inventory_machine = machines_with_tags.get(name)
if not inventory_machine:
msg = "This is an internal bug"
raise ClanError(msg)
machines_to_update.append(
Machine.from_inventory(name, flake, inventory_machine)
)
return machines_to_update
def update_command(args: argparse.Namespace) -> None:
try:
if args.flake is None:
msg = "Could not find clan flake toplevel directory"
raise ClanError(msg)
all_machines: list[Machine] = []
if args.tags:
tag_filtered_machines = query_machines_by_tags(args.flake, args.tags)
if args.machines:
selected_machines = [
name for name in args.machines if name in tag_filtered_machines
]
else:
selected_machines = list(tag_filtered_machines.keys())
else:
selected_machines = (
args.machines
if args.machines
else list(list_full_machines(args.flake).keys())
)
machines_to_update = get_machines_for_update(
args.flake, args.machines, args.tags
)
if args.tags and not selected_machines:
msg = f"No machines found with tags: {', '.join(args.tags)}"
raise ClanError(msg)
if args.machines:
validate_machine_names(args.machines, args.flake)
for machine_name in selected_machines:
machine = Machine(name=machine_name, flake=args.flake)
all_machines.append(machine)
if args.target_host is not None and len(all_machines) > 1:
if args.target_host is not None and len(machines_to_update) > 1:
msg = "Target Host can only be set for one machines"
raise ClanError(msg)
def filter_machine(m: Machine) -> bool:
try:
if m.select("config.clan.deployment.requireExplicitUpdate"):
return False
except Exception:
pass
# Prepopulate the cache
config = nix_config()
system = config["system"]
machine_names = [machine.name for machine in machines_to_update]
args.flake.precache(
[
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.core.vars.generators.*.validationHash",
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.deployment.requireExplicitUpdate",
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.system.clan.deployment.nixosMobileWorkaround",
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.core.facts.secretModule",
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.core.facts.publicModule",
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.core.vars.settings.secretModule",
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.core.vars.settings.publicModule",
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.core.facts.services",
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.core.vars.generators.*.{{share,dependencies,migrateFact,prompts}}",
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.core.vars.generators.*.files.*.{{secret,deploy,owner,group,mode,neededFor}}",
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.core.facts.secretUploadDirectory",
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.vars.password-store.secretLocation",
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.core.vars.settings.passBackend",
]
)
try:
# check if the machine has a target host set
m.target_host # noqa: B018
except ClanError:
return False
return True
machines_to_update = all_machines
implicit_all: bool = len(args.machines) == 0 and not args.tags
if implicit_all:
machines_to_update = list(filter(filter_machine, all_machines))
# machines that are in the list but not included in the update list
ignored_machines = {m.name for m in all_machines if m not in machines_to_update}
if not machines_to_update and ignored_machines:
print(
"WARNING: No machines to update.\n"
"The following defined machines were ignored because they\n"
"- Require explicit update (see 'requireExplicitUpdate')\n",
"- Might not have the `clan.core.networking.targetHost` nixos option set:\n",
file=sys.stderr,
)
for m in ignored_machines:
print(m, file=sys.stderr)
if machines_to_update:
# Prepopulate the cache
config = nix_config()
system = config["system"]
machine_names = [machine.name for machine in machines_to_update]
args.flake.precache(
[
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.core.vars.generators.*.validationHash",
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.deployment.requireExplicitUpdate",
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.system.clan.deployment.nixosMobileWorkaround",
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.core.facts.secretModule",
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.core.facts.publicModule",
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.core.vars.settings.secretModule",
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.core.vars.settings.publicModule",
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.core.facts.services",
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.core.vars.generators.*.{{share,dependencies,migrateFact,prompts}}",
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.core.vars.generators.*.files.*.{{secret,deploy,owner,group,mode,neededFor}}",
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.core.facts.secretUploadDirectory",
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.core.vars.settings.passBackend",
]
)
host_key_check = args.host_key_check
with AsyncRuntime() as runtime:
for machine in machines_to_update:
if args.target_host:
target_host = Remote.from_ssh_uri(
machine_name=machine.name,
address=args.target_host,
).override(host_key_check=host_key_check)
else:
target_host = machine.target_host().override(
host_key_check=host_key_check
)
runtime.async_run(
AsyncOpts(
tid=machine.name,
async_ctx=AsyncContext(prefix=machine.name),
),
deploy_machine,
machine=machine,
target_host=target_host,
build_host=machine.build_host(),
host_key_check = args.host_key_check
with AsyncRuntime() as runtime:
for machine in machines_to_update:
if args.target_host:
target_host = Remote.from_ssh_uri(
machine_name=machine.name,
address=args.target_host,
).override(host_key_check=host_key_check)
else:
target_host = machine.target_host().override(
host_key_check=host_key_check
)
runtime.join_all()
runtime.check_all()
runtime.async_run(
AsyncOpts(
tid=machine.name,
async_ctx=AsyncContext(prefix=machine.name),
),
deploy_machine,
machine=machine,
target_host=target_host,
build_host=machine.build_host(),
)
runtime.join_all()
runtime.check_all()
except KeyboardInterrupt:
log.warning("Interrupted by user")
@@ -163,7 +178,12 @@ def register_update_parser(parser: argparse.ArgumentParser) -> None:
)
add_dynamic_completer(tag_parser, complete_tags)
add_host_key_check_arg(parser)
parser.add_argument(
"--host-key-check",
choices=["strict", "ask", "tofu", "none"],
default="ask",
help="Host key (.ssh/known_hosts) check mode.",
)
parser.add_argument(
"--target-host",
type=str,

View File

@@ -0,0 +1,162 @@
import pytest
from clan_lib.flake import Flake
from clan_cli.machines.update import get_machines_for_update
# Functions to test
from clan_cli.tests.fixtures_flakes import FlakeForTest
@pytest.mark.parametrize(
("test_flake_with_core", "explicit_names", "filter_tags", "expected_names"),
[
(
{
"inventory_expr": r"""{
machines.jon = { tags = [ "foo" "bar" ]; };
machines.sara = { tags = [ "foo" "baz" ]; };
}"""
},
["jon"], # explizit names
[], # filter tags
["jon"], # expected
)
],
# Important!
# tells pytest to pass these values to the fixture
# So we can write it to the flake fixtures
indirect=["test_flake_with_core"],
)
@pytest.mark.with_core
def test_get_machines_for_update_single_name(
test_flake_with_core: FlakeForTest,
explicit_names: list[str],
filter_tags: list[str],
expected_names: list[str],
) -> None:
selected_for_update = get_machines_for_update(
Flake(str(test_flake_with_core.path)),
explicit_names=explicit_names,
filter_tags=filter_tags,
)
names = [m.name for m in selected_for_update]
print(explicit_names, filter_tags)
assert names == expected_names
@pytest.mark.parametrize(
("test_flake_with_core", "explicit_names", "filter_tags", "expected_names"),
[
(
{
"inventory_expr": r"""{
machines.jon = { tags = [ "foo" "bar" ]; };
machines.sara = { tags = [ "foo" "baz" ]; };
}"""
},
[], # explizit names
["foo"], # filter tags
["jon", "sara"], # expected
)
],
# Important!
# tells pytest to pass these values to the fixture
# So we can write it to the flake fixtures
indirect=["test_flake_with_core"],
)
@pytest.mark.with_core
def test_get_machines_for_update_tags(
test_flake_with_core: FlakeForTest,
explicit_names: list[str],
filter_tags: list[str],
expected_names: list[str],
) -> None:
selected_for_update = get_machines_for_update(
Flake(str(test_flake_with_core.path)),
explicit_names=explicit_names,
filter_tags=filter_tags,
)
names = [m.name for m in selected_for_update]
print(explicit_names, filter_tags)
assert names == expected_names
@pytest.mark.parametrize(
("test_flake_with_core", "explicit_names", "filter_tags", "expected_names"),
[
(
{
"inventory_expr": r"""{
machines.jon = { tags = [ "foo" "bar" ]; };
machines.sara = { tags = [ "foo" "baz" ]; };
}"""
},
["sara"], # explizit names
["foo"], # filter tags
["sara"], # expected
)
],
# Important!
# tells pytest to pass these values to the fixture
# So we can write it to the flake fixtures
indirect=["test_flake_with_core"],
)
@pytest.mark.with_core
def test_get_machines_for_update_tags_and_name(
test_flake_with_core: FlakeForTest,
explicit_names: list[str],
filter_tags: list[str],
expected_names: list[str],
) -> None:
selected_for_update = get_machines_for_update(
Flake(str(test_flake_with_core.path)),
explicit_names=explicit_names,
filter_tags=filter_tags,
)
names = [m.name for m in selected_for_update]
print(explicit_names, filter_tags)
assert names == expected_names
@pytest.mark.parametrize(
("test_flake_with_core", "explicit_names", "filter_tags", "expected_names"),
[
(
{
"inventory_expr": r"""{
machines.jon = { tags = [ "foo" "bar" ]; };
machines.sara = { tags = [ "foo" "baz" ]; };
}"""
},
[], # no explizit names
[], # no filter tags
["jon", "sara", "vm1", "vm2"], # all machines
),
],
# Important!
# tells pytest to pass these values to the fixture
# So we can write it to the flake fixtures
indirect=["test_flake_with_core"],
)
@pytest.mark.with_core
def test_get_machines_for_update_implicit_all(
test_flake_with_core: FlakeForTest,
explicit_names: list[str],
filter_tags: list[str],
expected_names: list[str],
) -> None:
selected_for_update = get_machines_for_update(
Flake(str(test_flake_with_core.path)),
explicit_names=explicit_names,
filter_tags=filter_tags,
)
names = [m.name for m in selected_for_update]
print(explicit_names, filter_tags)
assert names == expected_names
# TODO: Add more tests for requireExplicitUpdate