secrets: do not shadow python builtins
This commit is contained in:
@@ -54,8 +54,8 @@ def generate_command(args: argparse.Namespace) -> None:
|
|||||||
|
|
||||||
|
|
||||||
def show_command(args: argparse.Namespace) -> None:
|
def show_command(args: argparse.Namespace) -> None:
|
||||||
key, type = sops.maybe_get_public_key()
|
key, key_type = sops.maybe_get_admin_public_key()
|
||||||
type_or_null = f'"{type.name.lower()}"' if type else "null"
|
type_or_null = f'"{key_type.name.lower()}"' if key_type else "null"
|
||||||
print(f'{{"key": "{key}", "type": {type_or_null}}}')
|
print(f'{{"key": "{key}", "type": {type_or_null}}}')
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -48,7 +48,7 @@ def remove_machine(flake_dir: Path, name: str) -> None:
|
|||||||
|
|
||||||
|
|
||||||
def get_machine(flake_dir: Path, name: str) -> str:
|
def get_machine(flake_dir: Path, name: str) -> str:
|
||||||
key, type = read_key(sops_machines_folder(flake_dir) / name)
|
key, _ = read_key(sops_machines_folder(flake_dir) / name)
|
||||||
return key
|
return key
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -135,8 +135,8 @@ def encrypt_secret(
|
|||||||
|
|
||||||
recipient_keys = collect_keys_for_path(secret_path)
|
recipient_keys = collect_keys_for_path(secret_path)
|
||||||
|
|
||||||
if (key.pubkey, key.type) not in recipient_keys:
|
if (key.pubkey, key.key_type) not in recipient_keys:
|
||||||
recipient_keys.add((key.pubkey, key.type))
|
recipient_keys.add((key.pubkey, key.key_type))
|
||||||
files_to_commit.extend(
|
files_to_commit.extend(
|
||||||
allow_member(
|
allow_member(
|
||||||
users_folder(secret_path),
|
users_folder(secret_path),
|
||||||
|
|||||||
@@ -37,7 +37,7 @@ class KeyType(enum.Enum):
|
|||||||
class SopsKey:
|
class SopsKey:
|
||||||
pubkey: str
|
pubkey: str
|
||||||
username: str
|
username: str
|
||||||
type: KeyType
|
key_type: KeyType
|
||||||
|
|
||||||
|
|
||||||
def get_public_age_key(privkey: str) -> str:
|
def get_public_age_key(privkey: str) -> str:
|
||||||
@@ -93,8 +93,10 @@ def get_user_name(flake_dir: Path, user: str) -> str:
|
|||||||
print(f"{flake_dir / user} already exists")
|
print(f"{flake_dir / user} already exists")
|
||||||
|
|
||||||
|
|
||||||
def maybe_get_user_or_machine(flake_dir: Path, pub_key: str, type: KeyType) -> SopsKey | None:
|
def maybe_get_user_or_machine(
|
||||||
key = SopsKey(pub_key, username="", type=type)
|
flake_dir: Path, pub_key: str, key_type: KeyType
|
||||||
|
) -> SopsKey | None:
|
||||||
|
key = SopsKey(pub_key, username="", key_type=key_type)
|
||||||
folders = [sops_users_folder(flake_dir), sops_machines_folder(flake_dir)]
|
folders = [sops_users_folder(flake_dir), sops_machines_folder(flake_dir)]
|
||||||
|
|
||||||
for folder in folders:
|
for folder in folders:
|
||||||
@@ -102,7 +104,7 @@ def maybe_get_user_or_machine(flake_dir: Path, pub_key: str, type: KeyType) -> S
|
|||||||
for user in folder.iterdir():
|
for user in folder.iterdir():
|
||||||
if not (user / "key.json").exists():
|
if not (user / "key.json").exists():
|
||||||
continue
|
continue
|
||||||
if read_key(user) == (pub_key, type):
|
if read_key(user) == (pub_key, key_type):
|
||||||
key.username = user.name
|
key.username = user.name
|
||||||
return key
|
return key
|
||||||
|
|
||||||
@@ -161,9 +163,11 @@ def ensure_admin_key(flake_dir: Path) -> SopsKey:
|
|||||||
|
|
||||||
@contextmanager
|
@contextmanager
|
||||||
def sops_manifest(keys: list[tuple[str, KeyType]]) -> Iterator[Path]:
|
def sops_manifest(keys: list[tuple[str, KeyType]]) -> Iterator[Path]:
|
||||||
all_keys = {type.lower(): [] for type in KeyType.__members__.keys()}
|
all_keys: dict[str, list[str]] = {
|
||||||
for key, type in keys:
|
key_type.lower(): [] for key_type in KeyType.__members__
|
||||||
all_keys[type.name.lower()].append(key)
|
}
|
||||||
|
for key, key_type in keys:
|
||||||
|
all_keys[key_type.name.lower()].append(key)
|
||||||
with NamedTemporaryFile(delete=False, mode="w") as manifest:
|
with NamedTemporaryFile(delete=False, mode="w") as manifest:
|
||||||
json.dump({"creation_rules": [{"key_groups": [all_keys]}]}, manifest, indent=2)
|
json.dump({"creation_rules": [{"key_groups": [all_keys]}]}, manifest, indent=2)
|
||||||
manifest.flush()
|
manifest.flush()
|
||||||
@@ -260,7 +264,7 @@ def get_meta(secret_path: Path) -> dict:
|
|||||||
return json.load(f)
|
return json.load(f)
|
||||||
|
|
||||||
|
|
||||||
def write_key(path: Path, publickey: str, type: KeyType, overwrite: bool) -> None:
|
def write_key(path: Path, publickey: str, key_type: KeyType, overwrite: bool) -> None:
|
||||||
path.mkdir(parents=True, exist_ok=True)
|
path.mkdir(parents=True, exist_ok=True)
|
||||||
try:
|
try:
|
||||||
flags = os.O_CREAT | os.O_WRONLY | os.O_TRUNC
|
flags = os.O_CREAT | os.O_WRONLY | os.O_TRUNC
|
||||||
@@ -271,7 +275,7 @@ def write_key(path: Path, publickey: str, type: KeyType, overwrite: bool) -> Non
|
|||||||
msg = f"{path.name} already exists in {path}. Use --force to overwrite."
|
msg = f"{path.name} already exists in {path}. Use --force to overwrite."
|
||||||
raise ClanError(msg) from e
|
raise ClanError(msg) from e
|
||||||
with os.fdopen(fd, "w") as f:
|
with os.fdopen(fd, "w") as f:
|
||||||
contents = {"publickey": publickey, "type": type.name.lower()}
|
contents = {"publickey": publickey, "type": key_type.name.lower()}
|
||||||
json.dump(contents, f, indent=2)
|
json.dump(contents, f, indent=2)
|
||||||
|
|
||||||
|
|
||||||
@@ -282,12 +286,12 @@ def read_key(path: Path) -> tuple[str, KeyType]:
|
|||||||
except json.JSONDecodeError as e:
|
except json.JSONDecodeError as e:
|
||||||
msg = f"Failed to decode {path.name}: {e}"
|
msg = f"Failed to decode {path.name}: {e}"
|
||||||
raise ClanError(msg) from e
|
raise ClanError(msg) from e
|
||||||
type = KeyType.validate(key.get("type"))
|
key_type = KeyType.validate(key.get("type"))
|
||||||
if type is None:
|
if key_type is None:
|
||||||
msg = f"Invalid key type in {path.name}: \"{type}\" (expected one of {', '.join(KeyType.__members__.keys())})."
|
msg = f"Invalid key type in {path.name}: \"{key_type}\" (expected one of {', '.join(KeyType.__members__.keys())})."
|
||||||
raise ClanError(msg)
|
raise ClanError(msg)
|
||||||
publickey = key.get("publickey")
|
publickey = key.get("publickey")
|
||||||
if not publickey:
|
if not publickey:
|
||||||
msg = f"{path.name} does not contain a public key"
|
msg = f"{path.name} does not contain a public key"
|
||||||
raise ClanError(msg)
|
raise ClanError(msg)
|
||||||
return publickey, type
|
return publickey, key_type
|
||||||
|
|||||||
@@ -111,8 +111,8 @@ def get_command(args: argparse.Namespace) -> None:
|
|||||||
if args.flake is None:
|
if args.flake is None:
|
||||||
msg = "Could not find clan flake toplevel directory"
|
msg = "Could not find clan flake toplevel directory"
|
||||||
raise ClanError(msg)
|
raise ClanError(msg)
|
||||||
key, type = get_user(args.flake.path, args.user)
|
key, key_type = get_user(args.flake.path, args.user)
|
||||||
type_or_null = f'"{type.name.lower()}"' if type else "null"
|
type_or_null = '"{key_type.name.lower()}"' if key_type else "null"
|
||||||
print(f'{{"key": "{key}", "type": {type_or_null}}}')
|
print(f'{{"key": "{key}", "type": {type_or_null}}}')
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user