Merge pull request 'remove unused ssh.run method' (#818) from Mic92-target_host into main
This commit is contained in:
@@ -30,7 +30,7 @@ let
|
|||||||
(machineSettings.clanImports or [ ]);
|
(machineSettings.clanImports or [ ]);
|
||||||
|
|
||||||
# TODO: remove default system once we have a hardware-config mechanism
|
# TODO: remove default system once we have a hardware-config mechanism
|
||||||
nixosConfiguration = { system ? "x86_64-linux", name, forceSystem ? false }: nixpkgs.lib.nixosSystem {
|
nixosConfiguration = { system ? "x86_64-linux", name, pkgs ? null }: nixpkgs.lib.nixosSystem {
|
||||||
modules =
|
modules =
|
||||||
let
|
let
|
||||||
settings = machineSettings name;
|
settings = machineSettings name;
|
||||||
@@ -40,19 +40,21 @@ let
|
|||||||
settings
|
settings
|
||||||
clan-core.nixosModules.clanCore
|
clan-core.nixosModules.clanCore
|
||||||
(machines.${name} or { })
|
(machines.${name} or { })
|
||||||
{
|
({
|
||||||
clanCore.clanName = clanName;
|
clanCore.clanName = clanName;
|
||||||
clanCore.clanIcon = clanIcon;
|
clanCore.clanIcon = clanIcon;
|
||||||
clanCore.clanDir = directory;
|
clanCore.clanDir = directory;
|
||||||
clanCore.machineName = name;
|
clanCore.machineName = name;
|
||||||
nixpkgs.hostPlatform = if forceSystem then lib.mkForce system else lib.mkDefault system;
|
nixpkgs.hostPlatform = lib.mkDefault system;
|
||||||
|
|
||||||
# speeds up nix commands by using the nixpkgs from the host system (especially useful in VMs)
|
# speeds up nix commands by using the nixpkgs from the host system (especially useful in VMs)
|
||||||
nix.registry.nixpkgs.to = {
|
nix.registry.nixpkgs.to = {
|
||||||
type = "path";
|
type = "path";
|
||||||
path = lib.mkDefault nixpkgs;
|
path = lib.mkDefault nixpkgs;
|
||||||
};
|
};
|
||||||
}
|
} // lib.optionalAttrs (pkgs != null) {
|
||||||
|
nixpkgs.pkgs = lib.mkForce pkgs;
|
||||||
|
})
|
||||||
];
|
];
|
||||||
inherit specialArgs;
|
inherit specialArgs;
|
||||||
};
|
};
|
||||||
@@ -75,7 +77,7 @@ let
|
|||||||
configsPerSystem = builtins.listToAttrs
|
configsPerSystem = builtins.listToAttrs
|
||||||
(builtins.map
|
(builtins.map
|
||||||
(system: lib.nameValuePair system
|
(system: lib.nameValuePair system
|
||||||
(lib.mapAttrs (name: _: nixosConfiguration { inherit name system; forceSystem = true; }) allMachines))
|
(lib.mapAttrs (name: _: nixosConfiguration { inherit name system; pkgs = nixpkgs.legacyPackages.${system}; }) allMachines))
|
||||||
supportedSystems);
|
supportedSystems);
|
||||||
in
|
in
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -13,7 +13,7 @@ def create_backup(machine: Machine, provider: str | None = None) -> None:
|
|||||||
backup_scripts = json.loads(machine.eval_nix("config.clanCore.backups"))
|
backup_scripts = json.loads(machine.eval_nix("config.clanCore.backups"))
|
||||||
if provider is None:
|
if provider is None:
|
||||||
for provider in backup_scripts["providers"]:
|
for provider in backup_scripts["providers"]:
|
||||||
proc = machine.host.run(
|
proc = machine.target_host.run(
|
||||||
["bash", "-c", backup_scripts["providers"][provider]["create"]],
|
["bash", "-c", backup_scripts["providers"][provider]["create"]],
|
||||||
)
|
)
|
||||||
if proc.returncode != 0:
|
if proc.returncode != 0:
|
||||||
@@ -23,7 +23,7 @@ def create_backup(machine: Machine, provider: str | None = None) -> None:
|
|||||||
else:
|
else:
|
||||||
if provider not in backup_scripts["providers"]:
|
if provider not in backup_scripts["providers"]:
|
||||||
raise ClanError(f"provider {provider} not found")
|
raise ClanError(f"provider {provider} not found")
|
||||||
proc = machine.host.run(
|
proc = machine.target_host.run(
|
||||||
["bash", "-c", backup_scripts["providers"][provider]["create"]],
|
["bash", "-c", backup_scripts["providers"][provider]["create"]],
|
||||||
)
|
)
|
||||||
if proc.returncode != 0:
|
if proc.returncode != 0:
|
||||||
|
|||||||
@@ -19,7 +19,7 @@ class Backup:
|
|||||||
def list_provider(machine: Machine, provider: str) -> list[Backup]:
|
def list_provider(machine: Machine, provider: str) -> list[Backup]:
|
||||||
results = []
|
results = []
|
||||||
backup_metadata = json.loads(machine.eval_nix("config.clanCore.backups"))
|
backup_metadata = json.loads(machine.eval_nix("config.clanCore.backups"))
|
||||||
proc = machine.host.run(
|
proc = machine.target_host.run(
|
||||||
["bash", "-c", backup_metadata["providers"][provider]["list"]],
|
["bash", "-c", backup_metadata["providers"][provider]["list"]],
|
||||||
stdout=subprocess.PIPE,
|
stdout=subprocess.PIPE,
|
||||||
check=False,
|
check=False,
|
||||||
|
|||||||
@@ -20,7 +20,7 @@ def restore_service(
|
|||||||
env["JOB"] = backup.job_name
|
env["JOB"] = backup.job_name
|
||||||
env["FOLDERS"] = ":".join(folders)
|
env["FOLDERS"] = ":".join(folders)
|
||||||
|
|
||||||
proc = machine.host.run(
|
proc = machine.target_host.run(
|
||||||
[
|
[
|
||||||
"bash",
|
"bash",
|
||||||
"-c",
|
"-c",
|
||||||
@@ -34,7 +34,7 @@ def restore_service(
|
|||||||
f"failed to run preRestoreScript: {backup_folders[service]['preRestoreScript']}, error was: {proc.stdout}"
|
f"failed to run preRestoreScript: {backup_folders[service]['preRestoreScript']}, error was: {proc.stdout}"
|
||||||
)
|
)
|
||||||
|
|
||||||
proc = machine.host.run(
|
proc = machine.target_host.run(
|
||||||
[
|
[
|
||||||
"bash",
|
"bash",
|
||||||
"-c",
|
"-c",
|
||||||
@@ -48,7 +48,7 @@ def restore_service(
|
|||||||
f"failed to restore backup: {backup_metadata['providers'][provider]['restore']}"
|
f"failed to restore backup: {backup_metadata['providers'][provider]['restore']}"
|
||||||
)
|
)
|
||||||
|
|
||||||
proc = machine.host.run(
|
proc = machine.target_host.run(
|
||||||
[
|
[
|
||||||
"bash",
|
"bash",
|
||||||
"-c",
|
"-c",
|
||||||
|
|||||||
@@ -19,12 +19,10 @@ def install_nixos(machine: Machine, kexec: str | None = None) -> None:
|
|||||||
log.info(f"using secret store: {secrets_module.SecretStore}")
|
log.info(f"using secret store: {secrets_module.SecretStore}")
|
||||||
secret_store = secrets_module.SecretStore(machine=machine)
|
secret_store = secrets_module.SecretStore(machine=machine)
|
||||||
|
|
||||||
h = machine.host
|
h = machine.target_host
|
||||||
target_host = f"{h.user or 'root'}@{h.host}"
|
target_host = f"{h.user or 'root'}@{h.host}"
|
||||||
log.info(f"target host: {target_host}")
|
log.info(f"target host: {target_host}")
|
||||||
|
|
||||||
flake_attr = h.meta.get("flake_attr", "")
|
|
||||||
|
|
||||||
generate_secrets(machine)
|
generate_secrets(machine)
|
||||||
|
|
||||||
with TemporaryDirectory() as tmpdir_:
|
with TemporaryDirectory() as tmpdir_:
|
||||||
@@ -40,7 +38,7 @@ def install_nixos(machine: Machine, kexec: str | None = None) -> None:
|
|||||||
cmd = [
|
cmd = [
|
||||||
"nixos-anywhere",
|
"nixos-anywhere",
|
||||||
"-f",
|
"-f",
|
||||||
f"{machine.flake}#{flake_attr}",
|
f"{machine.flake}#{machine.name}",
|
||||||
"-t",
|
"-t",
|
||||||
"--no-reboot",
|
"--no-reboot",
|
||||||
"--extra-files",
|
"--extra-files",
|
||||||
@@ -75,7 +73,7 @@ def install_command(args: argparse.Namespace) -> None:
|
|||||||
kexec=args.kexec,
|
kexec=args.kexec,
|
||||||
)
|
)
|
||||||
machine = Machine(opts.machine, flake=opts.flake)
|
machine = Machine(opts.machine, flake=opts.flake)
|
||||||
machine.target_host = opts.target_host
|
machine.target_host_address = opts.target_host
|
||||||
|
|
||||||
install_nixos(machine, kexec=opts.kexec)
|
install_nixos(machine, kexec=opts.kexec)
|
||||||
|
|
||||||
|
|||||||
@@ -47,7 +47,7 @@ class Machine:
|
|||||||
return self._deployment_info
|
return self._deployment_info
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def target_host(self) -> str:
|
def target_host_address(self) -> str:
|
||||||
# deploymentAddress is deprecated.
|
# deploymentAddress is deprecated.
|
||||||
val = self.deployment_info.get("targetHost") or self.deployment_info.get(
|
val = self.deployment_info.get("targetHost") or self.deployment_info.get(
|
||||||
"deploymentAddress"
|
"deploymentAddress"
|
||||||
@@ -57,8 +57,8 @@ class Machine:
|
|||||||
raise ClanError(msg)
|
raise ClanError(msg)
|
||||||
return val
|
return val
|
||||||
|
|
||||||
@target_host.setter
|
@target_host_address.setter
|
||||||
def target_host(self, value: str) -> None:
|
def target_host_address(self, value: str) -> None:
|
||||||
self.deployment_info["targetHost"] = value
|
self.deployment_info["targetHost"] = value
|
||||||
|
|
||||||
@property
|
@property
|
||||||
@@ -92,9 +92,26 @@ class Machine:
|
|||||||
return Path(self.flake_path)
|
return Path(self.flake_path)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def host(self) -> Host:
|
def target_host(self) -> Host:
|
||||||
return parse_deployment_address(
|
return parse_deployment_address(
|
||||||
self.name, self.target_host, meta={"machine": self}
|
self.name, self.target_host_address, meta={"machine": self}
|
||||||
|
)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def build_host(self) -> Host:
|
||||||
|
"""
|
||||||
|
The host where the machine is built and deployed from.
|
||||||
|
Can be the same as the target host.
|
||||||
|
"""
|
||||||
|
build_host = self.deployment_info.get("buildHost")
|
||||||
|
if build_host is None:
|
||||||
|
return self.target_host
|
||||||
|
# enable ssh agent forwarding to allow the build host to access the target host
|
||||||
|
return parse_deployment_address(
|
||||||
|
self.name,
|
||||||
|
build_host,
|
||||||
|
forward_agent=True,
|
||||||
|
meta={"machine": self, "target_host": self.target_host},
|
||||||
)
|
)
|
||||||
|
|
||||||
def eval_nix(self, attr: str, refresh: bool = False) -> str:
|
def eval_nix(self, attr: str, refresh: bool = False) -> str:
|
||||||
|
|||||||
@@ -105,16 +105,11 @@ def deploy_nixos(hosts: HostGroup) -> None:
|
|||||||
|
|
||||||
ssh_arg += " -i " + h.key if h.key else ""
|
ssh_arg += " -i " + h.key if h.key else ""
|
||||||
|
|
||||||
flake_attr = h.meta.get("flake_attr", "")
|
machine: Machine = h.meta["machine"]
|
||||||
|
|
||||||
generate_secrets(h.meta["machine"])
|
generate_secrets(machine)
|
||||||
upload_secrets(h.meta["machine"])
|
upload_secrets(machine)
|
||||||
|
|
||||||
target_host = h.meta.get("target_host")
|
|
||||||
if target_host:
|
|
||||||
target_user = h.meta.get("target_user")
|
|
||||||
if target_user:
|
|
||||||
target_host = f"{target_user}@{target_host}"
|
|
||||||
extra_args = h.meta.get("extra_args", [])
|
extra_args = h.meta.get("extra_args", [])
|
||||||
cmd = [
|
cmd = [
|
||||||
"nixos-rebuild",
|
"nixos-rebuild",
|
||||||
@@ -130,9 +125,10 @@ def deploy_nixos(hosts: HostGroup) -> None:
|
|||||||
"--build-host",
|
"--build-host",
|
||||||
"",
|
"",
|
||||||
"--flake",
|
"--flake",
|
||||||
f"{path}#{flake_attr}",
|
f"{path}#{machine.name}",
|
||||||
]
|
]
|
||||||
if target_host:
|
if target_host := h.meta.get("target_host"):
|
||||||
|
target_host = f"{target_host.user or 'root'}@{target_host.host}"
|
||||||
cmd.extend(["--target-host", target_host])
|
cmd.extend(["--target-host", target_host])
|
||||||
ret = h.run(cmd, check=False)
|
ret = h.run(cmd, check=False)
|
||||||
# re-retry switch if the first time fails
|
# re-retry switch if the first time fails
|
||||||
@@ -157,16 +153,10 @@ def get_all_machines(clan_dir: Path) -> HostGroup:
|
|||||||
for name, machine_data in machines.items():
|
for name, machine_data in machines.items():
|
||||||
machine = Machine(name=name, flake=clan_dir, deployment_info=machine_data)
|
machine = Machine(name=name, flake=clan_dir, deployment_info=machine_data)
|
||||||
try:
|
try:
|
||||||
machine.target_host
|
hosts.append(machine.build_host)
|
||||||
except ClanError:
|
except ClanError:
|
||||||
ignored_machines.append(name)
|
ignored_machines.append(name)
|
||||||
continue
|
continue
|
||||||
host = parse_deployment_address(
|
|
||||||
name,
|
|
||||||
host=machine.target_host,
|
|
||||||
meta={"machine": machine},
|
|
||||||
)
|
|
||||||
hosts.append(host)
|
|
||||||
if not hosts and ignored_machines != []:
|
if not hosts and ignored_machines != []:
|
||||||
print(
|
print(
|
||||||
"WARNING: No machines to update. The following defined machines were ignored because they do not have `clan.networking.targetHost` nixos option set:",
|
"WARNING: No machines to update. The following defined machines were ignored because they do not have `clan.networking.targetHost` nixos option set:",
|
||||||
@@ -182,7 +172,7 @@ def get_selected_machines(machine_names: list[str], flake_dir: Path) -> HostGrou
|
|||||||
hosts = []
|
hosts = []
|
||||||
for name in machine_names:
|
for name in machine_names:
|
||||||
machine = Machine(name=name, flake=flake_dir)
|
machine = Machine(name=name, flake=flake_dir)
|
||||||
hosts.append(machine.host)
|
hosts.append(machine.build_host)
|
||||||
return HostGroup(hosts)
|
return HostGroup(hosts)
|
||||||
|
|
||||||
|
|
||||||
@@ -192,7 +182,7 @@ def update(args: argparse.Namespace) -> None:
|
|||||||
raise ClanError("Could not find clan flake toplevel directory")
|
raise ClanError("Could not find clan flake toplevel directory")
|
||||||
if len(args.machines) == 1 and args.target_host is not None:
|
if len(args.machines) == 1 and args.target_host is not None:
|
||||||
machine = Machine(name=args.machines[0], flake=args.flake)
|
machine = Machine(name=args.machines[0], flake=args.flake)
|
||||||
machine.target_host = args.target_host
|
machine.target_host_address = args.target_host
|
||||||
host = parse_deployment_address(
|
host = parse_deployment_address(
|
||||||
args.machines[0],
|
args.machines[0],
|
||||||
args.target_host,
|
args.target_host,
|
||||||
|
|||||||
@@ -86,7 +86,7 @@ class SecretStore:
|
|||||||
|
|
||||||
def update_check(self) -> bool:
|
def update_check(self) -> bool:
|
||||||
local_hash = self.generate_hash()
|
local_hash = self.generate_hash()
|
||||||
remote_hash = self.machine.host.run(
|
remote_hash = self.machine.target_host.run(
|
||||||
# TODO get the path to the secrets from the machine
|
# TODO get the path to the secrets from the machine
|
||||||
["cat", f"{self.machine.secrets_upload_directory}/.pass_info"],
|
["cat", f"{self.machine.secrets_upload_directory}/.pass_info"],
|
||||||
check=False,
|
check=False,
|
||||||
|
|||||||
@@ -22,7 +22,7 @@ def upload_secrets(machine: Machine) -> None:
|
|||||||
return
|
return
|
||||||
with TemporaryDirectory() as tempdir:
|
with TemporaryDirectory() as tempdir:
|
||||||
secret_store.upload(Path(tempdir))
|
secret_store.upload(Path(tempdir))
|
||||||
host = machine.host
|
host = machine.target_host
|
||||||
|
|
||||||
ssh_cmd = host.ssh_cmd()
|
ssh_cmd = host.ssh_cmd()
|
||||||
run(
|
run(
|
||||||
|
|||||||
@@ -16,14 +16,7 @@ from enum import Enum
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from shlex import quote
|
from shlex import quote
|
||||||
from threading import Thread
|
from threading import Thread
|
||||||
from typing import (
|
from typing import IO, Any, Generic, TypeVar
|
||||||
IO,
|
|
||||||
Any,
|
|
||||||
Generic,
|
|
||||||
Literal,
|
|
||||||
TypeVar,
|
|
||||||
overload,
|
|
||||||
)
|
|
||||||
|
|
||||||
# https://no-color.org
|
# https://no-color.org
|
||||||
DISABLE_COLOR = not sys.stderr.isatty() or os.environ.get("NO_COLOR", "") != ""
|
DISABLE_COLOR = not sys.stderr.isatty() or os.environ.get("NO_COLOR", "") != ""
|
||||||
@@ -755,7 +748,7 @@ class HostGroup:
|
|||||||
|
|
||||||
|
|
||||||
def parse_deployment_address(
|
def parse_deployment_address(
|
||||||
machine_name: str, host: str, meta: dict[str, Any] = {}
|
machine_name: str, host: str, forward_agent: bool = True, meta: dict[str, Any] = {}
|
||||||
) -> Host:
|
) -> Host:
|
||||||
parts = host.split("@")
|
parts = host.split("@")
|
||||||
user: str | None = None
|
user: str | None = None
|
||||||
@@ -777,83 +770,12 @@ def parse_deployment_address(
|
|||||||
hostname = result.hostname
|
hostname = result.hostname
|
||||||
port = result.port
|
port = result.port
|
||||||
meta = meta.copy()
|
meta = meta.copy()
|
||||||
meta["flake_attr"] = machine_name
|
|
||||||
return Host(
|
return Host(
|
||||||
hostname,
|
hostname,
|
||||||
user=user,
|
user=user,
|
||||||
port=port,
|
port=port,
|
||||||
command_prefix=machine_name,
|
command_prefix=machine_name,
|
||||||
|
forward_agent=forward_agent,
|
||||||
meta=meta,
|
meta=meta,
|
||||||
ssh_options=options,
|
ssh_options=options,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@overload
|
|
||||||
def run(
|
|
||||||
cmd: list[str] | str,
|
|
||||||
text: Literal[True] = ...,
|
|
||||||
stdout: FILE = ...,
|
|
||||||
stderr: FILE = ...,
|
|
||||||
extra_env: dict[str, str] = ...,
|
|
||||||
cwd: None | str | Path = ...,
|
|
||||||
check: bool = ...,
|
|
||||||
) -> subprocess.CompletedProcess[str]:
|
|
||||||
...
|
|
||||||
|
|
||||||
|
|
||||||
@overload
|
|
||||||
def run(
|
|
||||||
cmd: list[str] | str,
|
|
||||||
text: Literal[False],
|
|
||||||
stdout: FILE = ...,
|
|
||||||
stderr: FILE = ...,
|
|
||||||
extra_env: dict[str, str] = ...,
|
|
||||||
cwd: None | str | Path = ...,
|
|
||||||
check: bool = ...,
|
|
||||||
) -> subprocess.CompletedProcess[bytes]:
|
|
||||||
...
|
|
||||||
|
|
||||||
|
|
||||||
def run(
|
|
||||||
cmd: list[str] | str,
|
|
||||||
text: bool = True,
|
|
||||||
stdout: FILE = None,
|
|
||||||
stderr: FILE = None,
|
|
||||||
extra_env: dict[str, str] = {},
|
|
||||||
cwd: None | str | Path = None,
|
|
||||||
check: bool = True,
|
|
||||||
) -> subprocess.CompletedProcess[Any]:
|
|
||||||
"""
|
|
||||||
Run command locally
|
|
||||||
|
|
||||||
@cmd if this parameter is a string the command is interpreted as a shell command,
|
|
||||||
otherwise if it is a list, than the first list element is the command
|
|
||||||
and the remaining list elements are passed as arguments to the
|
|
||||||
command.
|
|
||||||
@text when true, file objects for stdout and stderr are opened in text mode.
|
|
||||||
@stdout if not None stdout of the command will be redirected to this file i.e. stdout=subprocss.PIPE
|
|
||||||
@stderr if not None stderr of the command will be redirected to this file i.e. stderr=subprocess.PIPE
|
|
||||||
@extra_env environment variables to override whe running the command
|
|
||||||
@cwd current working directory to run the process in
|
|
||||||
@check If check is true, and the process exits with a non-zero exit code, a
|
|
||||||
CalledProcessError exception will be raised. Attributes of that exception
|
|
||||||
hold the arguments, the exit code, and stdout and stderr if they were
|
|
||||||
captured.
|
|
||||||
"""
|
|
||||||
if isinstance(cmd, list):
|
|
||||||
info("$ " + " ".join(cmd))
|
|
||||||
else:
|
|
||||||
info(f"$ {cmd}")
|
|
||||||
env = os.environ.copy()
|
|
||||||
env.update(extra_env)
|
|
||||||
|
|
||||||
return subprocess.run(
|
|
||||||
cmd,
|
|
||||||
stdout=stdout,
|
|
||||||
stderr=stderr,
|
|
||||||
env=env,
|
|
||||||
cwd=cwd,
|
|
||||||
check=check,
|
|
||||||
shell=not isinstance(cmd, list),
|
|
||||||
text=text,
|
|
||||||
)
|
|
||||||
|
|||||||
@@ -1,32 +1,11 @@
|
|||||||
import subprocess
|
import subprocess
|
||||||
|
|
||||||
from clan_cli.ssh import Host, HostGroup, run
|
from clan_cli.ssh import Host, HostGroup
|
||||||
|
|
||||||
|
|
||||||
def test_run() -> None:
|
|
||||||
p = run("echo hello")
|
|
||||||
assert p.stdout is None
|
|
||||||
|
|
||||||
|
|
||||||
def test_run_failure() -> None:
|
|
||||||
p = run("exit 1", check=False)
|
|
||||||
assert p.returncode == 1
|
|
||||||
|
|
||||||
try:
|
|
||||||
p = run("exit 1")
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
else:
|
|
||||||
assert False, "Command should have raised an error"
|
|
||||||
|
|
||||||
|
|
||||||
hosts = HostGroup([Host("some_host")])
|
hosts = HostGroup([Host("some_host")])
|
||||||
|
|
||||||
|
|
||||||
def test_run_environment() -> None:
|
def test_run_environment() -> None:
|
||||||
p1 = run("echo $env_var", stdout=subprocess.PIPE, extra_env=dict(env_var="true"))
|
|
||||||
assert p1.stdout == "true\n"
|
|
||||||
|
|
||||||
p2 = hosts.run_local(
|
p2 = hosts.run_local(
|
||||||
"echo $env_var", extra_env=dict(env_var="true"), stdout=subprocess.PIPE
|
"echo $env_var", extra_env=dict(env_var="true"), stdout=subprocess.PIPE
|
||||||
)
|
)
|
||||||
@@ -38,17 +17,6 @@ def test_run_environment() -> None:
|
|||||||
assert "env_var=true" in p3[0].result.stdout
|
assert "env_var=true" in p3[0].result.stdout
|
||||||
|
|
||||||
|
|
||||||
def test_run_non_shell() -> None:
|
|
||||||
p = run(["echo", "$hello"], stdout=subprocess.PIPE)
|
|
||||||
assert p.stdout == "$hello\n"
|
|
||||||
|
|
||||||
|
|
||||||
def test_run_stderr_stdout() -> None:
|
|
||||||
p = run("echo 1; echo 2 >&2", stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
||||||
assert p.stdout == "1\n"
|
|
||||||
assert p.stderr == "2\n"
|
|
||||||
|
|
||||||
|
|
||||||
def test_run_local() -> None:
|
def test_run_local() -> None:
|
||||||
hosts.run_local("echo hello")
|
hosts.run_local("echo hello")
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user