Merge remote-tracking branch 'origin/main' into rework-installation

This commit is contained in:
Jörg Thalheim
2024-08-21 13:33:27 +02:00
196 changed files with 10069 additions and 2432 deletions

View File

@@ -1,26 +1,24 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Clan Webui",
"type": "python",
"request": "launch",
"module": "clan_cli.webui",
"justMyCode": false,
"args": [ "--reload", "--no-open", "--log-level", "debug" ],
},
{
"name": "Clan Cli VMs",
"type": "python",
"request": "launch",
"module": "clan_cli",
"justMyCode": false,
"args": [ "vms" ],
}
]
}
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Clan Webui",
"type": "python",
"request": "launch",
"module": "clan_cli.webui",
"justMyCode": false,
"args": ["--reload", "--no-open", "--log-level", "debug"]
},
{
"name": "Clan Cli VMs",
"type": "python",
"request": "launch",
"module": "clan_cli",
"justMyCode": false,
"args": ["vms"]
}
]
}

View File

@@ -1,22 +1,22 @@
{
"python.testing.pytestArgs": [
// Coverage is not supported by vscode:
// https://github.com/Microsoft/vscode-python/issues/693
// Note that this will make pytest fail if pytest-cov is not installed,
// if that's the case, then this option needs to be removed (overrides
// can be set at a workspace level, it's up to you to decide what's the
// best approach). You might also prefer to only set this option
// per-workspace (wherever coverage is used).
"--no-cov",
"tests"
],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true,
"search.exclude": {
"**/.direnv": true
},
"python.linting.mypyPath": "mypy",
"python.linting.mypyEnabled": true,
"python.linting.enabled": true,
"python.defaultInterpreterPath": "python"
}
"python.testing.pytestArgs": [
// Coverage is not supported by vscode:
// https://github.com/Microsoft/vscode-python/issues/693
// Note that this will make pytest fail if pytest-cov is not installed,
// if that's the case, then this option needs to be removed (overrides
// can be set at a workspace level, it's up to you to decide what's the
// best approach). You might also prefer to only set this option
// per-workspace (wherever coverage is used).
"--no-cov",
"tests"
],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true,
"search.exclude": {
"**/.direnv": true
},
"python.linting.mypyPath": "mypy",
"python.linting.mypyEnabled": true,
"python.linting.enabled": true,
"python.defaultInterpreterPath": "python"
}

View File

