diff --git a/pyproject.toml b/pyproject.toml index 54041d85..def37e64 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -110,6 +110,7 @@ markers = [ "local: tests of functionality which do not involve a server or writing to an offline cache file", "object_retrieval: tests relating to retrieval of objects from the server", "object_removal: tests relating to removal of objects from the server", + "filters: tests of filter objects", ] [tool.interrogate] diff --git a/simvue/api/objects/base.py b/simvue/api/objects/base.py index 30bf85e8..d258827f 100644 --- a/simvue/api/objects/base.py +++ b/simvue/api/objects/base.py @@ -33,9 +33,9 @@ from simvue.api.url import URL try: - from typing import Self + from typing import Self, override except ImportError: - from typing_extensions import Self + from typing_extensions import Self, override # noqa: UP035 # Need to use this inside of Generator typing to fix bug present in Python 3.10 - see issue #745 T = typing.TypeVar("T", bound="SimvueObject") @@ -806,18 +806,25 @@ def staged(self) -> dict[str, typing.Any] | None: """ return self._staging or None + @override def __str__(self) -> str: """String representation of Simvue object.""" return f"{self.__class__.__name__}({self.id=})" + @override def __repr__(self) -> str: _out_str = f"{self.__class__.__module__}.{self.__class__.__qualname__}(" _property_values: list[str] = [] + _property_warn_list: list[str] = [] for property in self._properties: try: _value = getattr(self, property) - except KeyError: + except (KeyError, Exception): + # Display a warning only once if a property could not be retrieved + if property not in _property_warn_list: + self._logger.warning(f"Failed to retrieve property '{property}'") + _property_warn_list.append(property) continue if isinstance(_value, types.GeneratorType): diff --git a/simvue/api/objects/filter.py b/simvue/api/objects/filter.py deleted file mode 100644 index aed6d795..00000000 --- a/simvue/api/objects/filter.py +++ /dev/null @@ -1,331 +0,0 @@ -"""Simvue Object Filters. - -Provides an interface for the creation and use of filters when retrieving -objects from the Simvue server. -""" - -import abc -from collections.abc import Generator -import enum -import json -import typing - -import pydantic - -try: - from typing import Self -except ImportError: - from typing_extensions import Self - - -if typing.TYPE_CHECKING: - from .base import SimvueObject - - -class Status(str, enum.Enum): - """Status of run.""" - - Created = "created" - Running = "running" - Completed = "completed" - Lost = "lost" - Terminated = "terminated" - Failed = "failed" - - -class Time(str, enum.Enum): - """Run stage.""" - - Created = "created" - Started = "started" - Modified = "modified" - Ended = "ended" - - -class System(str, enum.Enum): - """System metadata filtering.""" - - Working_Directory = "cwd" - Hostname = "hostname" - Python_Version = "pythonversion" - Platform_System = "platform.system" - Platform_Release = "platform.release" - Platform_Version = "platform.version" - CPU_Architecture = "cpu.arch" - CPU_Processor = "cpu.processor" - GPU_Name = "gpu.name" - GPU_Driver = "gpu.driver" - - -class RestAPIFilter(abc.ABC): - """RestAPI query filter object.""" - - def __init__(self, simvue_object: "type[SimvueObject] | None" = None) -> None: - """Initialise a query object using a Simvue object class.""" - self._sv_object: "type[SimvueObject] | None" = simvue_object - self._filters: list[str] = [] - self._generate_members() - - def _time_within( - self, time_type: Time, *, hours: int = 0, days: int = 0, years: int = 0 - ) -> Self: - """Define filter using time range.""" - if len(_non_zero := list(i for i in (hours, days, years) if i != 0)) > 1: - raise AssertionError( - "Only one duration type may be provided: hours, days or years" - ) - if len(_non_zero) < 1: - raise AssertionError( - f"No duration provided for filter '{time_type.value}_within'" - ) - - if hours: - self._filters.append(f"{time_type.value} < {hours}h") - elif days: - self._filters.append(f"{time_type.value} < {days}d") - else: - self._filters.append(f"{time_type.value} < {years}y") - return self - - @abc.abstractmethod - def _generate_members(self) -> None: - """Generate filters using specified definitions.""" - - def has_name(self, name: str) -> Self: - """Filter based on absolute object name.""" - self._filters.append(f"name == {name}") - return self - - def has_name_containing(self, name: str) -> Self: - """Filter base on object name containing a term.""" - self._filters.append(f"name contains {name}") - return self - - def created_within(self, *, hours: int = 0, days: int = 0, years: int = 0) -> Self: - """Find objects created within the last specified time period.""" - return self._time_within(Time.Created, hours=hours, days=days, years=years) - - def has_description_containing(self, search_str: str) -> Self: - """Return objects containing the specified term within the description.""" - self._filters.append(f"description contains {search_str}") - return self - - def exclude_description_containing(self, search_str: str) -> Self: - """Find objects not containing the specified term in their description.""" - self._filters.append(f"description not contains {search_str}") - return self - - def has_tag(self, tag: str) -> Self: - """Find objects with the given tag.""" - self._filters.append(f"has tag.{tag}") - return self - - def starred(self) -> Self: - self._filters.append("starred") - return self - - def as_list(self) -> list[str]: - """Returns the filters as a list.""" - return self._filters - - def clear(self) -> None: - """Clear all current filters.""" - self._filters = [] - - def get( - self, - count: pydantic.PositiveInt | None = None, - offset: pydantic.NonNegativeInt | None = None, - **kwargs, - ) -> Generator[tuple[str, "SimvueObject | None"], None, None]: - """Call the get method from the simvue object class.""" - if not self._sv_object: - raise RuntimeError("No object type associated with filter.") - _filters: str = json.dumps(self._filters) - return self._sv_object.get( - count=count, offset=offset, filters=_filters, **kwargs - ) - - def count(self, **kwargs) -> int: - """Return object count.""" - if not self._sv_object: - raise RuntimeError("No object type associated with filter.") - _ = kwargs.pop("count", None) - _filters: str = json.dumps(self._filters) - return self._sv_object.count(filters=_filters, **kwargs) - - -class FoldersFilter(RestAPIFilter): - """Filter for Folders.""" - - def has_path(self, name: str) -> "FoldersFilter": - """Check if a folder has the given path.""" - self._filters.append(f"path == {name}") - return self - - def has_path_containing(self, name: str) -> "FoldersFilter": - """Check if the folder path contains a search term.""" - self._filters.append(f"path contains {name}") - return self - - def _generate_members(self) -> None: - return super()._generate_members() - - -class RunsFilter(RestAPIFilter): - """Filter for Runs.""" - - def _generate_members(self) -> None: - _global_comparators = [self._value_contains, self._value_eq, self._value_neq] - - _numeric_comparators = [ - self._value_geq, - self._value_leq, - self._value_lt, - self._value_gt, - ] - - for label, system_spec in System.__members__.items(): - for function in _global_comparators: - _label: str = label.lower() - _func_name: str = function.__name__.replace("_value", _label) - - def _out_func(value: str | int | float, func=function) -> Self: - return func("system", system_spec.value, value) - - _out_func.__name__ = _func_name - setattr(self, _func_name, _out_func) - - for function in _global_comparators + _numeric_comparators: - _func_name = function.__name__.replace("_value", "metadata") - - def _out_func( - attribute: str, value: str | int | float, func=function - ) -> Self: - return func("metadata", attribute, value) - - _out_func.__name__ = _func_name - setattr(self, _func_name, _out_func) - - def owner(self, username: str = "self") -> "RunsFilter": - """Filter by run owner.""" - self._filters.append(f"user == {username}") - return self - - def exclude_owner(self, username: str = "self") -> "RunsFilter": - """Veto by run owner.""" - self._filters.append(f"user != {username}") - return self - - def has_status(self, status: Status) -> "RunsFilter": - """Filter by run status.""" - self._filters.append(f"status == {status.value}") - return self - - def is_running(self) -> "RunsFilter": - """Filter by if run is running.""" - return self.has_status(Status.Running) - - def is_lost(self) -> "RunsFilter": - """Filter by if run is lost.""" - return self.has_status(Status.Lost) - - def has_completed(self) -> "RunsFilter": - """Filter by if run has completed.""" - return self.has_status(Status.Completed) - - def has_failed(self) -> "RunsFilter": - """Filter by if run has failed.""" - return self.has_status(Status.Failed) - - def has_alert( - self, alert_name: str, is_critical: bool | None = None - ) -> "RunsFilter": - """Filter by if run has a given alert.""" - self._filters.append(f"alert.name == {alert_name}") - if is_critical is True: - self._filters.append("alert.status == critical") - elif is_critical is False: - self._filters.append("alert.status == ok") - return self - - def started_within( - self, *, hours: int = 0, days: int = 0, years: int = 0 - ) -> "RunsFilter": - """Filter by run start time interval.""" - return self._time_within(Time.Started, hours=hours, days=days, years=years) - - def modified_within( - self, *, hours: int = 0, days: int = 0, years: int = 0 - ) -> "RunsFilter": - """Filter by run modified time interval.""" - return self._time_within(Time.Modified, hours=hours, days=days, years=years) - - def ended_within( - self, *, hours: int = 0, days: int = 0, years: int = 0 - ) -> "RunsFilter": - """Filter by run end time interval.""" - return self._time_within(Time.Ended, hours=hours, days=days, years=years) - - def in_folder(self, folder_name: str) -> "RunsFilter": - """Filter by whether run is within the given folder.""" - self._filters.append(f"folder.path == {folder_name}") - return self - - def has_metadata_attribute(self, attribute: str) -> "RunsFilter": - """Filter by whether run has the given metadata attribute.""" - self._filters.append(f"metadata.{attribute} exists") - return self - - def exclude_metadata_attribute(self, attribute: str) -> "RunsFilter": - """Veto by whether run has the given metadata attribute.""" - self._filters.append(f"metadata.{attribute} not exists") - return self - - def _value_eq( - self, category: str, attribute: str, value: str | int | float - ) -> "RunsFilter": - self._filters.append(f"{category}.{attribute} == {value}") - return self - - def _value_neq( - self, category: str, attribute: str, value: str | int | float - ) -> "RunsFilter": - self._filters.append(f"{category}.{attribute} != {value}") - return self - - def _value_contains( - self, category: str, attribute: str, value: str | int | float - ) -> "RunsFilter": - self._filters.append(f"{category}.{attribute} contains {value}") - return self - - def _value_leq( - self, category: str, attribute: str, value: int | float - ) -> "RunsFilter": - self._filters.append(f"{category}.{attribute} <= {value}") - return self - - def _value_geq( - self, category: str, attribute: str, value: int | float - ) -> "RunsFilter": - self._filters.append(f"{category}.{attribute} >= {value}") - return self - - def _value_lt( - self, category: str, attribute: str, value: int | float - ) -> "RunsFilter": - self._filters.append(f"{category}.{attribute} < {value}") - return self - - def _value_gt( - self, category: str, attribute: str, value: int | float - ) -> "RunsFilter": - self._filters.append(f"{category}.{attribute} > {value}") - return self - - def __str__(self) -> str: - return " && ".join(self._filters) if self._filters else "None" - - def __repr__(self) -> str: - return f"{super().__repr__()[:-1]}, filters={self._filters}>" diff --git a/simvue/api/objects/filter/__init__.py b/simvue/api/objects/filter/__init__.py new file mode 100644 index 00000000..f14b097c --- /dev/null +++ b/simvue/api/objects/filter/__init__.py @@ -0,0 +1,6 @@ +"""Simvue server object filters.""" + +from .folder import FoldersFilter +from .run import RunsFilter, Status + +__all__ = ["FoldersFilter", "RunsFilter", "Status"] diff --git a/simvue/api/objects/filter/base.py b/simvue/api/objects/filter/base.py new file mode 100644 index 00000000..2fa8c891 --- /dev/null +++ b/simvue/api/objects/filter/base.py @@ -0,0 +1,193 @@ +"""Base Filter object for RestAPI queries.""" + +import abc +from collections.abc import Generator +import typing +import enum +import json +import pydantic as pyd + +from simvue.utilities import prettify_pydantic + +if typing.TYPE_CHECKING: + from simvue.api.objects.base import SimvueObject + +try: + from typing import Self +except ImportError: + from typing_extensions import Self # noqa: UP035 + + +class Time(str, enum.Enum): + """Run stage.""" + + Created = "created" + Started = "started" + Modified = "modified" + Ended = "ended" + + +class RestAPIFilter(abc.ABC): + """RestAPI query filter object.""" + + def __init__(self, simvue_object: "type[SimvueObject] | None" = None) -> None: + """Initialise a query object using a Simvue object class.""" + self._sv_object: "type[SimvueObject] | None" = simvue_object + self._filters: list[str] = [] + + def _time_within( + self, time_type: Time, *, hours: int = 0, days: int = 0, years: int = 0 + ) -> Self: + """Define filter using time range.""" + if len(_non_zero := list(i for i in (hours, days, years) if i != 0)) > 1: + raise AssertionError( + "Only one duration type may be provided: hours, days or years" + ) + if len(_non_zero) < 1: + raise AssertionError( + f"No duration provided for filter '{time_type.value}_within'" + ) + + if hours: + self._filters.append(f"{time_type.value} < {hours}h") + elif days: + self._filters.append(f"{time_type.value} < {days}d") + else: + self._filters.append(f"{time_type.value} < {years}y") + return self + + @prettify_pydantic + @pyd.validate_call + def created_within( + self, + *, + hours: pyd.NonNegativeInt = 0, + days: pyd.NonNegativeInt = 0, + years: pyd.NonNegativeInt = 0, + ) -> Self: + """Find objects created within the last specified time period.""" + return self._time_within(Time.Created, hours=hours, days=days, years=years) + + @prettify_pydantic + @pyd.validate_call + def has_description_containing(self, search_str: str) -> Self: + """Return objects containing the specified term within the description.""" + self._filters.append(f"description contains {search_str}") + return self + + @prettify_pydantic + @pyd.validate_call + def exclude_description_containing(self, search_str: str) -> Self: + """Find objects not containing the specified term in their description.""" + self._filters.append(f"description not contains {search_str}") + return self + + @prettify_pydantic + @pyd.validate_call + def has_tag(self, tag: str) -> Self: + """Find objects with the given tag.""" + self._filters.append(f"has tag.{tag}") + return self + + @prettify_pydantic + @pyd.validate_call + def exclude_tag(self, tag: str) -> Self: + """Find objects with the given tag.""" + self._filters.append(f"does not have tag.{tag}") + return self + + def starred(self) -> Self: + self._filters.append("starred") + return self + + def as_list(self) -> list[str]: + """Returns the filters as a list.""" + return self._filters + + def clear(self) -> None: + """Clear all current filters.""" + self._filters = [] + + @prettify_pydantic + @pyd.validate_call + def has_metadata_attribute(self, attribute: str) -> Self: + """Filter by whether run has the given metadata attribute.""" + self._filters.append(f"metadata.{attribute} exists") + return self + + @prettify_pydantic + @pyd.validate_call + def exclude_metadata_attribute(self, attribute: str) -> Self: + """Veto by whether run has the given metadata attribute.""" + self._filters.append(f"metadata.{attribute} not exists") + return self + + @prettify_pydantic + @pyd.validate_call + def has_metadata_value(self, attribute: str, value: str | float | int) -> Self: + """Filter by the value of a metadata attribute.""" + self._filters.append(f"metadata.{attribute} == {value}") + return self + + @prettify_pydantic + @pyd.validate_call + def exclude_metadata_value(self, attribute: str, value: str | float | int) -> Self: + """Veto by the value of a metadata attribute.""" + self._filters.append(f"metadata.{attribute} != {value}") + return self + + @prettify_pydantic + @pyd.validate_call + def has_metadata_value_greater_than( + self, attribute: str, value: float | int + ) -> Self: + """Filter by the value of a metadata value threshold.""" + self._filters.append(f"metadata.{attribute} > {value}") + return self + + @prettify_pydantic + @pyd.validate_call + def has_metadata_value_less_than(self, attribute: str, value: float | int) -> Self: + """Filter by the value of a metadata value threshold.""" + self._filters.append(f"metadata.{attribute} < {value}") + return self + + @prettify_pydantic + @pyd.validate_call + def has_metadata_value_greater_than_or_equal_to( + self, attribute: str, value: float | int + ) -> Self: + """Filter by the value of a metadata value threshold.""" + self._filters.append(f"metadata.{attribute} >= {value}") + return self + + @prettify_pydantic + @pyd.validate_call + def has_metadata_value_less_than_or_equal_to( + self, attribute: str, value: float | int + ) -> Self: + """Filter by the value of a metadata value threshold.""" + self._filters.append(f"metadata.{attribute} <= {value}") + return self + + def get( + self, + count: pyd.PositiveInt | None = None, + offset: pyd.NonNegativeInt | None = None, + **kwargs, + ) -> Generator[tuple[str, "SimvueObject | None"]]: + """Call the get method from the simvue object class.""" + if not self._sv_object: + raise RuntimeError("No object type associated with filter.") + _filters: str = json.dumps(self._filters) + return self._sv_object.get( + count=count, offset=offset, filters=_filters, **kwargs + ) + + def count(self, **kwargs) -> int: + """Return object count.""" + if not self._sv_object: + raise RuntimeError("No object type associated with filter.") + _ = kwargs.pop("count", None) + _filters: str = json.dumps(self._filters) + return self._sv_object.count(filters=_filters, **kwargs) diff --git a/simvue/api/objects/filter/folder.py b/simvue/api/objects/filter/folder.py new file mode 100644 index 00000000..a1ff94e2 --- /dev/null +++ b/simvue/api/objects/filter/folder.py @@ -0,0 +1,33 @@ +"""Simvue RestAPI Folders Filter.""" + +import typing +import pydantic as pyd + +try: + from typing import Self +except ImportError: + from typing_extensions import Self # noqa: UP035 + +from simvue.models import FOLDER_REGEX +from simvue.utilities import prettify_pydantic +from .base import RestAPIFilter + + +class FoldersFilter(RestAPIFilter): + """Filter for Folders.""" + + @prettify_pydantic + @pyd.validate_call + def has_path( + self, name: typing.Annotated[str, pyd.Field(pattern=FOLDER_REGEX)] + ) -> Self: + """Check if a folder has the given path.""" + self._filters.append(f"path == {name}") + return self + + @prettify_pydantic + @pyd.validate_call + def has_path_containing(self, name: str) -> Self: + """Check if the folder path contains a search term.""" + self._filters.append(f"path contains {name}") + return self diff --git a/simvue/api/objects/filter/run.py b/simvue/api/objects/filter/run.py new file mode 100644 index 00000000..75d6d7ef --- /dev/null +++ b/simvue/api/objects/filter/run.py @@ -0,0 +1,303 @@ +"""Simvue RestAPI Runs Filter.""" + +import typing +import semver +import pydantic as pyd + +try: + from typing import Self +except ImportError: + from typing_extensions import Self # noqa: UP035 + +from simvue.models import FOLDER_REGEX +from simvue.utilities import prettify_pydantic + +from .base import RestAPIFilter, Time + +try: + from typing import override +except ImportError: + from typing_extensions import override # noqa: UP035 + +Status = typing.Literal[ + "lost", "failed", "completed", "terminated", "running", "created" +] + + +class RunsFilter(RestAPIFilter): + """Filter for searching runs on the Simvue server.""" + + @prettify_pydantic + @pyd.validate_call + def has_name(self, name: str) -> Self: + """Filter based on absolute object name.""" + self._filters.append(f"name == {name}") + return self + + @prettify_pydantic + @pyd.validate_call + def has_name_containing(self, name: str) -> Self: + """Filter base on object name containing a term.""" + self._filters.append(f"name contains {name}") + return self + + @prettify_pydantic + @pyd.validate_call + def exclude_name(self, name: str) -> Self: + """Veto by object name.""" + self._filters.append(f"name != {name}") + return self + + @prettify_pydantic + @pyd.validate_call + def owner(self, username: str = "self") -> Self: + """Filter by run owner.""" + self._filters.append(f"user == {username}") + return self + + @prettify_pydantic + @pyd.validate_call + def exclude_owner(self, username: str = "self") -> Self: + """Veto by run owner.""" + self._filters.append(f"user != {username}") + return self + + @prettify_pydantic + @pyd.validate_call + def has_status(self, status: Status) -> Self: + """Filter by run status.""" + self._filters.append(f"status == {status}") + return self + + def is_running(self) -> Self: + """Filter by if run is running.""" + return self.has_status("running") + + def is_lost(self) -> Self: + """Filter by if run is lost.""" + return self.has_status("lost") + + def has_completed(self) -> Self: + """Filter by if run has completed.""" + return self.has_status("completed") + + def has_failed(self) -> Self: + """Filter by if run has failed.""" + return self.has_status("failed") + + @prettify_pydantic + @pyd.validate_call + def has_alert(self, alert_name: str, *, is_critical: bool | None = None) -> Self: + """Filter by if run has a given alert.""" + self._filters.append(f"alert.name == {alert_name}") + if is_critical is True: + self._filters.append("alert.status == critical") + elif is_critical is False: + self._filters.append("alert.status == ok") + return self + + @prettify_pydantic + @pyd.validate_call + def started_within( + self, + *, + hours: pyd.NonNegativeInt = 0, + days: pyd.NonNegativeInt = 0, + years: pyd.NonNegativeInt = 0, + ) -> Self: + """Filter by run start time interval.""" + return self._time_within(Time.Started, hours=hours, days=days, years=years) + + @prettify_pydantic + @pyd.validate_call + def modified_within( + self, + *, + hours: pyd.NonNegativeInt = 0, + days: pyd.NonNegativeInt = 0, + years: pyd.NonNegativeInt = 0, + ) -> Self: + """Filter by run modified time interval.""" + return self._time_within(Time.Modified, hours=hours, days=days, years=years) + + @prettify_pydantic + @pyd.validate_call + def ended_within( + self, + *, + hours: pyd.NonNegativeInt = 0, + days: pyd.NonNegativeInt = 0, + years: pyd.NonNegativeInt = 0, + ) -> Self: + """Filter by run end time interval.""" + return self._time_within(Time.Ended, hours=hours, days=days, years=years) + + @prettify_pydantic + @pyd.validate_call + def in_folder( + self, folder_path: typing.Annotated[str, pyd.Field(pattern=FOLDER_REGEX)] + ) -> Self: + """Filter by whether run is within the given folder.""" + self._filters.append(f"folder.path == {folder_path}") + return self + + @prettify_pydantic + @pyd.validate_call + def in_folder_containing(self, folder_path: str) -> Self: + """Filter by whether run is in folder path with expression.""" + self._filters.append(f"folder.path contains {folder_path}") + return self + + @prettify_pydantic + @pyd.validate_call + def exclude_in_folder( + self, folder_path: typing.Annotated[str, pyd.Field(pattern=FOLDER_REGEX)] + ) -> Self: + """Filter by whether run is not within the given folder.""" + self._filters.append(f"folder.path != {folder_path}") + return self + + @prettify_pydantic + @pyd.validate_call + def has_working_directory(self, working_dir: str) -> Self: + """Filter by whether run was executed in a given directory.""" + self._filters.append(f"system.cwd == {working_dir}") + return self + + @prettify_pydantic + @pyd.validate_call + def exclude_working_directory(self, working_dir: str) -> Self: + """Veto by whether run was executed in a given directory.""" + self._filters.append(f"system.cwd != {working_dir}") + return self + + @prettify_pydantic + @pyd.validate_call + def has_hostname(self, hostname: str) -> Self: + """Filter by simulation host machine.""" + self._filters.append(f"system.hostname == {hostname}") + return self + + @prettify_pydantic + @pyd.validate_call + def exclude_hostname(self, hostname: str) -> Self: + """Veto by simulation host machine.""" + self._filters.append(f"system.hostname != {hostname}") + return self + + @prettify_pydantic + @pyd.validate_call + def has_cpu( + self, *, architecture: str | None = None, processor: str | None = None + ) -> Self: + """Filter by CPU architecture and processor.""" + if architecture: + self._filters.append(f"system.cpu.arch == {architecture}") + if processor: + self._filters.append(f"system.cpu.processor == {processor}") + return self + + @prettify_pydantic + @pyd.validate_call + def exclude_cpu( + self, *, architecture: str | None = None, processor: str | None = None + ) -> Self: + """Veto by CPU architecture and processor.""" + if architecture: + self._filters.append(f"system.cpu.arch != {architecture}") + if processor: + self._filters.append(f"system.cpu.processor != {processor}") + return self + + @prettify_pydantic + @pyd.validate_call + def has_gpu(self, *, name: str | None = None, processor: str | None = None) -> Self: + """Filter by GPU name or processor. + + If no arguments are given this filters by runs which are on a + system which has GPU capability. + """ + if name: + self._filters.append(f"system.gpu.name == {name}") + if processor: + self._filters.append(f"system.gpu.processor == {name}") + return self + + @prettify_pydantic + @pyd.validate_call + def exclude_gpu( + self, *, name: str | None = None, processor: str | None = None + ) -> Self: + """Veto by GPU name or processor.""" + if name: + self._filters.append(f"system.gpu.name != {name}") + if processor: + self._filters.append(f"system.gpu.processor != {name}") + return self + + @prettify_pydantic + @pyd.validate_call + def has_python_version(self, python_version: str) -> Self: + try: + _ = semver.Version.parse(python_version) + except ValueError as e: + raise ValueError( + f"'{python_version}' is not a valid semantic version." + ) from e + self._filters.append(f"system.pythonversion == {python_version}") + return self + + @prettify_pydantic + @pyd.validate_call + def exclude_python_version(self, python_version: str) -> Self: + try: + _ = semver.Version.parse(python_version) + except ValueError as e: + raise ValueError( + f"'{python_version}' is not a valid semantic version." + ) from e + self._filters.append(f"system.pythonversion != {python_version}") + return self + + @prettify_pydantic + @pyd.validate_call + def has_platform( + self, platform: str, *, release: str | None = None, version: str | None = None + ) -> Self: + """Filter by simulation host platform.""" + self._filters.append(f"system.platform.system == {platform}") + if release: + self._filters.append(f"system.platform.release == {release}") + if version: + self._filters.append(f"system.platform.version == {version}") + return self + + @prettify_pydantic + @pyd.validate_call + def exclude_platform( + self, platform: str, *, release: str | None = None, version: str | None = None + ) -> Self: + """Veto by simulation host platform. + + If platform is specified then results WITHOUT this platform are returned. + However if a version and/or release is given then results WITH the given platform + but NOT the given release/version are returned. + """ + self._filters.append( + "system.platform.system " + "!=" + if not release and not version + else "==" + " " + platform + ) + if release: + self._filters.append(f"system.platform.release != {release}") + if version: + self._filters.append(f"system.platform.version != {version}") + return self + + @override + def __str__(self) -> str: + return " && ".join(self._filters) if self._filters else "None" + + @override + def __repr__(self) -> str: + return f"{super().__repr__()[:-1]}, filters={self._filters}>" diff --git a/simvue/utilities.py b/simvue/utilities.py index faa567ec..ce2bff29 100644 --- a/simvue/utilities.py +++ b/simvue/utilities.py @@ -283,12 +283,12 @@ def wrapper(self: "Run", *args, **kwargs) -> typing.Any: return decorator -def prettify_pydantic(class_func: typing.Callable) -> typing.Callable: +def prettify_pydantic(func: typing.Callable) -> typing.Callable: """Converts pydantic validation errors to a table Parameters ---------- - class_func : typing.Callable + func : typing.Callable function to wrap Returns @@ -302,10 +302,10 @@ def prettify_pydantic(class_func: typing.Callable) -> typing.Callable: the formatted validation error """ - @functools.wraps(class_func) - def wrapper(self, *args, **kwargs) -> typing.Any: + @functools.wraps(func) + def wrapper(*args, **kwargs) -> typing.Any: try: - return class_func(self, *args, **kwargs) + return func(*args, **kwargs) except pydantic.ValidationError as e: error_str = parse_pydantic_error(e) raise RuntimeError(error_str) diff --git a/tests/conftest.py b/tests/conftest.py index aee96c6f..75662bf3 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -224,7 +224,8 @@ def setup_test_run(run: sv_run.Run, *, temp_dir: pathlib.Path, create_objects: b "test_identifier": f"{_test_name}_{fix_use_id}" }, "folder": f"/simvue_unit_testing/{fix_use_id}", - "tags": ["simvue_client_unit_tests", _test_name, f"{platform.system()}"] + "tags": ["simvue_client_unit_tests", _test_name, f"{platform.system()}"], + "name": f"{_test_name}_{fix_use_id}" } if os.environ.get("CI"): @@ -232,7 +233,7 @@ def setup_test_run(run: sv_run.Run, *, temp_dir: pathlib.Path, create_objects: b run.config(suppress_errors=False) run.init( - name=TEST_DATA['metadata']['test_identifier'], + name=TEST_DATA["name"], tags=TEST_DATA["tags"], folder=TEST_DATA["folder"], visibility="tenant" if os.environ.get("CI") else None, diff --git a/tests/unit/test_filters.py b/tests/unit/test_filters.py new file mode 100644 index 00000000..17aa8a48 --- /dev/null +++ b/tests/unit/test_filters.py @@ -0,0 +1,20 @@ +"""Tests for Simvue Object filters.""" +import pytest + +from simvue.api.objects import Run + +@pytest.mark.filter +@pytest.mark.online +def test_run_filters(create_test_run: tuple[Run, dict]) -> None: + """Test retrieving a single run by filter set.""" + _run, TEST_DATA = create_test_run + _tags=TEST_DATA["tags"] + _folder=TEST_DATA["folder"] + _name=TEST_DATA["name"] + _filter = Run.filter() + for tag in _tags: + _filter = _filter.has_tag(tag) + _filter = _filter.in_folder(_folder) + _filter = _filter.has_name(_name) + _filter = _filter.created_within(hours=1) + assert _filter.count() == 1