sandbox_exec: refactor to use context manager for cleanup

Changed sandbox_exec_cmd to return a context manager that automatically
handles profile file cleanup. This ensures the temporary profile is
always removed, even if exceptions occur.
This commit is contained in:
Jörg Thalheim
2025-07-09 11:57:26 +02:00
committed by lassulus
parent 194647dc71
commit f646890bb3
3 changed files with 41 additions and 63 deletions

View File

@@ -3,6 +3,7 @@ import logging
import os
import shutil
import sys
from contextlib import ExitStack
from dataclasses import dataclass, field
from functools import cached_property
from pathlib import Path
@@ -179,12 +180,6 @@ def bubblewrap_cmd(generator: str, tmpdir: Path) -> list[str]:
# fmt: on
def sandbox_exec_cmd(generator: str, tmpdir: Path) -> tuple[list[str], str]:
from clan_lib.sandbox_exec import sandbox_exec_cmd as _sandbox_exec_cmd
return _sandbox_exec_cmd(generator, tmpdir)
# TODO: implement caching to not decrypt the same secret multiple times
def decrypt_dependencies(
machine: "Machine",
@@ -262,7 +257,8 @@ def execute_generator(
raise ClanError(msg) from e
env = os.environ.copy()
with TemporaryDirectory(prefix="vars-") as _tmpdir:
with ExitStack() as stack:
_tmpdir = stack.enter_context(TemporaryDirectory(prefix="vars-"))
tmpdir = Path(_tmpdir).resolve()
tmpdir_in = tmpdir / "in"
tmpdir_prompts = tmpdir / "prompts"
@@ -286,11 +282,12 @@ def execute_generator(
final_script = generator.final_script()
profile_path = None
if sys.platform == "linux" and bwrap.bubblewrap_works():
cmd = bubblewrap_cmd(str(final_script), tmpdir)
elif sys.platform == "darwin":
cmd, profile_path = sandbox_exec_cmd(str(final_script), tmpdir)
from clan_lib.sandbox_exec import sandbox_exec_cmd
cmd = stack.enter_context(sandbox_exec_cmd(str(final_script), tmpdir))
else:
# For non-sandboxed execution (Linux without bubblewrap or other platforms)
if not no_sandbox:
@@ -301,15 +298,7 @@ def execute_generator(
raise ClanError(msg)
cmd = ["bash", "-c", str(final_script)]
try:
run(cmd, RunOpts(env=env, cwd=tmpdir))
finally:
# Clean up the temporary profile file if needed
if profile_path:
try:
os.unlink(profile_path)
except OSError:
pass
run(cmd, RunOpts(env=env, cwd=tmpdir))
files_to_commit = []
# store secrets
files = generator.files

View File

@@ -1,5 +1,8 @@
import contextlib
import os
import shutil
from collections.abc import Iterator
from contextlib import contextmanager
from pathlib import Path
from tempfile import NamedTemporaryFile
@@ -93,11 +96,12 @@ def create_sandbox_profile() -> str:
return profile_content
def sandbox_exec_cmd(generator: str, tmpdir: Path) -> tuple[list[str], str]:
@contextmanager
def sandbox_exec_cmd(generator: str, tmpdir: Path) -> Iterator[list[str]]:
"""Create a sandbox-exec command for running a generator.
Returns:
tuple: (command_list, profile_path) where profile_path should be cleaned up after use
Yields:
list[str]: The command to execute
"""
profile_content = create_sandbox_profile()
@@ -106,23 +110,28 @@ def sandbox_exec_cmd(generator: str, tmpdir: Path) -> tuple[list[str], str]:
f.write(profile_content)
profile_path = f.name
real_bash_path = Path("bash")
if os.environ.get("IN_NIX_SANDBOX"):
bash_executable_path = Path(str(shutil.which("bash")))
real_bash_path = bash_executable_path.resolve()
try:
real_bash_path = Path("bash")
if os.environ.get("IN_NIX_SANDBOX"):
bash_executable_path = Path(str(shutil.which("bash")))
real_bash_path = bash_executable_path.resolve()
# Use the sandbox profile parameter to define TMPDIR and execute from within it
# Resolve the tmpdir to handle macOS symlinks (/var/folders -> /private/var/folders)
resolved_tmpdir = tmpdir.resolve()
cmd = [
"/usr/bin/sandbox-exec",
"-f",
profile_path,
"-D",
f"_TMPDIR={resolved_tmpdir}",
str(real_bash_path),
"-c",
generator,
]
# Use the sandbox profile parameter to define TMPDIR and execute from within it
# Resolve the tmpdir to handle macOS symlinks (/var/folders -> /private/var/folders)
resolved_tmpdir = tmpdir.resolve()
cmd = [
"/usr/bin/sandbox-exec",
"-f",
profile_path,
"-D",
f"_TMPDIR={resolved_tmpdir}",
str(real_bash_path),
"-c",
generator,
]
return cmd, profile_path
yield cmd
finally:
# Clean up the profile file
with contextlib.suppress(OSError):
Path(profile_path).unlink()

View File

@@ -4,7 +4,6 @@ import tempfile
from pathlib import Path
import pytest
from clan_lib.sandbox_exec import sandbox_exec_cmd
@@ -18,19 +17,12 @@ def test_sandbox_allows_write_to_tmpdir() -> None:
# Create a script that writes to the tmpdir using absolute path
script = f'echo "test content" > "{test_file}"'
cmd, profile_path = sandbox_exec_cmd(script, tmpdir_path)
try:
with sandbox_exec_cmd(script, tmpdir_path) as cmd:
result = subprocess.run(cmd, capture_output=True, text=True)
assert result.returncode == 0, f"Command failed: {result.stderr}"
assert test_file.exists(), "File was not created in tmpdir"
assert test_file.read_text().strip() == "test content"
finally:
# Clean up the profile
try:
Path(profile_path).unlink()
except OSError:
pass
@pytest.mark.impure
@@ -43,14 +35,14 @@ def test_sandbox_denies_write_to_home() -> None:
# Try to write to a file in home directory (should be denied)
forbidden_file = Path.home() / ".sandbox_test_forbidden.txt"
script = f'echo "forbidden" > "{forbidden_file}" 2>&1 || echo "write denied"'
cmd, profile_path = sandbox_exec_cmd(script, tmpdir_path)
try:
# Ensure the file doesn't exist before test
if forbidden_file.exists():
forbidden_file.unlink()
result = subprocess.run(cmd, capture_output=True, text=True)
with sandbox_exec_cmd(script, tmpdir_path) as cmd:
result = subprocess.run(cmd, capture_output=True, text=True)
# Check that either the write was denied or the file wasn't created
# macOS sandbox-exec with (allow default) has limitations
@@ -64,11 +56,6 @@ def test_sandbox_denies_write_to_home() -> None:
# Good - file was not created
assert "write denied" in result.stdout or result.returncode != 0
finally:
# Clean up
try:
Path(profile_path).unlink()
except OSError:
pass
# Clean up test file if it was created
try:
if forbidden_file.exists():
@@ -87,16 +74,9 @@ def test_sandbox_allows_nix_store_read() -> None:
# Use ls to read from nix store (should work) and write result to file
success_file = tmpdir_path / "nix_test.txt"
script = f'ls /nix/store >/dev/null 2>&1 && echo "success" > "{success_file}"'
cmd, profile_path = sandbox_exec_cmd(script, tmpdir_path)
try:
with sandbox_exec_cmd(script, tmpdir_path) as cmd:
result = subprocess.run(cmd, capture_output=True, text=True)
assert result.returncode == 0, f"Command failed: {result.stderr}"
assert success_file.exists(), "Success file was not created"
assert success_file.read_text().strip() == "success"
finally:
# Clean up the profile
try:
Path(profile_path).unlink()
except OSError:
pass