Merge pull request 'clan-cli: cmd.run now has its options extracted to a dataclass' (#2515) from Qubasa/clan-core:Qubasa-main into main
This commit is contained in:
@@ -5,7 +5,7 @@ from pathlib import Path
|
||||
from typing import Any, Literal
|
||||
|
||||
from clan_cli.errors import ClanError
|
||||
from clan_cli.nix import nix_shell, run_no_stdout
|
||||
from clan_cli.nix import nix_shell, run_no_output
|
||||
|
||||
from . import API
|
||||
|
||||
@@ -154,7 +154,7 @@ def show_block_devices(options: BlockDeviceOptions) -> Blockdevices:
|
||||
"PATH,NAME,RM,SIZE,RO,MOUNTPOINTS,TYPE,ID-LINK",
|
||||
],
|
||||
)
|
||||
proc = run_no_stdout(cmd, needs_user_terminal=True)
|
||||
proc = run_no_output(cmd, needs_user_terminal=True)
|
||||
res = proc.stdout.strip()
|
||||
|
||||
blk_info: dict[str, Any] = json.loads(res)
|
||||
|
||||
@@ -2,7 +2,7 @@ import argparse
|
||||
import re
|
||||
from dataclasses import dataclass
|
||||
|
||||
from clan_cli.cmd import run_no_stdout
|
||||
from clan_cli.cmd import run_no_output
|
||||
from clan_cli.nix import nix_shell
|
||||
|
||||
from . import API
|
||||
@@ -100,7 +100,7 @@ def show_mdns() -> DNSInfo:
|
||||
"--terminate",
|
||||
],
|
||||
)
|
||||
proc = run_no_stdout(cmd)
|
||||
proc = run_no_output(cmd)
|
||||
data = parse_avahi_output(proc.stdout)
|
||||
|
||||
return data
|
||||
|
||||
@@ -5,7 +5,7 @@ from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Any, TypedDict, get_args, get_type_hints
|
||||
|
||||
from clan_cli.cmd import run_no_stdout
|
||||
from clan_cli.cmd import run_no_output
|
||||
from clan_cli.errors import ClanCmdError, ClanError
|
||||
from clan_cli.inventory import Inventory, load_inventory_json, set_inventory
|
||||
from clan_cli.inventory.classes import Service
|
||||
@@ -150,7 +150,7 @@ def get_modules(base_path: str) -> dict[str, str]:
|
||||
]
|
||||
)
|
||||
try:
|
||||
proc = run_no_stdout(cmd)
|
||||
proc = run_no_output(cmd)
|
||||
res = proc.stdout.strip()
|
||||
except ClanCmdError as e:
|
||||
msg = "clanInternals might not have inventory.modules attributes"
|
||||
@@ -171,7 +171,7 @@ def get_module_interface(base_path: str, module_name: str) -> dict[str, Any]:
|
||||
"""
|
||||
cmd = nix_eval([f"{base_path}#clanInternals.moduleSchemas.{module_name}", "--json"])
|
||||
try:
|
||||
proc = run_no_stdout(cmd)
|
||||
proc = run_no_output(cmd)
|
||||
res = proc.stdout.strip()
|
||||
except ClanCmdError as e:
|
||||
msg = "clanInternals might not have moduleSchemas attributes"
|
||||
|
||||
@@ -5,7 +5,7 @@ from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
from clan_cli.api import API
|
||||
from clan_cli.cmd import CmdOut, run
|
||||
from clan_cli.cmd import CmdOut, RunOpts, run
|
||||
from clan_cli.dirs import TemplateType, clan_templates
|
||||
from clan_cli.errors import ClanError
|
||||
from clan_cli.inventory import Inventory, init_inventory
|
||||
@@ -61,10 +61,10 @@ def create_clan(options: CreateOptions) -> CreateClanResponse:
|
||||
template_url,
|
||||
]
|
||||
)
|
||||
flake_init = run(command, cwd=directory)
|
||||
flake_init = run(command, RunOpts(cwd=directory))
|
||||
|
||||
flake_update = run(
|
||||
nix_shell(["nixpkgs#nix"], ["nix", "flake", "update"]), cwd=directory
|
||||
nix_shell(["nixpkgs#nix"], ["nix", "flake", "update"]), RunOpts(cwd=directory)
|
||||
)
|
||||
|
||||
if options.initial:
|
||||
@@ -81,14 +81,18 @@ def create_clan(options: CreateOptions) -> CreateClanResponse:
|
||||
response.git_add = run(git_command(directory, "add", "."))
|
||||
|
||||
# check if username is set
|
||||
has_username = run(git_command(directory, "config", "user.name"), check=False)
|
||||
has_username = run(
|
||||
git_command(directory, "config", "user.name"), RunOpts(check=False)
|
||||
)
|
||||
response.git_config_username = None
|
||||
if has_username.returncode != 0:
|
||||
response.git_config_username = run(
|
||||
git_command(directory, "config", "user.name", "clan-tool")
|
||||
)
|
||||
|
||||
has_username = run(git_command(directory, "config", "user.email"), check=False)
|
||||
has_username = run(
|
||||
git_command(directory, "config", "user.email"), RunOpts(check=False)
|
||||
)
|
||||
if has_username.returncode != 0:
|
||||
response.git_config_email = run(
|
||||
git_command(directory, "config", "user.email", "clan@example.com")
|
||||
|
||||
@@ -5,7 +5,7 @@ from pathlib import Path
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from clan_cli.api import API
|
||||
from clan_cli.cmd import run_no_stdout
|
||||
from clan_cli.cmd import run_no_output
|
||||
from clan_cli.errors import ClanCmdError, ClanError
|
||||
from clan_cli.inventory import Meta
|
||||
from clan_cli.nix import nix_eval
|
||||
@@ -24,7 +24,7 @@ def show_clan_meta(uri: str | Path) -> Meta:
|
||||
res = "{}"
|
||||
|
||||
try:
|
||||
proc = run_no_stdout(cmd)
|
||||
proc = run_no_output(cmd)
|
||||
res = proc.stdout.strip()
|
||||
except ClanCmdError as e:
|
||||
msg = "Evaluation failed on meta attribute"
|
||||
|
||||
@@ -241,57 +241,65 @@ if os.environ.get("CLAN_CLI_PERF"):
|
||||
TIME_TABLE = TimeTable()
|
||||
|
||||
|
||||
@dataclass
|
||||
class RunOpts:
|
||||
input: bytes | None = None
|
||||
stdout: IO[bytes] | None = None
|
||||
stderr: IO[bytes] | None = None
|
||||
env: dict[str, str] | None = None
|
||||
cwd: Path | None = None
|
||||
log: Log = Log.STDERR
|
||||
prefix: str | None = None
|
||||
msg_color: MsgColor | None = None
|
||||
check: bool = True
|
||||
error_msg: str | None = None
|
||||
needs_user_terminal: bool = False
|
||||
timeout: float = math.inf
|
||||
shell: bool = False
|
||||
|
||||
|
||||
def run(
|
||||
cmd: list[str],
|
||||
*,
|
||||
input: bytes | None = None, # noqa: A002
|
||||
stdout: IO[bytes] | None = None,
|
||||
stderr: IO[bytes] | None = None,
|
||||
env: dict[str, str] | None = None,
|
||||
cwd: Path | None = None,
|
||||
log: Log = Log.STDERR,
|
||||
prefix: str | None = None,
|
||||
msg_color: MsgColor | None = None,
|
||||
check: bool = True,
|
||||
error_msg: str | None = None,
|
||||
needs_user_terminal: bool = False,
|
||||
timeout: float = math.inf,
|
||||
shell: bool = False,
|
||||
options: RunOpts | None = None,
|
||||
) -> CmdOut:
|
||||
if cwd is None:
|
||||
cwd = Path.cwd()
|
||||
if options is None:
|
||||
options = RunOpts()
|
||||
if options.cwd is None:
|
||||
options.cwd = Path.cwd()
|
||||
|
||||
if prefix is None:
|
||||
prefix = "$"
|
||||
if options.prefix is None:
|
||||
options.prefix = "$"
|
||||
|
||||
if input:
|
||||
if any(not ch.isprintable() for ch in input.decode("ascii", "replace")):
|
||||
if options.input:
|
||||
if any(not ch.isprintable() for ch in options.input.decode("ascii", "replace")):
|
||||
filtered_input = "<<binary_blob>>"
|
||||
else:
|
||||
filtered_input = input.decode("ascii", "replace")
|
||||
filtered_input = options.input.decode("ascii", "replace")
|
||||
print_trace(
|
||||
f"$: echo '{filtered_input}' | {indent_command(cmd)}", cmdlog, prefix
|
||||
f"$: echo '{filtered_input}' | {indent_command(cmd)}",
|
||||
cmdlog,
|
||||
options.prefix,
|
||||
)
|
||||
elif cmdlog.isEnabledFor(logging.DEBUG):
|
||||
print_trace(f"$: {indent_command(cmd)}", cmdlog, prefix)
|
||||
print_trace(f"$: {indent_command(cmd)}", cmdlog, options.prefix)
|
||||
|
||||
start = timeit.default_timer()
|
||||
with ExitStack() as stack:
|
||||
stdin = subprocess.PIPE if input is not None else None
|
||||
stdin = subprocess.PIPE if options.input is not None else None
|
||||
process = stack.enter_context(
|
||||
subprocess.Popen(
|
||||
cmd,
|
||||
cwd=str(cwd),
|
||||
env=env,
|
||||
cwd=str(options.cwd),
|
||||
env=options.env,
|
||||
stdin=stdin,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
start_new_session=not needs_user_terminal,
|
||||
shell=shell,
|
||||
start_new_session=not options.needs_user_terminal,
|
||||
shell=options.shell,
|
||||
)
|
||||
)
|
||||
|
||||
if needs_user_terminal:
|
||||
if options.needs_user_terminal:
|
||||
# we didn't allocate a new session, so we can't terminate the process group
|
||||
stack.enter_context(terminate_process(process))
|
||||
else:
|
||||
@@ -299,13 +307,13 @@ def run(
|
||||
|
||||
stdout_buf, stderr_buf = handle_io(
|
||||
process,
|
||||
log,
|
||||
prefix=prefix,
|
||||
msg_color=msg_color,
|
||||
timeout=timeout,
|
||||
input_bytes=input,
|
||||
stdout=stdout,
|
||||
stderr=stderr,
|
||||
options.log,
|
||||
prefix=options.prefix,
|
||||
msg_color=options.msg_color,
|
||||
timeout=options.timeout,
|
||||
input_bytes=options.input,
|
||||
stdout=options.stdout,
|
||||
stderr=options.stderr,
|
||||
)
|
||||
process.wait()
|
||||
|
||||
@@ -317,20 +325,20 @@ def run(
|
||||
cmd_out = CmdOut(
|
||||
stdout=stdout_buf,
|
||||
stderr=stderr_buf,
|
||||
cwd=cwd,
|
||||
env=env,
|
||||
cwd=options.cwd,
|
||||
env=options.env,
|
||||
command_list=cmd,
|
||||
returncode=process.returncode,
|
||||
msg=error_msg,
|
||||
msg=options.error_msg,
|
||||
)
|
||||
|
||||
if check and process.returncode != 0:
|
||||
if options.check and process.returncode != 0:
|
||||
raise ClanCmdError(cmd_out)
|
||||
|
||||
return cmd_out
|
||||
|
||||
|
||||
def run_no_stdout(
|
||||
def run_no_output(
|
||||
cmd: list[str],
|
||||
*,
|
||||
env: dict[str, str] | None = None,
|
||||
@@ -344,21 +352,23 @@ def run_no_stdout(
|
||||
shell: bool = False,
|
||||
) -> CmdOut:
|
||||
"""
|
||||
Like run, but automatically suppresses stdout, if not in DEBUG log level.
|
||||
Like run, but automatically suppresses all output, if not in DEBUG log level.
|
||||
If in DEBUG log level the stdout of commands will be shown.
|
||||
"""
|
||||
if cwd is None:
|
||||
cwd = Path.cwd()
|
||||
if logger.isEnabledFor(logging.DEBUG):
|
||||
return run(cmd, env=env, log=log, check=check, error_msg=error_msg)
|
||||
return run(cmd, RunOpts(env=env, log=log, check=check, error_msg=error_msg))
|
||||
log = Log.NONE
|
||||
return run(
|
||||
cmd,
|
||||
env=env,
|
||||
log=log,
|
||||
check=check,
|
||||
prefix=prefix,
|
||||
error_msg=error_msg,
|
||||
needs_user_terminal=needs_user_terminal,
|
||||
shell=shell,
|
||||
RunOpts(
|
||||
env=env,
|
||||
log=log,
|
||||
check=check,
|
||||
prefix=prefix,
|
||||
error_msg=error_msg,
|
||||
needs_user_terminal=needs_user_terminal,
|
||||
shell=shell,
|
||||
),
|
||||
)
|
||||
|
||||
@@ -8,7 +8,7 @@ from collections.abc import Callable
|
||||
from pathlib import Path
|
||||
from tempfile import TemporaryDirectory
|
||||
|
||||
from clan_cli.cmd import run
|
||||
from clan_cli.cmd import RunOpts, run
|
||||
from clan_cli.completions import (
|
||||
add_dynamic_completer,
|
||||
complete_machines,
|
||||
@@ -103,7 +103,7 @@ def generate_service_facts(
|
||||
cmd = ["bash", "-c", generator]
|
||||
run(
|
||||
cmd,
|
||||
env=env,
|
||||
RunOpts(env=env),
|
||||
)
|
||||
files_to_commit = []
|
||||
# store secrets
|
||||
|
||||
@@ -4,7 +4,7 @@ from collections.abc import Generator
|
||||
from contextlib import contextmanager
|
||||
from pathlib import Path
|
||||
|
||||
from clan_cli.cmd import Log, run
|
||||
from clan_cli.cmd import Log, RunOpts, run
|
||||
from clan_cli.errors import ClanError
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
@@ -30,11 +30,11 @@ def pause_automounting(devices: list[Path]) -> Generator[None, None, None]:
|
||||
|
||||
str_devs = [str(dev) for dev in devices]
|
||||
cmd = ["sudo", str(inhibit_path), "enable", *str_devs]
|
||||
result = run(cmd, log=Log.BOTH, check=False, needs_user_terminal=True)
|
||||
result = run(cmd, RunOpts(log=Log.BOTH, check=False, needs_user_terminal=True))
|
||||
if result.returncode != 0:
|
||||
log.error("Failed to inhibit automounting")
|
||||
yield None
|
||||
cmd = ["sudo", str(inhibit_path), "disable", *str_devs]
|
||||
result = run(cmd, log=Log.BOTH, check=False)
|
||||
result = run(cmd, RunOpts(log=Log.BOTH, check=False))
|
||||
if result.returncode != 0:
|
||||
log.error("Failed to re-enable automounting")
|
||||
|
||||
@@ -9,7 +9,7 @@ from tempfile import TemporaryDirectory
|
||||
from typing import Any
|
||||
|
||||
from clan_cli.api import API
|
||||
from clan_cli.cmd import Log, run
|
||||
from clan_cli.cmd import Log, RunOpts, run
|
||||
from clan_cli.errors import ClanError
|
||||
from clan_cli.facts.generate import generate_facts
|
||||
from clan_cli.facts.secret_modules import SecretStoreBase
|
||||
@@ -156,7 +156,9 @@ def flash_machine(
|
||||
)
|
||||
run(
|
||||
cmd,
|
||||
log=Log.BOTH,
|
||||
error_msg=f"Failed to flash {machine}",
|
||||
needs_user_terminal=True,
|
||||
RunOpts(
|
||||
log=Log.BOTH,
|
||||
error_msg=f"Failed to flash {machine}",
|
||||
needs_user_terminal=True,
|
||||
),
|
||||
)
|
||||
|
||||
@@ -4,7 +4,7 @@ import os
|
||||
from pathlib import Path
|
||||
|
||||
from clan_cli.api import API
|
||||
from clan_cli.cmd import Log, run
|
||||
from clan_cli.cmd import Log, RunOpts, run
|
||||
from clan_cli.errors import ClanError
|
||||
from clan_cli.nix import nix_build
|
||||
|
||||
@@ -14,7 +14,7 @@ log = logging.getLogger(__name__)
|
||||
@API.register
|
||||
def list_possible_languages() -> list[str]:
|
||||
cmd = nix_build(["nixpkgs#glibcLocales"])
|
||||
result = run(cmd, log=Log.STDERR, error_msg="Failed to find glibc locales")
|
||||
result = run(cmd, RunOpts(log=Log.STDERR, error_msg="Failed to find glibc locales"))
|
||||
locale_file = Path(result.stdout.strip()) / "share" / "i18n" / "SUPPORTED"
|
||||
|
||||
if not locale_file.exists():
|
||||
@@ -40,7 +40,7 @@ def list_possible_languages() -> list[str]:
|
||||
@API.register
|
||||
def list_possible_keymaps() -> list[str]:
|
||||
cmd = nix_build(["nixpkgs#kbd"])
|
||||
result = run(cmd, log=Log.STDERR, error_msg="Failed to find kbdinfo")
|
||||
result = run(cmd, RunOpts(log=Log.STDERR, error_msg="Failed to find kbdinfo"))
|
||||
keymaps_dir = Path(result.stdout.strip()) / "share" / "keymaps"
|
||||
|
||||
if not keymaps_dir.exists():
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
from pathlib import Path
|
||||
|
||||
from .cmd import Log, run
|
||||
from .cmd import Log, RunOpts, run
|
||||
from .errors import ClanError
|
||||
from .locked_open import locked_open
|
||||
from .nix import run_cmd
|
||||
@@ -67,8 +67,10 @@ def _commit_file_to_git(
|
||||
|
||||
run(
|
||||
cmd,
|
||||
log=Log.BOTH,
|
||||
error_msg=f"Failed to add {file_path} file to git index",
|
||||
RunOpts(
|
||||
log=Log.BOTH,
|
||||
error_msg=f"Failed to add {file_path} file to git index",
|
||||
),
|
||||
)
|
||||
|
||||
# check if there is a diff
|
||||
@@ -77,7 +79,7 @@ def _commit_file_to_git(
|
||||
["git", "-C", str(repo_dir), "diff", "--cached", "--exit-code", "--"]
|
||||
+ [str(file_path) for file_path in file_paths],
|
||||
)
|
||||
result = run(cmd, check=False, cwd=repo_dir)
|
||||
result = run(cmd, RunOpts(check=False, cwd=repo_dir))
|
||||
# if there is no diff, return
|
||||
if result.returncode == 0:
|
||||
return
|
||||
@@ -98,5 +100,8 @@ def _commit_file_to_git(
|
||||
)
|
||||
|
||||
run(
|
||||
cmd, error_msg=f"Failed to commit {file_paths} to git repository {repo_dir}"
|
||||
cmd,
|
||||
RunOpts(
|
||||
error_msg=f"Failed to commit {file_paths} to git repository {repo_dir}"
|
||||
),
|
||||
)
|
||||
|
||||
@@ -18,7 +18,7 @@ from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from clan_cli.api import API, dataclass_to_dict, from_dict
|
||||
from clan_cli.cmd import run_no_stdout
|
||||
from clan_cli.cmd import run_no_output
|
||||
from clan_cli.errors import ClanCmdError, ClanError
|
||||
from clan_cli.git import commit_file
|
||||
from clan_cli.nix import nix_eval
|
||||
@@ -78,7 +78,7 @@ def load_inventory_eval(flake_dir: str | Path) -> Inventory:
|
||||
]
|
||||
)
|
||||
|
||||
proc = run_no_stdout(cmd)
|
||||
proc = run_no_output(cmd)
|
||||
|
||||
try:
|
||||
res = proc.stdout.strip()
|
||||
|
||||
@@ -9,7 +9,7 @@ from tempfile import TemporaryDirectory
|
||||
from clan_cli.api import API
|
||||
from clan_cli.clan.create import git_command
|
||||
from clan_cli.clan_uri import FlakeId
|
||||
from clan_cli.cmd import Log, run
|
||||
from clan_cli.cmd import Log, RunOpts, run
|
||||
from clan_cli.completions import add_dynamic_completer, complete_tags
|
||||
from clan_cli.dirs import TemplateType, clan_templates, get_clan_flake_toplevel_or_env
|
||||
from clan_cli.errors import ClanError
|
||||
@@ -103,7 +103,7 @@ def create_machine(opts: CreateOptions) -> None:
|
||||
# Check if debug logging is enabled
|
||||
is_debug_enabled = log.isEnabledFor(logging.DEBUG)
|
||||
log_flag = Log.BOTH if is_debug_enabled else Log.NONE
|
||||
run(command, log=log_flag, cwd=tmpdirp)
|
||||
run(command, RunOpts(log=log_flag, cwd=tmpdirp))
|
||||
|
||||
validate_directory(tmpdirp)
|
||||
|
||||
@@ -126,7 +126,7 @@ def create_machine(opts: CreateOptions) -> None:
|
||||
|
||||
shutil.copytree(src, dst, ignore_dangling_symlinks=True, copy_function=log_copy)
|
||||
|
||||
run(git_command(clan_dir, "add", f"machines/{machine_name}"), cwd=clan_dir)
|
||||
run(git_command(clan_dir, "add", f"machines/{machine_name}"), RunOpts(cwd=clan_dir))
|
||||
|
||||
inventory = load_inventory_json(clan_dir)
|
||||
|
||||
|
||||
@@ -7,7 +7,7 @@ from pathlib import Path
|
||||
|
||||
from clan_cli.api import API
|
||||
from clan_cli.clan_uri import FlakeId
|
||||
from clan_cli.cmd import run, run_no_stdout
|
||||
from clan_cli.cmd import RunOpts, run, run_no_output
|
||||
from clan_cli.completions import add_dynamic_completer, complete_machines
|
||||
from clan_cli.dirs import specific_machine_dir
|
||||
from clan_cli.errors import ClanCmdError, ClanError
|
||||
@@ -71,7 +71,7 @@ def show_machine_deployment_target(clan_dir: Path, machine_name: str) -> str | N
|
||||
"--json",
|
||||
]
|
||||
)
|
||||
proc = run_no_stdout(cmd)
|
||||
proc = run_no_output(cmd)
|
||||
res = proc.stdout.strip()
|
||||
|
||||
target_host = json.loads(res)
|
||||
@@ -93,7 +93,7 @@ def show_machine_hardware_platform(clan_dir: Path, machine_name: str) -> str | N
|
||||
"--json",
|
||||
]
|
||||
)
|
||||
proc = run_no_stdout(cmd)
|
||||
proc = run_no_output(cmd)
|
||||
res = proc.stdout.strip()
|
||||
|
||||
host_platform = json.loads(res)
|
||||
@@ -160,7 +160,7 @@ def generate_machine_hardware_info(opts: HardwareGenerateOptions) -> HardwareCon
|
||||
*config_command,
|
||||
],
|
||||
)
|
||||
out = run(cmd, needs_user_terminal=True)
|
||||
out = run(cmd, RunOpts(needs_user_terminal=True))
|
||||
if out.returncode != 0:
|
||||
log.error(out)
|
||||
msg = f"Failed to inspect {opts.machine}. Address: {opts.target_host}"
|
||||
|
||||
@@ -9,7 +9,7 @@ from tempfile import TemporaryDirectory
|
||||
|
||||
from clan_cli.api import API
|
||||
from clan_cli.clan_uri import FlakeId
|
||||
from clan_cli.cmd import Log, run
|
||||
from clan_cli.cmd import Log, RunOpts, run
|
||||
from clan_cli.completions import (
|
||||
add_dynamic_completer,
|
||||
complete_machines,
|
||||
@@ -119,7 +119,7 @@ def install_machine(opts: InstallOptions) -> None:
|
||||
["nixpkgs#nixos-anywhere"],
|
||||
cmd,
|
||||
),
|
||||
log=Log.BOTH,
|
||||
RunOpts(log=Log.BOTH),
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@ from pathlib import Path
|
||||
from typing import Literal
|
||||
|
||||
from clan_cli.api import API
|
||||
from clan_cli.cmd import run_no_stdout
|
||||
from clan_cli.cmd import run_no_output
|
||||
from clan_cli.completions import add_dynamic_completer, complete_tags
|
||||
from clan_cli.errors import ClanCmdError, ClanError
|
||||
from clan_cli.inventory import Machine, load_inventory_eval, set_inventory
|
||||
@@ -69,7 +69,7 @@ def list_nixos_machines(flake_url: str | Path) -> list[str]:
|
||||
"--json",
|
||||
]
|
||||
)
|
||||
proc = run_no_stdout(cmd)
|
||||
proc = run_no_output(cmd)
|
||||
|
||||
try:
|
||||
res = proc.stdout.strip()
|
||||
@@ -123,7 +123,7 @@ def check_machine_online(
|
||||
],
|
||||
)
|
||||
try:
|
||||
proc = run_no_stdout(cmd, needs_user_terminal=True)
|
||||
proc = run_no_output(cmd, needs_user_terminal=True)
|
||||
if proc.returncode != 0:
|
||||
return "Offline"
|
||||
except ClanCmdError:
|
||||
|
||||
@@ -9,7 +9,7 @@ from tempfile import NamedTemporaryFile
|
||||
from typing import TYPE_CHECKING, Any, Literal
|
||||
|
||||
from clan_cli.clan_uri import FlakeId
|
||||
from clan_cli.cmd import run_no_stdout
|
||||
from clan_cli.cmd import run_no_output
|
||||
from clan_cli.errors import ClanError
|
||||
from clan_cli.facts import public_modules as facts_public_modules
|
||||
from clan_cli.facts import secret_modules as facts_secret_modules
|
||||
@@ -70,7 +70,7 @@ class Machine:
|
||||
attr = f'(builtins.getFlake "{self.flake}").nixosConfigurations.{self.name}.pkgs.hostPlatform.system'
|
||||
output = self._eval_cache.get(attr)
|
||||
if output is None:
|
||||
output = run_no_stdout(
|
||||
output = run_no_output(
|
||||
nix_eval(["--impure", "--expr", attr])
|
||||
).stdout.strip()
|
||||
self._eval_cache[attr] = output
|
||||
@@ -243,7 +243,7 @@ class Machine:
|
||||
config_json.flush()
|
||||
|
||||
file_info = json.loads(
|
||||
run_no_stdout(
|
||||
run_no_output(
|
||||
nix_eval(
|
||||
[
|
||||
"--impure",
|
||||
@@ -288,9 +288,9 @@ class Machine:
|
||||
args += nix_options + self.nix_options
|
||||
|
||||
if method == "eval":
|
||||
output = run_no_stdout(nix_eval(args)).stdout.strip()
|
||||
output = run_no_output(nix_eval(args)).stdout.strip()
|
||||
return output
|
||||
return Path(run_no_stdout(nix_build(args)).stdout.strip())
|
||||
return Path(run_no_output(nix_build(args)).stdout.strip())
|
||||
|
||||
def eval_nix(
|
||||
self,
|
||||
|
||||
@@ -7,7 +7,7 @@ import sys
|
||||
|
||||
from clan_cli.api import API
|
||||
from clan_cli.clan_uri import FlakeId
|
||||
from clan_cli.cmd import MsgColor, run
|
||||
from clan_cli.cmd import MsgColor, RunOpts, run
|
||||
from clan_cli.colors import AnsiColor
|
||||
from clan_cli.completions import (
|
||||
add_dynamic_completer,
|
||||
@@ -64,7 +64,10 @@ def upload_sources(machine: Machine) -> str:
|
||||
path,
|
||||
]
|
||||
)
|
||||
run(cmd, env=env, error_msg="failed to upload sources", prefix=machine.name)
|
||||
run(
|
||||
cmd,
|
||||
RunOpts(env=env, error_msg="failed to upload sources", prefix=machine.name),
|
||||
)
|
||||
return path
|
||||
|
||||
# Slow path: we need to upload all sources to the remote machine
|
||||
@@ -78,7 +81,7 @@ def upload_sources(machine: Machine) -> str:
|
||||
flake_url,
|
||||
]
|
||||
)
|
||||
proc = run(cmd, env=env, error_msg="failed to upload sources")
|
||||
proc = run(cmd, RunOpts(env=env, error_msg="failed to upload sources"))
|
||||
|
||||
try:
|
||||
return json.loads(proc.stdout)["path"]
|
||||
|
||||
@@ -4,7 +4,7 @@ import tempfile
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from clan_cli.cmd import run, run_no_stdout
|
||||
from clan_cli.cmd import run, run_no_output
|
||||
from clan_cli.dirs import nixpkgs_flake, nixpkgs_source
|
||||
from clan_cli.errors import ClanError
|
||||
|
||||
@@ -63,7 +63,7 @@ def nix_add_to_gcroots(nix_path: Path, dest: Path) -> None:
|
||||
|
||||
def nix_config() -> dict[str, Any]:
|
||||
cmd = nix_command(["show-config", "--json"])
|
||||
proc = run_no_stdout(cmd)
|
||||
proc = run_no_output(cmd)
|
||||
data = json.loads(proc.stdout)
|
||||
config = {}
|
||||
for key, value in data.items():
|
||||
|
||||
@@ -3,7 +3,7 @@ import json
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
from clan_cli.cmd import run
|
||||
from clan_cli.cmd import RunOpts, run
|
||||
from clan_cli.completions import (
|
||||
add_dynamic_completer,
|
||||
complete_groups,
|
||||
@@ -32,7 +32,7 @@ def import_sops(args: argparse.Namespace) -> None:
|
||||
cmd += ["--output-type", "json", "--decrypt", args.sops_file]
|
||||
cmd = nix_shell(["nixpkgs#sops"], cmd)
|
||||
|
||||
res = run(cmd, error_msg=f"Could not import sops file {file}")
|
||||
res = run(cmd, RunOpts(error_msg=f"Could not import sops file {file}"))
|
||||
secrets = json.loads(res.stdout)
|
||||
for k, v in secrets.items():
|
||||
k = args.prefix + k
|
||||
|
||||
@@ -1,22 +1,22 @@
|
||||
import dataclasses
|
||||
import enum
|
||||
import functools
|
||||
import io
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
from collections.abc import Iterable, Sequence
|
||||
from contextlib import suppress
|
||||
from pathlib import Path
|
||||
from tempfile import NamedTemporaryFile
|
||||
from typing import IO, Protocol
|
||||
from typing import IO
|
||||
|
||||
import clan_cli.cmd
|
||||
from clan_cli.api import API
|
||||
from clan_cli.cmd import Log, RunOpts, run
|
||||
from clan_cli.dirs import user_config_dir
|
||||
from clan_cli.errors import ClanError, CmdOut
|
||||
from clan_cli.errors import ClanError
|
||||
from clan_cli.nix import nix_shell
|
||||
|
||||
from .folders import sops_machines_folder, sops_users_folder
|
||||
@@ -163,10 +163,10 @@ class ExitStatus(enum.IntEnum): # see: cmd/sops/codes/codes.go
|
||||
return ExitStatus(code) if code in ExitStatus else None
|
||||
|
||||
|
||||
class Executer(Protocol):
|
||||
def __call__(
|
||||
self, cmd: list[str], *, env: dict[str, str] | None = None
|
||||
) -> CmdOut: ...
|
||||
# class Executer(Protocol):
|
||||
# def __call__(
|
||||
# self, cmd: list[str], *, env: dict[str, str] | None = None
|
||||
# ) -> CmdOut: ...
|
||||
|
||||
|
||||
class Operation(enum.StrEnum):
|
||||
@@ -176,11 +176,11 @@ class Operation(enum.StrEnum):
|
||||
UPDATE_KEYS = "updatekeys"
|
||||
|
||||
|
||||
def run(
|
||||
def sops_run(
|
||||
call: Operation,
|
||||
secret_path: Path,
|
||||
public_keys: Iterable[tuple[str, KeyType]],
|
||||
executer: Executer,
|
||||
run_opts: RunOpts | None = None,
|
||||
) -> tuple[int, str]:
|
||||
"""Call the sops binary for the given operation."""
|
||||
# louis(2024-11-19): I regrouped the call into the sops binary into this
|
||||
@@ -235,7 +235,12 @@ def run(
|
||||
sops_cmd.append(str(secret_path))
|
||||
|
||||
cmd = nix_shell(["nixpkgs#sops"], sops_cmd)
|
||||
p = executer(cmd, env=environ)
|
||||
opts = (
|
||||
dataclasses.replace(run_opts, env=environ)
|
||||
if run_opts
|
||||
else RunOpts(env=environ)
|
||||
)
|
||||
p = run(cmd, opts)
|
||||
return p.returncode, p.stdout
|
||||
|
||||
|
||||
@@ -254,7 +259,7 @@ def get_public_age_key(privkey: str) -> str:
|
||||
def generate_private_key(out_file: Path | None = None) -> tuple[str, str]:
|
||||
cmd = nix_shell(["nixpkgs#age"], ["age-keygen"])
|
||||
try:
|
||||
proc = clan_cli.cmd.run(cmd)
|
||||
proc = run(cmd)
|
||||
res = proc.stdout.strip()
|
||||
pubkey = None
|
||||
private_key = None
|
||||
@@ -355,10 +360,13 @@ def ensure_admin_public_key(flake_dir: Path) -> SopsKey:
|
||||
def update_keys(secret_path: Path, keys: Iterable[tuple[str, KeyType]]) -> list[Path]:
|
||||
secret_path = secret_path / "secret"
|
||||
error_msg = f"Could not update keys for {secret_path}"
|
||||
executer = functools.partial(
|
||||
clan_cli.cmd.run, log=clan_cli.cmd.Log.BOTH, error_msg=error_msg
|
||||
|
||||
rc, _ = sops_run(
|
||||
Operation.UPDATE_KEYS,
|
||||
secret_path,
|
||||
keys,
|
||||
RunOpts(log=Log.BOTH, error_msg=error_msg),
|
||||
)
|
||||
rc, _ = run(Operation.UPDATE_KEYS, secret_path, keys, executer)
|
||||
was_modified = ExitStatus.parse(rc) != ExitStatus.FILE_HAS_NOT_BEEN_MODIFIED
|
||||
return [secret_path] if was_modified else []
|
||||
|
||||
@@ -372,20 +380,19 @@ def encrypt_file(
|
||||
folder.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
if not content:
|
||||
# Don't use our `run` here, because it breaks editor integration.
|
||||
# We never need this in our UI.
|
||||
def executer(cmd: list[str], *, env: dict[str, str] | None = None) -> CmdOut:
|
||||
return CmdOut(
|
||||
stdout="",
|
||||
stderr="",
|
||||
cwd=Path.cwd(),
|
||||
env=env,
|
||||
command_list=cmd,
|
||||
returncode=subprocess.run(cmd, env=env, check=False).returncode,
|
||||
msg=None,
|
||||
)
|
||||
|
||||
rc, _ = run(Operation.EDIT, secret_path, pubkeys, executer)
|
||||
# Use direct stdout / stderr, as else it breaks editor integration.
|
||||
# We never need this in our UI. TUI only.
|
||||
rc, _ = sops_run(
|
||||
Operation.EDIT,
|
||||
secret_path,
|
||||
pubkeys,
|
||||
RunOpts(
|
||||
stdout=sys.stdout.buffer,
|
||||
stderr=sys.stderr.buffer,
|
||||
check=False,
|
||||
log=Log.NONE,
|
||||
),
|
||||
)
|
||||
status = ExitStatus.parse(rc)
|
||||
if rc == 0 or status == ExitStatus.FILE_HAS_NOT_BEEN_MODIFIED:
|
||||
return
|
||||
@@ -418,8 +425,12 @@ def encrypt_file(
|
||||
else:
|
||||
msg = f"Invalid content type: {type(content)}"
|
||||
raise ClanError(msg)
|
||||
executer = functools.partial(clan_cli.cmd.run, log=clan_cli.cmd.Log.BOTH)
|
||||
run(Operation.ENCRYPT, Path(source.name), pubkeys, executer)
|
||||
sops_run(
|
||||
Operation.ENCRYPT,
|
||||
Path(source.name),
|
||||
pubkeys,
|
||||
RunOpts(log=Log.BOTH),
|
||||
)
|
||||
# atomic copy of the encrypted file
|
||||
with NamedTemporaryFile(dir=folder, delete=False) as dest:
|
||||
shutil.copyfile(source.name, dest.name)
|
||||
@@ -432,10 +443,13 @@ def encrypt_file(
|
||||
def decrypt_file(secret_path: Path) -> str:
|
||||
# decryption uses private keys from the environment or default paths:
|
||||
no_public_keys_needed: list[tuple[str, KeyType]] = []
|
||||
executer = functools.partial(
|
||||
clan_cli.cmd.run, error_msg=f"Could not decrypt {secret_path}"
|
||||
|
||||
_, stdout = sops_run(
|
||||
Operation.DECRYPT,
|
||||
secret_path,
|
||||
no_public_keys_needed,
|
||||
RunOpts(error_msg=f"Could not decrypt {secret_path}"),
|
||||
)
|
||||
_, stdout = run(Operation.DECRYPT, secret_path, no_public_keys_needed, executer)
|
||||
return stdout
|
||||
|
||||
|
||||
|
||||
@@ -9,7 +9,7 @@ from pathlib import Path
|
||||
from shlex import quote
|
||||
from typing import IO, Any
|
||||
|
||||
from clan_cli.cmd import CmdOut, Log, MsgColor
|
||||
from clan_cli.cmd import CmdOut, Log, MsgColor, RunOpts
|
||||
from clan_cli.cmd import run as local_run
|
||||
from clan_cli.colors import AnsiColor
|
||||
from clan_cli.ssh.host_key import HostKeyCheck
|
||||
@@ -68,19 +68,21 @@ class Host:
|
||||
) -> CmdOut:
|
||||
res = local_run(
|
||||
cmd,
|
||||
shell=shell,
|
||||
stdout=stdout,
|
||||
prefix=self.command_prefix,
|
||||
timeout=timeout,
|
||||
stderr=stderr,
|
||||
input=input,
|
||||
env=env,
|
||||
cwd=cwd,
|
||||
log=log,
|
||||
check=check,
|
||||
error_msg=error_msg,
|
||||
msg_color=msg_color,
|
||||
needs_user_terminal=needs_user_terminal,
|
||||
RunOpts(
|
||||
shell=shell,
|
||||
stdout=stdout,
|
||||
prefix=self.command_prefix,
|
||||
timeout=timeout,
|
||||
stderr=stderr,
|
||||
input=input,
|
||||
env=env,
|
||||
cwd=cwd,
|
||||
log=log,
|
||||
check=check,
|
||||
error_msg=error_msg,
|
||||
msg_color=msg_color,
|
||||
needs_user_terminal=needs_user_terminal,
|
||||
),
|
||||
)
|
||||
return res
|
||||
|
||||
|
||||
@@ -2,7 +2,7 @@ import tarfile
|
||||
from pathlib import Path
|
||||
from tempfile import TemporaryDirectory
|
||||
|
||||
from clan_cli.cmd import Log
|
||||
from clan_cli.cmd import Log, RunOpts
|
||||
from clan_cli.cmd import run as run_local
|
||||
from clan_cli.errors import ClanError
|
||||
from clan_cli.ssh.host import Host
|
||||
@@ -77,8 +77,10 @@ def upload(
|
||||
with tar_path.open("rb") as f:
|
||||
run_local(
|
||||
cmd,
|
||||
input=f.read(),
|
||||
log=Log.BOTH,
|
||||
prefix=host.command_prefix,
|
||||
needs_user_terminal=True,
|
||||
RunOpts(
|
||||
input=f.read(),
|
||||
log=Log.BOTH,
|
||||
prefix=host.command_prefix,
|
||||
needs_user_terminal=True,
|
||||
),
|
||||
)
|
||||
|
||||
@@ -3,7 +3,7 @@ import json
|
||||
import logging
|
||||
from pathlib import Path
|
||||
|
||||
from clan_cli.cmd import run_no_stdout
|
||||
from clan_cli.cmd import run_no_output
|
||||
from clan_cli.completions import (
|
||||
add_dynamic_completer,
|
||||
complete_machines,
|
||||
@@ -31,7 +31,7 @@ def list_state_folders(machine: str, service: None | str = None) -> None:
|
||||
res = "{}"
|
||||
|
||||
try:
|
||||
proc = run_no_stdout(cmd)
|
||||
proc = run_no_output(cmd)
|
||||
res = proc.stdout.strip()
|
||||
except ClanCmdError as e:
|
||||
msg = "Clan might not have meta attributes"
|
||||
|
||||
@@ -2,7 +2,7 @@ import json
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from clan_cli.cmd import run_no_stdout
|
||||
from clan_cli.cmd import run_no_output
|
||||
from clan_cli.errors import ClanError
|
||||
from clan_cli.nix import nix_eval
|
||||
|
||||
@@ -18,7 +18,7 @@ def list_tagged_machines(flake_url: str | Path) -> dict[str, Any]:
|
||||
"--json",
|
||||
]
|
||||
)
|
||||
proc = run_no_stdout(cmd)
|
||||
proc = run_no_output(cmd)
|
||||
|
||||
try:
|
||||
res = proc.stdout.strip()
|
||||
|
||||
@@ -8,7 +8,7 @@ from pathlib import Path
|
||||
from tempfile import TemporaryDirectory
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
from clan_cli.cmd import run
|
||||
from clan_cli.cmd import RunOpts, run
|
||||
from clan_cli.completions import (
|
||||
add_dynamic_completer,
|
||||
complete_machines,
|
||||
@@ -192,10 +192,7 @@ def execute_generator(
|
||||
cmd = bubblewrap_cmd(generator.final_script, tmpdir)
|
||||
else:
|
||||
cmd = ["bash", "-c", generator.final_script]
|
||||
run(
|
||||
cmd,
|
||||
env=env,
|
||||
)
|
||||
run(cmd, RunOpts(env=env))
|
||||
files_to_commit = []
|
||||
# store secrets
|
||||
files = generator.files
|
||||
|
||||
@@ -6,7 +6,7 @@ from itertools import chain
|
||||
from pathlib import Path
|
||||
from typing import override
|
||||
|
||||
from clan_cli.cmd import Log, run
|
||||
from clan_cli.cmd import Log, RunOpts, run
|
||||
from clan_cli.machines.machines import Machine
|
||||
from clan_cli.nix import nix_shell
|
||||
from clan_cli.vars.generate import Generator, Var
|
||||
@@ -50,8 +50,7 @@ class SecretStore(SecretStoreBase):
|
||||
str(self.entry_dir(generator, var.name)),
|
||||
],
|
||||
),
|
||||
input=value,
|
||||
check=True,
|
||||
RunOpts(input=value, check=True),
|
||||
)
|
||||
return None # we manage the files outside of the git repo
|
||||
|
||||
@@ -88,7 +87,7 @@ class SecretStore(SecretStoreBase):
|
||||
self.entry_prefix,
|
||||
],
|
||||
),
|
||||
check=False,
|
||||
RunOpts(check=False),
|
||||
)
|
||||
.stdout.strip()
|
||||
.encode()
|
||||
@@ -116,7 +115,7 @@ class SecretStore(SecretStoreBase):
|
||||
str(symlink),
|
||||
],
|
||||
),
|
||||
check=False,
|
||||
RunOpts(check=False),
|
||||
)
|
||||
.stdout.strip()
|
||||
.encode()
|
||||
|
||||
@@ -13,7 +13,7 @@ from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from tempfile import TemporaryDirectory
|
||||
|
||||
from clan_cli.cmd import CmdOut, Log, handle_io, run
|
||||
from clan_cli.cmd import CmdOut, Log, RunOpts, handle_io, run
|
||||
from clan_cli.completions import add_dynamic_completer, complete_machines
|
||||
from clan_cli.dirs import module_root, user_cache_dir, vm_state_dir
|
||||
from clan_cli.errors import ClanCmdError, ClanError
|
||||
@@ -109,8 +109,7 @@ def prepare_disk(
|
||||
)
|
||||
run(
|
||||
cmd,
|
||||
log=Log.BOTH,
|
||||
error_msg=f"Could not create disk image at {disk_img}",
|
||||
RunOpts(log=Log.BOTH, error_msg=f"Could not create disk image at {disk_img}"),
|
||||
)
|
||||
|
||||
return disk_img
|
||||
|
||||
@@ -119,6 +119,7 @@ def test_all_dataclasses() -> None:
|
||||
# - API includes Type Generic wrappers, that are not known in the init file.
|
||||
excludes = [
|
||||
"api/__init__.py",
|
||||
"cmd.py", # We don't want the UI to have access to the cmd module anyway
|
||||
]
|
||||
|
||||
cli_path = Path("clan_cli").resolve()
|
||||
|
||||
@@ -12,7 +12,7 @@ from clan_cli.inventory import (
|
||||
set_inventory,
|
||||
)
|
||||
from clan_cli.machines.create import CreateOptions, create_machine
|
||||
from clan_cli.nix import nix_eval, run_no_stdout
|
||||
from clan_cli.nix import nix_eval, run_no_output
|
||||
from fixtures_flakes import FlakeForTest
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -97,7 +97,7 @@ def test_add_module_to_inventory(
|
||||
"--json",
|
||||
]
|
||||
)
|
||||
proc = run_no_stdout(cmd)
|
||||
proc = run_no_output(cmd)
|
||||
res = json.loads(proc.stdout.strip())
|
||||
|
||||
assert res["machine1"]["authorizedKeys"] == [ssh_key]
|
||||
|
||||
@@ -94,7 +94,7 @@ def test_vm_deployment(
|
||||
).stdout.strip()
|
||||
assert "no-such-path" not in shared_secret_path
|
||||
# run nix flake lock
|
||||
cmd.run(["nix", "flake", "lock"], cwd=flake.path)
|
||||
cmd.run(["nix", "flake", "lock"], cmd.RunOpts(cwd=flake.path))
|
||||
|
||||
vm1_config = inspect_vm(machine=Machine("m1_machine", FlakeId(str(flake.path))))
|
||||
vm2_config = inspect_vm(machine=Machine("m2_machine", FlakeId(str(flake.path))))
|
||||
|
||||
Reference in New Issue
Block a user