lib/Remote: Unify class method _parse_ssh_uri with class file
This commit is contained in:
@@ -1,74 +0,0 @@
|
|||||||
import re
|
|
||||||
import urllib.parse
|
|
||||||
from typing import TYPE_CHECKING
|
|
||||||
|
|
||||||
from clan_lib.errors import ClanError
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
|
||||||
from clan_lib.ssh.remote import Remote
|
|
||||||
|
|
||||||
|
|
||||||
def parse_ssh_uri(
|
|
||||||
*,
|
|
||||||
machine_name: str,
|
|
||||||
address: str,
|
|
||||||
) -> "Remote":
|
|
||||||
"""
|
|
||||||
Parses an SSH URI into a Remote object.
|
|
||||||
The address can be in the form of:
|
|
||||||
- `ssh://[user@]hostname[:port]?option=value&option2=value2`
|
|
||||||
- `[user@]hostname[:port]`
|
|
||||||
The specification can be found here: https://www.ietf.org/archive/id/draft-salowey-secsh-uri-00.html
|
|
||||||
"""
|
|
||||||
if address.startswith("ssh://"):
|
|
||||||
# Strip the `ssh://` prefix if it exists
|
|
||||||
address = address[len("ssh://") :]
|
|
||||||
|
|
||||||
parts = address.split("?", maxsplit=1)
|
|
||||||
endpoint, maybe_options = parts if len(parts) == 2 else (parts[0], "")
|
|
||||||
|
|
||||||
parts = endpoint.split("@")
|
|
||||||
match len(parts):
|
|
||||||
case 2:
|
|
||||||
user, host_port = parts
|
|
||||||
case 1:
|
|
||||||
user, host_port = "root", parts[0]
|
|
||||||
case _:
|
|
||||||
msg = f"Invalid host, got `{address}` but expected something like `[user@]hostname[:port]`"
|
|
||||||
raise ClanError(msg)
|
|
||||||
|
|
||||||
# Make this check now rather than failing with a `ValueError`
|
|
||||||
# when looking up the port from the `urlsplit` result below:
|
|
||||||
if host_port.count(":") > 1 and not re.match(r".*\[.*]", host_port):
|
|
||||||
msg = f"Invalid hostname: {address}. IPv6 addresses must be enclosed in brackets , e.g. [::1]"
|
|
||||||
raise ClanError(msg)
|
|
||||||
|
|
||||||
options: dict[str, str] = {}
|
|
||||||
for o in maybe_options.split("&"):
|
|
||||||
if len(o) == 0:
|
|
||||||
continue
|
|
||||||
parts = o.split("=", maxsplit=1)
|
|
||||||
if len(parts) != 2:
|
|
||||||
msg = (
|
|
||||||
f"Invalid option in host `{address}`: option `{o}` does not have "
|
|
||||||
f"a value (i.e. expected something like `name=value`)"
|
|
||||||
)
|
|
||||||
raise ClanError(msg)
|
|
||||||
name, value = parts
|
|
||||||
options[name] = value
|
|
||||||
|
|
||||||
result = urllib.parse.urlsplit(f"//{host_port}")
|
|
||||||
if not result.hostname:
|
|
||||||
msg = f"Invalid host, got `{address}` but expected something like `[user@]hostname[:port]`"
|
|
||||||
raise ClanError(msg)
|
|
||||||
hostname = result.hostname
|
|
||||||
port = result.port
|
|
||||||
from clan_lib.ssh.remote import Remote
|
|
||||||
|
|
||||||
return Remote(
|
|
||||||
address=hostname,
|
|
||||||
user=user,
|
|
||||||
port=port,
|
|
||||||
command_prefix=machine_name,
|
|
||||||
ssh_options=options,
|
|
||||||
)
|
|
||||||
@@ -1,9 +1,11 @@
|
|||||||
import ipaddress
|
import ipaddress
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
|
import re
|
||||||
import shlex
|
import shlex
|
||||||
import subprocess
|
import subprocess
|
||||||
import sys
|
import sys
|
||||||
|
import urllib.parse
|
||||||
from collections.abc import Iterator
|
from collections.abc import Iterator
|
||||||
from contextlib import contextmanager
|
from contextlib import contextmanager
|
||||||
from dataclasses import dataclass, field
|
from dataclasses import dataclass, field
|
||||||
@@ -17,7 +19,6 @@ from clan_lib.colors import AnsiColor
|
|||||||
from clan_lib.errors import ClanError, indent_command # Assuming these are available
|
from clan_lib.errors import ClanError, indent_command # Assuming these are available
|
||||||
from clan_lib.nix import nix_shell
|
from clan_lib.nix import nix_shell
|
||||||
from clan_lib.ssh.host_key import HostKeyCheck, hostkey_to_ssh_opts
|
from clan_lib.ssh.host_key import HostKeyCheck, hostkey_to_ssh_opts
|
||||||
from clan_lib.ssh.parse import parse_ssh_uri
|
|
||||||
from clan_lib.ssh.socks_wrapper import SocksWrapper
|
from clan_lib.ssh.socks_wrapper import SocksWrapper
|
||||||
from clan_lib.ssh.sudo_askpass_proxy import SudoAskpassProxy
|
from clan_lib.ssh.sudo_askpass_proxy import SudoAskpassProxy
|
||||||
|
|
||||||
@@ -108,7 +109,7 @@ class Remote:
|
|||||||
Parse a deployment address and return a Remote object.
|
Parse a deployment address and return a Remote object.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
return parse_ssh_uri(machine_name=machine_name, address=address)
|
return _parse_ssh_uri(machine_name=machine_name, address=address)
|
||||||
|
|
||||||
def run_local(
|
def run_local(
|
||||||
self,
|
self,
|
||||||
@@ -468,3 +469,69 @@ class Remote:
|
|||||||
from clan_lib.network.check import check_machine_ssh_login
|
from clan_lib.network.check import check_machine_ssh_login
|
||||||
|
|
||||||
return check_machine_ssh_login(self)
|
return check_machine_ssh_login(self)
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_ssh_uri(
|
||||||
|
*,
|
||||||
|
machine_name: str,
|
||||||
|
address: str,
|
||||||
|
) -> "Remote":
|
||||||
|
"""
|
||||||
|
Parses an SSH URI into a Remote object.
|
||||||
|
The address can be in the form of:
|
||||||
|
- `ssh://[user@]hostname[:port]?option=value&option2=value2`
|
||||||
|
- `[user@]hostname[:port]`
|
||||||
|
The specification can be found here: https://www.ietf.org/archive/id/draft-salowey-secsh-uri-00.html
|
||||||
|
"""
|
||||||
|
if address.startswith("ssh://"):
|
||||||
|
# Strip the `ssh://` prefix if it exists
|
||||||
|
address = address[len("ssh://") :]
|
||||||
|
|
||||||
|
parts = address.split("?", maxsplit=1)
|
||||||
|
endpoint, maybe_options = parts if len(parts) == 2 else (parts[0], "")
|
||||||
|
|
||||||
|
parts = endpoint.split("@")
|
||||||
|
match len(parts):
|
||||||
|
case 2:
|
||||||
|
user, host_port = parts
|
||||||
|
case 1:
|
||||||
|
user, host_port = "root", parts[0]
|
||||||
|
case _:
|
||||||
|
msg = f"Invalid host, got `{address}` but expected something like `[user@]hostname[:port]`"
|
||||||
|
raise ClanError(msg)
|
||||||
|
|
||||||
|
# Make this check now rather than failing with a `ValueError`
|
||||||
|
# when looking up the port from the `urlsplit` result below:
|
||||||
|
if host_port.count(":") > 1 and not re.match(r".*\[.*]", host_port):
|
||||||
|
msg = f"Invalid hostname: {address}. IPv6 addresses must be enclosed in brackets , e.g. [::1]"
|
||||||
|
raise ClanError(msg)
|
||||||
|
|
||||||
|
options: dict[str, str] = {}
|
||||||
|
for o in maybe_options.split("&"):
|
||||||
|
if len(o) == 0:
|
||||||
|
continue
|
||||||
|
parts = o.split("=", maxsplit=1)
|
||||||
|
if len(parts) != 2:
|
||||||
|
msg = (
|
||||||
|
f"Invalid option in host `{address}`: option `{o}` does not have "
|
||||||
|
f"a value (i.e. expected something like `name=value`)"
|
||||||
|
)
|
||||||
|
raise ClanError(msg)
|
||||||
|
name, value = parts
|
||||||
|
options[name] = value
|
||||||
|
|
||||||
|
result = urllib.parse.urlsplit(f"//{host_port}")
|
||||||
|
if not result.hostname:
|
||||||
|
msg = f"Invalid host, got `{address}` but expected something like `[user@]hostname[:port]`"
|
||||||
|
raise ClanError(msg)
|
||||||
|
hostname = result.hostname
|
||||||
|
port = result.port
|
||||||
|
from clan_lib.ssh.remote import Remote
|
||||||
|
|
||||||
|
return Remote(
|
||||||
|
address=hostname,
|
||||||
|
user=user,
|
||||||
|
port=port,
|
||||||
|
command_prefix=machine_name,
|
||||||
|
ssh_options=options,
|
||||||
|
)
|
||||||
|
|||||||
Reference in New Issue
Block a user