CLI: init cmd_with_root

This commit is contained in:
Johannes Kirschbauer
2024-12-29 15:28:06 +01:00
parent a1c640db3d
commit 7b930ab440
4 changed files with 66 additions and 21 deletions

View File

@@ -4,6 +4,7 @@ import math
import os import os
import select import select
import shlex import shlex
import shutil
import signal import signal
import subprocess import subprocess
import threading import threading
@@ -271,6 +272,38 @@ class RunOpts:
needs_user_terminal: bool = False needs_user_terminal: bool = False
timeout: float = math.inf timeout: float = math.inf
shell: bool = False shell: bool = False
# Some commands require sudo
requires_root_perm: bool = False
# Ask for sudo password in a graphical way.
# This is needed for GUI applications
graphical_perm: bool = False
def cmd_with_root(cmd: list[str], graphical: bool = False) -> list[str]:
"""
This function returns a wrapped command that will be run with root permissions.
It will use sudo if graphical is False, otherwise it will use run0 or pkexec.
"""
if os.geteuid() == 0:
return cmd
# Decide permission handler
if graphical:
# TODO(mic92): figure out how to use run0
# if shutil.which("run0") is not None:
# perm_prefix = "run0"
if shutil.which("pkexec") is not None:
return ["pkexec", *cmd]
description = (
"pkexec is required to launch root commands with graphical permissions"
)
msg = "Missing graphical permission handler"
raise ClanError(msg, description=description)
if shutil.which("sudo") is None:
msg = "sudo is required to run this command as a non-root user"
raise ClanError(msg)
return ["sudo", *cmd]
def run( def run(
@@ -293,6 +326,9 @@ def run(
if options.stderr is None: if options.stderr is None:
options.stderr = async_ctx.stderr options.stderr = async_ctx.stderr
if options.requires_root_perm:
cmd = cmd_with_root(cmd, options.graphical_perm)
if options.input: if options.input:
if any(not ch.isprintable() for ch in options.input.decode("ascii", "replace")): if any(not ch.isprintable() for ch in options.input.decode("ascii", "replace")):
filtered_input = "<<binary_blob>>" filtered_input = "<<binary_blob>>"

View File

@@ -7,14 +7,13 @@ from pathlib import Path
from clan_cli.cmd import Log, RunOpts, run from clan_cli.cmd import Log, RunOpts, run
from clan_cli.errors import ClanError from clan_cli.errors import ClanError
from clan_cli.machines.machines import Machine from clan_cli.machines.machines import Machine
from clan_cli.nix import nix_shell
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@contextmanager @contextmanager
def pause_automounting( def pause_automounting(
devices: list[Path], machine: Machine, sudo: str = "pkexec" devices: list[Path], machine: Machine, request_graphical: bool = False
) -> Generator[None, None, None]: ) -> Generator[None, None, None]:
""" """
Pause automounting on the device for the duration of this context Pause automounting on the device for the duration of this context
@@ -33,21 +32,32 @@ def pause_automounting(
raise ClanError(msg) raise ClanError(msg)
str_devs = [str(dev) for dev in devices] str_devs = [str(dev) for dev in devices]
cmd = nix_shell( cmd = [str(inhibit_path), "enable", *str_devs]
["nixpkgs#pkexec"] if sudo == "pkexec" else [],
[sudo, str(inhibit_path), "enable", *str_devs],
)
result = run( result = run(
cmd, cmd,
RunOpts( RunOpts(
log=Log.BOTH, check=False, needs_user_terminal=True, prefix=machine.name log=Log.BOTH,
check=False,
needs_user_terminal=True,
prefix=machine.name,
requires_root_perm=True,
graphical_perm=request_graphical,
), ),
) )
if result.returncode != 0: if result.returncode != 0:
machine.error("Failed to inhibit automounting") machine.error("Failed to inhibit automounting")
yield None yield None
cmd = [sudo, str(inhibit_path), "disable", *str_devs] cmd = [str(inhibit_path), "disable", *str_devs]
result = run(cmd, RunOpts(log=Log.BOTH, check=False, prefix=machine.name)) result = run(
cmd,
RunOpts(
log=Log.BOTH,
check=False,
prefix=machine.name,
requires_root_perm=True,
graphical_perm=request_graphical,
),
)
if result.returncode != 0: if result.returncode != 0:
machine.error("Failed to re-enable automounting") machine.error("Failed to re-enable automounting")

View File

@@ -2,14 +2,13 @@ import importlib
import json import json
import logging import logging
import os import os
import shutil
from dataclasses import dataclass, field from dataclasses import dataclass, field
from pathlib import Path from pathlib import Path
from tempfile import TemporaryDirectory from tempfile import TemporaryDirectory
from typing import Any from typing import Any
from clan_cli.api import API from clan_cli.api import API
from clan_cli.cmd import Log, RunOpts, run from clan_cli.cmd import Log, RunOpts, cmd_with_root, run
from clan_cli.errors import ClanError from clan_cli.errors import ClanError
from clan_cli.facts.generate import generate_facts from clan_cli.facts.generate import generate_facts
from clan_cli.facts.secret_modules import SecretStoreBase from clan_cli.facts.secret_modules import SecretStoreBase
@@ -47,11 +46,10 @@ def flash_machine(
write_efi_boot_entries: bool, write_efi_boot_entries: bool,
debug: bool, debug: bool,
extra_args: list[str] | None = None, extra_args: list[str] | None = None,
use_user_permission: bool = False, graphical: bool = False,
) -> None: ) -> None:
devices = [Path(disk.device) for disk in disks] devices = [Path(disk.device) for disk in disks]
sudo = "pkexec" if use_user_permission else "sudo" with pause_automounting(devices, machine, request_graphical=graphical):
with pause_automounting(devices, machine, sudo):
if extra_args is None: if extra_args is None:
extra_args = [] extra_args = []
system_config_nix: dict[str, Any] = {} system_config_nix: dict[str, Any] = {}
@@ -110,12 +108,13 @@ def flash_machine(
disko_install = [] disko_install = []
if os.geteuid() != 0: if os.geteuid() != 0:
# Use pkexec to elevate permissions if not running as root wrapper = " ".join(
perm_prefix = "pkexec" if use_user_permission else "exec sudo" [
if shutil.which("sudo") is None: "disko_install=$(command -v disko-install);",
msg = "sudo is required to run disko-install as a non-root user" "exec",
raise ClanError(msg) *cmd_with_root(['"$disko_install" "$@"'], graphical=graphical),
wrapper = f'set -x; disko_install=$(command -v disko-install); {perm_prefix} "$disko_install" "$@"' ]
)
disko_install.extend(["bash", "-c", wrapper]) disko_install.extend(["bash", "-c", wrapper])
disko_install.append("disko-install") disko_install.append("disko-install")

View File

@@ -176,7 +176,7 @@ export const Flash = () => {
dry_run: false, dry_run: false,
write_efi_boot_entries: false, write_efi_boot_entries: false,
debug: false, debug: false,
use_pkexec: true, graphical: true,
}), }),
{ {
error: (errors) => `Error flashing disk: ${errors}`, error: (errors) => `Error flashing disk: ${errors}`,