# -*- coding: utf-8 -*-
"""
Datatypes and parsers for the various structures, specific to Redis Gears.
These datatypes are returned from various redgrease functions, merely for the purpose
of providing more convenient structure, typing and documentation compared to the native
'list-based' structures.
They are generally not intended to be instantiated by end-users.
"""
__author__ = "Anders Åström"
__contact__ = "anders@lyngon.com"
__copyright__ = "2021, Lyngon Pte. Ltd."
__licence__ = """The MIT License
Copyright © 2021 Lyngon Pte. Ltd.
Permission is hereby granted, free of charge, to any person obtaining a copy of this
software and associated documentation files (the “Software”), to deal in the Software
without restriction, including without limitation the rights to use, copy, modify,
merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice shall be included in all copies
or substantial portions of the Software.
THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE
OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
import ast
from typing import Any, Dict, Generic, Iterable, List, Optional, TypeVar, Union
import attr
import cloudpickle
import wrapt
from redgrease.utils import (
_REnum,
bool_ok,
list_parser,
optional,
safe_bool,
safe_str,
str_if_bytes,
to_bytes,
to_dict,
to_kwargs,
)
T = TypeVar("T")
[docs]@attr.s(frozen=True, auto_attribs=True, repr=False)
class ExecID:
"""Execution ID
Attributes:
shard_id (str):
Shard Identifier
sequence (in):
Sequence number
"""
shard_id: str = "0000000000000000000000000000000000000000"
sequence: int = 0
def __repr__(self) -> str:
class_name = self.__class__.__name__
if self.shard_id == "0000000000000000000000000000000000000000":
return f"{class_name}(sequence={self.sequence})"
else:
return f"""{class_name}("{self.shard_id}", {self.sequence})"""
def __str__(self):
return f"{self.shard_id}-{self.sequence}"
def __bytes__(self):
return str(self).encode()
[docs] @staticmethod
def parse(value: Union[str, bytes]) -> "ExecID":
"""Parses a string or bytes representation into a `redgrease.data.ExecID`
Returns:
redgrease.data.ExecID:
The parsed ExecID
Raises:
ValueError:
If the the value cannot be parsed.
"""
if isinstance(value, bytes):
value = value.decode()
try:
values = value.split("-")
shard_id = values[0]
sequence = int(values[1])
except (AttributeError, IndexError) as e:
raise ValueError(
f"Unable to parse ExecID. Invalid serialization: '{value}'"
) from e
return ExecID(shard_id=shard_id, sequence=sequence)
[docs]class ExecutionResult(wrapt.ObjectProxy, Generic[T]):
"""Common class for all types of execution results.
Generic / Polymorphic on the result type (T) of the Gears function.
Redis Gears specifies a few different commands for getting the results of a
function execution (`pyexecute`, `getresults`, `getresultsblocking` and `trigger`),
each potentially having more than one different possible value type, depending on
context.
In addition, while most gears functions result in collection of values, some
functions (notably those ending with a `count` or `avg` operation) semantically
always have scalar results, but are still natively returned as a list.
redgrease.data.ExecutionResult is a unified result type for all scenarios,
aiming at providing as intuitive API experience as possible.
It is generic and walks and quacks just like the main result type (T) it wraps.
With some notable exceptions:
- It has an additional property `errors` containing any accumulated errors
- Scalar results, behaves like scalars, but **also** like a single element list.
This behavior isn't always perfect but gives for the most part an intuitive api
experience.
If the behavior in some situations are confusing, the raw wrapped value can be
accessed through the `value` property.
"""
def __init__(self, value: T, errors: Optional[List] = None):
wrapt.ObjectProxy.__init__(self, value)
self._self_errors: Optional[List] = errors
@property
def value(self):
"""Gets the raw result value of the Gears function execution.
Returns:
T:
The result value / values
"""
return self.__wrapped__
@property
def errors(self) -> Optional[List]:
return self._self_errors
def __repr__(self) -> str:
cls_nm = "ExecutionResult"
val_typ_nm = self.__class__.__name__
if self._self_errors:
err = f", errors={repr(self._self_errors)}"
else:
err = ""
return f"{cls_nm}[{val_typ_nm}]({self.__wrapped__.__repr__()}{err})"
def __iter__(self):
if hasattr(self.value, "__iter__"):
return iter(self.value)
else:
return iter([] if self.value is None else [self.value])
def __len__(self) -> int:
if hasattr(self.value, "__len__") and not isinstance(self.value, (str, bytes)):
return len(self.value)
else:
return 0 if self.value is None else 1
def __eq__(self, other):
if not isinstance(self.value, list) and isinstance(other, list):
return [self.value] == other
else:
return self.value == other
def __getitem__(self, *args, **kwargs):
if hasattr(self.value, "__getitem__"):
return self.value.__getitem__(*args, **kwargs)
else:
if args and args[0] == 0:
return self.value
else:
raise TypeError(f"{self} is not subscriptable")
def __contains__(self, val) -> bool:
if hasattr(self.value, "__contains__"):
return val in self.value
else:
return self.value == val
def __bytes__(self) -> bytes:
return to_bytes(self.value)
def __call__(self, *args: Any, **kwds: Any) -> Any:
return self.value(*args, **kwds)
[docs]def parse_execute_response(response) -> ExecutionResult:
"""Parses raw responses from `pyexecute`, `getresults` and `getresultsblocking`
into a `redgrease.data.ExecuteResponse` object.
Args:
response (Any):
The raw gears function response.
This is most commonly a tuple of a list with the actual results and a list
of errors List[List[Union[T, Any]]].
For some scenarios the response may take other forms, like a simple `Ok`
(e.g. in the absence of a closing `run()` operation) or an execution ID
(e.g. for non-blocking executions).
Returns:
ExecutionResult[T]:
A parsed execution response
"""
if bool_ok(response):
# Any "Ok" response is just True
return ExecutionResult(True)
elif isinstance(response, list) and len(response) == 2:
# The 'normal' list-of-lists response
# Results are unpickled if they are pickled
# Note that the special case when the result only has one element,
# then the single value is used instread
# as the ExecutionResults would pretend it is a list anyway, if needed
# This way scalar results from for example `count` and `avg` will behave
# like scalars which is what is typically wanted
result, errors = response
if isinstance(result, list):
try:
result = [cloudpickle.loads(value) for value in result]
except (TypeError, cloudpickle.pickle.UnpicklingError):
pass
if len(result) == 1:
result = result[0]
return ExecutionResult(result, errors=errors)
elif isinstance(response, bytes):
# Bytes response means ExecID
return ExecutionResult(ExecID.parse(response))
else:
# If the response doesn't fit any known pattern, its returned as-is
return ExecutionResult(response)
[docs]def parse_trigger_response(response) -> ExecutionResult:
"""Parses raw responses from `trigger` into a `redgrease.data.ExecuteResponse`
object.
Args:
response (Any):
The gears function response.
This is a tuple of a list with the actual results and a list of errors
List[List[Union[T, Any]]].
pickled (bool, optional):
Indicates if the response is pickled and need to be unpickled.
Defaults to False.
Returns:
ExecutionResult[T]:
A parsed execution response
"""
try:
response = [cloudpickle.loads(value) for value in response]
except (TypeError, cloudpickle.pickle.UnpicklingError):
pass
if len(response) == 1:
response = response[0]
return ExecutionResult(response)
[docs]class ExecutionStatus(_REnum):
"""Representation of the various states an execution could be in."""
created = b"created"
"""Created - The execution has been created."""
running = b"running"
"""Running - The execution is running."""
done = b"done"
"""Done - Thee execution is done."""
aborted = b"aborted"
"""Aborted - The execution has been aborted."""
pending_cluster = b"pending_cluster"
"""Pending Cluster - Initiator is waiting for all workers to finish."""
pending_run = b"pending_run"
"""Pending Run - Worker is pending ok from initiator to execute."""
pending_receive = b"pending_receive"
"""Pending Receive - Initiator is pending acknowledgement from workers on receiving
execution.
"""
pending_termination = b"pending_termination"
"""Pending Termination - Worker is pending termination messaging from initiator"""
# TODO: Isn't this sugar rather than data?
[docs]class ExecLocality(_REnum):
"""Locality of execution: Shard or Cluster"""
Shard = "Shard"
Cluster = "Cluster"
[docs]class RedisObject:
"""Base class for many of the more complex Redis Gears configuration values"""
[docs] @classmethod
def from_redis(cls, params) -> "RedisObject":
"""Default parser
Assumes the object is seralized as a list of alternating attribute names and
values.
Note: This method should not be invoked directly on the 'RedisObject'
base class.
It should be only be invoked on subclasses of RedisObjects.
Returns:
RedisObject:
Returns the RedisObject subclass if, and only if, its constructor
argument names and value types exactly match the names and values
in the input list.
Raises:
TypeError:
If either the input list contains attributes not defined in the
subclass constructor, or if the subclass defines mandatory constructor
arguments that are not present in the input list.
"""
return cls(**to_kwargs(params)) # type: ignore
[docs]def parse_PD(value: Union[str, bytes]) -> Dict:
"""Parses str or bytes to a dict.
Used for the 'PD' field in the 'Registration' type, returned by 'getregistrations'.
Args:
value (Union[str,bytes]):
Serialized version of a dict.
Returns:
Dict:
A dictionary
"""
str_val = safe_str(value)
return dict(ast.literal_eval(str_val))
[docs]@attr.s(auto_attribs=True, frozen=True)
class ExecutionInfo(RedisObject):
"""Return object for `redgrease.client.Redis.dumpexecutions` command."""
executionId: ExecID = attr.ib(converter=ExecID.parse) # type: ignore #7912
"""The execution Id"""
status: ExecutionStatus = attr.ib(converter=ExecutionStatus)
"""The status"""
registered: bool = attr.ib(converter=safe_bool)
"""Indicates whether this is a registered execution"""
[docs]@attr.s(auto_attribs=True, frozen=True)
class RegData(RedisObject):
"""Object reprenting the values for the `Registration.RegistrationData`, part of
the return value of `redgrease.client.dupregistrations` command.
"""
mode: str = attr.ib(converter=safe_str)
"""Registration mode."""
numTriggered: int
"""A counter of triggered executions."""
numSuccess: int
"""A counter of successful executions."""
numFailures: int
"""A counter of failed executions."""
numAborted: int
"""A conter of aborted executions."""
lastError: str
"""The last error returned."""
args: Dict[str, Any] = attr.ib(converter=to_kwargs)
"""Reader-specific arguments"""
status: Optional[bool] = attr.ib(
converter=optional(bool_ok), # type: ignore #7912
default=None,
)
"""Undocumented status field"""
# @dataclass
[docs]@attr.s(auto_attribs=True, frozen=True)
class Registration(RedisObject):
"""Return object for `redgrease.client.Redis.dumpregistrations` command.
Contains the information about a function registration.
"""
id: ExecID = attr.ib(converter=ExecID.parse) # type: ignore #7912
"""The registration ID."""
reader: str = attr.ib(converter=safe_str)
"""The Reader."""
desc: str
"""The description."""
RegistrationData: RegData = attr.ib(
converter=RegData.from_redis # type: ignore #7912
)
"""Registration Data, see `RegData`."""
PD: Dict[Any, Any] = attr.ib(converter=parse_PD, repr=False)
"""Private data"""
ExecutionThreadPool: Optional[str] = attr.ib(converter=safe_str, default=None)
[docs]@attr.s(auto_attribs=True, frozen=True)
class ExecutionStep(RedisObject):
"""Object reprenting a 'step' in the `ExecutionPlan.steps`, attribute of
the return value of `redgrease.client.getexecution` command.
"""
type: str = attr.ib(converter=safe_str)
"""Step type."""
duration: int
"""The step's duration in milliseconds (0 when `ProfileExecutions` is disabled)"""
name: str = attr.ib(converter=safe_str)
"""Step callback"""
arg: str = attr.ib(converter=safe_str)
"""Step argument"""
[docs]@attr.s(auto_attribs=True, frozen=True)
class ExecutionPlan(RedisObject):
"""Object representing the execution plan for a given shard in the response from the
`redgrease.client.Redis.getexecution` command.
"""
status: ExecutionStatus = attr.ib(converter=ExecutionStatus)
"""The current status of the execution."""
shards_received: int
"""Number of shards that received the execution."""
shards_completed: int
"""Number of shards that completed the execution."""
results: int
"""Count of results returned."""
errors: int
"""Count of the errors raised."""
total_duration: int
"""Total execution duration in milliseconds."""
read_duration: int
"""Reader execution duration in milliseconds."""
steps: List[ExecutionStep] = attr.ib(
converter=list_parser(ExecutionStep.from_redis) # type: ignore #7912
)
"""The steps of the execution plan."""
[docs] @staticmethod
def parse(res: Iterable[bytes]) -> Dict[str, "ExecutionPlan"]:
"""Parse the raw results of `redgrease.client.Redis.getexecution` into a dict
that maps from shard identifiers to ExecutionStep objects.
Returns:
Dict[str, ExecutionPlan]:
Execution plan mapping.
"""
return {
str_if_bytes(shard[b"shard_id"]): ExecutionPlan.from_redis( # type: ignore
shard[b"execution_plan"] # type: ignore
)
for shard in map(to_dict, res)
}
[docs]@attr.s(auto_attribs=True, frozen=True)
class ShardInfo(RedisObject):
"""Object representing a shard in the `ClusterInfo.shards` attribute in the response
from `redgrease.client.Redis.infocluster` command.
"""
id: str = attr.ib(converter=safe_str)
"""The shard's identifier int the cluster."""
ip: str = attr.ib(converter=safe_str)
"""The shard's IP address."""
port: int
"""The shard's port."""
unixSocket: str = attr.ib(converter=safe_str)
"""The shards UDS."""
runid: str = attr.ib(converter=safe_str)
"""The engines run identifier."""
minHslot: int
"""Lowest hash slot served by the shard."""
maxHslot: int
"""Highest hash slot served by the shard."""
pendingMessages: int
"""Number of pending messages"""
[docs]@attr.s(auto_attribs=True, frozen=True)
class ClusterInfo(RedisObject):
"""Information about the Redis Gears cluster.
Return object for `redgrease.client.Redis.infocluster` command.
"""
my_id: str
"""The identifier of the shard the client is connected to."""
my_run_id: str
shards: List[ShardInfo] = attr.ib(
converter=list_parser(ShardInfo.from_redis) # type: ignore
)
"""List of the all the shards in the cluster."""
[docs] @staticmethod
def parse(res: bytes) -> Optional["ClusterInfo"]:
"""Parses the response from `redgrease.client.Redis.infocluster` into a
`ClusterInfo` object.
If the client is not connected to a Redis Gears cluster, `None` is returned.
Returns:
Optional[ClusterInfo]:
A ClusterInfo object or None (if not in cluster mode).s
"""
if not res or safe_str(res) == "no cluster mode":
return None
cluster_info = ClusterInfo(
my_id=safe_str(res[1]),
my_run_id=safe_str(res[3]),
shards=res[4],
)
return cluster_info
[docs]@attr.s(auto_attribs=True, frozen=True)
class PyStats(RedisObject):
"""Memory usage statistics from the Python interpreter.
As returned by `redgrease.client.Redis.pystats`
"""
TotalAllocated: int
"""A total of all allocations over time, in bytes."""
PeakAllocated: int
"""The peak allocations, in bytes."""
CurrAllocated: int
"""The currently allocated memory, in bytes."""
[docs]@attr.s(auto_attribs=True, frozen=True)
class PyRequirementInfo(RedisObject):
"""Information about a Python requirement / dependency."""
GearReqVersion: int
"""An internally-assigned version of the requirement.
(note: this isn't the package's version)
"""
Name: str = attr.ib(converter=safe_str)
"""The name of the requirement as it was given to the 'requirements' argument
of the `pyexecute` command.
"""
IsDownloaded: bool = attr.ib(converter=safe_bool)
"""`True` if the requirement wheels was successfully download, otherwise `False`.
"""
IsInstalled: bool = attr.ib(converter=safe_bool)
"""`True` if the requirement wheels was successfully installed, otherwise `False`.
"""
CompiledOs: str = attr.ib(converter=safe_str)
"""The underlying Operating System"""
Wheels: List[str] = attr.ib(
converter=lambda wheels: safe_str(wheels) # type: ignore
if isinstance(wheels, bytes)
else [safe_str(wheel) for wheel in wheels]
)
"""A List of Wheels required by the requirement."""