Merge pull request 'Simplify parse_deployment_address function' (#4052) from Qubasa/clan-core:simplify_parse_deployment_address into main

Reviewed-on: https://git.clan.lol/clan/clan-core/pulls/4052
This commit is contained in:
Luis Hebendanz
2025-06-23 13:51:37 +00:00
8 changed files with 27 additions and 44 deletions

View File

@@ -167,7 +167,7 @@ def update_hardware_config_command(args: argparse.Namespace) -> None:
) )
if args.target_host: if args.target_host:
target_host = Remote.from_deployment_address( target_host = Remote.from_ssh_uri(
machine_name=machine.name, address=args.target_host machine_name=machine.name, address=args.target_host
).override(host_key_check=host_key_check) ).override(host_key_check=host_key_check)
else: else:

View File

@@ -185,7 +185,7 @@ def install_command(args: argparse.Namespace) -> None:
host_key_check = args.host_key_check host_key_check = args.host_key_check
if target_host_str is not None: if target_host_str is not None:
target_host = Remote.from_deployment_address( target_host = Remote.from_ssh_uri(
machine_name=machine.name, address=target_host_str machine_name=machine.name, address=target_host_str
).override(host_key_check=host_key_check) ).override(host_key_check=host_key_check)
else: else:

View File

@@ -275,7 +275,7 @@ def update_command(args: argparse.Namespace) -> None:
with AsyncRuntime() as runtime: with AsyncRuntime() as runtime:
for machine in machines: for machine in machines:
if args.target_host: if args.target_host:
target_host = Remote.from_deployment_address( target_host = Remote.from_ssh_uri(
machine_name=machine.name, machine_name=machine.name,
address=args.target_host, address=args.target_host,
).override(host_key_check=host_key_check) ).override(host_key_check=host_key_check)

View File

@@ -46,7 +46,7 @@ class DeployInfo:
if not host: if not host:
msg = "Hostname cannot be empty." msg = "Hostname cannot be empty."
raise ClanError(msg) raise ClanError(msg)
remote = Remote.from_deployment_address( remote = Remote.from_ssh_uri(
machine_name="clan-installer", address=host machine_name="clan-installer", address=host
).override(host_key_check=host_key_check) ).override(host_key_check=host_key_check)
remotes.append(remote) remotes.append(remote)
@@ -59,22 +59,19 @@ class DeployInfo:
for addr in data.get("addrs", []): for addr in data.get("addrs", []):
if isinstance(addr, str): if isinstance(addr, str):
remote = Remote.from_deployment_address( remote = Remote.from_ssh_uri(
machine_name="clan-installer", machine_name="clan-installer",
address=addr, address=addr,
password=password, ).override(host_key_check=host_key_check, password=password)
).override(host_key_check=host_key_check)
addrs.append(remote) addrs.append(remote)
else: else:
msg = f"Invalid address format: {addr}" msg = f"Invalid address format: {addr}"
raise ClanError(msg) raise ClanError(msg)
if tor_addr := data.get("tor"): if tor_addr := data.get("tor"):
remote = Remote.from_deployment_address( remote = Remote.from_ssh_uri(
machine_name="clan-installer", machine_name="clan-installer",
address=tor_addr, address=tor_addr,
password=password, ).override(host_key_check=host_key_check, tor_socks=True, password=password)
tor_socks=True,
).override(host_key_check=host_key_check)
addrs.append(remote) addrs.append(remote)
return DeployInfo(addrs=addrs) return DeployInfo(addrs=addrs)

View File

@@ -258,8 +258,6 @@ def get_host(
return None return None
return RemoteSource( return RemoteSource(
data=Remote.from_deployment_address( data=Remote.from_ssh_uri(machine_name=machine.name, address=host_str),
machine_name=machine.name, address=host_str
),
source=source, source=source,
) )

View File

@@ -1,7 +1,6 @@
import re import re
import urllib.parse import urllib.parse
from pathlib import Path from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any
from clan_lib.errors import ClanError from clan_lib.errors import ClanError
@@ -9,16 +8,18 @@ if TYPE_CHECKING:
from clan_lib.ssh.remote import Remote from clan_lib.ssh.remote import Remote
def parse_deployment_address( def parse_ssh_uri(
*, *,
machine_name: str, machine_name: str,
address: str, address: str,
forward_agent: bool = True,
meta: dict[str, Any] | None = None,
private_key: Path | None = None,
password: str | None = None,
tor_socks: bool = False,
) -> "Remote": ) -> "Remote":
"""
Parses an SSH URI into a Remote object.
The address can be in the form of:
- `ssh://[user@]hostname[:port]?option=value&option2=value2`
- `[user@]hostname[:port]`
The specification can be found here: https://www.ietf.org/archive/id/draft-salowey-secsh-uri-00.html
"""
if address.startswith("ssh://"): if address.startswith("ssh://"):
# Strip the `ssh://` prefix if it exists # Strip the `ssh://` prefix if it exists
address = address[len("ssh://") :] address = address[len("ssh://") :]
@@ -68,10 +69,6 @@ def parse_deployment_address(
address=hostname, address=hostname,
user=user, user=user,
port=port, port=port,
private_key=private_key,
password=password,
command_prefix=machine_name, command_prefix=machine_name,
forward_agent=forward_agent,
ssh_options=options, ssh_options=options,
tor_socks=tor_socks,
) )

