Compare commits

...

2 Commits

Author SHA1 Message Date
Jörg Thalheim
7b49653aaf add fake_sudo 2025-03-28 16:59:19 +00:00
Qubasa
1beb9a8ca0 test_secrets_upload: Don't prepend sudo inside test; Improve secret upload test 2025-03-28 16:59:19 +00:00
4 changed files with 44 additions and 62 deletions

View File

@@ -30,6 +30,7 @@
# this disables dynamic dependency loading in clan-cli # this disables dynamic dependency loading in clan-cli
export CLAN_NO_DYNAMIC_DEPS=1 export CLAN_NO_DYNAMIC_DEPS=1
export IN_PYTEST=1
nix develop "$ROOT#clan-cli" -c bash -c "TMPDIR=/tmp python -m pytest -m impure ./tests $@" nix develop "$ROOT#clan-cli" -c bash -c "TMPDIR=/tmp python -m pytest -m impure ./tests $@"
''; '';
}; };

View File

@@ -1,21 +1,28 @@
import tarfile import tarfile
from pathlib import Path from pathlib import Path
from shlex import quote
from tempfile import TemporaryDirectory from tempfile import TemporaryDirectory
from clan_cli.cmd import Log, RunOpts from clan_cli.cmd import Log, RunOpts
from clan_cli.cmd import run as run_local from clan_cli.cmd import run as run_local
from clan_cli.errors import ClanError
from clan_cli.ssh.host import Host from clan_cli.ssh.host import Host
def upload( def upload(
host: Host, host: Host,
local_src: Path, # must be a directory local_src: Path,
remote_dest: Path, # must be a directory remote_dest: Path, # must be a directory
file_user: str = "root", file_user: str = "root",
file_group: str = "root", file_group: str = "root",
dir_mode: int = 0o700, dir_mode: int = 0o700,
file_mode: int = 0o400, file_mode: int = 0o400,
) -> None: ) -> None:
# Check if the remote destination is at least 3 directories deep
if len(remote_dest.parts) < 3:
msg = f"The remote destination must be at least 3 directories deep. Got: {remote_dest}. Reason: The directory will be deleted with 'rm -rf'."
raise ClanError(msg)
# Create the tarball from the temporary directory # Create the tarball from the temporary directory
with TemporaryDirectory(prefix="facts-upload-") as tardir: with TemporaryDirectory(prefix="facts-upload-") as tardir:
tar_path = Path(tardir) / "upload.tar.gz" tar_path = Path(tardir) / "upload.tar.gz"
@@ -55,64 +62,22 @@ def upload(
with local_src.open("rb") as f: with local_src.open("rb") as f:
tar.addfile(tarinfo, f) tar.addfile(tarinfo, f)
priviledge_escalation = [] sudo = ""
if host.user != "root": if host.user != "root":
priviledge_escalation = ["sudo", "--"] sudo = "sudo -- "
if local_src.is_dir(): cmd = "rm -rf $0 && mkdir -m $1 -p $0 && tar -C $0 -xzf -"
cmd = [
*host.ssh_cmd(),
"--",
*priviledge_escalation,
"bash",
"-c",
'exec "$@"',
"--",
"rm",
"-r",
str(remote_dest),
"mkdir",
"-m",
f"{dir_mode:o}",
"-p",
str(remote_dest),
"&&",
"tar",
"-C",
str(remote_dest),
"-xzf",
"-",
]
else:
# For single file, extract to parent directory and ensure correct name
cmd = [
*host.ssh_cmd(),
"--",
*priviledge_escalation,
"bash",
"-c",
'exec "$@"',
"--",
"rm",
"-r",
str(remote_dest),
"mkdir",
"-m",
f"{dir_mode:o}",
"-p",
str(remote_dest.parent),
"&&",
"tar",
"-C",
str(remote_dest.parent),
"-xzf",
"-",
]
# TODO accept `input` to be an IO object instead of bytes so that we don't have to read the tarfile into memory. # TODO accept `input` to be an IO object instead of bytes so that we don't have to read the tarfile into memory.
with tar_path.open("rb") as f: with tar_path.open("rb") as f:
run_local( run_local(
cmd, [
*host.ssh_cmd(),
"--",
f"{sudo}bash -c {quote(cmd)}",
str(remote_dest),
f"{dir_mode:o}",
],
RunOpts( RunOpts(
input=f.read(), input=f.read(),
log=Log.BOTH, log=Log.BOTH,

View File

@@ -57,7 +57,10 @@ def sshd_config(test_root: Path) -> Iterator[SshdConfig]:
) )
config = tmpdir / "sshd_config" config = tmpdir / "sshd_config"
config.write_text(content) config.write_text(content)
login_shell = tmpdir / "shell" bin_path = tmpdir / "bin"
login_shell = bin_path / "shell"
fake_sudo = bin_path / "sudo"
login_shell.parent.mkdir(parents=True)
bash = shutil.which("bash") bash = shutil.which("bash")
path = os.environ["PATH"] path = os.environ["PATH"]
@@ -65,19 +68,23 @@ def sshd_config(test_root: Path) -> Iterator[SshdConfig]:
login_shell.write_text( login_shell.write_text(
f"""#!{bash} f"""#!{bash}
set -x
if [[ -f /etc/profile ]]; then if [[ -f /etc/profile ]]; then
source /etc/profile source /etc/profile
fi fi
if [[ -n "$REALPATH" ]]; then export PATH="{bin_path}:{path}"
export PATH="$REALPATH:${path}"
else
export PATH="${path}"
fi
exec {bash} -l "${{@}}" exec {bash} -l "${{@}}"
""" """
) )
login_shell.chmod(0o755) login_shell.chmod(0o755)
fake_sudo.write_text(
f"""#!{bash}
exec "${{@}}"
"""
)
fake_sudo.chmod(0o755)
lib_path = None lib_path = None
extension = ".so" extension = ".so"

View File

@@ -26,6 +26,17 @@ def test_secrets_upload(
monkeypatch.chdir(str(flake.path)) monkeypatch.chdir(str(flake.path))
monkeypatch.setenv("SOPS_AGE_KEY", age_keys[0].privkey) monkeypatch.setenv("SOPS_AGE_KEY", age_keys[0].privkey)
sops_dir = flake.path / "facts"
# the flake defines this path as the location where the sops key should be installed
sops_key = sops_dir / "key.txt"
sops_key2 = sops_dir / "key2.txt"
# Create old state, which should be cleaned up
sops_dir.mkdir()
sops_key.write_text("OLD STATE")
sops_key2.write_text("OLD STATE2")
cli.run( cli.run(
[ [
"secrets", "secrets",
@@ -56,8 +67,6 @@ def test_secrets_upload(
cli.run(["facts", "upload", "--flake", str(flake_path), "vm1"]) cli.run(["facts", "upload", "--flake", str(flake_path), "vm1"])
# the flake defines this path as the location where the sops key should be installed
sops_key = flake.path / "facts" / "key.txt"
assert sops_key.exists() assert sops_key.exists()
assert sops_key.read_text() == age_keys[0].privkey assert sops_key.read_text() == age_keys[0].privkey
assert not sops_key2.exists()