vars: support secrets for partitioning the disk

This commit is contained in:
Michael Hoang
2024-12-22 15:46:41 +11:00
committed by clan-bot
parent 45cf6adc13
commit 7ee0e2afbf
13 changed files with 175 additions and 78 deletions

View File

@@ -53,16 +53,20 @@ def install_machine(opts: InstallOptions) -> None:
generate_facts([machine])
generate_vars([machine])
with TemporaryDirectory(prefix="nixos-install-") as tmpdir_:
tmpdir = Path(tmpdir_)
upload_dir_ = machine.secrets_upload_directory
if upload_dir_.startswith("/"):
upload_dir_ = upload_dir_[1:]
upload_dir = tmpdir / upload_dir_
with TemporaryDirectory(prefix="nixos-install-") as base_directory:
activation_secrets = Path(base_directory) / "activation_secrets"
upload_dir = activation_secrets / machine.secrets_upload_directory.lstrip("/")
upload_dir.mkdir(parents=True)
machine.secret_facts_store.upload(upload_dir)
machine.secret_vars_store.populate_dir(upload_dir)
machine.secret_vars_store.populate_dir(
upload_dir, phases=["activation", "users", "services"]
)
partitioning_secrets = Path(base_directory) / "partitioning_secrets"
partitioning_secrets.mkdir(parents=True)
machine.secret_vars_store.populate_dir(
partitioning_secrets, phases=["partitioning"]
)
if opts.password:
os.environ["SSHPASS"] = opts.password
@@ -72,9 +76,22 @@ def install_machine(opts: InstallOptions) -> None:
"--flake",
f"{machine.flake}#{machine.name}",
"--extra-files",
str(tmpdir),
str(activation_secrets),
]
for path in partitioning_secrets.rglob("*"):
if path.is_file():
cmd.extend(
[
"--disk-encryption-keys",
str(
"/run/partitioning-secrets"
/ path.relative_to(partitioning_secrets)
),
str(path),
]
)
if opts.no_reboot:
cmd.append("--no-reboot")

View File

@@ -155,5 +155,9 @@ class StoreBase(ABC):
return stored_hash == target_hash
@abstractmethod
def populate_dir(self, output_dir: Path) -> None:
def populate_dir(self, output_dir: Path, phases: list[str]) -> None:
pass
@abstractmethod
def upload(self, phases: list[str]) -> None:
pass

View File

@@ -50,6 +50,10 @@ class FactStore(StoreBase):
def exists(self, generator: Generator, name: str) -> bool:
return (self.directory(generator, name) / "value").exists()
def populate_dir(self, output_dir: Path) -> None:
def populate_dir(self, output_dir: Path, phases: list[str]) -> None:
msg = "populate_dir is not implemented for public vars stores"
raise NotImplementedError(msg)
def upload(self, phases: list[str]) -> None:
msg = "upload is not implemented for public vars stores"
raise NotImplementedError(msg)

View File

@@ -48,6 +48,10 @@ class FactStore(StoreBase):
msg = f"Fact {name} for service {generator.name} not found"
raise ClanError(msg)
def populate_dir(self, output_dir: Path) -> None:
def populate_dir(self, output_dir: Path, phases: list[str]) -> None:
msg = "populate_dir is not implemented for public vars stores"
raise NotImplementedError(msg)
def upload(self, phases: list[str]) -> None:
msg = "upload is not implemented for public vars stores"
raise NotImplementedError(msg)

View File

