Compare commits

..

13 Commits

Author SHA1 Message Date
Johannes Kirschbauer
5859eeac5a vm_manager: remove in favor of clan-app 2025-08-13 20:23:05 +02:00
hsjobeki
62ef90e959 Merge pull request 'codeowners: remove @lopter due to inactivity' (#4742) from codeowners into main
Reviewed-on: https://git.clan.lol/clan/clan-core/pulls/4742
2025-08-13 17:48:08 +00:00
hsjobeki
7fdbd2e3eb Merge pull request 'dirs: remove unused conditional import' (#4736) from import-cleanup-dirs into main
Reviewed-on: https://git.clan.lol/clan/clan-core/pulls/4736
2025-08-13 17:46:19 +00:00
hsjobeki
7daebd5ee0 Merge pull request 'networking: remove unused conditional import' (#4737) from networking-import into main
Reviewed-on: https://git.clan.lol/clan/clan-core/pulls/4737
2025-08-13 17:46:04 +00:00
hsjobeki
cc8dd0564b Merge pull request 'askpass: use protocol as interface' (#4739) from interface-askpass into main
Reviewed-on: https://git.clan.lol/clan/clan-core/pulls/4739
2025-08-13 17:43:01 +00:00
Johannes Kirschbauer
23e52954c9 codeowners: remove @lopter due to inactivity 2025-08-13 19:41:47 +02:00
hsjobeki
4717d1f149 Merge pull request 'lib/Remote: Unify class method _parse_ssh_uri with class file' (#4733) from imports-cleanup into main
Reviewed-on: https://git.clan.lol/clan/clan-core/pulls/4733
2025-08-13 17:40:36 +00:00
Johannes Kirschbauer
e28f280036 vars: remove unused conditional import 2025-08-13 19:24:53 +02:00
Johannes Kirschbauer
6fa2a977df askpass: use protocol as interface
Avoids a cyclic dependency on the Remote class
Strips down the dependency closure by explizitly declaring what functions it needs
2025-08-13 19:21:18 +02:00
Johannes Kirschbauer
65dba2508f dirs: remove unused conditional import 2025-08-13 19:04:32 +02:00
Johannes Kirschbauer
9884643070 networking: remove unused conditional import 2025-08-13 19:03:35 +02:00
Johannes Kirschbauer
5083992f7b lib: remove unused TYPE_CHECKING 2025-08-13 18:26:57 +02:00
Johannes Kirschbauer
6bd8839128 lib/Remote: Unify class method _parse_ssh_uri with class file 2025-08-13 18:26:28 +02:00
90 changed files with 1469 additions and 4547 deletions

View File

@@ -1,2 +0,0 @@
nixosModules/clanCore/vars/.* @lopter
pkgs/clan-cli/clan_cli/(secrets|vars)/.* @lopter

View File

@@ -116,6 +116,7 @@ nav:
- Overview: reference/cli/index.md
- reference/cli/backups.md
- reference/cli/facts.md
- reference/cli/flakes.md
- reference/cli/flash.md
- reference/cli/machines.md

View File

@@ -59,8 +59,6 @@
"pkgs/clan-cli/clan_cli/tests/data/password-store/.gpg-id"
"pkgs/clan-cli/clan_cli/tests/data/ssh_host_ed25519_key"
"pkgs/clan-cli/clan_cli/tests/data/sshd_config"
"pkgs/clan-vm-manager/.vscode/lhebendanz.weaudit"
"pkgs/clan-vm-manager/bin/clan-vm-manager"
"clanServices/hello-world/default.nix"
"sops/secrets/test-backup-age.key/secret"
];
@@ -115,21 +113,7 @@
];
extraPythonPaths = [ "../clan-cli" ];
};
}
// (
if pkgs.stdenv.isLinux then
{
"clan-vm-manager" = {
directory = "pkgs/clan-vm-manager";
extraPythonPackages = self'.packages.clan-vm-manager.externalTestDeps ++ [
(pkgs.python3.withPackages (ps: self'.packages.clan-cli.devshellPyDeps ps))
];
extraPythonPaths = [ "../clan-cli" ];
};
}
else
{ }
);
treefmt.programs.ruff.check = true;
treefmt.programs.ruff.format = true;
};

View File

