enable ASYNC, DTZ, YTT and EM lints
This commit is contained in:
@@ -30,9 +30,8 @@ def get_public_key(privkey: str) -> str:
|
||||
cmd, input=privkey, stdout=subprocess.PIPE, text=True, check=True
|
||||
)
|
||||
except subprocess.CalledProcessError as e:
|
||||
raise ClanError(
|
||||
"Failed to get public key for age private key. Is the key malformed?"
|
||||
) from e
|
||||
msg = "Failed to get public key for age private key. Is the key malformed?"
|
||||
raise ClanError(msg) from e
|
||||
return res.stdout.strip()
|
||||
|
||||
|
||||
@@ -49,15 +48,18 @@ def generate_private_key(out_file: Path | None = None) -> tuple[str, str]:
|
||||
if not line.startswith("#"):
|
||||
private_key = line
|
||||
if not pubkey:
|
||||
raise ClanError("Could not find public key in age-keygen output")
|
||||
msg = "Could not find public key in age-keygen output"
|
||||
raise ClanError(msg)
|
||||
if not private_key:
|
||||
raise ClanError("Could not find private key in age-keygen output")
|
||||
msg = "Could not find private key in age-keygen output"
|
||||
raise ClanError(msg)
|
||||
if out_file:
|
||||
out_file.parent.mkdir(parents=True, exist_ok=True)
|
||||
out_file.write_text(res)
|
||||
return private_key, pubkey
|
||||
except subprocess.CalledProcessError as e:
|
||||
raise ClanError("Failed to generate private sops key") from e
|
||||
msg = "Failed to generate private sops key"
|
||||
raise ClanError(msg) from e
|
||||
|
||||
|
||||
def get_user_name(flake_dir: Path, user: str) -> str:
|
||||
@@ -86,9 +88,8 @@ def ensure_user_or_machine(flake_dir: Path, pub_key: str) -> SopsKey:
|
||||
key.username = user.name
|
||||
return key
|
||||
|
||||
raise ClanError(
|
||||
f"Your sops key is not yet added to the repository. Please add it with 'clan secrets users add youruser {pub_key}' (replace youruser with your user name)"
|
||||
)
|
||||
msg = f"Your sops key is not yet added to the repository. Please add it with 'clan secrets users add youruser {pub_key}' (replace youruser with your user name)"
|
||||
raise ClanError(msg)
|
||||
|
||||
|
||||
def default_sops_key_path() -> Path:
|
||||
@@ -107,9 +108,8 @@ def ensure_sops_key(flake_dir: Path) -> SopsKey:
|
||||
if path.exists():
|
||||
return ensure_user_or_machine(flake_dir, get_public_key(path.read_text()))
|
||||
else:
|
||||
raise ClanError(
|
||||
"No sops key found. Please generate one with 'clan secrets key generate'."
|
||||
)
|
||||
msg = "No sops key found. Please generate one with 'clan secrets key generate'."
|
||||
raise ClanError(msg)
|
||||
|
||||
|
||||
@contextmanager
|
||||
@@ -164,9 +164,10 @@ def encrypt_file(
|
||||
p = subprocess.run(cmd, check=False)
|
||||
# returns 200 if the file is changed
|
||||
if p.returncode != 0 and p.returncode != 200:
|
||||
raise ClanError(
|
||||
msg = (
|
||||
f"Failed to encrypt {secret_path}: sops exited with {p.returncode}"
|
||||
)
|
||||
raise ClanError(msg)
|
||||
return
|
||||
|
||||
# hopefully /tmp is written to an in-memory file to avoid leaking secrets
|
||||
@@ -182,7 +183,8 @@ def encrypt_file(
|
||||
with open(f.name, "w") as fd:
|
||||
shutil.copyfileobj(content, fd)
|
||||
else:
|
||||
raise ClanError(f"Invalid content type: {type(content)}")
|
||||
msg = f"Invalid content type: {type(content)}"
|
||||
raise ClanError(msg)
|
||||
# we pass an empty manifest to pick up existing configuration of the user
|
||||
args = ["sops", "--config", str(manifest)]
|
||||
args.extend(["-i", "--encrypt", str(f.name)])
|
||||
@@ -228,9 +230,8 @@ def write_key(path: Path, publickey: str, overwrite: bool) -> None:
|
||||
flags |= os.O_EXCL
|
||||
fd = os.open(path / "key.json", flags)
|
||||
except FileExistsError as e:
|
||||
raise ClanError(
|
||||
f"{path.name} already exists in {path}. Use --force to overwrite."
|
||||
) from e
|
||||
msg = f"{path.name} already exists in {path}. Use --force to overwrite."
|
||||
raise ClanError(msg) from e
|
||||
with os.fdopen(fd, "w") as f:
|
||||
json.dump({"publickey": publickey, "type": "age"}, f, indent=2)
|
||||
|
||||
@@ -240,12 +241,13 @@ def read_key(path: Path) -> str:
|
||||
try:
|
||||
key = json.load(f)
|
||||
except json.JSONDecodeError as e:
|
||||
raise ClanError(f"Failed to decode {path.name}: {e}") from e
|
||||
msg = f"Failed to decode {path.name}: {e}"
|
||||
raise ClanError(msg) from e
|
||||
if key["type"] != "age":
|
||||
raise ClanError(
|
||||
f"{path.name} is not an age key but {key['type']}. This is not supported"
|
||||
)
|
||||
msg = f"{path.name} is not an age key but {key['type']}. This is not supported"
|
||||
raise ClanError(msg)
|
||||
publickey = key.get("publickey")
|
||||
if not publickey:
|
||||
raise ClanError(f"{path.name} does not contain a public key")
|
||||
msg = f"{path.name} does not contain a public key"
|
||||
raise ClanError(msg)
|
||||
return publickey
|
||||
|
||||
Reference in New Issue
Block a user