Merge pull request 'lib/get_host: improve abstraction, turn missconfiguration into a warning' (#4201) from cli-fixup into main

Reviewed-on: https://git.clan.lol/clan/clan-core/pulls/4201
This commit is contained in:
hsjobeki
2025-07-06 10:38:03 +00:00
6 changed files with 304 additions and 135 deletions

View File

@@ -34,7 +34,7 @@ Examples:
$ clan machines update [MACHINES]
Will update the specified machines [MACHINES], if [MACHINES] is omitted, the command
will attempt to update every configured machine.
To exclude machines being updated `clan.deployment.requireExplicitUpdate = true;`
To exclude machines being updated `clan.core.deployment.requireExplicitUpdate = true;`
can be set in the machine config.
$ clan machines update --tags [TAGS..]

View File

@@ -4,7 +4,9 @@ import sys
from clan_lib.async_run import AsyncContext, AsyncOpts, AsyncRuntime
from clan_lib.errors import ClanError
from clan_lib.machines.list import list_full_machines, query_machines_by_tags
from clan_lib.flake.flake import Flake
from clan_lib.machines.actions import list_machines
from clan_lib.machines.list import instantiate_inventory_to_machines
from clan_lib.machines.machines import Machine
from clan_lib.machines.suggestions import validate_machine_names
from clan_lib.machines.update import deploy_machine
@@ -16,128 +18,141 @@ from clan_cli.completions import (
complete_machines,
complete_tags,
)
from clan_cli.host_key_check import add_host_key_check_arg
log = logging.getLogger(__name__)
def requires_explicit_update(m: Machine) -> bool:
try:
if m.select("config.clan.deployment.requireExplicitUpdate"):
return False
except Exception:
pass
try:
# check if the machine has a target host set
m.target_host # noqa: B018
except ClanError:
return False
return True
def get_machines_for_update(
flake: Flake,
explicit_names: list[str],
filter_tags: list[str],
) -> list[Machine]:
all_machines = list_machines(flake)
machines_with_tags = list_machines(flake, {"filter": {"tags": filter_tags}})
if filter_tags and not machines_with_tags:
msg = f"No machines found with tags: {' AND '.join(filter_tags)}"
raise ClanError(msg)
# Implicit update all machines / with tags
# Using tags is not an explizit update
if not explicit_names:
machines_to_update = list(
filter(
requires_explicit_update,
instantiate_inventory_to_machines(flake, machines_with_tags).values(),
)
)
# all machines that are in the clan but not included in the update list
machine_names_to_update = [m.name for m in machines_to_update]
ignored_machines = {
m_name for m_name in all_machines if m_name not in machine_names_to_update
}
if not machines_to_update and ignored_machines:
print(
"WARNING: No machines to update.\n"
"The following defined machines were ignored because they\n"
"- Require explicit update (see 'requireExplicitUpdate')\n",
file=sys.stderr,
)
for m in ignored_machines:
print(m, file=sys.stderr)
return machines_to_update
# Else: Explicit update
machines_to_update = []
valid_names = validate_machine_names(explicit_names, flake)
for name in valid_names:
inventory_machine = machines_with_tags.get(name)
if not inventory_machine:
msg = "This is an internal bug"
raise ClanError(msg)
machines_to_update.append(
Machine.from_inventory(name, flake, inventory_machine)
)
return machines_to_update
def update_command(args: argparse.Namespace) -> None:
try:
if args.flake is None:
msg = "Could not find clan flake toplevel directory"
raise ClanError(msg)
all_machines: list[Machine] = []
if args.tags:
tag_filtered_machines = query_machines_by_tags(args.flake, args.tags)
if args.machines:
selected_machines = [
name for name in args.machines if name in tag_filtered_machines
]
else:
selected_machines = list(tag_filtered_machines.keys())
else:
selected_machines = (
args.machines
if args.machines
else list(list_full_machines(args.flake).keys())
)
machines_to_update = get_machines_for_update(
args.flake, args.machines, args.tags
)
if args.tags and not selected_machines:
msg = f"No machines found with tags: {', '.join(args.tags)}"
raise ClanError(msg)
if args.machines:
validate_machine_names(args.machines, args.flake)
for machine_name in selected_machines:
machine = Machine(name=machine_name, flake=args.flake)
all_machines.append(machine)
if args.target_host is not None and len(all_machines) > 1:
if args.target_host is not None and len(machines_to_update) > 1:
msg = "Target Host can only be set for one machines"
raise ClanError(msg)
def filter_machine(m: Machine) -> bool:
try:
if m.select("config.clan.deployment.requireExplicitUpdate"):
return False
except Exception:
pass
# Prepopulate the cache
config = nix_config()
system = config["system"]
machine_names = [machine.name for machine in machines_to_update]
args.flake.precache(
[
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.core.vars.generators.*.validationHash",
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.deployment.requireExplicitUpdate",
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.system.clan.deployment.nixosMobileWorkaround",
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.core.facts.secretModule",
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.core.facts.publicModule",
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.core.vars.settings.secretModule",
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.core.vars.settings.publicModule",
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.core.facts.services",
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.core.vars.generators.*.{{share,dependencies,migrateFact,prompts}}",
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.core.vars.generators.*.files.*.{{secret,deploy,owner,group,mode,neededFor}}",
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.core.facts.secretUploadDirectory",
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.vars.password-store.secretLocation",
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.core.vars.settings.passBackend",
]
)
try:
# check if the machine has a target host set
m.target_host # noqa: B018
except ClanError:
return False
return True
machines_to_update = all_machines
implicit_all: bool = len(args.machines) == 0 and not args.tags
if implicit_all:
machines_to_update = list(filter(filter_machine, all_machines))
# machines that are in the list but not included in the update list
ignored_machines = {m.name for m in all_machines if m not in machines_to_update}
if not machines_to_update and ignored_machines:
print(
"WARNING: No machines to update.\n"
"The following defined machines were ignored because they\n"
"- Require explicit update (see 'requireExplicitUpdate')\n",
"- Might not have the `clan.core.networking.targetHost` nixos option set:\n",
file=sys.stderr,
)
for m in ignored_machines:
print(m, file=sys.stderr)
if machines_to_update:
# Prepopulate the cache
config = nix_config()
system = config["system"]
machine_names = [machine.name for machine in machines_to_update]
args.flake.precache(
[
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.core.vars.generators.*.validationHash",
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.deployment.requireExplicitUpdate",
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.system.clan.deployment.nixosMobileWorkaround",
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.core.facts.secretModule",
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.core.facts.publicModule",
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.core.vars.settings.secretModule",
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.core.vars.settings.publicModule",
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.core.facts.services",
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.core.vars.generators.*.{{share,dependencies,migrateFact,prompts}}",
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.core.vars.generators.*.files.*.{{secret,deploy,owner,group,mode,neededFor}}",
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.core.facts.secretUploadDirectory",
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.core.vars.settings.passBackend",
]
)
host_key_check = args.host_key_check
with AsyncRuntime() as runtime:
for machine in machines_to_update:
if args.target_host:
target_host = Remote.from_ssh_uri(
machine_name=machine.name,
address=args.target_host,
).override(host_key_check=host_key_check)
else:
target_host = machine.target_host().override(
host_key_check=host_key_check
)
runtime.async_run(
AsyncOpts(
tid=machine.name,
async_ctx=AsyncContext(prefix=machine.name),
),
deploy_machine,
machine=machine,
target_host=target_host,
build_host=machine.build_host(),
host_key_check = args.host_key_check
with AsyncRuntime() as runtime:
for machine in machines_to_update:
if args.target_host:
target_host = Remote.from_ssh_uri(
machine_name=machine.name,
address=args.target_host,
).override(host_key_check=host_key_check)
else:
target_host = machine.target_host().override(
host_key_check=host_key_check
)
runtime.join_all()
runtime.check_all()
runtime.async_run(
AsyncOpts(
tid=machine.name,
async_ctx=AsyncContext(prefix=machine.name),
),
deploy_machine,
machine=machine,
target_host=target_host,
build_host=machine.build_host(),
)
runtime.join_all()
runtime.check_all()
except KeyboardInterrupt:
log.warning("Interrupted by user")
@@ -163,7 +178,12 @@ def register_update_parser(parser: argparse.ArgumentParser) -> None:
)
add_dynamic_completer(tag_parser, complete_tags)
add_host_key_check_arg(parser)
parser.add_argument(
"--host-key-check",
choices=["strict", "ask", "tofu", "none"],
default="ask",
help="Host key (.ssh/known_hosts) check mode.",
)
parser.add_argument(
"--target-host",
type=str,

View File

@@ -0,0 +1,162 @@
import pytest
from clan_lib.flake import Flake
from clan_cli.machines.update import get_machines_for_update
# Functions to test
from clan_cli.tests.fixtures_flakes import FlakeForTest
@pytest.mark.parametrize(
("test_flake_with_core", "explicit_names", "filter_tags", "expected_names"),
[
(
{
"inventory_expr": r"""{
machines.jon = { tags = [ "foo" "bar" ]; };
machines.sara = { tags = [ "foo" "baz" ]; };
}"""
},
["jon"], # explizit names
[], # filter tags
["jon"], # expected
)
],
# Important!
# tells pytest to pass these values to the fixture
# So we can write it to the flake fixtures
indirect=["test_flake_with_core"],
)
@pytest.mark.with_core
def test_get_machines_for_update_single_name(
test_flake_with_core: FlakeForTest,
explicit_names: list[str],
filter_tags: list[str],
expected_names: list[str],
) -> None:
selected_for_update = get_machines_for_update(
Flake(str(test_flake_with_core.path)),
explicit_names=explicit_names,
filter_tags=filter_tags,
)
names = [m.name for m in selected_for_update]
print(explicit_names, filter_tags)
assert names == expected_names
@pytest.mark.parametrize(
("test_flake_with_core", "explicit_names", "filter_tags", "expected_names"),
[
(
{
"inventory_expr": r"""{
machines.jon = { tags = [ "foo" "bar" ]; };
machines.sara = { tags = [ "foo" "baz" ]; };
}"""
},
[], # explizit names
["foo"], # filter tags
["jon", "sara"], # expected
)
],
# Important!
# tells pytest to pass these values to the fixture
# So we can write it to the flake fixtures
indirect=["test_flake_with_core"],
)
@pytest.mark.with_core
def test_get_machines_for_update_tags(
test_flake_with_core: FlakeForTest,
explicit_names: list[str],
filter_tags: list[str],
expected_names: list[str],
) -> None:
selected_for_update = get_machines_for_update(
Flake(str(test_flake_with_core.path)),
explicit_names=explicit_names,
filter_tags=filter_tags,
)
names = [m.name for m in selected_for_update]
print(explicit_names, filter_tags)
assert names == expected_names
@pytest.mark.parametrize(
("test_flake_with_core", "explicit_names", "filter_tags", "expected_names"),
[
(
{
"inventory_expr": r"""{
machines.jon = { tags = [ "foo" "bar" ]; };
machines.sara = { tags = [ "foo" "baz" ]; };
}"""
},
["sara"], # explizit names
["foo"], # filter tags
["sara"], # expected
)
],
# Important!
# tells pytest to pass these values to the fixture
# So we can write it to the flake fixtures
indirect=["test_flake_with_core"],
)
@pytest.mark.with_core
def test_get_machines_for_update_tags_and_name(
test_flake_with_core: FlakeForTest,
explicit_names: list[str],
filter_tags: list[str],
expected_names: list[str],
) -> None:
selected_for_update = get_machines_for_update(
Flake(str(test_flake_with_core.path)),
explicit_names=explicit_names,
filter_tags=filter_tags,
)
names = [m.name for m in selected_for_update]
print(explicit_names, filter_tags)
assert names == expected_names
@pytest.mark.parametrize(
("test_flake_with_core", "explicit_names", "filter_tags", "expected_names"),
[
(
{
"inventory_expr": r"""{
machines.jon = { tags = [ "foo" "bar" ]; };
machines.sara = { tags = [ "foo" "baz" ]; };
}"""
},
[], # no explizit names
[], # no filter tags
["jon", "sara", "vm1", "vm2"], # all machines
),
],
# Important!
# tells pytest to pass these values to the fixture
# So we can write it to the flake fixtures
indirect=["test_flake_with_core"],
)
@pytest.mark.with_core
def test_get_machines_for_update_implicit_all(
test_flake_with_core: FlakeForTest,
explicit_names: list[str],
filter_tags: list[str],
expected_names: list[str],
) -> None:
selected_for_update = get_machines_for_update(
Flake(str(test_flake_with_core.path)),
explicit_names=explicit_names,
filter_tags=filter_tags,
)
names = [m.name for m in selected_for_update]
print(explicit_names, filter_tags)
assert names == expected_names
# TODO: Add more tests for requireExplicitUpdate

View File

@@ -16,12 +16,12 @@ from clan_lib.nix_models.clan import InventoryMachine
log = logging.getLogger(__name__)
def convert_inventory_to_machines(
def instantiate_inventory_to_machines(
flake: Flake, machines: dict[str, InventoryMachine]
) -> dict[str, Machine]:
return {
name: Machine.from_inventory(name, flake, inventory_machine)
for name, inventory_machine in machines.items()
name: Machine.from_inventory(name, flake, _inventory_machine)
for name, _inventory_machine in machines.items()
}
@@ -31,26 +31,7 @@ def list_full_machines(flake: Flake) -> dict[str, Machine]:
"""
machines = list_machines(flake)
return convert_inventory_to_machines(flake, machines)
def query_machines_by_tags(
flake: Flake, tags: list[str]
) -> dict[str, InventoryMachine]:
"""
Query machines by their respective tags, if multiple tags are specified
then only machines that have those respective tags specified will be listed.
It is an intersection of the tags and machines.
"""
machines = list_machines(flake)
filtered_machines = {}
for machine_name, machine in machines.items():
machine_tags = machine.get("tags", [])
if all(tag in machine_tags for tag in tags):
filtered_machines[machine_name] = machine
return filtered_machines
return instantiate_inventory_to_machines(flake, machines)
@dataclass

View File

@@ -188,7 +188,7 @@ def get_host(
if host_str is None:
machine.warn(
f"'{field}' is not set in `inventory.machines.${name}.deploy.targetHost` - falling back to _slower_ nixos option: `clan.core.networking.targetHost`"
f"'{field}' is not set in `inventory.machines.${name}.deploy.targetHost` - falling back to _slower_ nixos option: `clan.core.networking.{field}`"
)
host_str = machine.select(f'config.clan.core.networking."{field}"')
source = "machine"

View File

@@ -48,9 +48,13 @@ def get_available_machines(flake: Flake) -> list[str]:
return list(machines.keys())
def validate_machine_names(machine_names: list[str], flake: Flake) -> None:
def validate_machine_names(machine_names: list[str], flake: Flake) -> list[str]:
"""
Returns a list of valid machine names
that are guaranteed to exist in the referenced clan
"""
if not machine_names:
return
return []
available_machines = get_available_machines(flake)
invalid_machines = [
@@ -70,3 +74,5 @@ def validate_machine_names(machine_names: list[str], flake: Flake) -> None:
error_lines.append(f"Machine '{machine_name}' not found. {suggestion_text}")
raise ClanError("\n".join(error_lines))
return machine_names