@@ -20,6 +20,7 @@ from . import (
)
from .arg_actions import AppendOptionAction
from .clan import show
from .facts import cli as facts
from .flash import cli as flash_cli
from .hyperlink import help_hyperlink
from .machines import cli as machines
@@ -301,6 +302,45 @@ For more detailed information, visit: {help_hyperlink("secrets", "https://docs.c
)
secrets.register_parser(parser_secrets)
parser_facts = subparsers.add_parser(
"facts",
help="Manage facts",
description="Manage facts",
epilog=(
f"""
Note: Facts are being deprecated, please use Vars instead.
For a migration guide visit: {help_hyperlink("vars", "https://docs.clan.lol/guides/migrations/migration-facts-vars")}
This subcommand provides an interface to facts of clan machines.
Facts are artifacts that a service can generate.
There are public and secret facts.
Public facts can be referenced by other machines directly.
Public facts can include: ip addresses, public keys.
Secret facts can include: passwords, private keys.
A service is an included clan-module that implements facts generation functionality.
For example the zerotier module will generate private and public facts.
In this case the public fact will be the resulting zerotier-ip of the machine.
The secret fact will be the zerotier-identity-secret, which is used by zerotier
to prove the machine has control of the zerotier-ip.
Examples:
$ clan facts generate
Will generate facts for all machines.
$ clan facts generate --service [SERVICE] --regenerate
Will regenerate facts, if they are already generated for a specific service.
This is especially useful for resetting certain passwords while leaving the rest
of the facts for a machine in place.
For more detailed information, visit: {help_hyperlink("secrets", "https://docs.clan.lol/guides/secrets")}
"""
),
formatter_class=argparse.RawTextHelpFormatter,
)
facts.register_parser(parser_facts)
# like facts but with vars instead of facts
parser_vars = subparsers.add_parser(
"vars",

View File

@@ -0,0 +1,60 @@
import argparse
import logging
from clan_lib.flake import require_flake
from clan_lib.machines.machines import Machine
from clan_cli.completions import add_dynamic_completer, complete_machines
log = logging.getLogger(__name__)
def check_secrets(machine: Machine, service: None | str = None) -> bool:
missing_secret_facts = []
missing_public_facts = []
services = [service] if service else list(machine.facts_data.keys())
for service in services:
for secret_fact in machine.facts_data[service]["secret"]:
if isinstance(secret_fact, str):
secret_name = secret_fact
else:
secret_name = secret_fact["name"]
if not machine.secret_facts_store.exists(service, secret_name):
machine.info(
f"Secret fact '{secret_fact}' for service '{service}' is missing."
)
missing_secret_facts.append((service, secret_name))
for public_fact in machine.facts_data[service]["public"]:
if not machine.public_facts_store.exists(service, public_fact):
machine.info(
f"Public fact '{public_fact}' for service '{service}' is missing."
)
missing_public_facts.append((service, public_fact))
machine.debug(f"missing_secret_facts: {missing_secret_facts}")
machine.debug(f"missing_public_facts: {missing_public_facts}")
return not (missing_secret_facts or missing_public_facts)
def check_command(args: argparse.Namespace) -> None:
flake = require_flake(args.flake)
machine = Machine(
name=args.machine,
flake=flake,
)
check_secrets(machine, service=args.service)
def register_check_parser(parser: argparse.ArgumentParser) -> None:
machines_parser = parser.add_argument(
"machine",
help="The machine to check secrets for",
)
add_dynamic_completer(machines_parser, complete_machines)
parser.add_argument(
"--service",
help="the service to check",
)
parser.set_defaults(func=check_command)

View File

@@ -0,0 +1,15 @@
from pathlib import Path
import pytest
from clan_lib.errors import ClanError
from clan_cli.tests.helpers import cli
def test_check_command_no_flake(
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
monkeypatch.chdir(tmp_path)
with pytest.raises(ClanError):
cli.run(["facts", "check", "machine1"])

View File

@@ -0,0 +1,133 @@
# !/usr/bin/env python3
import argparse
from clan_cli.hyperlink import help_hyperlink
from .check import register_check_parser
from .generate import register_generate_parser
from .list import register_list_parser
from .upload import register_upload_parser
# takes a (sub)parser and configures it
def register_parser(parser: argparse.ArgumentParser) -> None:
subparser = parser.add_subparsers(
title="command",
description="the command to run",
help="the command to run",
required=True,
)
check_parser = subparser.add_parser(
"check",
help="check if facts are up to date",
epilog=(
f"""
This subcommand allows checking if all facts are up to date.
Examples:
$ clan facts check [MACHINE]
Will check facts for the specified machine.
For more detailed information, visit: {help_hyperlink("secrets", "https://docs.clan.lol/guides/secrets")}
"""
),
formatter_class=argparse.RawTextHelpFormatter,
)
register_check_parser(check_parser)
list_parser = subparser.add_parser(
"list",
help="list all facts",
epilog=(
f"""
This subcommand allows listing all public facts for a specific machine.
The resulting list will be a json string with the name of the fact as its key
and the fact itself as it's value.
This is how an example output might look like:
```
\u007b
"[FACT_NAME]": "[FACT]"
\u007d
```
Examples:
$ clan facts list [MACHINE]
Will list facts for the specified machine.
For more detailed information, visit: {help_hyperlink("secrets", "https://docs.clan.lol/guides/secrets")}
"""
),
formatter_class=argparse.RawTextHelpFormatter,
)
register_list_parser(list_parser)
parser_generate = subparser.add_parser(
"generate",
help="generate public and secret facts for machines",
epilog=(
f"""
This subcommand allows control of the generation of facts.
Often this function will be invoked automatically on deploying machines,
but there are situations the user may want to have more granular control,
especially for the regeneration of certain services.
A service is an included clan-module that implements facts generation functionality.
For example the zerotier module will generate private and public facts.
In this case the public fact will be the resulting zerotier-ip of the machine.
The secret fact will be the zerotier-identity-secret, which is used by zerotier
to prove the machine has control of the zerotier-ip.
Examples:
$ clan facts generate
Will generate facts for all machines.
$ clan facts generate [MACHINE]
Will generate facts for the specified machine.
$ clan facts generate [MACHINE] --service [SERVICE]
Will generate facts for the specified machine for the specified service.
$ clan facts generate --service [SERVICE] --regenerate
Will regenerate facts, if they are already generated for a specific service.
This is especially useful for resetting certain passwords while leaving the rest
of the facts for a machine in place.
For more detailed information, visit: {help_hyperlink("secrets", "https://docs.clan.lol/guides/secrets")}
"""
),
formatter_class=argparse.RawTextHelpFormatter,
)
register_generate_parser(parser_generate)
parser_upload = subparser.add_parser(
"upload",
help="upload secrets for machines",
epilog=(
f"""
This subcommand allows uploading secrets to remote machines.
If using sops as a secret backend it will upload the private key to the machine.
If using password store it uploads all the secrets you manage to the machine.
The default backend is sops.
Examples:
$ clan facts upload [MACHINE]
Will upload secrets to a specific machine.
For more detailed information, visit: {help_hyperlink("secrets", "https://docs.clan.lol/guides/secrets")}
"""
),
formatter_class=argparse.RawTextHelpFormatter,
)
register_upload_parser(parser_upload)

View File

@@ -0,0 +1,263 @@
import argparse
import logging
import os
import sys
import traceback
from collections.abc import Callable
from pathlib import Path
from tempfile import TemporaryDirectory
from clan_lib.cmd import RunOpts, run
from clan_lib.errors import ClanError
from clan_lib.flake import require_flake
from clan_lib.git import commit_files
from clan_lib.machines.list import list_full_machines
from clan_lib.machines.machines import Machine
from clan_lib.nix import nix_shell
from clan_cli.completions import (
add_dynamic_completer,
complete_machines,
complete_services_for_machine,
)
from .check import check_secrets
from .public_modules import FactStoreBase
from .secret_modules import SecretStoreBase
log = logging.getLogger(__name__)
def read_multiline_input(prompt: str = "Finish with Ctrl-D") -> str:
"""
Read multi-line input from stdin.
"""
print(prompt, flush=True)
proc = run(["cat"], RunOpts(check=False))
log.info("Input received. Processing...")
return proc.stdout.rstrip(os.linesep).rstrip()
def bubblewrap_cmd(generator: str, facts_dir: Path, secrets_dir: Path) -> list[str]:
# fmt: off
return nix_shell(
[
"bash",
"bubblewrap",
],
[
"bwrap",
"--unshare-all",
"--tmpfs", "/",
"--ro-bind", "/nix/store", "/nix/store",
"--ro-bind", "/bin/sh", "/bin/sh",
"--dev", "/dev",
# not allowed to bind procfs in some sandboxes
"--bind", str(facts_dir), str(facts_dir),
"--bind", str(secrets_dir), str(secrets_dir),
"--chdir", "/",
# Doesn't work in our CI?
#"--proc", "/proc",
#"--hostname", "facts",
"--bind", "/proc", "/proc",
"--uid", "1000",
"--gid", "1000",
"--",
"bash", "-c", generator
],
)
# fmt: on
def generate_service_facts(
machine: Machine,
service: str,
regenerate: bool,
secret_facts_store: SecretStoreBase,
public_facts_store: FactStoreBase,
tmpdir: Path,
prompt: Callable[[str, str], str],
) -> bool:
service_dir = tmpdir / service
# check if all secrets exist and generate them if at least one is missing
needs_regeneration = not check_secrets(machine, service=service)
machine.debug(f"{service} needs_regeneration: {needs_regeneration}")
if not (needs_regeneration or regenerate):
return False
if not isinstance(machine.flake, Path):
msg = f"flake is not a Path: {machine.flake}"
msg += "fact/secret generation is only supported for local flakes"
env = os.environ.copy()
facts_dir = service_dir / "facts"
facts_dir.mkdir(parents=True)
env["facts"] = str(facts_dir)
secrets_dir = service_dir / "secrets"
secrets_dir.mkdir(parents=True)
env["secrets"] = str(secrets_dir)
# compatibility for old outputs.nix users
if isinstance(machine.facts_data[service]["generator"], str):
generator = machine.facts_data[service]["generator"]
else:
generator = machine.facts_data[service]["generator"]["finalScript"]
if machine.facts_data[service]["generator"]["prompt"]:
prompt_value = prompt(
service, machine.facts_data[service]["generator"]["prompt"]
)
env["prompt_value"] = prompt_value
from clan_lib import bwrap
if sys.platform == "linux" and bwrap.bubblewrap_works():
cmd = bubblewrap_cmd(generator, facts_dir, secrets_dir)
else:
cmd = ["bash", "-c", generator]
run(
cmd,
RunOpts(env=env),
)
files_to_commit = []
# store secrets
for secret_name, secret in machine.facts_data[service]["secret"].items():
groups = secret.get("groups", [])
secret_file = secrets_dir / secret_name
if not secret_file.is_file():
msg = f"did not generate a file for '{secret_name}' when running the following command:\n"
msg += generator
raise ClanError(msg)
secret_path = secret_facts_store.set(
service, secret_name, secret_file.read_bytes(), groups
)
if secret_path:
files_to_commit.append(secret_path)
# store facts
for name in machine.facts_data[service]["public"]:
fact_file = facts_dir / name
if not fact_file.is_file():
msg = f"did not generate a file for '{name}' when running the following command:\n"
msg += machine.facts_data[service]["generator"]
raise ClanError(msg)
fact_file = public_facts_store.set(service, name, fact_file.read_bytes())
if fact_file:
files_to_commit.append(fact_file)
commit_files(
files_to_commit,
machine.flake_dir,
f"Update facts/secrets for service {service} in machine {machine.name}",
)
return True
def prompt_func(service: str, text: str) -> str:
print(f"{text}: ")
return read_multiline_input()
def _generate_facts_for_machine(
machine: Machine,
service: str | None,
regenerate: bool,
tmpdir: Path,
prompt: Callable[[str, str], str] = prompt_func,
) -> bool:
local_temp = tmpdir / machine.name
local_temp.mkdir()
machine_updated = False
if service and service not in machine.facts_data:
services = list(machine.facts_data.keys())
msg = f"Could not find service with name: {service}. The following services are available: {services}"
raise ClanError(msg)
if service:
machine_service_facts = {service: machine.facts_data[service]}
else:
machine_service_facts = machine.facts_data
for service in machine_service_facts:
machine_updated |= generate_service_facts(
machine=machine,
service=service,
regenerate=regenerate,
secret_facts_store=machine.secret_facts_store,
public_facts_store=machine.public_facts_store,
tmpdir=local_temp,
prompt=prompt,
)
if machine_updated:
# flush caches to make sure the new secrets are available in evaluation
machine.flush_caches()
return machine_updated
def generate_facts(
machines: list[Machine],
service: str | None = None,
regenerate: bool = False,
prompt: Callable[[str, str], str] = prompt_func,
) -> bool:
was_regenerated = False
with TemporaryDirectory(prefix="facts-generate-") as _tmpdir:
tmpdir = Path(_tmpdir).resolve()
for machine in machines:
errors = 0
try:
was_regenerated |= _generate_facts_for_machine(
machine, service, regenerate, tmpdir, prompt
)
except (OSError, ClanError) as e:
machine.error(f"Failed to generate facts: {e}")
traceback.print_exc()
errors += 1
if errors > 0:
msg = (
f"Failed to generate facts for {errors} hosts. Check the logs above"
)
raise ClanError(msg)
if not was_regenerated and len(machines) > 0:
log.info("All secrets and facts are already up to date")
return was_regenerated
def generate_command(args: argparse.Namespace) -> None:
flake = require_flake(args.flake)
machines: list[Machine] = list(list_full_machines(flake).values())
if len(args.machines) > 0:
machines = list(
filter(
lambda m: m.name in args.machines,
machines,
)
)
generate_facts(machines, args.service, args.regenerate)
def register_generate_parser(parser: argparse.ArgumentParser) -> None:
machines_parser = parser.add_argument(
"machines",
type=str,
help="machine to generate facts for. if empty, generate facts for all machines",
nargs="*",
default=[],
)
add_dynamic_completer(machines_parser, complete_machines)
service_parser = parser.add_argument(
"--service",
type=str,
help="service to generate facts for, if empty, generate facts for every service",
default=None,
)
add_dynamic_completer(service_parser, complete_services_for_machine)
parser.add_argument(
"--regenerate",
action=argparse.BooleanOptionalAction,
help="whether to regenerate facts for the specified machine",
default=None,
)
parser.set_defaults(func=generate_command)

View File

@@ -0,0 +1,15 @@
from pathlib import Path
import pytest
from clan_lib.errors import ClanError
from clan_cli.tests.helpers import cli
def test_generate_command_no_flake(
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
monkeypatch.chdir(tmp_path)
with pytest.raises(ClanError):
cli.run(["facts", "generate"])

View File

@@ -0,0 +1,33 @@
import argparse
import json
import logging
from clan_lib.flake import require_flake
from clan_lib.machines.machines import Machine
from clan_cli.completions import add_dynamic_completer, complete_machines
log = logging.getLogger(__name__)
def get_command(args: argparse.Namespace) -> None:
flake = require_flake(args.flake)
machine = Machine(name=args.machine, flake=flake)
# the raw_facts are bytestrings making them not json serializable
raw_facts = machine.public_facts_store.get_all()
facts = {}
for key in raw_facts["TODO"]:
facts[key] = raw_facts["TODO"][key].decode("utf8")
print(json.dumps(facts, indent=4))
def register_list_parser(parser: argparse.ArgumentParser) -> None:
machines_parser = parser.add_argument(
"machine",
help="The machine to print facts for",
)
add_dynamic_completer(machines_parser, complete_machines)
parser.set_defaults(func=get_command)

View File

@@ -0,0 +1,13 @@
from pathlib import Path
import pytest
from clan_lib.errors import ClanError
from clan_cli.tests.helpers import cli
def test_list_command_no_flake(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.chdir(tmp_path)
with pytest.raises(ClanError):
cli.run(["facts", "list", "machine1"])

View File

@@ -0,0 +1,30 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from pathlib import Path
import clan_lib.machines.machines as machines
class FactStoreBase(ABC):
@abstractmethod
def __init__(self, machine: machines.Machine) -> None:
pass
@abstractmethod
def exists(self, service: str, name: str) -> bool:
pass
@abstractmethod
def set(self, service: str, name: str, value: bytes) -> Path | None:
pass
# get a single fact
@abstractmethod
def get(self, service: str, name: str) -> bytes:
pass
# get all facts
@abstractmethod
def get_all(self) -> dict[str, dict[str, bytes]]:
pass

View File

@@ -0,0 +1,51 @@
from pathlib import Path
from clan_lib.errors import ClanError
from clan_lib.machines.machines import Machine
from . import FactStoreBase
class FactStore(FactStoreBase):
def __init__(self, machine: Machine) -> None:
self.machine = machine
self.works_remotely = False
def set(self, service: str, name: str, value: bytes) -> Path | None:
if self.machine.flake.is_local:
fact_path = (
self.machine.flake.path
/ "machines"
/ self.machine.name
/ "facts"
/ name
)
fact_path.parent.mkdir(parents=True, exist_ok=True)
fact_path.touch()
fact_path.write_bytes(value)
return fact_path
msg = f"in_flake fact storage is only supported for local flakes: {self.machine.flake}"
raise ClanError(msg)
def exists(self, service: str, name: str) -> bool:
fact_path = (
self.machine.flake_dir / "machines" / self.machine.name / "facts" / name
)
return fact_path.exists()
# get a single fact
def get(self, service: str, name: str) -> bytes:
fact_path = (
self.machine.flake_dir / "machines" / self.machine.name / "facts" / name
)
return fact_path.read_bytes()
# get all facts
def get_all(self) -> dict[str, dict[str, bytes]]:
facts_folder = self.machine.flake_dir / "machines" / self.machine.name / "facts"
facts: dict[str, dict[str, bytes]] = {}
facts["TODO"] = {}
if facts_folder.exists():
for fact_path in facts_folder.iterdir():
facts["TODO"][fact_path.name] = fact_path.read_bytes()
return facts

View File

@@ -0,0 +1,47 @@
import logging
from pathlib import Path
from clan_lib.dirs import vm_state_dir
from clan_lib.errors import ClanError
from clan_lib.machines.machines import Machine
from . import FactStoreBase
log = logging.getLogger(__name__)
class FactStore(FactStoreBase):
def __init__(self, machine: Machine) -> None:
self.machine = machine
self.works_remotely = False
self.dir = vm_state_dir(machine.flake.identifier, machine.name) / "facts"
machine.debug(f"FactStore initialized with dir {self.dir}")
def exists(self, service: str, name: str) -> bool:
fact_path = self.dir / service / name
return fact_path.exists()
def set(self, service: str, name: str, value: bytes) -> Path | None:
fact_path = self.dir / service / name
fact_path.parent.mkdir(parents=True, exist_ok=True)
fact_path.write_bytes(value)
return None
# get a single fact
def get(self, service: str, name: str) -> bytes:
fact_path = self.dir / service / name
if fact_path.exists():
return fact_path.read_bytes()
msg = f"Fact {name} for service {service} not found"
raise ClanError(msg)
# get all facts
def get_all(self) -> dict[str, dict[str, bytes]]:
facts: dict[str, dict[str, bytes]] = {}
if self.dir.exists():
for service in self.dir.iterdir():
facts[service.name] = {}
for fact in service.iterdir():
facts[service.name][fact.name] = fact.read_bytes()
return facts

View File

@@ -0,0 +1,34 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from pathlib import Path
import clan_lib.machines.machines as machines
from clan_lib.ssh.host import Host
class SecretStoreBase(ABC):
@abstractmethod
def __init__(self, machine: machines.Machine) -> None:
pass
@abstractmethod
def set(
self, service: str, name: str, value: bytes, groups: list[str]
) -> Path | None:
pass
@abstractmethod
def get(self, service: str, name: str) -> bytes:
pass
@abstractmethod
def exists(self, service: str, name: str) -> bool:
pass
def needs_upload(self, host: Host) -> bool:
return True
@abstractmethod
def upload(self, output_dir: Path) -> None:
pass

View File

@@ -0,0 +1,122 @@
import os
import subprocess
from pathlib import Path
from typing import override
from clan_lib.cmd import Log, RunOpts
from clan_lib.machines.machines import Machine
from clan_lib.nix import nix_shell
from clan_lib.ssh.host import Host
from clan_cli.facts.secret_modules import SecretStoreBase
class SecretStore(SecretStoreBase):
def __init__(self, machine: Machine) -> None:
self.machine = machine
def set(
self, service: str, name: str, value: bytes, groups: list[str]
) -> Path | None:
subprocess.run(
nix_shell(
["pass"],
["pass", "insert", "-m", f"machines/{self.machine.name}/{name}"],
),
input=value,
check=True,
)
return None # we manage the files outside of the git repo
def get(self, service: str, name: str) -> bytes:
return subprocess.run(
nix_shell(
["pass"],
["pass", "show", f"machines/{self.machine.name}/{name}"],
),
check=True,
stdout=subprocess.PIPE,
).stdout
def exists(self, service: str, name: str) -> bool:
password_store = os.environ.get(
"PASSWORD_STORE_DIR", f"{os.environ['HOME']}/.password-store"
)
secret_path = Path(password_store) / f"machines/{self.machine.name}/{name}.gpg"
return secret_path.exists()
def generate_hash(self) -> bytes:
password_store = os.environ.get(
"PASSWORD_STORE_DIR", f"{os.environ['HOME']}/.password-store"
)
hashes = []
hashes.append(
subprocess.run(
nix_shell(
["git"],
[
"git",
"-C",
password_store,
"log",
"-1",
"--format=%H",
f"machines/{self.machine.name}",
],
),
stdout=subprocess.PIPE,
check=False,
).stdout.strip()
)
for symlink in Path(password_store).glob(f"machines/{self.machine.name}/**/*"):
if symlink.is_symlink():
hashes.append(
subprocess.run(
nix_shell(
["git"],
[
"git",
"-C",
password_store,
"log",
"-1",
"--format=%H",
str(symlink),
],
),
stdout=subprocess.PIPE,
check=False,
).stdout.strip()
)
# we sort the hashes to make sure that the order is always the same
hashes.sort()
return b"\n".join(hashes)
@override
def needs_upload(self, host: Host) -> bool:
local_hash = self.generate_hash()
with host.host_connection() as ssh:
remote_hash = ssh.run(
# TODO get the path to the secrets from the machine
["cat", f"{self.machine.secrets_upload_directory}/.pass_info"],
RunOpts(log=Log.STDERR, check=False),
).stdout.strip()
if not remote_hash:
print("remote hash is empty")
return True
return local_hash.decode() != remote_hash
def upload(self, output_dir: Path) -> None:
os.umask(0o077)
for service in self.machine.facts_data:
for secret in self.machine.facts_data[service]["secret"]:
if isinstance(secret, dict):
secret_name = secret["name"]
else:
# TODO: drop old format soon
secret_name = secret
(output_dir / secret_name).write_bytes(self.get(service, secret_name))
(output_dir / ".pass_info").write_bytes(self.generate_hash())

View File

@@ -0,0 +1,72 @@
from pathlib import Path
from typing import override
from clan_lib.machines.machines import Machine
from clan_lib.ssh.host import Host
from clan_cli.secrets.folders import sops_secrets_folder
from clan_cli.secrets.machines import add_machine, has_machine
from clan_cli.secrets.secrets import decrypt_secret, encrypt_secret, has_secret
from clan_cli.secrets.sops import generate_private_key, load_age_plugins
from . import SecretStoreBase
class SecretStore(SecretStoreBase):
def __init__(self, machine: Machine) -> None:
self.machine = machine
# no need to generate keys if we don't manage secrets
if not hasattr(self.machine, "facts_data"):
return
if not self.machine.facts_data:
return
if has_machine(self.machine.flake_dir, self.machine.name):
return
priv_key, pub_key = generate_private_key()
encrypt_secret(
self.machine.flake_dir,
sops_secrets_folder(self.machine.flake_dir)
/ f"{self.machine.name}-age.key",
priv_key,
add_groups=self.machine.select("config.clan.core.sops.defaultGroups"),
age_plugins=load_age_plugins(self.machine.flake),
)
add_machine(self.machine.flake_dir, self.machine.name, pub_key, False)
def set(
self, service: str, name: str, value: bytes, groups: list[str]
) -> Path | None:
path = (
sops_secrets_folder(self.machine.flake_dir) / f"{self.machine.name}-{name}"
)
encrypt_secret(
self.machine.flake_dir,
path,
value,
add_machines=[self.machine.name],
add_groups=groups,
age_plugins=load_age_plugins(self.machine.flake),
)
return path
def get(self, service: str, name: str) -> bytes:
return decrypt_secret(
sops_secrets_folder(self.machine.flake_dir) / f"{self.machine.name}-{name}",
age_plugins=load_age_plugins(self.machine.flake),
).encode("utf-8")
def exists(self, service: str, name: str) -> bool:
return has_secret(
sops_secrets_folder(self.machine.flake_dir) / f"{self.machine.name}-{name}",
)
@override
def needs_upload(self, host: Host) -> bool:
return False
# We rely now on the vars backend to upload the age key
def upload(self, output_dir: Path) -> None:
pass

View File

@@ -0,0 +1,36 @@
import shutil
from pathlib import Path
from typing import override
from clan_lib.dirs import vm_state_dir
from clan_lib.machines.machines import Machine
from . import SecretStoreBase
class SecretStore(SecretStoreBase):
def __init__(self, machine: Machine) -> None:
self.machine = machine
self.dir = vm_state_dir(machine.flake.identifier, machine.name) / "secrets"
self.dir.mkdir(parents=True, exist_ok=True)
def set(
self, service: str, name: str, value: bytes, groups: list[str]
) -> Path | None:
secret_file = self.dir / service / name
secret_file.parent.mkdir(parents=True, exist_ok=True)
secret_file.write_bytes(value)
return None # we manage the files outside of the git repo
def get(self, service: str, name: str) -> bytes:
secret_file = self.dir / service / name
return secret_file.read_bytes()
def exists(self, service: str, name: str) -> bool:
return (self.dir / service / name).exists()
@override
def upload(self, output_dir: Path) -> None:
if output_dir.exists():
shutil.rmtree(output_dir)
shutil.copytree(self.dir, output_dir)

View File

@@ -0,0 +1,42 @@
import argparse
import logging
from pathlib import Path
from tempfile import TemporaryDirectory
from clan_lib.flake import require_flake
from clan_lib.machines.machines import Machine
from clan_lib.ssh.host import Host
from clan_lib.ssh.upload import upload
from clan_cli.completions import add_dynamic_completer, complete_machines
log = logging.getLogger(__name__)
def upload_secrets(machine: Machine, host: Host) -> None:
if not machine.secret_facts_store.needs_upload(host):
machine.info("Secrets already uploaded")
return
with TemporaryDirectory(prefix="facts-upload-") as _tempdir:
local_secret_dir = Path(_tempdir).resolve()
machine.secret_facts_store.upload(local_secret_dir)
remote_secret_dir = Path(machine.secrets_upload_directory)
upload(host, local_secret_dir, remote_secret_dir)
def upload_command(args: argparse.Namespace) -> None:
flake = require_flake(args.flake)
machine = Machine(name=args.machine, flake=flake)
with machine.target_host().host_connection() as host:
upload_secrets(machine, host)
def register_upload_parser(parser: argparse.ArgumentParser) -> None:
machines_parser = parser.add_argument(
"machine",
help="The machine to upload secrets to",
)
add_dynamic_completer(machines_parser, complete_machines)
parser.set_defaults(func=upload_command)

View File

@@ -0,0 +1,15 @@
from pathlib import Path
import pytest
from clan_lib.errors import ClanError
from clan_cli.tests.helpers import cli
def test_upload_command_no_flake(
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
monkeypatch.chdir(tmp_path)
with pytest.raises(ClanError):
cli.run(["facts", "upload", "machine1"])

View File

@@ -0,0 +1,133 @@
import ipaddress
from typing import TYPE_CHECKING
import pytest
from clan_cli.facts.secret_modules.sops import SecretStore
from clan_cli.secrets.folders import sops_secrets_folder
from clan_cli.tests.fixtures_flakes import FlakeForTest
from clan_cli.tests.helpers import cli
from clan_cli.tests.helpers.validator import is_valid_age_key
from clan_lib.flake import Flake
from clan_lib.machines.machines import Machine
if TYPE_CHECKING:
from .age_keys import KeyPair
@pytest.mark.impure
def test_generate_secret(
monkeypatch: pytest.MonkeyPatch,
test_flake_with_core: FlakeForTest,
age_keys: list["KeyPair"],
) -> None:
monkeypatch.chdir(test_flake_with_core.path)
monkeypatch.setenv("SOPS_AGE_KEY", age_keys[0].privkey)
cli.run(
[
"secrets",
"users",
"add",
"--flake",
str(test_flake_with_core.path),
"user1",
age_keys[0].pubkey,
]
)
cli.run(
[
"secrets",
"groups",
"add-user",
"--flake",
str(test_flake_with_core.path),
"admins",
"user1",
]
)
cmd = [
"vars",
"generate",
"--flake",
str(test_flake_with_core.path),
"vm1",
"--generator",
"zerotier",
]
cli.run(cmd)
store1 = SecretStore(
Machine(name="vm1", flake=Flake(str(test_flake_with_core.path)))
)
assert store1.exists("", "age.key")
network_id = (
test_flake_with_core.path
/ "vars"
/ "per-machine"
/ "vm1"
/ "zerotier"
/ "zerotier-network-id"
/ "value"
).read_text()
assert len(network_id) == 16
secrets_folder = sops_secrets_folder(test_flake_with_core.path)
age_key = secrets_folder / "vm1-age.key" / "secret"
identity_secret = (
test_flake_with_core.path
/ "vars"
/ "per-machine"
/ "vm1"
/ "zerotier"
/ "zerotier-identity-secret"
/ "secret"
)
age_key_mtime = age_key.lstat().st_mtime_ns
secret1_mtime = identity_secret.lstat().st_mtime_ns
# Assert that the age key is valid
age_secret = store1.get("", "age.key").decode()
assert is_valid_age_key(age_secret)
# test idempotency for vm1 and also generate for vm2
cli.run(
[
"vars",
"generate",
"--flake",
str(test_flake_with_core.path),
"--generator",
"zerotier",
]
)
assert age_key.lstat().st_mtime_ns == age_key_mtime
assert identity_secret.lstat().st_mtime_ns == secret1_mtime
store2 = SecretStore(
Machine(name="vm2", flake=Flake(str(test_flake_with_core.path)))
)
assert store2.exists("", "age.key")
assert (
test_flake_with_core.path
/ "vars"
/ "per-machine"
/ "vm2"
/ "zerotier"
/ "zerotier-identity-secret"
/ "secret"
).exists()
ip = (
test_flake_with_core.path
/ "vars"
/ "per-machine"
/ "vm2"
/ "zerotier"
/ "zerotier-ip"
/ "value"
).read_text()
assert ipaddress.IPv6Address(ip).is_private
# Assert that the age key is valid
age_secret = store2.get("", "age.key").decode()
assert is_valid_age_key(age_secret)

View File

@@ -820,6 +820,63 @@ def test_stdout_of_generate(
caplog.clear()
@pytest.mark.with_core
def test_migration(
monkeypatch: pytest.MonkeyPatch,
flake_with_sops: ClanFlake,
caplog: pytest.LogCaptureFixture,
) -> None:
flake = flake_with_sops
config = flake.machines["my_machine"]
config["nixpkgs"]["hostPlatform"] = "x86_64-linux"
my_service = config["clan"]["core"]["facts"]["services"]["my_service"]
my_service["public"]["my_value"] = {}
my_service["secret"]["my_secret"] = {}
my_service["generator"]["script"] = (
'echo -n hello > "$facts"/my_value && echo -n hello > "$secrets"/my_secret'
)
my_generator = config["clan"]["core"]["vars"]["generators"]["my_generator"]
my_generator["files"]["my_value"]["secret"] = False
my_generator["files"]["my_secret"]["secret"] = True
my_generator["migrateFact"] = "my_service"
my_generator["script"] = 'echo -n other > "$out"/my_value'
other_service = config["clan"]["core"]["facts"]["services"]["other_service"]
other_service["secret"]["other_value"] = {}
other_service["generator"]["script"] = 'echo -n hello > "$secrets"/other_value'
other_generator = config["clan"]["core"]["vars"]["generators"]["other_generator"]
# the var to migrate to is mistakenly marked as not secret (migration should fail)
other_generator["files"]["other_value"]["secret"] = False
other_generator["migrateFact"] = "my_service"
other_generator["script"] = 'echo -n value-from-vars > "$out"/other_value'
flake.refresh()
monkeypatch.chdir(flake.path)
cli.run(["facts", "generate", "--flake", str(flake.path), "my_machine"])
with caplog.at_level(logging.INFO):
cli.run(["vars", "generate", "--flake", str(flake.path), "my_machine"])
assert "Migrated var my_generator/my_value" in caplog.text
assert "Migrated secret var my_generator/my_secret" in caplog.text
flake_obj = Flake(str(flake.path))
my_generator = Generator("my_generator", machine="my_machine", _flake=flake_obj)
other_generator = Generator(
"other_generator", machine="my_machine", _flake=flake_obj
)
in_repo_store = in_repo.FactStore(flake=flake_obj)
sops_store = sops.SecretStore(flake=flake_obj)
assert in_repo_store.exists(my_generator, "my_value")
assert in_repo_store.get(my_generator, "my_value").decode() == "hello"
assert sops_store.exists(my_generator, "my_secret")
assert sops_store.get(my_generator, "my_secret").decode() == "hello"
assert in_repo_store.exists(other_generator, "other_value")
assert (
in_repo_store.get(other_generator, "other_value").decode() == "value-from-vars"
)
@pytest.mark.with_core
def test_fails_when_files_are_left_from_other_backend(
monkeypatch: pytest.MonkeyPatch,

View File

@@ -8,7 +8,6 @@ from dataclasses import dataclass, field
from functools import cached_property
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import TYPE_CHECKING
from clan_cli.completions import (
add_dynamic_completer,
@@ -16,6 +15,7 @@ from clan_cli.completions import (
complete_services_for_machine,
)
from clan_cli.vars._types import StoreBase
from clan_cli.vars.migration import check_can_migrate, migrate_files
from clan_lib.api import API
from clan_lib.cmd import RunOpts, run
from clan_lib.errors import ClanError
@@ -32,9 +32,6 @@ from .var import Var
log = logging.getLogger(__name__)
if TYPE_CHECKING:
from clan_lib.flake import Flake
@dataclass(frozen=True)
class GeneratorKey:
@@ -509,6 +506,9 @@ def _generate_vars_for_machine(
) -> None:
_ensure_healthy(machine=machine, generators=generators)
for generator in generators:
if check_can_migrate(machine, generator):
migrate_files(machine, generator)
else:
_execute_generator(
machine=machine,
generator=generator,

View File

@@ -1,7 +1,136 @@
import logging
from pathlib import Path
from typing import TYPE_CHECKING
from clan_lib.errors import ClanError
from clan_lib.git import commit_files
log = logging.getLogger(__name__)
if TYPE_CHECKING:
pass
from clan_cli.vars.generate import Generator
from clan_lib.machines.machines import Machine
def _migration_file_exists(
machine: "Machine",
generator: "Generator",
fact_name: str,
) -> bool:
for file in generator.files:
if file.name == fact_name:
break
else:
msg = f"Could not find file {fact_name} in generator {generator.name}"
raise ClanError(msg)
is_secret = file.secret
if is_secret:
if machine.secret_facts_store.exists(generator.name, fact_name):
return True
machine.debug(
f"Cannot migrate fact {fact_name} for service {generator.name}, as it does not exist in the secret fact store"
)
if not is_secret:
if machine.public_facts_store.exists(generator.name, fact_name):
return True
machine.debug(
f"Cannot migrate fact {fact_name} for service {generator.name}, as it does not exist in the public fact store"
)
return False
def _migrate_file(
machine: "Machine",
generator: "Generator",
var_name: str,
service_name: str,
fact_name: str,
) -> list[Path]:
for file in generator.files:
if file.name == var_name:
break
else:
msg = f"Could not find file {fact_name} in generator {generator.name}"
raise ClanError(msg)
paths = []
if file.secret:
old_value = machine.secret_facts_store.get(service_name, fact_name)
maybe_path = machine.secret_vars_store.set(
generator, file, old_value, is_migration=True
)
if maybe_path:
paths.append(maybe_path)
else:
old_value = machine.public_facts_store.get(service_name, fact_name)
maybe_path = machine.public_vars_store.set(
generator, file, old_value, is_migration=True
)
if maybe_path:
paths.append(maybe_path)
return paths
def migrate_files(
machine: "Machine",
generator: "Generator",
) -> None:
not_found = []
files_to_commit = []
for file in generator.files:
if _migration_file_exists(machine, generator, file.name):
assert generator.migrate_fact is not None
files_to_commit += _migrate_file(
machine, generator, file.name, generator.migrate_fact, file.name
)
else:
not_found.append(file.name)
if len(not_found) > 0:
msg = f"Could not migrate the following files for generator {generator.name}, as no fact or secret exists with the same name: {not_found}"
raise ClanError(msg)
commit_files(
files_to_commit,
machine.flake_dir,
f"migrated facts to vars for generator {generator.name} for machine {machine.name}",
)
def check_can_migrate(
machine: "Machine",
generator: "Generator",
) -> bool:
service_name = generator.migrate_fact
if not service_name:
return False
# ensure that none of the generated vars already exist in the store
all_files_missing = True
all_files_present = True
for file in generator.files:
if file.secret:
if machine.secret_vars_store.exists(generator, file.name):
all_files_missing = False
else:
all_files_present = False
else:
if machine.public_vars_store.exists(generator, file.name):
all_files_missing = False
else:
all_files_present = False
if not all_files_present and not all_files_missing:
msg = f"Cannot migrate facts for generator {generator.name} as some files already exist in the store"
raise ClanError(msg)
if all_files_present:
# all files already migrated, no need to run migration again
return False
# ensure that all files can be migrated (exists in the corresponding fact store)
return bool(
all(
_migration_file_exists(machine, generator, file.name)
for file in generator.files
)
)

View File

@@ -19,6 +19,7 @@ from clan_lib.machines.machines import Machine
from clan_lib.nix import nix_shell
from clan_cli.completions import add_dynamic_completer, complete_machines
from clan_cli.facts.generate import generate_facts
from clan_cli.qemu.qga import QgaSession
from clan_cli.qemu.qmp import QEMUMonitorProtocol
from clan_cli.vars.generate import generate_vars
@@ -83,7 +84,10 @@ def get_secrets(
secrets_dir = tmpdir / "secrets"
secrets_dir.mkdir(parents=True, exist_ok=True)
generate_facts([machine])
generate_vars([machine])
machine.secret_facts_store.upload(secrets_dir)
populate_secret_vars(machine, secrets_dir)
return secrets_dir

View File

@@ -4,11 +4,9 @@ import sys
import urllib.parse
from enum import Enum
from pathlib import Path
from typing import TYPE_CHECKING, Protocol
from typing import Protocol
from clan_lib.errors import ClanError
if TYPE_CHECKING:
from clan_lib.flake import Flake
log = logging.getLogger(__name__)

View File

@@ -6,6 +6,7 @@ from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any, Literal
from clan_cli.facts.generate import generate_facts
from clan_cli.vars.generate import generate_vars
from clan_cli.vars.upload import populate_secret_vars
@@ -76,6 +77,7 @@ def run_machine_flash(
extra_args = []
system_config_nix: dict[str, Any] = {}
generate_facts([machine])
generate_vars([machine])
if system_config.language:
@@ -129,6 +131,7 @@ def run_machine_flash(
local_dir = tmpdir / upload_dir
local_dir.mkdir(parents=True)
machine.secret_facts_store.upload(local_dir)
populate_secret_vars(machine, local_dir)
disko_install = []

View File

@@ -5,6 +5,7 @@ from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Literal
from clan_cli.facts.generate import generate_facts
from clan_cli.machines.hardware import HardwareConfig
from clan_cli.vars.generate import generate_vars
@@ -86,6 +87,7 @@ def run_machine_install(opts: InstallOptions, target_host: Remote) -> None:
# Notify the UI about what we are doing
notify_install_step("generators")
generate_facts([machine])
generate_vars([machine])
with (
@@ -98,6 +100,7 @@ def run_machine_install(opts: InstallOptions, target_host: Remote) -> None:
# Notify the UI about what we are doing
notify_install_step("upload-secrets")
machine.secret_facts_store.upload(upload_dir)
machine.secret_vars_store.populate_dir(
machine.name, upload_dir, phases=["activation", "users", "services"]
)

View File

@@ -3,8 +3,10 @@ import logging
from dataclasses import dataclass
from functools import cached_property
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal
from typing import Any, Literal
from clan_cli.facts import public_modules as facts_public_modules
from clan_cli.facts import secret_modules as facts_secret_modules
from clan_cli.vars._types import StoreBase
from clan_lib.api import API
@@ -14,9 +16,6 @@ from clan_lib.ssh.remote import Remote
log = logging.getLogger(__name__)
if TYPE_CHECKING:
pass
@dataclass(frozen=True)
class Machine:
@@ -82,6 +81,18 @@ class Machine:
f'{self._class_}Configurations."{self.name}".pkgs.hostPlatform.system'
)
@cached_property
def secret_facts_store(self) -> facts_secret_modules.SecretStoreBase:
secret_module = self.select("config.clan.core.facts.secretModule")
module = importlib.import_module(secret_module)
return module.SecretStore(machine=self)
@cached_property
def public_facts_store(self) -> facts_public_modules.FactStoreBase:
public_module = self.select("config.clan.core.facts.publicModule")
module = importlib.import_module(public_module)
return module.FactStore(machine=self)
@cached_property
def secret_vars_store(self) -> StoreBase:
secret_module = self.select("config.clan.core.vars.settings.secretModule")

View File

@@ -5,6 +5,8 @@ import re
import shlex
from contextlib import ExitStack
from clan_cli.facts.generate import generate_facts
from clan_cli.facts.upload import upload_secrets
from clan_cli.vars.generate import generate_vars
from clan_cli.vars.upload import upload_secret_vars
@@ -145,9 +147,11 @@ def run_machine_update(
# Some operations require root privileges on the target host.
target_host_root = stack.enter_context(_target_host.become_root())
generate_facts([machine], service=None, regenerate=False)
generate_vars([machine], generator_name=None, regenerate=False)
# Upload secrets to the target host using root
upload_secrets(machine, target_host_root)
upload_secret_vars(machine, target_host_root)
# Upload the flake's source to the build host.

View File

@@ -5,17 +5,15 @@ from collections.abc import Iterator
from contextlib import contextmanager
from dataclasses import dataclass
from functools import cached_property
from typing import TYPE_CHECKING, Any
from typing import Any
from clan_cli.vars.get import get_machine_var
from clan_lib.errors import ClanError
from clan_lib.flake import Flake
from clan_lib.import_utils import ClassSource, import_with_source
from clan_lib.ssh.remote import Remote
if TYPE_CHECKING:
from clan_lib.machines.machines import Machine
from clan_lib.ssh.remote import Remote
log = logging.getLogger(__name__)

View File

@@ -1,74 +0,0 @@
import re
import urllib.parse
from typing import TYPE_CHECKING
from clan_lib.errors import ClanError
if TYPE_CHECKING:
from clan_lib.ssh.remote import Remote
def parse_ssh_uri(
*,
machine_name: str,
address: str,
) -> "Remote":
"""
Parses an SSH URI into a Remote object.
The address can be in the form of:
- `ssh://[user@]hostname[:port]?option=value&option2=value2`
- `[user@]hostname[:port]`
The specification can be found here: https://www.ietf.org/archive/id/draft-salowey-secsh-uri-00.html
"""
if address.startswith("ssh://"):
# Strip the `ssh://` prefix if it exists
address = address[len("ssh://") :]
parts = address.split("?", maxsplit=1)
endpoint, maybe_options = parts if len(parts) == 2 else (parts[0], "")
parts = endpoint.split("@")
match len(parts):
case 2:
user, host_port = parts
case 1:
user, host_port = "root", parts[0]
case _:
msg = f"Invalid host, got `{address}` but expected something like `[user@]hostname[:port]`"
raise ClanError(msg)
# Make this check now rather than failing with a `ValueError`
# when looking up the port from the `urlsplit` result below:
if host_port.count(":") > 1 and not re.match(r".*\[.*]", host_port):
msg = f"Invalid hostname: {address}. IPv6 addresses must be enclosed in brackets , e.g. [::1]"
raise ClanError(msg)
options: dict[str, str] = {}
for o in maybe_options.split("&"):
if len(o) == 0:
continue
parts = o.split("=", maxsplit=1)
if len(parts) != 2:
msg = (
f"Invalid option in host `{address}`: option `{o}` does not have "
f"a value (i.e. expected something like `name=value`)"
)
raise ClanError(msg)
name, value = parts
options[name] = value
result = urllib.parse.urlsplit(f"//{host_port}")
if not result.hostname:
msg = f"Invalid host, got `{address}` but expected something like `[user@]hostname[:port]`"
raise ClanError(msg)
hostname = result.hostname
port = result.port
from clan_lib.ssh.remote import Remote
return Remote(
address=hostname,
user=user,
port=port,
command_prefix=machine_name,
ssh_options=options,
)

View File

@@ -1,9 +1,11 @@
import ipaddress
import logging
import os
import re
import shlex
import subprocess
import sys
import urllib.parse
from collections.abc import Iterator
from contextlib import contextmanager
from dataclasses import dataclass, field
@@ -17,7 +19,6 @@ from clan_lib.colors import AnsiColor
from clan_lib.errors import ClanError, indent_command # Assuming these are available
from clan_lib.nix import nix_shell
from clan_lib.ssh.host_key import HostKeyCheck, hostkey_to_ssh_opts
from clan_lib.ssh.parse import parse_ssh_uri
from clan_lib.ssh.socks_wrapper import SocksWrapper
from clan_lib.ssh.sudo_askpass_proxy import SudoAskpassProxy
@@ -108,7 +109,7 @@ class Remote:
Parse a deployment address and return a Remote object.
"""
return parse_ssh_uri(machine_name=machine_name, address=address)
return _parse_ssh_uri(machine_name=machine_name, address=address)
def run_local(
self,
@@ -468,3 +469,69 @@ class Remote:
from clan_lib.network.check import check_machine_ssh_login
return check_machine_ssh_login(self)
def _parse_ssh_uri(
*,
machine_name: str,
address: str,
) -> "Remote":
"""
Parses an SSH URI into a Remote object.
The address can be in the form of:
- `ssh://[user@]hostname[:port]?option=value&option2=value2`
- `[user@]hostname[:port]`
The specification can be found here: https://www.ietf.org/archive/id/draft-salowey-secsh-uri-00.html
"""
if address.startswith("ssh://"):
# Strip the `ssh://` prefix if it exists
address = address[len("ssh://") :]
parts = address.split("?", maxsplit=1)
endpoint, maybe_options = parts if len(parts) == 2 else (parts[0], "")
parts = endpoint.split("@")
match len(parts):
case 2:
user, host_port = parts
case 1:
user, host_port = "root", parts[0]
case _:
msg = f"Invalid host, got `{address}` but expected something like `[user@]hostname[:port]`"
raise ClanError(msg)
# Make this check now rather than failing with a `ValueError`
# when looking up the port from the `urlsplit` result below:
if host_port.count(":") > 1 and not re.match(r".*\[.*]", host_port):
msg = f"Invalid hostname: {address}. IPv6 addresses must be enclosed in brackets , e.g. [::1]"
raise ClanError(msg)
options: dict[str, str] = {}
for o in maybe_options.split("&"):
if len(o) == 0:
continue
parts = o.split("=", maxsplit=1)
if len(parts) != 2:
msg = (
f"Invalid option in host `{address}`: option `{o}` does not have "
f"a value (i.e. expected something like `name=value`)"
)
raise ClanError(msg)
name, value = parts
options[name] = value
result = urllib.parse.urlsplit(f"//{host_port}")
if not result.hostname:
msg = f"Invalid host, got `{address}` but expected something like `[user@]hostname[:port]`"
raise ClanError(msg)
hostname = result.hostname
port = result.port
from clan_lib.ssh.remote import Remote
return Remote(
address=hostname,
user=user,
port=port,
command_prefix=machine_name,
ssh_options=options,
)

View File

@@ -8,21 +8,28 @@ import sys
import termios
import threading
from pathlib import Path
from typing import TYPE_CHECKING
from typing import Protocol
from clan_lib.cmd import terminate_process
from clan_lib.errors import ClanError
if TYPE_CHECKING:
from clan_lib.ssh.remote import Remote
logger = logging.getLogger(__name__)
remote_script = (Path(__file__).parent / "sudo_askpass_proxy.sh").read_text()
class RemoteP(Protocol):
@property
def address(self) -> str: ...
@property
def target(self) -> str: ...
def ssh_cmd(self) -> list[str]: ...
class SudoAskpassProxy:
def __init__(self, host: "Remote", prompt_command: list[str]) -> None:
def __init__(self, host: "RemoteP", prompt_command: list[str]) -> None:
self.host = host
self.password_prompt_command = prompt_command
self.ssh_process: subprocess.Popen | None = None

View File

@@ -1,7 +0,0 @@
# shellcheck shell=bash
source_up
watch_file flake-module.nix shell.nix default.nix
# Because we depend on nixpkgs sources, uploading to builders takes a long time
use flake .#clan-vm-manager --builders ''

View File

@@ -1 +0,0 @@
**/.vscode

View File

@@ -1,8 +0,0 @@
{
"clientRemote": "",
"gitRemote": "",
"gitSha": "",
"treeEntries": [],
"auditedFiles": [],
"resolvedEntries": []
}

View File

@@ -1,94 +0,0 @@
# Clan VM Manager
Provides users with the simple functionality to manage their locally registered clans.
![app-preview](screenshots/image.png)
## Available commands
Run this application
```bash
./bin/clan-vm-manager
```
Join the default machine of a clan
```bash
./bin/clan-vm-manager [clan-uri]
```
Join a specific machine of a clan
```bash
./bin/clan-vm-manager [clan-uri]#[machine]
```
For more available commands see the developer section below.
## Developing this Application
### Debugging Style and Layout
```bash
# Enable the GTK debugger
gsettings set org.gtk.Settings.Debug enable-inspector-keybinding true
# Start the application with the debugger attached
GTK_DEBUG=interactive ./bin/clan-vm-manager --debug
```
Appending `--debug` flag enables debug logging printed into the console.
### Profiling
To activate profiling you can run
```bash
CLAN_CLI_PERF=1 ./bin/clan-vm-manager
```
### Library Components
> Note:
>
> we recognized bugs when starting some cli-commands through the integrated vs-code terminal.
> If encountering issues make sure to run commands in a regular os-shell.
lib-Adw has a demo application showing all widgets. You can run it by executing
```bash
adwaita-1-demo
```
GTK4 has a demo application showing all widgets. You can run it by executing
```bash
gtk4-widget-factory
```
To find available icons execute
```bash
gtk4-icon-browser
```
### Links
Here are some important documentation links related to the Clan VM Manager:
- [Adw PyGobject Reference](http://lazka.github.io/pgi-docs/index.html#Adw-1): This link provides the PyGObject reference documentation for the Adw library, which is used in the Clan VM Manager. It contains detailed information about the Adw widgets and their usage.
- [GTK4 PyGobject Reference](http://lazka.github.io/pgi-docs/index.html#Gtk-4.0): This link provides the PyGObject reference documentation for GTK4, the toolkit used for building the user interface of the Clan VM Manager. It includes information about GTK4 widgets, signals, and other features.
- [Adw Widget Gallery](https://gnome.pages.gitlab.gnome.org/libadwaita/doc/main/widget-gallery.html): This link showcases a widget gallery for Adw, allowing you to see the available widgets and their visual appearance. It can be helpful for designing the user interface of the Clan VM Manager.
- [Python + GTK3 Tutorial](https://python-gtk-3-tutorial.readthedocs.io/en/latest/textview.html): Although the Clan VM Manager uses GTK4, this tutorial for GTK3 can still be useful as it covers the basics of building GTK-based applications with Python. It includes examples and explanations for various GTK widgets, including text views.
- [GNOME Human Interface Guidelines](https://developer.gnome.org/hig/): This link provides the GNOME Human Interface Guidelines, which offer design and usability recommendations for creating GNOME applications. It covers topics such as layout, navigation, and interaction patterns.
## Error handling
> Error dialogs should be avoided where possible, since they are disruptive.
>
> For simple non-critical errors, toasts can be a good alternative.

View File

@@ -1,13 +0,0 @@
#!/usr/bin/env python3
import sys
from pathlib import Path
module_path = Path(__file__).parent.parent.absolute()
sys.path.insert(0, str(module_path))
sys.path.insert(0, str(module_path.parent / "clan_cli"))
from clan_vm_manager import main # NOQA
if __name__ == "__main__":
main()

View File

@@ -1,46 +0,0 @@
{
"folders": [
{
"path": "."
},
{
"path": "../clan-cli/clan_cli"
},
{
"path": "../clan-cli/tests"
},
{
"path": "../../nixosModules"
},
{
"path": "../../lib/modules"
},
{
"path": "../../../democlan"
}
],
"settings": {
"python.linting.mypyEnabled": true,
"files.exclude": {
"**/__pycache__": true,
"**/.direnv": true,
"**/.hypothesis": true,
"**/.mypy_cache": true,
"**/.reports": true,
"**/.ruff_cache": true,
"**/result/**": true,
"/nix/store/**": true
},
"search.exclude": {
"**/__pycache__": true,
"**/.direnv": true,
"**/.hypothesis": true,
"**/.mypy_cache": true,
"**/.reports": true,
"**/.ruff_cache": true,
"**/result/": true,
"/nix/store/**": true
},
"files.autoSave": "off"
}
}

View File

@@ -1,14 +0,0 @@
import logging
import sys
from clan_cli.profiler import profile
from clan_vm_manager.app import MainApplication
log = logging.getLogger(__name__)
@profile
def main(argv: list[str] = sys.argv) -> int:
app = MainApplication()
return app.run(argv)

View File

@@ -1,6 +0,0 @@
import sys
from . import main
if __name__ == "__main__":
sys.exit(main())

View File

@@ -1,126 +0,0 @@
#!/usr/bin/env python3
import logging
from typing import Any, ClassVar
import gi
from clan_vm_manager import assets
from clan_vm_manager.singletons.toast import InfoToast, ToastOverlay
gi.require_version("Gtk", "4.0")
gi.require_version("Adw", "1")
from clan_lib.custom_logger import setup_logging
from gi.repository import Adw, Gdk, Gio, Gtk
from clan_vm_manager.components.interfaces import ClanConfig
from clan_vm_manager.singletons.use_join import GLib, GObject
from .windows.main_window import MainWindow
log = logging.getLogger(__name__)
class MainApplication(Adw.Application):
"""
This class is initialized every time the app is started
Only the Adw.ApplicationWindow is a singleton.
So don't use any singletons in the Adw.Application class.
"""
__gsignals__: ClassVar = {
"join_request": (GObject.SignalFlags.RUN_FIRST, None, [str]),
}
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(
application_id="org.clan.vm-manager",
flags=Gio.ApplicationFlags.HANDLES_COMMAND_LINE,
)
self.add_main_option(
"debug",
ord("d"),
GLib.OptionFlags.NONE,
GLib.OptionArg.NONE,
"enable debug mode",
None,
)
self.window: MainWindow | None = None
self.connect("activate", self.on_activate)
self.connect("shutdown", self.on_shutdown)
def on_shutdown(self, source: "MainApplication") -> None:
log.debug("Shutting down Adw.Application")
if self.get_windows() == []:
log.warning("No windows to destroy")
if self.window:
# TODO: Doesn't seem to raise the destroy signal. Need to investigate
# self.get_windows() returns an empty list. Desync between window and application?
self.window.close()
# Killing vms directly. This is dirty
self.window.kill_vms()
else:
log.error("No window to destroy")
def do_command_line(self, command_line: Any) -> int:
options = command_line.get_options_dict()
# convert GVariantDict -> GVariant -> dict
options = options.end().unpack()
if "debug" in options and self.window is None:
setup_logging(logging.DEBUG)
elif self.window is None:
setup_logging(logging.INFO)
log.debug("Debug logging enabled")
if "debug" in options:
ToastOverlay.use().add_toast_unique(
InfoToast("Debug logging enabled").toast, "info.debugging.enabled"
)
args = command_line.get_arguments()
self.activate()
if len(args) > 1:
uri = args[1]
self.emit("join_request", uri)
return 0
def on_window_hide_unhide(self, *_args: Any) -> None:
if not self.window:
log.error("No window to hide/unhide")
return
if self.window.is_visible():
self.window.hide()
else:
self.window.present()
def dummy_menu_entry(self) -> None:
log.info("Dummy menu entry called")
def on_activate(self, source: "MainApplication") -> None:
if not self.window:
self.init_style()
self.window = MainWindow(config=ClanConfig(initial_view="list"))
self.window.set_application(self)
self.window.show()
# TODO: For css styling
def init_style(self) -> None:
resource_path = assets.loc / "style.css"
log.debug(f"Style css path: {resource_path}")
css_provider = Gtk.CssProvider()
css_provider.load_from_path(str(resource_path))
display = Gdk.Display.get_default()
assert display is not None
Gtk.StyleContext.add_provider_for_display(
display,
css_provider,
Gtk.STYLE_PROVIDER_PRIORITY_APPLICATION,
)

View File

@@ -1,7 +0,0 @@
from pathlib import Path
loc: Path = Path(__file__).parent
def get_asset(name: str | Path) -> Path:
return loc / name

Binary file not shown.

Before

Width:  |  Height:  |  Size: 95 KiB

View File

@@ -1,63 +0,0 @@
/* Insert custom styles here */
navigation-view {
padding: 5px;
/* padding-left: 5px;
padding-right: 5px;
padding-bottom: 5px; */
}
avatar {
margin: 2px;
}
.trust {
padding-top: 25px;
padding-bottom: 25px;
}
.join-list {
margin-top: 1px;
margin-left: 2px;
margin-right: 2px;
}
.progress-bar {
margin-right: 25px;
min-width: 200px;
}
.group-list {
background-color: inherit;
}
.group-list > .activatable:hover {
background-color: unset;
}
.group-list > row {
margin-top: 12px;
border-bottom: unset;
}
.vm-list {
margin-top: 25px;
margin-bottom: 25px;
}
.no-shadow {
box-shadow: none;
}
.search-entry {
margin-bottom: 12px;
}
searchbar {
margin-bottom: 25px;
}
.log-view {
margin-top: 12px;
font-family: monospace;
padding: 8px;
}

Binary file not shown.

Before

Width:  |  Height:  |  Size: 3.1 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 375 B

Binary file not shown.

Before

Width:  |  Height:  |  Size: 717 B

Binary file not shown.

Before

Width:  |  Height:  |  Size: 717 B

Binary file not shown.

Before

Width:  |  Height:  |  Size: 1.5 KiB

View File

@@ -1,61 +0,0 @@
# Import the urllib.parse, enum and dataclasses modules
import urllib.parse
import urllib.request
from dataclasses import dataclass
from pathlib import Path
from clan_lib.flake import Flake
# Define the ClanURI class
@dataclass
class ClanURI:
flake: Flake
machine_name: str
def get_url(self) -> str:
return str(self.flake)
@classmethod
def from_str(
cls,
url: str,
machine_name: str | None = None,
) -> "ClanURI":
uri = url
if machine_name:
uri += f"#{machine_name}"
# Users might copy whitespace along with the URI
uri = uri.strip()
# Check if the URI starts with clan://
# If it does, remove the clan:// prefix
prefix = "clan://"
if uri.startswith(prefix):
uri = uri[len(prefix) :]
# Fix missing colon (caused by browsers like Firefox)
if "//" in uri and ":" not in uri.split("//", 1)[0]:
# If there's a `//` but no colon before it, add one before the `//`
parts = uri.split("//", 1)
uri = f"{parts[0]}://{parts[1]}"
# Parse the URI into components
# url://netloc/path;parameters?query#fragment
components: urllib.parse.ParseResult = urllib.parse.urlparse(uri)
# Replace the query string in the components with the new query string
clean_comps = components._replace(query=components.query, fragment="")
# Parse the URL into a ClanUrl object
if clean_comps.path and Path(clean_comps.path).exists():
flake = Flake(clean_comps.path)
else:
flake = Flake(clean_comps.geturl())
machine_name = "defaultVM"
if components.fragment:
machine_name = components.fragment
return cls(flake, machine_name)

View File

@@ -1,143 +0,0 @@
import dataclasses
import inspect
import logging
import multiprocessing as mp
import os
import signal
import sys
import traceback
from collections.abc import Callable
from pathlib import Path
from typing import Any, get_type_hints
log = logging.getLogger(__name__)
# Kill the new process and all its children by sending a SIGTERM signal to the process group
def _kill_group(proc: mp.Process) -> None:
pid = proc.pid
if proc.is_alive() and pid:
os.killpg(pid, signal.SIGTERM)
else:
log.warning(f"Process '{proc.name}' with pid '{pid}' is already dead")
@dataclasses.dataclass(frozen=True)
class MPProcess:
name: str
proc: mp.Process
out_file: Path
# Kill the new process and all its children by sending a SIGTERM signal to the process group
def kill_group(self) -> None:
_kill_group(proc=self.proc)
def _set_proc_name(name: str) -> None:
if sys.platform != "linux":
return
import ctypes
# Define the prctl function with the appropriate arguments and return type
libc = ctypes.CDLL("libc.so.6")
prctl = libc.prctl
prctl.argtypes = [
ctypes.c_int,
ctypes.c_char_p,
ctypes.c_ulong,
ctypes.c_ulong,
ctypes.c_ulong,
]
prctl.restype = ctypes.c_int
# Set the process name to "my_process"
prctl(15, name.encode(), 0, 0, 0)
def _init_proc(
func: Callable,
out_file: Path,
proc_name: str,
on_except: Callable[[Exception, mp.process.BaseProcess], None] | None,
**kwargs: Any,
) -> None:
# Create a new process group
os.setsid()
# Open stdout and stderr
with out_file.open("w") as out_fd:
os.dup2(out_fd.fileno(), sys.stdout.fileno())
os.dup2(out_fd.fileno(), sys.stderr.fileno())
# Print some information
pid = os.getpid()
gpid = os.getpgid(pid=pid)
# Set the process name
_set_proc_name(proc_name)
# Close stdin
sys.stdin.close()
linebreak = "=" * 5
# Execute the main function
print(linebreak + f" {func.__name__}:{pid} " + linebreak, file=sys.stderr)
try:
func(**kwargs)
except Exception as ex:
traceback.print_exc()
if on_except is not None:
on_except(ex, mp.current_process())
# Kill the new process and all its children by sending a SIGTERM signal to the process group
pid = os.getpid()
gpid = os.getpgid(pid=pid)
print(f"Killing process group pid={pid} gpid={gpid}", file=sys.stderr)
os.killpg(gpid, signal.SIGTERM)
sys.exit(1)
# Don't use a finally block here, because we want the exitcode to be set to
# 0 if the function returns normally
def spawn(
*,
out_file: Path,
on_except: Callable[[Exception, mp.process.BaseProcess], None] | None,
func: Callable,
**kwargs: Any,
) -> MPProcess:
# Decouple the process from the parent
if mp.get_start_method(allow_none=True) is None:
mp.set_start_method(method="forkserver")
# Validate kwargs against the signature of func
func_signature = inspect.signature(func)
bound_args = func_signature.bind_partial(**kwargs)
bound_args.apply_defaults() # Ensure defaults are applied to missing args
# Type-check kwargs against func's annotations
type_hints = get_type_hints(func)
for arg_name, arg_value in kwargs.items():
if arg_name in type_hints:
expected_type = type_hints[arg_name]
if not isinstance(arg_value, expected_type):
msg = f"Argument '{arg_name}' must be of type {expected_type}, but got type {type(arg_value)}"
raise TypeError(msg)
# Set names
proc_name = f"MPExec:{func.__name__}"
# Start the process
proc = mp.Process(
target=_init_proc,
args=(func, out_file, proc_name, on_except),
name=proc_name,
kwargs=kwargs,
)
proc.start()
# Return the process
mp_proc = MPProcess(name=proc_name, proc=proc, out_file=out_file)
return mp_proc

View File

@@ -1,230 +0,0 @@
import logging
from collections.abc import Callable
from typing import Any, TypeVar
import gi
gi.require_version("Gio", "2.0")
from clan_lib.errors import ClanError
from gi.repository import Gio, GObject
log = logging.getLogger(__name__)
# Define type variables for key and value types
K = TypeVar("K") # Key type
V = TypeVar(
"V", bound=GObject.Object
) # Value type, bound to GObject.GObject or its subclasses
# GObject and Gio.ListModel are not compatible with mypy, so we need to ignore the errors
# clan_vm_manager/components/gkvstore.py:21: error: Definition of "newv" in base class "Object" is incompatible with definition in base class "GInterface" [misc]
# clan_vm_manager/components/gkvstore.py:21: error: Definition of "install_properties" in base class "Object" is incompatible with definition in base class "GInterface" [misc]
# clan_vm_manager/components/gkvstore.py:21: error: Definition of "getv" in base class "Object" is incompatible with definition in base class "GInterface" [misc]
class GKVStore[K, V: GObject.Object](GObject.GObject, Gio.ListModel): # type: ignore[misc]
"""
A simple key-value store that implements the Gio.ListModel interface, with generic types for keys and values.
Only use self[key] and del self[key] for accessing the items for better performance.
This class could be optimized by having the objects remember their position in the list.
"""
def __init__(self, gtype: type[V], key_gen: Callable[[V], K]) -> None:
super().__init__()
self.gtype = gtype
self.key_gen = key_gen
# From Python 3.7 onwards dictionaries are ordered by default
self._items: dict[K, V] = {}
##################################
# #
# Gio.ListStore Interface #
# #
##################################
@classmethod
def new(cls: Any, gtype: type[V]) -> "GKVStore":
return cls.__new__(cls, gtype)
def append(self, item: V) -> None:
key = self.key_gen(item)
self[key] = item
def find(self, item: V) -> tuple[bool, int]:
log.warning("Finding is O(n) in GKVStore. Better use indexing")
for i, v in enumerate(self.values()):
if v == item:
return True, i
return False, -1
def find_with_equal_func(
self, item: V, equal_func: Callable[[V, V], bool]
) -> tuple[bool, int]:
log.warning("Finding is O(n) in GKVStore. Better use indexing")
for i, v in enumerate(self.values()):
if equal_func(v, item):
return True, i
return False, -1
def find_with_equal_func_full(
self, item: V, equal_func: Callable[[V, V, Any], bool], user_data: Any
) -> tuple[bool, int]:
log.warning("Finding is O(n) in GKVStore. Better use indexing")
for i, v in enumerate(self.values()):
if equal_func(v, item, user_data):
return True, i
return False, -1
def insert(self, position: int, item: V) -> None:
log.warning("Inserting is O(n) in GKVStore. Better use append")
log.warning(
"This functions may have incorrect items_changed signal behavior. Please test it"
)
key = self.key_gen(item)
if key in self._items:
msg = "Key already exists in the dictionary"
raise ClanError(msg)
if position < 0 or position > len(self._items):
msg = "Index out of range"
raise IndexError(msg)
# Temporary storage for items to be reinserted
temp_list = [(k, self._items[k]) for k in list(self.keys())[position:]]
# Delete items from the original dict
for k in list(self.keys())[position:]:
del self._items[k]
# Insert the new key-value pair
self._items[key] = item
# Reinsert the items
for _i, (k, v) in enumerate(temp_list):
self._items[k] = v
# Notify the model of the changes
self.items_changed(position, 0, 1)
def insert_sorted(
self, item: V, compare_func: Callable[[V, V, Any], int], user_data: Any
) -> None:
msg = "insert_sorted is not implemented in GKVStore"
raise NotImplementedError(msg)
def remove(self, position: int) -> None:
if position < 0 or position >= self.get_n_items():
return
key = self.keys()[position]
del self[key]
self.items_changed(position, 1, 0)
def remove_all(self) -> None:
self._items.clear()
self.items_changed(0, len(self._items), 0)
def sort(self, compare_func: Callable[[V, V, Any], int], user_data: Any) -> None:
msg = "sort is not implemented in GKVStore"
raise NotImplementedError(msg)
def splice(self, position: int, n_removals: int, additions: list[V]) -> None:
msg = "splice is not implemented in GKVStore"
raise NotImplementedError(msg)
##################################
# #
# Gio.ListModel Interface #
# #
##################################
def get_item(self, position: int) -> V | None:
if position < 0 or position >= self.get_n_items():
return None
# Access items by index since OrderedDict does not support direct indexing
key = list(self._items.keys())[position]
return self._items[key]
def do_get_item(self, position: int) -> V | None:
return self.get_item(position)
def get_item_type(self) -> Any:
return self.gtype.__gtype__ # type: ignore[attr-defined]
def do_get_item_type(self) -> GObject.GType:
return self.get_item_type()
def get_n_items(self) -> int:
return len(self._items)
def do_get_n_items(self) -> int:
return self.get_n_items()
##################################
# #
# Dict Interface #
# #
##################################
def keys(self) -> list[K]:
return list(self._items.keys())
def values(self) -> list[V]:
return list(self._items.values())
def items(self) -> list[tuple[K, V]]:
return list(self._items.items())
def get(self, key: K, default: V | None = None) -> V | None:
return self._items.get(key, default)
# O(1) operation if the key does not exist, O(n) if it does
def __setitem__(self, key: K, value: V) -> None:
# If the key already exists, remove it O(n)
if key in self._items:
log.debug("Updating an existing key in GKVStore is O(n)")
position = self.keys().index(key)
self._items[key] = value
self.items_changed(position, 1, 1)
else:
# Add the new key-value pair
self._items[key] = value
position = max(len(self._items) - 1, 0)
self.items_changed(position, 0, 1)
# O(n) operation
def __delitem__(self, key: K) -> None:
position = self.keys().index(key)
del self._items[key]
self.items_changed(position, 1, 0)
def __len__(self) -> int:
return len(self._items)
# O(1) operation
def __getitem__(self, key: K) -> V: # type: ignore[override]
return self._items[key]
def __contains__(self, key: K) -> bool: # type: ignore[override]
return key in self._items
def __str__(self) -> str:
resp = "GKVStore(\n"
for k, v in self._items.items():
resp += f"{k}: {v}\n"
resp += ")"
return resp
def __repr__(self) -> str:
return self._items.__str__()
##################################
# #
# Custom Methods #
# #
##################################
def first(self) -> V:
return self.values()[0]
def last(self) -> V:
return self.values()[-1]
def register_on_change(
self, callback: Callable[["GKVStore[K,V]", int, int, int], None]
) -> None:
self.connect("items-changed", callback)

View File

@@ -1,10 +0,0 @@
from dataclasses import dataclass
import gi
gi.require_version("Gtk", "4.0")
@dataclass
class ClanConfig:
initial_view: str

View File

@@ -1,75 +0,0 @@
import logging
from collections.abc import Callable
from typing import TypeVar
import gi
from clan_vm_manager import assets
gi.require_version("Adw", "1")
from gi.repository import Adw, GdkPixbuf, Gio, GObject, Gtk
log = logging.getLogger(__name__)
ListItem = TypeVar("ListItem", bound=GObject.Object)
CustomStore = TypeVar("CustomStore", bound=Gio.ListModel)
class EmptySplash(Gtk.Box):
def __init__(self, on_join: Callable[[str], None]) -> None:
super().__init__(orientation=Gtk.Orientation.VERTICAL)
self.on_join = on_join
vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=6)
clan_icon = self.load_image(str(assets.get_asset("clan_black_notext.png")))
if clan_icon:
image = Gtk.Image.new_from_pixbuf(clan_icon)
else:
image = Gtk.Image.new_from_icon_name("image-missing")
# same as the clamp
image.set_pixel_size(400)
image.set_opacity(0.5)
image.set_margin_top(20)
image.set_margin_bottom(10)
vbox.append(image)
empty_label = Gtk.Label(label="Welcome to Clan! Join your first clan.")
join_entry = Gtk.Entry()
join_entry.set_placeholder_text("clan://<url>")
join_entry.set_hexpand(True)
join_button = Gtk.Button(label="Join")
join_button.connect("clicked", self._on_join, join_entry)
join_entry.connect("activate", lambda e: self._on_join(join_button, e))
clamp = Adw.Clamp()
clamp.set_maximum_size(400)
clamp.set_margin_bottom(40)
vbox.append(empty_label)
hbox = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=6)
hbox.append(join_entry)
hbox.append(join_button)
vbox.append(hbox)
clamp.set_child(vbox)
self.append(clamp)
def load_image(self, file_path: str) -> GdkPixbuf.Pixbuf | None:
try:
pixbuf = GdkPixbuf.Pixbuf.new_from_file(file_path)
except Exception:
log.exception("Failed to load image")
return None
else:
return pixbuf
def _on_join(self, button: Gtk.Button, entry: Gtk.Entry) -> None:
"""
Callback for the join button
Extracts the text from the entry and calls the on_join callback
"""
log.info(f"Splash screen: Joining {entry.get_text()}")
self.on_join(entry.get_text())

File diff suppressed because it is too large Load Diff

View File

@@ -1,379 +0,0 @@
import datetime
import logging
import multiprocessing as mp
import os
import tempfile
import threading
import time
import weakref
from collections.abc import Callable, Generator
from contextlib import contextmanager
from pathlib import Path
from typing import IO, ClassVar
import gi
from clan_cli import vms
from clan_cli.vms.inspect import inspect_vm
from clan_cli.vms.qemu import QMPWrapper
from clan_lib.dirs import vm_state_dir
from clan_lib.machines.machines import Machine
from clan_vm_manager.clan_uri import ClanURI
from clan_vm_manager.components.executor import MPProcess, spawn
from clan_vm_manager.history import HistoryEntry
from clan_vm_manager.singletons.toast import (
InfoToast,
SuccessToast,
ToastOverlay,
WarningToast,
)
gi.require_version("GObject", "2.0")
gi.require_version("Gtk", "4.0")
from gi.repository import Gio, GLib, GObject, Gtk
log = logging.getLogger(__name__)
class VMObject(GObject.Object):
# Define a custom signal with the name "vm_stopped" and a string argument for the message
__gsignals__: ClassVar = {
"vm_status_changed": (GObject.SignalFlags.RUN_FIRST, None, []),
"vm_build_notify": (GObject.SignalFlags.RUN_FIRST, None, [bool, bool]),
}
def __init__(
self,
icon: Path,
data: HistoryEntry,
build_log_cb: Callable[[Gio.File], None],
) -> None:
super().__init__()
# Store the data from the history entry
self.data: HistoryEntry = data
self.build_log_cb = build_log_cb
# Create a process object to store the VM process
self.vm_process: MPProcess = MPProcess(
"vm_dummy", mp.Process(), Path("./dummy")
)
self.build_process: MPProcess = MPProcess(
"build_dummy", mp.Process(), Path("./dummy")
)
self._start_thread: threading.Thread = threading.Thread()
self.machine: Machine | None = None
self.qmp_wrap: QMPWrapper | None = None
# Watcher to stop the VM
self.KILL_TIMEOUT: int = 20 # seconds
self._stop_thread: threading.Thread = threading.Thread()
# Build progress bar vars
self.progress_bar: Gtk.ProgressBar = Gtk.ProgressBar()
self.progress_bar.hide()
self.progress_bar.set_hexpand(True) # Horizontally expand
self.prog_bar_id: int = 0
# Create a temporary directory to store the logs
self.log_dir: tempfile.TemporaryDirectory = tempfile.TemporaryDirectory(
prefix="clan_vm-", suffix=f"-{self.data.flake.flake_attr}"
)
self._logs_id: int = 0
self._log_file: IO[str] | None = None
# To be able to set the switch state programmatically
# we need to store the handler id returned by the connect method
# and block the signal while we change the state. This is cursed.
self.switch: Gtk.Switch = Gtk.Switch()
self.switch_handler_id: int = self.switch.connect(
"notify::active", self._on_switch_toggle
)
self.connect("vm_status_changed", self._on_vm_status_changed)
# Make sure the VM is killed when the reference to this object is dropped
self._finalizer: weakref.finalize = weakref.finalize(self, self._kill_ref_drop)
def _vm_status_changed_task(self) -> bool:
self.emit("vm_status_changed")
return GLib.SOURCE_REMOVE
def update(self, data: HistoryEntry) -> None:
self.data = data
def _on_vm_status_changed(self, source: "VMObject") -> None:
# Signal may be emitted multiple times
self.emit("vm_build_notify", self.is_building(), self.is_running())
prev_state = self.switch.get_state()
next_state = self.is_running() and not self.is_building()
self.switch.set_state(next_state)
if prev_state is False and next_state is True:
ToastOverlay.use().add_toast_unique(
SuccessToast(f"{source.data.flake.flake_attr} started").toast,
"success.vm.start",
)
if self.switch.get_sensitive() is False and not self.is_building():
self.switch.set_sensitive(True)
exit_vm = self.vm_process.proc.exitcode
exit_build = self.build_process.proc.exitcode
exitc = exit_vm or exit_build
if not self.is_running() and exitc != 0:
with self.switch.handler_block(self.switch_handler_id):
self.switch.set_active(False)
log.error(f"VM exited with error. Exitcode: {exitc}")
ToastOverlay.use().add_toast_unique(
WarningToast(f"VM exited with error. Exitcode: {exitc}").toast,
"warning.vm.exit",
)
def _on_switch_toggle(self, switch: Gtk.Switch, user_state: bool) -> None:
if switch.get_active():
switch.set_state(False)
switch.set_sensitive(False)
self.start()
else:
switch.set_state(True)
self.shutdown()
switch.set_sensitive(False)
# We use a context manager to create the machine object
# and make sure it is destroyed when the context is exited
@contextmanager
def _create_machine(self) -> Generator[Machine]:
uri = ClanURI.from_str(
url=str(self.data.flake.flake_url), machine_name=self.data.flake.flake_attr
)
self.machine = Machine(
name=self.data.flake.flake_attr,
flake=uri.flake,
)
assert self.machine is not None
state_dir = vm_state_dir(
flake_url=self.machine.flake.identifier, vm_name=self.machine.name
)
self.qmp_wrap = QMPWrapper(state_dir)
assert self.machine is not None
yield self.machine
self.machine = None
def _pulse_progress_bar_task(self) -> bool:
if self.progress_bar.is_visible():
self.progress_bar.pulse()
return GLib.SOURCE_CONTINUE
return GLib.SOURCE_REMOVE
def __start(self) -> None:
with self._create_machine() as machine:
# Start building VM
tstart = datetime.datetime.now(tz=datetime.UTC)
log.info(f"Building VM {self.get_id()}")
log_dir = Path(str(self.log_dir.name))
# Start the build process
self.build_process = spawn(
on_except=None,
out_file=log_dir / "build.log",
func=vms.run.build_vm,
machine=machine,
tmpdir=log_dir,
)
gfile = Gio.File.new_for_path(str(log_dir / "build.log"))
# Gio documentation:
# Obtains a file monitor for the given file.
# If no file notification mechanism exists, then regular polling of the file is used.
g_monitor = gfile.monitor_file(Gio.FileMonitorFlags.NONE, None)
g_monitor.connect("changed", self.on_logs_changed)
GLib.idle_add(self._vm_status_changed_task)
self.switch.set_sensitive(True)
# Start the logs watcher
self._logs_id = GLib.timeout_add(
50, self._get_logs_task, self.build_process
)
if self._logs_id == 0:
log.error("Failed to start VM log watcher")
log.debug(f"Starting logs watcher on file: {self.build_process.out_file}")
# Start the progress bar and show it
self.progress_bar.show()
self.prog_bar_id = GLib.timeout_add(100, self._pulse_progress_bar_task)
if self.prog_bar_id == 0:
log.error("Couldn't spawn a progress bar task")
# Wait for the build to finish then hide the progress bar
self.build_process.proc.join()
tend = datetime.datetime.now(tz=datetime.UTC)
log.info(f"VM {self.get_id()} build took {tend - tstart}s")
self.progress_bar.hide()
# Check if the VM was built successfully
if self.build_process.proc.exitcode != 0:
log.error(f"Failed to build VM {self.get_id()}")
GLib.idle_add(self._vm_status_changed_task)
return
log.info(f"Successfully built VM {self.get_id()}")
vm_config = inspect_vm(machine)
# Start the VM
self.vm_process = spawn(
on_except=None,
out_file=Path(str(self.log_dir.name)) / "vm.log",
func=vms.run.run_vm,
vm_config=vm_config,
runtime_config=vms.run.RuntimeConfig(),
)
log.debug(f"Started VM {self.get_id()}")
GLib.idle_add(self._vm_status_changed_task)
# Start the logs watcher
self._logs_id = GLib.timeout_add(50, self._get_logs_task, self.vm_process)
if self._logs_id == 0:
log.error("Failed to start VM log watcher")
log.debug(f"Starting logs watcher on file: {self.vm_process.out_file}")
# Wait for the VM to stop
self.vm_process.proc.join()
log.debug(f"VM {self.get_id()} has stopped")
GLib.idle_add(self._vm_status_changed_task)
def on_logs_changed(
self,
monitor: Gio.FileMonitor,
file: Gio.File,
other_file: Gio.File,
event_type: Gio.FileMonitorEvent,
) -> None:
if event_type == Gio.FileMonitorEvent.CHANGES_DONE_HINT:
# File was changed and the changes were written to disk
# wire up the callback for setting the logs
self.build_log_cb(file)
def start(self) -> None:
if self.is_running():
log.warning("VM is already running. Ignoring start request")
self.emit("vm_status_changed", self)
return
log.debug(f"VM state dir {self.log_dir.name}")
self._start_thread = threading.Thread(target=self.__start)
self._start_thread.start()
def _get_logs_task(self, proc: MPProcess) -> bool:
if not proc.out_file.exists():
return GLib.SOURCE_CONTINUE
if not self._log_file:
try:
self._log_file = Path(proc.out_file).open() # noqa: SIM115
except Exception:
log.exception(f"Failed to open log file {proc.out_file}")
self._log_file = None
return GLib.SOURCE_REMOVE
line = os.read(self._log_file.fileno(), 4096)
if len(line) != 0:
print(line.decode("utf-8"), end="", flush=True)
if not proc.proc.is_alive():
log.debug("Removing logs watcher")
self._log_file = None
return GLib.SOURCE_REMOVE
return GLib.SOURCE_CONTINUE
def is_running(self) -> bool:
return self._start_thread.is_alive()
def is_building(self) -> bool:
return self.build_process.proc.is_alive()
def is_shutting_down(self) -> bool:
return self._stop_thread.is_alive()
def get_id(self) -> str:
return f"{self.data.flake.flake_url}#{self.data.flake.flake_attr}"
def __stop(self) -> None:
log.info(f"Stopping VM {self.get_id()}")
start_time = datetime.datetime.now(tz=datetime.UTC)
while self.is_running():
diff = datetime.datetime.now(tz=datetime.UTC) - start_time
if diff.seconds > self.KILL_TIMEOUT:
log.error(
f"VM {self.get_id()} has not stopped after {self.KILL_TIMEOUT}s. Killing it"
)
self.vm_process.kill_group()
break
if self.is_building():
log.info(f"VM {self.get_id()} is still building. Killing it")
self.build_process.kill_group()
break
if not self.machine:
log.error(f"Machine object is None. Killing VM {self.get_id()}")
self.vm_process.kill_group()
break
# Try to shutdown the VM gracefully using QMP
try:
assert self.qmp_wrap is not None
with self.qmp_wrap.qmp_ctx() as qmp:
qmp.command("system_powerdown")
except Exception as ex:
log.debug(f"QMP command 'system_powerdown' ignored. Error: {ex}")
# Try 20 times to stop the VM
time.sleep(self.KILL_TIMEOUT / 20)
GLib.idle_add(self._vm_status_changed_task)
log.debug(f"VM {self.get_id()} has stopped")
ToastOverlay.use().add_toast_unique(
InfoToast(f"Stopped {self.get_id()}").toast, "info.vm.exit"
)
def shutdown(self) -> None:
if not self.is_running():
log.warning("VM not running. Ignoring shutdown request.")
self.emit("vm_status_changed", self)
return
if self.is_shutting_down():
log.warning("Shutdown already in progress")
self.emit("vm_status_changed", self)
return
self._stop_thread = threading.Thread(target=self.__stop)
self._stop_thread.start()
def _kill_ref_drop(self) -> None:
if self.is_running():
log.warning("Killing VM due to reference drop")
self.kill()
def kill(self) -> None:
if not self.is_running():
log.warning(f"Tried to kill VM {self.get_id()} is not running")
return
log.info(f"Killing VM {self.get_id()} now")
if self.vm_process.proc.is_alive():
self.vm_process.kill_group()
if self.build_process.proc.is_alive():
self.build_process.kill_group()
def read_whole_log(self) -> str:
if not self.vm_process.out_file.exists():
log.error(f"Log file {self.vm_process.out_file} does not exist")
return ""
return self.vm_process.out_file.read_text()
def __str__(self) -> str:
return f"VM({self.get_id()})"
def __repr__(self) -> str:
return self.__str__()

View File

@@ -1,159 +0,0 @@
import argparse
import dataclasses
import datetime
import json
import logging
from typing import Any
from clan_cli.clan.inspect import FlakeConfig, inspect_flake
from clan_lib.dirs import user_history_file
from clan_lib.errors import ClanError
from clan_lib.flake import Flake
from clan_lib.locked_open import read_history_file, write_history_file
from clan_lib.machines.list import list_machines
from clan_vm_manager.clan_uri import ClanURI
log = logging.getLogger(__name__)
@dataclasses.dataclass
class HistoryEntry:
last_used: str
flake: FlakeConfig
settings: dict[str, Any] = dataclasses.field(default_factory=dict)
@classmethod
def from_json(cls: type["HistoryEntry"], data: dict[str, Any]) -> "HistoryEntry":
return cls(
last_used=data["last_used"],
flake=FlakeConfig.from_json(data["flake"]),
settings=data.get("settings", {}),
)
def _merge_dicts(d1: dict, d2: dict) -> dict:
# create a new dictionary that copies d1
merged = dict(d1)
# iterate over the keys and values of d2
for key, value in d2.items():
# if the key is in d1 and both values are dictionaries, merge them recursively
if key in d1 and isinstance(d1[key], dict) and isinstance(value, dict):
merged[key] = _merge_dicts(d1[key], value)
# otherwise, update the value of the key in the merged dictionary
else:
merged[key] = value
# return the merged dictionary
return merged
def list_history() -> list[HistoryEntry]:
logs: list[HistoryEntry] = []
if not user_history_file().exists():
return []
try:
parsed = read_history_file()
for i, p in enumerate(parsed.copy()):
# Everything from the settings dict is merged into the flake dict, and can override existing values
parsed[i] = _merge_dicts(p, p.get("settings", {}))
logs = [HistoryEntry.from_json(p) for p in parsed]
except (json.JSONDecodeError, TypeError) as ex:
msg = f"History file at {user_history_file()} is corrupted"
raise ClanError(msg) from ex
return logs
def new_history_entry(url: str, machine: str) -> HistoryEntry:
flake = inspect_flake(url, machine)
return HistoryEntry(
flake=flake,
last_used=datetime.datetime.now(tz=datetime.UTC).isoformat(),
)
def add_all_to_history(uri: ClanURI) -> list[HistoryEntry]:
history = list_history()
new_entries: list[HistoryEntry] = []
for machine in list_machines(Flake(uri.get_url())):
new_entry = _add_maschine_to_history_list(uri.get_url(), machine, history)
new_entries.append(new_entry)
write_history_file(history)
return new_entries
def add_history(uri: ClanURI) -> HistoryEntry:
user_history_file().parent.mkdir(parents=True, exist_ok=True)
history = list_history()
new_entry = _add_maschine_to_history_list(uri.get_url(), uri.machine_name, history)
write_history_file(history)
return new_entry
def _add_maschine_to_history_list(
uri_path: str, uri_machine: str, entries: list[HistoryEntry]
) -> HistoryEntry:
for new_entry in entries:
if (
new_entry.flake.flake_url == str(uri_path)
and new_entry.flake.flake_attr == uri_machine
):
new_entry.last_used = datetime.datetime.now(tz=datetime.UTC).isoformat()
return new_entry
new_entry = new_history_entry(uri_path, uri_machine)
entries.append(new_entry)
return new_entry
def add_history_command(args: argparse.Namespace) -> None:
if args.all:
add_all_to_history(args.uri)
else:
add_history(args.uri)
def list_history_command(args: argparse.Namespace) -> None:
res: dict[str, list[HistoryEntry]] = {}
for history_entry in list_history():
url = str(history_entry.flake.flake_url)
if res.get(url) is None:
res[url] = []
res[url].append(history_entry)
for flake_url, entries in res.items():
print(flake_url)
for entry in entries:
d = datetime.datetime.fromisoformat(entry.last_used)
last_used = d.strftime("%d/%m/%Y %H:%M:%S")
print(f" {entry.flake.flake_attr} ({last_used})")
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
prog="clan history",
description="Manage clan history",
)
subparser = parser.add_subparsers(
title="command",
description="the command to run",
help="the command to run",
required=True,
)
add_parser = subparser.add_parser("add", help="Add a clan flake")
add_parser.add_argument(
"uri", type=ClanURI.from_str, help="Path to the flake", default="."
)
add_parser.add_argument(
"--all", help="Add all machines", default=False, action="store_true"
)
add_parser.set_defaults(func=add_history_command)
list_parser = subparser.add_parser("list", help="List recently used flakes")
list_parser.set_defaults(func=list_history_command)
return parser.parse_args()
def main() -> None:
args = parse_args()
args.func(args)

View File

@@ -1,151 +0,0 @@
import logging
from collections.abc import Callable
from typing import Any
import gi
gi.require_version("Gtk", "4.0")
gi.require_version("Adw", "1")
from gi.repository import Adw
from clan_vm_manager.singletons.use_views import ViewStack
from clan_vm_manager.views.logs import Logs
log = logging.getLogger(__name__)
class ToastOverlay:
"""
The ToastOverlay is a class that manages the display of toasts
It should be used as a singleton in your application to prevent duplicate toasts
Usage
"""
# For some reason, the adw toast overlay cannot be subclassed
# Thats why it is added as a class property
overlay: Adw.ToastOverlay
active_toasts: set[str]
_instance: "None | ToastOverlay" = None
def __init__(self) -> None:
msg = "Call use() instead"
raise RuntimeError(msg)
@classmethod
def use(cls: Any) -> "ToastOverlay":
if cls._instance is None:
cls._instance = cls.__new__(cls)
cls.overlay = Adw.ToastOverlay()
cls.active_toasts = set()
return cls._instance
def add_toast_unique(self, toast: Adw.Toast, key: str) -> None:
if key not in self.active_toasts:
self.active_toasts.add(key)
self.overlay.add_toast(toast)
toast.connect("dismissed", lambda toast: self.active_toasts.remove(key))
class ErrorToast:
toast: Adw.Toast
def __init__(
self, message: str, persistent: bool = False, details: str = ""
) -> None:
super().__init__()
self.toast = Adw.Toast.new(
f"""<span foreground='red'>❌ Error </span> {message}"""
)
self.toast.set_use_markup(True)
self.toast.set_priority(Adw.ToastPriority.HIGH)
self.toast.set_button_label("Show more")
if persistent:
self.toast.set_timeout(0)
views = ViewStack.use().view
# we cannot check this type, python is not smart enough
logs_view: Logs = views.get_child_by_name("logs") # type: ignore
logs_view.set_message(details)
self.toast.connect(
"button-clicked",
lambda _: views.set_visible_child_name("logs"),
)
class WarningToast:
toast: Adw.Toast
def __init__(self, message: str, persistent: bool = False) -> None:
super().__init__()
self.toast = Adw.Toast.new(
f"<span foreground='orange'>⚠ Warning </span> {message}"
)
self.toast.set_use_markup(True)
self.toast.set_priority(Adw.ToastPriority.NORMAL)
if persistent:
self.toast.set_timeout(0)
class InfoToast:
toast: Adw.Toast
def __init__(self, message: str, persistent: bool = False) -> None:
super().__init__()
self.toast = Adw.Toast.new(f"<span>❕</span> {message}")
self.toast.set_use_markup(True)
self.toast.set_priority(Adw.ToastPriority.NORMAL)
if persistent:
self.toast.set_timeout(0)
class SuccessToast:
toast: Adw.Toast
def __init__(self, message: str, persistent: bool = False) -> None:
super().__init__()
self.toast = Adw.Toast.new(f"<span foreground='green'>✅</span> {message}")
self.toast.set_use_markup(True)
self.toast.set_priority(Adw.ToastPriority.NORMAL)
if persistent:
self.toast.set_timeout(0)
class LogToast:
toast: Adw.Toast
def __init__(
self,
message: str,
on_button_click: Callable[[], None],
button_label: str = "More",
persistent: bool = False,
) -> None:
super().__init__()
self.toast = Adw.Toast.new(
f"""Logs are available <span weight="regular">{message}</span>"""
)
self.toast.set_use_markup(True)
self.toast.set_priority(Adw.ToastPriority.NORMAL)
if persistent:
self.toast.set_timeout(0)
self.toast.set_button_label(button_label)
self.toast.connect(
"button-clicked",
lambda _: on_button_click(),
)

View File

@@ -1,115 +0,0 @@
import logging
import threading
from collections.abc import Callable
from typing import Any, ClassVar, cast
import gi
from clan_lib.machines.machines import Machine
from clan_vm_manager.clan_uri import ClanURI
from clan_vm_manager.components.gkvstore import GKVStore
from clan_vm_manager.history import HistoryEntry, add_history
from clan_vm_manager.singletons.use_vms import ClanStore
gi.require_version("Gtk", "4.0")
gi.require_version("Adw", "1")
from gi.repository import Gio, GLib, GObject
log = logging.getLogger(__name__)
class JoinValue(GObject.Object):
__gsignals__: ClassVar = {
"join_finished": (GObject.SignalFlags.RUN_FIRST, None, []),
}
url: ClanURI
entry: HistoryEntry | None
def _join_finished_task(self) -> bool:
self.emit("join_finished")
return GLib.SOURCE_REMOVE
def __init__(self, url: ClanURI) -> None:
super().__init__()
self.url: ClanURI = url
self.entry: HistoryEntry | None = None
def __join(self) -> None:
new_entry = add_history(self.url)
self.entry = new_entry
GLib.idle_add(self._join_finished_task)
def join(self) -> None:
threading.Thread(target=self.__join).start()
class JoinList:
"""
This is a singleton.
It is initialized with the first call of use()
"""
_instance: "None | JoinList" = None
list_store: Gio.ListStore
# Make sure the VMS class is used as a singleton
def __init__(self) -> None:
msg = "Call use() instead"
raise RuntimeError(msg)
@classmethod
def use(cls: Any) -> "JoinList":
if cls._instance is None:
cls._instance = cls.__new__(cls)
cls.list_store = Gio.ListStore.new(JoinValue)
ClanStore.use().register_on_deep_change(cls._instance.rerender_join_list)
return cls._instance
def rerender_join_list(
self, source: GKVStore, position: int, removed: int, added: int
) -> None:
self.list_store.items_changed(
0, self.list_store.get_n_items(), self.list_store.get_n_items()
)
def is_empty(self) -> bool:
return self.list_store.get_n_items() == 0
def push(self, uri: ClanURI, after_join: Callable[[JoinValue], None]) -> None:
"""
Add a join request.
This method can add multiple join requests if called subsequently for each request.
"""
value = JoinValue(uri)
machine_id = Machine(uri.machine_name, uri.flake)
machine_id_list = []
for machine_obj in self.list_store:
mvalue: ClanURI = cast(JoinValue, machine_obj).url
machine = Machine(mvalue.machine_name, mvalue.flake)
machine_id_list.append(machine.get_id())
if machine_id in machine_id_list:
log.info(f"Join request already exists: {value.url}. Ignoring.")
return
value.connect("join_finished", self._on_join_finished)
value.connect("join_finished", after_join)
self.list_store.append(value)
def _on_join_finished(self, source: JoinValue) -> None:
log.info(f"Join finished: {source.url}")
self.discard(source)
assert source.entry is not None
ClanStore.use().push_history_entry(source.entry)
def discard(self, value: JoinValue) -> None:
(has, idx) = self.list_store.find(value)
if has:
self.list_store.remove(idx)

View File

@@ -1,37 +0,0 @@
from typing import Any
import gi
gi.require_version("Gtk", "4.0")
gi.require_version("Adw", "1")
from gi.repository import Adw
class ViewStack:
"""
This is a singleton.
It is initialized with the first call of use()
Usage:
ViewStack.use().set_visible()
ViewStack.use() can also be called before the data is needed. e.g. to eliminate/reduce waiting time.
"""
_instance: "None | ViewStack" = None
view: Adw.ViewStack
# Make sure the VMS class is used as a singleton
def __init__(self) -> None:
msg = "Call use() instead"
raise RuntimeError(msg)
@classmethod
def use(cls: Any) -> "ViewStack":
if cls._instance is None:
cls._instance = cls.__new__(cls)
cls.view = Adw.ViewStack()
return cls._instance

View File

@@ -1,187 +0,0 @@
import logging
from collections.abc import Callable
from pathlib import Path
from typing import Any, ClassVar
import gi
from clan_lib.flake import Flake
from clan_lib.machines.machines import Machine
from clan_vm_manager import assets
from clan_vm_manager.clan_uri import ClanURI
from clan_vm_manager.components.gkvstore import GKVStore
from clan_vm_manager.components.vmobj import VMObject
from clan_vm_manager.history import HistoryEntry
from clan_vm_manager.singletons.use_views import ViewStack
from clan_vm_manager.views.logs import Logs
gi.require_version("GObject", "2.0")
gi.require_version("Gtk", "4.0")
from gi.repository import Gio, GLib, GObject
log = logging.getLogger(__name__)
class VMStore(GKVStore):
def __init__(self) -> None:
super().__init__(VMObject, lambda vm: vm.data.flake.flake_attr)
class Emitter(GObject.GObject):
__gsignals__: ClassVar = {
"is_ready": (GObject.SignalFlags.RUN_FIRST, None, []),
}
class ClanStore:
_instance: "None | ClanStore" = None
_clan_store: GKVStore[Flake, VMStore]
_emitter: Emitter
# set the vm that is outputting logs
# build logs are automatically streamed to the logs-view
_logging_vm: VMObject | None = None
# Make sure the VMS class is used as a singleton
def __init__(self) -> None:
msg = "Call use() instead"
raise RuntimeError(msg)
@classmethod
def use(cls: Any) -> "ClanStore":
if cls._instance is None:
cls._instance = cls.__new__(cls)
cls._clan_store = GKVStore(
VMStore, lambda store: store.first().data.flake.flake_url
)
cls._emitter = Emitter()
return cls._instance
def emit(self, signal: str) -> None:
self._emitter.emit(signal)
def connect(self, signal: str, cb: Callable[(...), Any]) -> None:
self._emitter.connect(signal, cb)
def set_logging_vm(self, ident: str) -> VMObject | None:
vm = self.get_vm(ClanURI.from_str(f"clan://{ident}"))
if vm is not None:
self._logging_vm = vm
return self._logging_vm
def register_on_deep_change(
self, callback: Callable[[GKVStore, int, int, int], None]
) -> None:
"""
Register a callback that is called when a clan_store or one of the included VMStores changes
"""
def on_vmstore_change(
store: VMStore, position: int, removed: int, added: int
) -> None:
callback(store, position, removed, added)
def on_clanstore_change(
store: "GKVStore", position: int, removed: int, added: int
) -> None:
if added > 0:
store.values()[position].register_on_change(on_vmstore_change)
callback(store, position, removed, added)
self.clan_store.register_on_change(on_clanstore_change)
@property
def clan_store(self) -> GKVStore[Flake, VMStore]:
return self._clan_store
def create_vm_task(self, vm: HistoryEntry) -> bool:
self.push_history_entry(vm)
return GLib.SOURCE_REMOVE
def push_history_entry(self, entry: HistoryEntry) -> None:
# TODO: We shouldn't do this here but in the list view
if entry.flake.icon is None:
icon: Path = assets.loc / "placeholder.jpeg"
else:
icon = Path(entry.flake.icon)
def log_details(gfile: Gio.File) -> None:
self.log_details(vm, gfile)
vm = VMObject(icon=icon, data=entry, build_log_cb=log_details)
self.push(vm)
def log_details(self, vm: VMObject, gfile: Gio.File) -> None:
views = ViewStack.use().view
logs_view: Logs = views.get_child_by_name("logs") # type: ignore
def file_read_callback(
source_object: Gio.File, result: Gio.AsyncResult, _user_data: Any
) -> None:
try:
# Finish the asynchronous read operation
res = source_object.load_contents_finish(result)
_success, contents, _etag_out = res
# Convert the byte array to a string and print it
logs_view.set_message(contents.decode("utf-8"))
except Exception as e:
print(f"Error reading file: {e}")
# only one vm can output logs at a time
if vm == self._logging_vm:
gfile.load_contents_async(None, file_read_callback, None)
# we cannot check this type, python is not smart enough
def push(self, vm: VMObject) -> None:
url = vm.data.flake.flake_url
# Only write to the store if the Clan is not already in it
# Every write to the KVStore rerenders bound widgets to the clan_store
if url not in self.clan_store:
log.debug(f"Creating new VMStore for {url}")
vm_store = VMStore()
vm_store.append(vm)
self.clan_store[url] = vm_store
else:
vm_store = self.clan_store[url]
machine = vm.data.flake.flake_attr
old_vm = vm_store.get(machine)
if old_vm:
log.info(
f"VM {vm.data.flake.flake_attr} already exists in store. Updating data field."
)
old_vm.update(vm.data)
else:
log.debug(f"Appending VM {vm.data.flake.flake_attr} to store")
vm_store.append(vm)
def remove(self, vm: VMObject) -> None:
del self.clan_store[vm.data.flake.flake_url][vm.data.flake.flake_attr]
def get_vm(self, uri: ClanURI) -> None | VMObject:
machine = Machine(uri.machine_name, uri.flake)
vm_store = self.clan_store.get(machine.flake)
if vm_store is None:
return None
vm = vm_store.get(str(machine.name), None)
return vm
def get_running_vms(self) -> list[VMObject]:
return [
vm
for clan in self.clan_store.values()
for vm in clan.values()
if vm.is_running()
]
def kill_all(self) -> None:
for vm in self.get_running_vms():
vm.kill()

View File

@@ -1,61 +0,0 @@
import os
from collections.abc import Callable
from functools import partial
from typing import Any, Literal, TypeVar
import gi
gi.require_version("Adw", "1")
from gi.repository import Adw, Gio, GObject, Gtk
# Define a TypeVar that is bound to GObject.Object
ListItem = TypeVar("ListItem", bound=GObject.Object)
def create_details_list[ListItem: GObject.Object](
model: Gio.ListStore, render_row: Callable[[Gtk.ListBox, ListItem], Gtk.Widget]
) -> Gtk.ListBox:
boxed_list = Gtk.ListBox()
boxed_list.set_selection_mode(Gtk.SelectionMode.NONE)
boxed_list.add_css_class("boxed-list")
boxed_list.bind_model(model, create_widget_func=partial(render_row, boxed_list))
return boxed_list
class PreferencesValue(GObject.Object):
variant: Literal["CPU", "MEMORY"]
editable: bool
data: Any
def __init__(
self, variant: Literal["CPU", "MEMORY"], editable: bool, data: Any
) -> None:
super().__init__()
self.variant = variant
self.editable = editable
self.data = data
class Details(Gtk.Box):
def __init__(self) -> None:
super().__init__(orientation=Gtk.Orientation.VERTICAL)
preferences_store = Gio.ListStore.new(PreferencesValue)
preferences_store.append(PreferencesValue("CPU", True, 1))
self.details_list = create_details_list(
model=preferences_store, render_row=self.render_entry_row
)
self.append(self.details_list)
def render_entry_row(
self, boxed_list: Gtk.ListBox, item: PreferencesValue
) -> Gtk.Widget:
cores: int | None = os.cpu_count()
fcores = float(cores) if cores else 1.0
row = Adw.SpinRow.new_with_range(0, fcores, 1)
row.set_value(item.data)
return row

View File

@@ -1,358 +0,0 @@
import base64
import logging
from collections.abc import Callable
from functools import partial
from typing import Any, TypeVar
import gi
from clan_lib.errors import ClanError
from clan_vm_manager.clan_uri import ClanURI
from clan_vm_manager.components.gkvstore import GKVStore
from clan_vm_manager.components.interfaces import ClanConfig
from clan_vm_manager.components.list_splash import EmptySplash
from clan_vm_manager.components.vmobj import VMObject
from clan_vm_manager.singletons.toast import (
LogToast,
SuccessToast,
ToastOverlay,
WarningToast,
)
from clan_vm_manager.singletons.use_join import JoinList, JoinValue
from clan_vm_manager.singletons.use_views import ViewStack
from clan_vm_manager.singletons.use_vms import ClanStore, VMStore
from clan_vm_manager.views.logs import Logs
gi.require_version("Adw", "1")
from gi.repository import Adw, Gdk, Gio, GLib, GObject, Gtk
log = logging.getLogger(__name__)
ListItem = TypeVar("ListItem", bound=GObject.Object)
CustomStore = TypeVar("CustomStore", bound=Gio.ListModel)
def create_boxed_list[CustomStore: Gio.ListModel, ListItem: GObject.Object](
model: CustomStore,
render_row: Callable[[Gtk.ListBox, ListItem], Gtk.Widget],
) -> Gtk.ListBox:
boxed_list = Gtk.ListBox()
boxed_list.set_selection_mode(Gtk.SelectionMode.NONE)
boxed_list.add_css_class("boxed-list")
boxed_list.add_css_class("no-shadow")
boxed_list.bind_model(model, create_widget_func=partial(render_row, boxed_list))
return boxed_list
class ClanList(Gtk.Box):
"""
The ClanList
Is the composition of
the ClanListToolbar
the clanListView
# ------------------------ #
# - Tools <Start> <Stop> < Edit> #
# ------------------------ #
# - List Items
# - <...>
# ------------------------#
"""
def __init__(self, config: ClanConfig) -> None:
super().__init__(orientation=Gtk.Orientation.VERTICAL)
app = Gio.Application.get_default()
assert app is not None
app.connect("join_request", self.on_join_request)
self.log_label: Gtk.Label = Gtk.Label()
# Add join list
self.join_boxed_list = create_boxed_list(
model=JoinList.use().list_store, render_row=self.render_join_row
)
self.join_boxed_list.add_css_class("join-list")
self.append(self.join_boxed_list)
clan_store = ClanStore.use()
clan_store.connect("is_ready", self.display_splash)
self.group_list = create_boxed_list(
model=clan_store.clan_store, render_row=self.render_group_row
)
self.group_list.add_css_class("group-list")
self.append(self.group_list)
self.splash = EmptySplash(on_join=lambda x: self.on_join_request(x, x))
def display_splash(self, source: GKVStore) -> None:
print("Displaying splash")
if (
ClanStore.use().clan_store.get_n_items() == 0
and JoinList.use().list_store.get_n_items() == 0
):
self.append(self.splash)
def render_group_row(
self, boxed_list: Gtk.ListBox, vm_store: VMStore
) -> Gtk.Widget:
self.remove(self.splash)
vm = vm_store.first()
log.debug("Rendering group row for %s", vm.data.flake.flake_url)
grp = Adw.PreferencesGroup()
grp.set_title(vm.data.flake.clan_name)
grp.set_description(str(vm.data.flake.flake_url))
add_action = Gio.SimpleAction.new("add", GLib.VariantType.new("s"))
add_action.connect("activate", self.on_add)
app = Gio.Application.get_default()
assert app is not None
app.add_action(add_action)
# menu_model = Gio.Menu()
# TODO: Make this lazy, blocks UI startup for too long
# for vm in machines.list.list_nixos_machines(flake_url=vm.data.flake.flake_url):
# if vm not in vm_store:
# menu_model.append(vm, f"app.add::{vm}")
box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=5)
box.set_valign(Gtk.Align.CENTER)
add_button = Gtk.Button()
add_button_content = Adw.ButtonContent.new()
add_button_content.set_label("Add machine")
add_button_content.set_icon_name("list-add-symbolic")
add_button.add_css_class("flat")
add_button.set_child(add_button_content)
# add_button.set_has_frame(False)
# add_button.set_menu_model(menu_model)
# add_button.set_label("Add machine")
box.append(add_button)
grp.set_header_suffix(box)
vm_list = create_boxed_list(model=vm_store, render_row=self.render_vm_row)
grp.add(vm_list)
return grp
def on_add(self, source: Any, parameter: Any) -> None:
target = parameter.get_string()
print("Adding new machine", target)
def render_vm_row(self, boxed_list: Gtk.ListBox, vm: VMObject) -> Gtk.Widget:
# Remove no-shadow class if attached
if boxed_list.has_css_class("no-shadow"):
boxed_list.remove_css_class("no-shadow")
flake = vm.data.flake
row = Adw.ActionRow()
# ====== Display Avatar ======
avatar = Adw.Avatar()
machine_icon = flake.vm.machine_icon
# If there is a machine icon, display it else
# display the clan icon
if machine_icon:
avatar.set_custom_image(Gdk.Texture.new_from_filename(str(machine_icon)))
elif flake.icon:
avatar.set_custom_image(Gdk.Texture.new_from_filename(str(flake.icon)))
else:
avatar.set_text(flake.clan_name + " " + flake.flake_attr)
avatar.set_show_initials(True)
avatar.set_size(50)
row.add_prefix(avatar)
# ====== Display Name And Url =====
row.set_title(flake.flake_attr)
row.set_title_lines(1)
row.set_title_selectable(True)
# If there is a machine description, display it else
# display the clan name
if flake.vm.machine_description:
row.set_subtitle(flake.vm.machine_description)
else:
row.set_subtitle(flake.clan_name)
row.set_subtitle_lines(1)
# ==== Display build progress bar ====
build_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=5)
build_box.set_valign(Gtk.Align.CENTER)
build_box.append(vm.progress_bar)
build_box.set_homogeneous(False)
row.add_suffix(build_box) # This allows children to have different sizes
# ==== Action buttons ====
button_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=5)
button_box.set_valign(Gtk.Align.CENTER)
## Drop down menu
open_action = Gio.SimpleAction.new("edit", GLib.VariantType.new("s"))
open_action.connect("activate", self.on_edit)
action_id = base64.b64encode(vm.get_id().encode("utf-8")).decode("utf-8")
build_logs_action = Gio.SimpleAction.new(
f"logs.{action_id}", GLib.VariantType.new("s")
)
build_logs_action.connect("activate", self.on_show_build_logs)
build_logs_action.set_enabled(False)
app = Gio.Application.get_default()
assert app is not None
app.add_action(open_action)
app.add_action(build_logs_action)
# set a callback function for conditionally enabling the build_logs action
def on_vm_build_notify(
vm: VMObject, is_building: bool, is_running: bool
) -> None:
build_logs_action.set_enabled(is_building or is_running)
app.add_action(build_logs_action)
if is_building:
ToastOverlay.use().add_toast_unique(
LogToast(
"""Build process running ...""",
on_button_click=lambda: self.show_vm_build_logs(vm.get_id()),
).toast,
f"info.build.running.{vm}",
)
vm.connect("vm_build_notify", on_vm_build_notify)
menu_model = Gio.Menu()
menu_model.append("Edit", f"app.edit::{vm.get_id()}")
menu_model.append("Show Logs", f"app.logs.{action_id}::{vm.get_id()}")
pref_button = Gtk.MenuButton()
pref_button.set_icon_name("open-menu-symbolic")
pref_button.set_menu_model(menu_model)
button_box.append(pref_button)
## VM switch button
switch_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
switch_box.set_valign(Gtk.Align.CENTER)
switch_box.append(vm.switch)
button_box.append(switch_box)
row.add_suffix(button_box)
return row
def on_edit(self, source: Any, parameter: Any) -> None:
target = parameter.get_string()
print("Editing settings for machine", target)
def on_show_build_logs(self, _: Any, parameter: Any) -> None:
target = parameter.get_string()
self.show_vm_build_logs(target)
def show_vm_build_logs(self, target: str) -> None:
vm = ClanStore.use().set_logging_vm(target)
if vm is None:
msg = f"VM {target} not found"
raise ClanError(msg)
views = ViewStack.use().view
# Reset the logs view
logs: Logs = views.get_child_by_name("logs") # type: ignore
if logs is None:
msg = "Logs view not found"
raise ClanError(msg)
name = vm.machine.name if vm.machine else "Unknown"
logs.set_title(f"""📄<span weight="normal"> {name}</span>""")
# initial message. Streaming happens automatically when the file is changed by the build process
logs.set_message(vm.build_process.out_file.read_text())
views.set_visible_child_name("logs")
def render_join_row(
self, boxed_list: Gtk.ListBox, join_val: JoinValue
) -> Gtk.Widget:
if boxed_list.has_css_class("no-shadow"):
boxed_list.remove_css_class("no-shadow")
log.debug("Rendering join row for %s", join_val.url)
row = Adw.ActionRow()
row.set_title(join_val.url.machine_name)
row.set_subtitle(str(join_val.url))
row.add_css_class("trust")
vm = ClanStore.use().get_vm(join_val.url)
# Can't do this here because clan store is empty at this point
if vm is not None:
sub = row.get_subtitle()
assert sub is not None
ToastOverlay.use().add_toast_unique(
WarningToast(
f"""<span weight="regular">{join_val.url.machine_name!s}</span> Already exists. Joining again will update it"""
).toast,
"warning.duplicate.join",
)
row.set_subtitle(
sub + "\nClan already exists. Joining again will update it"
)
avatar = Adw.Avatar()
avatar.set_text(str(join_val.url.machine_name))
avatar.set_show_initials(True)
avatar.set_size(50)
row.add_prefix(avatar)
cancel_button = Gtk.Button(label="Cancel")
cancel_button.add_css_class("error")
cancel_button.connect("clicked", partial(self.on_discard_clicked, join_val))
self.cancel_button = cancel_button
trust_button = Gtk.Button(label="Join")
trust_button.add_css_class("success")
trust_button.connect("clicked", partial(self.on_trust_clicked, join_val))
box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=5)
box.set_valign(Gtk.Align.CENTER)
box.append(cancel_button)
box.append(trust_button)
row.add_suffix(box)
return row
def on_join_request(self, source: Any, url: str) -> None:
log.debug("Join request: %s", url)
clan_uri = ClanURI.from_str(url)
JoinList.use().push(clan_uri, self.on_after_join)
def on_after_join(self, source: JoinValue) -> None:
ToastOverlay.use().add_toast_unique(
SuccessToast(f"Updated {source.url.machine_name}").toast,
"success.join",
)
# If the join request list is empty disable the shadow artefact
if JoinList.use().is_empty():
self.join_boxed_list.add_css_class("no-shadow")
def on_trust_clicked(self, value: JoinValue, source: Gtk.Widget) -> None:
source.set_sensitive(False)
self.cancel_button.set_sensitive(False)
value.join()
def on_discard_clicked(self, value: JoinValue, source: Gtk.Widget) -> None:
JoinList.use().discard(value)
if JoinList.use().is_empty():
self.join_boxed_list.add_css_class("no-shadow")

View File

@@ -1,65 +0,0 @@
import logging
import gi
gi.require_version("Adw", "1")
from gi.repository import Adw, Gio, Gtk
from clan_vm_manager.singletons.use_views import ViewStack
log = logging.getLogger(__name__)
class Logs(Gtk.Box):
"""
Simple log view
This includes a banner and a text view and a button to close the log and navigate back to the overview
"""
def __init__(self) -> None:
super().__init__(orientation=Gtk.Orientation.VERTICAL)
app = Gio.Application.get_default()
assert app is not None
self.banner = Adw.Banner.new("")
self.banner.set_use_markup(True)
self.banner.set_revealed(True)
self.banner.set_button_label("Close")
self.banner.connect(
"button-clicked",
lambda _: ViewStack.use().view.set_visible_child_name("list"),
)
self.text_view = Gtk.TextView()
self.text_view.set_editable(False)
self.text_view.set_wrap_mode(Gtk.WrapMode.WORD)
self.text_view.add_css_class("log-view")
self.append(self.banner)
self.append(self.text_view)
def set_title(self, title: str) -> None:
self.banner.set_title(title)
def set_message(self, message: str) -> None:
"""
Set the log message. This will delete any previous message
"""
buffer = self.text_view.get_buffer()
buffer.set_text(message)
mark = buffer.create_mark(None, buffer.get_end_iter(), False) # type: ignore
self.text_view.scroll_to_mark(mark, 0.05, True, 0.0, 1.0)
def append_message(self, message: str) -> None:
"""
Append to the end of a potentially existent log message
"""
buffer = self.text_view.get_buffer()
end_iter = buffer.get_end_iter()
buffer.insert(end_iter, message) # type: ignore
mark = buffer.create_mark(None, buffer.get_end_iter(), False) # type: ignore
self.text_view.scroll_to_mark(mark, 0.05, True, 0.0, 1.0)

View File

@@ -1,88 +0,0 @@
import logging
import threading
import gi
from clan_vm_manager.components.interfaces import ClanConfig
from clan_vm_manager.history import list_history
from clan_vm_manager.singletons.toast import ToastOverlay
from clan_vm_manager.singletons.use_views import ViewStack
from clan_vm_manager.singletons.use_vms import ClanStore
from clan_vm_manager.views.details import Details
from clan_vm_manager.views.list import ClanList
from clan_vm_manager.views.logs import Logs
gi.require_version("Adw", "1")
from gi.repository import Adw, Gio, GLib, Gtk
from clan_vm_manager.components.trayicon import TrayIcon
log = logging.getLogger(__name__)
class MainWindow(Adw.ApplicationWindow):
def __init__(self, config: ClanConfig) -> None:
super().__init__()
self.set_title("Clan Manager")
self.set_default_size(980, 850)
overlay = ToastOverlay.use().overlay
view = Adw.ToolbarView()
overlay.set_child(view)
self.set_content(overlay)
header = Adw.HeaderBar()
view.add_top_bar(header)
app = Gio.Application.get_default()
assert app is not None
self.tray_icon: TrayIcon = TrayIcon(app)
# Initialize all ClanStore
threading.Thread(target=self._populate_vms).start()
# Initialize all views
stack_view = ViewStack.use().view
# @hsjobeki: Do not remove clamp it is needed to limit the width
clamp = Adw.Clamp()
clamp.set_child(stack_view)
clamp.set_maximum_size(1000)
scroll = Gtk.ScrolledWindow()
scroll.set_propagate_natural_height(True)
scroll.set_policy(Gtk.PolicyType.NEVER, Gtk.PolicyType.AUTOMATIC)
scroll.set_child(clamp)
stack_view.add_named(ClanList(config), "list")
stack_view.add_named(Details(), "details")
stack_view.add_named(Logs(), "logs")
stack_view.set_visible_child_name(config.initial_view)
view.set_content(scroll)
self.connect("destroy", self.on_destroy)
def _set_clan_store_ready(self) -> bool:
ClanStore.use().emit("is_ready")
return GLib.SOURCE_REMOVE
def _populate_vms(self) -> None:
# Execute `clan flakes add <path>` to democlan for this to work
# TODO: Make list_history a generator function
for entry in list_history():
GLib.idle_add(ClanStore.use().create_vm_task, entry)
GLib.idle_add(self._set_clan_store_ready)
def kill_vms(self) -> None:
log.debug("Killing all VMs")
ClanStore.use().kill_all()
def on_destroy(self, source: "Adw.ApplicationWindow") -> None:
log.info("====Destroying Adw.ApplicationWindow===")
ClanStore.use().kill_all()
self.tray_icon.destroy()

View File

@@ -1,164 +0,0 @@
{
adwaita-icon-theme,
clan-cli,
copyDesktopItems,
fontconfig,
gobject-introspection,
gtk4,
libadwaita,
makeDesktopItem,
pygobject-stubs,
pygobject3,
pytest, # Testing framework
pytest-subprocess, # fake the real subprocess behavior to make your tests more independent.
pytest-timeout, # Add timeouts to your tests
pytest-xdist, # Run tests in parallel on multiple cores
buildPythonApplication,
runCommand,
setuptools,
webkitgtk_6_0,
wrapGAppsHook,
python,
lib,
stdenv,
}:
let
source = ./.;
desktop-file = makeDesktopItem {
name = "org.clan.vm-manager";
exec = "clan-vm-manager %u";
icon = "clan-white";
desktopName = "Clan Manager";
startupWMClass = "clan";
mimeTypes = [ "x-scheme-handler/clan" ];
};
# Dependencies that are directly used in the project but nor from internal python packages
externalPythonDeps = [
pygobject3
pygobject-stubs
gtk4
libadwaita
adwaita-icon-theme
]
++ clan-cli.propagatedBuildInputs
++ lib.optionals (!stdenv.isDarwin) [
webkitgtk_6_0
];
# Deps including python packages from the local project
allPythonDeps = [ (python.pkgs.toPythonModule clan-cli) ] ++ externalPythonDeps;
# Runtime binary dependencies required by the application
runtimeDependencies = [
];
# Dependencies required for running tests
externalTestDeps =
externalPythonDeps
++ runtimeDependencies
++ [
pytest # Testing framework
pytest-subprocess # fake the real subprocess behavior to make your tests more independent.
pytest-xdist # Run tests in parallel on multiple cores
pytest-timeout # Add timeouts to your tests
];
# Dependencies required for running tests
testDependencies = runtimeDependencies ++ allPythonDeps ++ externalTestDeps;
# Setup Python environment with all dependencies for running tests
pythonWithTestDeps = python.withPackages (_ps: testDependencies);
in
buildPythonApplication rec {
name = "clan-vm-manager";
src = source;
format = "pyproject";
makeWrapperArgs = [
"--set FONTCONFIG_FILE ${fontconfig.out}/etc/fonts/fonts.conf"
# This prevents problems with mixed glibc versions that might occur when the
# cli is called through a browser built against another glibc
"--unset LD_LIBRARY_PATH"
];
# Deps needed only at build time
nativeBuildInputs = [
setuptools
copyDesktopItems
wrapGAppsHook
gobject-introspection
];
# The necessity of setting buildInputs and propagatedBuildInputs to the
# same values for your Python package within Nix largely stems from ensuring
# that all necessary dependencies are consistently available both
# at build time and runtime,
buildInputs = allPythonDeps ++ runtimeDependencies;
propagatedBuildInputs = allPythonDeps ++ runtimeDependencies ++ [ ];
# also re-expose dependencies so we test them in CI
passthru = {
tests = {
clan-vm-manager-pytest =
runCommand "clan-vm-manager-pytest" { inherit buildInputs propagatedBuildInputs nativeBuildInputs; }
''
cp -r ${source} ./src
chmod +w -R ./src
cd ./src
export FONTCONFIG_FILE=${fontconfig.out}/etc/fonts/fonts.conf
export FONTCONFIG_PATH=${fontconfig.out}/etc/fonts
mkdir -p .home/.local/share/fonts
export HOME=.home
fc-cache --verbose
# > fc-cache succeeded
echo "Loaded the following fonts ..."
fc-list
echo "STARTING ..."
export NIX_STATE_DIR=$TMPDIR/nix IN_NIX_SANDBOX=1
${pythonWithTestDeps}/bin/python -m pytest -s -m "not impure" ./tests
touch $out
'';
};
};
# Additional pass-through attributes
passthru.desktop-file = desktop-file;
passthru.externalPythonDeps = externalPythonDeps;
passthru.externalTestDeps = externalTestDeps;
passthru.runtimeDependencies = runtimeDependencies;
passthru.testDependencies = testDependencies;
postInstall = ''
mkdir -p $out/share/icons/hicolor
cp -r ./clan_vm_manager/assets/white-favicons/* $out/share/icons/hicolor
'';
# Don't leak python packages into a devshell.
# It can be very confusing if you `nix run` than load the cli from the devshell instead.
postFixup = ''
rm $out/nix-support/propagated-build-inputs
'';
checkPhase = ''
export FONTCONFIG_FILE=${fontconfig.out}/etc/fonts/fonts.conf
export FONTCONFIG_PATH=${fontconfig.out}/etc/fonts
mkdir -p .home/.local/share/fonts
export HOME=.home
fc-cache --verbose
# > fc-cache succeeded
echo "Loaded the following fonts ..."
fc-list
PYTHONPATH= $out/bin/clan-vm-manager --help
'';
desktopItems = [ desktop-file ];
}

View File

@@ -1,54 +0,0 @@
#!/usr/bin/env bash
set -e -o pipefail
check_git_tag() {
local repo_path="$1"
local target_tag="$2"
# Change directory to the specified Git repository
pushd "$repo_path" > /dev/null 2>&1
# shellcheck disable=SC2181
if [ $? -ne 0 ]; then
echo "Error: Failed to change directory to $repo_path"
return 1
fi
# Get the current Git tag
local current_tag
current_tag=$(git describe --tags --exact-match 2>/dev/null)
# Restore the original directory
popd > /dev/null 2>&1
# Check if the current tag is 2.0
if [ "$current_tag" = "$target_tag" ]; then
echo "Current Git tag in $repo_path is $target_tag"
else
echo "Error: Current Git tag in $repo_path is not $target_tag"
exit 1
fi
}
if [ -z "$1" ]; then
echo "Usage: $0 <democlan>"
exit 1
fi
democlan="$1"
check_git_tag "$democlan" "v2.2"
check_git_tag "." "demo-v2.3"
rm -rf ~/.config/clan
clan history add "clan://$democlan#localsend-wayland1"
clear
cat << EOF
Open up this link in a browser:
"clan://$democlan#localsend-wayland2"
EOF

View File

@@ -1,23 +0,0 @@
{ ... }:
{
perSystem =
{
config,
pkgs,
lib,
system,
...
}:
{
devShells.clan-vm-manager = pkgs.callPackage ./shell.nix {
inherit (config.packages) clan-vm-manager;
};
}
// lib.optionalAttrs (system != lib.platforms.darwin) {
packages.clan-vm-manager = pkgs.python3.pkgs.callPackage ./default.nix {
inherit (config.packages) clan-cli;
};
checks = config.packages.clan-vm-manager.tests;
};
}

View File

@@ -1,22 +0,0 @@
#!/usr/bin/env bash
if ! command -v xdg-mime &> /dev/null; then
echo "Warning: 'xdg-mime' is not available. The desktop file cannot be installed."
fi
ALREADY_INSTALLED=$(nix profile list --json | jq 'has("elements") and (.elements | has("clan-vm-manager"))')
if [ "$ALREADY_INSTALLED" = "true" ]; then
echo "Upgrading installed clan-vm-manager"
nix profile upgrade clan-vm-manager
else
nix profile install .#clan-vm-manager --priority 4
fi
# install desktop file
set -eou pipefail
DESKTOP_FILE_NAME=org.clan.vm-manager.desktop
xdg-mime default "$DESKTOP_FILE_NAME" x-scheme-handler/clan

View File

@@ -1,7 +0,0 @@
# Webkit GTK doesn't interop flawless with Solid.js build result
1. Webkit expects script tag to be in `body` only solid.js puts the in the head.
2. script and css files are loaded with type="module" and crossorigin tags being set. WebKit silently fails to load then.
3. Paths to resiources are not allowed to start with "/" because webkit interprets them relative to the system and not the base url.
4. webkit doesn't support native features such as directly handling external urls (i.e opening them in the default browser)
6. Other problems to be found?

View File

@@ -1,41 +0,0 @@
[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"
[project]
name = "clan-vm-manager"
description = "clan vm manager"
dynamic = ["version"]
scripts = { clan-vm-manager = "clan_vm_manager:main", clan-vm-manager-history = "clan_vm_manager.history:main" }
[project.urls]
Homepage = "https://clan.lol/"
Documentation = "https://docs.clan.lol/"
Repository = "https://git.clan.lol/clan/clan-core"
[tool.setuptools.packages.find]
exclude = ["result"]
[tool.setuptools.package-data]
clan_vm_manager = ["**/assets/*"]
[tool.pytest.ini_options]
testpaths = "tests"
faulthandler_timeout = 60
log_level = "DEBUG"
log_format = "%(levelname)s: %(message)s\n %(pathname)s:%(lineno)d::%(funcName)s"
addopts = "--durations 5 --color=yes --new-first" # Add --pdb for debugging
norecursedirs = "tests/helpers"
markers = ["impure"]
[tool.mypy]
python_version = "3.13"
warn_redundant_casts = true
disallow_untyped_calls = true
disallow_untyped_defs = true
no_implicit_optional = true
[[tool.mypy.overrides]]
module = "argcomplete.*"
ignore_missing_imports = true

Binary file not shown.

Before

Width:  |  Height:  |  Size: 71 KiB

View File

@@ -1,53 +0,0 @@
{
lib,
stdenv,
clan-vm-manager,
mkShell,
ruff,
desktop-file-utils,
xdg-utils,
mypy,
python3,
gtk4,
libadwaita,
}:
let
devshellTestDeps =
clan-vm-manager.externalTestDeps
++ (with python3.pkgs; [
rope
mypy
setuptools
wheel
pip
]);
in
mkShell {
inherit (clan-vm-manager) nativeBuildInputs;
name = "clan-vm-manager";
buildInputs = [
ruff
gtk4.dev # has the demo called 'gtk4-widget-factory'
libadwaita.devdoc # has the demo called 'adwaita-1-demo'
]
++ devshellTestDeps
# Dependencies for testing for linux hosts
++ (lib.optionals stdenv.isLinux [
xdg-utils # install desktop files
desktop-file-utils # verify desktop files
]);
shellHook = ''
export GIT_ROOT=$(git rev-parse --show-toplevel)
export PKG_ROOT=$GIT_ROOT/pkgs/clan-vm-manager
# Add clan-vm-manager command to PATH
export PATH="$PKG_ROOT/bin":"$PATH"
# Add clan-cli to the python path so that we can import it without building it in nix first
export PYTHONPATH="$GIT_ROOT/pkgs/clan-cli":"$PYTHONPATH"
'';
}

View File

@@ -1,65 +0,0 @@
import contextlib
import os
import signal
import subprocess
from collections.abc import Iterator
from pathlib import Path
from typing import IO, Any
import pytest
_FILE = None | int | IO[Any]
class Command:
def __init__(self) -> None:
self.processes: list[subprocess.Popen[str]] = []
def run(
self,
command: list[str],
extra_env: dict[str, str] | None = None,
stdin: _FILE = None,
stdout: _FILE = None,
stderr: _FILE = None,
workdir: Path | None = None,
) -> subprocess.Popen[str]:
if extra_env is None:
extra_env = {}
env = os.environ.copy()
env.update(extra_env)
# We start a new session here so that we can than more reliably kill all children as well
p = subprocess.Popen(
command,
env=env,
start_new_session=True,
stdout=stdout,
stderr=stderr,
stdin=stdin,
text=True,
cwd=workdir,
)
self.processes.append(p)
return p
def terminate(self) -> None:
# Stop in reverse order in case there are dependencies.
# We just kill all processes as quickly as possible because we don't
# care about corrupted state and want to make tests fasts.
for p in reversed(self.processes):
with contextlib.suppress(OSError):
os.killpg(os.getpgid(p.pid), signal.SIGKILL)
@pytest.fixture
def command() -> Iterator[Command]:
"""
Starts a background command. The process is automatically terminated in the end.
>>> p = command.run(["some", "daemon"])
>>> print(p.pid)
"""
c = Command()
try:
yield c
finally:
c.terminate()

View File

@@ -1,46 +0,0 @@
import logging
import subprocess
import sys
from pathlib import Path
import pytest
from clan_lib.custom_logger import setup_logging
from clan_lib.nix import nix_shell
sys.path.append(str(Path(__file__).parent / "helpers"))
sys.path.append(
str(Path(__file__).parent.parent)
) # Also add clan vm manager to PYTHONPATH
pytest_plugins = [
"temporary_dir",
"root",
"command",
"wayland",
"stdout",
]
# Executed on pytest session start
def pytest_sessionstart(session: pytest.Session) -> None:
# This function will be called once at the beginning of the test session
print("Starting pytest session")
# You can access the session config, items, testsfailed, etc.
print(f"Session config: {session.config}")
setup_logging(logging.DEBUG)
# fixture for git_repo
@pytest.fixture
def git_repo(tmp_path: Path) -> Path:
# initialize a git repository
cmd = nix_shell(["nixpkgs#git"], ["git", "init"])
subprocess.run(cmd, cwd=tmp_path, check=True)
# set user.name and user.email
cmd = nix_shell(["nixpkgs#git"], ["git", "config", "user.name", "test"])
subprocess.run(cmd, cwd=tmp_path, check=True)
cmd = nix_shell(["nixpkgs#git"], ["git", "config", "user.email", "test@test.test"])
subprocess.run(cmd, cwd=tmp_path, check=True)
# return the path to the git repository
return tmp_path

View File

@@ -1,25 +0,0 @@
import logging
import os
import shlex
from clan_lib.custom_logger import get_callers
from clan_vm_manager import main
log = logging.getLogger(__name__)
def print_trace(msg: str) -> None:
trace_depth = int(os.environ.get("TRACE_DEPTH", "0"))
callers = get_callers(2, 2 + trace_depth)
if "run_no_stdout" in callers[0]:
callers = get_callers(3, 3 + trace_depth)
callers_str = "\n".join(f"{i + 1}: {caller}" for i, caller in enumerate(callers))
log.debug(f"{msg} \nCallers: \n{callers_str}")
class Cli:
def run(self, args: list[str]) -> None:
cmd = shlex.join(["clan", *args])
print_trace(f"$ {cmd}")
main(args)

View File

@@ -1,35 +0,0 @@
import os
from pathlib import Path
import pytest
TEST_ROOT = Path(__file__).parent.resolve()
PROJECT_ROOT = TEST_ROOT.parent
if CLAN_CORE_ := os.environ.get("CLAN_CORE_PATH"):
CLAN_CORE = Path(CLAN_CORE_)
else:
CLAN_CORE = PROJECT_ROOT.parent.parent
@pytest.fixture(scope="session")
def project_root() -> Path:
"""
Root directory the clan-cli
"""
return PROJECT_ROOT
@pytest.fixture(scope="session")
def test_root() -> Path:
"""
Root directory of the tests
"""
return TEST_ROOT
@pytest.fixture(scope="session")
def clan_core() -> Path:
"""
Directory of the clan-core flake
"""
return CLAN_CORE

View File

@@ -1,34 +0,0 @@
import types
import pytest
class CaptureOutput:
def __init__(self, capsys: pytest.CaptureFixture) -> None:
self.capsys = capsys
self.capsys_disabled = capsys.disabled()
self.capsys_disabled.__enter__()
def __enter__(self) -> "CaptureOutput":
self.capsys_disabled.__exit__(None, None, None)
self.capsys.readouterr()
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
traceback: types.TracebackType | None,
) -> None:
res = self.capsys.readouterr()
self.out = res.out
self.err = res.err
# Disable capsys again
self.capsys_disabled = self.capsys.disabled()
self.capsys_disabled.__enter__()
@pytest.fixture
def capture_output(capsys: pytest.CaptureFixture) -> CaptureOutput:
return CaptureOutput(capsys)

View File

@@ -1,27 +0,0 @@
import logging
import os
import tempfile
from collections.abc import Iterator
from pathlib import Path
import pytest
log = logging.getLogger(__name__)
@pytest.fixture
def temporary_home(monkeypatch: pytest.MonkeyPatch) -> Iterator[Path]:
env_dir = os.getenv("TEST_TEMPORARY_DIR")
if env_dir is not None:
path = Path(env_dir).resolve()
log.debug("Temp HOME directory: %s", str(path))
monkeypatch.setenv("HOME", str(path))
monkeypatch.chdir(str(path))
yield path
else:
with tempfile.TemporaryDirectory(prefix="pytest-") as dirpath:
monkeypatch.setenv("HOME", str(dirpath))
monkeypatch.setenv("XDG_CONFIG_HOME", str(Path(dirpath) / ".config"))
monkeypatch.chdir(str(dirpath))
log.debug("Temp HOME directory: %s", str(dirpath))
yield Path(dirpath)

View File

@@ -1,99 +0,0 @@
from pathlib import Path
import pytest
from clan_cli.tests.fixtures_flakes import ClanFlake
from clan_lib.flake import Flake
from clan_vm_manager.clan_uri import ClanURI
def test_get_url() -> None:
# Create a ClanURI object from a remote URI with parameters
uri = ClanURI.from_str("clan://https://example.com?password=1234#myVM")
assert uri.get_url() == "https://example.com?password=1234"
uri = ClanURI.from_str("clan://~/Downloads")
assert uri.get_url().endswith("/Downloads")
uri = ClanURI.from_str("clan:///home/user/Downloads")
assert uri.get_url() == "/home/user/Downloads"
uri = ClanURI.from_str("clan://file:///home/user/Downloads")
assert uri.get_url() == "file:///home/user/Downloads"
@pytest.mark.impure
def test_is_local(flake: ClanFlake) -> None:
uri = ClanURI.from_str(f"clan://git+file://{flake.path}")
assert uri.get_url() == str(flake.path)
assert uri.flake.is_local
myflake = Flake(f"git+file://{flake.path}")
assert myflake.is_local
def test_firefox_strip_uri() -> None:
uri = ClanURI.from_str("clan://git+https//git.clan.lol/clan/democlan.git")
assert uri.get_url() == "git+https://git.clan.lol/clan/democlan.git"
def test_local_uri(temp_dir: Path) -> None:
flake_nix = temp_dir / "flake.nix"
flake_nix.write_text("outputs = _: {}")
# Create a ClanURI object from a local URI
uri = ClanURI.from_str(f"clan://file://{temp_dir}")
assert uri.flake.path == temp_dir
def test_is_remote() -> None:
# Create a ClanURI object from a remote URI
uri = ClanURI.from_str("clan://https://example.com")
assert uri.flake.identifier == "https://example.com"
def test_direct_local_path() -> None:
# Create a ClanURI object from a remote URI
uri = ClanURI.from_str("clan://~/Downloads")
assert uri.get_url().endswith("/Downloads")
def test_direct_local_path2() -> None:
# Create a ClanURI object from a remote URI
uri = ClanURI.from_str("clan:///home/user/Downloads")
assert uri.get_url() == "/home/user/Downloads"
def test_remote_with_clanparams() -> None:
# Create a ClanURI object from a remote URI with parameters
uri = ClanURI.from_str("clan://https://example.com")
assert uri.machine_name == "defaultVM"
assert uri.flake.identifier == "https://example.com"
def test_from_str_remote() -> None:
uri = ClanURI.from_str(url="https://example.com", machine_name="myVM")
assert uri.get_url() == "https://example.com"
assert uri.machine_name == "myVM"
assert uri.flake.identifier == "https://example.com"
def test_from_str_local(temp_dir: Path) -> None:
flake_nix = temp_dir / "flake.nix"
flake_nix.write_text("outputs = _: {}")
uri = ClanURI.from_str(url=str(temp_dir), machine_name="myVM")
assert uri.get_url().endswith(str(temp_dir))
assert uri.machine_name == "myVM"
assert uri.flake.is_local
assert str(uri.flake).endswith(str(temp_dir))
def test_from_str_local_no_machine(temp_dir: Path) -> None:
flake_nix = temp_dir / "flake.nix"
flake_nix.write_text("outputs = _: {}")
uri = ClanURI.from_str(str(temp_dir))
assert uri.get_url().endswith(str(temp_dir))
assert uri.machine_name == "defaultVM"
assert uri.flake.is_local
assert str(uri.flake).endswith(str(temp_dir))

View File

@@ -1,8 +0,0 @@
import pytest
from cli import Cli
def test_help(capfd: pytest.CaptureFixture) -> None:
cli = Cli()
with pytest.raises(SystemExit):
cli.run(["clan-vm-manager", "--help"])

View File

@@ -1,8 +0,0 @@
import time
from wayland import GtkProc
def test_open(app: GtkProc) -> None:
time.sleep(0.5)
assert app.poll() is None

View File

@@ -1,27 +0,0 @@
import sys
from collections.abc import Generator
from subprocess import Popen
from typing import NewType
import pytest
@pytest.fixture(scope="session")
def wayland_compositor() -> Generator[Popen]:
# Start the Wayland compositor (e.g., Weston)
# compositor = Popen(["weston", "--backend=headless-backend.so"])
compositor = Popen(["weston"])
yield compositor
# Cleanup: Terminate the compositor
compositor.terminate()
GtkProc = NewType("GtkProc", Popen)
@pytest.fixture
def app() -> Generator[GtkProc]:
rapp = Popen([sys.executable, "-m", "clan_vm_manager"], text=True)
yield GtkProc(rapp)
# Cleanup: Terminate your application
rapp.terminate()

View File

@@ -3,7 +3,6 @@
{
imports = [
./clan-cli/flake-module.nix
./clan-vm-manager/flake-module.nix
./installer/flake-module.nix
./icon-update/flake-module.nix
./generate-test-vars/flake-module.nix