Refactor select with new maybe selector
This is a great refactor of the select functionality in the flake class. This now uses the same parser as the nix code, but runs it in python for nice stacktraces. Also we now have a maybe selector which can be used by prepending the selector with a ? Tests have been expanded to make sure the code is more stable and easier to understand
This commit is contained in:
16
flake.lock
generated
16
flake.lock
generated
@@ -89,6 +89,21 @@
|
||||
"type": "github"
|
||||
}
|
||||
},
|
||||
"nix-select": {
|
||||
"locked": {
|
||||
"lastModified": 1745005516,
|
||||
"narHash": "sha256-IVaoOGDIvAa/8I0sdiiZuKptDldrkDWUNf/+ezIRhyc=",
|
||||
"ref": "refs/heads/main",
|
||||
"rev": "69d8bf596194c5c35a4e90dd02c52aa530caddf8",
|
||||
"revCount": 40,
|
||||
"type": "git",
|
||||
"url": "https://git.clan.lol/clan/nix-select"
|
||||
},
|
||||
"original": {
|
||||
"type": "git",
|
||||
"url": "https://git.clan.lol/clan/nix-select"
|
||||
}
|
||||
},
|
||||
"nixos-facter-modules": {
|
||||
"locked": {
|
||||
"lastModified": 1743671943,
|
||||
@@ -123,6 +138,7 @@
|
||||
"disko": "disko",
|
||||
"flake-parts": "flake-parts",
|
||||
"nix-darwin": "nix-darwin",
|
||||
"nix-select": "nix-select",
|
||||
"nixos-facter-modules": "nixos-facter-modules",
|
||||
"nixpkgs": "nixpkgs",
|
||||
"sops-nix": "sops-nix",
|
||||
|
||||
@@ -23,6 +23,8 @@
|
||||
treefmt-nix.url = "github:numtide/treefmt-nix";
|
||||
treefmt-nix.inputs.nixpkgs.follows = "nixpkgs";
|
||||
|
||||
nix-select.url = "git+https://git.clan.lol/clan/nix-select";
|
||||
|
||||
data-mesher = {
|
||||
url = "git+https://git.clan.lol/clan/data-mesher";
|
||||
inputs = {
|
||||
|
||||
@@ -30,6 +30,6 @@ lib.fix (clanLib: {
|
||||
# Plain imports.
|
||||
values = import ./introspection { inherit lib; };
|
||||
jsonschema = import ./jsonschema { inherit lib; };
|
||||
select = import select/default.nix;
|
||||
select = self.inputs.nix-select.lib;
|
||||
facts = import ./facts.nix { inherit lib; };
|
||||
})
|
||||
|
||||
@@ -11,7 +11,6 @@ rec {
|
||||
./introspection/flake-module.nix
|
||||
./inventory/flake-module.nix
|
||||
./jsonschema/flake-module.nix
|
||||
./select/flake-module.nix
|
||||
];
|
||||
flake.clanLib = import ./default.nix {
|
||||
inherit lib inputs self;
|
||||
|
||||
@@ -1,68 +0,0 @@
|
||||
let
|
||||
recursiveSelect =
|
||||
selectorIndex: selectorList: target:
|
||||
let
|
||||
selector = builtins.elemAt selectorList selectorIndex;
|
||||
in
|
||||
|
||||
# selector is empty, we are done
|
||||
if selectorIndex + 1 > builtins.length selectorList then
|
||||
target
|
||||
|
||||
else if builtins.isList target then
|
||||
# support bla.* for lists and recurse into all elements
|
||||
if selector == "*" then
|
||||
builtins.map (v: recursiveSelect (selectorIndex + 1) selectorList v) target
|
||||
# support bla.3 for lists and recurse into the 4th element
|
||||
else if (builtins.match "[[:digit:]]*" selector) == [ ] then
|
||||
recursiveSelect (selectorIndex + 1) selectorList (
|
||||
builtins.elemAt target (builtins.fromJSON selector)
|
||||
)
|
||||
else
|
||||
throw "only * or a number is allowed in list selector"
|
||||
|
||||
else if builtins.isAttrs target then
|
||||
# handle the case bla.x.*.z where x is an attrset and we recurse into all elements
|
||||
if selector == "*" then
|
||||
builtins.mapAttrs (_: v: recursiveSelect (selectorIndex + 1) selectorList v) target
|
||||
# support bla.{x,y,z}.world where we get world from each of x, y and z
|
||||
else if (builtins.match ''^\{([^}]*)}$'' selector) != null then
|
||||
let
|
||||
attrsAsList = (
|
||||
builtins.filter (x: !builtins.isList x) (
|
||||
builtins.split "," (builtins.head (builtins.match ''^\{([^}]*)}$'' selector))
|
||||
)
|
||||
);
|
||||
dummyAttrSet = builtins.listToAttrs (
|
||||
map (x: {
|
||||
name = x;
|
||||
value = null;
|
||||
}) attrsAsList
|
||||
);
|
||||
filteredAttrs = builtins.intersectAttrs dummyAttrSet target;
|
||||
in
|
||||
builtins.mapAttrs (_: v: recursiveSelect (selectorIndex + 1) selectorList v) filteredAttrs
|
||||
else
|
||||
recursiveSelect (selectorIndex + 1) selectorList (builtins.getAttr selector target)
|
||||
else
|
||||
throw "Expected a list or an attrset";
|
||||
|
||||
parseSelector =
|
||||
selector:
|
||||
let
|
||||
splitByQuote = x: builtins.filter (x: !builtins.isList x) (builtins.split ''"'' x);
|
||||
splitByDot =
|
||||
x:
|
||||
builtins.filter (x: x != "") (
|
||||
map (builtins.replaceStrings [ "." ] [ "" ]) (
|
||||
builtins.filter (x: !builtins.isList x) (builtins.split ''\.'' x)
|
||||
)
|
||||
);
|
||||
handleQuoted =
|
||||
x: if x == [ ] then [ ] else [ (builtins.head x) ] ++ handleUnquoted (builtins.tail x);
|
||||
handleUnquoted =
|
||||
x: if x == [ ] then [ ] else splitByDot (builtins.head x) ++ handleQuoted (builtins.tail x);
|
||||
in
|
||||
handleUnquoted (splitByQuote selector);
|
||||
in
|
||||
selector: target: recursiveSelect 0 (parseSelector selector) target
|
||||
@@ -1,45 +0,0 @@
|
||||
{ self, inputs, ... }:
|
||||
let
|
||||
inputOverrides = builtins.concatStringsSep " " (
|
||||
builtins.map (input: " --override-input ${input} ${inputs.${input}}") (builtins.attrNames inputs)
|
||||
);
|
||||
in
|
||||
{
|
||||
perSystem =
|
||||
{
|
||||
pkgs,
|
||||
lib,
|
||||
system,
|
||||
...
|
||||
}:
|
||||
{
|
||||
# Run: nix-unit --extra-experimental-features flakes --flake .#legacyPackages.x86_64-linux.evalTests
|
||||
legacyPackages.evalTests-select = import ./tests.nix {
|
||||
inherit lib;
|
||||
inherit (self) clanLib;
|
||||
};
|
||||
|
||||
checks = {
|
||||
lib-select-eval = pkgs.runCommand "tests" { nativeBuildInputs = [ pkgs.nix-unit ]; } ''
|
||||
export HOME="$(realpath .)"
|
||||
export NIX_ABORT_ON_WARN=1
|
||||
nix-unit --eval-store "$HOME" \
|
||||
--extra-experimental-features flakes \
|
||||
--show-trace \
|
||||
${inputOverrides} \
|
||||
--flake ${
|
||||
self.filter {
|
||||
include = [
|
||||
"flakeModules"
|
||||
"lib"
|
||||
"clanModules/flake-module.nix"
|
||||
"clanModules/borgbackup"
|
||||
];
|
||||
}
|
||||
}#legacyPackages.${system}.evalTests-select
|
||||
|
||||
touch $out
|
||||
'';
|
||||
};
|
||||
};
|
||||
}
|
||||
@@ -1,10 +0,0 @@
|
||||
{ clanLib, ... }:
|
||||
let
|
||||
inherit (clanLib) select;
|
||||
in
|
||||
{
|
||||
test_simple_1 = {
|
||||
expr = select "a" { a = 1; };
|
||||
expected = 1;
|
||||
};
|
||||
}
|
||||
@@ -1,11 +1,11 @@
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
from dataclasses import dataclass
|
||||
from dataclasses import asdict, dataclass, field
|
||||
from enum import Enum
|
||||
from hashlib import sha1
|
||||
from pathlib import Path
|
||||
from tempfile import NamedTemporaryFile
|
||||
from typing import Any, cast
|
||||
from typing import Any
|
||||
|
||||
from clan_cli.cmd import Log, RunOpts, run
|
||||
from clan_cli.dirs import user_cache_dir
|
||||
@@ -21,42 +21,212 @@ from clan_cli.nix import (
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class AllSelector:
|
||||
pass
|
||||
class SetSelectorType(str, Enum):
|
||||
"""
|
||||
enum for the type of selector in a set.
|
||||
For now this is either a string or a maybe selector.
|
||||
"""
|
||||
|
||||
STR = "str"
|
||||
MAYBE = "maybe"
|
||||
|
||||
|
||||
Selector = str | int | AllSelector | set[int] | set[str]
|
||||
@dataclass
|
||||
class SetSelector:
|
||||
"""
|
||||
This class represents a selector used in a set.
|
||||
type: SetSelectorType = SetSelectorType.STR
|
||||
value: str = ""
|
||||
|
||||
a set looks like this:
|
||||
{key1,key2}
|
||||
"""
|
||||
|
||||
type: SetSelectorType = SetSelectorType.STR
|
||||
value: str = ""
|
||||
|
||||
|
||||
def split_selector(selector: str) -> list[Selector]:
|
||||
class SelectorType(str, Enum):
|
||||
"""
|
||||
enum for the type of a selector
|
||||
this can be all, string, set or maybe
|
||||
"""
|
||||
|
||||
ALL = "all"
|
||||
STR = "str"
|
||||
SET = "set"
|
||||
MAYBE = "maybe"
|
||||
|
||||
|
||||
@dataclass
|
||||
class Selector:
|
||||
"""
|
||||
A class to represent a selector, which selects nix elements one level down.
|
||||
consists of a SelectorType and a value.
|
||||
|
||||
if the type is all, no value is needed, since it selects all elements.
|
||||
if the type is str, the value is a string, which is the key in a dict.
|
||||
if the type is maybe the value is a string, which is the key in a dict.
|
||||
if the type is set, the value is a list of SetSelector objects.
|
||||
"""
|
||||
|
||||
type: SelectorType = SelectorType.STR
|
||||
value: str | list[SetSelector] | None = None
|
||||
|
||||
def as_dict(self) -> dict[str, Any]:
|
||||
if self.type == SelectorType.SET:
|
||||
assert isinstance(self.value, list)
|
||||
return {
|
||||
"type": self.type.value,
|
||||
"value": [asdict(selector) for selector in self.value],
|
||||
}
|
||||
if self.type == SelectorType.ALL:
|
||||
return {"type": self.type.value}
|
||||
if self.type == SelectorType.STR:
|
||||
assert isinstance(self.value, str)
|
||||
return {"type": self.type.value, "value": self.value}
|
||||
if self.type == SelectorType.MAYBE:
|
||||
assert isinstance(self.value, str)
|
||||
return {"type": self.type.value, "value": self.value}
|
||||
msg = f"Invalid selector type: {self.type}"
|
||||
raise ValueError(msg)
|
||||
|
||||
|
||||
def selectors_as_dict(selectors: list[Selector]) -> list[dict[str, Any]]:
|
||||
return [selector.as_dict() for selector in selectors]
|
||||
|
||||
|
||||
def selectors_as_json(selectors: list[Selector]) -> str:
|
||||
return json.dumps(selectors_as_dict(selectors))
|
||||
|
||||
|
||||
def parse_selector(selector: str) -> list[Selector]:
|
||||
"""
|
||||
takes a string and returns a list of selectors.
|
||||
|
||||
a selector can be:
|
||||
- a string, which is a key in a dict
|
||||
- an integer, which is an index in a list
|
||||
- a set of strings, which are keys in a dict
|
||||
- a set of integers, which are indices in a list
|
||||
- a quoted string, which is a key in a dict
|
||||
- a set of strings or integers, which are keys in a dict or indices in a list.
|
||||
- the string "*", which selects all elements in a list or dict
|
||||
"""
|
||||
pattern = r'"[^"]*"|[^.]+'
|
||||
matches = re.findall(pattern, selector)
|
||||
|
||||
# Extract the matched groups (either quoted or unquoted parts)
|
||||
stack: list[str] = []
|
||||
selectors: list[Selector] = []
|
||||
for selector in matches:
|
||||
if selector == "*":
|
||||
selectors.append(AllSelector())
|
||||
elif selector.isdigit():
|
||||
selectors.append({int(selector)})
|
||||
elif selector.startswith("{") and selector.endswith("}"):
|
||||
sub_selectors = set(selector[1:-1].split(","))
|
||||
selectors.append(sub_selectors)
|
||||
elif selector.startswith('"') and selector.endswith('"'):
|
||||
selectors.append(selector[1:-1])
|
||||
acc_str: str = ""
|
||||
|
||||
# only used by set for now
|
||||
submode = ""
|
||||
acc_selectors: list[SetSelector] = []
|
||||
|
||||
for i in range(len(selector)):
|
||||
c = selector[i]
|
||||
if stack == []:
|
||||
mode = "start"
|
||||
else:
|
||||
selectors.append(selector)
|
||||
mode = stack[-1]
|
||||
|
||||
if mode == "end":
|
||||
if c == ".":
|
||||
stack.pop()
|
||||
if stack != []:
|
||||
msg = "expected empy stack, but got {stack}"
|
||||
raise ValueError(msg)
|
||||
else:
|
||||
msg = "expected ., but got {c}"
|
||||
raise ValueError(msg)
|
||||
|
||||
elif mode == "start":
|
||||
if c == "*":
|
||||
stack.append("end")
|
||||
selectors.append(Selector(type=SelectorType.ALL))
|
||||
elif c == "?":
|
||||
stack.append("maybe")
|
||||
elif c == '"':
|
||||
stack += ["str", "quote"]
|
||||
elif c == "{":
|
||||
stack.append("set")
|
||||
elif c == ".":
|
||||
selectors.append(Selector(type=SelectorType.STR, value=acc_str))
|
||||
else:
|
||||
stack.append("str")
|
||||
acc_str += c
|
||||
|
||||
elif mode == "set":
|
||||
if submode == "" and c == "?":
|
||||
submode = "maybe"
|
||||
elif c == "\\":
|
||||
stack.append("escape")
|
||||
if submode == "":
|
||||
submode = "str"
|
||||
elif c == '"':
|
||||
stack.append("quote")
|
||||
if submode == "":
|
||||
submode = "str"
|
||||
elif c == ",":
|
||||
if submode == "maybe":
|
||||
set_select_type = SetSelectorType.MAYBE
|
||||
else:
|
||||
set_select_type = SetSelectorType.STR
|
||||
acc_selectors.append(SetSelector(type=set_select_type, value=acc_str))
|
||||
submode = ""
|
||||
acc_str = ""
|
||||
elif c == "}":
|
||||
if submode == "maybe":
|
||||
set_select_type = SetSelectorType.MAYBE
|
||||
else:
|
||||
set_select_type = SetSelectorType.STR
|
||||
acc_selectors.append(SetSelector(type=set_select_type, value=acc_str))
|
||||
selectors.append(Selector(type=SelectorType.SET, value=acc_selectors))
|
||||
|
||||
submode = ""
|
||||
acc_selectors = []
|
||||
|
||||
acc_str = ""
|
||||
stack.pop()
|
||||
stack.append("end")
|
||||
else:
|
||||
acc_str += c
|
||||
if submode == "":
|
||||
submode = "str"
|
||||
|
||||
elif mode == "quote":
|
||||
if c == '"':
|
||||
stack.pop()
|
||||
elif c == "\\":
|
||||
stack.append("escape")
|
||||
else:
|
||||
acc_str += c
|
||||
|
||||
elif mode == "escape":
|
||||
stack.pop()
|
||||
acc_str += c
|
||||
|
||||
elif mode == "str" or mode == "maybe":
|
||||
if c == ".":
|
||||
stack.pop()
|
||||
if mode == "maybe":
|
||||
select_type = SelectorType.MAYBE
|
||||
else:
|
||||
select_type = SelectorType.STR
|
||||
selectors.append(Selector(type=select_type, value=acc_str))
|
||||
acc_str = ""
|
||||
elif c == "\\":
|
||||
stack.append("escape")
|
||||
else:
|
||||
acc_str += c
|
||||
|
||||
if stack != []:
|
||||
if stack[-1] == "str" or stack[-1] == "maybe":
|
||||
if stack[-1] == "maybe":
|
||||
select_type = SelectorType.MAYBE
|
||||
else:
|
||||
select_type = SelectorType.STR
|
||||
selectors.append(Selector(type=select_type, value=acc_str))
|
||||
elif stack[-1] == "end":
|
||||
pass
|
||||
else:
|
||||
msg = f"expected empty stack, but got {stack}"
|
||||
raise ValueError(msg)
|
||||
|
||||
return selectors
|
||||
|
||||
@@ -64,94 +234,18 @@ def split_selector(selector: str) -> list[Selector]:
|
||||
@dataclass
|
||||
class FlakeCacheEntry:
|
||||
"""
|
||||
a recursive structure to store the cache, with a value and a selector
|
||||
a recursive structure to store the cache.
|
||||
consists of a dict with the keys being the selectors and the values being FlakeCacheEntry objects.
|
||||
|
||||
is_list is used to check if the value is a list.
|
||||
exists is used to check if the value exists, which can be false if it was selected via maybe.
|
||||
fetched_all is used to check if we have all keys on the current level.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
value: str | float | dict[str, Any] | list[Any] | None,
|
||||
selectors: list[Selector],
|
||||
is_out_path: bool = False,
|
||||
) -> None:
|
||||
self.value: str | float | int | None | dict[str | int, FlakeCacheEntry]
|
||||
self.selector: set[int] | set[str] | AllSelector
|
||||
selector: Selector = AllSelector()
|
||||
|
||||
if selectors == []:
|
||||
self.selector = AllSelector()
|
||||
elif isinstance(selectors[0], set):
|
||||
self.selector = selectors[0]
|
||||
selector = selectors[0]
|
||||
elif isinstance(selectors[0], int):
|
||||
self.selector = {int(selectors[0])}
|
||||
selector = int(selectors[0])
|
||||
elif isinstance(selectors[0], str):
|
||||
self.selector = {selectors[0]}
|
||||
selector = selectors[0]
|
||||
elif isinstance(selectors[0], AllSelector):
|
||||
self.selector = AllSelector()
|
||||
|
||||
if is_out_path:
|
||||
if selectors != []:
|
||||
msg = "Cannot index outPath"
|
||||
raise ValueError(msg)
|
||||
if not isinstance(value, str):
|
||||
msg = "outPath must be a string"
|
||||
raise ValueError(msg)
|
||||
self.value = value
|
||||
|
||||
elif isinstance(selector, str):
|
||||
self.value = {selector: FlakeCacheEntry(value, selectors[1:])}
|
||||
|
||||
elif isinstance(value, dict):
|
||||
if isinstance(self.selector, set):
|
||||
if not all(isinstance(v, str) for v in self.selector):
|
||||
msg = "Cannot index dict with non-str set"
|
||||
raise ValueError(msg)
|
||||
self.value = {}
|
||||
for key, value_ in value.items():
|
||||
if key == "outPath":
|
||||
self.value[key] = FlakeCacheEntry(
|
||||
value_, selectors[1:], is_out_path=True
|
||||
)
|
||||
else:
|
||||
self.value[key] = FlakeCacheEntry(value_, selectors[1:])
|
||||
|
||||
elif isinstance(value, list):
|
||||
if isinstance(selector, int):
|
||||
if len(value) != 1:
|
||||
msg = "Cannot index list with int selector when value is not singleton"
|
||||
raise ValueError(msg)
|
||||
self.value = {
|
||||
int(selector): FlakeCacheEntry(value[0], selectors[1:]),
|
||||
}
|
||||
if isinstance(selector, set):
|
||||
if all(isinstance(v, int) for v in selector):
|
||||
self.value = {}
|
||||
for i, v in enumerate([selector]):
|
||||
assert isinstance(v, int)
|
||||
self.value[int(v)] = FlakeCacheEntry(value[i], selectors[1:])
|
||||
else:
|
||||
msg = "Cannot index list with non-int set"
|
||||
raise ValueError(msg)
|
||||
elif isinstance(self.selector, AllSelector):
|
||||
self.value = {}
|
||||
for i, v in enumerate(value):
|
||||
if isinstance(v, dict | list | str | float | int):
|
||||
self.value[i] = FlakeCacheEntry(v, selectors[1:])
|
||||
else:
|
||||
msg = f"expected integer selector or all for type list, but got {type(selector)}"
|
||||
raise TypeError(msg)
|
||||
|
||||
elif isinstance(value, str) and value.startswith("/nix/store/"):
|
||||
self.value = {}
|
||||
self.selector = self.selector = {"outPath"}
|
||||
self.value["outPath"] = FlakeCacheEntry(
|
||||
value, selectors[1:], is_out_path=True
|
||||
)
|
||||
|
||||
elif isinstance(value, (str | float | int | None)):
|
||||
self.value = value
|
||||
value: str | float | dict[str, Any] | None = field(default_factory=dict)
|
||||
is_list: bool = False
|
||||
exists: bool = True
|
||||
fetched_all: bool = False
|
||||
|
||||
def insert(
|
||||
self,
|
||||
@@ -159,175 +253,236 @@ class FlakeCacheEntry:
|
||||
selectors: list[Selector],
|
||||
) -> None:
|
||||
selector: Selector
|
||||
# if we have no more selectors, it means we select all keys from now one and futher down
|
||||
if selectors == []:
|
||||
selector = AllSelector()
|
||||
selector = Selector(type=SelectorType.ALL)
|
||||
else:
|
||||
selector = selectors[0]
|
||||
|
||||
if isinstance(selector, str):
|
||||
if isinstance(self.value, dict):
|
||||
if selector in self.value:
|
||||
self.value[selector].insert(value, selectors[1:])
|
||||
else:
|
||||
self.value[selector] = FlakeCacheEntry(value, selectors[1:])
|
||||
return
|
||||
msg = f"Cannot insert {selector} into non dict value"
|
||||
raise TypeError(msg)
|
||||
# first we find out if we have all subkeys already
|
||||
|
||||
if isinstance(selector, AllSelector):
|
||||
self.selector = AllSelector()
|
||||
elif isinstance(self.selector, set) and isinstance(selector, set):
|
||||
if all(isinstance(v, str) for v in self.selector) and all(
|
||||
isinstance(v, str) for v in selector
|
||||
):
|
||||
selector = cast(set[str], selector)
|
||||
self.selector = cast(set[str], self.selector)
|
||||
self.selector = self.selector.union(selector)
|
||||
elif all(isinstance(v, int) for v in self.selector) and all(
|
||||
isinstance(v, int) for v in selector
|
||||
):
|
||||
selector = cast(set[int], selector)
|
||||
self.selector = cast(set[int], self.selector)
|
||||
self.selector = self.selector.union(selector)
|
||||
else:
|
||||
msg = "Cannot union set of different types"
|
||||
raise ValueError(msg)
|
||||
elif isinstance(self.selector, set) and isinstance(selector, int):
|
||||
if all(isinstance(v, int) for v in self.selector):
|
||||
self.selector = cast(set[int], self.selector)
|
||||
self.selector.add(selector)
|
||||
if self.fetched_all:
|
||||
pass
|
||||
elif selector.type == SelectorType.ALL:
|
||||
self.fetched_all = True
|
||||
|
||||
elif isinstance(self.selector, set) and isinstance(selector, str):
|
||||
if all(isinstance(v, str) for v in self.selector):
|
||||
self.selector = cast(set[str], self.selector)
|
||||
self.selector.add(selector)
|
||||
# if we have a string selector, that means we are usually on a dict or a list, since we cannot walk down scalar values
|
||||
# so we passthrough the value to the next level
|
||||
if selector.type == SelectorType.STR:
|
||||
assert isinstance(selector.value, str)
|
||||
assert isinstance(self.value, dict)
|
||||
if selector.value not in self.value:
|
||||
self.value[selector.value] = FlakeCacheEntry()
|
||||
self.value[selector.value].insert(value, selectors[1:])
|
||||
|
||||
else:
|
||||
msg = f"Cannot insert {selector} into {self.selector}"
|
||||
raise TypeError(msg)
|
||||
# if we get a MAYBE, check if the selector is in the output, if not we create a entry with exists = False
|
||||
# otherwise we just insert the value into the current dict
|
||||
# we can skip creating the non existing entry if we already fetched all keys
|
||||
elif selector.type == SelectorType.MAYBE:
|
||||
assert isinstance(self.value, dict)
|
||||
assert isinstance(value, dict)
|
||||
assert isinstance(selector.value, str)
|
||||
if selector.value in value:
|
||||
if selector.value not in self.value:
|
||||
self.value[selector.value] = FlakeCacheEntry()
|
||||
self.value[selector.value].insert(value[selector.value], selectors[1:])
|
||||
elif not self.fetched_all:
|
||||
if selector.value not in self.value:
|
||||
self.value[selector.value] = FlakeCacheEntry()
|
||||
self.value[selector.value].exists = False
|
||||
|
||||
if isinstance(self.value, dict) and isinstance(value, dict):
|
||||
# insert a dict is pretty straight forward
|
||||
elif isinstance(value, dict):
|
||||
assert isinstance(self.value, dict)
|
||||
for key, value_ in value.items():
|
||||
if key in self.value:
|
||||
if key not in self.value:
|
||||
self.value[key] = FlakeCacheEntry()
|
||||
self.value[key].insert(value_, selectors[1:])
|
||||
else:
|
||||
self.value[key] = FlakeCacheEntry(value_, selectors[1:])
|
||||
|
||||
elif isinstance(self.value, dict) and isinstance(value, list):
|
||||
if isinstance(selector, set):
|
||||
if not all(isinstance(v, int) for v in selector):
|
||||
msg = "Cannot list with non-int set"
|
||||
raise ValueError(msg)
|
||||
for realindex, requested_index in enumerate(selector):
|
||||
assert isinstance(requested_index, int)
|
||||
if requested_index in self.value:
|
||||
self.value[requested_index].insert(
|
||||
value[realindex], selectors[1:]
|
||||
)
|
||||
elif isinstance(selector, AllSelector):
|
||||
for index, v in enumerate(value):
|
||||
if index in self.value:
|
||||
self.value[index].insert(v, selectors[1:])
|
||||
else:
|
||||
self.value[index] = FlakeCacheEntry(v, selectors[1:])
|
||||
elif isinstance(selector, int):
|
||||
if selector in self.value:
|
||||
self.value[selector].insert(value[0], selectors[1:])
|
||||
else:
|
||||
self.value[selector] = FlakeCacheEntry(value[0], selectors[1:])
|
||||
# to store a list we also use a dict, so we know which indices we have
|
||||
elif isinstance(value, list):
|
||||
self.is_list = True
|
||||
fetched_indices: list[str] = []
|
||||
# if we are in a set, we take all the selectors
|
||||
if selector.type == SelectorType.SET:
|
||||
assert isinstance(selector.value, list)
|
||||
for subselector in selector.value:
|
||||
fetched_indices.append(subselector.value)
|
||||
# if it's just a str, that is the index
|
||||
elif selector.type == SelectorType.STR:
|
||||
assert isinstance(selector.value, str)
|
||||
fetched_indices = [selector.value]
|
||||
# otherwise we just take all the indices, which is the length of the list
|
||||
elif selector.type == SelectorType.ALL:
|
||||
fetched_indices = list(map(str, range(len(value))))
|
||||
|
||||
# insert is the same is insert a dict
|
||||
assert isinstance(self.value, dict)
|
||||
for i, requested_index in enumerate(fetched_indices):
|
||||
assert isinstance(requested_index, str)
|
||||
if requested_index not in self.value:
|
||||
self.value[requested_index] = FlakeCacheEntry()
|
||||
self.value[requested_index].insert(value[i], selectors[1:])
|
||||
|
||||
# strings need to be checked if they are store paths
|
||||
# if they are, we store them as a dict with the outPath key
|
||||
# this is to mirror nix behavior, where the outPath of an attrset is used if no further key is specified
|
||||
elif isinstance(value, str) and value.startswith("/nix/store/"):
|
||||
self.value = {}
|
||||
self.value["outPath"] = FlakeCacheEntry(
|
||||
value, selectors[1:], is_out_path=True
|
||||
)
|
||||
assert selectors == []
|
||||
if value.startswith("/nix/store/"):
|
||||
self.value = {"outPath": FlakeCacheEntry(value)}
|
||||
|
||||
elif isinstance(value, (str | float | int)):
|
||||
if self.value:
|
||||
if self.value != value:
|
||||
msg = "value mismatch in cache, something is fishy"
|
||||
raise TypeError(msg)
|
||||
|
||||
elif value is None:
|
||||
if self.value is not None:
|
||||
msg = "value mismatch in cache, something is fishy"
|
||||
raise TypeError(msg)
|
||||
|
||||
else:
|
||||
msg = f"Cannot insert value of type {type(value)} into cache"
|
||||
# if we have a normal scalar, we check if it conflicts with a maybe already store value
|
||||
# since an empty attrset is the default value, we cannot check that, so we just set it to the value
|
||||
elif isinstance(value, float | int | str) or value is None:
|
||||
assert selectors == []
|
||||
if self.value == {}:
|
||||
self.value = value
|
||||
elif self.value != value:
|
||||
msg = f"Cannot insert {value} into cache, already have {self.value}"
|
||||
raise TypeError(msg)
|
||||
|
||||
def is_cached(self, selectors: list[Selector]) -> bool:
|
||||
selector: Selector
|
||||
if selectors == []:
|
||||
selector = AllSelector()
|
||||
else:
|
||||
return self.fetched_all
|
||||
selector = selectors[0]
|
||||
|
||||
# for store paths we have to check if they still exist, otherwise they have to be rebuild and are thus not cached
|
||||
if isinstance(self.value, str) and self.value.startswith("/nix/store/"):
|
||||
return Path(self.value).exists()
|
||||
|
||||
# if self.value is not dict but we request more selectors, we assume we are cached and an error will be thrown in the select function
|
||||
if isinstance(self.value, str | float | int | None):
|
||||
return selectors == []
|
||||
if isinstance(selector, AllSelector):
|
||||
if isinstance(self.selector, AllSelector):
|
||||
return True
|
||||
|
||||
# we just fetch all subkeys, so we need to check of we inserted all keys at this level before
|
||||
if selector.type == SelectorType.ALL:
|
||||
assert isinstance(self.value, dict)
|
||||
if self.fetched_all:
|
||||
result = all(
|
||||
self.value[sel].is_cached(selectors[1:]) for sel in self.value
|
||||
)
|
||||
return result
|
||||
# TODO: check if we already have all the keys anyway?
|
||||
return False
|
||||
if (
|
||||
isinstance(selector, set)
|
||||
and isinstance(self.selector, set)
|
||||
selector.type == SelectorType.SET
|
||||
and isinstance(selector.value, list)
|
||||
and isinstance(self.value, dict)
|
||||
):
|
||||
if not selector.issubset(self.selector):
|
||||
for requested_selector in selector.value:
|
||||
val = requested_selector.value
|
||||
if val not in self.value:
|
||||
# if we fetched all keys and we are not in the dict, we can assume we are cached
|
||||
return self.fetched_all
|
||||
# if a key does not exist from a previous fetch, we can assume it is cached
|
||||
if self.value[val].exists is False:
|
||||
return True
|
||||
if not self.value[val].is_cached(selectors[1:]):
|
||||
return False
|
||||
|
||||
result = all(
|
||||
self.value[sel].is_cached(selectors[1:]) if sel in self.value else True
|
||||
for sel in selector
|
||||
)
|
||||
return True
|
||||
|
||||
return result
|
||||
if isinstance(selector, str | int) and isinstance(self.value, dict):
|
||||
if selector in self.value:
|
||||
result = self.value[selector].is_cached(selectors[1:])
|
||||
return result
|
||||
return False
|
||||
# string and maybe work the same for cache checking
|
||||
if (
|
||||
selector.type == SelectorType.STR or selector.type == SelectorType.MAYBE
|
||||
) and isinstance(self.value, dict):
|
||||
assert isinstance(selector.value, str)
|
||||
val = selector.value
|
||||
if val not in self.value:
|
||||
# if we fetched all keys and we are not in there, refetching won't help, so we can assume we are cached
|
||||
return self.fetched_all
|
||||
if self.value[val].exists is False:
|
||||
return True
|
||||
return self.value[val].is_cached(selectors[1:])
|
||||
|
||||
return False
|
||||
|
||||
def select(self, selectors: list[Selector]) -> Any:
|
||||
selector: Selector
|
||||
if selectors == []:
|
||||
selector = AllSelector()
|
||||
selector = Selector(type=SelectorType.ALL)
|
||||
else:
|
||||
selector = selectors[0]
|
||||
|
||||
# mirror nix behavior where we return outPath if no further selector is specified
|
||||
if selectors == [] and isinstance(self.value, dict) and "outPath" in self.value:
|
||||
return self.value["outPath"].value
|
||||
|
||||
if isinstance(self.value, str | float | int | None):
|
||||
# if we are at the end of the selector chain, we return the value
|
||||
if selectors == [] and isinstance(self.value, str | float | int | None):
|
||||
return self.value
|
||||
if isinstance(self.value, dict):
|
||||
if isinstance(selector, AllSelector):
|
||||
return {k: v.select(selectors[1:]) for k, v in self.value.items()}
|
||||
if isinstance(selector, set):
|
||||
|
||||
# if we fetch a specific key, we return the recurse into that value in the dict
|
||||
if selector.type == SelectorType.STR and isinstance(self.value, dict):
|
||||
assert isinstance(selector.value, str)
|
||||
return self.value[selector.value].select(selectors[1:])
|
||||
|
||||
# if we are a MAYBE selector, we check if the key exists in the dict
|
||||
if selector.type == SelectorType.MAYBE and isinstance(self.value, dict):
|
||||
assert isinstance(selector.value, str)
|
||||
if selector.value in self.value:
|
||||
if self.value[selector.value].exists:
|
||||
return {
|
||||
k: v.select(selectors[1:])
|
||||
for k, v in self.value.items()
|
||||
if k in selector
|
||||
selector.value: self.value[selector.value].select(selectors[1:])
|
||||
}
|
||||
if isinstance(selector, str | int):
|
||||
return self.value[selector].select(selectors[1:])
|
||||
msg = f"Cannot select {selector} from type {type(self.value)}"
|
||||
raise TypeError(msg)
|
||||
return {}
|
||||
if self.fetched_all:
|
||||
return {}
|
||||
|
||||
# otherwise we return a list or a dict
|
||||
if isinstance(self.value, dict):
|
||||
keys_to_select: list[str] = []
|
||||
# if we want to select all keys, we take all existing sub elements
|
||||
if selector.type == SelectorType.ALL:
|
||||
for key in self.value:
|
||||
if self.value[key].exists:
|
||||
keys_to_select.append(key)
|
||||
|
||||
# if we want to select a set of keys, we take the keys from the selector
|
||||
if selector.type == SelectorType.SET:
|
||||
assert isinstance(selector.value, list)
|
||||
for subselector in selector.value:
|
||||
# make sure the keys actually exist if we have a maybe selector
|
||||
if subselector.type == SetSelectorType.MAYBE:
|
||||
if (
|
||||
subselector.value in self.value
|
||||
and self.value[subselector.value].exists
|
||||
):
|
||||
keys_to_select.append(subselector.value)
|
||||
else:
|
||||
keys_to_select.append(subselector.value)
|
||||
|
||||
# if we are a list, return a list
|
||||
if self.is_list:
|
||||
result = []
|
||||
for index in keys_to_select:
|
||||
result.append(self.value[index].select(selectors[1:]))
|
||||
return result
|
||||
|
||||
# otherwise return a dict
|
||||
return {k: self.value[k].select(selectors[1:]) for k in keys_to_select}
|
||||
|
||||
# return a KeyError if we cannot fetch the key
|
||||
str_selector: str
|
||||
if selector.type == SelectorType.ALL:
|
||||
str_selector = "*"
|
||||
elif selector.type == SelectorType.SET:
|
||||
subselectors: list[str] = []
|
||||
assert isinstance(selector.value, list)
|
||||
for subselector in selector.value:
|
||||
subselectors.append(subselector.value)
|
||||
str_selector = "{" + ",".join(subselectors) + "}"
|
||||
else:
|
||||
assert isinstance(selector.value, str)
|
||||
str_selector = selector.value
|
||||
|
||||
raise KeyError(str_selector)
|
||||
|
||||
def __getitem__(self, name: str) -> "FlakeCacheEntry":
|
||||
if isinstance(self.value, dict):
|
||||
return self.value[name]
|
||||
msg = f"value is a {type(self.value)}, so cannot subscribe"
|
||||
raise TypeError(msg)
|
||||
raise KeyError(name)
|
||||
|
||||
def as_json(self) -> dict[str, Any]:
|
||||
json_data: Any = {}
|
||||
@@ -338,21 +493,13 @@ class FlakeCacheEntry:
|
||||
else: # == str | float | None
|
||||
json_data["value"] = self.value
|
||||
|
||||
if isinstance(self.selector, AllSelector):
|
||||
json_data["selector"] = "all-selector"
|
||||
else: # == set[int] | set[str]
|
||||
json_data["selector"] = list(self.selector)
|
||||
json_data["is_list"] = self.is_list
|
||||
json_data["exists"] = self.exists
|
||||
json_data["fetched_all"] = self.fetched_all
|
||||
return json_data
|
||||
|
||||
@staticmethod
|
||||
def from_json(json_data: dict[str, Any]) -> "FlakeCacheEntry":
|
||||
raw_selector = json_data.get("selector")
|
||||
if raw_selector == "all-selector":
|
||||
selector: Any = AllSelector()
|
||||
else: # == set[int] | set[str]
|
||||
assert isinstance(raw_selector, list)
|
||||
selector = set(raw_selector)
|
||||
|
||||
raw_value = json_data.get("value")
|
||||
if isinstance(raw_value, dict):
|
||||
value: Any = {}
|
||||
@@ -361,9 +508,13 @@ class FlakeCacheEntry:
|
||||
else: # == str | float | None
|
||||
value = raw_value
|
||||
|
||||
entry = FlakeCacheEntry(None, [], is_out_path=False)
|
||||
entry.selector = selector
|
||||
entry.value = value
|
||||
is_list = json_data.get("is_list", False)
|
||||
exists = json_data.get("exists", True)
|
||||
fetched_all = json_data.get("fetched_all", False)
|
||||
|
||||
entry = FlakeCacheEntry(
|
||||
value=value, is_list=is_list, exists=exists, fetched_all=fetched_all
|
||||
)
|
||||
return entry
|
||||
|
||||
def __repr__(self) -> str:
|
||||
@@ -379,22 +530,22 @@ class FlakeCache:
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.cache: FlakeCacheEntry = FlakeCacheEntry({}, [])
|
||||
self.cache: FlakeCacheEntry = FlakeCacheEntry()
|
||||
|
||||
def insert(self, data: dict[str, Any], selector_str: str) -> None:
|
||||
if selector_str:
|
||||
selectors = split_selector(selector_str)
|
||||
selectors = parse_selector(selector_str)
|
||||
else:
|
||||
selectors = []
|
||||
|
||||
self.cache.insert(data, selectors)
|
||||
|
||||
def select(self, selector_str: str) -> Any:
|
||||
selectors = split_selector(selector_str)
|
||||
selectors = parse_selector(selector_str)
|
||||
return self.cache.select(selectors)
|
||||
|
||||
def is_cached(self, selector_str: str) -> bool:
|
||||
selectors = split_selector(selector_str)
|
||||
selectors = parse_selector(selector_str)
|
||||
return self.cache.is_cached(selectors)
|
||||
|
||||
def save_to_file(self, path: Path) -> None:
|
||||
@@ -544,13 +695,17 @@ class Flake:
|
||||
if nix_options is None:
|
||||
nix_options = []
|
||||
|
||||
str_selectors: list[str] = []
|
||||
for selector in selectors:
|
||||
str_selectors.append(selectors_as_json(parse_selector(selector)))
|
||||
|
||||
config = nix_config()
|
||||
nix_code = f"""
|
||||
let
|
||||
flake = builtins.getFlake("path:{self.store_path}?narHash={self.hash}");
|
||||
in
|
||||
flake.inputs.nixpkgs.legacyPackages.{config["system"]}.writeText "clan-flake-select" (
|
||||
builtins.toJSON [ {" ".join([f"(flake.clanInternals.clanLib.select ''{attr}'' flake)" for attr in selectors])} ]
|
||||
builtins.toJSON [ {" ".join([f"(flake.clanInternals.clanLib.select.applySelectors (builtins.fromJSON ''{attr}'') flake)" for attr in str_selectors])} ]
|
||||
)
|
||||
"""
|
||||
if tmp_store := nix_test_store():
|
||||
|
||||
@@ -1,35 +1,282 @@
|
||||
import logging
|
||||
|
||||
import pytest
|
||||
from clan_cli.flake import Flake, FlakeCache, FlakeCacheEntry
|
||||
from clan_cli.flake import (
|
||||
Flake,
|
||||
FlakeCache,
|
||||
FlakeCacheEntry,
|
||||
parse_selector,
|
||||
selectors_as_dict,
|
||||
)
|
||||
from clan_cli.tests.fixtures_flakes import ClanFlake
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def test_parse_selector() -> None:
|
||||
selectors = parse_selector("x")
|
||||
assert selectors_as_dict(selectors) == [
|
||||
{"type": "str", "value": "x"},
|
||||
]
|
||||
selectors = parse_selector("?x")
|
||||
assert selectors_as_dict(selectors) == [
|
||||
{"type": "maybe", "value": "x"},
|
||||
]
|
||||
selectors = parse_selector('"x"')
|
||||
assert selectors_as_dict(selectors) == [
|
||||
{"type": "str", "value": "x"},
|
||||
]
|
||||
selectors = parse_selector("*")
|
||||
assert selectors_as_dict(selectors) == [
|
||||
{"type": "all"},
|
||||
]
|
||||
selectors = parse_selector("{x}")
|
||||
assert selectors_as_dict(selectors) == [
|
||||
{
|
||||
"type": "set",
|
||||
"value": [
|
||||
{"type": "str", "value": "x"},
|
||||
],
|
||||
},
|
||||
]
|
||||
selectors = parse_selector("{x}.y")
|
||||
assert selectors_as_dict(selectors) == [
|
||||
{
|
||||
"type": "set",
|
||||
"value": [
|
||||
{"type": "str", "value": "x"},
|
||||
],
|
||||
},
|
||||
{"type": "str", "value": "y"},
|
||||
]
|
||||
selectors = parse_selector("x.y.z")
|
||||
assert selectors_as_dict(selectors) == [
|
||||
{"type": "str", "value": "x"},
|
||||
{"type": "str", "value": "y"},
|
||||
{"type": "str", "value": "z"},
|
||||
]
|
||||
selectors = parse_selector("x.*")
|
||||
assert selectors_as_dict(selectors) == [
|
||||
{"type": "str", "value": "x"},
|
||||
{"type": "all"},
|
||||
]
|
||||
selectors = parse_selector("*.x")
|
||||
assert selectors_as_dict(selectors) == [
|
||||
{"type": "all"},
|
||||
{"type": "str", "value": "x"},
|
||||
]
|
||||
selectors = parse_selector("x.*.z")
|
||||
assert selectors_as_dict(selectors) == [
|
||||
{"type": "str", "value": "x"},
|
||||
{"type": "all"},
|
||||
{"type": "str", "value": "z"},
|
||||
]
|
||||
selectors = parse_selector("x.{y,z}")
|
||||
assert selectors_as_dict(selectors) == [
|
||||
{"type": "str", "value": "x"},
|
||||
{
|
||||
"type": "set",
|
||||
"value": [
|
||||
{"type": "str", "value": "y"},
|
||||
{"type": "str", "value": "z"},
|
||||
],
|
||||
},
|
||||
]
|
||||
selectors = parse_selector("x.?zzz.{y,?z,x,*}")
|
||||
assert selectors_as_dict(selectors) == [
|
||||
{"type": "str", "value": "x"},
|
||||
{"type": "maybe", "value": "zzz"},
|
||||
{
|
||||
"type": "set",
|
||||
"value": [
|
||||
{"type": "str", "value": "y"},
|
||||
{"type": "maybe", "value": "z"},
|
||||
{"type": "str", "value": "x"},
|
||||
{"type": "str", "value": "*"},
|
||||
],
|
||||
},
|
||||
]
|
||||
|
||||
selectors = parse_selector('x."?zzz".?zzz\\.asd..{y,\\?z,"x,",*}')
|
||||
assert selectors_as_dict(selectors) == [
|
||||
{"type": "str", "value": "x"},
|
||||
{"type": "str", "value": "?zzz"},
|
||||
{"type": "maybe", "value": "zzz.asd"},
|
||||
{"type": "str", "value": ""},
|
||||
{
|
||||
"type": "set",
|
||||
"value": [
|
||||
{"type": "str", "value": "y"},
|
||||
{"type": "str", "value": "?z"},
|
||||
{"type": "str", "value": "x,"},
|
||||
{"type": "str", "value": "*"},
|
||||
],
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
def test_insert_and_iscached() -> None:
|
||||
test_cache = FlakeCacheEntry()
|
||||
selectors = parse_selector("x.y.z")
|
||||
test_cache.insert("x", selectors)
|
||||
assert test_cache["x"]["y"]["z"].value == "x"
|
||||
assert test_cache.is_cached(selectors)
|
||||
assert not test_cache.is_cached(parse_selector("x.y"))
|
||||
assert test_cache.is_cached(parse_selector("x.y.z.1"))
|
||||
assert not test_cache.is_cached(parse_selector("x.*.z"))
|
||||
assert test_cache.is_cached(parse_selector("x.{y}.z"))
|
||||
assert test_cache.is_cached(parse_selector("x.?y.z"))
|
||||
assert not test_cache.is_cached(parse_selector("x.?z.z"))
|
||||
|
||||
test_cache = FlakeCacheEntry()
|
||||
selectors = parse_selector("x.*.z")
|
||||
test_cache.insert({"y": "x"}, selectors)
|
||||
assert test_cache["x"]["y"]["z"].value == "x"
|
||||
assert test_cache.is_cached(selectors)
|
||||
assert not test_cache.is_cached(parse_selector("x.y"))
|
||||
assert not test_cache.is_cached(parse_selector("x.y.x"))
|
||||
assert test_cache.is_cached(parse_selector("x.y.z.1"))
|
||||
assert test_cache.is_cached(parse_selector("x.{y}.z"))
|
||||
assert test_cache.is_cached(parse_selector("x.{y,z}.z"))
|
||||
assert test_cache.is_cached(parse_selector("x.{y,?z}.z"))
|
||||
assert test_cache.is_cached(parse_selector("x.?y.z"))
|
||||
assert test_cache.is_cached(parse_selector("x.?z.z"))
|
||||
|
||||
test_cache = FlakeCacheEntry()
|
||||
selectors = parse_selector("x.{y}.z")
|
||||
test_cache.insert({"y": "x"}, selectors)
|
||||
assert test_cache["x"]["y"]["z"].value == "x"
|
||||
assert test_cache.is_cached(selectors)
|
||||
assert not test_cache.is_cached(parse_selector("x.y"))
|
||||
assert test_cache.is_cached(parse_selector("x.y.z.1"))
|
||||
assert not test_cache.is_cached(parse_selector("x.*.z"))
|
||||
assert test_cache.is_cached(parse_selector("x.{y}.z"))
|
||||
assert test_cache.is_cached(parse_selector("x.?y.z"))
|
||||
assert not test_cache.is_cached(parse_selector("x.?z.z"))
|
||||
|
||||
test_cache = FlakeCacheEntry()
|
||||
selectors = parse_selector("x.?y.z")
|
||||
test_cache.insert({"y": "x"}, selectors)
|
||||
assert test_cache["x"]["y"]["z"].value == "x"
|
||||
assert test_cache.is_cached(selectors)
|
||||
assert not test_cache.is_cached(parse_selector("x.y"))
|
||||
assert test_cache.is_cached(parse_selector("x.y.z.1"))
|
||||
assert not test_cache.is_cached(parse_selector("x.*.z"))
|
||||
assert test_cache.is_cached(parse_selector("x.{y}.z"))
|
||||
assert test_cache.is_cached(parse_selector("x.?y.z"))
|
||||
assert not test_cache.is_cached(parse_selector("x.?z.z"))
|
||||
|
||||
test_cache = FlakeCacheEntry()
|
||||
selectors = parse_selector("x.?y.z")
|
||||
test_cache.insert({}, selectors)
|
||||
assert test_cache["x"]["y"].exists is False
|
||||
assert test_cache.is_cached(selectors)
|
||||
assert test_cache.is_cached(parse_selector("x.y"))
|
||||
assert test_cache.is_cached(parse_selector("x.y.z.1"))
|
||||
assert test_cache.is_cached(parse_selector("x.?y.z.1"))
|
||||
assert not test_cache.is_cached(parse_selector("x.*.z"))
|
||||
assert test_cache.is_cached(parse_selector("x.{y}.z"))
|
||||
assert test_cache.is_cached(parse_selector("x.?y.abc"))
|
||||
assert not test_cache.is_cached(parse_selector("x.?z.z"))
|
||||
|
||||
test_cache = FlakeCacheEntry()
|
||||
selectors = parse_selector("x.{y,z}.z")
|
||||
test_cache.insert({"y": 1, "z": 2}, selectors)
|
||||
assert test_cache["x"]["y"]["z"].value == 1
|
||||
assert test_cache["x"]["z"]["z"].value == 2
|
||||
assert test_cache.is_cached(selectors)
|
||||
assert not test_cache.is_cached(parse_selector("x.y"))
|
||||
assert test_cache.is_cached(parse_selector("x.y.z.1"))
|
||||
assert not test_cache.is_cached(parse_selector("x.*.z"))
|
||||
assert test_cache.is_cached(parse_selector("x.{y}.z"))
|
||||
assert not test_cache.is_cached(parse_selector("x.?y.abc"))
|
||||
assert test_cache.is_cached(parse_selector("x.?z.z"))
|
||||
|
||||
test_cache = FlakeCacheEntry()
|
||||
selectors = parse_selector("x.y")
|
||||
test_cache.insert(1, selectors)
|
||||
selectors = parse_selector("x.z")
|
||||
test_cache.insert(2, selectors)
|
||||
assert test_cache["x"]["y"].value == 1
|
||||
assert test_cache["x"]["z"].value == 2
|
||||
assert test_cache.is_cached(parse_selector("x.y"))
|
||||
assert test_cache.is_cached(parse_selector("x.y.z.1"))
|
||||
assert not test_cache.is_cached(parse_selector("x.*.z"))
|
||||
assert test_cache.is_cached(parse_selector("x.{y}.z"))
|
||||
assert test_cache.is_cached(parse_selector("x.?y.abc"))
|
||||
assert test_cache.is_cached(parse_selector("x.?z.z"))
|
||||
assert not test_cache.is_cached(parse_selector("x.?x.z"))
|
||||
|
||||
test_cache = FlakeCacheEntry()
|
||||
selectors = parse_selector("x.y.z")
|
||||
test_cache.insert({"a": {"b": {"c": 1}}}, selectors)
|
||||
assert test_cache.is_cached(selectors)
|
||||
assert test_cache.is_cached(parse_selector("x.y.z.a.b.c"))
|
||||
assert test_cache.is_cached(parse_selector("x.y.z.a.b"))
|
||||
assert test_cache.is_cached(parse_selector("x.y.z.a"))
|
||||
assert test_cache.is_cached(parse_selector("x.y.z"))
|
||||
assert not test_cache.is_cached(parse_selector("x.y"))
|
||||
assert not test_cache.is_cached(parse_selector("x"))
|
||||
assert test_cache.is_cached(parse_selector("x.y.z.xxx"))
|
||||
|
||||
test_cache = FlakeCacheEntry()
|
||||
selectors = parse_selector("x.y")
|
||||
test_cache.insert(1, selectors)
|
||||
with pytest.raises(TypeError):
|
||||
test_cache.insert(2, selectors)
|
||||
assert test_cache["x"]["y"].value == 1
|
||||
|
||||
|
||||
def test_select() -> None:
|
||||
test_cache = FlakeCacheEntry()
|
||||
|
||||
test_cache.insert("bla", parse_selector("a.b.c"))
|
||||
assert test_cache.select(parse_selector("a.b.c")) == "bla"
|
||||
assert test_cache.select(parse_selector("a.b")) == {"c": "bla"}
|
||||
assert test_cache.select(parse_selector("a")) == {"b": {"c": "bla"}}
|
||||
assert test_cache.select(parse_selector("a.b.?c")) == {"c": "bla"}
|
||||
assert test_cache.select(parse_selector("a.?b.?c")) == {"b": {"c": "bla"}}
|
||||
assert test_cache.select(parse_selector("a.?c")) == {}
|
||||
assert test_cache.select(parse_selector("a.?x.c")) == {}
|
||||
assert test_cache.select(parse_selector("a.*")) == {"b": {"c": "bla"}}
|
||||
assert test_cache.select(parse_selector("a.*.*")) == {"b": {"c": "bla"}}
|
||||
assert test_cache.select(parse_selector("a.*.c")) == {"b": "bla"}
|
||||
assert test_cache.select(parse_selector("a.b.*")) == {"c": "bla"}
|
||||
assert test_cache.select(parse_selector("a.{b}.c")) == {"b": "bla"}
|
||||
assert test_cache.select(parse_selector("a.{b}.{c}")) == {"b": {"c": "bla"}}
|
||||
assert test_cache.select(parse_selector("a.b.{c}")) == {"c": "bla"}
|
||||
assert test_cache.select(parse_selector("a.{?b}.c")) == {"b": "bla"}
|
||||
assert test_cache.select(parse_selector("a.{?b,?x}.c")) == {"b": "bla"}
|
||||
with pytest.raises(KeyError):
|
||||
test_cache.select(parse_selector("a.b.x"))
|
||||
with pytest.raises(KeyError):
|
||||
test_cache.select(parse_selector("a.b.c.x"))
|
||||
with pytest.raises(KeyError):
|
||||
test_cache.select(parse_selector("a.{b,x}.c"))
|
||||
|
||||
testdict = {"x": {"y": [123, 345, 456], "z": "bla"}}
|
||||
test_cache = FlakeCacheEntry(testdict, [])
|
||||
assert test_cache["x"]["z"].value == "bla"
|
||||
assert test_cache.is_cached(["x", "z"])
|
||||
assert not test_cache.is_cached(["x", "y", "z"])
|
||||
assert test_cache.select(["x", "y", 0]) == 123
|
||||
assert not test_cache.is_cached(["x", "z", 1])
|
||||
|
||||
|
||||
def test_insert() -> None:
|
||||
test_cache = FlakeCacheEntry({}, [])
|
||||
# Inserting the same thing twice should succeed
|
||||
test_cache.insert(None, ["nix"])
|
||||
test_cache.insert(None, ["nix"])
|
||||
assert test_cache.select(["nix"]) is None
|
||||
test_cache.insert(testdict, parse_selector("testdict"))
|
||||
assert test_cache["testdict"]["x"]["z"].value == "bla"
|
||||
selectors = parse_selector("testdict.x.z")
|
||||
assert test_cache.select(selectors) == "bla"
|
||||
selectors = parse_selector("testdict.x.z.z")
|
||||
with pytest.raises(KeyError):
|
||||
test_cache.select(selectors)
|
||||
selectors = parse_selector("testdict.x.y.0")
|
||||
assert test_cache.select(selectors) == 123
|
||||
selectors = parse_selector("testdict.x.z.1")
|
||||
with pytest.raises(KeyError):
|
||||
test_cache.select(selectors)
|
||||
|
||||
|
||||
def test_out_path() -> None:
|
||||
testdict = {"x": {"y": [123, 345, 456], "z": "/nix/store/bla"}}
|
||||
test_cache = FlakeCacheEntry(testdict, [])
|
||||
assert test_cache.select(["x", "z"]) == "/nix/store/bla"
|
||||
assert test_cache.select(["x", "z", "outPath"]) == "/nix/store/bla"
|
||||
test_cache = FlakeCacheEntry()
|
||||
test_cache.insert(testdict, [])
|
||||
selectors = parse_selector("x.z")
|
||||
assert test_cache.select(selectors) == "/nix/store/bla"
|
||||
selectors = parse_selector("x.z.outPath")
|
||||
assert test_cache.select(selectors) == "/nix/store/bla"
|
||||
|
||||
|
||||
@pytest.mark.with_core
|
||||
@@ -85,10 +332,10 @@ def test_conditional_all_selector(flake: ClanFlake) -> None:
|
||||
assert isinstance(flake1._cache, FlakeCache) # noqa: SLF001
|
||||
assert isinstance(flake2._cache, FlakeCache) # noqa: SLF001
|
||||
log.info("First select")
|
||||
res1 = flake1.select("inputs.*.{clan,missing}")
|
||||
res1 = flake1.select("inputs.*.{?clan,?missing}")
|
||||
|
||||
log.info("Second (cached) select")
|
||||
res2 = flake1.select("inputs.*.{clan,missing}")
|
||||
res2 = flake1.select("inputs.*.{?clan,?missing}")
|
||||
|
||||
assert res1 == res2
|
||||
assert res1["clan-core"].get("clan") is not None
|
||||
|
||||
Reference in New Issue
Block a user