@@ -152,52 +152,77 @@ class SecretStore(StoreBase):
return local_hash.decode() != remote_hash
def populate_dir(self, output_dir: Path) -> None:
with (
tarfile.open(output_dir / "secrets.tar.gz", "w:gz") as tar,
tarfile.open(output_dir / "secrets_for_users.tar.gz", "w:gz") as user_tar,
):
def populate_dir(self, output_dir: Path, phases: list[str]) -> None:
if "users" in phases:
with tarfile.open(
output_dir / "secrets_for_users.tar.gz", "w:gz"
) as user_tar:
for generator in self.machine.vars_generators:
dir_exists = False
for file in generator.files:
if not file.deploy:
continue
if not file.secret:
continue
tar_file = tarfile.TarInfo(name=f"{generator.name}/{file.name}")
content = self.get(generator, file.name)
tar_file.size = len(content)
tar_file.mode = file.mode
user_tar.addfile(tarinfo=tar_file, fileobj=io.BytesIO(content))
if "services" in phases:
with tarfile.open(output_dir / "secrets.tar.gz", "w:gz") as tar:
for generator in self.machine.vars_generators:
dir_exists = False
for file in generator.files:
if not file.deploy:
continue
if not file.secret:
continue
if not dir_exists:
tar_dir = tarfile.TarInfo(name=generator.name)
tar_dir.type = tarfile.DIRTYPE
tar_dir.mode = 0o511
tar.addfile(tarinfo=tar_dir)
dir_exists = True
tar_file = tarfile.TarInfo(name=f"{generator.name}/{file.name}")
content = self.get(generator, file.name)
tar_file.size = len(content)
tar_file.mode = file.mode
tar_file.uname = file.owner
tar_file.gname = file.group
tar.addfile(tarinfo=tar_file, fileobj=io.BytesIO(content))
if "activation" in phases:
for generator in self.machine.vars_generators:
dir_exists = False
for file in generator.files:
if file.needed_for == "activation":
(output_dir / generator.name / file.name).parent.mkdir(
parents=True,
exist_ok=True,
out_file = (
output_dir / "activation" / generator.name / file.name
)
(output_dir / generator.name / file.name).write_bytes(
self.get(generator, file.name)
out_file.parent.mkdir(parents=True, exist_ok=True)
out_file.write_bytes(self.get(generator, file.name))
if "partitioning" in phases:
for generator in self.machine.vars_generators:
for file in generator.files:
if file.needed_for == "partitioning":
out_file = (
output_dir / "partitioning" / generator.name / file.name
)
continue
if not file.deploy:
continue
if not file.secret:
continue
if not dir_exists and file.needed_for == "services":
tar_dir = tarfile.TarInfo(name=generator.name)
tar_dir.type = tarfile.DIRTYPE
tar_dir.mode = 0o511
tar.addfile(tarinfo=tar_dir)
dir_exists = True
tar_file = tarfile.TarInfo(name=f"{generator.name}/{file.name}")
content = self.get(generator, file.name)
tar_file.size = len(content)
tar_file.mode = file.mode
tar_file.uname = file.owner
tar_file.gname = file.group
if file.needed_for == "users":
user_tar.addfile(tarinfo=tar_file, fileobj=io.BytesIO(content))
else:
tar.addfile(tarinfo=tar_file, fileobj=io.BytesIO(content))
out_file.parent.mkdir(parents=True, exist_ok=True)
out_file.write_bytes(self.get(generator, file.name))
(output_dir / ".pass_info").write_bytes(self.generate_hash())
def upload(self) -> None:
def upload(self, phases: list[str]) -> None:
if "partitioning" in phases:
msg = "Cannot upload partitioning secrets"
raise NotImplementedError(msg)
if not self.needs_upload():
log.info("Secrets already uploaded")
return
with TemporaryDirectory(prefix="vars-upload-") as tempdir:
pass_dir = Path(tempdir)
self.populate_dir(pass_dir)
self.populate_dir(pass_dir, phases)
upload_dir = Path(
self.machine.deployment["password-store"]["secretLocation"]
)

View File

@@ -163,34 +163,56 @@ class SecretStore(StoreBase):
self.machine.flake_dir, self.secret_path(generator, name)
).encode("utf-8")
def populate_dir(self, output_dir: Path) -> None:
key_name = f"{self.machine.name}-age.key"
if not has_secret(sops_secrets_folder(self.machine.flake_dir) / key_name):
# skip uploading the secret, not managed by us
return
key = decrypt_secret(
self.machine.flake_dir,
sops_secrets_folder(self.machine.flake_dir) / key_name,
)
(output_dir / "key.txt").touch(mode=0o600)
(output_dir / "key.txt").write_text(key)
for generator in self.machine.vars_generators:
for file in generator.files:
if file.needed_for == "activation":
target_path = output_dir / generator.name / file.name
target_path.parent.mkdir(
parents=True,
exist_ok=True,
)
# chmod after in case it doesn't have u+w
target_path.touch(mode=0o600)
target_path.write_bytes(self.get(generator, file.name))
target_path.chmod(file.mode)
def populate_dir(self, output_dir: Path, phases: list[str]) -> None:
if "users" in phases or "services" in phases:
key_name = f"{self.machine.name}-age.key"
if not has_secret(sops_secrets_folder(self.machine.flake_dir) / key_name):
# skip uploading the secret, not managed by us
return
key = decrypt_secret(
self.machine.flake_dir,
sops_secrets_folder(self.machine.flake_dir) / key_name,
)
(output_dir / "key.txt").touch(mode=0o600)
(output_dir / "key.txt").write_text(key)
def upload(self) -> None:
if "activation" in phases:
for generator in self.machine.vars_generators:
for file in generator.files:
if file.needed_for == "activation":
target_path = (
output_dir / "activation" / generator.name / file.name
)
target_path.parent.mkdir(
parents=True,
exist_ok=True,
)
# chmod after in case it doesn't have u+w
target_path.touch(mode=0o600)
target_path.write_bytes(self.get(generator, file.name))
target_path.chmod(file.mode)
if "partitioning" in phases:
for generator in self.machine.vars_generators:
for file in generator.files:
if file.needed_for == "partitioning":
target_path = output_dir / generator.name / file.name
target_path.parent.mkdir(
parents=True,
exist_ok=True,
)
# chmod after in case it doesn't have u+w
target_path.touch(mode=0o600)
target_path.write_bytes(self.get(generator, file.name))
target_path.chmod(file.mode)
def upload(self, phases: list[str]) -> None:
if "partitioning" in phases:
msg = "Cannot upload partitioning secrets"
raise NotImplementedError(msg)
with TemporaryDirectory(prefix="sops-upload-") as tempdir:
sops_upload_dir = Path(tempdir)
self.populate_dir(sops_upload_dir)
self.populate_dir(sops_upload_dir, phases)
upload(self.machine.target_host, sops_upload_dir, Path("/var/lib/sops-nix"))
def exists(self, generator: Generator, name: str) -> bool:

View File

@@ -32,11 +32,18 @@ class SecretStore(StoreBase):
secret_file.write_bytes(value)
return None # we manage the files outside of the git repo
def exists(self, generator: "Generator", name: str) -> bool:
return (self.dir / generator.name / name).exists()
def get(self, generator: Generator, name: str) -> bytes:
secret_file = self.dir / generator.name / name
return secret_file.read_bytes()
def populate_dir(self, output_dir: Path) -> None:
def populate_dir(self, output_dir: Path, phases: list[str]) -> None:
if output_dir.exists():
shutil.rmtree(output_dir)
shutil.copytree(self.dir, output_dir)
def upload(self, phases: list[str]) -> None:
msg = "Cannot upload secrets to VMs"
raise NotImplementedError(msg)

View File

@@ -11,7 +11,7 @@ log = logging.getLogger(__name__)
def upload_secret_vars(machine: Machine) -> None:
secret_store_module = importlib.import_module(machine.secret_vars_module)
secret_store = secret_store_module.SecretStore(machine=machine)
secret_store.upload()
secret_store.upload(phases=["activation", "users", "services"])
def upload_command(args: argparse.Namespace) -> None: