Merge pull request 'add edit flag to secret cli' (#265) from Mic92-main into main
This commit is contained in:
@@ -4,7 +4,7 @@ import os
|
|||||||
import shutil
|
import shutil
|
||||||
import sys
|
import sys
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import IO, Union
|
from typing import IO
|
||||||
|
|
||||||
from .. import tty
|
from .. import tty
|
||||||
from ..errors import ClanError
|
from ..errors import ClanError
|
||||||
@@ -54,7 +54,7 @@ def collect_keys_for_path(path: Path) -> set[str]:
|
|||||||
|
|
||||||
def encrypt_secret(
|
def encrypt_secret(
|
||||||
secret: Path,
|
secret: Path,
|
||||||
value: Union[IO[str], str],
|
value: IO[str] | str | None,
|
||||||
add_users: list[str] = [],
|
add_users: list[str] = [],
|
||||||
add_machines: list[str] = [],
|
add_machines: list[str] = [],
|
||||||
add_groups: list[str] = [],
|
add_groups: list[str] = [],
|
||||||
@@ -203,8 +203,10 @@ def get_command(args: argparse.Namespace) -> None:
|
|||||||
|
|
||||||
def set_command(args: argparse.Namespace) -> None:
|
def set_command(args: argparse.Namespace) -> None:
|
||||||
env_value = os.environ.get("SOPS_NIX_SECRET")
|
env_value = os.environ.get("SOPS_NIX_SECRET")
|
||||||
secret_value: Union[str, IO[str]] = sys.stdin
|
secret_value: str | IO[str] | None = sys.stdin
|
||||||
if env_value:
|
if args.edit:
|
||||||
|
secret_value = None
|
||||||
|
elif env_value:
|
||||||
secret_value = env_value
|
secret_value = env_value
|
||||||
elif tty.is_interactive():
|
elif tty.is_interactive():
|
||||||
secret_value = getpass.getpass(prompt="Paste your secret: ")
|
secret_value = getpass.getpass(prompt="Paste your secret: ")
|
||||||
@@ -258,6 +260,13 @@ def register_secrets_parser(subparser: argparse._SubParsersAction) -> None:
|
|||||||
default=[],
|
default=[],
|
||||||
help="the user to import the secrets to (can be repeated)",
|
help="the user to import the secrets to (can be repeated)",
|
||||||
)
|
)
|
||||||
|
parser_set.add_argument(
|
||||||
|
"-e",
|
||||||
|
"--edit",
|
||||||
|
action="store_true",
|
||||||
|
default=False,
|
||||||
|
help="edit the secret with $EDITOR instead of pasting it",
|
||||||
|
)
|
||||||
parser_set.set_defaults(func=set_command)
|
parser_set.set_defaults(func=set_command)
|
||||||
|
|
||||||
parser_rename = subparser.add_parser("rename", help="rename a secret")
|
parser_rename = subparser.add_parser("rename", help="rename a secret")
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ import subprocess
|
|||||||
from contextlib import contextmanager
|
from contextlib import contextmanager
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from tempfile import NamedTemporaryFile
|
from tempfile import NamedTemporaryFile
|
||||||
from typing import IO, Iterator, Union
|
from typing import IO, Iterator
|
||||||
|
|
||||||
from ..dirs import user_config_dir
|
from ..dirs import user_config_dir
|
||||||
from ..errors import ClanError
|
from ..errors import ClanError
|
||||||
@@ -115,33 +115,46 @@ def update_keys(secret_path: Path, keys: list[str]) -> None:
|
|||||||
|
|
||||||
|
|
||||||
def encrypt_file(
|
def encrypt_file(
|
||||||
secret_path: Path, content: Union[IO[str], str], keys: list[str]
|
secret_path: Path, content: IO[str] | str | None, keys: list[str]
|
||||||
) -> None:
|
) -> None:
|
||||||
folder = secret_path.parent
|
folder = secret_path.parent
|
||||||
folder.mkdir(parents=True, exist_ok=True)
|
folder.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
# hopefully /tmp is written to an in-memory file to avoid leaking secrets
|
with sops_manifest(keys) as manifest:
|
||||||
with sops_manifest(keys) as manifest, NamedTemporaryFile(delete=False) as f:
|
if not content:
|
||||||
try:
|
|
||||||
with open(f.name, "w") as fd:
|
|
||||||
if isinstance(content, str):
|
|
||||||
fd.write(content)
|
|
||||||
else:
|
|
||||||
shutil.copyfileobj(content, fd)
|
|
||||||
# we pass an empty manifest to pick up existing configuration of the user
|
|
||||||
args = ["sops", "--config", str(manifest)]
|
args = ["sops", "--config", str(manifest)]
|
||||||
args.extend(["-i", "--encrypt", str(f.name)])
|
args.extend([str(secret_path)])
|
||||||
cmd = nix_shell(["sops"], args)
|
cmd = nix_shell(["sops"], args)
|
||||||
subprocess.run(cmd, check=True)
|
p = subprocess.run(cmd)
|
||||||
# atomic copy of the encrypted file
|
# returns 200 if the file is changed
|
||||||
with NamedTemporaryFile(dir=folder, delete=False) as f2:
|
if p.returncode != 0 and p.returncode != 200:
|
||||||
shutil.copyfile(f.name, f2.name)
|
raise ClanError(
|
||||||
os.rename(f2.name, secret_path)
|
f"Failed to encrypt {secret_path}: sops exited with {p.returncode}"
|
||||||
finally:
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
# hopefully /tmp is written to an in-memory file to avoid leaking secrets
|
||||||
|
with NamedTemporaryFile(delete=False) as f:
|
||||||
try:
|
try:
|
||||||
os.remove(f.name)
|
with open(f.name, "w") as fd:
|
||||||
except OSError:
|
if isinstance(content, str):
|
||||||
pass
|
fd.write(content)
|
||||||
|
else:
|
||||||
|
shutil.copyfileobj(content, fd)
|
||||||
|
# we pass an empty manifest to pick up existing configuration of the user
|
||||||
|
args = ["sops", "--config", str(manifest)]
|
||||||
|
args.extend(["-i", "--encrypt", str(f.name)])
|
||||||
|
cmd = nix_shell(["sops"], args)
|
||||||
|
subprocess.run(cmd, check=True)
|
||||||
|
# atomic copy of the encrypted file
|
||||||
|
with NamedTemporaryFile(dir=folder, delete=False) as f2:
|
||||||
|
shutil.copyfile(f.name, f2.name)
|
||||||
|
os.rename(f2.name, secret_path)
|
||||||
|
finally:
|
||||||
|
try:
|
||||||
|
os.remove(f.name)
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
def decrypt_file(secret_path: Path) -> str:
|
def decrypt_file(secret_path: Path) -> str:
|
||||||
|
|||||||
@@ -145,6 +145,10 @@ def test_secrets(
|
|||||||
assert len(users) == 1, f"users: {users}"
|
assert len(users) == 1, f"users: {users}"
|
||||||
owner = users[0]
|
owner = users[0]
|
||||||
|
|
||||||
|
monkeypatch.setenv("EDITOR", "cat")
|
||||||
|
cli.run(["secrets", "set", "--edit", "initialkey"])
|
||||||
|
monkeypatch.delenv("EDITOR")
|
||||||
|
|
||||||
cli.run(["secrets", "rename", "initialkey", "key"])
|
cli.run(["secrets", "rename", "initialkey", "key"])
|
||||||
|
|
||||||
capsys.readouterr() # empty the buffer
|
capsys.readouterr() # empty the buffer
|
||||||
|
|||||||
Reference in New Issue
Block a user