cmd.py refactor part 6
This commit is contained in:
@@ -8,6 +8,7 @@ from pathlib import Path
|
|||||||
from tempfile import NamedTemporaryFile
|
from tempfile import NamedTemporaryFile
|
||||||
from typing import IO
|
from typing import IO
|
||||||
|
|
||||||
|
from ..cmd import Log, run
|
||||||
from ..dirs import user_config_dir
|
from ..dirs import user_config_dir
|
||||||
from ..errors import ClanError
|
from ..errors import ClanError
|
||||||
from ..nix import nix_shell
|
from ..nix import nix_shell
|
||||||
@@ -36,7 +37,7 @@ def get_public_key(privkey: str) -> str:
|
|||||||
def generate_private_key() -> tuple[str, str]:
|
def generate_private_key() -> tuple[str, str]:
|
||||||
cmd = nix_shell(["nixpkgs#age"], ["age-keygen"])
|
cmd = nix_shell(["nixpkgs#age"], ["age-keygen"])
|
||||||
try:
|
try:
|
||||||
proc = subprocess.run(cmd, check=True, stdout=subprocess.PIPE, text=True)
|
proc = run(cmd)
|
||||||
res = proc.stdout.strip()
|
res = proc.stdout.strip()
|
||||||
pubkey = None
|
pubkey = None
|
||||||
private_key = None
|
private_key = None
|
||||||
@@ -129,11 +130,7 @@ def update_keys(secret_path: Path, keys: list[str]) -> None:
|
|||||||
str(secret_path / "secret"),
|
str(secret_path / "secret"),
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
res = subprocess.run(cmd)
|
run(cmd, log=Log.BOTH, error_msg=f"Could not update keys for {secret_path}")
|
||||||
if res.returncode != 0:
|
|
||||||
raise ClanError(
|
|
||||||
f"Failed to update keys for {secret_path}: sops exited with {res.returncode}"
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def encrypt_file(
|
def encrypt_file(
|
||||||
@@ -147,7 +144,7 @@ def encrypt_file(
|
|||||||
args = ["sops", "--config", str(manifest)]
|
args = ["sops", "--config", str(manifest)]
|
||||||
args.extend([str(secret_path)])
|
args.extend([str(secret_path)])
|
||||||
cmd = nix_shell(["nixpkgs#sops"], args)
|
cmd = nix_shell(["nixpkgs#sops"], args)
|
||||||
p = subprocess.run(cmd)
|
p = run(cmd, log=Log.BOTH, check=False)
|
||||||
# returns 200 if the file is changed
|
# returns 200 if the file is changed
|
||||||
if p.returncode != 0 and p.returncode != 200:
|
if p.returncode != 0 and p.returncode != 200:
|
||||||
raise ClanError(
|
raise ClanError(
|
||||||
@@ -167,7 +164,7 @@ def encrypt_file(
|
|||||||
args = ["sops", "--config", str(manifest)]
|
args = ["sops", "--config", str(manifest)]
|
||||||
args.extend(["-i", "--encrypt", str(f.name)])
|
args.extend(["-i", "--encrypt", str(f.name)])
|
||||||
cmd = nix_shell(["nixpkgs#sops"], args)
|
cmd = nix_shell(["nixpkgs#sops"], args)
|
||||||
subprocess.run(cmd, check=True)
|
run(cmd, log=Log.BOTH)
|
||||||
# atomic copy of the encrypted file
|
# atomic copy of the encrypted file
|
||||||
with NamedTemporaryFile(dir=folder, delete=False) as f2:
|
with NamedTemporaryFile(dir=folder, delete=False) as f2:
|
||||||
shutil.copyfile(f.name, f2.name)
|
shutil.copyfile(f.name, f2.name)
|
||||||
@@ -185,11 +182,7 @@ def decrypt_file(secret_path: Path) -> str:
|
|||||||
["nixpkgs#sops"],
|
["nixpkgs#sops"],
|
||||||
["sops", "--config", str(manifest), "--decrypt", str(secret_path)],
|
["sops", "--config", str(manifest), "--decrypt", str(secret_path)],
|
||||||
)
|
)
|
||||||
res = subprocess.run(cmd, stdout=subprocess.PIPE, text=True)
|
res = run(cmd, error_msg=f"Could not decrypt {secret_path}")
|
||||||
if res.returncode != 0:
|
|
||||||
raise ClanError(
|
|
||||||
f"Failed to decrypt {secret_path}: sops exited with {res.returncode}"
|
|
||||||
)
|
|
||||||
return res.stdout
|
return res.stdout
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ from pathlib import Path
|
|||||||
from tempfile import TemporaryDirectory
|
from tempfile import TemporaryDirectory
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
from clan_cli.cmd import run
|
from clan_cli.cmd import Log, run
|
||||||
from clan_cli.nix import nix_shell
|
from clan_cli.nix import nix_shell
|
||||||
|
|
||||||
from ..errors import ClanError
|
from ..errors import ClanError
|
||||||
@@ -61,7 +61,7 @@ export secrets={shlex.quote(str(secrets_dir))}
|
|||||||
{generator}
|
{generator}
|
||||||
"""
|
"""
|
||||||
cmd = nix_shell(["nixpkgs#bash"], ["bash", "-c", text])
|
cmd = nix_shell(["nixpkgs#bash"], ["bash", "-c", text])
|
||||||
run(cmd)
|
run(cmd, log=Log.BOTH)
|
||||||
|
|
||||||
for name in secrets:
|
for name in secrets:
|
||||||
secret_file = secrets_dir / name
|
secret_file = secrets_dir / name
|
||||||
|
|||||||
@@ -1,9 +1,9 @@
|
|||||||
import argparse
|
import argparse
|
||||||
import logging
|
import logging
|
||||||
import subprocess
|
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from tempfile import TemporaryDirectory
|
from tempfile import TemporaryDirectory
|
||||||
|
|
||||||
|
from ..cmd import Log, run
|
||||||
from ..machines.machines import Machine
|
from ..machines.machines import Machine
|
||||||
from ..nix import nix_shell
|
from ..nix import nix_shell
|
||||||
|
|
||||||
@@ -19,7 +19,7 @@ def upload_secrets(machine: Machine) -> None:
|
|||||||
host = machine.host
|
host = machine.host
|
||||||
|
|
||||||
ssh_cmd = host.ssh_cmd()
|
ssh_cmd = host.ssh_cmd()
|
||||||
subprocess.run(
|
run(
|
||||||
nix_shell(
|
nix_shell(
|
||||||
["nixpkgs#rsync"],
|
["nixpkgs#rsync"],
|
||||||
[
|
[
|
||||||
@@ -32,7 +32,7 @@ def upload_secrets(machine: Machine) -> None:
|
|||||||
f"{host.user}@{host.host}:{machine.secrets_upload_directory}/",
|
f"{host.user}@{host.host}:{machine.secrets_upload_directory}/",
|
||||||
],
|
],
|
||||||
),
|
),
|
||||||
check=True,
|
log=Log.BOTH,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
import argparse
|
import argparse
|
||||||
import json
|
import json
|
||||||
import subprocess
|
|
||||||
|
|
||||||
|
from ..cmd import Log, run
|
||||||
from ..nix import nix_shell
|
from ..nix import nix_shell
|
||||||
|
|
||||||
|
|
||||||
@@ -30,12 +30,11 @@ def ssh(
|
|||||||
f"{user}@{host}",
|
f"{user}@{host}",
|
||||||
]
|
]
|
||||||
cmd = nix_shell(packages, ["torify", *password_args, *_ssh_args])
|
cmd = nix_shell(packages, ["torify", *password_args, *_ssh_args])
|
||||||
subprocess.run(cmd)
|
run(cmd, log=Log.BOTH)
|
||||||
|
|
||||||
|
|
||||||
def qrcode_scan(picture_file: str) -> str:
|
def qrcode_scan(picture_file: str) -> str:
|
||||||
return (
|
return run(
|
||||||
subprocess.run(
|
|
||||||
nix_shell(
|
nix_shell(
|
||||||
["nixpkgs#zbar"],
|
["nixpkgs#zbar"],
|
||||||
[
|
[
|
||||||
@@ -45,12 +44,7 @@ def qrcode_scan(picture_file: str) -> str:
|
|||||||
picture_file,
|
picture_file,
|
||||||
],
|
],
|
||||||
),
|
),
|
||||||
stdout=subprocess.PIPE,
|
).stdout.strip()
|
||||||
check=True,
|
|
||||||
)
|
|
||||||
.stdout.decode()
|
|
||||||
.strip()
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def main(args: argparse.Namespace) -> None:
|
def main(args: argparse.Namespace) -> None:
|
||||||
|
|||||||
Reference in New Issue
Block a user