@@ -5,20 +5,18 @@ from pathlib import Path
from types import ModuleType
# These imports are unused, but necessary for @API.register to run once.
from clan_cli.api import directory, mdns_discovery, modules
from clan_cli.api import directory, disk, mdns_discovery, modules
from clan_cli.arg_actions import AppendOptionAction
from clan_cli.clan import show, update
# API endpoints that are not used in the cli.
__all__ = ["directory", "mdns_discovery", "modules", "update"]
__all__ = ["directory", "mdns_discovery", "modules", "update", "disk"]
from . import (
backups,
clan,
config,
facts,
flash,
flatpak,
history,
machines,
secrets,
@@ -178,18 +176,6 @@ For more detailed information, visit: {help_hyperlink("getting-started", "https:
clan.register_parser(parser_flake)
parser_config = subparsers.add_parser(
"config",
help="read a nixos configuration option",
description="read a nixos configuration option",
epilog=(
"""
"""
),
formatter_class=argparse.RawTextHelpFormatter,
)
config.register_parser(parser_config)
parser_ssh = subparsers.add_parser(
"ssh",
help="ssh to a remote machine",
@@ -408,8 +394,6 @@ def main() -> None:
if getattr(args, "debug", False):
setup_logging(logging.DEBUG, root_log_name=__name__.split(".")[0])
log.debug("Debug log activated")
if flatpak.is_flatpak():
log.debug("Running inside a flatpak sandbox")
else:
setup_logging(logging.INFO, root_log_name=__name__.split(".")[0])

View File

@@ -0,0 +1,13 @@
#!/usr/bin/env python3
"""Developer helper: dump the clan API's JSON schema to stdout."""
import argparse
import json

from clan_cli.api import API

if __name__ == "__main__":
    cli = argparse.ArgumentParser(description="Debug the API.")
    cli.parse_args()
    # Pretty-print so the schema is easy to inspect and diff by eye.
    print(json.dumps(API.to_json_schema(), indent=4))

View File

@@ -12,24 +12,26 @@ from . import API
@dataclass
class FileFilter:
title: str | None
mime_types: list[str] | None
patterns: list[str] | None
suffixes: list[str] | None
title: str | None = field(default=None)
mime_types: list[str] | None = field(default=None)
patterns: list[str] | None = field(default=None)
suffixes: list[str] | None = field(default=None)
@dataclass
class FileRequest:
# Mode of the os dialog window
mode: Literal["open_file", "select_folder", "save"]
mode: Literal["open_file", "select_folder", "save", "open_multiple_files"]
# Title of the os dialog window
title: str | None = None
title: str | None = field(default=None)
# Pre-applied filters for the file dialog
filters: FileFilter | None = None
filters: FileFilter | None = field(default=None)
initial_file: str | None = field(default=None)
initial_folder: str | None = field(default=None)
@API.register_abstract
def open_file(file_request: FileRequest) -> str | None:
def open_file(file_request: FileRequest) -> list[str] | None:
"""
Abstract api method to open a file dialog window.
It must return the name of the selected file or None if no file was selected.
@@ -88,6 +90,8 @@ def get_directory(current_path: str) -> Directory:
@dataclass
class BlkInfo:
name: str
id_link: str
path: str
rm: str
size: str
ro: bool
@@ -103,21 +107,53 @@ class Blockdevices:
def blk_from_dict(data: dict) -> BlkInfo:
return BlkInfo(
name=data["name"],
path=data["path"],
rm=data["rm"],
size=data["size"],
ro=data["ro"],
mountpoints=data["mountpoints"],
type_=data["type"], # renamed here
type_=data["type"], # renamed
id_link=data["id-link"], # renamed
)
@dataclass
class BlockDeviceOptions:
hostname: str | None = None
keyfile: str | None = None
@API.register
def show_block_devices() -> Blockdevices:
def show_block_devices(options: BlockDeviceOptions) -> Blockdevices:
"""
Abstract api method to show block devices.
It must return a list of block devices.
"""
cmd = nix_shell(["nixpkgs#util-linux"], ["lsblk", "--json"])
keyfile = options.keyfile
remote = (
[
"ssh",
*(["-i", f"{keyfile}"] if keyfile else []),
# Disable strict host key checking
"-o StrictHostKeyChecking=no",
# Disable known hosts file
"-o UserKnownHostsFile=/dev/null",
f"{options.hostname}",
]
if options.hostname
else []
)
cmd = nix_shell(
["nixpkgs#util-linux", *(["nixpkgs#openssh"] if options.hostname else [])],
[
*remote,
"lsblk",
"--json",
"--output",
"PATH,NAME,RM,SIZE,RO,MOUNTPOINTS,TYPE,ID-LINK",
],
)
proc = run_no_stdout(cmd)
res = proc.stdout.strip()

View File

@@ -0,0 +1,65 @@
from clan_cli.api import API
from clan_cli.inventory import (
ServiceMeta,
ServiceSingleDisk,
ServiceSingleDiskRole,
ServiceSingleDiskRoleDefault,
SingleDiskConfig,
load_inventory_eval,
load_inventory_json,
save_inventory,
)
def get_instance_name(machine_name: str) -> str:
    """Derive the inventory instance name of a machine's single-disk service."""
    suffix = "single-disk"
    return f"{machine_name}-{suffix}"
@API.register
def set_single_disk_uuid(
    base_path: str,
    machine_name: str,
    disk_uuid: str,
) -> None:
    """
    Set the disk UUID of single disk machine

    Creates (or overwrites) the machine's single-disk service instance in the
    inventory at *base_path* and persists it with a descriptive commit message.
    """
    inventory = load_inventory_json(base_path)
    instance_name = get_instance_name(machine_name)
    # Build the full nested service config; the UUID ends up as the
    # `device` of the default role.
    single_disk_config: ServiceSingleDisk = ServiceSingleDisk(
        meta=ServiceMeta(name=instance_name),
        roles=ServiceSingleDiskRole(
            default=ServiceSingleDiskRoleDefault(
                config=SingleDiskConfig(device=disk_uuid)
            )
        ),
    )
    inventory.services.single_disk[instance_name] = single_disk_config
    save_inventory(
        inventory,
        base_path,
        f"Set disk UUID: '{disk_uuid}' on machine: '{machine_name}'",
    )
@API.register
def get_single_disk_uuid(
    base_path: str,
    machine_name: str,
) -> str | None:
    """
    Get the disk UUID of single disk machine
    """
    # Uses the evaluated inventory (load_inventory_eval), not the raw JSON file.
    inventory = load_inventory_eval(base_path)
    instance_name = get_instance_name(machine_name)
    # NOTE(review): presumably raises KeyError when no single-disk instance
    # exists for this machine -- confirm callers set one first.
    single_disk_config: ServiceSingleDisk = inventory.services.single_disk[
        instance_name
    ]
    return single_disk_config.roles.default.config.device

View File

@@ -3,13 +3,16 @@ import re
import tomllib
from dataclasses import dataclass
from pathlib import Path
from typing import Any, get_args, get_type_hints
from clan_cli.cmd import run_no_stdout
from clan_cli.errors import ClanCmdError, ClanError
from clan_cli.inventory import Inventory, load_inventory_json
from clan_cli.inventory import Inventory, load_inventory_json, save_inventory
from clan_cli.inventory.classes import Service
from clan_cli.nix import nix_eval
from . import API
from .serde import from_dict
@dataclass
@@ -153,3 +156,35 @@ def get_module_info(
@API.register
def get_inventory(base_path: str) -> Inventory:
    """Return the inventory of the clan at *base_path*, as read from its JSON file."""
    return load_inventory_json(base_path)
@API.register
def set_service_instance(
    base_path: str, module_name: str, instance_name: str, config: dict[str, Any]
) -> None:
    """
    A function that allows to set any service instance in the inventory.
    Takes any untyped dict. The dict is then checked and converted into the correct type using the type hints of the service.
    If any conversion error occurs, the function will raise an error.
    """
    service_hints = get_type_hints(Service)
    if module_name not in service_hints:
        raise ValueError(
            f"{module_name} is not a valid Service attribute. Expected one of {', '.join(service_hints.keys())}."
        )

    inventory = load_inventory_json(base_path)
    # The Service attribute is a dict[str, <ConfigType>]; index 1 of the
    # type args is the instance config type we must convert into.
    target_type = get_args(service_hints[module_name])[1]

    instances: dict[str, Any] = getattr(inventory.services, module_name, {})
    instances[instance_name] = from_dict(target_type, config)
    setattr(inventory.services, module_name, instances)

    save_inventory(
        inventory, base_path, f"Update {module_name} instance {instance_name}"
    )

    # TODO: Add a check that rolls back the inventory if the service config is not valid or causes conflicts.

View File

@@ -29,17 +29,21 @@ Dependencies:
Note: This module assumes the presence of other modules and classes such as `ClanError` and `ErrorDetails` from the `clan_cli.errors` module.
"""
import dataclasses
import json
from dataclasses import dataclass, fields, is_dataclass
from pathlib import Path
from types import UnionType
from typing import (
Annotated,
Any,
Literal,
TypeVar,
Union,
get_args,
get_origin,
)
from pydantic import TypeAdapter, ValidationError
from pydantic_core import ErrorDetails
from clan_cli.errors import ClanError
@@ -64,7 +68,8 @@ def dataclass_to_dict(obj: Any, *, use_alias: bool = True) -> Any:
field.metadata.get("alias", field.name) if use_alias else field.name
): _to_dict(getattr(obj, field.name))
for field in fields(obj)
if not field.name.startswith("_") # type: ignore
if not field.name.startswith("_")
and getattr(obj, field.name) is not None # type: ignore
}
elif isinstance(obj, list | tuple):
return [_to_dict(item) for item in obj]
@@ -81,26 +86,169 @@ def dataclass_to_dict(obj: Any, *, use_alias: bool = True) -> Any:
T = TypeVar("T", bound=dataclass) # type: ignore
G = TypeVar("G") # type: ignore
def from_dict(t: type[T], data: Any) -> T:
def is_union_type(type_hint: type | UnionType) -> bool:
return (
type(type_hint) is UnionType
or isinstance(type_hint, UnionType)
or get_origin(type_hint) is Union
)
def is_type_in_union(union_type: type | UnionType, target_type: type) -> bool:
if get_origin(union_type) is UnionType:
return any(issubclass(arg, target_type) for arg in get_args(union_type))
return union_type == target_type
def unwrap_none_type(type_hint: type | UnionType) -> type:
"""
Takes a type union and returns the first non-None type.
None | str
=>
str
"""
if is_union_type(type_hint):
# Return the first non-None type
return next(t for t in get_args(type_hint) if t is not type(None))
return type_hint # type: ignore
JsonValue = str | float | dict[str, Any] | list[Any] | None


def construct_value(
    t: type, field_value: JsonValue, loc: list[str] | None = None
) -> Any:
    """
    Construct a field value from a type hint and a field value.

    :param t: Expected type of ``field_value`` (may be a union, a dataclass,
        a parameterized container, ``Literal`` or ``Annotated``).
    :param field_value: The raw JSON-like value to convert.
    :param loc: Location breadcrumb used in error messages.  Defaults to the
        top level (``None`` instead of a mutable ``[]`` default so the list is
        never shared between calls).
    :raises ClanError: If the value does not match the expected type.
    """
    if loc is None:
        loc = []
    if t is None and field_value:
        raise ClanError(f"Expected None but got: {field_value}", location=f"{loc}")

    # If the field is another dataclass
    # Field_value must be a dictionary
    if is_dataclass(t) and isinstance(field_value, dict):
        # Pass the breadcrumb along so nested errors point at the right field.
        return construct_dataclass(t, field_value, loc)

    # If the field expects a path
    # Field_value must be a string
    elif is_type_in_union(t, Path):
        if not isinstance(field_value, str):
            raise ClanError(
                f"Expected string, cannot construct pathlib.Path() from: {field_value} ",
                location=f"{loc}",
            )
        return Path(field_value)

    # Trivial values
    elif t is str:
        if not isinstance(field_value, str):
            raise ClanError(f"Expected string, got {field_value}", location=f"{loc}")
        return field_value
    elif t is int and not isinstance(field_value, str):
        return int(field_value)  # type: ignore
    elif t is float and not isinstance(field_value, str):
        return float(field_value)  # type: ignore
    elif t is bool and isinstance(field_value, bool):
        return field_value  # type: ignore

    # Union types construct the first non-None type
    elif is_union_type(t):
        # Unwrap the union type
        t = unwrap_none_type(t)
        # Construct the field value (previously dropped `loc` here, losing
        # the error location for union-typed fields).
        return construct_value(t, field_value, loc)

    # Nested types: list and dict recurse element-wise with the element type.
    elif get_origin(t) is list:
        if not isinstance(field_value, list):
            raise ClanError(f"Expected list, got {field_value}", location=f"{loc}")
        return [construct_value(get_args(t)[0], item, loc) for item in field_value]
    elif get_origin(t) is dict and isinstance(field_value, dict):
        return {
            key: construct_value(get_args(t)[1], value, loc)
            for key, value in field_value.items()
        }
    elif get_origin(t) is Literal:
        valid_values = get_args(t)
        if field_value not in valid_values:
            raise ClanError(
                f"Expected one of {valid_values}, got {field_value}", location=f"{loc}"
            )
        return field_value
    elif get_origin(t) is Annotated:
        # get_args(Annotated[T, ...]) is (T, *metadata) and always has at
        # least two entries, so the previous `(base_type,) = get_args(t)`
        # single-element unpack raised ValueError on every Annotated type.
        base_type = get_args(t)[0]
        return construct_value(base_type, field_value, loc)
    # Unhandled
    else:
        raise ClanError(f"Unhandled field type {t} with value {field_value}")
def construct_dataclass(t: type[T], data: dict[str, Any], path: list[str] = []) -> T:
"""
type t MUST be a dataclass
Dynamically instantiate a data class from a dictionary, handling nested data classes.
We use dataclasses. But the deserialization logic of pydantic takes a lot of complexity.
"""
adapter = TypeAdapter(t)
try:
return adapter.validate_python(
data,
)
except ValidationError as e:
fst_error: ErrorDetails = e.errors()[0]
if not fst_error:
raise ClanError(msg=str(e))
if not is_dataclass(t):
raise ClanError(f"{t.__name__} is not a dataclass")
msg = fst_error.get("msg")
loc = fst_error.get("loc")
field_path = "Unknown"
if loc:
field_path = str(loc)
raise ClanError(msg=msg, location=f"{t!s}: {field_path}", description=str(e))
# Attempt to create an instance of the data_class#
field_values: dict[str, Any] = {}
required: list[str] = []
for field in fields(t):
if field.name.startswith("_"):
continue
# The first type in a Union
# str <- None | str | Path
field_type: type[Any] = unwrap_none_type(field.type) # type: ignore
data_field_name = field.metadata.get("alias", field.name)
if (
field.default is dataclasses.MISSING
and field.default_factory is dataclasses.MISSING
):
required.append(field.name)
# Populate the field_values dictionary with the field value
# if present in the data
if data_field_name in data:
field_value = data.get(data_field_name)
if field_value is None and (
field.type is None or is_type_in_union(field.type, type(None))
):
field_values[field.name] = None
else:
field_values[field.name] = construct_value(field_type, field_value)
# Check that all required field are present.
for field_name in required:
if field_name not in field_values:
formatted_path = " ".join(path)
raise ClanError(
f"Default value missing for: '{field_name}' in {t} {formatted_path}, got Value: {data}"
)
return t(**field_values) # type: ignore
def from_dict(
    t: type[G], data: dict[str, Any] | Any, path: list[str] | None = None
) -> G:
    """
    Deserialize ``data`` into an instance of ``t``.

    Dataclasses are built field by field via ``construct_dataclass``; every
    other supported type is delegated to ``construct_value``.

    :param path: Location breadcrumb for error messages (``None`` instead of a
        mutable ``[]`` default so the list is never shared between calls).
    :raises ClanError: If ``data`` cannot be converted to ``t``.
    """
    if path is None:
        path = []
    if not is_dataclass(t):
        return construct_value(t, data, path)
    if not isinstance(data, dict):
        raise ClanError(f"{data} is not a dict. Expected {t}")
    return construct_dataclass(t, data, path)  # type: ignore

View File

@@ -6,7 +6,7 @@ from ..clan_uri import FlakeId
from ..cmd import run
from ..dirs import machine_gcroot
from ..errors import ClanError
from ..machines.list import list_machines
from ..machines.list import list_nixos_machines
from ..machines.machines import Machine
from ..nix import nix_add_to_gcroots, nix_build, nix_config, nix_eval, nix_metadata
from ..vms.inspect import VmConfig, inspect_vm
@@ -40,7 +40,7 @@ def inspect_flake(flake_url: str | Path, machine_name: str) -> FlakeConfig:
system = config["system"]
# Check if the machine exists
machines = list_machines(flake_url, False)
machines: list[str] = list_nixos_machines(flake_url)
if machine_name not in machines:
raise ClanError(
f"Machine {machine_name} not found in {flake_url}. Available machines: {', '.join(machines)}"
@@ -57,7 +57,7 @@ def inspect_flake(flake_url: str | Path, machine_name: str) -> FlakeConfig:
# Get the Clan name
cmd = nix_eval(
[
f'{flake_url}#clanInternals.machines."{system}"."{machine_name}".config.clan.core.clanName'
f'{flake_url}#clanInternals.machines."{system}"."{machine_name}".config.clan.core.name'
]
)
res = run_cmd(cmd)
@@ -66,7 +66,7 @@ def inspect_flake(flake_url: str | Path, machine_name: str) -> FlakeConfig:
# Get the clan icon path
cmd = nix_eval(
[
f'{flake_url}#clanInternals.machines."{system}"."{machine_name}".config.clan.core.clanIcon'
f'{flake_url}#clanInternals.machines."{system}"."{machine_name}".config.clan.core.icon'
]
)
res = run_cmd(cmd)
@@ -79,9 +79,9 @@ def inspect_flake(flake_url: str | Path, machine_name: str) -> FlakeConfig:
cmd = nix_build(
[
f'{flake_url}#clanInternals.machines."{system}"."{machine_name}".config.clan.core.clanIcon'
f'{flake_url}#clanInternals.machines."{system}"."{machine_name}".config.clan.core.icon'
],
machine_gcroot(flake_url=str(flake_url)) / "clanIcon",
machine_gcroot(flake_url=str(flake_url)) / "icon",
)
run_cmd(cmd)

View File

@@ -4,12 +4,10 @@ import json
import logging
import os
import re
import sys
from pathlib import Path
from typing import Any, get_origin
from clan_cli.cmd import run
from clan_cli.completions import add_dynamic_completer, complete_machines
from clan_cli.dirs import machine_settings_file
from clan_cli.errors import ClanError
from clan_cli.git import commit_file
@@ -305,65 +303,3 @@ def set_option(
repo_dir=flake_dir,
commit_message=f"Set option {option_description}",
)
# takes a (sub)parser and configures it
def register_parser(
parser: argparse.ArgumentParser | None,
) -> None:
if parser is None:
parser = argparse.ArgumentParser(
description="Set or show NixOS options",
)
# inject callback function to process the input later
parser.set_defaults(func=get_option)
set_machine_action = parser.add_argument(
"--machine",
"-m",
help="Machine to configure",
type=str,
default="default",
)
add_dynamic_completer(set_machine_action, complete_machines)
parser.add_argument(
"--show-trace",
help="Show nix trace on evaluation error",
action="store_true",
)
parser.add_argument(
"--options-file",
help="JSON file with options",
type=Path,
)
parser.add_argument(
"--settings-file",
help="JSON file with settings",
type=Path,
)
parser.add_argument(
"--quiet",
help="Do not print the value",
action="store_true",
)
parser.add_argument(
"option",
help="Option to read or set (e.g. foo.bar)",
type=str,
)
def main(argv: list[str] | None = None) -> None:
if argv is None:
argv = sys.argv
parser = argparse.ArgumentParser()
register_parser(parser)
parser.parse_args(argv[1:])
if __name__ == "__main__":
main()

View File

@@ -18,7 +18,7 @@ def verify_machine_config(
) -> str | None:
"""
Verify that the machine evaluates successfully
Returns a tuple of (success, error_message)
Returns None, in case of success, or a String containing the error_message
"""
if config is None:
config = config_for_machine(flake_dir, machine_name)

View File

@@ -11,6 +11,8 @@ from clan_cli.errors import ClanError, ClanHttpError
from clan_cli.nix import nix_eval
# TODO: When moving the api to `clan-app`, the whole config module should be
# ported to the `clan-app`, because it is not used by the cli at all.
@API.register
def machine_schema(
flake_dir: Path,
@@ -86,9 +88,9 @@ def machine_schema(
[
clan-core.nixosModules.clanCore
# potentially the config might affect submodule options,
# therefore we need to import it
# therefore we need to import it
config
{{ clan.core.clanName = "fakeClan"; }}
{{ clan.core.name = "fakeClan"; }}
]
# add all clan modules specified via clanImports
++ (map (name: clan-core.clanModules.${{name}}) config.clanImports or []);

View File

@@ -105,14 +105,8 @@ def generate_service_facts(
)
files_to_commit = []
# store secrets
for secret in machine.facts_data[service]["secret"]:
if isinstance(secret, str):
# TODO: This is the old NixOS module, can be dropped everyone has updated.
secret_name = secret
groups = []
else:
secret_name = secret["name"]
groups = secret.get("groups", [])
for secret_name, secret in machine.facts_data[service]["secret"].items():
groups = secret.get("groups", [])
secret_file = secrets_dir / secret_name
if not secret_file.is_file():

View File

@@ -31,9 +31,11 @@ def upload_secrets(machine: Machine) -> None:
"rsync",
"-e",
" ".join(["ssh"] + ssh_cmd[2:]),
"-az",
"--recursive",
"--links",
"--times",
"--compress",
"--delete",
"--chown=root:root",
"--chmod=D700,F600",
f"{tempdir!s}/",
f"{host.user}@{host.host}:{machine.secrets_upload_directory}/",

View File

@@ -6,7 +6,7 @@ import os
import shutil
import textwrap
from collections.abc import Sequence
from dataclasses import dataclass
from dataclasses import dataclass, field
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any
@@ -19,23 +19,118 @@ from .completions import add_dynamic_completer, complete_machines
from .errors import ClanError
from .facts.secret_modules import SecretStoreBase
from .machines.machines import Machine
from .nix import nix_shell
from .nix import nix_build, nix_shell
log = logging.getLogger(__name__)
@dataclass
class WifiConfig:
ssid: str
password: str
@dataclass
class SystemConfig:
language: str | None = field(default=None)
keymap: str | None = field(default=None)
ssh_keys_path: list[str] | None = field(default=None)
wifi_settings: list[WifiConfig] | None = field(default=None)
@API.register
def list_possible_keymaps() -> list[str]:
    """Return the names (without the '.map.gz' suffix) of all console keymaps
    shipped by the nixpkgs ``kbd`` package.

    Raises FileNotFoundError if the built package has no keymaps directory.
    """
    build_cmd = nix_build(["nixpkgs#kbd"])
    build_result = run(build_cmd, log=Log.STDERR, error_msg="Failed to find kbdinfo")
    keymaps_dir = Path(build_result.stdout.strip()) / "share" / "keymaps"

    if not keymaps_dir.exists():
        raise FileNotFoundError(f"Keymaps directory '{keymaps_dir}' does not exist.")

    suffix = ".map.gz"
    # os.walk also covers keymaps in nested subdirectories.
    return [
        filename[: -len(suffix)]  # strip the '.map.gz' ending
        for _dirpath, _subdirs, filenames in os.walk(keymaps_dir)
        for filename in filenames
        if filename.endswith(suffix)
    ]
@API.register
def list_possible_languages() -> list[str]:
    """Return the locale names from glibc's SUPPORTED file (nixpkgs
    ``glibcLocales`` package), e.g. usable as a system default locale.

    Raises FileNotFoundError if the built package has no SUPPORTED file.
    """
    build_cmd = nix_build(["nixpkgs#glibcLocales"])
    build_result = run(
        build_cmd, log=Log.STDERR, error_msg="Failed to find glibc locales"
    )
    locale_file = Path(build_result.stdout.strip()) / "share" / "i18n" / "SUPPORTED"

    if not locale_file.exists():
        raise FileNotFoundError(f"Locale file '{locale_file}' does not exist.")

    with locale_file.open() as handle:
        raw_lines = handle.readlines()

    # Skip comments and the SUPPORTED-LOCALES header; each remaining entry
    # looks like "<locale>/<charset>", so keep the part before '/'.
    return [
        raw.split("/")[0].strip()
        for raw in raw_lines
        if not raw.startswith("#") and "SUPPORTED-LOCALES" not in raw
    ]
@API.register
def flash_machine(
machine: Machine,
*,
mode: str,
disks: dict[str, str],
system_config: dict[str, Any],
system_config: SystemConfig,
dry_run: bool,
write_efi_boot_entries: bool,
debug: bool,
extra_args: list[str] = [],
) -> None:
system_config_nix: dict[str, Any] = {}
if system_config.wifi_settings:
wifi_settings = {}
for wifi in system_config.wifi_settings:
wifi_settings[wifi.ssid] = {"password": wifi.password}
system_config_nix["clan"] = {"iwd": {"networks": wifi_settings}}
if system_config.language:
if system_config.language not in list_possible_languages():
raise ClanError(
f"Language '{system_config.language}' is not a valid language. "
f"Run 'clan flash --list-languages' to see a list of possible languages."
)
system_config_nix["i18n"] = {"defaultLocale": system_config.language}
if system_config.keymap:
if system_config.keymap not in list_possible_keymaps():
raise ClanError(
f"Keymap '{system_config.keymap}' is not a valid keymap. "
f"Run 'clan flash --list-keymaps' to see a list of possible keymaps."
)
system_config_nix["console"] = {"keyMap": system_config.keymap}
if system_config.ssh_keys_path:
root_keys = []
for key_path in map(lambda x: Path(x), system_config.ssh_keys_path):
try:
root_keys.append(key_path.read_text())
except OSError as e:
raise ClanError(f"Cannot read SSH public key file: {key_path}: {e}")
system_config_nix["users"] = {
"users": {"root": {"openssh": {"authorizedKeys": {"keys": root_keys}}}}
}
secret_facts_module = importlib.import_module(machine.secret_facts_module)
secret_facts_store: SecretStoreBase = secret_facts_module.SecretStore(
machine=machine
@@ -58,7 +153,8 @@ def flash_machine(
raise ClanError(
"sudo is required to run disko-install as a non-root user"
)
disko_install.append("sudo")
wrapper = 'set -x; disko_install=$(command -v disko-install); exec sudo "$disko_install" "$@"'
disko_install.extend(["bash", "-c", wrapper])
disko_install.append("disko-install")
if write_efi_boot_entries:
@@ -76,7 +172,7 @@ def flash_machine(
disko_install.extend(
[
"--system-config",
json.dumps(system_config),
json.dumps(system_config_nix),
]
)
disko_install.extend(["--option", "dry-run", "true"])
@@ -94,15 +190,13 @@ class FlashOptions:
flake: FlakeId
machine: str
disks: dict[str, str]
ssh_keys_path: list[Path]
dry_run: bool
confirm: bool
debug: bool
mode: str
language: str
keymap: str
write_efi_boot_entries: bool
nix_options: list[str]
system_config: SystemConfig
class AppendDiskAction(argparse.Action):
@@ -126,17 +220,36 @@ def flash_command(args: argparse.Namespace) -> None:
flake=args.flake,
machine=args.machine,
disks=args.disk,
ssh_keys_path=args.ssh_pubkey,
dry_run=args.dry_run,
confirm=not args.yes,
debug=args.debug,
mode=args.mode,
language=args.language,
keymap=args.keymap,
system_config=SystemConfig(
language=args.language,
keymap=args.keymap,
ssh_keys_path=args.ssh_pubkey,
wifi_settings=None,
),
write_efi_boot_entries=args.write_efi_boot_entries,
nix_options=args.option,
)
if args.list_languages:
for language in list_possible_languages():
print(language)
return
if args.list_keymaps:
for keymap in list_possible_keymaps():
print(keymap)
return
if args.wifi:
opts.system_config.wifi_settings = [
WifiConfig(ssid=ssid, password=password)
for ssid, password in args.wifi.items()
]
machine = Machine(opts.machine, flake=opts.flake)
if opts.confirm and not opts.dry_run:
disk_str = ", ".join(f"{name}={device}" for name, device in opts.disks.items())
@@ -148,28 +261,11 @@ def flash_command(args: argparse.Namespace) -> None:
if ask != "y":
return
extra_config: dict[str, Any] = {}
if opts.ssh_keys_path:
root_keys = []
for key_path in opts.ssh_keys_path:
try:
root_keys.append(key_path.read_text())
except OSError as e:
raise ClanError(f"Cannot read SSH public key file: {key_path}: {e}")
extra_config["users"] = {
"users": {"root": {"openssh": {"authorizedKeys": {"keys": root_keys}}}}
}
if opts.keymap:
extra_config["console"] = {"keyMap": opts.keymap}
if opts.language:
extra_config["i18n"] = {"defaultLocale": opts.language}
flash_machine(
machine,
mode=opts.mode,
disks=opts.disks,
system_config=extra_config,
system_config=opts.system_config,
dry_run=opts.dry_run,
debug=opts.debug,
write_efi_boot_entries=opts.write_efi_boot_entries,
@@ -202,6 +298,15 @@ def register_parser(parser: argparse.ArgumentParser) -> None:
Mount is useful for updating an existing system without losing data.
"""
)
parser.add_argument(
"--wifi",
type=str,
nargs=2,
metavar=("ssid", "password"),
action=AppendDiskAction,
help="wifi network to connect to",
default={},
)
parser.add_argument(
"--mode",
type=str,
@@ -221,6 +326,18 @@ def register_parser(parser: argparse.ArgumentParser) -> None:
type=str,
help="system language",
)
parser.add_argument(
"--list-languages",
help="List possible languages",
default=False,
action="store_true",
)
parser.add_argument(
"--list-keymaps",
help="List possible keymaps",
default=False,
action="store_true",
)
parser.add_argument(
"--keymap",
type=str,

View File

@@ -1,16 +0,0 @@
import os
def is_flatpak() -> bool:
    """Check if the current process is running inside a flatpak sandbox.

    Both signals must agree: the FLATPAK_ID environment variable is set
    AND the sandbox marker file /.flatpak-info exists.
    """
    has_env_marker = "FLATPAK_ID" in os.environ

    try:
        with open("/.flatpak-info"):
            has_info_file = True
    except FileNotFoundError:
        has_info_file = False

    return has_env_marker and has_info_file

View File

@@ -7,7 +7,7 @@ import logging
from typing import Any
from clan_cli.clan.inspect import FlakeConfig, inspect_flake
from clan_cli.machines.list import list_machines
from clan_cli.machines.list import list_nixos_machines
from ..clan_uri import ClanURI
from ..dirs import user_history_file
@@ -72,7 +72,7 @@ def new_history_entry(url: str, machine: str) -> HistoryEntry:
def add_all_to_history(uri: ClanURI) -> list[HistoryEntry]:
history = list_history()
new_entries: list[HistoryEntry] = []
for machine in list_machines(uri.get_url()):
for machine in list_nixos_machines(uri.get_url()):
new_entry = _add_maschine_to_history_list(uri.get_url(), machine, history)
new_entries.append(new_entry)
write_history_file(history)

View File

@@ -32,6 +32,10 @@ from .classes import (
ServiceBorgbackupRoleClient,
ServiceBorgbackupRoleServer,
ServiceMeta,
ServiceSingleDisk,
ServiceSingleDiskRole,
ServiceSingleDiskRoleDefault,
SingleDiskConfig,
)
# Re export classes here
@@ -49,6 +53,11 @@ __all__ = [
"ServiceBorgbackupRole",
"ServiceBorgbackupRoleClient",
"ServiceBorgbackupRoleServer",
# Single Disk service
"ServiceSingleDisk",
"ServiceSingleDiskRole",
"ServiceSingleDiskRoleDefault",
"SingleDiskConfig",
]
@@ -82,6 +91,7 @@ def load_inventory_eval(flake_dir: str | Path) -> Inventory:
"--json",
]
)
proc = run_no_stdout(cmd)
try:

View File

@@ -6,7 +6,6 @@ from .delete import register_delete_parser
from .hardware import register_hw_generate
from .install import register_install_parser
from .list import register_list_parser
from .show import register_show_parser
from .update import register_update_parser
@@ -86,17 +85,6 @@ For more detailed information, visit: https://docs.clan.lol/getting-started/conf
)
register_hw_generate(generate_hw_parser)
show_parser = subparser.add_parser(
"show",
help="Show a machine",
epilog=(
"""
This subcommand shows the details of a machine managed by this clan like icon, description, etc
"""
),
)
register_show_parser(show_parser)
install_parser = subparser.add_parser(
"install",
help="Install a machine",

View File

@@ -31,6 +31,8 @@ def create_machine(flake: FlakeId, machine: Machine) -> None:
if machine.name in full_inventory.machines.keys():
raise ClanError(f"Machine with the name {machine.name} already exists")
print(f"Define machine {machine.name}", machine)
inventory.machines.update({machine.name: machine})
save_inventory(inventory, flake.path, f"Create machine {machine.name}")

View File

@@ -94,6 +94,7 @@ def generate_machine_hardware_info(
machine_name: str,
hostname: str | None = None,
password: str | None = None,
keyfile: str | None = None,
force: bool | None = False,
) -> HardwareInfo:
"""
@@ -117,15 +118,15 @@ def generate_machine_hardware_info(
[
*(["sshpass", "-p", f"{password}"] if password else []),
"ssh",
# Disable strict host key checking
"-o",
"StrictHostKeyChecking=no",
*(["-i", f"{keyfile}"] if keyfile else []),
# Disable known hosts file
"-o",
"UserKnownHostsFile=/dev/null",
"-p",
str(machine.target_host.port),
target_host,
"-o UserKnownHostsFile=/dev/null",
f"{hostname}",
"nixos-generate-config",
# Filesystems are managed by disko
"--no-filesystems",

View File

@@ -3,10 +3,12 @@ import importlib
import json
import logging
import os
from dataclasses import dataclass
from dataclasses import dataclass, field
from pathlib import Path
from tempfile import TemporaryDirectory
from clan_cli.api import API
from ..clan_uri import FlakeId
from ..cmd import Log, run
from ..completions import add_dynamic_completer, complete_machines
@@ -91,15 +93,30 @@ def install_nixos(
@dataclass
class InstallOptions:
# flake to install
flake: FlakeId
machine: str
target_host: str
kexec: str | None
confirm: bool
debug: bool
no_reboot: bool
json_ssh_deploy: dict[str, str] | None
nix_options: list[str]
kexec: str | None = None
debug: bool = False
no_reboot: bool = False
json_ssh_deploy: dict[str, str] | None = None
nix_options: list[str] = field(default_factory=list)
@API.register
def install_machine(opts: InstallOptions, password: str | None) -> None:
machine = Machine(opts.machine, flake=opts.flake)
machine.target_host_address = opts.target_host
install_nixos(
machine,
kexec=opts.kexec,
debug=opts.debug,
password=password,
no_reboot=opts.no_reboot,
extra_args=opts.nix_options,
)
def install_command(args: argparse.Namespace) -> None:
@@ -123,32 +140,23 @@ def install_command(args: argparse.Namespace) -> None:
target_host = args.target_host
password = None
opts = InstallOptions(
flake=args.flake,
machine=args.machine,
target_host=target_host,
kexec=args.kexec,
confirm=not args.yes,
debug=args.debug,
no_reboot=args.no_reboot,
json_ssh_deploy=json_ssh_deploy,
nix_options=args.option,
)
machine = Machine(opts.machine, flake=opts.flake)
machine.target_host_address = opts.target_host
if opts.confirm:
ask = input(f"Install {machine.name} to {opts.target_host}? [y/N] ")
if not args.yes:
ask = input(f"Install {args.machine} to {target_host}? [y/N] ")
if ask != "y":
return
install_nixos(
machine,
kexec=opts.kexec,
debug=opts.debug,
password=password,
no_reboot=opts.no_reboot,
extra_args=opts.nix_options,
return install_machine(
InstallOptions(
flake=args.flake,
machine=args.machine,
target_host=target_host,
kexec=args.kexec,
debug=args.debug,
no_reboot=args.no_reboot,
json_ssh_deploy=json_ssh_deploy,
nix_options=args.option,
),
password,
)

View File

@@ -1,22 +1,121 @@
import argparse
import json
import logging
from dataclasses import dataclass
from pathlib import Path
from typing import Literal
from clan_cli.api import API
from clan_cli.cmd import run_no_stdout
from clan_cli.errors import ClanCmdError, ClanError
from clan_cli.inventory import Machine, load_inventory_eval
from clan_cli.nix import nix_eval, nix_shell
log = logging.getLogger(__name__)
@API.register
def list_machines(flake_url: str | Path, debug: bool = False) -> dict[str, Machine]:
def list_inventory_machines(flake_url: str | Path) -> dict[str, Machine]:
inventory = load_inventory_eval(flake_url)
return inventory.machines
@dataclass
class MachineDetails:
machine: Machine
has_hw_specs: bool = False
# TODO:
# has_disk_specs: bool = False
@API.register
def get_inventory_machine_details(
flake_url: str | Path, machine_name: str
) -> MachineDetails:
inventory = load_inventory_eval(flake_url)
machine = inventory.machines.get(machine_name)
if machine is None:
raise ClanError(f"Machine {machine_name} not found in inventory")
hw_config_path = (
Path(flake_url) / "machines" / Path(machine_name) / "hardware-configuration.nix"
)
return MachineDetails(
machine=machine,
has_hw_specs=hw_config_path.exists(),
)
@API.register
def list_nixos_machines(flake_url: str | Path) -> list[str]:
cmd = nix_eval(
[
f"{flake_url}#nixosConfigurations",
"--apply",
"builtins.attrNames",
"--json",
]
)
proc = run_no_stdout(cmd)
try:
res = proc.stdout.strip()
data = json.loads(res)
return data
except json.JSONDecodeError as e:
raise ClanError(f"Error decoding machines from flake: {e}")
@dataclass
class ConnectionOptions:
keyfile: str | None = None
timeout: int = 2
@API.register
def check_machine_online(
flake_url: str | Path, machine_name: str, opts: ConnectionOptions | None
) -> Literal["Online", "Offline"]:
machine = load_inventory_eval(flake_url).machines.get(machine_name)
if not machine:
raise ClanError(f"Machine {machine_name} not found in inventory")
hostname = machine.deploy.targetHost
if not hostname:
raise ClanError(f"Machine {machine_name} does not specify a targetHost")
timeout = opts.timeout if opts and opts.timeout else 2
cmd = nix_shell(
["nixpkgs#util-linux", *(["nixpkgs#openssh"] if hostname else [])],
[
"ssh",
*(["-i", f"{opts.keyfile}"] if opts and opts.keyfile else []),
# Disable strict host key checking
"-o StrictHostKeyChecking=no",
# Disable known hosts file
"-o UserKnownHostsFile=/dev/null",
f"-o ConnectTimeout={timeout}",
f"{hostname}",
"true",
"&> /dev/null",
],
)
try:
proc = run_no_stdout(cmd)
if proc.returncode != 0:
return "Offline"
return "Online"
except ClanCmdError:
return "Offline"
def list_command(args: argparse.Namespace) -> None:
flake_path = args.flake.path
for name in list_machines(flake_path, args.debug).keys():
for name in list_nixos_machines(flake_path):
print(name)

View File

@@ -1,59 +0,0 @@
import argparse
import dataclasses
import json
import logging
from pathlib import Path
from clan_cli.api import API
from ..cmd import run_no_stdout
from ..completions import add_dynamic_completer, complete_machines
from ..nix import nix_config, nix_eval
from .types import machine_name_type
log = logging.getLogger(__name__)
@dataclasses.dataclass
class MachineInfo:
machine_name: str
machine_description: str | None
machine_icon: str | None
@API.register
def show_machine(flake_url: str | Path, machine_name: str) -> MachineInfo:
config = nix_config()
system = config["system"]
cmd = nix_eval(
[
f"{flake_url}#clanInternals.machines.{system}.{machine_name}",
"--apply",
"machine: { inherit (machine.config.clan.core) machineDescription machineIcon machineName; }",
"--json",
]
)
proc = run_no_stdout(cmd)
res = proc.stdout.strip()
machine = json.loads(res)
return MachineInfo(
machine_name=machine.get("machineName"),
machine_description=machine.get("machineDescription", None),
machine_icon=machine.get("machineIcon", None),
)
def show_command(args: argparse.Namespace) -> None:
machine = show_machine(args.flake.path, args.machine)
print(f"Name: {machine.machine_name}")
print(f"Description: {machine.machine_description or ''}")
print(f"Icon: {machine.machine_icon or ''}")
def register_show_parser(parser: argparse.ArgumentParser) -> None:
parser.set_defaults(func=show_command)
machine_parser = parser.add_argument(
"machine", help="the name of the machine", type=machine_name_type
)
add_dynamic_completer(machine_parser, complete_machines)

View File

@@ -5,11 +5,15 @@ import os
import shlex
import sys
from clan_cli.api import API
from clan_cli.clan_uri import FlakeId
from ..cmd import run
from ..completions import add_dynamic_completer, complete_machines
from ..errors import ClanError
from ..facts.generate import generate_facts
from ..facts.upload import upload_secrets
from ..inventory import Machine as InventoryMachine
from ..machines.machines import Machine
from ..nix import nix_command, nix_metadata
from ..ssh import HostKeyCheck
@@ -81,6 +85,25 @@ def upload_sources(
)
@API.register
def update_machines(base_path: str, machines: list[InventoryMachine]) -> None:
group_machines: list[Machine] = []
# Convert InventoryMachine to Machine
for machine in machines:
m = Machine(
name=machine.name,
flake=FlakeId(base_path),
)
if not machine.deploy.targetHost:
raise ClanError(f"'TargetHost' is not set for machine '{machine.name}'")
# Copy targetHost to machine
m.target_host_address = machine.deploy.targetHost
group_machines.append(m)
deploy_machine(MachineGroup(group_machines))
def deploy_machine(machines: MachineGroup) -> None:
"""
Deploy to all hosts in parallel
@@ -97,8 +120,10 @@ def deploy_machine(machines: MachineGroup) -> None:
generate_vars([machine], None, False)
upload_secrets(machine)
path = upload_sources(".", target)
path = upload_sources(
str(machine.flake.path) if machine.flake.is_local() else machine.flake.url,
target,
)
if host.host_key_check != HostKeyCheck.STRICT:
ssh_arg += " -o StrictHostKeyChecking=no"
if host.host_key_check == HostKeyCheck.NONE:
@@ -109,6 +134,7 @@ def deploy_machine(machines: MachineGroup) -> None:
cmd = [
"nixos-rebuild",
"switch",
"--show-trace",
"--fast",
"--option",
"keep-going",

View File

@@ -35,6 +35,7 @@ def nix_build(flags: list[str], gcroot: Path | None = None) -> list[str]:
str(gcroot),
"--print-out-paths",
"--no-write-lock-file",
"--show-trace",
]
)
+ flags
@@ -47,6 +48,7 @@ def nix_build(flags: list[str], gcroot: Path | None = None) -> list[str]:
"--no-link",
"--print-out-paths",
"--no-write-lock-file",
"--show-trace",
]
)
+ flags

