Merge pull request 'Add multiline-hidden-prompt' (#3632) from multiline-hidden-prompt into main

Reviewed-on: https://git.clan.lol/clan/clan-core/pulls/3632
Reviewed-by: hsjobeki <hsjobeki@gmail.com>
Reviewed-by: lopter <louis@opter.org>
This commit is contained in:
Mic92
2025-05-15 08:10:34 +00:00
13 changed files with 147 additions and 50 deletions

View File

@@ -364,11 +364,13 @@ in
- hidden: A hidden text (e.g. password) - hidden: A hidden text (e.g. password)
- line: A single line of text - line: A single line of text
- multiline: A multiline text - multiline: A multiline text
- multiline-hidden: A multiline text
''; '';
type = enum [ type = enum [
"hidden" "hidden"
"line" "line"
"multiline" "multiline"
"multiline-hidden"
]; ];
default = "line"; default = "line";
}; };

View File

@@ -91,7 +91,7 @@ def flash_machine(
"users": {"root": {"openssh": {"authorizedKeys": {"keys": root_keys}}}} "users": {"root": {"openssh": {"authorizedKeys": {"keys": root_keys}}}}
} }
for generator in machine.vars_generators: for generator in machine.vars_generators():
for file in generator.files: for file in generator.files:
if file.needed_for == "partitioning": if file.needed_for == "partitioning":
msg = f"Partitioning time secrets are not supported with `clan flash write`: clan.core.vars.generators.{generator.name}.files.{file.name}" msg = f"Partitioning time secrets are not supported with `clan flash write`: clan.core.vars.generators.{generator.name}.files.{file.name}"

View File

@@ -123,7 +123,6 @@ class Machine:
return self.deployment["facts"]["services"] return self.deployment["facts"]["services"]
return {} return {}
@property
def vars_generators(self) -> list["Generator"]: def vars_generators(self) -> list["Generator"]:
from clan_cli.vars.generate import Generator from clan_cli.vars.generate import Generator

View File

