Fixed unsafe dirs.py urlquoting, improved cmd.py run func

This commit is contained in:
Qubasa
2024-01-02 16:02:33 +01:00
parent d7c5850f2f
commit 79b651a6f2
2 changed files with 36 additions and 32 deletions

View File

@@ -2,10 +2,8 @@ import logging
import shlex import shlex
from collections.abc import Callable from collections.abc import Callable
from pathlib import Path from pathlib import Path
from subprocess import PIPE, Popen
from typing import Any, NamedTuple from typing import Any, NamedTuple
from .custom_logger import get_caller
from .errors import ClanError from .errors import ClanError
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@@ -17,40 +15,49 @@ class CmdOut(NamedTuple):
cwd: Path | None = None cwd: Path | None = None
def run(cmd: list[str], cwd: Path | None = None) -> CmdOut: import subprocess
cwd_res = None import sys
if cwd is not None:
if not cwd.exists():
raise ClanError(f"Working directory {cwd} does not exist")
if not cwd.is_dir():
raise ClanError(f"Working directory {cwd} is not a directory")
cwd_res = cwd.resolve()
log.debug(
f"Command: {shlex.join(cmd)}\nWorking directory: {cwd_res}\nCaller : {get_caller()}"
)
proc = Popen(
args=cmd,
stderr=PIPE,
stdout=PIPE,
text=True,
cwd=cwd_res,
)
stdout, stderr = proc.communicate()
if proc.returncode != 0:
def run(cmd: list[str], cwd: Path = Path.cwd()) -> CmdOut:
# Start the subprocess
process = subprocess.Popen(
cmd, cwd=str(cwd), stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
# Initialize empty strings for output and error
output = ""
error = ""
# Iterate over the stdout stream
for c in iter(lambda: process.stdout.read(1), b""): # type: ignore
# Convert bytes to string and append to output
output += c.decode("utf-8")
# Write to terminal
sys.stdout.write(c.decode("utf-8"))
# Iterate over the stderr stream
for c in iter(lambda: process.stderr.read(1), b""): # type: ignore
# Convert bytes to string and append to error
error += c.decode("utf-8")
# Write to terminal
sys.stderr.write(c.decode("utf-8"))
# Wait for the subprocess to finish
process.wait()
if process.returncode != 0:
raise ClanError( raise ClanError(
f""" f"""
command: {shlex.join(cmd)} command: {shlex.join(cmd)}
working directory: {cwd_res} working directory: {cwd}
exit code: {proc.returncode} exit code: {process.returncode}
stderr: stderr:
{stderr} {error}
stdout: stdout:
{stdout} {output}
""" """
) )
return CmdOut(output, error, cwd=cwd)
return CmdOut(stdout, stderr, cwd=cwd)
def runforcli(func: Callable[..., dict[str, CmdOut]], *args: Any) -> None: def runforcli(func: Callable[..., dict[str, CmdOut]], *args: Any) -> None:

View File

@@ -1,4 +1,3 @@
import copy
import logging import logging
import os import os
import sys import sys
@@ -46,9 +45,7 @@ def user_gcroot_dir() -> Path:
def specific_groot_dir(*, clan_name: str, flake_url: str) -> Path: def specific_groot_dir(*, clan_name: str, flake_url: str) -> Path:
# Always build icon so that we can symlink it to the gcroot # Always build icon so that we can symlink it to the gcroot
gcroot_dir = user_gcroot_dir() gcroot_dir = user_gcroot_dir()
# burl = base64.urlsafe_b64encode(flake_url.encode()).decode() burl = urllib.parse.quote_plus(flake_url)
burl = copy.copy(flake_url).replace("/", "_").replace(":", "_")
burl = urllib.parse.quote_plus(burl)
clan_gcroot = gcroot_dir / f"{clan_name}-{burl}" clan_gcroot = gcroot_dir / f"{clan_name}-{burl}"
clan_gcroot.mkdir(parents=True, exist_ok=True) clan_gcroot.mkdir(parents=True, exist_ok=True)