enable comprehensions linting rules
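This commit turns on the comprehensions rule family ("C4") in the lint configuration (the lint.select list near the end of the diff, which looks like a ruff setup, where "C4" selects the flake8-comprehensions checks) and applies the corresponding fixes across the code base: {} instead of dict(), set() instead of set([]), dict literals with string keys instead of dict(key=value) calls, list comprehensions instead of list(generator), and sorted(x) instead of list(sorted(x)). A minimal, self-contained sketch of the idioms involved (the names below are made up for illustration and are not from the repository):

registry = {"open_file": None, "close_file": None}  # hypothetical registry

# Spellings flagged by the comprehensions rules (the "-" sides below):
#   facts = dict()
#   keys = set([])
#   required = [name for name in registry.keys()]
#   extra = dict(command_prefix="vm1")
#   recipients = list(sorted(registry))

# Preferred spellings (the "+" sides of this diff):
facts = {}                              # dict literal instead of dict()
keys = set()                            # set() instead of set([])
required = list(registry.keys())        # no redundant comprehension
extra = {"command_prefix": "vm1"}       # dict literal instead of dict(key=value)
recipients = sorted(registry)           # sorted() already returns a list

print(facts, keys, required, extra, recipients)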
@@ -158,7 +158,7 @@ API.register(open_file)
             "$comment": "An object containing API methods. ",
             "type": "object",
             "additionalProperties": False,
-            "required": [func_name for func_name in self._registry.keys()],
+            "required": list(self._registry.keys()),
             "properties": {},
         }
 
@@ -30,7 +30,7 @@ def get_command(args: argparse.Namespace) -> None:
 
     # the raw_facts are bytestrings making them not json serializable
     raw_facts = get_all_facts(machine)
-    facts = dict()
+    facts = {}
     for key in raw_facts["TODO"]:
         facts[key] = raw_facts["TODO"][key].decode("utf8")
 
@@ -123,7 +123,7 @@ def flash_machine(
 
     if system_config.ssh_keys_path:
         root_keys = []
-        for key_path in map(lambda x: Path(x), system_config.ssh_keys_path):
+        for key_path in (Path(x) for x in system_config.ssh_keys_path):
             try:
                 root_keys.append(key_path.read_text())
             except OSError as e:

@@ -10,7 +10,7 @@ T = TypeVar("T")
 
 class MachineGroup:
     def __init__(self, machines: list[Machine]) -> None:
-        self.group = HostGroup(list(m.target_host for m in machines))
+        self.group = HostGroup([m.target_host for m in machines])
 
     def run_function(
         self, func: Callable[[Machine], T], check: bool = True

@@ -144,7 +144,7 @@ class Machine:
         config = nix_config()
         system = config["system"]
 
-        file_info = dict()
+        file_info = {}
         with NamedTemporaryFile(mode="w") as config_json:
             if extra_config is not None:
                 json.dump(extra_config, config_json, indent=2)

@@ -103,7 +103,7 @@ def update_group_keys(flake_dir: Path, group: str) -> list[Path]:
         if (secret / "groups" / group).is_symlink():
             updated_paths += update_keys(
                 secret,
-                list(sorted(secrets.collect_keys_for_path(secret))),
+                sorted(secrets.collect_keys_for_path(secret)),
             )
     return updated_paths
 
@@ -42,7 +42,7 @@ def update_secrets(
         changed_files.extend(
             update_keys(
                 secret_path,
-                list(sorted(collect_keys_for_path(secret_path))),
+                sorted(collect_keys_for_path(secret_path)),
             )
         )
     return changed_files

@@ -69,7 +69,7 @@ def collect_keys_for_type(folder: Path) -> set[str]:
 
 
 def collect_keys_for_path(path: Path) -> set[str]:
-    keys = set([])
+    keys = set()
     keys.update(collect_keys_for_type(path / "machines"))
     keys.update(collect_keys_for_type(path / "users"))
     groups = path / "groups"

@@ -99,7 +99,7 @@ def encrypt_secret(
     if add_users is None:
         add_users = []
     key = ensure_sops_key(flake_dir)
-    recipient_keys = set([])
+    recipient_keys = set()
 
     files_to_commit = []
     for user in add_users:

@@ -146,7 +146,7 @@ def encrypt_secret(
     )
 
     secret_path = secret_path / "secret"
-    encrypt_file(secret_path, value, list(sorted(recipient_keys)), meta)
+    encrypt_file(secret_path, value, sorted(recipient_keys), meta)
     files_to_commit.append(secret_path)
     commit_files(
         files_to_commit,

@@ -226,7 +226,7 @@ def allow_member(
         changed.extend(
             update_keys(
                 group_folder.parent,
-                list(sorted(collect_keys_for_path(group_folder.parent))),
+                sorted(collect_keys_for_path(group_folder.parent)),
             )
         )
     return changed

@@ -254,7 +254,7 @@ def disallow_member(group_folder: Path, name: str) -> list[Path]:
         os.rmdir(group_folder.parent)
 
     return update_keys(
-        target.parent.parent, list(sorted(collect_keys_for_path(group_folder.parent)))
+        target.parent.parent, sorted(collect_keys_for_path(group_folder.parent))
     )
 
 
@@ -116,7 +116,7 @@ def ensure_sops_key(flake_dir: Path) -> SopsKey:
 def sops_manifest(keys: list[str]) -> Iterator[Path]:
     with NamedTemporaryFile(delete=False, mode="w") as manifest:
         json.dump(
-            dict(creation_rules=[dict(key_groups=[dict(age=keys)])]), manifest, indent=2
+            {"creation_rules": [{"key_groups": [{"age": keys}]}]}, manifest, indent=2
         )
         manifest.flush()
         yield Path(manifest.name)

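The dict(...) call and the literal above serialize to the same sops manifest; only the spelling changes. A standalone sketch of the JSON that json.dump emits here (the key value is a made-up placeholder, not from the repository):

import json

keys = ["age1exampleexampleexample"]  # hypothetical age public key
print(json.dumps({"creation_rules": [{"key_groups": [{"age": keys}]}]}, indent=2))
# {
#   "creation_rules": [
#     {
#       "key_groups": [
#         {
#           "age": [
#             "age1exampleexampleexample"
#           ]
#         }
#       ]
#     }
#   ]
# }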
@@ -222,12 +222,12 @@ class Host:
         for line in lines:
             if not is_err:
                 cmdlog.info(
-                    line, extra=dict(command_prefix=self.command_prefix)
+                    line, extra={"command_prefix": self.command_prefix}
                 )
                 pass
             else:
                 cmdlog.error(
-                    line, extra=dict(command_prefix=self.command_prefix)
+                    line, extra={"command_prefix": self.command_prefix}
                 )
         print_buf = ""
         last_output = time.time()

@@ -248,7 +248,7 @@ class Host:
                 elapsed_msg = time.strftime("%H:%M:%S", time.gmtime(elapsed))
                 cmdlog.warn(
                     f"still waiting for '{displayed_cmd}' to finish... ({elapsed_msg} elapsed)",
-                    extra=dict(command_prefix=self.command_prefix),
+                    extra={"command_prefix": self.command_prefix},
                 )
 
         def handle_fd(fd: IO[Any] | None, readlist: list[IO[Any]]) -> str:

@@ -350,7 +350,7 @@ class Host:
         else:
             cmdlog.warning(
                 f"[Command failed: {ret}] {displayed_cmd}",
-                extra=dict(command_prefix=self.command_prefix),
+                extra={"command_prefix": self.command_prefix},
             )
         return subprocess.CompletedProcess(
             cmd, ret, stdout=stdout_data, stderr=stderr_data

@@ -386,9 +386,7 @@ class Host:
             cmd = [cmd]
             shell = True
         displayed_cmd = " ".join(cmd)
-        cmdlog.info(
-            f"$ {displayed_cmd}", extra=dict(command_prefix=self.command_prefix)
-        )
+        cmdlog.info(f"$ {displayed_cmd}", extra={"command_prefix": self.command_prefix})
         return self._run(
             cmd,
             displayed_cmd,

@@ -446,9 +444,7 @@ class Host:
             displayed_cmd += " ".join(cmd)
         else:
             displayed_cmd += cmd
-        cmdlog.info(
-            f"$ {displayed_cmd}", extra=dict(command_prefix=self.command_prefix)
-        )
+        cmdlog.info(f"$ {displayed_cmd}", extra={"command_prefix": self.command_prefix})
 
         bash_cmd = export_cmd
         bash_args = []

@@ -624,7 +620,7 @@ class HostGroup:
             if e:
                 cmdlog.error(
                     f"failed with: {e}",
-                    extra=dict(command_prefix=result.host.command_prefix),
+                    extra={"command_prefix": result.host.command_prefix},
                 )
                 errors += 1
         if errors > 0:

@@ -653,19 +649,19 @@ class HostGroup:
             fn = self._run_local if local else self._run_remote
             thread = Thread(
                 target=fn,
-                kwargs=dict(
-                    results=results,
-                    cmd=cmd,
-                    host=host,
-                    stdout=stdout,
-                    stderr=stderr,
-                    extra_env=extra_env,
-                    cwd=cwd,
-                    check=check,
-                    timeout=timeout,
-                    verbose_ssh=verbose_ssh,
-                    tty=tty,
-                ),
+                kwargs={
+                    "results": results,
+                    "cmd": cmd,
+                    "host": host,
+                    "stdout": stdout,
+                    "stderr": stderr,
+                    "extra_env": extra_env,
+                    "cwd": cwd,
+                    "check": check,
+                    "timeout": timeout,
+                    "verbose_ssh": verbose_ssh,
+                    "tty": tty,
+                },
             )
             thread.start()
             threads.append(thread)

@@ -67,9 +67,9 @@ class SecretStore(SecretStoreBase):
             value,
             add_machines=[self.machine.name],
             add_groups=groups,
-            meta=dict(
-                deploy=deployed,
-            ),
+            meta={
+                "deploy": deployed,
+            },
         )
         return path
 
@@ -208,7 +208,7 @@ def run_command(
 
     vm: VmConfig = inspect_vm(machine=machine_obj)
 
-    portmap = [(h, g) for h, g in (p.split(":") for p in args.publish)]
+    portmap = [p.split(":") for p in args.publish]
 
     run_vm(vm, nix_options=args.option, portmap=portmap)
 
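Unlike most rewrites in this commit, the portmap change above is not purely cosmetic: the old expression built a list of (host, guest) tuples, while the new one builds a list of two-element lists. Whether that difference matters depends on how run_vm consumes portmap; a quick illustration with made-up values:

publish = ["2222:22", "8080:80"]  # hypothetical --publish arguments

old_portmap = [(h, g) for h, g in (p.split(":") for p in publish)]
new_portmap = [p.split(":") for p in publish]

print(old_portmap)  # [('2222', '22'), ('8080', '80')]
print(new_portmap)  # [['2222', '22'], ['8080', '80']]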
@@ -71,6 +71,7 @@ lint.select = [
   "A",
   "ANN",
   "B",
+  "C4",
   "E",
   "F",
   "I",

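The one-line configuration change above is what the commit title refers to: adding "C4" to lint.select enables the comprehensions checks, and every other hunk in this diff fixes a violation they report. Assuming the linter behind this lint.select table is ruff (where "C4" selects the flake8-comprehensions rules), the same findings could be reproduced locally with an invocation along these lines (illustrative, not taken from the repository's tooling):

# hypothetical invocation; adjust the path to the package being linted
ruff check --select C4 .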
@@ -42,7 +42,7 @@ def sshd_config(test_root: Path) -> Iterator[SshdConfig]:
     host_key = test_root / "data" / "ssh_host_ed25519_key"
     host_key.chmod(0o600)
     template = (test_root / "data" / "sshd_config").read_text()
-    content = string.Template(template).substitute(dict(host_key=host_key))
+    content = string.Template(template).substitute({"host_key": host_key})
     config = tmpdir / "sshd_config"
     config.write_text(content)
     login_shell = tmpdir / "shell"

@@ -100,10 +100,10 @@ def sshd(
     sshd = shutil.which("sshd")
     assert sshd is not None, "no sshd binary found"
     env = {}
-    env = dict(
-        LD_PRELOAD=str(sshd_config.preload_lib),
-        LOGIN_SHELL=str(sshd_config.login_shell),
-    )
+    env = {
+        "LD_PRELOAD": str(sshd_config.preload_lib),
+        "LOGIN_SHELL": str(sshd_config.login_shell),
+    }
     proc = command.run(
         [sshd, "-f", str(sshd_config.path), "-D", "-p", str(port)], extra_env=env
     )

@@ -10,21 +10,21 @@ example_options = f"{Path(config.__file__).parent}/jsonschema/options.json"
 
 
 def test_walk_jsonschema_all_types() -> None:
-    schema = dict(
-        type="object",
-        properties=dict(
-            array=dict(
-                type="array",
-                items=dict(
-                    type="string",
-                ),
-            ),
-            boolean=dict(type="boolean"),
-            integer=dict(type="integer"),
-            number=dict(type="number"),
-            string=dict(type="string"),
-        ),
-    )
+    schema = {
+        "type": "object",
+        "properties": {
+            "array": {
+                "type": "array",
+                "items": {
+                    "type": "string",
+                },
+            },
+            "boolean": {"type": "boolean"},
+            "integer": {"type": "integer"},
+            "number": {"type": "number"},
+            "string": {"type": "string"},
+        },
+    }
     expected = {
         "array": list[str],
         "boolean": bool,

@@ -36,19 +36,19 @@ def test_walk_jsonschema_all_types() -> None:
 
 
 def test_walk_jsonschema_nested() -> None:
-    schema = dict(
-        type="object",
-        properties=dict(
-            name=dict(
-                type="object",
-                properties=dict(
-                    first=dict(type="string"),
-                    last=dict(type="string"),
-                ),
-            ),
-            age=dict(type="integer"),
-        ),
-    )
+    schema = {
+        "type": "object",
+        "properties": {
+            "name": {
+                "type": "object",
+                "properties": {
+                    "first": {"type": "string"},
+                    "last": {"type": "string"},
+                },
+            },
+            "age": {"type": "integer"},
+        },
+    }
     expected = {
         "age": int,
         "name.first": str,

@@ -59,16 +59,16 @@ def test_walk_jsonschema_nested() -> None:
 
 # test walk_jsonschema with dynamic attributes (e.g. "additionalProperties")
 def test_walk_jsonschema_dynamic_attrs() -> None:
-    schema = dict(
-        type="object",
-        properties=dict(
-            age=dict(type="integer"),
-            users=dict(
-                type="object",
-                additionalProperties=dict(type="string"),
-            ),
-        ),
-    )
+    schema = {
+        "type": "object",
+        "properties": {
+            "age": {"type": "integer"},
+            "users": {
+                "type": "object",
+                "additionalProperties": {"type": "string"},
+            },
+        },
+    }
     expected = {
         "age": int,
         "users.<name>": str, # <name> is a placeholder for any string

@@ -77,41 +77,41 @@ def test_walk_jsonschema_dynamic_attrs() -> None:
 
 
 def test_type_from_schema_path_simple() -> None:
-    schema = dict(
-        type="boolean",
-    )
+    schema = {
+        "type": "boolean",
+    }
     assert parsing.type_from_schema_path(schema, []) is bool
 
 
 def test_type_from_schema_path_nested() -> None:
-    schema = dict(
-        type="object",
-        properties=dict(
-            name=dict(
-                type="object",
-                properties=dict(
-                    first=dict(type="string"),
-                    last=dict(type="string"),
-                ),
-            ),
-            age=dict(type="integer"),
-        ),
-    )
+    schema = {
+        "type": "object",
+        "properties": {
+            "name": {
+                "type": "object",
+                "properties": {
+                    "first": {"type": "string"},
+                    "last": {"type": "string"},
+                },
+            },
+            "age": {"type": "integer"},
+        },
+    }
     assert parsing.type_from_schema_path(schema, ["age"]) is int
     assert parsing.type_from_schema_path(schema, ["name", "first"]) is str
 
 
 def test_type_from_schema_path_dynamic_attrs() -> None:
-    schema = dict(
-        type="object",
-        properties=dict(
-            age=dict(type="integer"),
-            users=dict(
-                type="object",
-                additionalProperties=dict(type="string"),
-            ),
-        ),
-    )
+    schema = {
+        "type": "object",
+        "properties": {
+            "age": {"type": "integer"},
+            "users": {
+                "type": "object",
+                "additionalProperties": {"type": "string"},
+            },
+        },
+    }
     assert parsing.type_from_schema_path(schema, ["age"]) is int
     assert parsing.type_from_schema_path(schema, ["users", "foo"]) is str
 
@@ -7,13 +7,11 @@ hosts = HostGroup([Host("some_host")])
 
 def test_run_environment() -> None:
     p2 = hosts.run_local(
-        "echo $env_var", extra_env=dict(env_var="true"), stdout=subprocess.PIPE
+        "echo $env_var", extra_env={"env_var": "true"}, stdout=subprocess.PIPE
     )
     assert p2[0].result.stdout == "true\n"
 
-    p3 = hosts.run_local(
-        ["env"], extra_env=dict(env_var="true"), stdout=subprocess.PIPE
-    )
+    p3 = hosts.run_local(["env"], extra_env={"env_var": "true"}, stdout=subprocess.PIPE)
     assert "env_var=true" in p3[0].result.stdout
 
 
@@ -19,10 +19,10 @@ def test_run(host_group: HostGroup) -> None:
 
 def test_run_environment(host_group: HostGroup) -> None:
     p1 = host_group.run(
-        "echo $env_var", stdout=subprocess.PIPE, extra_env=dict(env_var="true")
+        "echo $env_var", stdout=subprocess.PIPE, extra_env={"env_var": "true"}
    )
     assert p1[0].result.stdout == "true\n"
-    p2 = host_group.run(["env"], stdout=subprocess.PIPE, extra_env=dict(env_var="true"))
+    p2 = host_group.run(["env"], stdout=subprocess.PIPE, extra_env={"env_var": "true"})
     assert "env_var=true" in p2[0].result.stdout
 
 
@@ -22,12 +22,12 @@ from clan_cli.vars.secret_modules import password_store, sops
 def test_get_subgraph() -> None:
     from clan_cli.vars.generate import _get_subgraph
 
-    graph = dict(
-        a={"b", "c"},
-        b={"c"},
-        c=set(),
-        d=set(),
-    )
+    graph = {
+        "a": {"b", "c"},
+        "b": {"c"},
+        "c": set(),
+        "d": set(),
+    }
     assert _get_subgraph(graph, "a") == {
         "a": {"b", "c"},
         "b": {"c"},

@@ -39,16 +39,16 @@ def test_get_subgraph() -> None:
 def test_dependencies_as_files() -> None:
     from clan_cli.vars.generate import dependencies_as_dir
 
-    decrypted_dependencies = dict(
-        gen_1=dict(
-            var_1a=b"var_1a",
-            var_1b=b"var_1b",
-        ),
-        gen_2=dict(
-            var_2a=b"var_2a",
-            var_2b=b"var_2b",
-        ),
-    )
+    decrypted_dependencies = {
+        "gen_1": {
+            "var_1a": b"var_1a",
+            "var_1b": b"var_1b",
+        },
+        "gen_2": {
+            "var_2a": b"var_2a",
+            "var_2b": b"var_2b",
+        },
+    }
     with TemporaryDirectory() as tmpdir:
         dep_tmpdir = Path(tmpdir)
         dependencies_as_dir(decrypted_dependencies, dep_tmpdir)

@@ -76,7 +76,7 @@ def test_generate_public_var(
     flake = generate_flake(
         temporary_home,
         flake_template=CLAN_CORE / "templates" / "minimal",
-        machine_configs=dict(my_machine=config),
+        machine_configs={"my_machine": config},
     )
     monkeypatch.chdir(flake.path)
     machine = Machine(name="my_machine", flake=FlakeId(str(flake.path)))

@@ -105,7 +105,7 @@ def test_generate_secret_var_sops(
     flake = generate_flake(
         temporary_home,
         flake_template=CLAN_CORE / "templates" / "minimal",
-        machine_configs=dict(my_machine=config),
+        machine_configs={"my_machine": config},
     )
     monkeypatch.chdir(flake.path)
     sops_setup.init()

@@ -140,7 +140,7 @@ def test_generate_secret_var_sops_with_default_group(
     flake = generate_flake(
         temporary_home,
         flake_template=CLAN_CORE / "templates" / "minimal",
-        machine_configs=dict(my_machine=config),
+        machine_configs={"my_machine": config},
     )
     monkeypatch.chdir(flake.path)
     sops_setup.init()

@@ -170,7 +170,7 @@ def test_generate_secret_var_password_store(
     flake = generate_flake(
         temporary_home,
         flake_template=CLAN_CORE / "templates" / "minimal",
-        machine_configs=dict(my_machine=config),
+        machine_configs={"my_machine": config},
     )
     monkeypatch.chdir(flake.path)
     gnupghome = temporary_home / "gpg"

@@ -237,7 +237,7 @@ def test_generate_secret_for_multiple_machines(
     flake = generate_flake(
         temporary_home,
         flake_template=CLAN_CORE / "templates" / "minimal",
-        machine_configs=dict(machine1=machine1_config, machine2=machine2_config),
+        machine_configs={"machine1": machine1_config, "machine2": machine2_config},
     )
     monkeypatch.chdir(flake.path)
     sops_setup.init()

@@ -282,7 +282,7 @@ def test_dependant_generators(
     flake = generate_flake(
         temporary_home,
         flake_template=CLAN_CORE / "templates" / "minimal",
-        machine_configs=dict(my_machine=config),
+        machine_configs={"my_machine": config},
     )
     monkeypatch.chdir(flake.path)
     cli.run(["vars", "generate", "--flake", str(flake.path), "my_machine"])

@@ -320,7 +320,7 @@ def test_prompt(
     flake = generate_flake(
         temporary_home,
         flake_template=CLAN_CORE / "templates" / "minimal",
-        machine_configs=dict(my_machine=config),
+        machine_configs={"my_machine": config},
     )
     monkeypatch.chdir(flake.path)
     monkeypatch.setattr("sys.stdin", StringIO(input_value))

@@ -358,7 +358,7 @@ def test_share_flag(
     flake = generate_flake(
         temporary_home,
         flake_template=CLAN_CORE / "templates" / "minimal",
-        machine_configs=dict(my_machine=config),
+        machine_configs={"my_machine": config},
     )
     monkeypatch.chdir(flake.path)
     sops_setup.init()

@@ -42,7 +42,7 @@ def test_vm_deployment(
     flake = generate_flake(
         temporary_home,
         flake_template=CLAN_CORE / "templates" / "minimal",
-        machine_configs=dict(my_machine=config),
+        machine_configs={"my_machine": config},
     )
     monkeypatch.chdir(flake.path)
     sops_setup.init()

@@ -57,7 +57,7 @@ def test_vm_deployment(
             )
         ).stdout.strip()
     )
-    assert sops_secrets != dict()
+    assert sops_secrets != {}
     my_secret_path = run(
         nix_eval(
             [

@@ -65,15 +65,15 @@ def test_vm_qmp(
     flake = generate_flake(
         temporary_home,
         flake_template=CLAN_CORE / "templates" / "minimal",
-        machine_configs=dict(
-            my_machine=dict(
-                clan=dict(
-                    virtualisation=dict(graphics=False),
-                    networking=dict(targetHost="client"),
-                ),
-                services=dict(getty=dict(autologinUser="root")),
-            )
-        ),
+        machine_configs={
+            "my_machine": {
+                "clan": {
+                    "virtualisation": {"graphics": False},
+                    "networking": {"targetHost": "client"},
+                },
+                "services": {"getty": {"autologinUser": "root"}},
+            }
+        },
     )
 
     # 'clan vms run' must be executed from within the flake