Merge branch 'main' of git.clan.lol:clan/clan-core

This commit is contained in:
Luis-Hebendanz
2023-08-09 17:27:07 +02:00
58 changed files with 2171 additions and 554 deletions

View File

@@ -1,6 +1,8 @@
name: build name: build
on: on:
pull_request:
push: push:
branches: main
jobs: jobs:
test: test:
runs-on: nix runs-on: nix

View File

@@ -15,7 +15,7 @@
self'.packages.merge-after-ci self'.packages.merge-after-ci
]; ];
shellHook = '' shellHook = ''
ln -sf ../../scripts/pre-commit .git/hooks/pre-commit ln -sf ../../scripts/pre-commit "$(git rev-parse --show-toplevel)/.git/hooks/pre-commit"
''; '';
}; };
}; };

12
flake.lock generated
View File

@@ -7,11 +7,11 @@
] ]
}, },
"locked": { "locked": {
"lastModified": 1690739034, "lastModified": 1691339339,
"narHash": "sha256-roW02IaiQ3gnEEDMCDWL5YyN+C4nBf/te6vfL7rG0jk=", "narHash": "sha256-wNiTX1c3kZy7BSxWodbn+mem1zCx1wIsdDRDFcIfOkc=",
"owner": "nix-community", "owner": "nix-community",
"repo": "disko", "repo": "disko",
"rev": "4015740375676402a2ee6adebc3c30ea625b9a94", "rev": "493b347d8fffa6912afb8d89b91703cd40ff6038",
"type": "github" "type": "github"
}, },
"original": { "original": {
@@ -98,11 +98,11 @@
}, },
"nixpkgs": { "nixpkgs": {
"locked": { "locked": {
"lastModified": 1690881714, "lastModified": 1691276849,
"narHash": "sha256-h/nXluEqdiQHs1oSgkOOWF+j8gcJMWhwnZ9PFabN6q0=", "narHash": "sha256-RNnrzxhW38SOFIF6TY/WaX7VB3PCkYFEeRE5YZU+wHw=",
"owner": "NixOS", "owner": "NixOS",
"repo": "nixpkgs", "repo": "nixpkgs",
"rev": "9e1960bc196baf6881340d53dccb203a951745a2", "rev": "5faab29808a2d72f4ee0c44c8e850e4e6ada972f",
"type": "github" "type": "github"
}, },
"original": { "original": {

View File

@@ -22,17 +22,13 @@
"aarch64-linux" "aarch64-linux"
]; ];
imports = [ imports = [
# ./checks/flake-module.nix
./devShell.nix ./devShell.nix
./formatter.nix ./formatter.nix
./templates/flake-module.nix ./templates/flake-module.nix
./templates/python-project/flake-module.nix
./pkgs/flake-module.nix ./pkgs/flake-module.nix
./pkgs/clan-cli/flake-module.nix
./pkgs/installer/flake-module.nix
./pkgs/ui/flake-module.nix
./lib/flake-module.nix ./lib/flake-module.nix
({ self, lib, ... }: { ({ self, lib, ... }: {
flake.nixosModules = lib.mapAttrs (_: nix: { imports = [ nix ]; }) (self.lib.findNixFiles ./nixosModules); flake.nixosModules = lib.mapAttrs (_: nix: { imports = [ nix ]; }) (self.lib.findNixFiles ./nixosModules);

View File

@@ -1,6 +1,5 @@
{ lib, ... }: { lib, ... }:
let {
clanLib = {
findNixFiles = folder: findNixFiles = folder:
lib.mapAttrs' lib.mapAttrs'
(name: type: (name: type:
@@ -12,6 +11,6 @@ let
lib.nameValuePair (lib.removeSuffix ".nix" name) "${folder}/${name}" lib.nameValuePair (lib.removeSuffix ".nix" name) "${folder}/${name}"
) )
(builtins.readDir folder); (builtins.readDir folder);
};
in jsonschema = import ./jsonschema { inherit lib; };
clanLib }

View File

@@ -1,5 +1,8 @@
{ lib { lib
, ... , ...
}: { }: {
imports = [
./jsonschema/flake-module.nix
];
flake.lib = import ./default.nix { inherit lib; }; flake.lib = import ./default.nix { inherit lib; };
} }

View File

@@ -0,0 +1,29 @@
{
perSystem = { pkgs, self', ... }: {
checks = {
# check if the `clan config` example jsonschema and data is valid
lib-jsonschema-example-valid = pkgs.runCommand "lib-jsonschema-example-valid" { } ''
echo "Checking that example-schema.json is valid"
${pkgs.check-jsonschema}/bin/check-jsonschema \
--check-metaschema ${./.}/example-schema.json
echo "Checking that example-data.json is valid according to example-schema.json"
${pkgs.check-jsonschema}/bin/check-jsonschema \
--schemafile ${./.}/example-schema.json \
${./.}/example-data.json
touch $out
'';
# check if the `clan config` nix jsonschema converter unit tests succeed
lib-jsonschema-nix-unit-tests = pkgs.runCommand "lib-jsonschema-nix-unit-tests" { } ''
export NIX_PATH=nixpkgs=${pkgs.path}
${self'.packages.nix-unit}/bin/nix-unit \
${./.}/test.nix \
--eval-store $(realpath .)
touch $out
'';
};
};
}

View File

@@ -1,6 +1,6 @@
# run these tests via `nix-unit ./test.nix` # run these tests via `nix-unit ./test.nix`
{ lib ? (import <nixpkgs> { }).lib { lib ? (import <nixpkgs> { }).lib
, slib ? import ../../clan_cli/config/schema-lib.nix { inherit lib; } , slib ? import ./. { inherit lib; }
}: }:
{ {
parseOption = import ./test_parseOption.nix { inherit lib slib; }; parseOption = import ./test_parseOption.nix { inherit lib slib; };

View File

@@ -1,7 +1,7 @@
# tests for the nixos options to jsonschema converter # tests for the nixos options to jsonschema converter
# run these tests via `nix-unit ./test.nix` # run these tests via `nix-unit ./test.nix`
{ lib ? (import <nixpkgs> { }).lib { lib ? (import <nixpkgs> { }).lib
, slib ? import ../../clan_cli/config/schema-lib.nix { inherit lib; } , slib ? import ./. { inherit lib; }
}: }:
let let
description = "Test Description"; description = "Test Description";

View File

@@ -1,7 +1,7 @@
# tests for the nixos options to jsonschema converter # tests for the nixos options to jsonschema converter
# run these tests via `nix-unit ./test.nix` # run these tests via `nix-unit ./test.nix`
{ lib ? (import <nixpkgs> { }).lib { lib ? (import <nixpkgs> { }).lib
, slib ? import ../../clan_cli/config/schema-lib.nix { inherit lib; } , slib ? import ./. { inherit lib; }
}: }:
let let
evaledOptions = evaledOptions =

View File

@@ -7,4 +7,4 @@ if type nix_direnv_watch_file &>/dev/null; then
else else
direnv watch flake-module.nix direnv watch flake-module.nix
fi fi
use flake .#clan --builders '' use flake .#clan-cli --builders ''

View File

@@ -1,8 +1,9 @@
import argparse import argparse
import sys import sys
from . import admin, secrets, ssh from . import admin, secrets
from .errors import ClanError from .errors import ClanError
from .ssh import cli as ssh_cli
has_argcomplete = True has_argcomplete = True
try: try:
@@ -27,7 +28,7 @@ def main() -> None:
# warn(f"The config command does not work in the nix sandbox: {e}") # warn(f"The config command does not work in the nix sandbox: {e}")
parser_ssh = subparsers.add_parser("ssh", help="ssh to a remote machine") parser_ssh = subparsers.add_parser("ssh", help="ssh to a remote machine")
ssh.register_parser(parser_ssh) ssh_cli.register_parser(parser_ssh)
parser_secrets = subparsers.add_parser("secrets", help="manage secrets") parser_secrets = subparsers.add_parser("secrets", help="manage secrets")
secrets.register_parser(parser_secrets) secrets.register_parser(parser_secrets)

View File

@@ -1,6 +1,7 @@
# !/usr/bin/env python3 # !/usr/bin/env python3
import argparse import argparse
import json import json
import os
import subprocess import subprocess
import sys import sys
from pathlib import Path from pathlib import Path
@@ -8,6 +9,8 @@ from typing import Any, Optional, Type, Union
from clan_cli.errors import ClanError from clan_cli.errors import ClanError
CLAN_FLAKE = os.getenv("CLAN_FLAKE")
class Kwargs: class Kwargs:
def __init__(self) -> None: def __init__(self) -> None:
@@ -27,7 +30,7 @@ def schema_from_module_file(
nix_expr = f""" nix_expr = f"""
let let
lib = import <nixpkgs/lib>; lib = import <nixpkgs/lib>;
slib = import {__file__}/../schema-lib.nix {{inherit lib;}}; slib = import {CLAN_FLAKE}/lib/jsonschema.nix {{inherit lib;}};
in in
slib.parseModule {absolute_path} slib.parseModule {absolute_path}
""" """

View File

@@ -2,6 +2,7 @@
import argparse import argparse
from .groups import register_groups_parser from .groups import register_groups_parser
from .import_sops import register_import_sops_parser
from .machines import register_machines_parser from .machines import register_machines_parser
from .secrets import register_secrets_parser from .secrets import register_secrets_parser
from .users import register_users_parser from .users import register_users_parser
@@ -25,4 +26,7 @@ def register_parser(parser: argparse.ArgumentParser) -> None:
machines_parser = subparser.add_parser("machines", help="manage machines") machines_parser = subparser.add_parser("machines", help="manage machines")
register_machines_parser(machines_parser) register_machines_parser(machines_parser)
import_sops_parser = subparser.add_parser("import-sops", help="import a sops file")
register_import_sops_parser(import_sops_parser)
register_secrets_parser(subparser) register_secrets_parser(subparser)

View File

@@ -24,12 +24,14 @@ sops_machines_folder = gen_sops_subfolder("machines")
sops_groups_folder = gen_sops_subfolder("groups") sops_groups_folder = gen_sops_subfolder("groups")
def list_objects(path: Path, is_valid: Callable[[str], bool]) -> None: def list_objects(path: Path, is_valid: Callable[[str], bool]) -> list[str]:
objs: list[str] = []
if not path.exists(): if not path.exists():
return return objs
for f in os.listdir(path): for f in os.listdir(path):
if is_valid(f): if is_valid(f):
print(f) objs.append(f)
return objs
def remove_object(path: Path, name: str) -> None: def remove_object(path: Path, name: str) -> None:

View File

@@ -23,35 +23,68 @@ def users_folder(group: str) -> Path:
return sops_groups_folder() / group / "users" return sops_groups_folder() / group / "users"
# TODO: make this a tree class Group:
def list_command(args: argparse.Namespace) -> None: def __init__(self, name: str, machines: list[str], users: list[str]) -> None:
self.name = name
self.machines = machines
self.users = users
def list_groups() -> list[Group]:
groups: list[Group] = []
folder = sops_groups_folder() folder = sops_groups_folder()
if not folder.exists(): if not folder.exists():
return return groups
for group in os.listdir(folder): for name in os.listdir(folder):
group_folder = folder / group group_folder = folder / name
if not group_folder.is_dir(): if not group_folder.is_dir():
continue continue
print(group) machines_path = machines_folder(name)
machines = machines_folder(group) machines = []
if machines.is_dir(): if machines_path.is_dir():
print("machines:") for f in machines_path.iterdir():
for f in machines.iterdir():
if validate_hostname(f.name): if validate_hostname(f.name):
print(f.name) machines.append(f.name)
users = users_folder(group) users_path = users_folder(name)
if users.is_dir(): users = []
print("users:") if users_path.is_dir():
for f in users.iterdir(): for f in users_path.iterdir():
if VALID_USER_NAME.match(f.name): if VALID_USER_NAME.match(f.name):
print(f) users.append(f.name)
groups.append(Group(name, machines, users))
return groups
def list_command(args: argparse.Namespace) -> None:
for group in list_groups():
print(group.name)
if group.machines:
print("machines:")
for machine in group.machines:
print(f" {machine}")
if group.users:
print("users:")
for user in group.users:
print(f" {user}")
print()
def list_directory(directory: Path) -> str:
if not directory.exists():
return "{directory} does not exist"
msg = f"\n{directory} contains:"
for f in directory.iterdir():
msg += f"\n {f.name}"
return msg
def add_member(group_folder: Path, source_folder: Path, name: str) -> None: def add_member(group_folder: Path, source_folder: Path, name: str) -> None:
source = source_folder / name source = source_folder / name
if not source.exists(): if not source.exists():
raise ClanError(f"{name} does not exist in {source_folder}") msg = f"{name} does not exist in {source_folder}"
msg += list_directory(source_folder)
raise ClanError(msg)
group_folder.mkdir(parents=True, exist_ok=True) group_folder.mkdir(parents=True, exist_ok=True)
user_target = group_folder / name user_target = group_folder / name
if user_target.exists(): if user_target.exists():
@@ -60,13 +93,15 @@ def add_member(group_folder: Path, source_folder: Path, name: str) -> None:
f"Cannot add user {name}. {user_target} exists but is not a symlink" f"Cannot add user {name}. {user_target} exists but is not a symlink"
) )
os.remove(user_target) os.remove(user_target)
user_target.symlink_to(source) user_target.symlink_to(os.path.relpath(source, user_target.parent))
def remove_member(group_folder: Path, name: str) -> None: def remove_member(group_folder: Path, name: str) -> None:
target = group_folder / name target = group_folder / name
if not target.exists(): if not target.exists():
raise ClanError(f"{name} does not exist in group in {group_folder}") msg = f"{name} does not exist in group in {group_folder}"
msg += list_directory(group_folder)
raise ClanError(msg)
os.remove(target) os.remove(target)
if len(os.listdir(group_folder)) == 0: if len(os.listdir(group_folder)) == 0:
@@ -76,38 +111,56 @@ def remove_member(group_folder: Path, name: str) -> None:
os.rmdir(group_folder.parent) os.rmdir(group_folder.parent)
def add_user(group: str, name: str) -> None:
add_member(users_folder(group), sops_users_folder(), name)
def add_user_command(args: argparse.Namespace) -> None: def add_user_command(args: argparse.Namespace) -> None:
add_member(users_folder(args.group), sops_users_folder(), args.user) add_user(args.group, args.user)
def remove_user(group: str, name: str) -> None:
remove_member(users_folder(group), name)
def remove_user_command(args: argparse.Namespace) -> None: def remove_user_command(args: argparse.Namespace) -> None:
remove_member(users_folder(args.group), args.user) remove_user(args.group, args.user)
def add_machine(group: str, name: str) -> None:
add_member(machines_folder(group), sops_machines_folder(), name)
def add_machine_command(args: argparse.Namespace) -> None: def add_machine_command(args: argparse.Namespace) -> None:
add_member( add_machine(args.group, args.machine)
machines_folder(args.group),
sops_machines_folder(),
args.machine, def remove_machine(group: str, name: str) -> None:
) remove_member(machines_folder(group), name)
def remove_machine_command(args: argparse.Namespace) -> None: def remove_machine_command(args: argparse.Namespace) -> None:
remove_member(machines_folder(args.group), args.machine) remove_machine(args.group, args.machine)
def add_group_argument(parser: argparse.ArgumentParser) -> None: def add_group_argument(parser: argparse.ArgumentParser) -> None:
parser.add_argument("group", help="the name of the secret", type=group_name_type) parser.add_argument("group", help="the name of the secret", type=group_name_type)
def add_secret(group: str, name: str) -> None:
secrets.allow_member(secrets.groups_folder(name), sops_groups_folder(), group)
def add_secret_command(args: argparse.Namespace) -> None: def add_secret_command(args: argparse.Namespace) -> None:
secrets.allow_member( add_secret(args.group, args.secret)
secrets.groups_folder(args.group), sops_machines_folder(), args.group
)
def remove_secret(group: str, name: str) -> None:
secrets.disallow_member(secrets.groups_folder(name), group)
def remove_secret_command(args: argparse.Namespace) -> None: def remove_secret_command(args: argparse.Namespace) -> None:
secrets.disallow_member(secrets.groups_folder(args.group), args.group) remove_secret(args.group, args.secret)
def register_groups_parser(parser: argparse.ArgumentParser) -> None: def register_groups_parser(parser: argparse.ArgumentParser) -> None:

View File

@@ -0,0 +1,93 @@
import argparse
import json
import subprocess
import sys
from pathlib import Path
from ..errors import ClanError
from ..nix import nix_shell
from .secrets import encrypt_secret, sops_secrets_folder
def import_sops(args: argparse.Namespace) -> None:
file = Path(args.sops_file)
file_type = file.suffix
try:
file.read_text()
except OSError as e:
raise ClanError(f"Could not read file {file}: {e}") from e
if file_type == ".yaml":
cmd = ["sops"]
if args.input_type:
cmd += ["--input-type", args.input_type]
cmd += ["--output-type", "json", "--decrypt", args.sops_file]
cmd = nix_shell(["sops"], cmd)
try:
res = subprocess.run(cmd, check=True, text=True, stdout=subprocess.PIPE)
except subprocess.CalledProcessError as e:
raise ClanError(f"Could not import sops file {file}: {e}") from e
secrets = json.loads(res.stdout)
for k, v in secrets.items():
k = args.prefix + k
if not isinstance(v, str):
print(
f"WARNING: {k} is not a string but {type(v)}, skipping",
file=sys.stderr,
)
continue
if (sops_secrets_folder() / k).exists():
print(
f"WARNING: {k} already exists, skipping",
file=sys.stderr,
)
continue
encrypt_secret(
sops_secrets_folder() / k,
v,
add_groups=args.group,
add_machines=args.machine,
add_users=args.user,
)
def register_import_sops_parser(parser: argparse.ArgumentParser) -> None:
parser.add_argument(
"--input-type",
type=str,
default=None,
help="the input type of the sops file (yaml, json, ...). If not specified, it will be guessed from the file extension",
)
parser.add_argument(
"--group",
type=str,
action="append",
default=[],
help="the group to import the secrets to",
)
parser.add_argument(
"--machine",
type=str,
action="append",
default=[],
help="the machine to import the secrets to",
)
parser.add_argument(
"--user",
type=str,
action="append",
default=[],
help="the user to import the secrets to",
)
parser.add_argument(
"--prefix",
type=str,
default="",
help="the prefix to use for the secret names",
)
parser.add_argument(
"sops_file",
type=str,
help="the sops file to import (- for stdin)",
)
parser.set_defaults(func=import_sops)

View File

@@ -11,26 +11,48 @@ from .types import (
) )
def list_command(args: argparse.Namespace) -> None: def add_machine(name: str, key: str, force: bool) -> None:
list_objects(sops_machines_folder(), lambda x: validate_hostname(x)) write_key(sops_machines_folder() / name, key, force)
def add_command(args: argparse.Namespace) -> None: def remove_machine(name: str) -> None:
write_key(sops_machines_folder() / args.machine, args.key, args.force) remove_object(sops_machines_folder(), name)
def remove_command(args: argparse.Namespace) -> None: def list_machines() -> list[str]:
remove_object(sops_machines_folder(), args.machine) return list_objects(sops_machines_folder(), lambda x: validate_hostname(x))
def add_secret_command(args: argparse.Namespace) -> None: def add_secret(machine: str, secret: str) -> None:
secrets.allow_member( secrets.allow_member(
secrets.machines_folder(args.group), sops_machines_folder(), args.machine secrets.machines_folder(secret), sops_machines_folder(), machine
) )
def remove_secret(machine: str, secret: str) -> None:
secrets.disallow_member(secrets.machines_folder(secret), machine)
def list_command(args: argparse.Namespace) -> None:
lst = list_machines()
if len(lst) > 0:
print("\n".join(lst))
def add_command(args: argparse.Namespace) -> None:
add_machine(args.machine, args.key, args.force)
def remove_command(args: argparse.Namespace) -> None:
remove_machine(args.machine)
def add_secret_command(args: argparse.Namespace) -> None:
add_secret(args.machine, args.secret)
def remove_secret_command(args: argparse.Namespace) -> None: def remove_secret_command(args: argparse.Namespace) -> None:
secrets.disallow_member(secrets.machines_folder(args.group), args.machine) remove_secret(args.machine, args.secret)
def register_machines_parser(parser: argparse.ArgumentParser) -> None: def register_machines_parser(parser: argparse.ArgumentParser) -> None:

View File

@@ -2,99 +2,103 @@ import argparse
import getpass import getpass
import os import os
import shutil import shutil
import subprocess
import sys import sys
from io import StringIO
from pathlib import Path from pathlib import Path
from typing import IO from typing import IO, Union
from .. import tty from .. import tty
from ..errors import ClanError from ..errors import ClanError
from ..nix import nix_shell from .folders import (
from .folders import list_objects, sops_secrets_folder list_objects,
from .sops import SopsKey, encrypt_file, ensure_sops_key, read_key sops_groups_folder,
sops_machines_folder,
sops_secrets_folder,
sops_users_folder,
)
from .sops import decrypt_file, encrypt_file, ensure_sops_key, read_key, update_keys
from .types import VALID_SECRET_NAME, secret_name_type from .types import VALID_SECRET_NAME, secret_name_type
def list_command(args: argparse.Namespace) -> None: def collect_keys_for_type(folder: Path) -> set[str]:
list_objects( if not folder.exists():
sops_secrets_folder(), lambda n: VALID_SECRET_NAME.match(n) is not None return set()
keys = set()
for p in folder.iterdir():
if not p.is_symlink():
continue
try:
target = p.resolve()
except FileNotFoundError:
tty.warn(f"Ignoring broken symlink {p}")
continue
kind = target.parent.name
if folder.name != kind:
tty.warn(f"Expected {p} to point to {folder} but points to {target.parent}")
continue
keys.add(read_key(target))
return keys
def collect_keys_for_path(path: Path) -> set[str]:
keys = set([])
keys.update(collect_keys_for_type(path / "machines"))
keys.update(collect_keys_for_type(path / "users"))
groups = path / "groups"
if not groups.is_dir():
return keys
for group in groups.iterdir():
keys.update(collect_keys_for_type(group / "machines"))
keys.update(collect_keys_for_type(group / "users"))
return keys
def encrypt_secret(
secret: Path,
value: Union[IO[str], str],
add_users: list[str] = [],
add_machines: list[str] = [],
add_groups: list[str] = [],
) -> None:
key = ensure_sops_key()
keys = set([])
for user in add_users:
allow_member(users_folder(secret.name), sops_users_folder(), user, False)
for machine in add_machines:
allow_member(
machines_folder(secret.name), sops_machines_folder(), machine, False
) )
for group in add_groups:
allow_member(groups_folder(secret.name), sops_groups_folder(), group, False)
def get_command(args: argparse.Namespace) -> None: keys = collect_keys_for_path(secret)
secret: str = args.secret
ensure_sops_key()
secret_path = sops_secrets_folder() / secret / "secret"
if not secret_path.exists():
raise ClanError(f"Secret '{secret}' does not exist")
cmd = nix_shell(["sops"], ["sops", "--decrypt", str(secret_path)])
res = subprocess.run(cmd, check=True, stdout=subprocess.PIPE, text=True)
print(res.stdout, end="")
if key.pubkey not in keys:
keys.add(key.pubkey)
allow_member(
users_folder(secret.name), sops_users_folder(), key.username, False
)
def encrypt_secret(key: SopsKey, secret: Path, value: IO[str]) -> None:
keys = set([key.pubkey])
for kind in ["users", "machines", "groups"]:
if not (sops_secrets_folder() / kind).is_dir():
continue
k = read_key(sops_secrets_folder() / kind)
keys.add(k)
encrypt_file(secret / "secret", value, list(sorted(keys))) encrypt_file(secret / "secret", value, list(sorted(keys)))
def set_command(args: argparse.Namespace) -> None: def remove_secret(secret: str) -> None:
key = ensure_sops_key()
secret_value = os.environ.get("SOPS_NIX_SECRET")
if secret_value:
encrypt_secret(key, sops_secrets_folder() / args.secret, StringIO(secret_value))
elif tty.is_interactive():
secret = getpass.getpass(prompt="Paste your secret: ")
encrypt_secret(key, sops_secrets_folder() / args.secret, StringIO(secret))
else:
encrypt_secret(key, sops_secrets_folder() / args.secret, sys.stdin)
def remove_command(args: argparse.Namespace) -> None:
secret: str = args.secret
path = sops_secrets_folder() / secret path = sops_secrets_folder() / secret
if not path.exists(): if not path.exists():
raise ClanError(f"Secret '{secret}' does not exist") raise ClanError(f"Secret '{secret}' does not exist")
shutil.rmtree(path) shutil.rmtree(path)
def remove_command(args: argparse.Namespace) -> None:
remove_secret(args.secret)
def add_secret_argument(parser: argparse.ArgumentParser) -> None: def add_secret_argument(parser: argparse.ArgumentParser) -> None:
parser.add_argument("secret", help="the name of the secret", type=secret_name_type) parser.add_argument("secret", help="the name of the secret", type=secret_name_type)
def allow_member(group_folder: Path, source_folder: Path, name: str) -> None:
source = source_folder / name
if not source.exists():
raise ClanError(f"{name} does not exist in {source_folder}")
group_folder.mkdir(parents=True, exist_ok=True)
user_target = group_folder / name
if user_target.exists():
if not user_target.is_symlink():
raise ClanError(
f"Cannot add user {name}. {user_target} exists but is not a symlink"
)
os.remove(user_target)
user_target.symlink_to(source)
def disallow_member(group_folder: Path, name: str) -> None:
target = group_folder / name
if not target.exists():
raise ClanError(f"{name} does not exist in group in {group_folder}")
os.remove(target)
if len(os.listdir(group_folder)) == 0:
os.rmdir(group_folder)
if len(os.listdir(group_folder.parent)) == 0:
os.rmdir(group_folder.parent)
def machines_folder(group: str) -> Path: def machines_folder(group: str) -> Path:
return sops_secrets_folder() / group / "machines" return sops_secrets_folder() / group / "machines"
@@ -107,6 +111,106 @@ def groups_folder(group: str) -> Path:
return sops_secrets_folder() / group / "groups" return sops_secrets_folder() / group / "groups"
def list_directory(directory: Path) -> str:
if not directory.exists():
return "{directory} does not exist"
msg = f"\n{directory} contains:"
for f in directory.iterdir():
msg += f"\n {f.name}"
return msg
def allow_member(
group_folder: Path, source_folder: Path, name: str, do_update_keys: bool = True
) -> None:
source = source_folder / name
if not source.exists():
msg = f"{name} does not exist in {source_folder}"
msg += list_directory(source_folder)
raise ClanError(msg)
group_folder.mkdir(parents=True, exist_ok=True)
user_target = group_folder / name
if user_target.exists():
if not user_target.is_symlink():
raise ClanError(
f"Cannot add user {name}. {user_target} exists but is not a symlink"
)
os.remove(user_target)
user_target.symlink_to(os.path.relpath(source, user_target.parent))
if do_update_keys:
update_keys(
group_folder.parent,
list(sorted(collect_keys_for_path(group_folder.parent))),
)
def disallow_member(group_folder: Path, name: str) -> None:
target = group_folder / name
if not target.exists():
msg = f"{name} does not exist in group in {group_folder}"
msg += list_directory(group_folder)
raise ClanError(msg)
keys = collect_keys_for_path(group_folder.parent)
if len(keys) < 2:
raise ClanError(
f"Cannot remove {name} from {group_folder.parent.name}. No keys left. Use 'clan secrets remove {name}' to remove the secret."
)
os.remove(target)
if len(os.listdir(group_folder)) == 0:
os.rmdir(group_folder)
if len(os.listdir(group_folder.parent)) == 0:
os.rmdir(group_folder.parent)
update_keys(
target.parent.parent, list(sorted(collect_keys_for_path(group_folder.parent)))
)
def list_secrets() -> list[str]:
return list_objects(
sops_secrets_folder(), lambda n: VALID_SECRET_NAME.match(n) is not None
)
def list_command(args: argparse.Namespace) -> None:
lst = list_secrets()
if len(lst) > 0:
print("\n".join(lst))
def decrypt_secret(secret: str) -> str:
ensure_sops_key()
secret_path = sops_secrets_folder() / secret / "secret"
if not secret_path.exists():
raise ClanError(f"Secret '{secret}' does not exist")
return decrypt_file(secret_path)
def get_command(args: argparse.Namespace) -> None:
print(decrypt_secret(args.secret), end="")
def set_command(args: argparse.Namespace) -> None:
env_value = os.environ.get("SOPS_NIX_SECRET")
secret_value: Union[str, IO[str]] = sys.stdin
if env_value:
secret_value = env_value
elif tty.is_interactive():
secret_value = getpass.getpass(prompt="Paste your secret: ")
encrypt_secret(
sops_secrets_folder() / args.secret,
secret_value,
args.user,
args.machine,
args.group,
)
def register_secrets_parser(subparser: argparse._SubParsersAction) -> None: def register_secrets_parser(subparser: argparse._SubParsersAction) -> None:
parser_list = subparser.add_parser("list", help="list secrets") parser_list = subparser.add_parser("list", help="list secrets")
parser_list.set_defaults(func=list_command) parser_list.set_defaults(func=list_command)
@@ -117,6 +221,27 @@ def register_secrets_parser(subparser: argparse._SubParsersAction) -> None:
parser_set = subparser.add_parser("set", help="set a secret") parser_set = subparser.add_parser("set", help="set a secret")
add_secret_argument(parser_set) add_secret_argument(parser_set)
parser_set.add_argument(
"--group",
type=str,
action="append",
default=[],
help="the group to import the secrets to",
)
parser_set.add_argument(
"--machine",
type=str,
action="append",
default=[],
help="the machine to import the secrets to",
)
parser_set.add_argument(
"--user",
type=str,
action="append",
default=[],
help="the user to import the secrets to",
)
parser_set.set_defaults(func=set_command) parser_set.set_defaults(func=set_command)
parser_delete = subparser.add_parser("remove", help="remove a secret") parser_delete = subparser.add_parser("remove", help="remove a secret")

View File

@@ -2,9 +2,10 @@ import json
import os import os
import shutil import shutil
import subprocess import subprocess
from contextlib import contextmanager
from pathlib import Path from pathlib import Path
from tempfile import NamedTemporaryFile from tempfile import NamedTemporaryFile
from typing import IO from typing import IO, Iterator, Union
from .. import tty from .. import tty
from ..dirs import user_config_dir from ..dirs import user_config_dir
@@ -14,8 +15,9 @@ from .folders import sops_users_folder
class SopsKey: class SopsKey:
def __init__(self, pubkey: str) -> None: def __init__(self, pubkey: str, username: str) -> None:
self.pubkey = pubkey self.pubkey = pubkey
self.username = username
def get_public_key(privkey: str) -> str: def get_public_key(privkey: str) -> str:
@@ -51,7 +53,7 @@ def get_user_name(user: str) -> str:
def ensure_user(pub_key: str) -> SopsKey: def ensure_user(pub_key: str) -> SopsKey:
key = SopsKey(pub_key) key = SopsKey(pub_key, username="")
users_folder = sops_users_folder() users_folder = sops_users_folder()
# Check if the public key already exists for any user # Check if the public key already exists for any user
@@ -60,6 +62,7 @@ def ensure_user(pub_key: str) -> SopsKey:
if not user.is_dir(): if not user.is_dir():
continue continue
if read_key(user) == pub_key: if read_key(user) == pub_key:
key.username = user.name
return key return key
# Find a unique user name if the public key is not found # Find a unique user name if the public key is not found
@@ -76,6 +79,8 @@ def ensure_user(pub_key: str) -> SopsKey:
# Add the public key for the user # Add the public key for the user
write_key(users_folder / username, pub_key, False) write_key(users_folder / username, pub_key, False)
key.username = username
return key return key
@@ -100,18 +105,48 @@ def ensure_sops_key() -> SopsKey:
return ensure_user(get_public_key(path.read_text())) return ensure_user(get_public_key(path.read_text()))
def encrypt_file(secret_path: Path, content: IO[str], keys: list[str]) -> None: @contextmanager
def sops_manifest(keys: list[str]) -> Iterator[Path]:
with NamedTemporaryFile(delete=False, mode="w") as manifest:
json.dump(
dict(creation_rules=[dict(key_groups=[dict(age=keys)])]), manifest, indent=2
)
manifest.flush()
yield Path(manifest.name)
def update_keys(secret_path: Path, keys: list[str]) -> None:
with sops_manifest(keys) as manifest:
cmd = nix_shell(
["sops"],
[
"sops",
"--config",
str(manifest),
"updatekeys",
"--yes",
str(secret_path / "secret"),
],
)
subprocess.run(cmd, check=True)
def encrypt_file(
secret_path: Path, content: Union[IO[str], str], keys: list[str]
) -> None:
folder = secret_path.parent folder = secret_path.parent
folder.mkdir(parents=True, exist_ok=True) folder.mkdir(parents=True, exist_ok=True)
# hopefully /tmp is written to an in-memory file to avoid leaking secrets # hopefully /tmp is written to an in-memory file to avoid leaking secrets
with NamedTemporaryFile(delete=False) as f: with sops_manifest(keys) as manifest, NamedTemporaryFile(delete=False) as f:
try: try:
with open(f.name, "w") as fd: with open(f.name, "w") as fd:
if isinstance(content, str):
fd.write(content)
else:
shutil.copyfileobj(content, fd) shutil.copyfileobj(content, fd)
args = ["sops"] # we pass an empty manifest to pick up existing configuration of the user
for key in keys: args = ["sops", "--config", str(manifest)]
args.extend(["--age", key])
args.extend(["-i", "--encrypt", str(f.name)]) args.extend(["-i", "--encrypt", str(f.name)])
cmd = nix_shell(["sops"], args) cmd = nix_shell(["sops"], args)
subprocess.run(cmd, check=True) subprocess.run(cmd, check=True)
@@ -126,6 +161,12 @@ def encrypt_file(secret_path: Path, content: IO[str], keys: list[str]) -> None:
pass pass
def decrypt_file(secret_path: Path) -> str:
cmd = nix_shell(["sops"], ["sops", "--decrypt", str(secret_path)])
res = subprocess.run(cmd, check=True, stdout=subprocess.PIPE, text=True)
return res.stdout
def write_key(path: Path, publickey: str, overwrite: bool) -> None: def write_key(path: Path, publickey: str, overwrite: bool) -> None:
path.mkdir(parents=True, exist_ok=True) path.mkdir(parents=True, exist_ok=True)
try: try:

View File

@@ -15,8 +15,28 @@ def add_user(name: str, key: str, force: bool) -> None:
write_key(sops_users_folder() / name, key, force) write_key(sops_users_folder() / name, key, force)
def remove_user(name: str) -> None:
remove_object(sops_users_folder(), name)
def list_users() -> list[str]:
return list_objects(
sops_users_folder(), lambda n: VALID_SECRET_NAME.match(n) is not None
)
def add_secret(user: str, secret: str) -> None:
secrets.allow_member(secrets.users_folder(secret), sops_users_folder(), user)
def remove_secret(user: str, secret: str) -> None:
secrets.disallow_member(secrets.users_folder(secret), user)
def list_command(args: argparse.Namespace) -> None: def list_command(args: argparse.Namespace) -> None:
list_objects(sops_users_folder(), lambda n: VALID_SECRET_NAME.match(n) is not None) lst = list_users()
if len(lst) > 0:
print("\n".join(lst))
def add_command(args: argparse.Namespace) -> None: def add_command(args: argparse.Namespace) -> None:
@@ -24,17 +44,15 @@ def add_command(args: argparse.Namespace) -> None:
def remove_command(args: argparse.Namespace) -> None: def remove_command(args: argparse.Namespace) -> None:
remove_object(sops_users_folder(), args.user) remove_user(args.user)
def add_secret_command(args: argparse.Namespace) -> None: def add_secret_command(args: argparse.Namespace) -> None:
secrets.allow_member( add_secret(args.user, args.secret)
secrets.groups_folder(args.group), sops_users_folder(), args.group
)
def remove_secret_command(args: argparse.Namespace) -> None: def remove_secret_command(args: argparse.Namespace) -> None:
secrets.disallow_member(secrets.groups_folder(args.group), args.group) remove_secret(args.user, args.secret)
def register_users_parser(parser: argparse.ArgumentParser) -> None: def register_users_parser(parser: argparse.ArgumentParser) -> None:
@@ -74,21 +92,10 @@ def register_users_parser(parser: argparse.ArgumentParser) -> None:
) )
add_secret_parser.set_defaults(func=add_secret_command) add_secret_parser.set_defaults(func=add_secret_command)
add_secret_parser = subparser.add_parser(
"add-secret", help="allow a machine to access a secret"
)
add_secret_parser.add_argument(
"user", help="the name of the group", type=user_name_type
)
add_secret_parser.add_argument(
"secret", help="the name of the secret", type=secret_name_type
)
add_secret_parser.set_defaults(func=add_secret_command)
remove_secret_parser = subparser.add_parser( remove_secret_parser = subparser.add_parser(
"remove-secret", help="remove a user's access to a secret" "remove-secret", help="remove a user's access to a secret"
) )
add_secret_parser.add_argument( remove_secret_parser.add_argument(
"user", help="the name of the group", type=user_name_type "user", help="the name of the group", type=user_name_type
) )
remove_secret_parser.add_argument( remove_secret_parser.add_argument(

View File

@@ -0,0 +1,821 @@
# Adapted from https://github.com/numtide/deploykit
import fcntl
import logging
import math
import os
import select
import shlex
import subprocess
import sys
import time
from contextlib import ExitStack, contextmanager
from enum import Enum
from pathlib import Path
from shlex import quote
from threading import Thread
from typing import (
IO,
Any,
Callable,
Dict,
Generic,
Iterator,
List,
Literal,
Optional,
Tuple,
TypeVar,
Union,
overload,
)
# https://no-color.org
DISABLE_COLOR = not sys.stderr.isatty() or os.environ.get("NO_COLOR", "") != ""
def ansi_color(color: int) -> str:
return f"\x1b[{color}m"
class CommandFormatter(logging.Formatter):
"""
print errors in red and warnings in yellow
"""
def __init__(self) -> None:
super().__init__(
"%(prefix_color)s[%(command_prefix)s]%(color_reset)s %(color)s%(message)s%(color_reset)s"
)
self.hostnames: List[str] = []
self.hostname_color_offset = 1 # first host shouldn't get agressive red
def formatMessage(self, record: logging.LogRecord) -> str:
colorcode = 0
if record.levelno == logging.ERROR:
colorcode = 31 # red
if record.levelno == logging.WARN:
colorcode = 33 # yellow
color, prefix_color, color_reset = "", "", ""
if not DISABLE_COLOR:
command_prefix = getattr(record, "command_prefix", "")
color = ansi_color(colorcode)
prefix_color = ansi_color(self.hostname_colorcode(command_prefix))
color_reset = "\x1b[0m"
setattr(record, "color", color)
setattr(record, "prefix_color", prefix_color)
setattr(record, "color_reset", color_reset)
return super().formatMessage(record)
def hostname_colorcode(self, hostname: str) -> int:
try:
index = self.hostnames.index(hostname)
except ValueError:
self.hostnames += [hostname]
index = self.hostnames.index(hostname)
return 31 + (index + self.hostname_color_offset) % 7
def setup_loggers() -> Tuple[logging.Logger, logging.Logger]:
# If we use the default logger here (logging.error etc) or a logger called
# "deploykit", then cmdlog messages are also posted on the default logger.
# To avoid this message duplication, we set up a main and command logger
# and use a "deploykit" main logger.
kitlog = logging.getLogger("deploykit.main")
kitlog.setLevel(logging.INFO)
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
ch.setFormatter(logging.Formatter())
kitlog.addHandler(ch)
# use specific logger for command outputs
cmdlog = logging.getLogger("deploykit.command")
cmdlog.setLevel(logging.INFO)
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
ch.setFormatter(CommandFormatter())
cmdlog.addHandler(ch)
return (kitlog, cmdlog)
# loggers for: general deploykit, command output
kitlog, cmdlog = setup_loggers()
info = kitlog.info
warn = kitlog.warning
error = kitlog.error
@contextmanager
def _pipe() -> Iterator[Tuple[IO[str], IO[str]]]:
(pipe_r, pipe_w) = os.pipe()
read_end = os.fdopen(pipe_r, "r")
write_end = os.fdopen(pipe_w, "w")
try:
fl = fcntl.fcntl(read_end, fcntl.F_GETFL)
fcntl.fcntl(read_end, fcntl.F_SETFL, fl | os.O_NONBLOCK)
yield (read_end, write_end)
finally:
read_end.close()
write_end.close()
FILE = Union[None, int]
# Seconds until a message is printed when _run produces no output.
NO_OUTPUT_TIMEOUT = 20
class HostKeyCheck(Enum):
# Strictly check ssh host keys, prompt for unknown ones
STRICT = 0
# Trust on ssh keys on first use
TOFU = 1
# Do not check ssh host keys
NONE = 2
class Host:
def __init__(
self,
host: str,
user: Optional[str] = None,
port: Optional[int] = None,
key: Optional[str] = None,
forward_agent: bool = False,
command_prefix: Optional[str] = None,
host_key_check: HostKeyCheck = HostKeyCheck.STRICT,
meta: Dict[str, Any] = {},
verbose_ssh: bool = False,
) -> None:
"""
Creates a Host
@host the hostname to connect to via ssh
@port the port to connect to via ssh
@forward_agent: wheter to forward ssh agent
@command_prefix: string to prefix each line of the command output with, defaults to host
@host_key_check: wether to check ssh host keys
@verbose_ssh: Enables verbose logging on ssh connections
@meta: meta attributes associated with the host. Those can be accessed in custom functions passed to `run_function`
"""
self.host = host
self.user = user
self.port = port
self.key = key
if command_prefix:
self.command_prefix = command_prefix
else:
self.command_prefix = host
self.forward_agent = forward_agent
self.host_key_check = host_key_check
self.meta = meta
self.verbose_ssh = verbose_ssh
def _prefix_output(
self,
displayed_cmd: str,
print_std_fd: Optional[IO[str]],
print_err_fd: Optional[IO[str]],
stdout: Optional[IO[str]],
stderr: Optional[IO[str]],
timeout: float = math.inf,
) -> Tuple[str, str]:
rlist = []
if print_std_fd is not None:
rlist.append(print_std_fd)
if print_err_fd is not None:
rlist.append(print_err_fd)
if stdout is not None:
rlist.append(stdout)
if stderr is not None:
rlist.append(stderr)
print_std_buf = ""
print_err_buf = ""
stdout_buf = ""
stderr_buf = ""
start = time.time()
last_output = time.time()
while len(rlist) != 0:
r, _, _ = select.select(rlist, [], [], min(timeout, NO_OUTPUT_TIMEOUT))
def print_from(
print_fd: IO[str], print_buf: str, is_err: bool = False
) -> Tuple[float, str]:
read = os.read(print_fd.fileno(), 4096)
if len(read) == 0:
rlist.remove(print_fd)
print_buf += read.decode("utf-8")
if (read == b"" and len(print_buf) != 0) or "\n" in print_buf:
# print and empty the print_buf, if the stream is draining,
# but there is still something in the buffer or on newline.
lines = print_buf.rstrip("\n").split("\n")
for line in lines:
if not is_err:
cmdlog.info(
line, extra=dict(command_prefix=self.command_prefix)
)
pass
else:
cmdlog.error(
line, extra=dict(command_prefix=self.command_prefix)
)
print_buf = ""
last_output = time.time()
return (last_output, print_buf)
if print_std_fd in r and print_std_fd is not None:
(last_output, print_std_buf) = print_from(
print_std_fd, print_std_buf, is_err=False
)
if print_err_fd in r and print_err_fd is not None:
(last_output, print_err_buf) = print_from(
print_err_fd, print_err_buf, is_err=True
)
now = time.time()
elapsed = now - start
if now - last_output > NO_OUTPUT_TIMEOUT:
elapsed_msg = time.strftime("%H:%M:%S", time.gmtime(elapsed))
cmdlog.warn(
f"still waiting for '{displayed_cmd}' to finish... ({elapsed_msg} elapsed)",
extra=dict(command_prefix=self.command_prefix),
)
def handle_fd(fd: Optional[IO[Any]]) -> str:
if fd and fd in r:
read = os.read(fd.fileno(), 4096)
if len(read) == 0:
rlist.remove(fd)
else:
return read.decode("utf-8")
return ""
stdout_buf += handle_fd(stdout)
stderr_buf += handle_fd(stderr)
if now - last_output >= timeout:
break
return stdout_buf, stderr_buf
def _run(
self,
cmd: List[str],
displayed_cmd: str,
shell: bool,
stdout: FILE = None,
stderr: FILE = None,
extra_env: Dict[str, str] = {},
cwd: Union[None, str, Path] = None,
check: bool = True,
timeout: float = math.inf,
) -> subprocess.CompletedProcess[str]:
with ExitStack() as stack:
read_std_fd, write_std_fd = (None, None)
read_err_fd, write_err_fd = (None, None)
if stdout is None or stderr is None:
read_std_fd, write_std_fd = stack.enter_context(_pipe())
read_err_fd, write_err_fd = stack.enter_context(_pipe())
if stdout is None:
stdout_read = None
stdout_write = write_std_fd
elif stdout == subprocess.PIPE:
stdout_read, stdout_write = stack.enter_context(_pipe())
else:
raise Exception(f"unsupported value for stdout parameter: {stdout}")
if stderr is None:
stderr_read = None
stderr_write = write_err_fd
elif stderr == subprocess.PIPE:
stderr_read, stderr_write = stack.enter_context(_pipe())
else:
raise Exception(f"unsupported value for stderr parameter: {stderr}")
env = os.environ.copy()
env.update(extra_env)
with subprocess.Popen(
cmd,
text=True,
shell=shell,
stdout=stdout_write,
stderr=stderr_write,
env=env,
cwd=cwd,
) as p:
if write_std_fd is not None:
write_std_fd.close()
if write_err_fd is not None:
write_err_fd.close()
if stdout == subprocess.PIPE:
assert stdout_write is not None
stdout_write.close()
if stderr == subprocess.PIPE:
assert stderr_write is not None
stderr_write.close()
start = time.time()
stdout_data, stderr_data = self._prefix_output(
displayed_cmd,
read_std_fd,
read_err_fd,
stdout_read,
stderr_read,
timeout,
)
try:
ret = p.wait(timeout=max(0, timeout - (time.time() - start)))
except subprocess.TimeoutExpired:
p.kill()
raise
if ret != 0:
if check:
raise subprocess.CalledProcessError(
ret, cmd=cmd, output=stdout_data, stderr=stderr_data
)
else:
cmdlog.warning(
f"[Command failed: {ret}] {displayed_cmd}",
extra=dict(command_prefix=self.command_prefix),
)
return subprocess.CompletedProcess(
cmd, ret, stdout=stdout_data, stderr=stderr_data
)
raise RuntimeError("unreachable")
def run_local(
self,
cmd: Union[str, List[str]],
stdout: FILE = None,
stderr: FILE = None,
extra_env: Dict[str, str] = {},
cwd: Union[None, str, Path] = None,
check: bool = True,
timeout: float = math.inf,
) -> subprocess.CompletedProcess[str]:
"""
Command to run locally for the host
@cmd the commmand to run
@stdout if not None stdout of the command will be redirected to this file i.e. stdout=subprocss.PIPE
@stderr if not None stderr of the command will be redirected to this file i.e. stderr=subprocess.PIPE
@extra_env environment variables to override whe running the command
@cwd current working directory to run the process in
@timeout: Timeout in seconds for the command to complete
@return subprocess.CompletedProcess result of the command
"""
shell = False
if isinstance(cmd, str):
cmd = [cmd]
shell = True
displayed_cmd = " ".join(cmd)
cmdlog.info(
f"$ {displayed_cmd}", extra=dict(command_prefix=self.command_prefix)
)
return self._run(
cmd,
displayed_cmd,
shell=shell,
stdout=stdout,
stderr=stderr,
extra_env=extra_env,
cwd=cwd,
check=check,
timeout=timeout,
)
def run(
self,
cmd: Union[str, List[str]],
stdout: FILE = None,
stderr: FILE = None,
become_root: bool = False,
extra_env: Dict[str, str] = {},
cwd: Union[None, str, Path] = None,
check: bool = True,
verbose_ssh: bool = False,
timeout: float = math.inf,
) -> subprocess.CompletedProcess[str]:
"""
Command to run on the host via ssh
@cmd the commmand to run
@stdout if not None stdout of the command will be redirected to this file i.e. stdout=subprocss.PIPE
@stderr if not None stderr of the command will be redirected to this file i.e. stderr=subprocess.PIPE
@become_root if the ssh_user is not root than sudo is prepended
@extra_env environment variables to override whe running the command
@cwd current working directory to run the process in
@verbose_ssh: Enables verbose logging on ssh connections
@timeout: Timeout in seconds for the command to complete
@return subprocess.CompletedProcess result of the ssh command
"""
sudo = ""
if become_root and self.user != "root":
sudo = "sudo -- "
vars = []
for k, v in extra_env.items():
vars.append(f"{shlex.quote(k)}={shlex.quote(v)}")
displayed_cmd = ""
export_cmd = ""
if vars:
export_cmd = f"export {' '.join(vars)}; "
displayed_cmd += export_cmd
if isinstance(cmd, list):
displayed_cmd += " ".join(cmd)
else:
displayed_cmd += cmd
cmdlog.info(
f"$ {displayed_cmd}", extra=dict(command_prefix=self.command_prefix)
)
if self.user is not None:
ssh_target = f"{self.user}@{self.host}"
else:
ssh_target = self.host
ssh_opts = ["-A"] if self.forward_agent else []
if self.port:
ssh_opts.extend(["-p", str(self.port)])
if self.key:
ssh_opts.extend(["-i", self.key])
if self.host_key_check != HostKeyCheck.STRICT:
ssh_opts.extend(["-o", "StrictHostKeyChecking=no"])
if self.host_key_check == HostKeyCheck.NONE:
ssh_opts.extend(["-o", "UserKnownHostsFile=/dev/null"])
if verbose_ssh or self.verbose_ssh:
ssh_opts.extend(["-v"])
bash_cmd = export_cmd
bash_args = []
if isinstance(cmd, list):
bash_cmd += 'exec "$@"'
bash_args += cmd
else:
bash_cmd += cmd
# FIXME we assume bash to be present here? Should be documented...
ssh_cmd = (
["ssh", ssh_target]
+ ssh_opts
+ [
"--",
f"{sudo}bash -c {quote(bash_cmd)} -- {' '.join(map(quote, bash_args))}",
]
)
return self._run(
ssh_cmd,
displayed_cmd,
shell=False,
stdout=stdout,
stderr=stderr,
cwd=cwd,
check=check,
timeout=timeout,
)
T = TypeVar("T")
class HostResult(Generic[T]):
def __init__(self, host: Host, result: Union[T, Exception]) -> None:
self.host = host
self._result = result
@property
def error(self) -> Optional[Exception]:
"""
Returns an error if the command failed
"""
if isinstance(self._result, Exception):
return self._result
return None
@property
def result(self) -> T:
"""
Unwrap the result
"""
if isinstance(self._result, Exception):
raise self._result
return self._result
Results = List[HostResult[subprocess.CompletedProcess[str]]]
def _worker(
func: Callable[[Host], T],
host: Host,
results: List[HostResult[T]],
idx: int,
) -> None:
try:
results[idx] = HostResult(host, func(host))
except Exception as e:
kitlog.exception(e)
results[idx] = HostResult(host, e)
class Group:
def __init__(self, hosts: List[Host]) -> None:
self.hosts = hosts
def _run_local(
self,
cmd: Union[str, List[str]],
host: Host,
results: Results,
stdout: FILE = None,
stderr: FILE = None,
extra_env: Dict[str, str] = {},
cwd: Union[None, str, Path] = None,
check: bool = True,
verbose_ssh: bool = False,
timeout: float = math.inf,
) -> None:
try:
proc = host.run_local(
cmd,
stdout=stdout,
stderr=stderr,
extra_env=extra_env,
cwd=cwd,
check=check,
timeout=timeout,
)
results.append(HostResult(host, proc))
except Exception as e:
kitlog.exception(e)
results.append(HostResult(host, e))
def _run_remote(
self,
cmd: Union[str, List[str]],
host: Host,
results: Results,
stdout: FILE = None,
stderr: FILE = None,
extra_env: Dict[str, str] = {},
cwd: Union[None, str, Path] = None,
check: bool = True,
verbose_ssh: bool = False,
timeout: float = math.inf,
) -> None:
try:
proc = host.run(
cmd,
stdout=stdout,
stderr=stderr,
extra_env=extra_env,
cwd=cwd,
check=check,
verbose_ssh=verbose_ssh,
timeout=timeout,
)
results.append(HostResult(host, proc))
except Exception as e:
kitlog.exception(e)
results.append(HostResult(host, e))
def _reraise_errors(self, results: List[HostResult[Any]]) -> None:
errors = 0
for result in results:
e = result.error
if e:
cmdlog.error(
f"failed with: {e}",
extra=dict(command_prefix=result.host.command_prefix),
)
errors += 1
if errors > 0:
raise Exception(
f"{errors} hosts failed with an error. Check the logs above"
)
def _run(
self,
cmd: Union[str, List[str]],
local: bool = False,
stdout: FILE = None,
stderr: FILE = None,
extra_env: Dict[str, str] = {},
cwd: Union[None, str, Path] = None,
check: bool = True,
verbose_ssh: bool = False,
timeout: float = math.inf,
) -> Results:
results: Results = []
threads = []
for host in self.hosts:
fn = self._run_local if local else self._run_remote
thread = Thread(
target=fn,
kwargs=dict(
results=results,
cmd=cmd,
host=host,
stdout=stdout,
stderr=stderr,
extra_env=extra_env,
cwd=cwd,
check=check,
verbose_ssh=verbose_ssh,
timeout=timeout,
),
)
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
if check:
self._reraise_errors(results)
return results
def run(
self,
cmd: Union[str, List[str]],
stdout: FILE = None,
stderr: FILE = None,
extra_env: Dict[str, str] = {},
cwd: Union[None, str, Path] = None,
check: bool = True,
verbose_ssh: bool = False,
timeout: float = math.inf,
) -> Results:
"""
Command to run on the remote host via ssh
@stdout if not None stdout of the command will be redirected to this file i.e. stdout=subprocss.PIPE
@stderr if not None stderr of the command will be redirected to this file i.e. stderr=subprocess.PIPE
@cwd current working directory to run the process in
@verbose_ssh: Enables verbose logging on ssh connections
@timeout: Timeout in seconds for the command to complete
@return a lists of tuples containing Host and the result of the command for this Host
"""
return self._run(
cmd,
stdout=stdout,
stderr=stderr,
extra_env=extra_env,
cwd=cwd,
check=check,
verbose_ssh=verbose_ssh,
timeout=timeout,
)
def run_local(
self,
cmd: Union[str, List[str]],
stdout: FILE = None,
stderr: FILE = None,
extra_env: Dict[str, str] = {},
cwd: Union[None, str, Path] = None,
check: bool = True,
timeout: float = math.inf,
) -> Results:
"""
Command to run locally for each host in the group in parallel
@cmd the commmand to run
@stdout if not None stdout of the command will be redirected to this file i.e. stdout=subprocss.PIPE
@stderr if not None stderr of the command will be redirected to this file i.e. stderr=subprocess.PIPE
@cwd current working directory to run the process in
@extra_env environment variables to override whe running the command
@timeout: Timeout in seconds for the command to complete
@return a lists of tuples containing Host and the result of the command for this Host
"""
return self._run(
cmd,
local=True,
stdout=stdout,
stderr=stderr,
extra_env=extra_env,
cwd=cwd,
check=check,
timeout=timeout,
)
def run_function(
self, func: Callable[[Host], T], check: bool = True
) -> List[HostResult[T]]:
"""
Function to run for each host in the group in parallel
@func the function to call
"""
threads = []
results: List[HostResult[T]] = [
HostResult(h, Exception(f"No result set for thread {i}"))
for (i, h) in enumerate(self.hosts)
]
for i, host in enumerate(self.hosts):
thread = Thread(
target=_worker,
args=(func, host, results, i),
)
threads.append(thread)
for thread in threads:
thread.start()
for thread in threads:
thread.join()
if check:
self._reraise_errors(results)
return results
def filter(self, pred: Callable[[Host], bool]) -> "Group":
"""Return a new Group with the results filtered by the predicate"""
return Group(list(filter(pred, self.hosts)))
@overload
def run(
cmd: Union[List[str], str],
text: Literal[True] = ...,
stdout: FILE = ...,
stderr: FILE = ...,
extra_env: Dict[str, str] = ...,
cwd: Union[None, str, Path] = ...,
check: bool = ...,
) -> subprocess.CompletedProcess[str]:
...
@overload
def run(
cmd: Union[List[str], str],
text: Literal[False],
stdout: FILE = ...,
stderr: FILE = ...,
extra_env: Dict[str, str] = ...,
cwd: Union[None, str, Path] = ...,
check: bool = ...,
) -> subprocess.CompletedProcess[bytes]:
...
def run(
cmd: Union[List[str], str],
text: bool = True,
stdout: FILE = None,
stderr: FILE = None,
extra_env: Dict[str, str] = {},
cwd: Union[None, str, Path] = None,
check: bool = True,
) -> subprocess.CompletedProcess[Any]:
"""
Run command locally
@cmd if this parameter is a string the command is interpreted as a shell command,
otherwise if it is a list, than the first list element is the command
and the remaining list elements are passed as arguments to the
command.
@text when true, file objects for stdout and stderr are opened in text mode.
@stdout if not None stdout of the command will be redirected to this file i.e. stdout=subprocss.PIPE
@stderr if not None stderr of the command will be redirected to this file i.e. stderr=subprocess.PIPE
@extra_env environment variables to override whe running the command
@cwd current working directory to run the process in
@check If check is true, and the process exits with a non-zero exit code, a
CalledProcessError exception will be raised. Attributes of that exception
hold the arguments, the exit code, and stdout and stderr if they were
captured.
"""
if isinstance(cmd, list):
info("$ " + " ".join(cmd))
else:
info(f"$ {cmd}")
env = os.environ.copy()
env.update(extra_env)
return subprocess.run(
cmd,
stdout=stdout,
stderr=stderr,
env=env,
cwd=cwd,
check=check,
shell=not isinstance(cmd, list),
text=text,
)

View File

@@ -3,7 +3,7 @@ import json
import subprocess import subprocess
from typing import Optional from typing import Optional
from .nix import nix_shell from ..nix import nix_shell
def ssh( def ssh(

View File

@@ -31,7 +31,7 @@ let
checkPython = python3.withPackages (_ps: dependencies ++ testDependencies); checkPython = python3.withPackages (_ps: dependencies ++ testDependencies);
in in
python3.pkgs.buildPythonPackage { python3.pkgs.buildPythonPackage {
name = "clan"; name = "clan-cli";
src = lib.cleanSource ./.; src = lib.cleanSource ./.;
format = "pyproject"; format = "pyproject";
nativeBuildInputs = [ nativeBuildInputs = [
@@ -79,6 +79,11 @@ python3.pkgs.buildPythonPackage {
''; '';
checkPhase = '' checkPhase = ''
PYTHONPATH= $out/bin/clan --help PYTHONPATH= $out/bin/clan --help
if grep --include \*.py -Rq "breakpoint()" $out; then
echo "breakpoint() found in $out:"
grep --include \*.py -Rn "breakpoint()" $out
exit 1
fi
''; '';
meta.mainProgram = "clan"; meta.mainProgram = "clan";
} }

View File

@@ -1,15 +1,15 @@
{ self, ... }: { { self, ... }: {
perSystem = { self', pkgs, ... }: { perSystem = { self', pkgs, ... }: {
devShells.clan = pkgs.callPackage ./shell.nix { devShells.clan-cli = pkgs.callPackage ./shell.nix {
inherit self; inherit self;
inherit (self'.packages) clan; inherit (self'.packages) clan-cli;
}; };
packages = { packages = {
clan = pkgs.python3.pkgs.callPackage ./default.nix { clan-cli = pkgs.python3.pkgs.callPackage ./default.nix {
inherit self; inherit self;
zerotierone = self'.packages.zerotierone; zerotierone = self'.packages.zerotierone;
}; };
default = self'.packages.clan; default = self'.packages.clan-cli;
## Optional dependencies for clan cli, we re-expose them here to make sure they all build. ## Optional dependencies for clan cli, we re-expose them here to make sure they all build.
inherit (pkgs) inherit (pkgs)
@@ -27,30 +27,7 @@
## End optional dependencies ## End optional dependencies
}; };
checks = self'.packages.clan.tests // { checks = self'.packages.clan-cli.tests;
# check if the `clan config` example jsonschema and data is valid
clan-config-example-schema-valid = pkgs.runCommand "clan-config-example-schema-valid" { } ''
echo "Checking that example-schema.json is valid"
${pkgs.check-jsonschema}/bin/check-jsonschema \
--check-metaschema ${./.}/tests/config/example-schema.json
echo "Checking that example-data.json is valid according to example-schema.json"
${pkgs.check-jsonschema}/bin/check-jsonschema \
--schemafile ${./.}/tests/config/example-schema.json \
${./.}/tests/config/example-data.json
touch $out
'';
# check if the `clan config` nix jsonschema converter unit tests succeed
clan-config-nix-unit-tests = pkgs.runCommand "clan-edit-unit-tests" { } ''
export NIX_PATH=nixpkgs=${pkgs.path}
${self'.packages.nix-unit}/bin/nix-unit \
${./.}/tests/config/test.nix \
--eval-store $(realpath .)
touch $out
'';
};
}; };
} }

View File

@@ -3,7 +3,7 @@ requires = [ "setuptools" ]
build-backend = "setuptools.build_meta" build-backend = "setuptools.build_meta"
[project] [project]
name = "clan" name = "clan-cli"
description = "cLAN CLI tool" description = "cLAN CLI tool"
dynamic = [ "version" ] dynamic = [ "version" ]
scripts = { clan = "clan_cli:main" } scripts = { clan = "clan_cli:main" }

View File

@@ -1,15 +1,15 @@
{ self, clan, pkgs }: { self, clan-cli, pkgs }:
let let
pythonWithDeps = pkgs.python3.withPackages ( pythonWithDeps = pkgs.python3.withPackages (
ps: ps:
clan.propagatedBuildInputs clan-cli.propagatedBuildInputs
++ clan.devDependencies ++ clan-cli.devDependencies
++ [ ++ [
ps.pip ps.pip
] ]
); );
checkScript = pkgs.writeScriptBin "check" '' checkScript = pkgs.writeScriptBin "check" ''
nix build -f . tests -L "$@" nix build .#checks.${pkgs.system}.{treefmt,clan-mypy,clan-pytest} -L "$@"
''; '';
in in
pkgs.mkShell { pkgs.mkShell {

View File

@@ -0,0 +1,31 @@
import pytest
class KeyPair:
def __init__(self, pubkey: str, privkey: str) -> None:
self.pubkey = pubkey
self.privkey = privkey
KEYS = [
KeyPair(
"age1dhwqzkah943xzc34tc3dlmfayyevcmdmxzjezdgdy33euxwf59vsp3vk3c",
"AGE-SECRET-KEY-1KF8E3SR3TTGL6M476SKF7EEMR4H9NF7ZWYSLJUAK8JX276JC7KUSSURKFK",
),
KeyPair(
"age14tva0txcrl0zes05x7gkx56qd6wd9q3nwecjac74xxzz4l47r44sv3fz62",
"AGE-SECRET-KEY-1U5ENXZQAY62NC78Y2WC0SEGRRMAEEKH79EYY5TH4GPFWJKEAY0USZ6X7YQ",
),
KeyPair(
"age1dhuh9xtefhgpr2sjjf7gmp9q2pr37z92rv4wsadxuqdx48989g7qj552qp",
"AGE-SECRET-KEY-169N3FT32VNYQ9WYJMLUSVTMA0TTZGVJF7YZWS8AHTWJ5RR9VGR7QCD8SKF",
),
]
@pytest.fixture
def age_keys() -> list[KeyPair]:
"""
Root directory of the tests
"""
return KEYS

View File

@@ -3,4 +3,4 @@ import sys
sys.path.append(os.path.join(os.path.dirname(__file__), "helpers")) sys.path.append(os.path.join(os.path.dirname(__file__), "helpers"))
pytest_plugins = ["temporary_dir", "clan_flake"] pytest_plugins = ["temporary_dir", "clan_flake", "root", "age_keys"]

View File

@@ -0,0 +1,23 @@
secret-key: ENC[AES256_GCM,data:gjX4OmCUdd3TlA4p,iv:3yZVpyd6FqkITQY0nU2M1iubmzvkR6PfkK2m/s6nQh8=,tag:Abgp9xkiFFylZIyAlap6Ew==,type:str]
nested:
secret-key: ENC[AES256_GCM,data:iUMgDhhIjwvd7wL4,iv:jiJIrh12dSu/sXX+z9ITVoEMNDMjwIlFBnyv40oN4LE=,tag:G9VmAa66Km1sc7JEhW5AvA==,type:str]
sops:
kms: []
gcp_kms: []
azure_kv: []
hc_vault: []
age:
- recipient: age14tva0txcrl0zes05x7gkx56qd6wd9q3nwecjac74xxzz4l47r44sv3fz62
enc: |
-----BEGIN AGE ENCRYPTED FILE-----
YWdlLWVuY3J5cHRpb24ub3JnL3YxCi0+IFgyNTUxOSA0eWdRVjlydXlXOVZFQ3lO
bzU1eG9Iam5Ka29Sdlo0cHJ4b1R6bjdNSzBjCkgwRndCbWZQWHlDU0x1cWRmaGVt
N29lbjR6UjN0L2RhaXEzSG9zQmRsZGsKLS0tIEdsdWgxSmZwU3BWUDVxVWRSSC9M
eVZ6bjgwZnR2TTM5MkRYZWNFSFplQWsKmSzv12/dftL9jx2y35UZUGVK6xWdatE8
BGJiCvMlp0BQNrh2s/+YaEaBa48w8LL79U/XJnEZ+ZUwxmlbSTn6Hg==
-----END AGE ENCRYPTED FILE-----
lastmodified: "2023-08-08T14:27:20Z"
mac: ENC[AES256_GCM,data:iRWWX+L5Q5nKn3fBCLaWoz/mvqGnNnRd93gJmYXDZbRjFoHa9IFJZst5QDIDa1ZRYUe6G0/+lV5SBi+vwRm1pHysJ3c0ZWYjBP+e1jw3jLXxLV5gACsDC8by+6rFUCho0Xgu+Nqu2ehhNenjQQnCvDH5ivWbW70KFT5ynNgR9Tw=,iv:RYnnbLMC/hNfMwWPreMq9uvY0khajwQTZENO/P34ckY=,tag:Xi1PS5vM1c+sRkroHkPn1Q==,type:str]
pgp: []
unencrypted_suffix: _unencrypted
version: 3.7.3

View File

@@ -0,0 +1,14 @@
import argparse
from clan_cli.secrets import register_parser
class SecretCli:
def __init__(self) -> None:
self.parser = argparse.ArgumentParser()
register_parser(self.parser)
def run(self, args: list[str]) -> argparse.Namespace:
parsed = self.parser.parse_args(args)
parsed.func(parsed)
return parsed

View File

@@ -0,0 +1,22 @@
from pathlib import Path
import pytest
TEST_ROOT = Path(__file__).parent.resolve()
PROJECT_ROOT = TEST_ROOT.parent
@pytest.fixture
def project_root() -> Path:
"""
Root directory of the tests
"""
return PROJECT_ROOT
@pytest.fixture
def test_root() -> Path:
"""
Root directory of the tests
"""
return TEST_ROOT

View File

@@ -0,0 +1,46 @@
from pathlib import Path
from typing import TYPE_CHECKING
import pytest
from environment import mock_env
from secret_cli import SecretCli
if TYPE_CHECKING:
from age_keys import KeyPair
def test_import_sops(
test_root: Path,
clan_flake: Path,
capsys: pytest.CaptureFixture,
age_keys: list["KeyPair"],
) -> None:
cli = SecretCli()
with mock_env(SOPS_AGE_KEY=age_keys[1].privkey):
cli.run(["machines", "add", "machine1", age_keys[0].pubkey])
cli.run(["users", "add", "user1", age_keys[1].pubkey])
cli.run(["users", "add", "user2", age_keys[2].pubkey])
cli.run(["groups", "add-user", "group1", "user1"])
cli.run(["groups", "add-user", "group1", "user2"])
# To edit:
# SOPS_AGE_KEY=AGE-SECRET-KEY-1U5ENXZQAY62NC78Y2WC0SEGRRMAEEKH79EYY5TH4GPFWJKEAY0USZ6X7YQ sops --age age14tva0txcrl0zes05x7gkx56qd6wd9q3nwecjac74xxzz4l47r44sv3fz62 ./data/secrets.yaml
cli.run(
[
"import-sops",
"--group",
"group1",
"--machine",
"machine1",
str(test_root.joinpath("data", "secrets.yaml")),
]
)
capsys.readouterr()
cli.run(["users", "list"])
users = sorted(capsys.readouterr().out.rstrip().split())
assert users == ["user1", "user2"]
capsys.readouterr()
cli.run(["get", "secret-key"])
assert capsys.readouterr().out == "secret-value"

View File

@@ -1,130 +0,0 @@
import argparse
import os
from pathlib import Path
import pytest
from environment import mock_env
from clan_cli.errors import ClanError
from clan_cli.secrets import register_parser
class SecretCli:
def __init__(self) -> None:
self.parser = argparse.ArgumentParser()
register_parser(self.parser)
def run(self, args: list[str]) -> argparse.Namespace:
parsed = self.parser.parse_args(args)
parsed.func(parsed)
return parsed
PUBKEY = "age1dhwqzkah943xzc34tc3dlmfayyevcmdmxzjezdgdy33euxwf59vsp3vk3c"
PRIVKEY = "AGE-SECRET-KEY-1KF8E3SR3TTGL6M476SKF7EEMR4H9NF7ZWYSLJUAK8JX276JC7KUSSURKFK"
def _test_identities(
what: str, clan_flake: Path, capsys: pytest.CaptureFixture
) -> None:
cli = SecretCli()
sops_folder = clan_flake / "sops"
cli.run([what, "add", "foo", PUBKEY])
assert (sops_folder / what / "foo" / "key.json").exists()
with pytest.raises(ClanError):
cli.run([what, "add", "foo", PUBKEY])
cli.run(
[
what,
"add",
"-f",
"foo",
PRIVKEY,
]
)
capsys.readouterr() # empty the buffer
cli.run([what, "list"])
out = capsys.readouterr() # empty the buffer
assert "foo" in out.out
cli.run([what, "remove", "foo"])
assert not (sops_folder / what / "foo" / "key.json").exists()
with pytest.raises(ClanError): # already removed
cli.run([what, "remove", "foo"])
capsys.readouterr()
cli.run([what, "list"])
out = capsys.readouterr()
assert "foo" not in out.out
def test_users(clan_flake: Path, capsys: pytest.CaptureFixture) -> None:
_test_identities("users", clan_flake, capsys)
def test_machines(clan_flake: Path, capsys: pytest.CaptureFixture) -> None:
_test_identities("machines", clan_flake, capsys)
def test_groups(clan_flake: Path, capsys: pytest.CaptureFixture) -> None:
cli = SecretCli()
capsys.readouterr() # empty the buffer
cli.run(["groups", "list"])
assert capsys.readouterr().out == ""
with pytest.raises(ClanError): # machine does not exist yet
cli.run(["groups", "add-machine", "group1", "machine1"])
with pytest.raises(ClanError): # user does not exist yet
cli.run(["groups", "add-user", "groupb1", "user1"])
cli.run(["machines", "add", "machine1", PUBKEY])
cli.run(["groups", "add-machine", "group1", "machine1"])
# Should this fail?
cli.run(["groups", "add-machine", "group1", "machine1"])
cli.run(["users", "add", "user1", PUBKEY])
cli.run(["groups", "add-user", "group1", "user1"])
capsys.readouterr() # empty the buffer
cli.run(["groups", "list"])
out = capsys.readouterr().out
assert "user1" in out
assert "machine1" in out
cli.run(["groups", "remove-user", "group1", "user1"])
cli.run(["groups", "remove-machine", "group1", "machine1"])
groups = os.listdir(clan_flake / "sops" / "groups")
assert len(groups) == 0
def test_secrets(
clan_flake: Path, capsys: pytest.CaptureFixture, monkeypatch: pytest.MonkeyPatch
) -> None:
cli = SecretCli()
capsys.readouterr() # empty the buffer
cli.run(["list"])
assert capsys.readouterr().out == ""
with pytest.raises(ClanError): # does not exist yet
cli.run(["get", "nonexisting"])
with mock_env(
SOPS_NIX_SECRET="foo", SOPS_AGE_KEY_FILE=str(clan_flake / ".." / "age.key")
):
cli.run(["set", "key"])
capsys.readouterr()
cli.run(["get", "key"])
assert capsys.readouterr().out == "foo"
capsys.readouterr() # empty the buffer
cli.run(["list"])
assert capsys.readouterr().out == "key\n"
cli.run(["remove", "key"])
capsys.readouterr() # empty the buffer
cli.run(["list"])
assert capsys.readouterr().out == ""

View File

@@ -0,0 +1,165 @@
import os
from pathlib import Path
from typing import TYPE_CHECKING
import pytest
from environment import mock_env
from secret_cli import SecretCli
from clan_cli.errors import ClanError
if TYPE_CHECKING:
from age_keys import KeyPair
def _test_identities(
what: str,
clan_flake: Path,
capsys: pytest.CaptureFixture,
age_keys: list["KeyPair"],
) -> None:
cli = SecretCli()
sops_folder = clan_flake / "sops"
cli.run([what, "add", "foo", age_keys[0].pubkey])
assert (sops_folder / what / "foo" / "key.json").exists()
with pytest.raises(ClanError):
cli.run([what, "add", "foo", age_keys[0].pubkey])
cli.run(
[
what,
"add",
"-f",
"foo",
age_keys[0].privkey,
]
)
capsys.readouterr() # empty the buffer
cli.run([what, "list"])
out = capsys.readouterr() # empty the buffer
assert "foo" in out.out
cli.run([what, "remove", "foo"])
assert not (sops_folder / what / "foo" / "key.json").exists()
with pytest.raises(ClanError): # already removed
cli.run([what, "remove", "foo"])
capsys.readouterr()
cli.run([what, "list"])
out = capsys.readouterr()
assert "foo" not in out.out
def test_users(
clan_flake: Path, capsys: pytest.CaptureFixture, age_keys: list["KeyPair"]
) -> None:
_test_identities("users", clan_flake, capsys, age_keys)
def test_machines(
clan_flake: Path, capsys: pytest.CaptureFixture, age_keys: list["KeyPair"]
) -> None:
_test_identities("machines", clan_flake, capsys, age_keys)
def test_groups(
clan_flake: Path, capsys: pytest.CaptureFixture, age_keys: list["KeyPair"]
) -> None:
cli = SecretCli()
capsys.readouterr() # empty the buffer
cli.run(["groups", "list"])
assert capsys.readouterr().out == ""
with pytest.raises(ClanError): # machine does not exist yet
cli.run(["groups", "add-machine", "group1", "machine1"])
with pytest.raises(ClanError): # user does not exist yet
cli.run(["groups", "add-user", "groupb1", "user1"])
cli.run(["machines", "add", "machine1", age_keys[0].pubkey])
cli.run(["groups", "add-machine", "group1", "machine1"])
# Should this fail?
cli.run(["groups", "add-machine", "group1", "machine1"])
cli.run(["users", "add", "user1", age_keys[0].pubkey])
cli.run(["groups", "add-user", "group1", "user1"])
capsys.readouterr() # empty the buffer
cli.run(["groups", "list"])
out = capsys.readouterr().out
assert "user1" in out
assert "machine1" in out
cli.run(["groups", "remove-user", "group1", "user1"])
cli.run(["groups", "remove-machine", "group1", "machine1"])
groups = os.listdir(clan_flake / "sops" / "groups")
assert len(groups) == 0
def test_secrets(
clan_flake: Path, capsys: pytest.CaptureFixture, age_keys: list["KeyPair"]
) -> None:
cli = SecretCli()
capsys.readouterr() # empty the buffer
cli.run(["list"])
assert capsys.readouterr().out == ""
with mock_env(
SOPS_NIX_SECRET="foo", SOPS_AGE_KEY_FILE=str(clan_flake / ".." / "age.key")
):
with pytest.raises(ClanError): # does not exist yet
cli.run(["get", "nonexisting"])
cli.run(["set", "key"])
capsys.readouterr()
cli.run(["get", "key"])
assert capsys.readouterr().out == "foo"
capsys.readouterr()
cli.run(["users", "list"])
users = capsys.readouterr().out.rstrip().split("\n")
assert len(users) == 1, f"users: {users}"
owner = users[0]
capsys.readouterr() # empty the buffer
cli.run(["list"])
assert capsys.readouterr().out == "key\n"
cli.run(["machines", "add", "machine1", age_keys[0].pubkey])
cli.run(["machines", "add-secret", "machine1", "key"])
with mock_env(SOPS_AGE_KEY=age_keys[0].privkey, SOPS_AGE_KEY_FILE=""):
capsys.readouterr()
cli.run(["get", "key"])
assert capsys.readouterr().out == "foo"
cli.run(["machines", "remove-secret", "machine1", "key"])
cli.run(["users", "add", "user1", age_keys[1].pubkey])
cli.run(["users", "add-secret", "user1", "key"])
with mock_env(SOPS_AGE_KEY=age_keys[1].privkey, SOPS_AGE_KEY_FILE=""):
capsys.readouterr()
cli.run(["get", "key"])
assert capsys.readouterr().out == "foo"
cli.run(["users", "remove-secret", "user1", "key"])
with pytest.raises(ClanError): # does not exist yet
cli.run(["groups", "add-secret", "admin-group", "key"])
cli.run(["groups", "add-user", "admin-group", "user1"])
cli.run(["groups", "add-user", "admin-group", owner])
cli.run(["groups", "add-secret", "admin-group", "key"])
capsys.readouterr() # empty the buffer
cli.run(["set", "--group", "admin-group", "key2"])
with mock_env(SOPS_AGE_KEY=age_keys[1].privkey, SOPS_AGE_KEY_FILE=""):
capsys.readouterr()
cli.run(["get", "key"])
assert capsys.readouterr().out == "foo"
cli.run(["groups", "remove-secret", "admin-group", "key"])
cli.run(["remove", "key"])
cli.run(["remove", "key2"])
capsys.readouterr() # empty the buffer
cli.run(["list"])
assert capsys.readouterr().out == ""

View File

@@ -6,7 +6,8 @@ import pytest_subprocess.fake_process
from environment import mock_env from environment import mock_env
from pytest_subprocess import utils from pytest_subprocess import utils
import clan_cli.ssh import clan_cli
from clan_cli.ssh import cli
def test_no_args( def test_no_args(
@@ -40,7 +41,7 @@ def test_ssh_no_pass(fp: pytest_subprocess.fake_process.FakeProcess) -> None:
fp.any(), fp.any(),
] ]
fp.register(cmd) fp.register(cmd)
clan_cli.ssh.ssh( cli.ssh(
host=host, host=host,
user=user, user=user,
) )
@@ -64,7 +65,7 @@ def test_ssh_with_pass(fp: pytest_subprocess.fake_process.FakeProcess) -> None:
fp.any(), fp.any(),
] ]
fp.register(cmd) fp.register(cmd)
clan_cli.ssh.ssh( cli.ssh(
host=host, host=host,
user=user, user=user,
password="XXX", password="XXX",
@@ -75,5 +76,5 @@ def test_ssh_with_pass(fp: pytest_subprocess.fake_process.FakeProcess) -> None:
def test_qrcode_scan(fp: pytest_subprocess.fake_process.FakeProcess) -> None: def test_qrcode_scan(fp: pytest_subprocess.fake_process.FakeProcess) -> None:
cmd: list[Union[str, utils.Any]] = [fp.any()] cmd: list[Union[str, utils.Any]] = [fp.any()]
fp.register(cmd, stdout="https://test.test") fp.register(cmd, stdout="https://test.test")
result = clan_cli.ssh.qrcode_scan("test.png") result = cli.qrcode_scan("test.png")
assert result == "https://test.test" assert result == "https://test.test"

View File

@@ -0,0 +1,97 @@
import subprocess
from clan_cli.ssh import Group, Host, run
def test_run() -> None:
p = run("echo hello")
assert p.stdout is None
def test_run_failure() -> None:
p = run("exit 1", check=False)
assert p.returncode == 1
try:
p = run("exit 1")
except Exception:
pass
else:
assert False, "Command should have raised an error"
hosts = Group([Host("some_host")])
def test_run_environment() -> None:
p1 = run("echo $env_var", stdout=subprocess.PIPE, extra_env=dict(env_var="true"))
assert p1.stdout == "true\n"
p2 = hosts.run_local(
"echo $env_var", extra_env=dict(env_var="true"), stdout=subprocess.PIPE
)
assert p2[0].result.stdout == "true\n"
p3 = hosts.run_local(
["env"], extra_env=dict(env_var="true"), stdout=subprocess.PIPE
)
assert "env_var=true" in p3[0].result.stdout
def test_run_non_shell() -> None:
p = run(["echo", "$hello"], stdout=subprocess.PIPE)
assert p.stdout == "$hello\n"
def test_run_stderr_stdout() -> None:
p = run("echo 1; echo 2 >&2", stdout=subprocess.PIPE, stderr=subprocess.PIPE)
assert p.stdout == "1\n"
assert p.stderr == "2\n"
def test_run_local() -> None:
hosts.run_local("echo hello")
def test_timeout() -> None:
try:
hosts.run_local("sleep 10", timeout=0.01)
except Exception:
pass
else:
assert False, "should have raised TimeoutExpired"
def test_run_function() -> None:
def some_func(h: Host) -> bool:
p = h.run_local("echo hello", stdout=subprocess.PIPE)
return p.stdout == "hello\n"
res = hosts.run_function(some_func)
assert res[0].result
def test_run_exception() -> None:
try:
hosts.run_local("exit 1")
except Exception:
pass
else:
assert False, "should have raised Exception"
def test_run_function_exception() -> None:
def some_func(h: Host) -> None:
h.run_local("exit 1")
try:
hosts.run_function(some_func)
except Exception:
pass
else:
assert False, "should have raised Exception"
def test_run_local_non_shell() -> None:
p2 = hosts.run_local(["echo", "1"], stdout=subprocess.PIPE)
assert p2[0].result.stdout == "1\n"

View File

@@ -1,4 +1,10 @@
{ ... }: { { ... }: {
imports = [
./clan-cli/flake-module.nix
./installer/flake-module.nix
./ui/flake-module.nix
];
perSystem = { pkgs, config, ... }: { perSystem = { pkgs, config, ... }: {
packages = { packages = {
tea-create-pr = pkgs.callPackage ./tea-create-pr { }; tea-create-pr = pkgs.callPackage ./tea-create-pr { };

View File

@@ -1,6 +1,7 @@
/** @type {import('next').NextConfig} */ /** @type {import('next').NextConfig} */
const nextConfig = { const nextConfig = {
output: "export", output: "export",
images: { unoptimized: true },
}; };
module.exports = nextConfig; module.exports = nextConfig;

View File

@@ -895,6 +895,37 @@
version = "5.14.3"; version = "5.14.3";
}; };
}; };
"@mui/icons-material" = {
"5.14.3" = {
depInfo = {
"@babel/runtime" = {
descriptor = "^7.22.6";
pin = "7.22.6";
runtime = true;
};
};
fetchInfo = {
narHash = "sha256-wmY7EzOahWuCF2g5vpcOeFZ8+iJKwyFLHsQiXh1R2jY=";
type = "tarball";
url = "https://registry.npmjs.org/@mui/icons-material/-/icons-material-5.14.3.tgz";
};
ident = "@mui/icons-material";
ltype = "file";
peerInfo = {
"@mui/material" = {
descriptor = "^5.0.0";
};
"@types/react" = {
descriptor = "^17.0.0 || ^18.0.0";
optional = true;
};
react = {
descriptor = "^17.0.0 || ^18.0.0";
};
};
version = "5.14.3";
};
};
"@mui/material" = { "@mui/material" = {
"5.14.3" = { "5.14.3" = {
depInfo = { depInfo = {
@@ -6643,6 +6674,11 @@
pin = "11.11.0"; pin = "11.11.0";
runtime = true; runtime = true;
}; };
"@mui/icons-material" = {
descriptor = "^5.14.3";
pin = "5.14.3";
runtime = true;
};
"@mui/material" = { "@mui/material" = {
descriptor = "^5.14.3"; descriptor = "^5.14.3";
pin = "5.14.3"; pin = "5.14.3";
@@ -6693,6 +6729,14 @@
pin = "8.4.27"; pin = "8.4.27";
runtime = true; runtime = true;
}; };
prettier = {
descriptor = "^3.0.1";
pin = "3.0.1";
};
prettier-plugin-tailwindcss = {
descriptor = "^0.4.1";
pin = "0.4.1";
};
react = { react = {
descriptor = "18.2.0"; descriptor = "18.2.0";
pin = "18.2.0"; pin = "18.2.0";
@@ -6856,6 +6900,9 @@
"node_modules/@mui/core-downloads-tracker" = { "node_modules/@mui/core-downloads-tracker" = {
key = "@mui/core-downloads-tracker/5.14.3"; key = "@mui/core-downloads-tracker/5.14.3";
}; };
"node_modules/@mui/icons-material" = {
key = "@mui/icons-material/5.14.3";
};
"node_modules/@mui/material" = { "node_modules/@mui/material" = {
key = "@mui/material/5.14.3"; key = "@mui/material/5.14.3";
}; };
@@ -7727,6 +7774,14 @@
"node_modules/prelude-ls" = { "node_modules/prelude-ls" = {
key = "prelude-ls/1.2.1"; key = "prelude-ls/1.2.1";
}; };
"node_modules/prettier" = {
dev = true;
key = "prettier/3.0.1";
};
"node_modules/prettier-plugin-tailwindcss" = {
dev = true;
key = "prettier-plugin-tailwindcss/0.4.1";
};
"node_modules/prop-types" = { "node_modules/prop-types" = {
key = "prop-types/15.8.1"; key = "prop-types/15.8.1";
}; };
@@ -9062,6 +9117,102 @@
version = "1.2.1"; version = "1.2.1";
}; };
}; };
prettier = {
"3.0.1" = {
binInfo = {
binPairs = {
prettier = "bin/prettier.cjs";
};
};
fetchInfo = {
narHash = "sha256-rgaO4WYmjoHtlOu8SnOau8b/O9lIEDtt26ovEY7qseY=";
type = "tarball";
url = "https://registry.npmjs.org/prettier/-/prettier-3.0.1.tgz";
};
ident = "prettier";
ltype = "file";
treeInfo = { };
version = "3.0.1";
};
};
prettier-plugin-tailwindcss = {
"0.4.1" = {
fetchInfo = {
narHash = "sha256-39DJn6lvrLmDYTN/lXXuWzMC9pLI4+HNrhnHlYuOMRM=";
type = "tarball";
url = "https://registry.npmjs.org/prettier-plugin-tailwindcss/-/prettier-plugin-tailwindcss-0.4.1.tgz";
};
ident = "prettier-plugin-tailwindcss";
ltype = "file";
peerInfo = {
"@ianvs/prettier-plugin-sort-imports" = {
descriptor = "*";
optional = true;
};
"@prettier/plugin-pug" = {
descriptor = "*";
optional = true;
};
"@shopify/prettier-plugin-liquid" = {
descriptor = "*";
optional = true;
};
"@shufo/prettier-plugin-blade" = {
descriptor = "*";
optional = true;
};
"@trivago/prettier-plugin-sort-imports" = {
descriptor = "*";
optional = true;
};
prettier = {
descriptor = "^2.2 || ^3.0";
};
prettier-plugin-astro = {
descriptor = "*";
optional = true;
};
prettier-plugin-css-order = {
descriptor = "*";
optional = true;
};
prettier-plugin-import-sort = {
descriptor = "*";
optional = true;
};
prettier-plugin-jsdoc = {
descriptor = "*";
optional = true;
};
prettier-plugin-marko = {
descriptor = "*";
optional = true;
};
prettier-plugin-organize-attributes = {
descriptor = "*";
optional = true;
};
prettier-plugin-organize-imports = {
descriptor = "*";
optional = true;
};
prettier-plugin-style-order = {
descriptor = "*";
optional = true;
};
prettier-plugin-svelte = {
descriptor = "*";
optional = true;
};
prettier-plugin-twig-melody = {
descriptor = "*";
optional = true;
};
};
treeInfo = { };
version = "0.4.1";
};
};
prop-types = { prop-types = {
"15.8.1" = { "15.8.1" = {
depInfo = { depInfo = {

View File

@@ -10,6 +10,7 @@
"dependencies": { "dependencies": {
"@emotion/react": "^11.11.1", "@emotion/react": "^11.11.1",
"@emotion/styled": "^11.11.0", "@emotion/styled": "^11.11.0",
"@mui/icons-material": "^5.14.3",
"@mui/material": "^5.14.3", "@mui/material": "^5.14.3",
"@types/node": "20.4.7", "@types/node": "20.4.7",
"@types/react": "18.2.18", "@types/react": "18.2.18",
@@ -497,6 +498,31 @@
"url": "https://opencollective.com/mui" "url": "https://opencollective.com/mui"
} }
}, },
"node_modules/@mui/icons-material": {
"version": "5.14.3",
"resolved": "https://registry.npmjs.org/@mui/icons-material/-/icons-material-5.14.3.tgz",
"integrity": "sha512-XkxWPhageu1OPUm2LWjo5XqeQ0t2xfGe8EiLkRW9oz2LHMMZmijvCxulhgquUVTF1DnoSh+3KoDLSsoAFtVNVw==",
"dependencies": {
"@babel/runtime": "^7.22.6"
},
"engines": {
"node": ">=12.0.0"
},
"funding": {
"type": "opencollective",
"url": "https://opencollective.com/mui"
},
"peerDependencies": {
"@mui/material": "^5.0.0",
"@types/react": "^17.0.0 || ^18.0.0",
"react": "^17.0.0 || ^18.0.0"
},
"peerDependenciesMeta": {
"@types/react": {
"optional": true
}
}
},
"node_modules/@mui/material": { "node_modules/@mui/material": {
"version": "5.14.3", "version": "5.14.3",
"resolved": "https://registry.npmjs.org/@mui/material/-/material-5.14.3.tgz", "resolved": "https://registry.npmjs.org/@mui/material/-/material-5.14.3.tgz",

View File

@@ -14,6 +14,7 @@
"dependencies": { "dependencies": {
"@emotion/react": "^11.11.1", "@emotion/react": "^11.11.1",
"@emotion/styled": "^11.11.0", "@emotion/styled": "^11.11.0",
"@mui/icons-material": "^5.14.3",
"@mui/material": "^5.14.3", "@mui/material": "^5.14.3",
"@types/node": "20.4.7", "@types/node": "20.4.7",
"@types/react": "18.2.18", "@types/react": "18.2.18",

View File

@@ -1,4 +0,0 @@
module.exports = {
plugins: [require("prettier-plugin-tailwindcss")],
tailwindFunctions: ['clsx', 'cx'],
};

1
pkgs/ui/public/logo.svg Normal file

File diff suppressed because one or more lines are too long

After

Width:  |  Height:  |  Size: 14 KiB

View File

@@ -1,25 +0,0 @@
import { DashboardCard } from "../../components/card";
import { Grid } from "@mui/material";
import { Button } from "@mui/material";
export default function Dashboard() {
return (
<Grid container>
<Grid item xs={12}>
<DashboardCard />
<Button variant="contained" color="primary">
Click me!
</Button>
Hallo Mike !
</Grid>
<Grid item xs={6}>
<DashboardCard />
Server Stats
</Grid>
<Grid item xs={6}>
Network Stats
<DashboardCard />
</Grid>
</Grid>
);
}

View File

@@ -1,27 +1,3 @@
@tailwind base; @tailwind base;
@tailwind components; @tailwind components;
@tailwind utilities; @tailwind utilities;
:root {
--foreground-rgb: 0, 0, 0;
--background-start-rgb: 214, 219, 220;
--background-end-rgb: 255, 255, 255;
}
@media (prefers-color-scheme: dark) {
:root {
--foreground-rgb: 255, 255, 255;
--background-start-rgb: 0, 0, 0;
--background-end-rgb: 0, 0, 0;
}
}
body {
color: rgb(var(--foreground-rgb));
background: linear-gradient(
to bottom,
transparent,
rgb(var(--background-end-rgb))
)
rgb(var(--background-start-rgb));
}

View File

@@ -1,10 +1,15 @@
"use client";
import "./globals.css"; import "./globals.css";
import type { Metadata } from "next";
import localFont from "next/font/local"; import localFont from "next/font/local";
import * as React from "react"; import * as React from "react";
import { CssBaseline, ThemeProvider } from "@mui/material";
import { ChangeEvent, useState } from "react";
import { StyledEngineProvider } from "@mui/material/styles"; import { StyledEngineProvider } from "@mui/material/styles";
import cx from "classnames";
// import { tw } from "../utils/tailwind"; import { darkTheme, lightTheme } from "./theme/themes";
import { Sidebar } from "@/components/sidebar";
const roboto = localFont({ const roboto = localFont({
src: [ src: [
@@ -13,49 +18,42 @@ const roboto = localFont({
weight: "400", weight: "400",
style: "normal", style: "normal",
}, },
// {
// path: "./Roboto-Italic.woff2",
// weight: "400",
// style: "italic",
// },
// {
// path: "./Roboto-Bold.woff2",
// weight: "700",
// style: "normal",
// },
// {
// path: "./Roboto-BoldItalic.woff2",
// weight: "700",
// style: "italic",
// },
], ],
}); });
export const metadata: Metadata = {
title: "Create Next App",
description: "Generated by create next app",
};
export default function RootLayout({ export default function RootLayout({
children, children,
}: { }: {
children: React.ReactNode; children: React.ReactNode;
}) { }) {
let [useDarkTheme, setUseDarkTheme] = useState(false);
let [theme, setTheme] = useState(useDarkTheme ? darkTheme : lightTheme);
const changeThemeHandler = (target: ChangeEvent, currentValue: boolean) => {
setUseDarkTheme(currentValue);
setTheme(currentValue ? darkTheme : lightTheme);
};
return ( return (
<html lang="en"> <html lang="en">
<head>
<title>Clan.lol</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<meta name="description" content="Clan.lol - build your own network" />
<link rel="icon" href="/favicon.ico" />
</head>
<StyledEngineProvider injectFirst> <StyledEngineProvider injectFirst>
<body <ThemeProvider theme={theme}>
className={cx( <body id="__next" className={roboto.className}>
"h-screen", <CssBaseline />
"min-h-screen", <div className="flex h-screen overflow-hidden">
"w-screen", <Sidebar />
"bg-white", <div className="relative flex flex-1 flex-col overflow-y-auto overflow-x-hidden">
// custom animation defined in tailwind.config <main>{children}</main>
roboto.className </div>
)} </div>
>
{children}
</body> </body>
</ThemeProvider>
</StyledEngineProvider> </StyledEngineProvider>
</html> </html>
); );

View File

@@ -1,112 +1,14 @@
import Image from "next/image"; import { Button } from "@mui/material";
export default function Home() { export default function Dashboard() {
return ( return (
<main className="min-h-screen"> <div className="w-full flex justify-center items-center h-screen">
<div className="z-10 w-full max-w-5xl items-center justify-between font-mono text-sm lg:flex"> <div className="grid">
<p className="fixed left-0 top-0 flex w-full justify-center border-b border-gray-300 bg-gradient-to-b from-zinc-200 pb-6 pt-8 backdrop-blur-2xl dark:border-neutral-800 dark:bg-zinc-800/30 dark:from-inherit lg:static lg:w-auto lg:rounded-xl lg:border lg:bg-gray-200 lg:p-4 lg:dark:bg-zinc-800/30"> Welcome to the Dashboard
Get started by editing&nbsp; <Button variant="contained" color="primary">
<code className="font-mono font-bold">src/app/page.tsx</code> LOL
</p> </Button>
<div className="fixed bottom-0 left-0 flex h-48 w-full items-end justify-center bg-gradient-to-t from-white via-white dark:from-black dark:via-black lg:static lg:h-auto lg:w-auto lg:bg-none">
<a
className="pointer-events-none flex place-items-center gap-2 p-8 lg:pointer-events-auto lg:p-0"
href="https://vercel.com?utm_source=create-next-app&utm_medium=appdir-template&utm_campaign=create-next-app"
target="_blank"
rel="noopener noreferrer"
>
<Image
src="/vercel.svg"
alt="Vercel Logo"
className="dark:invert"
width={100}
height={24}
priority
/>
</a>
</div> </div>
</div> </div>
<div className="relative flex place-items-center before:absolute before:h-[300px] before:w-[480px] before:-translate-x-1/2 before:rounded-full before:bg-gradient-radial before:from-white before:to-transparent before:blur-2xl before:content-[''] after:absolute after:-z-20 after:h-[180px] after:w-[240px] after:translate-x-1/3 after:bg-gradient-conic after:from-sky-200 after:via-blue-200 after:blur-2xl after:content-[''] before:dark:bg-gradient-to-br before:dark:from-transparent before:dark:to-blue-700 before:dark:opacity-10 after:dark:from-sky-900 after:dark:via-[#0141ff] after:dark:opacity-40 before:lg:h-[360px] z-[-1]">
<Image
className="relative dark:drop-shadow-[0_0_0.3rem_#ffffff70] dark:invert"
src="/next.svg"
alt="Next.js Logo"
width={180}
height={37}
priority
/>
</div>
<div className="mb-32 grid text-center lg:mb-0 lg:grid-cols-4 lg:text-left">
<a
href="https://nextjs.org/docs?utm_source=create-next-app&utm_medium=appdir-template&utm_campaign=create-next-app"
className="group rounded-lg border border-transparent px-5 py-4 transition-colors hover:border-gray-300 hover:bg-gray-100 hover:dark:border-neutral-700 hover:dark:bg-neutral-800/30"
target="_blank"
rel="noopener noreferrer"
>
<h2 className={`mb-3 text-2xl font-semibold`}>
Docs{" "}
<span className="inline-block transition-transform group-hover:translate-x-1 motion-reduce:transform-none">
-&gt;
</span>
</h2>
<p className={`m-0 max-w-[30ch] text-sm opacity-50`}>
Find in-depth information about Next.js features and API.
</p>
</a>
<a
href="https://nextjs.org/learn?utm_source=create-next-app&utm_medium=appdir-template-tw&utm_campaign=create-next-app"
className="group rounded-lg border border-transparent px-5 py-4 transition-colors hover:border-gray-300 hover:bg-gray-100 hover:dark:border-neutral-700 hover:dark:bg-neutral-800/30"
target="_blank"
rel="noopener noreferrer"
>
<h2 className={`mb-3 text-2xl font-semibold`}>
Learn{" "}
<span className="inline-block transition-transform group-hover:translate-x-1 motion-reduce:transform-none">
-&gt;
</span>
</h2>
<p className={`m-0 max-w-[30ch] text-sm opacity-50`}>
Learn about Next.js in an interactive course with&nbsp;quizzes!
</p>
</a>
<a
href="https://vercel.com/templates?framework=next.js&utm_source=create-next-app&utm_medium=appdir-template&utm_campaign=create-next-app"
className="group rounded-lg border border-transparent px-5 py-4 transition-colors hover:border-gray-300 hover:bg-gray-100 hover:dark:border-neutral-700 hover:dark:bg-neutral-800/30"
target="_blank"
rel="noopener noreferrer"
>
<h2 className={`mb-3 text-2xl font-semibold`}>
Templates{" "}
<span className="inline-block transition-transform group-hover:translate-x-1 motion-reduce:transform-none">
-&gt;
</span>
</h2>
<p className={`m-0 max-w-[30ch] text-sm opacity-50`}>
Explore the Next.js 13 playground.
</p>
</a>
<a
href="https://vercel.com/new?utm_source=create-next-app&utm_medium=appdir-template&utm_campaign=create-next-app"
className="group rounded-lg border border-transparent px-5 py-4 transition-colors hover:border-gray-300 hover:bg-gray-100 hover:dark:border-neutral-700 hover:dark:bg-neutral-800/30"
target="_blank"
rel="noopener noreferrer"
>
<h2 className={`mb-3 text-2xl font-semibold`}>
Deploy{" "}
<span className="inline-block transition-transform group-hover:translate-x-1 motion-reduce:transform-none">
-&gt;
</span>
</h2>
<p className={`m-0 max-w-[30ch] text-sm opacity-50`}>
Instantly deploy your Next.js site to a shareable URL with Vercel.
</p>
</a>
</div>
</main>
); );
} }

View File

@@ -0,0 +1,13 @@
import { createTheme } from "@mui/material/styles";
export const darkTheme = createTheme({
palette: {
mode: "dark",
},
});
export const lightTheme = createTheme({
palette: {
mode: "light",
},
});

View File

@@ -0,0 +1,122 @@
import {
Divider,
List,
ListItem,
ListItemButton,
ListItemIcon,
ListItemText,
} from "@mui/material";
import Image from "next/image";
import { ReactNode } from "react";
import DashboardIcon from "@mui/icons-material/Dashboard";
import DevicesIcon from "@mui/icons-material/Devices";
import LanIcon from "@mui/icons-material/Lan";
import AppsIcon from "@mui/icons-material/Apps";
import DesignServicesIcon from "@mui/icons-material/DesignServices";
import BackupIcon from "@mui/icons-material/Backup";
import Link from "next/link";
type MenuEntry = {
icon: ReactNode;
label: string;
to: string;
} & {
subMenuEntries?: MenuEntry[];
};
const menuEntries: MenuEntry[] = [
{
icon: <DashboardIcon />,
label: "Dashoard",
to: "/",
},
{
icon: <DevicesIcon />,
label: "Devices",
to: "/nodes",
},
{
icon: <AppsIcon />,
label: "Applications",
to: "/applications",
},
{
icon: <LanIcon />,
label: "Network",
to: "/network",
},
{
icon: <DesignServicesIcon />,
label: "Templates",
to: "/templates",
},
{
icon: <BackupIcon />,
label: "Backups",
to: "/backups",
},
];
export function Sidebar() {
return (
<aside className="absolute left-0 top-0 z-9999 flex h-screen w-12 sm:w-64 flex-col overflow-y-hidden bg-zinc-950 dark:bg-boxdark sm:static">
<div className="flex items-center justify-between gap-2 px-6 py-5.5 lg:py-6.5">
<div className="mt-8 font-semibold text-white w-full text-center hidden sm:block">
<Image
src="/logo.svg"
alt="Clan Logo"
width={58}
height={58}
priority
/>
</div>
</div>
<Divider flexItem className="bg-zinc-600 my-9 mx-8" />
<div className="overflow-hidden flex flex-col overflow-y-auto duration-200 ease-linear">
<List className="pb-4 mb-14 px-4 lg:mt-1 lg:px-6 text-white">
{menuEntries.map((menuEntry, idx) => {
return (
<ListItem key={idx}>
<ListItemButton
className="justify-center sm:justify-normal"
LinkComponent={Link}
href={menuEntry.to}
>
<ListItemIcon
color="inherit"
className="justify-center sm:justify-normal text-white"
>
{menuEntry.icon}
</ListItemIcon>
<ListItemText
primary={menuEntry.label}
primaryTypographyProps={{
color: "inherit",
}}
className="hidden sm:block"
/>
</ListItemButton>
</ListItem>
);
})}
</List>
<Divider flexItem className="bg-zinc-600 mx-8 my-10" />
<div className="hidden sm:block mx-auto mb-8 w-full max-w-60 rounded-sm py-6 px-4 text-center shadow-default align-bottom">
<h3 className="mb-1 w-full font-semibold text-white">
Clan.lol Admin
</h3>
<a
href=""
target="_blank"
rel="nofollow"
className="w-full text-center rounded-md bg-primary p-2 text-white hover:bg-opacity-95"
>
Donate
</a>
</div>
</div>
</aside>
);
}

View File

@@ -1,22 +1,16 @@
/** @type {import('tailwindcss').Config} */ /** @type {import('tailwindcss').Config} */
module.exports = { module.exports = {
corePlugins: {
preflight: false,
},
content: [ content: [
"./src/pages/**/*.{js,ts,jsx,tsx,mdx}", "./src/pages/**/*.{js,ts,jsx,tsx,mdx}",
"./src/components/**/*.{js,ts,jsx,tsx,mdx}", "./src/components/**/*.{js,ts,jsx,tsx,mdx}",
"./src/app/**/*.{js,ts,jsx,tsx,mdx}", "./src/app/**/*.{js,ts,jsx,tsx,mdx}",
], ],
important: "#root", important: "#__next",
theme: { theme: {
extend: { extend: {},
backgroundImage: {
"gradient-radial": "radial-gradient(var(--tw-gradient-stops))",
"gradient-conic":
"conic-gradient(from 180deg at 50% 50%, var(--tw-gradient-stops))",
},
},
}, },
plugins: [], plugins: [],
corePlugins: {
preflight: false,
},
}; };

View File

@@ -18,7 +18,7 @@ log() {
} }
# If the commit has no files, skip everything as there is nothing to format # If the commit has no files, skip everything as there is nothing to format
if [[ ${#commit_files} = 0 ]]; then if [[ -z ${commit_files+x} ]] || [[ ${#commit_files} = 0 ]]; then
log "no files to format" log "no files to format"
exit 0 exit 0
fi fi

View File

@@ -1,4 +1,7 @@
{ ... }: { {
imports = [
./python-project/flake-module.nix
];
flake.templates = { flake.templates = {
new-clan = { new-clan = {
description = "Initialize a new clan flake"; description = "Initialize a new clan flake";