Merge pull request 'clan-cli: "fix" ssh option parsing' (#2899) from lopter/clan-core:lo-fix-ssh-option-parsing into main

Reviewed-on: https://git.clan.lol/clan/clan-core/pulls/2899
This commit is contained in:
hsjobeki
2025-02-19 01:41:07 +00:00
2 changed files with 38 additions and 25 deletions

View File

@@ -9,11 +9,11 @@
By default, the node's attribute name will be used.
If set to null, only local deployment will be supported.
format: user@host:port&SSH_OPTION=SSH_VALUE
format: user@host:port?SSH_OPTION=SSH_VALUE[&SSH_OPTION_2=VALUE_2]
examples:
- machine.example.com
- user@machine2.example.com
- root@example.com:2222&IdentityFile=/path/to/private/key
- root@example.com:2222?IdentityFile=/path/to/private/key&StrictHostKeyChecking=yes
'';
default = null;
type = lib.types.nullOr lib.types.str;
@@ -24,11 +24,11 @@
If set to null, the targetHost will be used.
format: user@host:port&SSH_OPTION=SSH_VALUE
format: user@host:port?SSH_OPTION=SSH_VALUE&SSH_OPTION_2=VALUE_2
examples:
- machine.example.com
- user@machine2.example.com
- root@example.com:2222&IdentityFile=/path/to/private/key
- root@example.com:2222?IdentityFile=/path/to/private/key&StrictHostKeyChecking=yes
'';
type = lib.types.nullOr lib.types.str;
default = null;

View File

@@ -14,33 +14,46 @@ def parse_deployment_address(
forward_agent: bool = True,
meta: dict[str, Any] | None = None,
) -> Host:
if meta is None:
meta = {}
parts = host.split("@")
user: str | None = None
# count the number of : in the hostname
if host.count(":") > 1 and not re.match(r".*\[.*\]", host):
parts = host.split("?", maxsplit=1)
endpoint, maybe_options = parts if len(parts) == 2 else (parts[0], "")
parts = endpoint.split("@")
match len(parts):
case 2:
user, host_port = parts
case 1:
user, host_port = "", parts[0]
case _:
msg = f"Invalid host, got `{host}` but expected something like `[user@]hostname[:port]`"
raise ClanError(msg)
# Make this check now rather than failing with a `ValueError`
# when looking up the port from the `urlsplit` result below:
if host_port.count(":") > 1 and not re.match(r".*\[.*]", host_port):
msg = f"Invalid hostname: {host}. IPv6 addresses must be enclosed in brackets , e.g. [::1]"
raise ClanError(msg)
if len(parts) > 1:
user = parts[0]
hostname = parts[1]
else:
hostname = parts[0]
maybe_options = hostname.split("?")
options: dict[str, str] = {}
if len(maybe_options) > 1:
hostname = maybe_options[0]
for option in maybe_options[1].split("&"):
k, v = option.split("=")
options[k] = v
result = urllib.parse.urlsplit("//" + hostname)
for o in maybe_options.split("&"):
if len(o) == 0:
continue
parts = o.split("=", maxsplit=1)
if len(parts) != 2:
msg = (
f"Invalid option in host `{host}`: option `{o}` does not have "
f"a value (i.e. expected something like `name=value`)"
)
raise ClanError(msg)
name, value = parts
options[name] = value
result = urllib.parse.urlsplit(f"//{host_port}")
if not result.hostname:
msg = f"Invalid hostname: {hostname}"
msg = f"Invalid host, got `{host}` but expected something like `[user@]hostname[:port]`"
raise ClanError(msg)
hostname = result.hostname
port = result.port
meta = meta.copy()
return Host(
hostname,
user=user,
@@ -48,6 +61,6 @@ def parse_deployment_address(
host_key_check=host_key_check,
command_prefix=machine_name,
forward_agent=forward_agent,
meta=meta,
meta={} if meta is None else meta.copy(),
ssh_options=options,
)