@@ -107,7 +107,7 @@ def test_add_module_to_inventory(
generator = None generator = None
for gen in machine.vars_generators: for gen in machine.vars_generators():
if gen.name == "borgbackup": if gen.name == "borgbackup":
generator = gen generator = gen
break break

View File

@@ -33,7 +33,7 @@ def vars_status(machine: Machine, generator_name: None | str = None) -> VarStatu
# signals if a var needs to be updated (eg. needs re-encryption due to new users added) # signals if a var needs to be updated (eg. needs re-encryption due to new users added)
unfixed_secret_vars = [] unfixed_secret_vars = []
invalid_generators = [] invalid_generators = []
generators = machine.vars_generators generators = machine.vars_generators()
if generator_name: if generator_name:
for generator in generators: for generator in generators:
if generator_name == generator.name: if generator_name == generator.name:

View File

@@ -9,7 +9,7 @@ log = logging.getLogger(__name__)
def fix_vars(machine: Machine, generator_name: None | str = None) -> None: def fix_vars(machine: Machine, generator_name: None | str = None) -> None:
generators = machine.vars_generators generators = machine.vars_generators()
if generator_name: if generator_name:
for generator in generators: for generator in generators:
if generator_name == generator.name: if generator_name == generator.name:

View File

@@ -132,7 +132,7 @@ def decrypt_dependencies(
decrypted_dependencies: dict[str, Any] = {} decrypted_dependencies: dict[str, Any] = {}
for generator_name in set(generator.dependencies): for generator_name in set(generator.dependencies):
decrypted_dependencies[generator_name] = {} decrypted_dependencies[generator_name] = {}
for dep_generator in machine.vars_generators: for dep_generator in machine.vars_generators():
if generator_name == dep_generator.name: if generator_name == dep_generator.name:
break break
else: else:
@@ -320,7 +320,7 @@ def get_closure(
) -> list[Generator]: ) -> list[Generator]:
from .graph import all_missing_closure, full_closure from .graph import all_missing_closure, full_closure
vars_generators = machine.vars_generators vars_generators = machine.vars_generators()
generators: dict[str, Generator] = { generators: dict[str, Generator] = {
generator.name: generator for generator in vars_generators generator.name: generator for generator in vars_generators
} }
@@ -399,7 +399,7 @@ def generate_vars_for_machine(
machine = Machine(name=machine_name, flake=Flake(str(base_dir))) machine = Machine(name=machine_name, flake=Flake(str(base_dir)))
generators_set = set(generators) generators_set = set(generators)
generators_ = [g for g in machine.vars_generators if g.name in generators_set] generators_ = [g for g in machine.vars_generators() if g.name in generators_set]
return _generate_vars_for_machine( return _generate_vars_for_machine(
machine=machine, machine=machine,
@@ -417,7 +417,7 @@ def generate_vars_for_machine_interactive(
) -> bool: ) -> bool:
_generator = None _generator = None
if generator_name: if generator_name:
for generator in machine.vars_generators: for generator in machine.vars_generators():
if generator.name == generator_name: if generator.name == generator_name:
_generator = generator _generator = generator
break break

View File

@@ -19,7 +19,7 @@ def get_vars(base_dir: str, machine_name: str) -> list[Var]:
pub_store = machine.public_vars_store pub_store = machine.public_vars_store
sec_store = machine.secret_vars_store sec_store = machine.secret_vars_store
all_vars = [] all_vars = []
for generator in machine.vars_generators: for generator in machine.vars_generators():
for var in generator.files: for var in generator.files:
if var.secret: if var.secret:
var.store(sec_store) var.store(sec_store)
@@ -50,7 +50,7 @@ def _get_previous_value(
@API.register @API.register
def get_generators(base_dir: str, machine_name: str) -> list[Generator]: def get_generators(base_dir: str, machine_name: str) -> list[Generator]:
machine = Machine(name=machine_name, flake=Flake(base_dir)) machine = Machine(name=machine_name, flake=Flake(base_dir))
generators: list[Generator] = machine.vars_generators generators: list[Generator] = machine.vars_generators()
for generator in generators: for generator in generators:
for prompt in generator.prompts: for prompt in generator.prompts:
prompt.previous_value = _get_previous_value(machine, generator, prompt) prompt.previous_value = _get_previous_value(machine, generator, prompt)
@@ -66,7 +66,7 @@ def set_prompts(
) -> None: ) -> None:
machine = Machine(name=machine_name, flake=Flake(base_dir)) machine = Machine(name=machine_name, flake=Flake(base_dir))
for update in updates: for update in updates:
for generator in machine.vars_generators: for generator in machine.vars_generators():
if generator.name == update.generator: if generator.name == update.generator:
break break
else: else:

View File

@@ -1,10 +1,14 @@
import enum import enum
import logging import logging
import sys import sys
import termios
import tty
from dataclasses import dataclass from dataclasses import dataclass
from getpass import getpass from getpass import getpass
from typing import Any from typing import Any
from clan_cli.errors import ClanError
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
# This is for simulating user input in tests. # This is for simulating user input in tests.
@@ -15,6 +19,7 @@ class PromptType(enum.Enum):
LINE = "line" LINE = "line"
HIDDEN = "hidden" HIDDEN = "hidden"
MULTILINE = "multiline" MULTILINE = "multiline"
MULTILINE_HIDDEN = "multiline-hidden"
@dataclass @dataclass
@@ -36,6 +41,63 @@ class Prompt:
) )
def get_multiline_hidden_input() -> str:
"""
Get multiline input from the user without echoing the input.
This function allows the user to enter multiple lines of text,
and it will return the concatenated string of all lines entered.
The user can finish the input by pressing Ctrl-D (EOF).
"""
# Save terminal settings
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
lines = []
current_line: list[str] = []
try:
# Change terminal settings - disable echo
tty.setraw(fd)
while True:
char = sys.stdin.read(1)
# Check for Ctrl-D (ASCII value 4 or EOF)
if not char or ord(char) == 4:
# Add last line if not empty
if current_line:
lines.append("".join(current_line))
break
# Check for Ctrl-C (KeyboardInterrupt)
if ord(char) == 3:
raise KeyboardInterrupt
# Handle Enter key
if char == "\r" or char == "\n":
lines.append("".join(current_line))
current_line = []
# Print newline for visual feedback
sys.stdout.write("\r\n")
sys.stdout.flush()
# Handle backspace
elif ord(char) == 127 or ord(char) == 8:
if current_line:
current_line.pop()
# Regular character
else:
current_line.append(char)
finally:
# Restore terminal settings
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
# Print a final newline for clean display
print()
return "\n".join(lines)
def ask( def ask(
ident: str, ident: str,
input_type: PromptType, input_type: PromptType,
@@ -47,14 +109,23 @@ def ask(
log.info(f"Prompting value for {ident}") log.info(f"Prompting value for {ident}")
if MOCK_PROMPT_RESPONSE: if MOCK_PROMPT_RESPONSE:
return next(MOCK_PROMPT_RESPONSE) return next(MOCK_PROMPT_RESPONSE)
try:
match input_type: match input_type:
case PromptType.LINE: case PromptType.LINE:
result = input(f"{text}: ") result = input(f"{text}: ")
case PromptType.MULTILINE: case PromptType.MULTILINE:
print(f"{text} (Finish with Ctrl-D): ") print(f"{text} (Finish with Ctrl-D): ")
result = sys.stdin.read() result = sys.stdin.read()
case PromptType.MULTILINE_HIDDEN:
print(
"Enter multiple lines (press Ctrl-D to finish or Ctrl-C to cancel):"
)
result = get_multiline_hidden_input()
case PromptType.HIDDEN: case PromptType.HIDDEN:
result = getpass(f"{text} (hidden): ") result = getpass(f"{text} (hidden): ")
except KeyboardInterrupt as e:
msg = "User cancelled the input."
raise ClanError(msg) from e
log.info("Input received. Processing...") log.info("Input received. Processing...")
return result return result

View File

@@ -141,7 +141,7 @@ class SecretStore(StoreBase):
hashes.sort() hashes.sort()
manifest = [] manifest = []
for generator in self.machine.vars_generators: for generator in self.machine.vars_generators():
for file in generator.files: for file in generator.files:
manifest.append(f"{generator.name}/{file.name}".encode()) manifest.append(f"{generator.name}/{file.name}".encode())
manifest += hashes manifest += hashes
@@ -165,11 +165,12 @@ class SecretStore(StoreBase):
return local_hash.decode() != remote_hash return local_hash.decode() != remote_hash
def populate_dir(self, output_dir: Path, phases: list[str]) -> None: def populate_dir(self, output_dir: Path, phases: list[str]) -> None:
vars_generators = self.machine.vars_generators()
if "users" in phases: if "users" in phases:
with tarfile.open( with tarfile.open(
output_dir / "secrets_for_users.tar.gz", "w:gz" output_dir / "secrets_for_users.tar.gz", "w:gz"
) as user_tar: ) as user_tar:
for generator in self.machine.vars_generators: for generator in vars_generators:
dir_exists = False dir_exists = False
for file in generator.files: for file in generator.files:
if not file.deploy: if not file.deploy:
@@ -184,7 +185,7 @@ class SecretStore(StoreBase):
if "services" in phases: if "services" in phases:
with tarfile.open(output_dir / "secrets.tar.gz", "w:gz") as tar: with tarfile.open(output_dir / "secrets.tar.gz", "w:gz") as tar:
for generator in self.machine.vars_generators: for generator in vars_generators:
dir_exists = False dir_exists = False
for file in generator.files: for file in generator.files:
if not file.deploy: if not file.deploy:
@@ -205,7 +206,7 @@ class SecretStore(StoreBase):
tar_file.gname = file.group tar_file.gname = file.group
tar.addfile(tarinfo=tar_file, fileobj=io.BytesIO(content)) tar.addfile(tarinfo=tar_file, fileobj=io.BytesIO(content))
if "activation" in phases: if "activation" in phases:
for generator in self.machine.vars_generators: for generator in vars_generators:
for file in generator.files: for file in generator.files:
if file.needed_for == "activation": if file.needed_for == "activation":
out_file = ( out_file = (
@@ -214,7 +215,7 @@ class SecretStore(StoreBase):
out_file.parent.mkdir(parents=True, exist_ok=True) out_file.parent.mkdir(parents=True, exist_ok=True)
out_file.write_bytes(self.get(generator, file.name)) out_file.write_bytes(self.get(generator, file.name))
if "partitioning" in phases: if "partitioning" in phases:
for generator in self.machine.vars_generators: for generator in vars_generators:
for file in generator.files: for file in generator.files:
if file.needed_for == "partitioning": if file.needed_for == "partitioning":
out_file = ( out_file = (

View File

@@ -51,10 +51,11 @@ class SecretStore(StoreBase):
self.machine = machine self.machine = machine
# no need to generate keys if we don't manage secrets # no need to generate keys if we don't manage secrets
if not self.machine.vars_generators: vars_generators = self.machine.vars_generators()
if not vars_generators:
return return
has_secrets = False has_secrets = False
for generator in self.machine.vars_generators: for generator in vars_generators:
for file in generator.files: for file in generator.files:
if file.secret: if file.secret:
has_secrets = True has_secrets = True
@@ -108,7 +109,7 @@ class SecretStore(StoreBase):
""" """
if generator is None: if generator is None:
generators = self.machine.vars_generators generators = self.machine.vars_generators()
else: else:
generators = [generator] generators = [generator]
file_found = False file_found = False
@@ -179,6 +180,7 @@ class SecretStore(StoreBase):
return [store_folder] return [store_folder]
def populate_dir(self, output_dir: Path, phases: list[str]) -> None: def populate_dir(self, output_dir: Path, phases: list[str]) -> None:
vars_generators = self.machine.vars_generators()
if "users" in phases or "services" in phases: if "users" in phases or "services" in phases:
key_name = f"{self.machine.name}-age.key" key_name = f"{self.machine.name}-age.key"
if not has_secret(sops_secrets_folder(self.machine.flake_dir) / key_name): if not has_secret(sops_secrets_folder(self.machine.flake_dir) / key_name):
@@ -192,7 +194,7 @@ class SecretStore(StoreBase):
(output_dir / "key.txt").write_text(key) (output_dir / "key.txt").write_text(key)
if "activation" in phases: if "activation" in phases:
for generator in self.machine.vars_generators: for generator in vars_generators:
for file in generator.files: for file in generator.files:
if file.needed_for == "activation": if file.needed_for == "activation":
target_path = ( target_path = (
@@ -208,7 +210,7 @@ class SecretStore(StoreBase):
target_path.chmod(file.mode) target_path.chmod(file.mode)
if "partitioning" in phases: if "partitioning" in phases:
for generator in self.machine.vars_generators: for generator in vars_generators:
for file in generator.files: for file in generator.files:
if file.needed_for == "partitioning": if file.needed_for == "partitioning":
target_path = output_dir / generator.name / file.name target_path = output_dir / generator.name / file.name
@@ -284,7 +286,7 @@ class SecretStore(StoreBase):
from clan_cli.secrets.secrets import update_keys from clan_cli.secrets.secrets import update_keys
if generator is None: if generator is None:
generators = self.machine.vars_generators generators = self.machine.vars_generators()
else: else:
generators = [generator] generators = [generator]
file_found = False file_found = False

View File

@@ -33,19 +33,19 @@ def set_var(machine: str | Machine, var: str | Var, value: bytes, flake: Flake)
) )
def set_via_stdin(machine: str, var_id: str, flake: Flake) -> None: def set_via_stdin(machine_name: str, var_id: str, flake: Flake) -> None:
var = get_var(str(flake.path), machine, var_id) machine = Machine(name=machine_name, flake=flake)
var = get_var(str(flake.path), machine_name, var_id)
if sys.stdin.isatty(): if sys.stdin.isatty():
new_value = ask( new_value = ask(
var.id, var.id,
PromptType.HIDDEN, PromptType.MULTILINE_HIDDEN,
None, None,
).encode("utf-8") ).encode("utf-8")
else: else:
new_value = sys.stdin.buffer.read() new_value = sys.stdin.buffer.read()
_machine = Machine(name=machine, flake=flake) set_var(machine, var, new_value, flake)
set_var(_machine, var, new_value, flake)
def _set_command(args: argparse.Namespace) -> None: def _set_command(args: argparse.Namespace) -> None:

View File

@@ -1,9 +1,9 @@
import { callApi } from "@/src/api"; import { callApi } from "@/src/api";
import { import {
createForm, createForm,
FieldValues,
SubmitHandler, SubmitHandler,
validate, validate,
FieldValues,
} from "@modular-forms/solid"; } from "@modular-forms/solid";
import { createQuery, useQueryClient } from "@tanstack/solid-query"; import { createQuery, useQueryClient } from "@tanstack/solid-query";
import { Typography } from "@/src/components/Typography"; import { Typography } from "@/src/components/Typography";
@@ -113,9 +113,14 @@ export const VarsStep = (props: VarsStepProps) => {
</Typography> </Typography>
{/* Avoid nesting issue in case of a "." */} {/* Avoid nesting issue in case of a "." */}
<Field <Field
name={`${generator.name.replaceAll(".", "__dot__")}.${prompt.name.replaceAll(".", "__dot__")}`} name={`${generator.name.replaceAll(
".",
"__dot__",
)}.${prompt.name.replaceAll(".", "__dot__")}`}
> >
{(field, props) => ( {(field, props) => (
<Switch
fallback={
<TextInput <TextInput
inputProps={{ inputProps={{
...props, ...props,
@@ -128,6 +133,23 @@ export const VarsStep = (props: VarsStepProps) => {
value={prompt.previous_value ?? ""} value={prompt.previous_value ?? ""}
error={field.error} error={field.error}
/> />
}
>
<Match
when={
prompt.prompt_type === "multiline" ||
prompt.prompt_type === "multiline-hidden"
}
>
<textarea
{...props}
class="w-full h-32 border border-gray-300 rounded-md p-2"
placeholder={prompt.description}
value={prompt.previous_value ?? ""}
name={prompt.description}
/>
</Match>
</Switch>
)} )}
</Field> </Field>
</Group> </Group>