Add --ssh-pubkey FILE argument

This commit is contained in:
Qubasa
2024-04-23 18:55:00 +02:00
parent 038e63612f
commit b1dd5c2832
15 changed files with 192 additions and 31 deletions

View File

@@ -58,6 +58,7 @@ def create_parser(prog: str | None = None) -> argparse.ArgumentParser:
"--debug",
help="Enable debug logging",
action="store_true",
default=False,
)
parser.add_argument(

View File

@@ -1,8 +1,8 @@
import argparse
import importlib
import json
import logging
import os
import shlex
import shutil
import textwrap
from collections.abc import Sequence
@@ -20,8 +20,48 @@ from .nix import nix_shell
log = logging.getLogger(__name__)
def list_available_ssh_keys(ssh_dir: Path = Path("~/.ssh").expanduser()) -> list[Path]:
"""
Function to list all available SSH public keys in the default .ssh directory.
Returns a list of paths to available public key files.
"""
public_key_patterns = ["*.pub"]
available_keys: list[Path] = []
# Check for public key files
for pattern in public_key_patterns:
for key_path in ssh_dir.glob(pattern):
if key_path.is_file():
available_keys.append(key_path)
return available_keys
def read_public_key_contents(public_keys: list[Path]) -> list[str]:
"""
Function to read and return the contents of available SSH public keys.
Returns a list containing the contents of each public key.
"""
public_key_contents = []
for key_path in public_keys:
try:
with open(key_path.expanduser()) as key_file:
public_key_contents.append(key_file.read().strip())
except FileNotFoundError:
log.error(f"Public key file not found: {key_path}")
return public_key_contents
def flash_machine(
machine: Machine, mode: str, disks: dict[str, str], dry_run: bool, debug: bool
machine: Machine,
*,
mode: str,
disks: dict[str, str],
system_config: dict[str, Any],
dry_run: bool,
debug: bool,
) -> None:
secret_facts_module = importlib.import_module(machine.secret_facts_module)
secret_facts_store: SecretStoreBase = secret_facts_module.SecretStore(
@@ -58,12 +98,17 @@ def flash_machine(
disko_install.extend(["--extra-files", str(local_dir), upload_dir])
disko_install.extend(["--flake", str(machine.flake) + "#" + machine.name])
disko_install.extend(["--mode", str(mode)])
disko_install.extend(
[
"--system-config",
json.dumps(system_config),
]
)
cmd = nix_shell(
["nixpkgs#disko"],
disko_install,
)
print("$", " ".join(map(shlex.quote, cmd)))
run(cmd, log=Log.BOTH, error_msg=f"Failed to flash {machine}")
@@ -72,6 +117,7 @@ class FlashOptions:
flake: Path
machine: str
disks: dict[str, str]
ssh_keys_path: list[Path]
dry_run: bool
confirm: bool
debug: bool
@@ -99,11 +145,13 @@ def flash_command(args: argparse.Namespace) -> None:
flake=args.flake,
machine=args.machine,
disks=args.disk,
ssh_keys_path=args.ssh_pubkey,
dry_run=args.dry_run,
confirm=not args.yes,
debug=args.debug,
mode=args.mode,
)
machine = Machine(opts.machine, flake=opts.flake)
if opts.confirm and not opts.dry_run:
disk_str = ", ".join(f"{name}={device}" for name, device in opts.disks.items())
@@ -114,8 +162,38 @@ def flash_command(args: argparse.Namespace) -> None:
ask = input(msg)
if ask != "y":
return
root_keys = read_public_key_contents(opts.ssh_keys_path)
if opts.confirm and not root_keys:
msg = "Should we add your SSH public keys to the root user? [y/N] "
ask = input(msg)
if ask == "y":
pubkeys = list_available_ssh_keys()
root_keys.extend(read_public_key_contents(pubkeys))
elif not opts.confirm and not root_keys:
pubkeys = list_available_ssh_keys()
root_keys.extend(read_public_key_contents(pubkeys))
# If ssh-pubkeys set, we don't need to ask for confirmation
elif opts.confirm and root_keys:
pass
elif not opts.confirm and root_keys:
pass
else:
raise ClanError("Invalid state")
user_keys = {
"users": {
"users": {"root": {"openssh": {"authorizedKeys": {"keys": root_keys}}}}
}
}
flash_machine(
machine, opts.mode, disks=opts.disks, dry_run=opts.dry_run, debug=opts.debug
machine,
mode=opts.mode,
disks=opts.disks,
system_config=user_keys,
dry_run=opts.dry_run,
debug=opts.debug,
)
@@ -147,7 +225,13 @@ def register_parser(parser: argparse.ArgumentParser) -> None:
choices=["format", "mount"],
default="format",
)
parser.add_argument(
"--ssh-pubkey",
type=Path,
action="append",
default=[],
help="ssh pubkey file to add to the root user. Can be used multiple times",
)
parser.add_argument(
"--yes",
action="store_true",
@@ -160,10 +244,4 @@ def register_parser(parser: argparse.ArgumentParser) -> None:
default=False,
action="store_true",
)
parser.add_argument(
"--debug",
help="Print debug information",
default=False,
action="store_true",
)
parser.set_defaults(func=flash_command)

View File

@@ -106,13 +106,7 @@ def nix_shell(packages: list[str], cmd: list[str]) -> list[str]:
if os.environ.get("IN_NIX_SANDBOX"):
return cmd
return [
*nix_command(
[
"shell",
"--inputs-from",
f"{nixpkgs_flake()!s}",
]
),
*nix_command(["shell", "--offline", "--inputs-from", f"{nixpkgs_flake()!s}"]),
*packages,
"-c",
*cmd,

View File

@@ -23,11 +23,11 @@
zbar,
tor,
git,
nixpkgs,
qemu,
gnupg,
e2fsprogs,
mypy,
nixpkgs,
clan-core-path,
}:
let
@@ -95,17 +95,16 @@ let
description = "dependencies for the clan-cli";
inputs = {
nixpkgs.url = "nixpkgs";
nixpkgs.url = "path://${nixpkgs}";
};
outputs = _inputs: { };
}
EOF
ln -s ${nixpkgs} $out/path
nix flake lock $out \
nix flake update $out \
--store ./. \
--extra-experimental-features 'nix-command flakes' \
--override-input nixpkgs ${nixpkgs}
--extra-experimental-features 'nix-command flakes'
'';
in
python3.pkgs.buildPythonApplication {

View File

@@ -38,6 +38,7 @@
'';
in
{
devShells.clan-cli = pkgs.callPackage ./shell.nix { inherit (self'.packages) clan-cli; };
packages = {
clan-cli = pkgs.python3.pkgs.callPackage ./default.nix {

View File

@@ -12,6 +12,7 @@ let
rope
setuptools
wheel
ipdb
pip
]);
in
@@ -21,6 +22,8 @@ mkShell {
ruff
] ++ devshellTestDeps;
PYTHONBREAKPOINT = "ipdb.set_trace";
shellHook = ''
export GIT_ROOT="$(git rev-parse --show-toplevel)"
export PKG_ROOT="$GIT_ROOT/pkgs/clan-cli"