test list/remove secret
This commit is contained in:
@@ -1,45 +1,6 @@
|
|||||||
# !/usr/bin/env python3
|
# !/usr/bin/env python3
|
||||||
import argparse
|
|
||||||
import sys
|
|
||||||
|
|
||||||
from . import admin, secrets, ssh
|
|
||||||
from .errors import ClanError
|
|
||||||
|
|
||||||
has_argcomplete = True
|
|
||||||
try:
|
|
||||||
import argcomplete
|
|
||||||
except ImportError:
|
|
||||||
has_argcomplete = False
|
|
||||||
|
|
||||||
|
|
||||||
# this will be the entrypoint under /bin/clan (see pyproject.toml config)
|
|
||||||
def main() -> None:
|
|
||||||
parser = argparse.ArgumentParser(description="cLAN tool")
|
|
||||||
subparsers = parser.add_subparsers()
|
|
||||||
|
|
||||||
parser_admin = subparsers.add_parser("admin")
|
|
||||||
admin.register_parser(parser_admin)
|
|
||||||
|
|
||||||
parser_ssh = subparsers.add_parser("ssh", help="ssh to a remote machine")
|
|
||||||
ssh.register_parser(parser_ssh)
|
|
||||||
|
|
||||||
parser_secrets = subparsers.add_parser("secrets", help="manage secrets")
|
|
||||||
secrets.register_parser(parser_secrets)
|
|
||||||
|
|
||||||
if has_argcomplete:
|
|
||||||
argcomplete.autocomplete(parser)
|
|
||||||
|
|
||||||
if len(sys.argv) == 1:
|
|
||||||
parser.print_help()
|
|
||||||
|
|
||||||
args = parser.parse_args()
|
|
||||||
if hasattr(args, "func"):
|
|
||||||
try:
|
|
||||||
args.func(args)
|
|
||||||
except ClanError as e:
|
|
||||||
print(f"{sys.argv[0]}: {e}")
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
|
from .cli import main
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
main()
|
main()
|
||||||
|
|||||||
44
pkgs/clan-cli/clan_cli/cli.py
Normal file
44
pkgs/clan-cli/clan_cli/cli.py
Normal file
@@ -0,0 +1,44 @@
|
|||||||
|
import argparse
|
||||||
|
import sys
|
||||||
|
|
||||||
|
from . import admin, secrets, ssh
|
||||||
|
from .errors import ClanError
|
||||||
|
|
||||||
|
has_argcomplete = True
|
||||||
|
try:
|
||||||
|
import argcomplete
|
||||||
|
except ImportError:
|
||||||
|
has_argcomplete = False
|
||||||
|
|
||||||
|
|
||||||
|
# this will be the entrypoint under /bin/clan (see pyproject.toml config)
|
||||||
|
def main() -> None:
|
||||||
|
parser = argparse.ArgumentParser(description="cLAN tool")
|
||||||
|
subparsers = parser.add_subparsers()
|
||||||
|
|
||||||
|
parser_admin = subparsers.add_parser("admin")
|
||||||
|
admin.register_parser(parser_admin)
|
||||||
|
|
||||||
|
parser_ssh = subparsers.add_parser("ssh", help="ssh to a remote machine")
|
||||||
|
ssh.register_parser(parser_ssh)
|
||||||
|
|
||||||
|
parser_secrets = subparsers.add_parser("secrets", help="manage secrets")
|
||||||
|
secrets.register_parser(parser_secrets)
|
||||||
|
|
||||||
|
if has_argcomplete:
|
||||||
|
argcomplete.autocomplete(parser)
|
||||||
|
|
||||||
|
if len(sys.argv) == 1:
|
||||||
|
parser.print_help()
|
||||||
|
|
||||||
|
args = parser.parse_args()
|
||||||
|
if hasattr(args, "func"):
|
||||||
|
try:
|
||||||
|
args.func(args)
|
||||||
|
except ClanError as e:
|
||||||
|
print(f"{sys.argv[0]}: {e}")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
@@ -1,6 +1,7 @@
|
|||||||
import argparse
|
import argparse
|
||||||
import getpass
|
import getpass
|
||||||
import os
|
import os
|
||||||
|
import shutil
|
||||||
import subprocess
|
import subprocess
|
||||||
import sys
|
import sys
|
||||||
from io import StringIO
|
from io import StringIO
|
||||||
@@ -43,16 +44,15 @@ def encrypt_secret(key: SopsKey, secret: Path, value: IO[str]) -> None:
|
|||||||
|
|
||||||
|
|
||||||
def set_command(args: argparse.Namespace) -> None:
|
def set_command(args: argparse.Namespace) -> None:
|
||||||
secret: str = args.secret
|
|
||||||
key = ensure_sops_key()
|
key = ensure_sops_key()
|
||||||
secret_value = os.environ.get("SOPS_NIX_SECRET")
|
secret_value = os.environ.get("SOPS_NIX_SECRET")
|
||||||
if secret_value:
|
if secret_value:
|
||||||
encrypt_secret(key, sops_secrets_folder() / secret, StringIO(secret_value))
|
encrypt_secret(key, sops_secrets_folder() / args.secret, StringIO(secret_value))
|
||||||
elif tty.is_interactive():
|
elif tty.is_interactive():
|
||||||
secret = getpass.getpass(prompt="Paste your secret: ")
|
secret = getpass.getpass(prompt="Paste your secret: ")
|
||||||
encrypt_secret(key, sops_secrets_folder() / secret, StringIO(secret))
|
encrypt_secret(key, sops_secrets_folder() / args.secret, StringIO(secret))
|
||||||
else:
|
else:
|
||||||
encrypt_secret(key, sops_secrets_folder() / secret, sys.stdin)
|
encrypt_secret(key, sops_secrets_folder() / args.secret, sys.stdin)
|
||||||
|
|
||||||
|
|
||||||
def remove_command(args: argparse.Namespace) -> None:
|
def remove_command(args: argparse.Namespace) -> None:
|
||||||
@@ -60,7 +60,7 @@ def remove_command(args: argparse.Namespace) -> None:
|
|||||||
path = sops_secrets_folder() / secret
|
path = sops_secrets_folder() / secret
|
||||||
if not path.exists():
|
if not path.exists():
|
||||||
raise ClanError(f"Secret '{secret}' does not exist")
|
raise ClanError(f"Secret '{secret}' does not exist")
|
||||||
path.unlink()
|
shutil.rmtree(path)
|
||||||
|
|
||||||
|
|
||||||
def add_secret_argument(parser: argparse.ArgumentParser) -> None:
|
def add_secret_argument(parser: argparse.ArgumentParser) -> None:
|
||||||
|
|||||||
@@ -39,7 +39,7 @@ def get_user_name(user: str) -> str:
|
|||||||
"""Ask the user for their name until a unique one is provided."""
|
"""Ask the user for their name until a unique one is provided."""
|
||||||
while True:
|
while True:
|
||||||
name = input(
|
name = input(
|
||||||
f"Enter your user name for which the key will be stored as [{user}]: "
|
f"Enter your user name for which your sops key will be stored in the repository [default: {user}]: "
|
||||||
)
|
)
|
||||||
if name:
|
if name:
|
||||||
user = name
|
user = name
|
||||||
|
|||||||
@@ -114,7 +114,17 @@ def test_secrets(
|
|||||||
with mock_env(
|
with mock_env(
|
||||||
SOPS_NIX_SECRET="foo", SOPS_AGE_KEY_FILE=str(clan_flake / ".." / "age.key")
|
SOPS_NIX_SECRET="foo", SOPS_AGE_KEY_FILE=str(clan_flake / ".." / "age.key")
|
||||||
):
|
):
|
||||||
cli.run(["set", "nonexisting"])
|
cli.run(["set", "key"])
|
||||||
capsys.readouterr()
|
capsys.readouterr()
|
||||||
cli.run(["get", "nonexisting"])
|
cli.run(["get", "key"])
|
||||||
assert capsys.readouterr().out == "foo"
|
assert capsys.readouterr().out == "foo"
|
||||||
|
|
||||||
|
capsys.readouterr() # empty the buffer
|
||||||
|
cli.run(["list"])
|
||||||
|
assert capsys.readouterr().out == "key\n"
|
||||||
|
|
||||||
|
cli.run(["remove", "key"])
|
||||||
|
|
||||||
|
capsys.readouterr() # empty the buffer
|
||||||
|
cli.run(["list"])
|
||||||
|
assert capsys.readouterr().out == ""
|
||||||
|
|||||||
Reference in New Issue
Block a user