Merge pull request 'clan-cli: Make host upload function support uploading single files too' (#2943) from Qubasa/clan-core:main into main

Reviewed-on: https://git.clan.lol/clan/clan-core/pulls/2943
This commit is contained in:
Luis Hebendanz
2025-03-01 16:35:19 +00:00

View File

@@ -4,7 +4,6 @@ from tempfile import TemporaryDirectory
from clan_cli.cmd import Log, RunOpts from clan_cli.cmd import Log, RunOpts
from clan_cli.cmd import run as run_local from clan_cli.cmd import run as run_local
from clan_cli.errors import ClanError
from clan_cli.ssh.host import Host from clan_cli.ssh.host import Host
@@ -17,15 +16,6 @@ def upload(
dir_mode: int = 0o700, dir_mode: int = 0o700,
file_mode: int = 0o400, file_mode: int = 0o400,
) -> None: ) -> None:
# check if the remote destination is a directory (no suffix)
if remote_dest.suffix:
msg = "Only directories are allowed"
raise ClanError(msg)
if not local_src.is_dir():
msg = "Only directories are allowed"
raise ClanError(msg)
# Create the tarball from the temporary directory # Create the tarball from the temporary directory
with TemporaryDirectory(prefix="facts-upload-") as tardir: with TemporaryDirectory(prefix="facts-upload-") as tardir:
tar_path = Path(tardir) / "upload.tar.gz" tar_path = Path(tardir) / "upload.tar.gz"
@@ -33,46 +23,77 @@ def upload(
# As first uploading the tarball and then changing the permissions can lead an attacker to # As first uploading the tarball and then changing the permissions can lead an attacker to
# do a race condition attack # do a race condition attack
with tarfile.open(str(tar_path), "w:gz") as tar: with tarfile.open(str(tar_path), "w:gz") as tar:
for root, dirs, files in local_src.walk(): if local_src.is_dir():
for mdir in dirs: # Handle directory upload
dir_path = Path(root) / mdir for root, dirs, files in local_src.walk():
tarinfo = tar.gettarinfo( for mdir in dirs:
dir_path, arcname=str(dir_path.relative_to(str(local_src))) dir_path = Path(root) / mdir
) tarinfo = tar.gettarinfo(
tarinfo.mode = dir_mode dir_path, arcname=str(dir_path.relative_to(str(local_src)))
tarinfo.uname = file_user )
tarinfo.gname = file_group tarinfo.mode = dir_mode
tar.addfile(tarinfo) tarinfo.uname = file_user
for file in files: tarinfo.gname = file_group
file_path = Path(root) / file tar.addfile(tarinfo)
tarinfo = tar.gettarinfo( for file in files:
file_path, file_path = Path(root) / file
arcname=str(file_path.relative_to(str(local_src))), tarinfo = tar.gettarinfo(
) file_path,
tarinfo.mode = file_mode arcname=str(file_path.relative_to(str(local_src))),
tarinfo.uname = file_user )
tarinfo.gname = file_group tarinfo.mode = file_mode
with file_path.open("rb") as f: tarinfo.uname = file_user
tar.addfile(tarinfo, f) tarinfo.gname = file_group
with file_path.open("rb") as f:
tar.addfile(tarinfo, f)
else:
# Handle single file upload
tarinfo = tar.gettarinfo(local_src, arcname=remote_dest.name)
tarinfo.mode = file_mode
tarinfo.uname = file_user
tarinfo.gname = file_group
with local_src.open("rb") as f:
tar.addfile(tarinfo, f)
cmd = [ if local_src.is_dir():
*host.ssh_cmd(), cmd = [
"rm", *host.ssh_cmd(),
"-r", "rm",
str(remote_dest), "-r",
";", str(remote_dest),
"mkdir", ";",
"-m", "mkdir",
f"{dir_mode:o}", "-m",
"-p", f"{dir_mode:o}",
str(remote_dest), "-p",
"&&", str(remote_dest),
"tar", "&&",
"-C", "tar",
str(remote_dest), "-C",
"-xzf", str(remote_dest),
"-", "-xzf",
] "-",
]
else:
# For single file, extract to parent directory and ensure correct name
cmd = [
*host.ssh_cmd(),
"rm",
"-f",
str(remote_dest),
";",
"mkdir",
"-m",
f"{dir_mode:o}",
"-p",
str(remote_dest.parent),
"&&",
"tar",
"-C",
str(remote_dest.parent),
"-xzf",
"-",
]
# TODO accept `input` to be an IO object instead of bytes so that we don't have to read the tarfile into memory. # TODO accept `input` to be an IO object instead of bytes so that we don't have to read the tarfile into memory.
with tar_path.open("rb") as f: with tar_path.open("rb") as f: