CLI: init cmd_with_root
This commit is contained in:
@@ -4,6 +4,7 @@ import math
|
||||
import os
|
||||
import select
|
||||
import shlex
|
||||
import shutil
|
||||
import signal
|
||||
import subprocess
|
||||
import threading
|
||||
@@ -271,6 +272,38 @@ class RunOpts:
|
||||
needs_user_terminal: bool = False
|
||||
timeout: float = math.inf
|
||||
shell: bool = False
|
||||
# Some commands require sudo
|
||||
requires_root_perm: bool = False
|
||||
# Ask for sudo password in a graphical way.
|
||||
# This is needed for GUI applications
|
||||
graphical_perm: bool = False
|
||||
|
||||
|
||||
def cmd_with_root(cmd: list[str], graphical: bool = False) -> list[str]:
|
||||
"""
|
||||
This function returns a wrapped command that will be run with root permissions.
|
||||
It will use sudo if graphical is False, otherwise it will use run0 or pkexec.
|
||||
"""
|
||||
if os.geteuid() == 0:
|
||||
return cmd
|
||||
|
||||
# Decide permission handler
|
||||
if graphical:
|
||||
# TODO(mic92): figure out how to use run0
|
||||
# if shutil.which("run0") is not None:
|
||||
# perm_prefix = "run0"
|
||||
if shutil.which("pkexec") is not None:
|
||||
return ["pkexec", *cmd]
|
||||
description = (
|
||||
"pkexec is required to launch root commands with graphical permissions"
|
||||
)
|
||||
msg = "Missing graphical permission handler"
|
||||
raise ClanError(msg, description=description)
|
||||
if shutil.which("sudo") is None:
|
||||
msg = "sudo is required to run this command as a non-root user"
|
||||
raise ClanError(msg)
|
||||
|
||||
return ["sudo", *cmd]
|
||||
|
||||
|
||||
def run(
|
||||
@@ -293,6 +326,9 @@ def run(
|
||||
if options.stderr is None:
|
||||
options.stderr = async_ctx.stderr
|
||||
|
||||
if options.requires_root_perm:
|
||||
cmd = cmd_with_root(cmd, options.graphical_perm)
|
||||
|
||||
if options.input:
|
||||
if any(not ch.isprintable() for ch in options.input.decode("ascii", "replace")):
|
||||
filtered_input = "<<binary_blob>>"
|
||||
|
||||
@@ -7,14 +7,13 @@ from pathlib import Path
|
||||
from clan_cli.cmd import Log, RunOpts, run
|
||||
from clan_cli.errors import ClanError
|
||||
from clan_cli.machines.machines import Machine
|
||||
from clan_cli.nix import nix_shell
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@contextmanager
|
||||
def pause_automounting(
|
||||
devices: list[Path], machine: Machine, sudo: str = "pkexec"
|
||||
devices: list[Path], machine: Machine, request_graphical: bool = False
|
||||
) -> Generator[None, None, None]:
|
||||
"""
|
||||
Pause automounting on the device for the duration of this context
|
||||
@@ -33,21 +32,32 @@ def pause_automounting(
|
||||
raise ClanError(msg)
|
||||
|
||||
str_devs = [str(dev) for dev in devices]
|
||||
cmd = nix_shell(
|
||||
["nixpkgs#pkexec"] if sudo == "pkexec" else [],
|
||||
[sudo, str(inhibit_path), "enable", *str_devs],
|
||||
)
|
||||
cmd = [str(inhibit_path), "enable", *str_devs]
|
||||
|
||||
result = run(
|
||||
cmd,
|
||||
RunOpts(
|
||||
log=Log.BOTH, check=False, needs_user_terminal=True, prefix=machine.name
|
||||
log=Log.BOTH,
|
||||
check=False,
|
||||
needs_user_terminal=True,
|
||||
prefix=machine.name,
|
||||
requires_root_perm=True,
|
||||
graphical_perm=request_graphical,
|
||||
),
|
||||
)
|
||||
if result.returncode != 0:
|
||||
machine.error("Failed to inhibit automounting")
|
||||
yield None
|
||||
cmd = [sudo, str(inhibit_path), "disable", *str_devs]
|
||||
result = run(cmd, RunOpts(log=Log.BOTH, check=False, prefix=machine.name))
|
||||
cmd = [str(inhibit_path), "disable", *str_devs]
|
||||
result = run(
|
||||
cmd,
|
||||
RunOpts(
|
||||
log=Log.BOTH,
|
||||
check=False,
|
||||
prefix=machine.name,
|
||||
requires_root_perm=True,
|
||||
graphical_perm=request_graphical,
|
||||
),
|
||||
)
|
||||
if result.returncode != 0:
|
||||
machine.error("Failed to re-enable automounting")
|
||||
|
||||
@@ -2,14 +2,13 @@ import importlib
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from tempfile import TemporaryDirectory
|
||||
from typing import Any
|
||||
|
||||
from clan_cli.api import API
|
||||
from clan_cli.cmd import Log, RunOpts, run
|
||||
from clan_cli.cmd import Log, RunOpts, cmd_with_root, run
|
||||
from clan_cli.errors import ClanError
|
||||
from clan_cli.facts.generate import generate_facts
|
||||
from clan_cli.facts.secret_modules import SecretStoreBase
|
||||
@@ -47,11 +46,10 @@ def flash_machine(
|
||||
write_efi_boot_entries: bool,
|
||||
debug: bool,
|
||||
extra_args: list[str] | None = None,
|
||||
use_user_permission: bool = False,
|
||||
graphical: bool = False,
|
||||
) -> None:
|
||||
devices = [Path(disk.device) for disk in disks]
|
||||
sudo = "pkexec" if use_user_permission else "sudo"
|
||||
with pause_automounting(devices, machine, sudo):
|
||||
with pause_automounting(devices, machine, request_graphical=graphical):
|
||||
if extra_args is None:
|
||||
extra_args = []
|
||||
system_config_nix: dict[str, Any] = {}
|
||||
@@ -110,12 +108,13 @@ def flash_machine(
|
||||
disko_install = []
|
||||
|
||||
if os.geteuid() != 0:
|
||||
# Use pkexec to elevate permissions if not running as root
|
||||
perm_prefix = "pkexec" if use_user_permission else "exec sudo"
|
||||
if shutil.which("sudo") is None:
|
||||
msg = "sudo is required to run disko-install as a non-root user"
|
||||
raise ClanError(msg)
|
||||
wrapper = f'set -x; disko_install=$(command -v disko-install); {perm_prefix} "$disko_install" "$@"'
|
||||
wrapper = " ".join(
|
||||
[
|
||||
"disko_install=$(command -v disko-install);",
|
||||
"exec",
|
||||
*cmd_with_root(['"$disko_install" "$@"'], graphical=graphical),
|
||||
]
|
||||
)
|
||||
disko_install.extend(["bash", "-c", wrapper])
|
||||
|
||||
disko_install.append("disko-install")
|
||||
|
||||
@@ -176,7 +176,7 @@ export const Flash = () => {
|
||||
dry_run: false,
|
||||
write_efi_boot_entries: false,
|
||||
debug: false,
|
||||
use_pkexec: true,
|
||||
graphical: true,
|
||||
}),
|
||||
{
|
||||
error: (errors) => `Error flashing disk: ${errors}`,
|
||||
|
||||
Reference in New Issue
Block a user