prefix nixpkgs# explicitly in nix_shell

This makes the function usage less confusing (you can now tell from the call side what are flags and what is passed to nix-shell) and allows to use different flakes to download packages.
This commit is contained in:
Jörg Thalheim
2023-12-08 15:00:11 +01:00
parent af3001a3ac
commit 0e91f8721f
12 changed files with 45 additions and 39 deletions

View File

@@ -26,28 +26,30 @@ async def create_flake(directory: Path, url: str) -> dict[str, CmdOut]:
out = await run(command, cwd=directory) out = await run(command, cwd=directory)
response["flake init"] = out response["flake init"] = out
command = nix_shell(["git"], ["git", "init"]) command = nix_shell(["nixpkgs#git"], ["git", "init"])
out = await run(command, cwd=directory) out = await run(command, cwd=directory)
response["git init"] = out response["git init"] = out
command = nix_shell(["git"], ["git", "add", "."]) command = nix_shell(["nixpkgs#git"], ["git", "add", "."])
out = await run(command, cwd=directory) out = await run(command, cwd=directory)
response["git add"] = out response["git add"] = out
# command = nix_shell(["git"], ["git", "config", "init.defaultBranch", "main"]) # command = nix_shell(["nixpkgs#git"], ["git", "config", "init.defaultBranch", "main"])
# out = await run(command, cwd=directory) # out = await run(command, cwd=directory)
# response["git config"] = out # response["git config"] = out
command = nix_shell(["git"], ["git", "config", "user.name", "clan-tool"]) command = nix_shell(["nixpkgs#git"], ["git", "config", "user.name", "clan-tool"])
out = await run(command, cwd=directory) out = await run(command, cwd=directory)
response["git config"] = out response["git config"] = out
command = nix_shell(["git"], ["git", "config", "user.email", "clan@example.com"]) command = nix_shell(
["nixpkgs#git"], ["git", "config", "user.email", "clan@example.com"]
)
out = await run(command, cwd=directory) out = await run(command, cwd=directory)
response["git config"] = out response["git config"] = out
# TODO: Find out why this fails on Johannes machine # TODO: Find out why this fails on Johannes machine
# command = nix_shell(["git"], ["git", "commit", "-a", "-m", "Initial commit"]) # command = nix_shell(["nixpkgs#git"], ["git", "commit", "-a", "-m", "Initial commit"])
# out = await run(command, cwd=directory) # out = await run(command, cwd=directory)
# response["git commit"] = out # response["git commit"] = out

View File

