Compare commits
2 Commits
validation
...
Qubasa-mai
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7b49653aaf | ||
|
|
1beb9a8ca0 |
@@ -30,6 +30,7 @@
|
||||
# this disables dynamic dependency loading in clan-cli
|
||||
export CLAN_NO_DYNAMIC_DEPS=1
|
||||
|
||||
export IN_PYTEST=1
|
||||
nix develop "$ROOT#clan-cli" -c bash -c "TMPDIR=/tmp python -m pytest -m impure ./tests $@"
|
||||
'';
|
||||
};
|
||||
|
||||
@@ -1,21 +1,28 @@
|
||||
import tarfile
|
||||
from pathlib import Path
|
||||
from shlex import quote
|
||||
from tempfile import TemporaryDirectory
|
||||
|
||||
from clan_cli.cmd import Log, RunOpts
|
||||
from clan_cli.cmd import run as run_local
|
||||
from clan_cli.errors import ClanError
|
||||
from clan_cli.ssh.host import Host
|
||||
|
||||
|
||||
def upload(
|
||||
host: Host,
|
||||
local_src: Path, # must be a directory
|
||||
local_src: Path,
|
||||
remote_dest: Path, # must be a directory
|
||||
file_user: str = "root",
|
||||
file_group: str = "root",
|
||||
dir_mode: int = 0o700,
|
||||
file_mode: int = 0o400,
|
||||
) -> None:
|
||||
# Check if the remote destination is at least 3 directories deep
|
||||
if len(remote_dest.parts) < 3:
|
||||
msg = f"The remote destination must be at least 3 directories deep. Got: {remote_dest}. Reason: The directory will be deleted with 'rm -rf'."
|
||||
raise ClanError(msg)
|
||||
|
||||
# Create the tarball from the temporary directory
|
||||
with TemporaryDirectory(prefix="facts-upload-") as tardir:
|
||||
tar_path = Path(tardir) / "upload.tar.gz"
|
||||
@@ -55,64 +62,22 @@ def upload(
|
||||
with local_src.open("rb") as f:
|
||||
tar.addfile(tarinfo, f)
|
||||
|
||||
priviledge_escalation = []
|
||||
sudo = ""
|
||||
if host.user != "root":
|
||||
priviledge_escalation = ["sudo", "--"]
|
||||
sudo = "sudo -- "
|
||||
|
||||
if local_src.is_dir():
|
||||
cmd = [
|
||||
*host.ssh_cmd(),
|
||||
"--",
|
||||
*priviledge_escalation,
|
||||
"bash",
|
||||
"-c",
|
||||
'exec "$@"',
|
||||
"--",
|
||||
"rm",
|
||||
"-r",
|
||||
str(remote_dest),
|
||||
"mkdir",
|
||||
"-m",
|
||||
f"{dir_mode:o}",
|
||||
"-p",
|
||||
str(remote_dest),
|
||||
"&&",
|
||||
"tar",
|
||||
"-C",
|
||||
str(remote_dest),
|
||||
"-xzf",
|
||||
"-",
|
||||
]
|
||||
else:
|
||||
# For single file, extract to parent directory and ensure correct name
|
||||
cmd = [
|
||||
*host.ssh_cmd(),
|
||||
"--",
|
||||
*priviledge_escalation,
|
||||
"bash",
|
||||
"-c",
|
||||
'exec "$@"',
|
||||
"--",
|
||||
"rm",
|
||||
"-r",
|
||||
str(remote_dest),
|
||||
"mkdir",
|
||||
"-m",
|
||||
f"{dir_mode:o}",
|
||||
"-p",
|
||||
str(remote_dest.parent),
|
||||
"&&",
|
||||
"tar",
|
||||
"-C",
|
||||
str(remote_dest.parent),
|
||||
"-xzf",
|
||||
"-",
|
||||
]
|
||||
cmd = "rm -rf $0 && mkdir -m $1 -p $0 && tar -C $0 -xzf -"
|
||||
|
||||
# TODO accept `input` to be an IO object instead of bytes so that we don't have to read the tarfile into memory.
|
||||
with tar_path.open("rb") as f:
|
||||
run_local(
|
||||
cmd,
|
||||
[
|
||||
*host.ssh_cmd(),
|
||||
"--",
|
||||
f"{sudo}bash -c {quote(cmd)}",
|
||||
str(remote_dest),
|
||||
f"{dir_mode:o}",
|
||||
],
|
||||
RunOpts(
|
||||
input=f.read(),
|
||||
log=Log.BOTH,
|
||||
|
||||
@@ -57,7 +57,10 @@ def sshd_config(test_root: Path) -> Iterator[SshdConfig]:
|
||||
)
|
||||
config = tmpdir / "sshd_config"
|
||||
config.write_text(content)
|
||||
login_shell = tmpdir / "shell"
|
||||
bin_path = tmpdir / "bin"
|
||||
login_shell = bin_path / "shell"
|
||||
fake_sudo = bin_path / "sudo"
|
||||
login_shell.parent.mkdir(parents=True)
|
||||
|
||||
bash = shutil.which("bash")
|
||||
path = os.environ["PATH"]
|
||||
@@ -65,19 +68,23 @@ def sshd_config(test_root: Path) -> Iterator[SshdConfig]:
|
||||
|
||||
login_shell.write_text(
|
||||
f"""#!{bash}
|
||||
set -x
|
||||
if [[ -f /etc/profile ]]; then
|
||||
source /etc/profile
|
||||
fi
|
||||
if [[ -n "$REALPATH" ]]; then
|
||||
export PATH="$REALPATH:${path}"
|
||||
else
|
||||
export PATH="${path}"
|
||||
fi
|
||||
export PATH="{bin_path}:{path}"
|
||||
exec {bash} -l "${{@}}"
|
||||
"""
|
||||
)
|
||||
login_shell.chmod(0o755)
|
||||
|
||||
fake_sudo.write_text(
|
||||
f"""#!{bash}
|
||||
exec "${{@}}"
|
||||
"""
|
||||
)
|
||||
fake_sudo.chmod(0o755)
|
||||
|
||||
lib_path = None
|
||||
|
||||
extension = ".so"
|
||||
|
||||
@@ -26,6 +26,17 @@ def test_secrets_upload(
|
||||
monkeypatch.chdir(str(flake.path))
|
||||
monkeypatch.setenv("SOPS_AGE_KEY", age_keys[0].privkey)
|
||||
|
||||
sops_dir = flake.path / "facts"
|
||||
|
||||
# the flake defines this path as the location where the sops key should be installed
|
||||
sops_key = sops_dir / "key.txt"
|
||||
sops_key2 = sops_dir / "key2.txt"
|
||||
|
||||
# Create old state, which should be cleaned up
|
||||
sops_dir.mkdir()
|
||||
sops_key.write_text("OLD STATE")
|
||||
sops_key2.write_text("OLD STATE2")
|
||||
|
||||
cli.run(
|
||||
[
|
||||
"secrets",
|
||||
@@ -56,8 +67,6 @@ def test_secrets_upload(
|
||||
|
||||
cli.run(["facts", "upload", "--flake", str(flake_path), "vm1"])
|
||||
|
||||
# the flake defines this path as the location where the sops key should be installed
|
||||
sops_key = flake.path / "facts" / "key.txt"
|
||||
|
||||
assert sops_key.exists()
|
||||
assert sops_key.read_text() == age_keys[0].privkey
|
||||
assert not sops_key2.exists()
|
||||
|
||||
Reference in New Issue
Block a user