View File

@@ -47,10 +47,16 @@ def get_machine(flake_dir: Path, name: str) -> str:
def has_machine(flake_dir: Path, name: str) -> bool:
"""
Checks if a machine exists in the sops machines folder
"""
return (sops_machines_folder(flake_dir) / name / "key.json").exists()
def list_machines(flake_dir: Path) -> list[str]:
def list_sops_machines(flake_dir: Path) -> list[str]:
"""
Lists all machines in the sops machines folder
"""
path = sops_machines_folder(flake_dir)
def validate(name: str) -> bool:
@@ -86,7 +92,7 @@ def remove_secret(flake_dir: Path, machine: str, secret: str) -> None:
def list_command(args: argparse.Namespace) -> None:
if args.flake is None:
raise ClanError("Could not find clan flake toplevel directory")
lst = list_machines(args.flake.path)
lst = list_sops_machines(args.flake.path)
if len(lst) > 0:
print("\n".join(lst))

View File

@@ -163,7 +163,10 @@ def remove_command(args: argparse.Namespace) -> None:
def add_secret_argument(parser: argparse.ArgumentParser, autocomplete: bool) -> None:
secrets_parser = parser.add_argument(
"secret", help="the name of the secret", type=secret_name_type
"secret",
metavar="secret-name",
help="the name of the secret",
type=secret_name_type,
)
if autocomplete:
add_dynamic_completer(secrets_parser, complete_secrets)

View File

