clan-cli: Fix bug where --host-key-check is not applied everywhere

This commit is contained in:
Qubasa
2024-10-07 22:29:10 +02:00
committed by Luis Hebendanz
parent 899bfde40c
commit e85e5c13e4
6 changed files with 63 additions and 43 deletions

View File

@@ -135,7 +135,7 @@ def show_block_devices(options: BlockDeviceOptions) -> Blockdevices:
"ssh",
*(["-i", f"{keyfile}"] if keyfile else []),
# Disable strict host key checking
"-o StrictHostKeyChecking=no",
"-o StrictHostKeyChecking=accept-new",
# Disable known hosts file
"-o UserKnownHostsFile=/dev/null",
f"{options.hostname}",

View File

@@ -23,14 +23,13 @@ def upload_secrets(machine: Machine) -> None:
secret_facts_store.upload(Path(tempdir))
host = machine.target_host
ssh_cmd = host.ssh_cmd()
run(
nix_shell(
["nixpkgs#rsync"],
[
"rsync",
"-e",
" ".join(["ssh"] + ssh_cmd[2:]),
" ".join(["ssh", *host.ssh_cmd_opts()]),
"--recursive",
"--links",
"--times",
@@ -38,7 +37,7 @@ def upload_secrets(machine: Machine) -> None:
"--delete",
"--chmod=D700,F600",
f"{tempdir!s}/",
f"{host.user}@{host.host}:{machine.secrets_upload_directory}/",
f"{host.target}:{machine.secrets_upload_directory}/",
],
),
log=Log.BOTH,

View File

@@ -147,7 +147,7 @@ def generate_machine_hardware_info(opts: HardwareGenerateOptions) -> HardwareRep
"UserKnownHostsFile=/dev/null",
# Disable strict host key checking. The GUI user cannot type "yes" into the ssh terminal.
"-o",
"StrictHostKeyChecking=no",
"StrictHostKeyChecking=accept-new",
*(
["-p", str(machine.target_host.port)]
if machine.target_host.port

View File

@@ -109,7 +109,7 @@ def check_machine_online(
*(["-i", f"{opts.keyfile}"] if opts and opts.keyfile else []),
# Disable strict host key checking
"-o",
"StrictHostKeyChecking=no",
"StrictHostKeyChecking=accept-new",
# Disable known hosts file
"-o",
"UserKnownHostsFile=/dev/null",

View File

@@ -31,10 +31,15 @@ def is_path_input(node: dict[str, dict[str, str]]) -> bool:
return locked["type"] == "path" or locked.get("url", "").startswith("file://")
def upload_sources(
flake_url: str, remote_url: str, always_upload_source: bool = False
) -> str:
def upload_sources(machine: Machine, always_upload_source: bool = False) -> str:
host = machine.build_host
env = os.environ.copy()
env["NIX_SSHOPTS"] = " ".join(host.ssh_cmd_opts())
if not always_upload_source:
flake_url = (
str(machine.flake.path) if machine.flake.is_local() else machine.flake.url
)
flake_data = nix_metadata(flake_url)
url = flake_data["resolvedUrl"]
has_path_inputs = any(
@@ -47,14 +52,11 @@ def upload_sources(
if not has_path_inputs:
# Just copy the flake to the remote machine, we can substitute other inputs there.
path = flake_data["path"]
env = os.environ.copy()
# env["NIX_SSHOPTS"] = " ".join(opts.remote_ssh_options)
assert remote_url
cmd = nix_command(
[
"copy",
"--to",
f"ssh://{remote_url}",
f"ssh://{host.target}",
"--no-check-sigs",
path,
]
@@ -63,19 +65,18 @@ def upload_sources(
return path
# Slow path: we need to upload all sources to the remote machine
assert remote_url
cmd = nix_command(
[
"flake",
"archive",
"--to",
f"ssh://{remote_url}",
f"ssh://{host.target}",
"--json",
flake_url,
]
)
log.info("run %s", shlex.join(cmd))
proc = run(cmd, error_msg="failed to upload sources")
proc = run(cmd, env=env, error_msg="failed to upload sources")
try:
return json.loads(proc.stdout)["path"]
@@ -111,27 +112,14 @@ def deploy_machine(machines: MachineGroup) -> None:
def deploy(machine: Machine) -> None:
host = machine.build_host
target = f"{host.user or 'root'}@{host.host}"
ssh_arg = f"-p {host.port}" if host.port else ""
env = os.environ.copy()
env["NIX_SSHOPTS"] = ssh_arg
generate_facts([machine], None, False)
generate_vars([machine], None, False)
upload_secrets(machine)
path = upload_sources(
flake_url=str(machine.flake.path)
if machine.flake.is_local()
else machine.flake.url,
remote_url=target,
machine,
)
if host.host_key_check != HostKeyCheck.STRICT:
ssh_arg += " -o StrictHostKeyChecking=no"
if host.host_key_check == HostKeyCheck.NONE:
ssh_arg += " -o UserKnownHostsFile=/dev/null"
ssh_arg += " -i " + host.key if host.key else ""
cmd = [
"nixos-rebuild",
@@ -153,6 +141,7 @@ def deploy_machine(machines: MachineGroup) -> None:
if target_host := host.meta.get("target_host"):
target_host = f"{target_host.user or 'root'}@{target_host.host}"
cmd.extend(["--target-host", target_host])
ret = host.run(cmd, check=False)
# re-retry switch if the first time fails
if ret.returncode != 0:
@@ -201,6 +190,8 @@ def update(args: argparse.Namespace) -> None:
else:
machines = get_selected_machines(args.flake, args.option, args.machines)
for machine in machines:
machine.host_key_check = HostKeyCheck.from_str(args.host_key_check)
host_group = MachineGroup(machines)
deploy_machine(host_group)
@@ -219,8 +210,8 @@ def register_update_parser(parser: argparse.ArgumentParser) -> None:
add_dynamic_completer(machines_parser, complete_machines)
parser.add_argument(
"--host-key-check",
choices=["strict", "tofu", "none"],
default="strict",
choices=["strict", "ask", "tofu", "none"],
default="ask",
help="Host key (.ssh/known_hosts) check mode",
)

View File

@@ -128,10 +128,12 @@ NO_OUTPUT_TIMEOUT = 20
class HostKeyCheck(Enum):
# Strictly check ssh host keys, prompt for unknown ones
STRICT = 0
# Ask for confirmation on first use
ASK = 1
# Trust on ssh keys on first use
TOFU = 1
TOFU = 2
# Do not check ssh host keys
NONE = 2
NONE = 3
@staticmethod
def from_str(label: str) -> "HostKeyCheck":
@@ -141,6 +143,25 @@ class HostKeyCheck(Enum):
description = "Choose from: " + ", ".join(HostKeyCheck.__members__)
raise ClanError(msg, description=description)
def to_ssh_opt(self) -> list[str]:
match self:
case HostKeyCheck.STRICT:
return ["-o", "StrictHostKeyChecking=yes"]
case HostKeyCheck.ASK:
return []
case HostKeyCheck.TOFU:
return ["-o", "StrictHostKeyChecking=accept-new"]
case HostKeyCheck.NONE:
return [
"-o",
"StrictHostKeyChecking=no",
"-o",
"UserKnownHostsFile=/dev/null",
]
case _:
msg = "Invalid HostKeyCheck"
raise ClanError(msg)
class Host:
def __init__(
@@ -151,7 +172,7 @@ class Host:
key: str | None = None,
forward_agent: bool = False,
command_prefix: str | None = None,
host_key_check: HostKeyCheck = HostKeyCheck.STRICT,
host_key_check: HostKeyCheck = HostKeyCheck.ASK,
meta: dict[str, Any] | None = None,
verbose_ssh: bool = False,
ssh_options: dict[str, str] | None = None,
@@ -190,6 +211,10 @@ class Host:
def __str__(self) -> str:
return f"{self.user}@{self.host}" + str(self.port if self.port else "")
@property
def target(self) -> str:
return f"{self.user or 'root'}@{self.host}"
def _prefix_output(
self,
displayed_cmd: str,
@@ -485,13 +510,11 @@ class Host:
timeout=timeout,
)
def ssh_cmd(
def ssh_cmd_opts(
self,
verbose_ssh: bool = False,
tty: bool = False,
) -> list[str]:
ssh_target = f"{self.user}@{self.host}" if self.user is not None else self.host
ssh_opts = ["-A"] if self.forward_agent else []
for k, v in self.ssh_options.items():
@@ -502,16 +525,23 @@ class Host:
if self.key:
ssh_opts.extend(["-i", self.key])
if self.host_key_check != HostKeyCheck.STRICT:
ssh_opts.extend(["-o", "StrictHostKeyChecking=no"])
if self.host_key_check == HostKeyCheck.NONE:
ssh_opts.extend(["-o", "UserKnownHostsFile=/dev/null"])
ssh_opts.extend(self.host_key_check.to_ssh_opt())
if verbose_ssh or self.verbose_ssh:
ssh_opts.extend(["-v"])
if tty:
ssh_opts.extend(["-t"])
return ssh_opts
return ["ssh", ssh_target, *ssh_opts]
def ssh_cmd(
self,
verbose_ssh: bool = False,
tty: bool = False,
) -> list[str]:
return [
"ssh",
self.target,
*self.ssh_cmd_opts(verbose_ssh=verbose_ssh, tty=tty),
]
T = TypeVar("T")