secrets: do not shadow python builtins

This commit is contained in:
Jörg Thalheim
2024-10-01 12:00:27 +02:00
committed by Mic92
parent d18c0fa3ae
commit 4cd4334b1c
5 changed files with 24 additions and 20 deletions

View File

@@ -54,8 +54,8 @@ def generate_command(args: argparse.Namespace) -> None:
def show_command(args: argparse.Namespace) -> None:
key, type = sops.maybe_get_public_key()
type_or_null = f'"{type.name.lower()}"' if type else "null"
key, key_type = sops.maybe_get_admin_public_key()
type_or_null = f'"{key_type.name.lower()}"' if key_type else "null"
print(f'{{"key": "{key}", "type": {type_or_null}}}')

View File

@@ -48,7 +48,7 @@ def remove_machine(flake_dir: Path, name: str) -> None:
def get_machine(flake_dir: Path, name: str) -> str:
key, type = read_key(sops_machines_folder(flake_dir) / name)
key, _ = read_key(sops_machines_folder(flake_dir) / name)
return key

View File

@@ -135,8 +135,8 @@ def encrypt_secret(
recipient_keys = collect_keys_for_path(secret_path)
if (key.pubkey, key.type) not in recipient_keys:
recipient_keys.add((key.pubkey, key.type))
if (key.pubkey, key.key_type) not in recipient_keys:
recipient_keys.add((key.pubkey, key.key_type))
files_to_commit.extend(
allow_member(
users_folder(secret_path),

View File

@@ -37,7 +37,7 @@ class KeyType(enum.Enum):
class SopsKey:
pubkey: str
username: str
type: KeyType
key_type: KeyType
def get_public_age_key(privkey: str) -> str:
@@ -93,8 +93,10 @@ def get_user_name(flake_dir: Path, user: str) -> str:
print(f"{flake_dir / user} already exists")
def maybe_get_user_or_machine(flake_dir: Path, pub_key: str, type: KeyType) -> SopsKey | None:
key = SopsKey(pub_key, username="", type=type)
def maybe_get_user_or_machine(
flake_dir: Path, pub_key: str, key_type: KeyType
) -> SopsKey | None:
key = SopsKey(pub_key, username="", key_type=key_type)
folders = [sops_users_folder(flake_dir), sops_machines_folder(flake_dir)]
for folder in folders:
@@ -102,7 +104,7 @@ def maybe_get_user_or_machine(flake_dir: Path, pub_key: str, type: KeyType) -> S
for user in folder.iterdir():
if not (user / "key.json").exists():
continue
if read_key(user) == (pub_key, type):
if read_key(user) == (pub_key, key_type):
key.username = user.name
return key
@@ -161,9 +163,11 @@ def ensure_admin_key(flake_dir: Path) -> SopsKey:
@contextmanager
def sops_manifest(keys: list[tuple[str, KeyType]]) -> Iterator[Path]:
all_keys = {type.lower(): [] for type in KeyType.__members__.keys()}
for key, type in keys:
all_keys[type.name.lower()].append(key)
all_keys: dict[str, list[str]] = {
key_type.lower(): [] for key_type in KeyType.__members__
}
for key, key_type in keys:
all_keys[key_type.name.lower()].append(key)
with NamedTemporaryFile(delete=False, mode="w") as manifest:
json.dump({"creation_rules": [{"key_groups": [all_keys]}]}, manifest, indent=2)
manifest.flush()
@@ -260,7 +264,7 @@ def get_meta(secret_path: Path) -> dict:
return json.load(f)
def write_key(path: Path, publickey: str, type: KeyType, overwrite: bool) -> None:
def write_key(path: Path, publickey: str, key_type: KeyType, overwrite: bool) -> None:
path.mkdir(parents=True, exist_ok=True)
try:
flags = os.O_CREAT | os.O_WRONLY | os.O_TRUNC
@@ -271,7 +275,7 @@ def write_key(path: Path, publickey: str, type: KeyType, overwrite: bool) -> Non
msg = f"{path.name} already exists in {path}. Use --force to overwrite."
raise ClanError(msg) from e
with os.fdopen(fd, "w") as f:
contents = {"publickey": publickey, "type": type.name.lower()}
contents = {"publickey": publickey, "type": key_type.name.lower()}
json.dump(contents, f, indent=2)
@@ -282,12 +286,12 @@ def read_key(path: Path) -> tuple[str, KeyType]:
except json.JSONDecodeError as e:
msg = f"Failed to decode {path.name}: {e}"
raise ClanError(msg) from e
type = KeyType.validate(key.get("type"))
if type is None:
msg = f"Invalid key type in {path.name}: \"{type}\" (expected one of {', '.join(KeyType.__members__.keys())})."
key_type = KeyType.validate(key.get("type"))
if key_type is None:
msg = f"Invalid key type in {path.name}: \"{key_type}\" (expected one of {', '.join(KeyType.__members__.keys())})."
raise ClanError(msg)
publickey = key.get("publickey")
if not publickey:
msg = f"{path.name} does not contain a public key"
raise ClanError(msg)
return publickey, type
return publickey, key_type

View File

@@ -111,8 +111,8 @@ def get_command(args: argparse.Namespace) -> None:
if args.flake is None:
msg = "Could not find clan flake toplevel directory"
raise ClanError(msg)
key, type = get_user(args.flake.path, args.user)
type_or_null = f'"{type.name.lower()}"' if type else "null"
key, key_type = get_user(args.flake.path, args.user)
type_or_null = '"{key_type.name.lower()}"' if key_type else "null"
print(f'{{"key": "{key}", "type": {type_or_null}}}')