@@ -57,6 +57,7 @@ def decrypt_dependencies(
generator_name: str,
secret_vars_store: SecretStoreBase,
public_vars_store: FactStoreBase,
shared: bool,
) -> dict[str, dict[str, bytes]]:
generator = machine.vars_generators[generator_name]
dependencies = set(generator["dependencies"])
@@ -67,11 +68,11 @@ def decrypt_dependencies(
for file_name, file in dep_files.items():
if file["secret"]:
decrypted_dependencies[dep_generator][file_name] = (
secret_vars_store.get(dep_generator, file_name)
secret_vars_store.get(dep_generator, file_name, shared=shared)
)
else:
decrypted_dependencies[dep_generator][file_name] = (
public_vars_store.get(dep_generator, file_name)
public_vars_store.get(dep_generator, file_name, shared=shared)
)
return decrypted_dependencies
@@ -109,10 +110,11 @@ def execute_generator(
msg += "fact/secret generation is only supported for local flakes"
generator = machine.vars_generators[generator_name]["finalScript"]
is_shared = machine.vars_generators[generator_name]["share"]
# build temporary file tree of dependencies
decrypted_dependencies = decrypt_dependencies(
machine, generator_name, secret_vars_store, public_vars_store
machine, generator_name, secret_vars_store, public_vars_store, shared=is_shared
)
env = os.environ.copy()
with TemporaryDirectory() as tmp:
@@ -159,11 +161,18 @@ def execute_generator(
raise ClanError(msg)
if file["secret"]:
file_path = secret_vars_store.set(
generator_name, file_name, secret_file.read_bytes(), groups
generator_name,
file_name,
secret_file.read_bytes(),
groups,
shared=is_shared,
)
else:
file_path = public_vars_store.set(
generator_name, file_name, secret_file.read_bytes()
generator_name,
file_name,
secret_file.read_bytes(),
shared=is_shared,
)
if file_path:
files_to_commit.append(file_path)
@@ -260,18 +269,18 @@ def generate_vars(
) -> bool:
was_regenerated = False
for machine in machines:
errors = 0
errors = []
try:
was_regenerated |= _generate_vars_for_machine(
machine, generator_name, regenerate
)
except Exception as exc:
log.error(f"Failed to generate facts for {machine.name}: {exc}")
errors += 1
if errors > 0:
errors += [exc]
if len(errors) > 0:
raise ClanError(
f"Failed to generate facts for {errors} hosts. Check the logs above"
)
f"Failed to generate facts for {len(errors)} hosts. Check the logs above"
) from errors[0]
if not was_regenerated:
print("All secrets and facts are already up to date")

View File

@@ -10,16 +10,18 @@ class FactStoreBase(ABC):
pass
@abstractmethod
def exists(self, service: str, name: str) -> bool:
def exists(self, service: str, name: str, shared: bool = False) -> bool:
pass
@abstractmethod
def set(self, service: str, name: str, value: bytes) -> Path | None:
def set(
self, service: str, name: str, value: bytes, shared: bool = False
) -> Path | None:
pass
# get a single fact
@abstractmethod
def get(self, service: str, name: str) -> bytes:
def get(self, service: str, name: str, shared: bool = False) -> bytes:
pass
# get all facts

View File

@@ -10,17 +10,22 @@ class FactStore(FactStoreBase):
def __init__(self, machine: Machine) -> None:
self.machine = machine
self.works_remotely = False
self.per_machine_folder = (
self.machine.flake_dir / "vars" / "per-machine" / self.machine.name
)
self.shared_folder = self.machine.flake_dir / "vars" / "shared"
def set(self, generator_name: str, name: str, value: bytes) -> Path | None:
def _var_path(self, generator_name: str, name: str, shared: bool) -> Path:
if shared:
return self.shared_folder / generator_name / name
else:
return self.per_machine_folder / generator_name / name
def set(
self, generator_name: str, name: str, value: bytes, shared: bool = False
) -> Path | None:
if self.machine.flake.is_local():
fact_path = (
self.machine.flake.path
/ "machines"
/ self.machine.name
/ "vars"
/ generator_name
/ name
)
fact_path = self._var_path(generator_name, name, shared)
fact_path.parent.mkdir(parents=True, exist_ok=True)
fact_path.touch()
fact_path.write_bytes(value)
@@ -30,35 +35,21 @@ class FactStore(FactStoreBase):
f"in_flake fact storage is only supported for local flakes: {self.machine.flake}"
)
def exists(self, generator_name: str, name: str) -> bool:
fact_path = (
self.machine.flake_dir
/ "machines"
/ self.machine.name
/ "vars"
/ generator_name
/ name
)
return fact_path.exists()
def exists(self, generator_name: str, name: str, shared: bool = False) -> bool:
return self._var_path(generator_name, name, shared).exists()
# get a single fact
def get(self, generator_name: str, name: str) -> bytes:
fact_path = (
self.machine.flake_dir
/ "machines"
/ self.machine.name
/ "vars"
/ generator_name
/ name
)
return fact_path.read_bytes()
def get(self, generator_name: str, name: str, shared: bool = False) -> bytes:
return self._var_path(generator_name, name, shared).read_bytes()
# get all public vars
def get_all(self) -> dict[str, dict[str, bytes]]:
facts_folder = self.machine.flake_dir / "machines" / self.machine.name / "vars"
facts: dict[str, dict[str, bytes]] = {}
facts["TODO"] = {}
if facts_folder.exists():
for fact_path in facts_folder.iterdir():
if self.per_machine_folder.exists():
for fact_path in self.per_machine_folder.iterdir():
facts["TODO"][fact_path.name] = fact_path.read_bytes()
if self.shared_folder.exists():
for fact_path in self.shared_folder.iterdir():
facts["TODO"][fact_path.name] = fact_path.read_bytes()
return facts

View File

@@ -17,18 +17,20 @@ class FactStore(FactStoreBase):
self.dir = vm_state_dir(str(machine.flake), machine.name) / "facts"
log.debug(f"FactStore initialized with dir {self.dir}")
def exists(self, service: str, name: str) -> bool:
def exists(self, service: str, name: str, shared: bool = False) -> bool:
fact_path = self.dir / service / name
return fact_path.exists()
def set(self, service: str, name: str, value: bytes) -> Path | None:
def set(
self, service: str, name: str, value: bytes, shared: bool = False
) -> Path | None:
fact_path = self.dir / service / name
fact_path.parent.mkdir(parents=True, exist_ok=True)
fact_path.write_bytes(value)
return None
# get a single fact
def get(self, service: str, name: str) -> bytes:
def get(self, service: str, name: str, shared: bool = False) -> bytes:
fact_path = self.dir / service / name
if fact_path.exists():
return fact_path.read_bytes()

View File

@@ -11,16 +11,21 @@ class SecretStoreBase(ABC):
@abstractmethod
def set(
self, service: str, name: str, value: bytes, groups: list[str]
self,
service: str,
name: str,
value: bytes,
groups: list[str],
shared: bool = False,
) -> Path | None:
pass
@abstractmethod
def get(self, service: str, name: str) -> bytes:
def get(self, service: str, name: str, shared: bool = False) -> bytes:
pass
@abstractmethod
def exists(self, service: str, name: str) -> bool:
def exists(self, service: str, name: str, shared: bool = False) -> bool:
pass
def update_check(self) -> bool:

View File

@@ -12,8 +12,25 @@ class SecretStore(SecretStoreBase):
def __init__(self, machine: Machine) -> None:
self.machine = machine
@property
def _password_store_dir(self) -> str:
return os.environ.get(
"PASSWORD_STORE_DIR", f"{os.environ['HOME']}/.password-store"
)
def _var_path(self, generator_name: str, name: str, shared: bool) -> Path:
if shared:
return Path(f"shared/{generator_name}/{name}")
else:
return Path(f"machines/{self.machine.name}/{generator_name}/{name}")
def set(
self, generator_name: str, name: str, value: bytes, groups: list[str]
self,
generator_name: str,
name: str,
value: bytes,
groups: list[str],
shared: bool = False,
) -> Path | None:
subprocess.run(
nix_shell(
@@ -22,7 +39,7 @@ class SecretStore(SecretStoreBase):
"pass",
"insert",
"-m",
f"machines/{self.machine.name}/{generator_name}/{name}",
str(self._var_path(generator_name, name, shared)),
],
),
input=value,
@@ -30,34 +47,28 @@ class SecretStore(SecretStoreBase):
)
return None # we manage the files outside of the git repo
def get(self, generator_name: str, name: str) -> bytes:
def get(self, generator_name: str, name: str, shared: bool = False) -> bytes:
return subprocess.run(
nix_shell(
["nixpkgs#pass"],
[
"pass",
"show",
f"machines/{self.machine.name}/{generator_name}/{name}",
str(self._var_path(generator_name, name, shared)),
],
),
check=True,
stdout=subprocess.PIPE,
).stdout
def exists(self, generator_name: str, name: str) -> bool:
password_store = os.environ.get(
"PASSWORD_STORE_DIR", f"{os.environ['HOME']}/.password-store"
)
secret_path = (
Path(password_store)
/ f"machines/{self.machine.name}/{generator_name}/{name}.gpg"
)
return secret_path.exists()
def exists(self, generator_name: str, name: str, shared: bool = False) -> bool:
return (
Path(self._password_store_dir)
/ f"{self._var_path(generator_name, name, shared)}.gpg"
).exists()
def generate_hash(self) -> bytes:
password_store = os.environ.get(
"PASSWORD_STORE_DIR", f"{os.environ['HOME']}/.password-store"
)
password_store = self._password_store_dir
hashes = []
hashes.append(
subprocess.run(
@@ -117,15 +128,17 @@ class SecretStore(SecretStoreBase):
return local_hash.decode() == remote_hash
# TODO: fixme
def upload(self, output_dir: Path) -> None:
for service in self.machine.facts_data:
for secret in self.machine.facts_data[service]["secret"]:
if isinstance(secret, dict):
secret_name = secret["name"]
else:
# TODO: drop old format soon
secret_name = secret
with (output_dir / secret_name).open("wb") as f:
f.chmod(0o600)
f.write(self.get(service, secret_name))
(output_dir / ".pass_info").write_bytes(self.generate_hash())
pass
# for service in self.machine.facts_data:
# for secret in self.machine.facts_data[service]["secret"]:
# if isinstance(secret, dict):
# secret_name = secret["name"]
# else:
# # TODO: drop old format soon
# secret_name = secret
# with (output_dir / secret_name).open("wb") as f:
# f.chmod(0o600)
# f.write(self.get(service, secret_name))
# (output_dir / ".pass_info").write_bytes(self.generate_hash())

View File

@@ -36,20 +36,30 @@ class SecretStore(SecretStoreBase):
)
add_machine(self.machine.flake_dir, self.machine.name, pub_key, False)
def secret_path(self, generator_name: str, secret_name: str) -> Path:
return (
self.machine.flake_dir
/ "sops"
/ "vars"
/ self.machine.name
/ generator_name
/ secret_name
)
def secret_path(
self, generator_name: str, secret_name: str, shared: bool = False
) -> Path:
if shared:
base_path = self.machine.flake_dir / "sops" / "vars" / "shared"
else:
base_path = (
self.machine.flake_dir
/ "sops"
/ "vars"
/ "per-machine"
/ self.machine.name
)
return base_path / generator_name / secret_name
def set(
self, generator_name: str, name: str, value: bytes, groups: list[str]
self,
generator_name: str,
name: str,
value: bytes,
groups: list[str],
shared: bool = False,
) -> Path | None:
path = self.secret_path(generator_name, name)
path = self.secret_path(generator_name, name, shared)
encrypt_secret(
self.machine.flake_dir,
path,
@@ -59,14 +69,14 @@ class SecretStore(SecretStoreBase):
)
return path
def get(self, generator_name: str, name: str) -> bytes:
def get(self, generator_name: str, name: str, shared: bool = False) -> bytes:
return decrypt_secret(
self.machine.flake_dir, self.secret_path(generator_name, name)
self.machine.flake_dir, self.secret_path(generator_name, name, shared)
).encode("utf-8")
def exists(self, generator_name: str, name: str) -> bool:
def exists(self, generator_name: str, name: str, shared: bool = False) -> bool:
return has_secret(
self.secret_path(generator_name, name),
self.secret_path(generator_name, name, shared),
)
def upload(self, output_dir: Path) -> None:

View File

@@ -15,18 +15,23 @@ class SecretStore(SecretStoreBase):
self.dir.mkdir(parents=True, exist_ok=True)
def set(
self, service: str, name: str, value: bytes, groups: list[str]
self,
service: str,
name: str,
value: bytes,
groups: list[str],
shared: bool = False,
) -> Path | None:
secret_file = self.dir / service / name
secret_file.parent.mkdir(parents=True, exist_ok=True)
secret_file.write_bytes(value)
return None # we manage the files outside of the git repo
def get(self, service: str, name: str) -> bytes:
def get(self, service: str, name: str, shared: bool = False) -> bytes:
secret_file = self.dir / service / name
return secret_file.read_bytes()
def exists(self, service: str, name: str) -> bool:
def exists(self, service: str, name: str, shared: bool = False) -> bool:
return (self.dir / service / name).exists()
def upload(self, output_dir: Path) -> None:

View File

@@ -13,14 +13,14 @@ log = logging.getLogger(__name__)
def upload_secrets(machine: Machine) -> None:
secret_facts_module = importlib.import_module(machine.secret_facts_module)
secret_facts_store = secret_facts_module.SecretStore(machine=machine)
secret_store_module = importlib.import_module(machine.secret_facts_module)
secret_store = secret_store_module.SecretStore(machine=machine)
if secret_facts_store.update_check():
if secret_store.update_check():
log.info("Secrets already up to date")
return
with TemporaryDirectory() as tempdir:
secret_facts_store.upload(Path(tempdir))
secret_store.upload(Path(tempdir))
host = machine.target_host
ssh_cmd = host.ssh_cmd()
@@ -31,9 +31,11 @@ def upload_secrets(machine: Machine) -> None:
"rsync",
"-e",
" ".join(["ssh"] + ssh_cmd[2:]),
"-az",
"--recursive",
"--links",
"--times",
"--compress",
"--delete",
"--chown=root:root",
"--chmod=D700,F600",
f"{tempdir!s}/",
f"{host.user}@{host.host}:{machine.secrets_upload_directory}/",

View File

@@ -94,6 +94,7 @@ def qemu_command(
virtiofsd_socket: Path,
qmp_socket_file: Path,
qga_socket_file: Path,
portmap: list[tuple[int, int]] = [],
) -> QemuCommand:
kernel_cmdline = [
(Path(nixos_config["toplevel"]) / "kernel-params").read_text(),
@@ -103,6 +104,7 @@ def qemu_command(
]
if not vm.waypipe:
kernel_cmdline.append("console=tty0")
hostfwd = ",".join(f"hostfwd=tcp::{h}-:{g}" for h, g in portmap)
# fmt: off
command = [
"qemu-kvm",
@@ -116,7 +118,7 @@ def qemu_command(
# speed-up boot by not waiting for the boot menu
"-boot", "menu=off,strict=on",
"-device", "virtio-rng-pci",
"-netdev", "user,id=user.0",
"-netdev", f"user,id=user.0,{hostfwd}",
"-device", "virtio-net-pci,netdev=user.0,romfile=",
"-chardev", f"socket,id=char1,path={virtiofsd_socket}",
"-device", "vhost-user-fs-pci,chardev=char1,tag=nix-store",

View File

@@ -108,6 +108,7 @@ def run_vm(
cachedir: Path | None = None,
socketdir: Path | None = None,
nix_options: list[str] = [],
portmap: list[tuple[int, int]] = [],
) -> None:
with ExitStack() as stack:
machine = Machine(name=vm.machine_name, flake=vm.flake_url)
@@ -168,6 +169,7 @@ def run_vm(
virtiofsd_socket=virtiofsd_socket,
qmp_socket_file=qmp_socket_file,
qga_socket_file=qga_socket_file,
portmap=portmap,
)
packages = ["nixpkgs#qemu"]
@@ -199,7 +201,9 @@ def run_command(
vm: VmConfig = inspect_vm(machine=machine_obj)
run_vm(vm, nix_options=args.option)
portmap = [(h, g) for h, g in (p.split(":") for p in args.publish)]
run_vm(vm, nix_options=args.option, portmap=portmap)
def register_run_parser(parser: argparse.ArgumentParser) -> None:
@@ -207,4 +211,13 @@ def register_run_parser(parser: argparse.ArgumentParser) -> None:
"machine", type=str, help="machine in the flake to run"
)
add_dynamic_completer(machine_action, complete_machines)
# option: --publish 2222:22
parser.add_argument(
"--publish",
"-p",
action="append",
type=str,
default=[],
help="Forward ports from host to guest",
)
parser.set_defaults(func=lambda args: run_command(args))

View File

@@ -17,8 +17,6 @@
setuptools,
stdenv,
pydantic,
# custom args
clan-core-path,
nixpkgs,
@@ -30,7 +28,6 @@
let
pythonDependencies = [
argcomplete # Enables shell completions
pydantic # Dataclass deserialisation / validation / schemas
];
# load nixpkgs runtime dependencies from a json file
@@ -63,9 +60,7 @@ let
source = runCommand "clan-cli-source" { } ''
cp -r ${./.} $out
chmod -R +w $out
rm $out/clan_cli/config/jsonschema
ln -sf ${nixpkgs'} $out/clan_cli/nixpkgs
cp -r ${../../lib/jsonschema} $out/clan_cli/config/jsonschema
cp -r ${../../templates} $out/clan_cli/templates
${classgen}/bin/classgen ${inventory-schema}/schema.json $out/clan_cli/inventory/classes.py

View File

@@ -1,4 +1,8 @@
import os
from pathlib import Path
import pytest
from helpers import cli
class KeyPair:
@@ -11,6 +15,22 @@ class SopsSetup:
def __init__(self, keys: list[KeyPair]) -> None:
self.keys = keys
def init(self, flake_path: Path | None = None) -> None:
if flake_path is None:
flake_path = Path.cwd()
self.user = os.environ.get("USER", "user")
cli.run(
[
"secrets",
"users",
"add",
"--flake",
str(flake_path),
self.user,
self.keys[0].pubkey,
]
)
KEYS = [
KeyPair(

View File

@@ -15,6 +15,7 @@ pytest_plugins = [
"ports",
"host_group",
"fixtures_flakes",
"stdout",
]

View File

@@ -1,23 +1,23 @@
secret-key: ENC[AES256_GCM,data:gjX4OmCUdd3TlA4p,iv:3yZVpyd6FqkITQY0nU2M1iubmzvkR6PfkK2m/s6nQh8=,tag:Abgp9xkiFFylZIyAlap6Ew==,type:str]
nested:
secret-key: ENC[AES256_GCM,data:iUMgDhhIjwvd7wL4,iv:jiJIrh12dSu/sXX+z9ITVoEMNDMjwIlFBnyv40oN4LE=,tag:G9VmAa66Km1sc7JEhW5AvA==,type:str]
secret-key: ENC[AES256_GCM,data:iUMgDhhIjwvd7wL4,iv:jiJIrh12dSu/sXX+z9ITVoEMNDMjwIlFBnyv40oN4LE=,tag:G9VmAa66Km1sc7JEhW5AvA==,type:str]
sops:
kms: []
gcp_kms: []
azure_kv: []
hc_vault: []
age:
- recipient: age14tva0txcrl0zes05x7gkx56qd6wd9q3nwecjac74xxzz4l47r44sv3fz62
enc: |
-----BEGIN AGE ENCRYPTED FILE-----
YWdlLWVuY3J5cHRpb24ub3JnL3YxCi0+IFgyNTUxOSA0eWdRVjlydXlXOVZFQ3lO
bzU1eG9Iam5Ka29Sdlo0cHJ4b1R6bjdNSzBjCkgwRndCbWZQWHlDU0x1cWRmaGVt
N29lbjR6UjN0L2RhaXEzSG9zQmRsZGsKLS0tIEdsdWgxSmZwU3BWUDVxVWRSSC9M
eVZ6bjgwZnR2TTM5MkRYZWNFSFplQWsKmSzv12/dftL9jx2y35UZUGVK6xWdatE8
BGJiCvMlp0BQNrh2s/+YaEaBa48w8LL79U/XJnEZ+ZUwxmlbSTn6Hg==
-----END AGE ENCRYPTED FILE-----
lastmodified: "2023-08-08T14:27:20Z"
mac: ENC[AES256_GCM,data:iRWWX+L5Q5nKn3fBCLaWoz/mvqGnNnRd93gJmYXDZbRjFoHa9IFJZst5QDIDa1ZRYUe6G0/+lV5SBi+vwRm1pHysJ3c0ZWYjBP+e1jw3jLXxLV5gACsDC8by+6rFUCho0Xgu+Nqu2ehhNenjQQnCvDH5ivWbW70KFT5ynNgR9Tw=,iv:RYnnbLMC/hNfMwWPreMq9uvY0khajwQTZENO/P34ckY=,tag:Xi1PS5vM1c+sRkroHkPn1Q==,type:str]
pgp: []
unencrypted_suffix: _unencrypted
version: 3.7.3
kms: []
gcp_kms: []
azure_kv: []
hc_vault: []
age:
- recipient: age14tva0txcrl0zes05x7gkx56qd6wd9q3nwecjac74xxzz4l47r44sv3fz62
enc: |
-----BEGIN AGE ENCRYPTED FILE-----
YWdlLWVuY3J5cHRpb24ub3JnL3YxCi0+IFgyNTUxOSA0eWdRVjlydXlXOVZFQ3lO
bzU1eG9Iam5Ka29Sdlo0cHJ4b1R6bjdNSzBjCkgwRndCbWZQWHlDU0x1cWRmaGVt
N29lbjR6UjN0L2RhaXEzSG9zQmRsZGsKLS0tIEdsdWgxSmZwU3BWUDVxVWRSSC9M
eVZ6bjgwZnR2TTM5MkRYZWNFSFplQWsKmSzv12/dftL9jx2y35UZUGVK6xWdatE8
BGJiCvMlp0BQNrh2s/+YaEaBa48w8LL79U/XJnEZ+ZUwxmlbSTn6Hg==
-----END AGE ENCRYPTED FILE-----
lastmodified: "2023-08-08T14:27:20Z"
mac: ENC[AES256_GCM,data:iRWWX+L5Q5nKn3fBCLaWoz/mvqGnNnRd93gJmYXDZbRjFoHa9IFJZst5QDIDa1ZRYUe6G0/+lV5SBi+vwRm1pHysJ3c0ZWYjBP+e1jw3jLXxLV5gACsDC8by+6rFUCho0Xgu+Nqu2ehhNenjQQnCvDH5ivWbW70KFT5ynNgR9Tw=,iv:RYnnbLMC/hNfMwWPreMq9uvY0khajwQTZENO/P34ckY=,tag:Xi1PS5vM1c+sRkroHkPn1Q==,type:str]
pgp: []
unencrypted_suffix: _unencrypted
version: 3.7.3

View File

@@ -1,4 +1,3 @@
import fileinput
import json
import logging
import os
@@ -25,20 +24,26 @@ def substitute(
flake: Path = Path(__file__).parent,
) -> None:
sops_key = str(flake.joinpath("sops.key"))
for line in fileinput.input(file, inplace=True):
line = line.replace("__NIXPKGS__", str(nixpkgs_source()))
if clan_core_flake:
line = line.replace("__CLAN_CORE__", str(clan_core_flake))
line = line.replace(
"git+https://git.clan.lol/clan/clan-core", str(clan_core_flake)
)
line = line.replace(
"https://git.clan.lol/clan/clan-core/archive/main.tar.gz",
str(clan_core_flake),
)
line = line.replace("__CLAN_SOPS_KEY_PATH__", sops_key)
line = line.replace("__CLAN_SOPS_KEY_DIR__", str(flake))
print(line, end="")
buf = ""
with file.open() as f:
for line in f:
line = line.replace("__NIXPKGS__", str(nixpkgs_source()))
if clan_core_flake:
line = line.replace("__CLAN_CORE__", str(clan_core_flake))
line = line.replace(
"git+https://git.clan.lol/clan/clan-core", str(clan_core_flake)
)
line = line.replace(
"https://git.clan.lol/clan/clan-core/archive/main.tar.gz",
str(clan_core_flake),
)
line = line.replace("__CLAN_SOPS_KEY_PATH__", sops_key)
line = line.replace("__CLAN_SOPS_KEY_DIR__", str(flake))
buf += line
print(f"file: {file}")
print(f"clan_core: {clan_core_flake}")
print(f"flake: {flake}")
file.write_text(buf)
class FlakeForTest(NamedTuple):
@@ -91,10 +96,13 @@ def generate_flake(
for file in flake.rglob("*"):
if file.is_file():
print(f"Final Content of {file}:")
for line in fileinput.input(file, inplace=True):
for key, value in substitutions.items():
line = line.replace(key, value)
print(line, end="")
buf = ""
with file.open() as f:
for line in f:
for key, value in substitutions.items():
line = line.replace(key, value)
buf += line
file.write_text(buf)
# generate machines from machineConfigs
for machine_name, machine_config in machine_configs.items():

View File

@@ -0,0 +1,11 @@
from collections import defaultdict
from collections.abc import Callable
from typing import Any
def def_value() -> defaultdict:
    """Return a defaultdict whose missing keys recursively create more defaultdicts."""
    return defaultdict(def_value)


# allows defining nested dictionary in a single line, e.g. d["a"]["b"]["c"] = 1
def nested_dict() -> dict[str, Any]:
    """Return an arbitrarily nested dictionary; keys spring into existence on access.

    Replaces the previous ``lambda`` assignment (PEP 8 E731): a plain ``def``
    keeps the same zero-argument callable interface while being picklable and
    giving a useful ``__name__`` in tracebacks.
    """
    return defaultdict(def_value)

View File

@@ -0,0 +1,95 @@
import contextlib
import os
import socket
import sys
import threading
import traceback
from pathlib import Path
from time import sleep
from clan_cli.dirs import vm_state_dir
from clan_cli.qemu.qga import QgaSession
from clan_cli.qemu.qmp import QEMUMonitorProtocol
from . import cli
def find_free_port() -> int:
    """Ask the OS for an unused localhost TCP port and return its number."""
    sock = socket.socket(type=socket.SOCK_STREAM)
    try:
        # Binding to port 0 makes the kernel pick a free ephemeral port.
        sock.bind(("127.0.0.1", 0))
        _addr, port = sock.getsockname()
        return port
    finally:
        sock.close()
def run_vm_in_thread(machine_name: str, ssh_port: int | None = None) -> int:
    """Start ``clan vms run`` for *machine_name* in a background daemon thread.

    Returns the host port forwarded to the VM's SSH port (22); a free port is
    picked automatically when *ssh_port* is None. Exceptions inside the thread
    are printed to stderr rather than propagated (the thread has no caller).
    """
    if ssh_port is None:
        ssh_port = find_free_port()

    def _worker() -> None:
        try:
            cli.run(["vms", "run", machine_name, "--publish", f"{ssh_port}:22"])
        except Exception:
            # Surface the failure on stderr so the test log shows it.
            print(traceback.format_exc(), file=sys.stderr)
            print(sys.exc_info()[2], file=sys.stderr)

    worker = threading.Thread(target=_worker, name="run")
    worker.daemon = True
    worker.start()
    return ssh_port
# wait for qmp socket to exist
def wait_vm_up(machine_name: str, flake_url: str | None = None) -> None:
    """Poll until the VM's QMP socket file appears (up to ~600 s).

    Raises TimeoutError if the socket never shows up.
    """
    if flake_url is None:
        flake_url = str(Path.cwd())
    qmp_socket = vm_state_dir(flake_url, machine_name) / "qmp.sock"
    remaining: float = 600
    while True:
        # Check the budget before the socket, matching the original ordering.
        if remaining <= 0:
            raise TimeoutError(
                f"qmp socket {qmp_socket} not found. Is the VM running?"
            )
        if qmp_socket.exists():
            return
        sleep(0.1)
        remaining -= 0.1
# wait for vm to be down by checking if qmp socket is down
def wait_vm_down(machine_name: str, flake_url: str | None = None) -> None:
    """Poll until the VM's QMP socket file disappears (up to ~300 s).

    Raises TimeoutError if the socket is still present when time runs out.
    """
    if flake_url is None:
        flake_url = str(Path.cwd())
    qmp_socket = vm_state_dir(flake_url, machine_name) / "qmp.sock"
    remaining: float = 300
    while qmp_socket.exists():
        if remaining <= 0:
            raise TimeoutError(
                f"qmp socket {qmp_socket} still exists. Is the VM down?"
            )
        sleep(0.1)
        remaining -= 0.1
# wait for vm to be up then connect and return qmp instance
def qmp_connect(machine_name: str, flake_url: str | None = None) -> QEMUMonitorProtocol:
    """Wait for the VM to come up, then open and return a connected QMP session."""
    if flake_url is None:
        flake_url = str(Path.cwd())
    state_dir = vm_state_dir(flake_url, machine_name)
    wait_vm_up(machine_name, flake_url)
    # Resolve symlinks: the socket path may go through a linked state dir.
    qmp_socket = str(os.path.realpath(state_dir / "qmp.sock"))
    session = QEMUMonitorProtocol(address=qmp_socket)
    session.connect()
    return session
# wait for vm to be up then connect and return qga instance
def qga_connect(machine_name: str, flake_url: str | None = None) -> QgaSession:
    """Wait for the VM to come up, then return a QGA session on its guest-agent socket."""
    if flake_url is None:
        flake_url = str(Path.cwd())
    state_dir = vm_state_dir(flake_url, machine_name)
    wait_vm_up(machine_name, flake_url)
    qga_socket = os.path.realpath(state_dir / "qga.sock")
    return QgaSession(qga_socket)

View File

@@ -0,0 +1,23 @@
from typing import Any
import pytest
from pytest import CaptureFixture
class CaptureOutput:
    """Context manager that snapshots pytest-captured stdout/stderr on exit.

    Unlike calling ``capsys.readouterr()`` directly, the captured text stays
    available on ``.out`` / ``.err`` after the ``with`` block ends.
    """

    def __init__(self, capsys: CaptureFixture) -> None:
        self.capsys = capsys

    def __enter__(self) -> "CaptureOutput":
        # Drain anything captured before the block so .out/.err only hold
        # output produced inside the `with` statement.
        self.capsys.readouterr()
        return self

    def __exit__(self, exc_type: Any, exc_value: Any, exc_traceback: Any) -> bool:
        res = self.capsys.readouterr()
        # captured stdout text of the block
        self.out = res.out
        # captured stderr text of the block
        self.err = res.err
        # Bug fix: the method is annotated `-> bool` but previously fell off
        # the end and returned None. Return False explicitly so exceptions
        # raised inside the block still propagate (same runtime effect,
        # now type-correct).
        return False
@pytest.fixture
def capture_output(capsys: CaptureFixture) -> CaptureOutput:
    # Thin fixture wrapper so tests can write `with capture_output as output:`
    # instead of juggling capsys.readouterr() calls themselves.
    return CaptureOutput(capsys)

View File

@@ -1,9 +1,9 @@
import pytest
from helpers import cli
from stdout import CaptureOutput
def test_help(capsys: pytest.CaptureFixture) -> None:
with pytest.raises(SystemExit):
def test_help(capture_output: CaptureOutput) -> None:
with capture_output as output, pytest.raises(SystemExit):
cli.run(["--help"])
captured = capsys.readouterr()
assert captured.out.startswith("usage:")
assert output.out.startswith("usage:")

View File

@@ -1,8 +1,6 @@
from pathlib import Path
import pytest
from fixtures_flakes import FlakeForTest
from helpers import cli
from clan_cli import config
from clan_cli.config import parsing
@@ -11,28 +9,6 @@ from clan_cli.errors import ClanError
example_options = f"{Path(config.__file__).parent}/jsonschema/options.json"
def test_configure_machine(
test_flake: FlakeForTest,
capsys: pytest.CaptureFixture,
) -> None:
# clear the output buffer
capsys.readouterr()
# read a option value
cli.run(
[
"config",
"--flake",
str(test_flake.path),
"-m",
"machine1",
"clan.jitsi.enable",
]
)
# read the output
assert capsys.readouterr().out == "false\n"
def test_walk_jsonschema_all_types() -> None:
schema = dict(
type="object",

View File

@@ -5,14 +5,15 @@ from pathlib import Path
import pytest
from fixtures_flakes import substitute
from helpers import cli
from stdout import CaptureOutput
@pytest.mark.impure
def test_create_flake(
monkeypatch: pytest.MonkeyPatch,
capsys: pytest.CaptureFixture,
temporary_home: Path,
clan_core: Path,
capture_output: CaptureOutput,
) -> None:
flake_dir = temporary_home / "test-flake"
@@ -29,7 +30,6 @@ def test_create_flake(
monkeypatch.chdir(flake_dir)
cli.run(["machines", "create", "machine1"])
capsys.readouterr() # flush cache
# create a hardware-configuration.nix that doesn't throw an eval error
@@ -39,8 +39,9 @@ def test_create_flake(
) as hw_config_nix:
hw_config_nix.write("{}")
cli.run(["machines", "list"])
assert "machine1" in capsys.readouterr().out
with capture_output as output:
cli.run(["machines", "list"])
assert "machine1" in output.out
flake_show = subprocess.run(
["nix", "flake", "show", "--json"],
check=True,
@@ -57,9 +58,9 @@ def test_create_flake(
@pytest.mark.impure
def test_ui_template(
monkeypatch: pytest.MonkeyPatch,
capsys: pytest.CaptureFixture,
temporary_home: Path,
clan_core: Path,
capture_output: CaptureOutput,
) -> None:
flake_dir = temporary_home / "test-flake"
url = f"{clan_core}#minimal"
@@ -73,10 +74,10 @@ def test_ui_template(
monkeypatch.chdir(flake_dir)
cli.run(["machines", "create", "machine1"])
capsys.readouterr() # flush cache
cli.run(["machines", "list"])
assert "machine1" in capsys.readouterr().out
with capture_output as output:
cli.run(["machines", "list"])
assert "machine1" in output.out
flake_show = subprocess.run(
["nix", "flake", "show", "--json"],
check=True,

View File

@@ -1,5 +1,6 @@
from dataclasses import dataclass, field
from pathlib import Path
from typing import Literal
import pytest
@@ -18,6 +19,7 @@ from clan_cli.inventory import (
ServiceBorgbackupRoleServer,
ServiceMeta,
)
from clan_cli.machines import machines
def test_simple() -> None:
@@ -45,11 +47,11 @@ def test_nested() -> None:
class Person:
name: str
# deeply nested dataclasses
home: Path | str | None
age: Age
age_list: list[Age]
age_dict: dict[str, Age]
# Optional field
home: Path | None
person_dict = {
"name": "John",
@@ -72,6 +74,55 @@ def test_nested() -> None:
assert from_dict(Person, person_dict) == expected_person
def test_nested_nullable() -> None:
    """Nested dataclasses with nullable (default-None) fields deserialize correctly."""

    @dataclass
    class SystemConfig:
        # All fields optional: absent keys fall back to None.
        language: str | None = field(default=None)
        keymap: str | None = field(default=None)
        ssh_keys_path: list[str] | None = field(default=None)

    @dataclass
    class FlashOptions:
        machine: machines.Machine
        mode: str
        disks: dict[str, str]
        system_config: SystemConfig
        dry_run: bool
        write_efi_boot_entries: bool
        debug: bool

    data = {
        "machine": {
            "name": "flash-installer",
            "flake": {"loc": "git+https://git.clan.lol/clan/clan-core"},
        },
        "mode": "format",
        "disks": {"main": "/dev/sda"},
        "system_config": {"language": "en_US.UTF-8", "keymap": "en"},
        "dry_run": False,
        "write_efi_boot_entries": False,
        "debug": False,
        # Extra key not declared on FlashOptions — presumably ignored by
        # from_dict; TODO confirm against the deserializer's behavior.
        "op_key": "jWnTSHwYhSgr7Qz3u4ppD",
    }

    expected = FlashOptions(
        machine=machines.Machine(
            name="flash-installer",
            flake=machines.FlakeId("git+https://git.clan.lol/clan/clan-core"),
        ),
        mode="format",
        disks={"main": "/dev/sda"},
        system_config=SystemConfig(
            # ssh_keys_path was absent from the input, so it must be None.
            language="en_US.UTF-8", keymap="en", ssh_keys_path=None
        ),
        dry_run=False,
        write_efi_boot_entries=False,
        debug=False,
    )

    assert from_dict(FlashOptions, data) == expected
def test_simple_field_missing() -> None:
@dataclass
class Person:
@@ -83,6 +134,44 @@ def test_simple_field_missing() -> None:
from_dict(Person, person_dict)
def test_nullable() -> None:
    """A field annotated as None accepts an explicit null value."""

    @dataclass
    class Person:
        name: None

    # Must not raise: None matches the declared type exactly.
    from_dict(Person, {"name": None})
def test_nullable_non_exist() -> None:
    """Omitting a required field — even a None-typed one — raises ClanError."""

    @dataclass
    class Person:
        name: None

    with pytest.raises(ClanError):
        from_dict(Person, {})
def test_list() -> None:
    """from_dict maps a list[Dataclass] annotation over each element."""

    @dataclass
    class Name:
        name: str

    entries = [
        {"name": "John"},
        {"name": "Sarah"},
    ]
    assert from_dict(list[Name], entries) == [Name("John"), Name("Sarah")]
def test_deserialize_extensive_inventory() -> None:
# TODO: Make this an abstract test, so it doesn't break the test if the inventory changes
data = {
@@ -177,3 +266,19 @@ def test_private_public_fields() -> None:
assert from_dict(Person, data) == expected
assert dataclass_to_dict(expected) == data
def test_literal_field() -> None:
    """Literal-typed fields round-trip and reject values outside the set."""

    @dataclass
    class Person:
        name: Literal["open_file", "select_folder", "save"]

    payload = {"name": "open_file"}
    person = Person(name="open_file")
    assert from_dict(Person, payload) == person
    assert dataclass_to_dict(person) == payload

    with pytest.raises(ClanError):
        # "open" is not one of the allowed literal values
        from_dict(Person, {"name": "open"})

View File

@@ -3,6 +3,7 @@ from typing import TYPE_CHECKING
import pytest
from fixtures_flakes import FlakeForTest
from helpers import cli
from stdout import CaptureOutput
if TYPE_CHECKING:
pass
@@ -10,18 +11,17 @@ if TYPE_CHECKING:
@pytest.mark.impure
def test_flakes_inspect(
test_flake_with_core: FlakeForTest, capsys: pytest.CaptureFixture
test_flake_with_core: FlakeForTest, capture_output: CaptureOutput
) -> None:
cli.run(
[
"flakes",
"inspect",
"--flake",
str(test_flake_with_core.path),
"--machine",
"vm1",
]
)
out = capsys.readouterr() # empty the buffer
assert "Icon" in out.out
with capture_output as output:
cli.run(
[
"flakes",
"inspect",
"--flake",
str(test_flake_with_core.path),
"--machine",
"vm1",
]
)
assert "Icon" in output.out

View File

@@ -4,7 +4,7 @@ from typing import TYPE_CHECKING
import pytest
from fixtures_flakes import FlakeForTest
from helpers import cli
from pytest import CaptureFixture
from stdout import CaptureOutput
from clan_cli.dirs import user_history_file
from clan_cli.history.add import HistoryEntry
@@ -32,17 +32,15 @@ def test_history_add(
@pytest.mark.impure
def test_history_list(
capsys: CaptureFixture,
capture_output: CaptureOutput,
test_flake_with_core: FlakeForTest,
) -> None:
cmd = [
"history",
"list",
]
cli.run(cmd)
assert str(test_flake_with_core.path) not in capsys.readouterr().out
with capture_output as output:
cli.run(["history", "list"])
assert str(test_flake_with_core.path) not in output.out
cli.run(["history", "add", f"clan://{test_flake_with_core.path}#vm1"])
cli.run(cmd)
assert str(test_flake_with_core.path) in capsys.readouterr().out
with capture_output as output:
cli.run(["history", "list"])
assert str(test_flake_with_core.path) in output.out

View File

@@ -4,6 +4,7 @@ from typing import TYPE_CHECKING
import pytest
from fixtures_flakes import FlakeForTest
from helpers import cli
from stdout import CaptureOutput
if TYPE_CHECKING:
from age_keys import KeyPair
@@ -12,7 +13,7 @@ if TYPE_CHECKING:
def test_import_sops(
test_root: Path,
test_flake: FlakeForTest,
capsys: pytest.CaptureFixture,
capture_output: CaptureOutput,
monkeypatch: pytest.MonkeyPatch,
age_keys: list["KeyPair"],
) -> None:
@@ -88,11 +89,11 @@ def test_import_sops(
]
cli.run(cmd)
capsys.readouterr()
cli.run(["secrets", "users", "list", "--flake", str(test_flake.path)])
users = sorted(capsys.readouterr().out.rstrip().split())
with capture_output as output:
cli.run(["secrets", "users", "list", "--flake", str(test_flake.path)])
users = sorted(output.out.rstrip().split())
assert users == ["user1", "user2"]
capsys.readouterr()
cli.run(["secrets", "get", "--flake", str(test_flake.path), "secret-key"])
assert capsys.readouterr().out == "secret-value"
with capture_output as output:
cli.run(["secrets", "get", "--flake", str(test_flake.path), "secret-key"])
assert output.out == "secret-value"

View File

@@ -1,40 +1,32 @@
import pytest
from fixtures_flakes import FlakeForTest
from helpers import cli
from stdout import CaptureOutput
@pytest.mark.impure
def test_machine_subcommands(
test_flake_with_core: FlakeForTest, capsys: pytest.CaptureFixture
test_flake_with_core: FlakeForTest,
capture_output: CaptureOutput,
) -> None:
cli.run(
["machines", "create", "--flake", str(test_flake_with_core.path), "machine1"]
)
capsys.readouterr()
cli.run(["machines", "list", "--flake", str(test_flake_with_core.path)])
with capture_output as output:
cli.run(["machines", "list", "--flake", str(test_flake_with_core.path)])
out = capsys.readouterr()
assert "machine1" in out.out
assert "vm1" in out.out
assert "vm2" in out.out
capsys.readouterr()
cli.run(["machines", "show", "--flake", str(test_flake_with_core.path), "machine1"])
out = capsys.readouterr()
assert "machine1" in out.out
assert "Description" in out.out
print(out)
print(output.out)
assert "machine1" in output.out
assert "vm1" in output.out
assert "vm2" in output.out
cli.run(
["machines", "delete", "--flake", str(test_flake_with_core.path), "machine1"]
)
capsys.readouterr()
cli.run(["machines", "list", "--flake", str(test_flake_with_core.path)])
out = capsys.readouterr()
assert "machine1" not in out.out
assert "vm1" in out.out
assert "vm2" in out.out
with capture_output as output:
cli.run(["machines", "list", "--flake", str(test_flake_with_core.path)])
assert "machine1" not in output.out
assert "vm1" in output.out
assert "vm2" in output.out

View File

@@ -1,48 +0,0 @@
import pytest
from fixtures_flakes import FlakeForTest
from clan_cli.clan_uri import FlakeId
from clan_cli.config.machine import (
config_for_machine,
set_config_for_machine,
verify_machine_config,
)
from clan_cli.config.schema import machine_schema
from clan_cli.inventory import Machine, MachineDeploy
from clan_cli.machines.create import create_machine
from clan_cli.machines.list import list_machines
@pytest.mark.with_core
def test_schema_for_machine(test_flake_with_core: FlakeForTest) -> None:
    """The generated machine JSON schema is an object schema with properties."""
    result = machine_schema(test_flake_with_core.path, config={})
    assert "properties" in result
@pytest.mark.with_core
def test_create_machine_on_minimal_clan(test_flake_minimal: FlakeForTest) -> None:
    """Creating a machine on an empty clan registers it and accepts config."""
    # The minimal flake starts with no machines at all.
    assert list_machines(test_flake_minimal.path) == {}

    create_machine(
        FlakeId(test_flake_minimal.path),
        Machine(
            name="foo",
            system="x86_64-linux",
            description="A test machine",
            tags=["test"],
            icon=None,
            deploy=MachineDeploy(),
        ),
    )
    result = list_machines(test_flake_minimal.path)
    assert list(result.keys()) == ["foo"]
    # Writes into settings.json
    set_config_for_machine(
        test_flake_minimal.path, "foo", dict(services=dict(openssh=dict(enable=True)))
    )
    config = config_for_machine(test_flake_minimal.path, "foo")
    assert config["services"]["openssh"]["enable"]
    # Expected to return None on a valid configuration.
    assert verify_machine_config(test_flake_minimal.path, "foo") is None

View File

@@ -7,6 +7,7 @@ from typing import TYPE_CHECKING
import pytest
from fixtures_flakes import FlakeForTest
from helpers import cli
from stdout import CaptureOutput
from clan_cli.errors import ClanError
@@ -19,7 +20,7 @@ log = logging.getLogger(__name__)
def _test_identities(
what: str,
test_flake: FlakeForTest,
capsys: pytest.CaptureFixture,
capture_output: CaptureOutput,
age_keys: list["KeyPair"],
) -> None:
sops_folder = test_flake.path / "sops"
@@ -64,24 +65,22 @@ def _test_identities(
]
)
capsys.readouterr() # empty the buffer
cli.run(
[
"secrets",
what,
"get",
"--flake",
str(test_flake.path),
"foo",
]
)
out = capsys.readouterr() # empty the buffer
assert age_keys[1].pubkey in out.out
with capture_output as output:
cli.run(
[
"secrets",
what,
"get",
"--flake",
str(test_flake.path),
"foo",
]
)
assert age_keys[1].pubkey in output.out
capsys.readouterr() # empty the buffer
cli.run(["secrets", what, "list", "--flake", str(test_flake.path)])
out = capsys.readouterr() # empty the buffer
assert "foo" in out.out
with capture_output as output:
cli.run(["secrets", what, "list", "--flake", str(test_flake.path)])
assert "foo" in output.out
cli.run(["secrets", what, "remove", "--flake", str(test_flake.path), "foo"])
assert not (sops_folder / what / "foo" / "key.json").exists()
@@ -89,30 +88,29 @@ def _test_identities(
with pytest.raises(ClanError): # already removed
cli.run(["secrets", what, "remove", "--flake", str(test_flake.path), "foo"])
capsys.readouterr()
cli.run(["secrets", what, "list", "--flake", str(test_flake.path)])
out = capsys.readouterr()
assert "foo" not in out.out
with capture_output as output:
cli.run(["secrets", what, "list", "--flake", str(test_flake.path)])
assert "foo" not in output.out
def test_users(
test_flake: FlakeForTest, capsys: pytest.CaptureFixture, age_keys: list["KeyPair"]
test_flake: FlakeForTest, capture_output: CaptureOutput, age_keys: list["KeyPair"]
) -> None:
_test_identities("users", test_flake, capsys, age_keys)
_test_identities("users", test_flake, capture_output, age_keys)
def test_machines(
test_flake: FlakeForTest, capsys: pytest.CaptureFixture, age_keys: list["KeyPair"]
test_flake: FlakeForTest, capture_output: CaptureOutput, age_keys: list["KeyPair"]
) -> None:
_test_identities("machines", test_flake, capsys, age_keys)
_test_identities("machines", test_flake, capture_output, age_keys)
def test_groups(
test_flake: FlakeForTest, capsys: pytest.CaptureFixture, age_keys: list["KeyPair"]
test_flake: FlakeForTest, capture_output: CaptureOutput, age_keys: list["KeyPair"]
) -> None:
capsys.readouterr() # empty the buffer
cli.run(["secrets", "groups", "list", "--flake", str(test_flake.path)])
assert capsys.readouterr().out == ""
with capture_output as output:
cli.run(["secrets", "groups", "list", "--flake", str(test_flake.path)])
assert output.out == ""
with pytest.raises(ClanError): # machine does not exist yet
cli.run(
@@ -197,9 +195,9 @@ def test_groups(
]
)
capsys.readouterr() # empty the buffer
cli.run(["secrets", "groups", "list", "--flake", str(test_flake.path)])
out = capsys.readouterr().out
with capture_output as output:
cli.run(["secrets", "groups", "list", "--flake", str(test_flake.path)])
out = output.out
assert "user1" in out
assert "machine1" in out
@@ -243,20 +241,20 @@ def use_key(key: str, monkeypatch: pytest.MonkeyPatch) -> Iterator[None]:
def test_secrets(
test_flake: FlakeForTest,
capsys: pytest.CaptureFixture,
capture_output: CaptureOutput,
monkeypatch: pytest.MonkeyPatch,
age_keys: list["KeyPair"],
) -> None:
capsys.readouterr() # empty the buffer
cli.run(["secrets", "list", "--flake", str(test_flake.path)])
assert capsys.readouterr().out == ""
with capture_output as output:
cli.run(["secrets", "list", "--flake", str(test_flake.path)])
assert output.out == ""
monkeypatch.setenv("SOPS_NIX_SECRET", "foo")
monkeypatch.setenv("SOPS_AGE_KEY_FILE", str(test_flake.path / ".." / "age.key"))
cli.run(["secrets", "key", "generate", "--flake", str(test_flake.path)])
capsys.readouterr() # empty the buffer
cli.run(["secrets", "key", "show", "--flake", str(test_flake.path)])
key = capsys.readouterr().out
with capture_output as output:
cli.run(["secrets", "key", "show", "--flake", str(test_flake.path)])
key = output.out
assert key.startswith("age1")
cli.run(
["secrets", "users", "add", "--flake", str(test_flake.path), "testuser", key]
@@ -265,12 +263,12 @@ def test_secrets(
with pytest.raises(ClanError): # does not exist yet
cli.run(["secrets", "get", "--flake", str(test_flake.path), "nonexisting"])
cli.run(["secrets", "set", "--flake", str(test_flake.path), "initialkey"])
capsys.readouterr()
cli.run(["secrets", "get", "--flake", str(test_flake.path), "initialkey"])
assert capsys.readouterr().out == "foo"
capsys.readouterr()
cli.run(["secrets", "users", "list", "--flake", str(test_flake.path)])
users = capsys.readouterr().out.rstrip().split("\n")
with capture_output as output:
cli.run(["secrets", "get", "--flake", str(test_flake.path), "initialkey"])
assert output.out == "foo"
with capture_output as output:
cli.run(["secrets", "users", "list", "--flake", str(test_flake.path)])
users = output.out.rstrip().split("\n")
assert len(users) == 1, f"users: {users}"
owner = users[0]
@@ -280,17 +278,17 @@ def test_secrets(
cli.run(["secrets", "rename", "--flake", str(test_flake.path), "initialkey", "key"])
capsys.readouterr() # empty the buffer
cli.run(["secrets", "list", "--flake", str(test_flake.path)])
assert capsys.readouterr().out == "key\n"
with capture_output as output:
cli.run(["secrets", "list", "--flake", str(test_flake.path)])
assert output.out == "key\n"
capsys.readouterr() # empty the buffer
cli.run(["secrets", "list", "--flake", str(test_flake.path), "nonexisting"])
assert capsys.readouterr().out == ""
with capture_output as output:
cli.run(["secrets", "list", "--flake", str(test_flake.path), "nonexisting"])
assert output.out == ""
capsys.readouterr() # empty the buffer
cli.run(["secrets", "list", "--flake", str(test_flake.path), "key"])
assert capsys.readouterr().out == "key\n"
with capture_output as output:
cli.run(["secrets", "list", "--flake", str(test_flake.path), "key"])
assert output.out == "key\n"
cli.run(
[
@@ -314,15 +312,14 @@ def test_secrets(
"key",
]
)
capsys.readouterr()
cli.run(["secrets", "machines", "list", "--flake", str(test_flake.path)])
assert capsys.readouterr().out == "machine1\n"
with capture_output as output:
cli.run(["secrets", "machines", "list", "--flake", str(test_flake.path)])
assert output.out == "machine1\n"
with use_key(age_keys[1].privkey, monkeypatch):
capsys.readouterr()
cli.run(["secrets", "get", "--flake", str(test_flake.path), "key"])
assert capsys.readouterr().out == "foo"
with capture_output as output:
cli.run(["secrets", "get", "--flake", str(test_flake.path), "key"])
assert output.out == "foo"
# rotate machines key
cli.run(
@@ -340,10 +337,9 @@ def test_secrets(
# should also rotate the encrypted secret
with use_key(age_keys[0].privkey, monkeypatch):
capsys.readouterr()
cli.run(["secrets", "get", "--flake", str(test_flake.path), "key"])
assert capsys.readouterr().out == "foo"
with capture_output as output:
cli.run(["secrets", "get", "--flake", str(test_flake.path), "key"])
assert output.out == "foo"
cli.run(
[
@@ -379,10 +375,9 @@ def test_secrets(
"key",
]
)
capsys.readouterr()
with use_key(age_keys[1].privkey, monkeypatch):
with capture_output as output, use_key(age_keys[1].privkey, monkeypatch):
cli.run(["secrets", "get", "--flake", str(test_flake.path), "key"])
assert capsys.readouterr().out == "foo"
assert output.out == "foo"
cli.run(
[
"secrets",
@@ -441,7 +436,6 @@ def test_secrets(
]
)
capsys.readouterr() # empty the buffer
cli.run(
[
"secrets",
@@ -455,9 +449,9 @@ def test_secrets(
)
with use_key(age_keys[1].privkey, monkeypatch):
capsys.readouterr()
cli.run(["secrets", "get", "--flake", str(test_flake.path), "key"])
assert capsys.readouterr().out == "foo"
with capture_output as output:
cli.run(["secrets", "get", "--flake", str(test_flake.path), "key"])
assert output.out == "foo"
# extend group will update secrets
cli.run(
@@ -484,9 +478,9 @@ def test_secrets(
)
with use_key(age_keys[2].privkey, monkeypatch): # user2
capsys.readouterr()
cli.run(["secrets", "get", "--flake", str(test_flake.path), "key"])
assert capsys.readouterr().out == "foo"
with capture_output as output:
cli.run(["secrets", "get", "--flake", str(test_flake.path), "key"])
assert output.out == "foo"
cli.run(
[
@@ -501,9 +495,9 @@ def test_secrets(
)
with pytest.raises(ClanError), use_key(age_keys[2].privkey, monkeypatch):
# user2 is not in the group anymore
capsys.readouterr()
cli.run(["secrets", "get", "--flake", str(test_flake.path), "key"])
print(capsys.readouterr().out)
with capture_output as output:
cli.run(["secrets", "get", "--flake", str(test_flake.path), "key"])
print(output.out)
cli.run(
[
@@ -520,6 +514,6 @@ def test_secrets(
cli.run(["secrets", "remove", "--flake", str(test_flake.path), "key"])
cli.run(["secrets", "remove", "--flake", str(test_flake.path), "key2"])
capsys.readouterr() # empty the buffer
cli.run(["secrets", "list", "--flake", str(test_flake.path)])
assert capsys.readouterr().out == ""
with capture_output as output:
cli.run(["secrets", "list", "--flake", str(test_flake.path)])
assert output.out == ""

View File

@@ -104,3 +104,23 @@ def test_dataclass_to_dict_defaults() -> None:
"foo": {"home": {"a": "b"}, "work": ["a", "b"]},
}
assert dataclass_to_dict(real_person) == expected
def test_filters_null_fields() -> None:
    """dataclass_to_dict drops fields whose value is None."""

    @dataclass
    class Foo:
        home: str | None = None
        work: str | None = None

    # With nothing set, serialization yields an empty dict.
    empty = Foo()
    assert empty.home is None
    assert dataclass_to_dict(empty) == {}

    # A populated field survives; the unset one is still filtered out.
    populated = Foo(home="home")
    assert populated.home == "home"
    assert populated.work is None
    assert dataclass_to_dict(populated) == {"home": "home"}

View File

@@ -4,19 +4,20 @@ import sys
import pytest
import pytest_subprocess.fake_process
from pytest_subprocess import utils
from stdout import CaptureOutput
import clan_cli
from clan_cli.ssh import cli
def test_no_args(
capsys: pytest.CaptureFixture, monkeypatch: pytest.MonkeyPatch
monkeypatch: pytest.MonkeyPatch,
capture_output: CaptureOutput,
) -> None:
monkeypatch.setattr(sys, "argv", ["", "ssh"])
with pytest.raises(SystemExit):
with capture_output as output, pytest.raises(SystemExit):
clan_cli.main()
captured = capsys.readouterr()
assert captured.err.startswith("usage:")
assert output.err.startswith("usage:")
# using fp fixture from pytest-subprocess

View File

@@ -1,30 +1,20 @@
import os
import subprocess
from collections import defaultdict
from collections.abc import Callable
from io import StringIO
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any
import pytest
from age_keys import SopsSetup
from fixtures_flakes import generate_flake
from helpers import cli
from root import CLAN_CORE
from clan_cli.clan_uri import FlakeId
from clan_cli.machines.machines import Machine
from clan_cli.nix import nix_shell
from clan_cli.vars.public_modules import in_repo
from clan_cli.vars.secret_modules import password_store, sops
def def_value() -> defaultdict:
return defaultdict(def_value)
# allows defining nested dictionary in a single line
nested_dict: Callable[[], dict[str, Any]] = lambda: defaultdict(def_value)
from tests.age_keys import SopsSetup
from tests.fixtures_flakes import generate_flake
from tests.helpers import cli
from tests.helpers.nixos_config import nested_dict
from tests.root import CLAN_CORE
def test_get_subgraph() -> None:
@@ -89,11 +79,9 @@ def test_generate_public_var(
)
monkeypatch.chdir(flake.path)
cli.run(["vars", "generate", "--flake", str(flake.path), "my_machine"])
var_file_path = (
flake.path / "machines" / "my_machine" / "vars" / "my_generator" / "my_value"
)
assert var_file_path.is_file()
assert var_file_path.read_text() == "hello\n"
store = in_repo.FactStore(Machine(name="my_machine", flake=FlakeId(flake.path)))
assert store.exists("my_generator", "my_value")
assert store.get("my_generator", "my_value").decode() == "hello\n"
@pytest.mark.impure
@@ -102,7 +90,6 @@ def test_generate_secret_var_sops(
temporary_home: Path,
sops_setup: SopsSetup,
) -> None:
user = os.environ.get("USER", "user")
config = nested_dict()
my_generator = config["clan"]["core"]["vars"]["generators"]["my_generator"]
my_generator["files"]["my_secret"]["secret"] = True
@@ -113,22 +100,12 @@ def test_generate_secret_var_sops(
machine_configs=dict(my_machine=config),
)
monkeypatch.chdir(flake.path)
cli.run(
[
"secrets",
"users",
"add",
"--flake",
str(flake.path),
user,
sops_setup.keys[0].pubkey,
]
)
sops_setup.init()
cli.run(["vars", "generate", "--flake", str(flake.path), "my_machine"])
var_file_path = (
flake.path / "machines" / "my_machine" / "vars" / "my_generator" / "my_secret"
in_repo_store = in_repo.FactStore(
Machine(name="my_machine", flake=FlakeId(flake.path))
)
assert not var_file_path.is_file()
assert not in_repo_store.exists("my_generator", "my_secret")
sops_store = sops.SecretStore(Machine(name="my_machine", flake=FlakeId(flake.path)))
assert sops_store.exists("my_generator", "my_secret")
assert sops_store.get("my_generator", "my_secret").decode() == "hello\n"
@@ -140,7 +117,6 @@ def test_generate_secret_var_sops_with_default_group(
temporary_home: Path,
sops_setup: SopsSetup,
) -> None:
user = os.environ.get("USER", "user")
config = nested_dict()
config["clan"]["core"]["sops"]["defaultGroups"] = ["my_group"]
my_generator = config["clan"]["core"]["vars"]["generators"]["my_generator"]
@@ -152,34 +128,15 @@ def test_generate_secret_var_sops_with_default_group(
machine_configs=dict(my_machine=config),
)
monkeypatch.chdir(flake.path)
cli.run(
[
"secrets",
"users",
"add",
"--flake",
str(flake.path),
user,
sops_setup.keys[0].pubkey,
]
)
cli.run(["secrets", "groups", "add-user", "my_group", user])
sops_setup.init()
cli.run(["secrets", "groups", "add-user", "my_group", sops_setup.user])
cli.run(["vars", "generate", "--flake", str(flake.path), "my_machine"])
assert not (
flake.path / "machines" / "my_machine" / "vars" / "my_generator" / "my_secret"
).is_file()
in_repo_store = in_repo.FactStore(
Machine(name="my_machine", flake=FlakeId(flake.path))
)
assert not in_repo_store.exists("my_generator", "my_secret")
sops_store = sops.SecretStore(Machine(name="my_machine", flake=FlakeId(flake.path)))
assert sops_store.exists("my_generator", "my_secret")
assert (
flake.path
/ "sops"
/ "vars"
/ "my_machine"
/ "my_generator"
/ "my_secret"
/ "groups"
/ "my_group"
).exists()
assert sops_store.get("my_generator", "my_secret").decode() == "hello\n"
@@ -226,10 +183,6 @@ def test_generate_secret_var_password_store(
nix_shell(["nixpkgs#pass"], ["pass", "init", "test@local"]), check=True
)
cli.run(["vars", "generate", "--flake", str(flake.path), "my_machine"])
var_file_path = (
flake.path / "machines" / "my_machine" / "vars" / "my_generator" / "my_secret"
)
assert not var_file_path.is_file()
store = password_store.SecretStore(
Machine(name="my_machine", flake=FlakeId(flake.path))
)
@@ -243,7 +196,6 @@ def test_generate_secret_for_multiple_machines(
temporary_home: Path,
sops_setup: SopsSetup,
) -> None:
user = os.environ.get("USER", "user")
machine1_config = nested_dict()
machine1_generator = machine1_config["clan"]["core"]["vars"]["generators"][
"my_generator"
@@ -268,29 +220,19 @@ def test_generate_secret_for_multiple_machines(
machine_configs=dict(machine1=machine1_config, machine2=machine2_config),
)
monkeypatch.chdir(flake.path)
cli.run(
[
"secrets",
"users",
"add",
"--flake",
str(flake.path),
user,
sops_setup.keys[0].pubkey,
]
)
sops_setup.init()
cli.run(["vars", "generate", "--flake", str(flake.path)])
# check if public vars have been created correctly
machine1_var_file_path = (
flake.path / "machines" / "machine1" / "vars" / "my_generator" / "my_value"
in_repo_store1 = in_repo.FactStore(
Machine(name="machine1", flake=FlakeId(flake.path))
)
machine2_var_file_path = (
flake.path / "machines" / "machine2" / "vars" / "my_generator" / "my_value"
in_repo_store2 = in_repo.FactStore(
Machine(name="machine2", flake=FlakeId(flake.path))
)
assert machine1_var_file_path.is_file()
assert machine1_var_file_path.read_text() == "machine1\n"
assert machine2_var_file_path.is_file()
assert machine2_var_file_path.read_text() == "machine2\n"
assert in_repo_store1.exists("my_generator", "my_value")
assert in_repo_store2.exists("my_generator", "my_value")
assert in_repo_store1.get("my_generator", "my_value").decode() == "machine1\n"
assert in_repo_store2.get("my_generator", "my_value").decode() == "machine2\n"
# check if secret vars have been created correctly
sops_store1 = sops.SecretStore(Machine(name="machine1", flake=FlakeId(flake.path)))
sops_store2 = sops.SecretStore(Machine(name="machine2", flake=FlakeId(flake.path)))
@@ -320,16 +262,13 @@ def test_dependant_generators(
)
monkeypatch.chdir(flake.path)
cli.run(["vars", "generate", "--flake", str(flake.path), "my_machine"])
parent_file_path = (
flake.path / "machines" / "my_machine" / "vars" / "child_generator" / "my_value"
in_repo_store = in_repo.FactStore(
Machine(name="my_machine", flake=FlakeId(flake.path))
)
assert parent_file_path.is_file()
assert parent_file_path.read_text() == "hello\n"
child_file_path = (
flake.path / "machines" / "my_machine" / "vars" / "child_generator" / "my_value"
)
assert child_file_path.is_file()
assert child_file_path.read_text() == "hello\n"
assert in_repo_store.exists("parent_generator", "my_value")
assert in_repo_store.get("parent_generator", "my_value").decode() == "hello\n"
assert in_repo_store.exists("child_generator", "my_value")
assert in_repo_store.get("child_generator", "my_value").decode() == "hello\n"
@pytest.mark.impure
@@ -362,8 +301,55 @@ def test_prompt(
monkeypatch.chdir(flake.path)
monkeypatch.setattr("sys.stdin", StringIO(input_value))
cli.run(["vars", "generate", "--flake", str(flake.path), "my_machine"])
var_file_path = (
flake.path / "machines" / "my_machine" / "vars" / "my_generator" / "my_value"
in_repo_store = in_repo.FactStore(
Machine(name="my_machine", flake=FlakeId(flake.path))
)
assert var_file_path.is_file()
assert var_file_path.read_text() == input_value
assert in_repo_store.exists("my_generator", "my_value")
assert in_repo_store.get("my_generator", "my_value").decode() == input_value
@pytest.mark.impure
def test_share_flag(
    monkeypatch: pytest.MonkeyPatch,
    temporary_home: Path,
    sops_setup: SopsSetup,
) -> None:
    """Check that generator outputs end up in the shared or per-machine store
    depending on the generator's ``share`` flag."""
    # Build one shared and one unshared generator; each emits a secret file
    # and a non-secret value file.
    config = nested_dict()
    for gen_name, is_shared in (
        ("shared_generator", True),
        ("unshared_generator", False),
    ):
        generator = config["clan"]["core"]["vars"]["generators"][gen_name]
        generator["files"]["my_secret"]["secret"] = True
        generator["files"]["my_value"]["secret"] = False
        generator["script"] = (
            "echo hello > $out/my_secret && echo hello > $out/my_value"
        )
        generator["share"] = is_shared
    flake = generate_flake(
        temporary_home,
        flake_template=CLAN_CORE / "templates" / "minimal",
        machine_configs={"my_machine": config},
    )
    monkeypatch.chdir(flake.path)
    sops_setup.init()
    cli.run(["vars", "generate", "--flake", str(flake.path), "my_machine"])
    machine = Machine(name="my_machine", flake=FlakeId(flake.path))
    sops_store = sops.SecretStore(machine)
    in_repo_store = in_repo.FactStore(machine)
    # Secrets must live only in the store matching their share flag.
    assert sops_store.exists("shared_generator", "my_secret", shared=True)
    assert not sops_store.exists("shared_generator", "my_secret", shared=False)
    assert sops_store.exists("unshared_generator", "my_secret", shared=False)
    assert not sops_store.exists("unshared_generator", "my_secret", shared=True)
    # The same rule applies to plain (non-secret) values in the repo store.
    assert in_repo_store.exists("shared_generator", "my_value", shared=True)
    assert not in_repo_store.exists("shared_generator", "my_value", shared=False)
    assert in_repo_store.exists("unshared_generator", "my_value", shared=False)
    assert not in_repo_store.exists("unshared_generator", "my_value", shared=True)

View File

@@ -0,0 +1,42 @@
from pathlib import Path
import pytest
from tests.age_keys import SopsSetup
from tests.fixtures_flakes import generate_flake
from tests.helpers import cli
from tests.helpers.nixos_config import nested_dict
from tests.helpers.vms import qga_connect, run_vm_in_thread, wait_vm_down
from tests.root import CLAN_CORE
@pytest.mark.impure
def test_vm_deployment(
    monkeypatch: pytest.MonkeyPatch,
    temporary_home: Path,
    sops_setup: SopsSetup,
) -> None:
    """End-to-end check: a generated secret is deployed into a running VM
    under /run/secrets and contains the generated value."""
    # Headless VM with root autologin, ssh enabled, and no firewall so the
    # guest agent can be reached.
    config = nested_dict()
    config["clan"]["virtualisation"]["graphics"] = False
    config["services"]["getty"]["autologinUser"] = "root"
    config["services"]["openssh"]["enable"] = True
    config["networking"]["firewall"]["enable"] = False
    gen = config["clan"]["core"]["vars"]["generators"]["my_generator"]
    gen["files"]["my_secret"]["secret"] = True
    gen["files"]["my_value"]["secret"] = False
    gen["script"] = "echo hello > $out/my_secret && echo hello > $out/my_value"
    flake = generate_flake(
        temporary_home,
        flake_template=CLAN_CORE / "templates" / "minimal",
        machine_configs={"my_machine": config},
    )
    monkeypatch.chdir(flake.path)
    sops_setup.init()
    cli.run(["vars", "generate", "my_machine"])
    run_vm_in_thread("my_machine")
    qga = qga_connect("my_machine")
    # The secret file must exist in the guest and hold the generated value.
    qga.run("ls /run/secrets/my_machine/my_generator/my_secret", check=True)
    _, out, _ = qga.run("cat /run/secrets/my_machine/my_generator/my_secret")
    assert out == "hello\n"
    # Power the VM off cleanly to avoid leaking qemu processes.
    qga.exec_cmd("poweroff")
    wait_vm_down("my_machine")

View File

@@ -1,93 +1,29 @@
import os
import sys
import threading
import traceback
from pathlib import Path
from time import sleep
from typing import TYPE_CHECKING
import pytest
from fixtures_flakes import FlakeForTest, generate_flake
from helpers import cli
from root import CLAN_CORE
from stdout import CaptureOutput
from clan_cli.dirs import vm_state_dir
from clan_cli.qemu.qga import QgaSession
from clan_cli.qemu.qmp import QEMUMonitorProtocol
from tests.fixtures_flakes import FlakeForTest, generate_flake
from tests.helpers import cli
from tests.helpers.nixos_config import nested_dict
from tests.helpers.vms import qga_connect, qmp_connect, run_vm_in_thread, wait_vm_down
from tests.root import CLAN_CORE
if TYPE_CHECKING:
from age_keys import KeyPair
from tests.age_keys import KeyPair
# True when /dev/kvm is absent; used to skip tests that need hardware
# virtualisation support on this host.
no_kvm = not os.path.exists("/dev/kvm")
def run_vm_in_thread(machine_name: str) -> None:
    """Start ``clan vms run <machine_name>`` in a background daemon thread.

    Exceptions from the VM run are printed to stderr instead of propagating,
    since a thread has no caller to re-raise into.
    """

    def run() -> None:
        try:
            cli.run(["vms", "run", machine_name])
        except Exception:
            # format_exc() already renders the full traceback; the previous
            # extra `print(sys.exc_info()[2])` only printed the traceback
            # object's repr and added no information, so it was removed.
            print(traceback.format_exc(), file=sys.stderr)

    # Daemon thread: the test process must not block on VM teardown at exit.
    t = threading.Thread(target=run, name="run")
    t.daemon = True
    t.start()
# wait for the VM's qga socket file to appear (raises after ~100s)
def wait_vm_up(state_dir: Path) -> None:
    """Block until the qga socket inside *state_dir* exists."""
    sock = state_dir / "qga.sock"
    remaining: float = 100
    while not sock.exists():
        if remaining <= 0:
            raise TimeoutError(
                f"qga socket {sock} not found. Is the VM running?"
            )
        sleep(0.1)
        remaining -= 0.1
# wait until the qga socket file disappears, i.e. the VM has shut down
def wait_vm_down(state_dir: Path) -> None:
    """Block until the qga socket inside *state_dir* is gone (max ~300s)."""
    sock = state_dir / "qga.sock"
    remaining: float = 300
    while sock.exists():
        if remaining <= 0:
            raise TimeoutError(
                f"qga socket {sock} still exists. Is the VM down?"
            )
        sleep(0.1)
        remaining -= 0.1
# block until the VM is up, then open and return a QMP connection
def qmp_connect(state_dir: Path) -> QEMUMonitorProtocol:
    """Connect to the VM's QMP socket once the VM is running."""
    wait_vm_up(state_dir)
    qmp_socket = str(os.path.realpath(state_dir / "qmp.sock"))
    session = QEMUMonitorProtocol(address=qmp_socket)
    session.connect()
    return session
# block until the VM is up, then return a guest-agent session on its socket
def qga_connect(state_dir: Path) -> QgaSession:
    """Connect to the VM's qemu guest agent socket once the VM is running."""
    wait_vm_up(state_dir)
    qga_socket = os.path.realpath(state_dir / "qga.sock")
    return QgaSession(qga_socket)
@pytest.mark.impure
def test_inspect(
test_flake_with_core: FlakeForTest, capsys: pytest.CaptureFixture
test_flake_with_core: FlakeForTest, capture_output: CaptureOutput
) -> None:
cli.run(["vms", "inspect", "--flake", str(test_flake_with_core.path), "vm1"])
out = capsys.readouterr() # empty the buffer
assert "Cores" in out.out
with capture_output as output:
cli.run(["vms", "inspect", "--flake", str(test_flake_with_core.path), "vm1"])
assert "Cores" in output.out
@pytest.mark.skipif(no_kvm, reason="Requires KVM")
@@ -129,7 +65,7 @@ def test_vm_qmp(
# set up a simple clan flake
flake = generate_flake(
temporary_home,
flake_template=CLAN_CORE / "templates" / "new-clan",
flake_template=CLAN_CORE / "templates" / "minimal",
machine_configs=dict(
my_machine=dict(
clan=dict(
@@ -144,14 +80,11 @@ def test_vm_qmp(
# 'clan vms run' must be executed from within the flake
monkeypatch.chdir(flake.path)
# the state dir is a point of reference for qemu interactions as it links to the qga/qmp sockets
state_dir = vm_state_dir(str(flake.path), "my_machine")
# start the VM
run_vm_in_thread("my_machine")
# connect with qmp
qmp = qmp_connect(state_dir)
qmp = qmp_connect("my_machine")
# verify that issuing a command works
# result = qmp.cmd_obj({"execute": "query-status"})
@@ -169,121 +102,60 @@ def test_vm_persistence(
temporary_home: Path,
) -> None:
# set up a clan flake with some systemd services to test persistence
config = nested_dict()
# logrotate-checkconf doesn't work in VM because /nix/store is owned by nobody
config["my_machine"]["systemd"]["services"]["logrotate-checkconf"]["enable"] = False
config["my_machine"]["services"]["getty"]["autologinUser"] = "root"
config["my_machine"]["clan"]["virtualisation"] = {"graphics": False}
config["my_machine"]["clan"]["networking"] = {"targetHost": "client"}
config["my_machine"]["clan"]["core"]["state"]["my_state"]["folders"] = [
# to be owned by root
"/var/my-state",
# to be owned by user 'test'
"/var/user-state",
]
config["my_machine"]["users"]["users"] = {
"test": {"password": "test", "isNormalUser": True},
"root": {"password": "root"},
}
flake = generate_flake(
temporary_home,
flake_template=CLAN_CORE / "templates" / "new-clan",
machine_configs=dict(
my_machine=dict(
services=dict(getty=dict(autologinUser="root")),
clanCore=dict(
state=dict(
my_state=dict(
folders=[
# to be owned by root
"/var/my-state",
# to be owned by user 'test'
"/var/user-state",
]
)
)
),
# create test user to test if state can be owned by user
users=dict(
users=dict(
test=dict(
password="test",
isNormalUser=True,
),
root=dict(password="root"),
)
),
# create a systemd service to create a file in the state folder
# and another to read it after reboot
systemd=dict(
services=dict(
create_state=dict(
description="Create a file in the state folder",
wantedBy=["multi-user.target"],
script="""
if [ ! -f /var/my-state/root ]; then
echo "Creating a file in the state folder"
echo "dream2nix" > /var/my-state/root
# create /var/my-state/test owned by user test
echo "dream2nix" > /var/my-state/test
chown test /var/my-state/test
# make sure /var/user-state is owned by test
chown test /var/user-state
fi
""",
serviceConfig=dict(
Type="oneshot",
),
),
reboot=dict(
description="Reboot the machine",
wantedBy=["multi-user.target"],
after=["my-state.service"],
script="""
if [ ! -f /var/my-state/rebooting ]; then
echo "Rebooting the machine"
touch /var/my-state/rebooting
poweroff
else
touch /var/my-state/rebooted
fi
""",
),
read_after_reboot=dict(
description="Read a file in the state folder",
wantedBy=["multi-user.target"],
after=["reboot.service"],
# TODO: currently state folders itself cannot be owned by users
script="""
if ! cat /var/my-state/test; then
echo "cannot read from state file" > /var/my-state/error
# ensure root file is owned by root
elif [ "$(stat -c '%U' /var/my-state/root)" != "root" ]; then
echo "state file /var/my-state/root is not owned by user root" > /var/my-state/error
# ensure test file is owned by test
elif [ "$(stat -c '%U' /var/my-state/test)" != "test" ]; then
echo "state file /var/my-state/test is not owned by user test" > /var/my-state/error
# ensure /var/user-state is owned by test
elif [ "$(stat -c '%U' /var/user-state)" != "test" ]; then
echo "state folder /var/user-state is not owned by user test" > /var/my-state/error
fi
""",
serviceConfig=dict(
Type="oneshot",
),
),
)
),
clan=dict(
virtualisation=dict(graphics=False),
networking=dict(targetHost="client"),
),
)
),
flake_template=CLAN_CORE / "templates" / "minimal",
machine_configs=config,
)
monkeypatch.chdir(flake.path)
# the state dir is a point of reference for qemu interactions as it links to the qga/qmp sockets
state_dir = vm_state_dir(str(flake.path), "my_machine")
monkeypatch.chdir(flake.path)
run_vm_in_thread("my_machine")
# wait for the VM to start
wait_vm_up(state_dir)
# wait for the VM to start and connect qga
qga = qga_connect("my_machine")
# create state via qmp command instead of systemd service
qga.run("echo 'dream2nix' > /var/my-state/root", check=True)
qga.run("echo 'dream2nix' > /var/my-state/test", check=True)
qga.run("chown test /var/my-state/test", check=True)
qga.run("chown test /var/user-state", check=True)
qga.run("touch /var/my-state/rebooting", check=True)
qga.exec_cmd("poweroff")
# wait for socket to be down (systemd service 'poweroff' rebooting machine)
wait_vm_down(state_dir)
wait_vm_down("my_machine")
# start vm again
run_vm_in_thread("my_machine")
# connect second time
qga = qga_connect(state_dir)
qga = qga_connect("my_machine")
# check state exists
qga.run("cat /var/my-state/test", check=True)
# ensure root file is owned by root
qga.run("stat -c '%U' /var/my-state/root", check=True)
# ensure test file is owned by test
qga.run("stat -c '%U' /var/my-state/test", check=True)
# ensure /var/user-state is owned by test
qga.run("stat -c '%U' /var/user-state", check=True)
# ensure that the file created by the service is still there and has the expected content
exitcode, out, err = qga.run("cat /var/my-state/test")
@@ -301,5 +173,5 @@ def test_vm_persistence(
assert exitcode == 0, out
# use qmp to shutdown the machine (prevent zombie qemu processes)
qmp = qmp_connect(state_dir)
qmp = qmp_connect("my_machine")
qmp.command("system_powerdown")