View File

@@ -21,7 +21,7 @@ from clan_lib.colors import AnsiColor
from clan_lib.errors import ClanError # Assuming these are available from clan_lib.errors import ClanError # Assuming these are available
from clan_lib.nix import nix_shell from clan_lib.nix import nix_shell
from clan_lib.ssh.host_key import HostKeyCheck, hostkey_to_ssh_opts from clan_lib.ssh.host_key import HostKeyCheck, hostkey_to_ssh_opts
from clan_lib.ssh.parse import parse_deployment_address from clan_lib.ssh.parse import parse_ssh_uri
from clan_lib.ssh.sudo_askpass_proxy import SudoAskpassProxy from clan_lib.ssh.sudo_askpass_proxy import SudoAskpassProxy
cmdlog = logging.getLogger(__name__) cmdlog = logging.getLogger(__name__)
@@ -61,6 +61,8 @@ class Remote:
*, *,
host_key_check: HostKeyCheck | None = None, host_key_check: HostKeyCheck | None = None,
private_key: Path | None = None, private_key: Path | None = None,
password: str | None = None,
tor_socks: bool | None = None,
) -> "Remote": ) -> "Remote":
""" """
Returns a new Remote instance with the same data but with a different host_key_check. Returns a new Remote instance with the same data but with a different host_key_check.
@@ -71,14 +73,14 @@ class Remote:
command_prefix=self.command_prefix, command_prefix=self.command_prefix,
port=self.port, port=self.port,
private_key=private_key if private_key is not None else self.private_key, private_key=private_key if private_key is not None else self.private_key,
password=self.password, password=password if password is not None else self.password,
forward_agent=self.forward_agent, forward_agent=self.forward_agent,
host_key_check=host_key_check host_key_check=host_key_check
if host_key_check is not None if host_key_check is not None
else self.host_key_check, else self.host_key_check,
verbose_ssh=self.verbose_ssh, verbose_ssh=self.verbose_ssh,
ssh_options=self.ssh_options, ssh_options=self.ssh_options,
tor_socks=self.tor_socks, tor_socks=tor_socks if tor_socks is not None else self.tor_socks,
_control_path_dir=self._control_path_dir, _control_path_dir=self._control_path_dir,
_askpass_path=self._askpass_path, _askpass_path=self._askpass_path,
) )
@@ -88,28 +90,17 @@ class Remote:
return f"{self.user}@{self.address}" return f"{self.user}@{self.address}"
@classmethod @classmethod
def from_deployment_address( def from_ssh_uri(
cls, cls,
*, *,
machine_name: str, machine_name: str,
address: str, address: str,
forward_agent: bool = True,
private_key: Path | None = None,
password: str | None = None,
tor_socks: bool = False,
) -> "Remote": ) -> "Remote":
""" """
Parse a deployment address and return a Host object. Parse a deployment address and return a Host object.
""" """
return parse_deployment_address( return parse_ssh_uri(machine_name=machine_name, address=address)
machine_name=machine_name,
address=address,
forward_agent=forward_agent,
private_key=private_key,
password=password,
tor_socks=tor_socks,
)
def run_local( def run_local(
self, self,

View File

@@ -110,7 +110,7 @@ def test_parse_deployment_address(
with maybe_check_exception: with maybe_check_exception:
machine_name = "foo" machine_name = "foo"
result = Remote.from_deployment_address( result = Remote.from_ssh_uri(
machine_name=machine_name, machine_name=machine_name,
address=test_addr, address=test_addr,
).override(host_key_check="strict") ).override(host_key_check="strict")
@@ -130,7 +130,7 @@ def test_parse_deployment_address(
def test_parse_ssh_options() -> None: def test_parse_ssh_options() -> None:
addr = "root@example.com:2222?IdentityFile=/path/to/private/key&StrictRemoteKeyChecking=yes" addr = "root@example.com:2222?IdentityFile=/path/to/private/key&StrictRemoteKeyChecking=yes"
host = Remote.from_deployment_address(machine_name="foo", address=addr).override( host = Remote.from_ssh_uri(machine_name="foo", address=addr).override(
host_key_check="strict" host_key_check="strict"
) )
assert host.address == "example.com" assert host.address == "example.com"