@@ -38,7 +38,7 @@ def _commit_file_to_git(repo_dir: Path, file_path: Path, commit_message: str) ->
:raises ClanError: If the file is not in the git repository. :raises ClanError: If the file is not in the git repository.
""" """
cmd = nix_shell( cmd = nix_shell(
["git"], ["nixpkgs#git"],
["git", "-C", str(repo_dir), "add", str(file_path)], ["git", "-C", str(repo_dir), "add", str(file_path)],
) )
# add the file to the git index # add the file to the git index
@@ -51,7 +51,7 @@ def _commit_file_to_git(repo_dir: Path, file_path: Path, commit_message: str) ->
# check if there is a diff # check if there is a diff
cmd = nix_shell( cmd = nix_shell(
["git"], ["nixpkgs#git"],
["git", "-C", str(repo_dir), "diff", "--cached", "--exit-code", str(file_path)], ["git", "-C", str(repo_dir), "diff", "--cached", "--exit-code", str(file_path)],
) )
result = subprocess.run(cmd, cwd=repo_dir) result = subprocess.run(cmd, cwd=repo_dir)
@@ -61,7 +61,7 @@ def _commit_file_to_git(repo_dir: Path, file_path: Path, commit_message: str) ->
# commit only that file # commit only that file
cmd = nix_shell( cmd = nix_shell(
["git"], ["nixpkgs#git"],
[ [
"git", "git",
"-C", "-C",

View File

@@ -28,7 +28,7 @@ def install_nixos(machine: Machine) -> None:
subprocess.run( subprocess.run(
nix_shell( nix_shell(
["nixos-anywhere"], ["nixpkgs#nixos-anywhere"],
[ [
"nixos-anywhere", "nixos-anywhere",
"-f", "-f",

View File

@@ -88,16 +88,15 @@ def nix_shell(packages: list[str], cmd: list[str]) -> list[str]:
# in our tests we just make sure we have all the packages # in our tests we just make sure we have all the packages
if os.environ.get("IN_NIX_SANDBOX"): if os.environ.get("IN_NIX_SANDBOX"):
return cmd return cmd
wrapped_packages = [f"nixpkgs#{p}" for p in packages] return [
return ( *nix_command(
nix_command(
[ [
"shell", "shell",
"--inputs-from", "--inputs-from",
f"{nixpkgs_flake()!s}", f"{nixpkgs_flake()!s}",
] ]
) ),
+ wrapped_packages *packages,
+ ["-c"] "-c",
+ cmd *cmd,
) ]

View File

@@ -22,7 +22,7 @@ def import_sops(args: argparse.Namespace) -> None:
if args.input_type: if args.input_type:
cmd += ["--input-type", args.input_type] cmd += ["--input-type", args.input_type]
cmd += ["--output-type", "json", "--decrypt", args.sops_file] cmd += ["--output-type", "json", "--decrypt", args.sops_file]
cmd = nix_shell(["sops"], cmd) cmd = nix_shell(["nixpkgs#sops"], cmd)
try: try:
res = subprocess.run(cmd, check=True, text=True, stdout=subprocess.PIPE) res = subprocess.run(cmd, check=True, text=True, stdout=subprocess.PIPE)
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:

View File

@@ -21,7 +21,7 @@ class SopsKey:
def get_public_key(privkey: str) -> str: def get_public_key(privkey: str) -> str:
cmd = nix_shell(["age"], ["age-keygen", "-y"]) cmd = nix_shell(["nixpkgs#age"], ["age-keygen", "-y"])
try: try:
res = subprocess.run( res = subprocess.run(
cmd, input=privkey, stdout=subprocess.PIPE, text=True, check=True cmd, input=privkey, stdout=subprocess.PIPE, text=True, check=True
@@ -34,7 +34,7 @@ def get_public_key(privkey: str) -> str:
def generate_private_key() -> tuple[str, str]: def generate_private_key() -> tuple[str, str]:
cmd = nix_shell(["age"], ["age-keygen"]) cmd = nix_shell(["nixpkgs#age"], ["age-keygen"])
try: try:
proc = subprocess.run(cmd, check=True, stdout=subprocess.PIPE, text=True) proc = subprocess.run(cmd, check=True, stdout=subprocess.PIPE, text=True)
res = proc.stdout.strip() res = proc.stdout.strip()
@@ -119,7 +119,7 @@ def sops_manifest(keys: list[str]) -> Iterator[Path]:
def update_keys(secret_path: Path, keys: list[str]) -> None: def update_keys(secret_path: Path, keys: list[str]) -> None:
with sops_manifest(keys) as manifest: with sops_manifest(keys) as manifest:
cmd = nix_shell( cmd = nix_shell(
["sops"], ["nixpkgs#sops"],
[ [
"sops", "sops",
"--config", "--config",
@@ -146,7 +146,7 @@ def encrypt_file(
if not content: if not content:
args = ["sops", "--config", str(manifest)] args = ["sops", "--config", str(manifest)]
args.extend([str(secret_path)]) args.extend([str(secret_path)])
cmd = nix_shell(["sops"], args) cmd = nix_shell(["nixpkgs#sops"], args)
p = subprocess.run(cmd) p = subprocess.run(cmd)
# returns 200 if the file is changed # returns 200 if the file is changed
if p.returncode != 0 and p.returncode != 200: if p.returncode != 0 and p.returncode != 200:
@@ -166,7 +166,7 @@ def encrypt_file(
# we pass an empty manifest to pick up existing configuration of the user # we pass an empty manifest to pick up existing configuration of the user
args = ["sops", "--config", str(manifest)] args = ["sops", "--config", str(manifest)]
args.extend(["-i", "--encrypt", str(f.name)]) args.extend(["-i", "--encrypt", str(f.name)])
cmd = nix_shell(["sops"], args) cmd = nix_shell(["nixpkgs#sops"], args)
subprocess.run(cmd, check=True) subprocess.run(cmd, check=True)
# atomic copy of the encrypted file # atomic copy of the encrypted file
with NamedTemporaryFile(dir=folder, delete=False) as f2: with NamedTemporaryFile(dir=folder, delete=False) as f2:
@@ -182,7 +182,8 @@ def encrypt_file(
def decrypt_file(secret_path: Path) -> str: def decrypt_file(secret_path: Path) -> str:
with sops_manifest([]) as manifest: with sops_manifest([]) as manifest:
cmd = nix_shell( cmd = nix_shell(
["sops"], ["sops", "--config", str(manifest), "--decrypt", str(secret_path)] ["nixpkgs#sops"],
["sops", "--config", str(manifest), "--decrypt", str(secret_path)],
) )
res = subprocess.run(cmd, stdout=subprocess.PIPE, text=True) res = subprocess.run(cmd, stdout=subprocess.PIPE, text=True)
if res.returncode != 0: if res.returncode != 0:

View File

@@ -61,7 +61,7 @@ export secrets={shlex.quote(str(secrets_dir))}
{generator} {generator}
""" """
try: try:
cmd = nix_shell(["bash"], ["bash", "-c", text]) cmd = nix_shell(["nixpkgs#bash"], ["bash", "-c", text])
subprocess.run(cmd, check=True) subprocess.run(cmd, check=True)
except subprocess.CalledProcessError: except subprocess.CalledProcessError:
msg = "failed to the following command:\n" msg = "failed to the following command:\n"

View File

@@ -21,7 +21,7 @@ def upload_secrets(machine: Machine) -> None:
ssh_cmd = host.ssh_cmd() ssh_cmd = host.ssh_cmd()
subprocess.run( subprocess.run(
nix_shell( nix_shell(
["rsync"], ["nixpkgs#rsync"],
[ [
"rsync", "rsync",
"-e", "-e",

View File

@@ -11,10 +11,10 @@ def ssh(
password: str | None = None, password: str | None = None,
ssh_args: list[str] = [], ssh_args: list[str] = [],
) -> None: ) -> None:
packages = ["tor", "openssh"] packages = ["nixpkgs#tor", "nixpkgs#openssh"]
password_args = [] password_args = []
if password: if password:
packages.append("sshpass") packages.append("nixpkgs#sshpass")
password_args = [ password_args = [
"sshpass", "sshpass",
"-p", "-p",
@@ -37,7 +37,7 @@ def qrcode_scan(picture_file: str) -> str:
return ( return (
subprocess.run( subprocess.run(
nix_shell( nix_shell(
["zbar"], ["nixpkgs#zbar"],
[ [
"zbarimg", "zbarimg",
"--quiet", "--quiet",

View File

@@ -189,7 +189,7 @@ def generate_secrets(
def prepare_disk(tmpdir: Path, log_fd: IO[str] | None) -> Path: def prepare_disk(tmpdir: Path, log_fd: IO[str] | None) -> Path:
disk_img = tmpdir / "disk.img" disk_img = tmpdir / "disk.img"
cmd = nix_shell( cmd = nix_shell(
["qemu"], ["nixpkgs#qemu"],
[ [
"qemu-img", "qemu-img",
"create", "create",
@@ -211,7 +211,7 @@ def prepare_disk(tmpdir: Path, log_fd: IO[str] | None) -> Path:
) )
cmd = nix_shell( cmd = nix_shell(
["e2fsprogs"], ["nixpkgs#e2fsprogs"],
[ [
"mkfs.ext4", "mkfs.ext4",
"-L", "-L",
@@ -269,11 +269,11 @@ def run_vm(
if vm.wayland: if vm.wayland:
packages = ["git+https://git.clan.lol/clan/clan-core.git#qemu-wayland"] packages = ["git+https://git.clan.lol/clan/clan-core.git#qemu-wayland"]
else: else:
packages = ["qemu"] packages = ["nixpkgs#qemu"]
env = os.environ.copy() env = os.environ.copy()
if vm.graphics and not vm.wayland: if vm.graphics and not vm.wayland:
packages.append("virt-viewer") packages.append("nixpkgs#virt-viewer")
remote_viewer_mimetypes = module_root() / "vms" / "mimetypes" remote_viewer_mimetypes = module_root() / "vms" / "mimetypes"
env[ env[
"XDG_DATA_DIRS" "XDG_DATA_DIRS"

View File

@@ -26,12 +26,12 @@ pytest_plugins = [
@pytest.fixture @pytest.fixture
def git_repo(tmp_path: Path) -> Path: def git_repo(tmp_path: Path) -> Path:
# initialize a git repository # initialize a git repository
cmd = nix_shell(["git"], ["git", "init"]) cmd = nix_shell(["nixpkgs#git"], ["git", "init"])
subprocess.run(cmd, cwd=tmp_path, check=True) subprocess.run(cmd, cwd=tmp_path, check=True)
# set user.name and user.email # set user.name and user.email
cmd = nix_shell(["git"], ["git", "config", "user.name", "test"]) cmd = nix_shell(["nixpkgs#git"], ["git", "config", "user.name", "test"])
subprocess.run(cmd, cwd=tmp_path, check=True) subprocess.run(cmd, cwd=tmp_path, check=True)
cmd = nix_shell(["git"], ["git", "config", "user.email", "test@test.test"]) cmd = nix_shell(["nixpkgs#git"], ["git", "config", "user.email", "test@test.test"])
subprocess.run(cmd, cwd=tmp_path, check=True) subprocess.run(cmd, cwd=tmp_path, check=True)
# return the path to the git repository # return the path to the git repository
return tmp_path return tmp_path

View File

@@ -35,10 +35,14 @@ def test_upload_secret(
) )
cli = Cli() cli = Cli()
subprocess.run( subprocess.run(
nix_shell(["gnupg"], ["gpg", "--batch", "--gen-key", str(gpg_key_spec)]), nix_shell(
["nixpkgs#gnupg"], ["gpg", "--batch", "--gen-key", str(gpg_key_spec)]
),
check=True, check=True,
) )
subprocess.run(nix_shell(["pass"], ["pass", "init", "test@local"]), check=True) subprocess.run(
nix_shell(["nixpkgs#pass"], ["pass", "init", "test@local"]), check=True
)
cli.run(["secrets", "generate", "vm1"]) cli.run(["secrets", "generate", "vm1"])
network_id = machine_get_fact( network_id = machine_get_fact(
test_flake_with_core_and_pass.path, "vm1", "zerotier-network-id" test_flake_with_core_and_pass.path, "vm1", "zerotier-network-id"