clan-cli: split backups into cli and lib

This commit is contained in:
lassulus
2025-05-21 13:46:25 +02:00
parent 70db1aae10
commit 44e3df3ebf
7 changed files with 168 additions and 155 deletions

View File

@@ -1,6 +1,7 @@
import argparse
import logging
from clan_lib.backups.create import create_backup
from clan_lib.errors import ClanError
from clan_cli.completions import (
@@ -13,36 +14,6 @@ from clan_cli.machines.machines import Machine
log = logging.getLogger(__name__)
def create_backup(machine: Machine, provider: str | None = None) -> None:
machine.info(f"creating backup for {machine.name}")
backup_scripts = machine.eval_nix("config.clan.core.backups")
if provider is None:
if not backup_scripts["providers"]:
msg = "No providers specified"
raise ClanError(msg)
with machine.target_host() as host:
for provider in backup_scripts["providers"]:
proc = host.run(
[backup_scripts["providers"][provider]["create"]],
)
if proc.returncode != 0:
msg = "failed to start backup"
raise ClanError(msg)
print("successfully started backup")
else:
if provider not in backup_scripts["providers"]:
msg = f"provider {provider} not found"
raise ClanError(msg)
with machine.target_host() as host:
proc = host.run(
[backup_scripts["providers"][provider]["create"]],
)
if proc.returncode != 0:
msg = "failed to start backup"
raise ClanError(msg)
print("successfully started backup")
def create_command(args: argparse.Namespace) -> None:
if args.flake is None:
msg = "Could not find clan flake toplevel directory"

View File

@@ -1,8 +1,6 @@
import argparse
import json
from dataclasses import dataclass
from clan_lib.cmd import Log, RunOpts
from clan_lib.backups.list import list_backups
from clan_lib.errors import ClanError
from clan_cli.completions import (
@@ -11,54 +9,6 @@ from clan_cli.completions import (
complete_machines,
)
from clan_cli.machines.machines import Machine
from clan_cli.ssh.host import Host
@dataclass
class Backup:
name: str
job_name: str | None = None
def list_provider(machine: Machine, host: Host, provider: str) -> list[Backup]:
results = []
backup_metadata = machine.eval_nix("config.clan.core.backups")
list_command = backup_metadata["providers"][provider]["list"]
proc = host.run(
[list_command],
RunOpts(log=Log.NONE, check=False),
)
if proc.returncode != 0:
# TODO this should be a warning, only raise exception if no providers succeed
msg = f"Failed to list backups for provider {provider}:"
msg += f"\n{list_command} exited with {proc.returncode}"
if proc.stderr:
msg += f"\nerror output: {proc.stderr}"
raise ClanError(msg)
try:
parsed_json = json.loads(proc.stdout)
except json.JSONDecodeError as e:
msg = f"Failed to parse json output from provider {provider}:\n{proc.stdout}"
raise ClanError(msg) from e
for archive in parsed_json:
results.append(Backup(name=archive["name"], job_name=archive.get("job_name")))
return results
def list_backups(machine: Machine, provider: str | None = None) -> list[Backup]:
backup_metadata = machine.eval_nix("config.clan.core.backups")
results = []
with machine.target_host() as host:
if provider is None:
for _provider in backup_metadata["providers"]:
results += list_provider(machine, host, _provider)
else:
results += list_provider(machine, host, provider)
return results
def list_command(args: argparse.Namespace) -> None:

View File

@@ -1,6 +1,6 @@
import argparse
from clan_lib.cmd import Log, RunOpts
from clan_lib.backups.restore import restore_backup
from clan_lib.errors import ClanError
from clan_cli.completions import (
@@ -9,79 +9,6 @@ from clan_cli.completions import (
complete_machines,
)
from clan_cli.machines.machines import Machine
from clan_cli.ssh.host import Host
def restore_service(
machine: Machine, host: Host, name: str, provider: str, service: str
) -> None:
backup_metadata = machine.eval_nix("config.clan.core.backups")
backup_folders = machine.eval_nix("config.clan.core.state")
if service not in backup_folders:
msg = f"Service {service} not found in configuration. Available services are: {', '.join(backup_folders.keys())}"
raise ClanError(msg)
folders = backup_folders[service]["folders"]
env = {}
env["NAME"] = name
# FIXME: If we have too many folder this might overflow the stack.
env["FOLDERS"] = ":".join(set(folders))
if pre_restore := backup_folders[service]["preRestoreCommand"]:
proc = host.run(
[pre_restore],
RunOpts(log=Log.STDERR),
extra_env=env,
)
if proc.returncode != 0:
msg = f"failed to run preRestoreCommand: {pre_restore}, error was: {proc.stdout}"
raise ClanError(msg)
proc = host.run(
[backup_metadata["providers"][provider]["restore"]],
RunOpts(log=Log.STDERR),
extra_env=env,
)
if proc.returncode != 0:
msg = f"failed to restore backup: {backup_metadata['providers'][provider]['restore']}"
raise ClanError(msg)
if post_restore := backup_folders[service]["postRestoreCommand"]:
proc = host.run(
[post_restore],
RunOpts(log=Log.STDERR),
extra_env=env,
)
if proc.returncode != 0:
msg = f"failed to run postRestoreCommand: {post_restore}, error was: {proc.stdout}"
raise ClanError(msg)
def restore_backup(
machine: Machine,
provider: str,
name: str,
service: str | None = None,
) -> None:
errors = []
with machine.target_host() as host:
if service is None:
backup_folders = machine.eval_nix("config.clan.core.state")
for _service in backup_folders:
try:
restore_service(machine, host, name, provider, _service)
except ClanError as e:
errors.append(f"{_service}: {e}")
else:
try:
restore_service(machine, host, name, provider, service)
except ClanError as e:
errors.append(f"{service}: {e}")
if errors:
raise ClanError(
"Restore failed for the following services:\n" + "\n".join(errors)
)
def restore_command(args: argparse.Namespace) -> None:

View File

@@ -0,0 +1,33 @@
from clan_cli.machines.machines import Machine
from clan_lib.errors import ClanError
def create_backup(machine: Machine, provider: str | None = None) -> None:
machine.info(f"creating backup for {machine.name}")
backup_scripts = machine.eval_nix("config.clan.core.backups")
if provider is None:
if not backup_scripts["providers"]:
msg = "No providers specified"
raise ClanError(msg)
with machine.target_host() as host:
for provider in backup_scripts["providers"]:
proc = host.run(
[backup_scripts["providers"][provider]["create"]],
)
if proc.returncode != 0:
msg = "failed to start backup"
raise ClanError(msg)
print("successfully started backup")
else:
if provider not in backup_scripts["providers"]:
msg = f"provider {provider} not found"
raise ClanError(msg)
with machine.target_host() as host:
proc = host.run(
[backup_scripts["providers"][provider]["create"]],
)
if proc.returncode != 0:
msg = "failed to start backup"
raise ClanError(msg)
print("successfully started backup")

View File

@@ -0,0 +1,55 @@
import json
from dataclasses import dataclass
from clan_cli.machines.machines import Machine
from clan_cli.ssh.host import Host
from clan_lib.cmd import Log, RunOpts
from clan_lib.errors import ClanError
@dataclass
class Backup:
name: str
job_name: str | None = None
def list_provider(machine: Machine, host: Host, provider: str) -> list[Backup]:
results = []
backup_metadata = machine.eval_nix("config.clan.core.backups")
list_command = backup_metadata["providers"][provider]["list"]
proc = host.run(
[list_command],
RunOpts(log=Log.NONE, check=False),
)
if proc.returncode != 0:
# TODO this should be a warning, only raise exception if no providers succeed
msg = f"Failed to list backups for provider {provider}:"
msg += f"\n{list_command} exited with {proc.returncode}"
if proc.stderr:
msg += f"\nerror output: {proc.stderr}"
raise ClanError(msg)
try:
parsed_json = json.loads(proc.stdout)
except json.JSONDecodeError as e:
msg = f"Failed to parse json output from provider {provider}:\n{proc.stdout}"
raise ClanError(msg) from e
for archive in parsed_json:
results.append(Backup(name=archive["name"], job_name=archive.get("job_name")))
return results
def list_backups(machine: Machine, provider: str | None = None) -> list[Backup]:
backup_metadata = machine.eval_nix("config.clan.core.backups")
results = []
with machine.target_host() as host:
if provider is None:
for _provider in backup_metadata["providers"]:
results += list_provider(machine, host, _provider)
else:
results += list_provider(machine, host, provider)
return results

View File

@@ -0,0 +1,77 @@
from clan_cli.machines.machines import Machine
from clan_cli.ssh.host import Host
from clan_lib.cmd import Log, RunOpts
from clan_lib.errors import ClanError
def restore_service(
machine: Machine, host: Host, name: str, provider: str, service: str
) -> None:
backup_metadata = machine.eval_nix("config.clan.core.backups")
backup_folders = machine.eval_nix("config.clan.core.state")
if service not in backup_folders:
msg = f"Service {service} not found in configuration. Available services are: {', '.join(backup_folders.keys())}"
raise ClanError(msg)
folders = backup_folders[service]["folders"]
env = {}
env["NAME"] = name
# FIXME: If we have too many folder this might overflow the stack.
env["FOLDERS"] = ":".join(set(folders))
if pre_restore := backup_folders[service]["preRestoreCommand"]:
proc = host.run(
[pre_restore],
RunOpts(log=Log.STDERR),
extra_env=env,
)
if proc.returncode != 0:
msg = f"failed to run preRestoreCommand: {pre_restore}, error was: {proc.stdout}"
raise ClanError(msg)
proc = host.run(
[backup_metadata["providers"][provider]["restore"]],
RunOpts(log=Log.STDERR),
extra_env=env,
)
if proc.returncode != 0:
msg = f"failed to restore backup: {backup_metadata['providers'][provider]['restore']}"
raise ClanError(msg)
if post_restore := backup_folders[service]["postRestoreCommand"]:
proc = host.run(
[post_restore],
RunOpts(log=Log.STDERR),
extra_env=env,
)
if proc.returncode != 0:
msg = f"failed to run postRestoreCommand: {post_restore}, error was: {proc.stdout}"
raise ClanError(msg)
def restore_backup(
machine: Machine,
provider: str,
name: str,
service: str | None = None,
) -> None:
errors = []
with machine.target_host() as host:
if service is None:
backup_folders = machine.eval_nix("config.clan.core.state")
for _service in backup_folders:
try:
restore_service(machine, host, name, provider, _service)
except ClanError as e:
errors.append(f"{_service}: {e}")
else:
try:
restore_service(machine, host, name, provider, service)
except ClanError as e:
errors.append(f"{service}: {e}")
if errors:
raise ClanError(
"Restore failed for the following services:\n" + "\n".join(errors)
)