# Source code for redgrease.runtime

# -*- coding: utf-8 -*-
# from __future__ import annotations

"""
Redgrease's (overloaded) variants of the symbols loaded per default into the top level
namespace of Gear functions in the Redis server Python runtime.
"""
__author__ = "Anders Åström"
__contact__ = "anders@lyngon.com"
__copyright__ = "2021, Lyngon Pte. Ltd."
__licence__ = """The MIT License
Copyright © 2021 Lyngon Pte. Ltd.

Permission is hereby granted, free of charge, to any person obtaining a copy of this
 software and associated documentation files (the “Software”), to deal in the Software
 without restriction, including without limitation the rights to use, copy, modify,
 merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
 permit persons to whom the Software is furnished to do so, subject to the following
 conditions:

The above copyright notice and this permission notice shall be included in all copies
 or substantial portions of the Software.

THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
 INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
 PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
 CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE
 OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
from typing import Hashable, Iterable, TypeVar

import redgrease.gears

# if TYPE_CHECKING:
from redgrease.typing import (
    Accumulator,
    BatchReducer,
    Expander,
    Extractor,
    Filterer,
    InputRecord,
    Key,
    Mapper,
    OutputRecord,
    Processor,
    Reducer,
    Registrator,
)

T = TypeVar("T")


class GearsBuilder(redgrease.gears.OpenGearFunction):
    """The RedisGears :ref:`GearsBuilder` class is imported to the runtime's
    environment by default, and this class is a RedGrease wrapper of it.

    It exposes the functionality of the function's
    `context builder
    <https://oss.redislabs.com/redisgears/1.0/functions.html#context-builder>`_.

    .. warning:: GearsBuilder is mutable with respect to the operations.

        The :class:`.GearsBuilder` is a subclass of
        :class:`.gears.OpenGearFunction`, but unlike other OpenGearFunctions,
        the GearsBuilder mutates an internal GearFunction instead of creating a
        new one for each operation.
        This behavior is deliberate, in order to be consistent with the original
        GearsBuilder.
    """

    def __init__(
        self,
        reader: str = "KeysReader",
        defaultArg: str = "*",
        desc: str = None,
        *args,
        **kwargs,
    ):
        """Gear function / process factory

        Args:
            reader (str, optional):
                Input records reader.
                Defining where the input to the gear will come from.
                One of:

                - ``"KeysReader"``
                - ``"KeysOnlyReader"``
                - ``"StreamReader"``
                - ``"PythonReader"``
                - ``"ShardsReader"``
                - ``"CommandReader"``

                Defaults to 'KeysReader'.

            defaultArg (str, optional):
                Additional arguments to the reader.
                These are usually a key's name, prefix, glob-like or a regular
                expression.
                Its use depends on the function's reader type and action.
                Defaults to '*'.

            desc (str, optional):
                An optional description.
                Defaults to None.

            *args:
                Additional positional arguments, forwarded to the Reader
                operation.

            **kwargs:
                Additional keyword arguments, forwarded to the Reader operation.
                The special key ``requirements`` is removed before forwarding
                and is instead set on the internal gear function.
        """
        # 'requirements' belongs to the gear function, not to the reader.
        requirements = kwargs.pop("requirements", None)
        reader_op = redgrease.gears.Reader(reader, defaultArg, desc, *args, **kwargs)
        self._function: redgrease.gears.OpenGearFunction = (
            redgrease.gears.OpenGearFunction(
                operation=reader_op,
                requirements=requirements,
            )
        )

    @property
    def gearfunction(self):
        """The "open" GearFunction object at this step in the pipeline.

        This GearFunction is itself immutable but can be built upon to create
        new GearFunctions, independently from the GearsBuilder.

        Returns:
            redgrease.gears.OpenGearFunction:
                The current GearFunction object.
        """
        return self._function

    def run(
        self,
        arg: str = None,  # TODO: This can also be a Python generator
        convertToStr: bool = True,
        collect: bool = True,
        # Helpers, all must be None
        # Other Redgrease args
        requirements: Iterable[str] = None,
        on=None,
        # Other Redis Gears args
        **kwargs,
        # TODO: Add all the Reader specific args here
    ) -> redgrease.gears.ClosedGearFunction:
        """Close the internal gear function with a "run" action.

        All arguments are forwarded, unchanged, to the ``run`` method of the
        internal gear function. Unlike the operation methods, this does not
        modify the GearsBuilder's internal gear function.

        Args:
            arg (str, optional):
                Forwarded as ``arg``.
                Defaults to None.

            convertToStr (bool, optional):
                Forwarded as ``convertToStr``.
                Defaults to True.

            collect (bool, optional):
                Forwarded as ``collect``.
                Defaults to True.

            requirements (Iterable[str], optional):
                Additional requirements / dependency Python packages.
                Defaults to None.

            on (optional):
                Forwarded as ``on``.
                Defaults to None.

            **kwargs:
                Additional parameters, forwarded as-is.

        Returns:
            redgrease.gears.ClosedGearFunction:
                The result of the internal gear function's ``run`` method.
        """
        return self._function.run(
            arg=arg,
            convertToStr=convertToStr,
            collect=collect,
            requirements=requirements,
            on=on,
            **kwargs,
        )

    def register(
        self,
        prefix: str = "*",
        convertToStr: bool = True,
        collect: bool = True,
        # Helpers, all must be None
        mode: str = None,
        onRegistered: Registrator = None,
        eventTypes: Iterable[str] = None,
        keyTypes: Iterable[str] = None,
        readValue: bool = None,
        batch: int = None,
        duration: int = None,
        onFailedPolicy: str = None,
        onFailedRetryInterval: int = None,
        trimStream: bool = None,
        trigger: str = None,  # Reader Specific: CommandReader
        # Other Redgrease args
        requirements: Iterable[str] = None,
        on=None,
        # Other Redis Gears args
        **kwargs,
        # TODO: Add all the Reader specific args here
    ) -> redgrease.gears.ClosedGearFunction:
        """Close the internal gear function with a "register" action.

        All arguments are forwarded, unchanged and by keyword, to the
        ``register`` method of the internal gear function. Unlike the operation
        methods, this does not modify the GearsBuilder's internal gear function.

        Args:
            prefix (str, optional):
                Forwarded as ``prefix``.
                Defaults to '*'.

            convertToStr (bool, optional):
                Forwarded as ``convertToStr``.
                Defaults to True.

            collect (bool, optional):
                Forwarded as ``collect``.
                Defaults to True.

            mode, onRegistered, eventTypes, keyTypes, readValue, batch,
            duration, onFailedPolicy, onFailedRetryInterval, trimStream,
            trigger (optional):
                Registration helper arguments, each forwarded under its own
                name.
                All default to None.

            requirements (Iterable[str], optional):
                Additional requirements / dependency Python packages.
                Defaults to None.

            on (optional):
                Forwarded as ``on``.
                Defaults to None.

            **kwargs:
                Additional parameters, forwarded as-is.

        Returns:
            redgrease.gears.ClosedGearFunction:
                The result of the internal gear function's ``register`` method.
        """
        return self._function.register(
            prefix=prefix,
            convertToStr=convertToStr,
            collect=collect,
            # Helpers
            mode=mode,
            onRegistered=onRegistered,
            eventTypes=eventTypes,
            keyTypes=keyTypes,
            readValue=readValue,
            batch=batch,
            duration=duration,
            onFailedPolicy=onFailedPolicy,
            onFailedRetryInterval=onFailedRetryInterval,
            trimStream=trimStream,
            trigger=trigger,  # CommandReader specific
            # Other Redgrease args
            requirements=requirements,
            on=on,
            # Other Redis Gears args
            **kwargs,
        )

    def map(
        self,
        op: Mapper[InputRecord, OutputRecord],
        # Other Redgrease args
        requirements: Iterable[str] = None,
        # Other Redis Gears args
        **kwargs,
    ) -> "GearsBuilder":
        """Instance-local :ref:`op_map` operation that performs a one-to-one
        (1:1) mapping of records.

        Args:
            op (:data:`redgrease.typing.Mapper`):
                Function to map on the input records.
                The function must take one argument as input (input record) and
                return something as an output (output record).

            requirements (Iterable[str], optional):
                Additional requirements / dependency Python packages.
                Defaults to None.

            **kwargs:
                Additional parameters to the Map operation.

        Returns:
            GearsBuilder:
                Itself, i.e. the same GearsBuilder, but with its internal gear
                function updated with a Map operation as last step.
        """
        self._function = self._function.map(op=op, requirements=requirements, **kwargs)
        return self

    def flatmap(
        self,
        op: Expander[InputRecord, OutputRecord] = None,
        # Other Redgrease args
        requirements: Iterable[str] = None,
        # Other Redis Gears args
        **kwargs,
    ) -> "GearsBuilder":
        """Instance-local :ref:`op_flatmap` operation that performs one-to-many
        (1:N) mapping of records.

        Args:
            op (:data:`redgrease.typing.Expander`, optional):
                Function to map on the input records.
                The function must take one argument as input (input record) and
                return an iterable as an output (output records).
                Defaults to the 'identity-function', i.e. if the input is an
                iterable it will be expanded.

            requirements (Iterable[str], optional):
                Additional requirements / dependency Python packages.
                Defaults to None.

            **kwargs:
                Additional parameters to the FlatMap operation.

        Returns:
            GearsBuilder:
                Itself, i.e. the same GearsBuilder, but with its internal gear
                function updated with a FlatMap operation as last step.
        """
        self._function = self._function.flatmap(
            op=op, requirements=requirements, **kwargs
        )
        return self

    def foreach(
        self,
        op: Processor[InputRecord],
        # Other Redgrease args
        requirements: Iterable[str] = None,
        # Other Redis Gears args
        **kwargs,
    ) -> "GearsBuilder":
        """Instance-local :ref:`op_foreach` operation performs one-to-the-same
        (1=1) mapping.

        Args:
            op (:data:`redgrease.typing.Processor`):
                Function to run on each of the input records.
                The function must take one argument as input (input record) and
                should not return anything.

            requirements (Iterable[str], optional):
                Additional requirements / dependency Python packages.
                Defaults to None.

            **kwargs:
                Additional parameters to the ForEach operation.

        Returns:
            GearsBuilder:
                Itself, i.e. the same GearsBuilder, but with its internal gear
                function updated with a ForEach operation as last step.
        """
        self._function = self._function.foreach(
            op=op, requirements=requirements, **kwargs
        )
        return self

    def filter(
        self,
        op: Filterer[InputRecord] = None,
        # Other Redgrease args
        requirements: Iterable[str] = None,
        # Other Redis Gears args
        **kwargs,
    ) -> "GearsBuilder":
        """Instance-local :ref:`op_filter` operation performs
        one-to-zero-or-one (1:bool) filtering of records.

        Args:
            op (:data:`redgrease.typing.Filterer`, optional):
                Function to apply on the input records, to decide which ones to
                keep.
                The function must take one argument as input (input record) and
                return a bool. The input records evaluated to `True` will be
                kept as output records.
                Defaults to the 'identity-function', i.e. records are filtered
                based on their own trueness or falseness.

            requirements (Iterable[str], optional):
                Additional requirements / dependency Python packages.
                Defaults to None.

            **kwargs:
                Additional parameters to the Filter operation.

        Returns:
            GearsBuilder:
                Itself, i.e. the same GearsBuilder, but with its internal gear
                function updated with a Filter operation as last step.
        """
        self._function = self._function.filter(
            op=op, requirements=requirements, **kwargs
        )
        return self

    def accumulate(
        self,
        op: Accumulator[T, InputRecord] = None,
        # Other Redgrease args
        requirements: Iterable[str] = None,
        # Other Redis Gears args
        **kwargs,
    ) -> "GearsBuilder":
        """Instance-local :ref:`op_accumulate` operation performs many-to-one
        mapping (N:1) of records.

        Args:
            op (:data:`redgrease.typing.Accumulator`, optional):
                Function to apply on the input records.
                The function must take two arguments as input:

                - An accumulator value, and
                - The input record.

                It should aggregate the input record into the accumulator
                variable, which stores the state between the function's
                invocations.
                The function must return the accumulator's updated value.
                Defaults to a list accumulator, i.e. the output will be a list
                of all inputs.

            requirements (Iterable[str], optional):
                Additional requirements / dependency Python packages.
                Defaults to None.

            **kwargs:
                Additional parameters to the Accumulate operation.

        Returns:
            GearsBuilder:
                Itself, i.e. the same GearsBuilder, but with its internal gear
                function updated with an Accumulate operation as last step.
        """
        self._function = self._function.accumulate(
            op=op, requirements=requirements, **kwargs
        )
        return self

    def localgroupby(
        self,
        extractor: Extractor[InputRecord, Key] = None,
        reducer: Reducer[Key, T, InputRecord] = None,
        # Other Redgrease args
        requirements: Iterable[str] = None,
        # Other Redis Gears args
        **kwargs,
    ) -> "GearsBuilder":
        """Instance-local :ref:`op_localgroupby` operation performs
        many-to-less mapping (N:M) of records.

        Args:
            extractor (:data:`redgrease.typing.Extractor`, optional):
                Function to apply on the input records, to extract the grouping
                key.
                The function must take one argument as input (input record) and
                return a string (key).
                The groups are defined by the value of the key.
                Defaults to the hash of the input.

            reducer (:data:`redgrease.typing.Reducer`, optional):
                Function to apply on the records of each group, to reduce to a
                single value (per group).
                The function must take (a) a key, (b) an input record and (c) a
                variable that's called an accumulator.
                It performs similarly to the accumulator callback, with the
                difference being that it maintains an accumulator per reduced
                key / group.
                Defaults to a list accumulator, i.e. the output will be a list
                of all inputs, for each group.

            requirements (Iterable[str], optional):
                Additional requirements / dependency Python packages.
                Defaults to None.

            **kwargs:
                Additional parameters to the LocalGroupBy operation.

        Returns:
            GearsBuilder:
                Itself, i.e. the same GearsBuilder, but with its internal gear
                function updated with a LocalGroupBy operation as last step.
        """
        # Always append the step, like every other operation of the builder.
        # A truthiness guard here would silently drop the operation.
        self._function = self._function.localgroupby(
            extractor=extractor,
            reducer=reducer,
            requirements=requirements,
            **kwargs,
        )
        return self

    def limit(
        self,
        length: int,
        start: int = 0,
        # Other Redis Gears args
        **kwargs,
    ) -> "GearsBuilder":
        """Instance-local :ref:`op_limit` operation limits the number of
        records.

        Args:
            length (int):
                The maximum number of records.

            start (int, optional):
                The index of the first input record.
                Defaults to 0.

            **kwargs:
                Additional parameters to the Limit operation.

        Returns:
            GearsBuilder:
                Itself, i.e. the same GearsBuilder, but with its internal gear
                function updated with a Limit operation as last step.
        """
        self._function = self._function.limit(length=length, start=start, **kwargs)
        return self

    def collect(self, **kwargs) -> "GearsBuilder":
        """Cluster-global :ref:`op_collect` operation collects the result
        records.

        Args:
            **kwargs:
                Additional parameters to the Collect operation.

        Returns:
            GearsBuilder:
                Itself, i.e. the same GearsBuilder, but with its internal gear
                function updated with a Collect operation as last step.
        """
        # Always append the step, like every other operation of the builder.
        # A truthiness guard here would silently drop the operation.
        self._function = self._function.collect(**kwargs)
        return self

    def repartition(
        self,
        extractor: Extractor[InputRecord, Hashable],
        # Other Redgrease args
        requirements: Iterable[str] = None,
        # Other Redis Gears args
        **kwargs,
    ) -> "GearsBuilder":
        """Cluster-global :ref:`op_repartition` operation repartitions the
        records by shuffling them between shards.

        Args:
            extractor (:data:`redgrease.typing.Extractor`):
                Function that takes a record and calculates a key that is used
                to determine the hash slot, and consequently the shard, that
                the record should migrate to.
                The function must take one argument as input (input record) and
                return a string (key).
                The hash slot, and consequently the destination shard, is
                determined by the value of the key.

            requirements (Iterable[str], optional):
                Additional requirements / dependency Python packages.
                Defaults to None.

            **kwargs:
                Additional parameters to the Repartition operation.

        Returns:
            GearsBuilder:
                Itself, i.e. the same GearsBuilder, but with its internal gear
                function updated with a Repartition operation as last step.
        """
        self._function = self._function.repartition(
            extractor=extractor, requirements=requirements, **kwargs
        )
        return self

    def aggregate(
        self,
        zero: T = None,
        seqOp: Accumulator[T, InputRecord] = None,
        combOp: Accumulator[T, T] = None,
        # Other Redgrease args
        requirements: Iterable[str] = None,
        # Other Redis Gears args
        **kwargs,
    ) -> "GearsBuilder":
        """Distributed :ref:`op_aggregate` operation performs an aggregation on
        local data, then a global aggregation on the local aggregations.

        Args:
            zero (Any, optional):
                The initial / zero value of the accumulator variable.
                Defaults to an empty list.

            seqOp (:data:`redgrease.typing.Accumulator`, optional):
                A function to be applied on each of the input records, locally
                per shard.
                It must take two parameters:

                - an accumulator value, from previous calls
                - an input record

                The function aggregates the input into the accumulator
                variable, which stores the state between the function's
                invocations.
                The function must return the accumulator's updated value.
                Defaults to addition, if 'zero' is a number and to a list
                accumulator if 'zero' is a list.

            combOp (:data:`redgrease.typing.Accumulator`, optional):
                A function to be applied on each of the aggregated results of
                the local aggregation (i.e. the output of `seqOp`).
                It must take two parameters:

                - an accumulator value, from previous calls
                - an input record

                The function aggregates the input into the accumulator
                variable, which stores the state between the function's
                invocations.
                The function must return the accumulator's updated value.
                Defaults to re-use the `seqOp` function.

            requirements (Iterable[str], optional):
                Additional requirements / dependency Python packages.
                Defaults to None.

            **kwargs:
                Additional parameters to the Aggregate operation.

        Returns:
            GearsBuilder:
                Itself, i.e. the same GearsBuilder, but with its internal gear
                function updated with an Aggregate operation as last step.
        """
        self._function = self._function.aggregate(
            zero=zero,
            seqOp=seqOp,
            combOp=combOp,
            requirements=requirements,
            **kwargs,
        )
        return self

    def aggregateby(
        self,
        extractor: Extractor[InputRecord, Key] = None,
        zero: T = None,
        seqOp: Reducer[Key, T, InputRecord] = None,
        combOp: Reducer[Key, T, T] = None,
        # Other Redgrease args
        requirements: Iterable[str] = None,
        # Other Redis Gears args
        **kwargs,
    ) -> "GearsBuilder":
        """Distributed :ref:`op_aggregateby` operation, behaves like aggregate,
        but separated on each key, extracted using the extractor.

        Args:
            extractor (:data:`redgrease.typing.Extractor`, optional):
                Function to apply on the input records, to extract the grouping
                key.
                The function must take one argument as input (input record) and
                return a string (key).
                The groups are defined by the value of the key.
                Defaults to the hash of the input.

            zero (Any, optional):
                The initial / zero value of the accumulator variable.
                Defaults to an empty list.

            seqOp (:data:`redgrease.typing.Reducer`, optional):
                A function to be applied on each of the input records, locally
                per shard and group.
                In addition to the group's key, it receives:

                - an accumulator value, from previous calls
                - an input record

                The function aggregates the input into the accumulator
                variable, which stores the state between the function's
                invocations.
                The function must return the accumulator's updated value.
                Defaults to a list reducer.

            combOp (:data:`redgrease.typing.Reducer`, optional):
                A function to be applied on each of the aggregated results of
                the local aggregation (i.e. the output of `seqOp`).
                In addition to the group's key, it receives:

                - an accumulator value, from previous calls
                - an input record

                The function aggregates the input into the accumulator
                variable, which stores the state between the function's
                invocations.
                The function must return the accumulator's updated value.
                Defaults to re-use the `seqOp` function.

            requirements (Iterable[str], optional):
                Additional requirements / dependency Python packages.
                Defaults to None.

            **kwargs:
                Additional parameters to the AggregateBy operation.

        Returns:
            GearsBuilder:
                Itself, i.e. the same GearsBuilder, but with its internal gear
                function updated with an AggregateBy operation as last step.
        """
        self._function = self._function.aggregateby(
            extractor=extractor,
            zero=zero,
            seqOp=seqOp,
            combOp=combOp,
            requirements=requirements,
            **kwargs,
        )
        return self

    def groupby(
        self,
        extractor: Extractor[InputRecord, Key] = None,
        reducer: Reducer[Key, T, InputRecord] = None,
        # Other Redgrease args
        requirements: Iterable[str] = None,
        # Other Redis Gears args
        **kwargs,
    ) -> "GearsBuilder":
        """Cluster-local :ref:`op_groupby` operation performing a many-to-less
        (N:M) grouping of records.

        Args:
            extractor (:data:`redgrease.typing.Extractor`, optional):
                Function to apply on the input records, to extract the grouping
                key.
                The function must take one argument as input (input record) and
                return a string (key).
                The groups are defined by the value of the key.
                Defaults to the hash of the input.

            reducer (:data:`redgrease.typing.Reducer`, optional):
                Function to apply on the records of each group, to reduce to a
                single value (per group).
                The function must take (a) a key, (b) an input record and (c) a
                variable that's called an accumulator.
                It performs similarly to the accumulator callback, with the
                difference being that it maintains an accumulator per reduced
                key / group.
                Defaults to a list reducer.

            requirements (Iterable[str], optional):
                Additional requirements / dependency Python packages.
                Defaults to None.

            **kwargs:
                Additional parameters to the GroupBy operation.

        Returns:
            GearsBuilder:
                Itself, i.e. the same GearsBuilder, but with its internal gear
                function updated with a GroupBy operation as last step.
        """
        self._function = self._function.groupby(
            extractor=extractor, reducer=reducer, requirements=requirements, **kwargs
        )
        return self

    def batchgroupby(
        self,
        extractor: Extractor[InputRecord, Key] = None,
        reducer: BatchReducer[Key, T, InputRecord] = None,
        # Other Redgrease args
        requirements: Iterable[str] = None,
        # Other Redis Gears args
        **kwargs,
    ) -> "GearsBuilder":
        """Cluster-local :ref:`op_groupby` operation, performing a many-to-less
        (N:M) grouping of records.

        Note:
            Using this operation may cause a substantial increase in memory
            usage during runtime. Consider using the GroupBy operation instead.

        Args:
            extractor (:data:`redgrease.typing.Extractor`, optional):
                Function to apply on the input records, to extract the grouping
                key.
                The function must take one argument as input (input record) and
                return a string (key).
                The groups are defined by the value of the key.
                Defaults to the hash of the input.

            reducer (:data:`redgrease.typing.BatchReducer`, optional):
                Function to apply on the records of each group, to reduce to a
                single value (per group).
                Default is the length (`len`) of the input.

            requirements (Iterable[str], optional):
                Additional requirements / dependency Python packages.
                Defaults to None.

            **kwargs:
                Additional parameters to the BatchGroupBy operation.

        Returns:
            GearsBuilder:
                Itself, i.e. the same GearsBuilder, but with its internal gear
                function updated with a BatchGroupBy operation as last step.
        """
        self._function = self._function.batchgroupby(
            extractor=extractor, reducer=reducer, requirements=requirements, **kwargs
        )
        return self

    def sort(
        self,
        reverse: bool = True,
        # Other Redgrease args
        requirements: Iterable[str] = None,
        # Other Redis Gears args
        **kwargs,
    ) -> "GearsBuilder":
        """:ref:`op_sort` the records

        Args:
            reverse (bool, optional):
                Sort in descending order (higher to lower).
                Defaults to True.

            requirements (Iterable[str], optional):
                Additional requirements / dependency Python packages.
                Defaults to None.

            **kwargs:
                Additional parameters to the Sort operation.

        Returns:
            GearsBuilder:
                Itself, i.e. the same GearsBuilder, but with its internal gear
                function updated with a Sort operation as last step.
        """
        self._function = self._function.sort(
            reverse=reverse, requirements=requirements, **kwargs
        )
        return self

    def distinct(self, **kwargs) -> "GearsBuilder":
        """Keep only the :ref:`op_distinct` values in the data.

        Args:
            **kwargs:
                Additional parameters to the Distinct operation.

        Returns:
            GearsBuilder:
                Itself, i.e. the same GearsBuilder, but with its internal gear
                function updated with a Distinct operation as last step.
        """
        self._function = self._function.distinct(**kwargs)
        return self

    def count(self, **kwargs) -> "GearsBuilder":
        """:ref:`op_count` the number of records in the execution.

        Args:
            **kwargs:
                Additional parameters to the Count operation.

        Returns:
            GearsBuilder:
                Itself, i.e. the same GearsBuilder, but with its internal gear
                function updated with a Count operation as last step.
        """
        self._function = self._function.count(**kwargs)
        return self

    def countby(
        self,
        extractor: Extractor[InputRecord, Hashable] = lambda x: str(x),
        # Other Redgrease args
        requirements: Iterable[str] = None,
        # Other Redis Gears args
        **kwargs,
    ) -> "GearsBuilder":
        """Distributed :ref:`op_countby` operation counting the records grouped
        by key.

        Args:
            extractor (:data:`redgrease.typing.Extractor`, optional):
                Function to apply on the input records, to extract the grouping
                key.
                The function must take one argument as input (input record) and
                return a string (key).
                The groups are defined by the value of the key.
                Defaults to 'lambda x: str(x)'.

            requirements (Iterable[str], optional):
                Additional requirements / dependency Python packages.
                Defaults to None.

            **kwargs:
                Additional parameters to the CountBy operation.

        Returns:
            GearsBuilder:
                Itself, i.e. the same GearsBuilder, but with its internal gear
                function updated with a CountBy operation as last step.
        """
        self._function = self._function.countby(
            extractor=extractor,
            # Other Redgrease args
            requirements=requirements,
            # Other Redis Gears args
            **kwargs,
        )
        return self

    def avg(
        self,
        extractor: Extractor[InputRecord, float] = lambda x: float(
            x if isinstance(x, (int, float, str)) else str(x)
        ),
        # Other Redgrease args
        requirements: Iterable[str] = None,
        # Other Redis Gears args
        **kwargs,
    ) -> "GearsBuilder":
        """Distributed :ref:`op_avg` operation, calculating arithmetic average
        of the records.

        Args:
            extractor (:data:`redgrease.typing.Extractor`, optional):
                Function to apply on the input records, to extract the numeric
                value to be averaged.
                The function must take one argument as input (input record) and
                return a float.
                Defaults to converting the record with `float`, where records
                that are not `int`, `float` or `str` are first converted to
                `str`.

            requirements (Iterable[str], optional):
                Additional requirements / dependency Python packages.
                Defaults to None.

            **kwargs:
                Additional parameters to the Avg operation.

        Returns:
            GearsBuilder:
                Itself, i.e. the same GearsBuilder, but with its internal gear
                function updated with an Avg operation as last step.
                Note that this method does **not** return a new GearFunction.
        """
        self._function = self._function.avg(
            extractor=extractor,
            # Other Redgrease args
            requirements=requirements,
            # Other Redis Gears args
            **kwargs,
        )
        return self
GB = GearsBuilder
"""Convenience shorthand for GearsBuilder."""

# Suppress static-analysis warnings for the missing `redisgears` package:
# it only exists inside the Redis Gears server runtime, never on the client.
# pyright: reportMissingImports=false
class atomic:
    """RedGrease wrapper of the ``atomic()`` Python context.

    The atomic() Python context is imported to the runtime's environment by
    default.

    The context ensures that all operations in it are executed atomically by
    blocking the main Redis process.
    """

    def __init__(self):
        # Imported lazily: `redisgears` only exists in the Gears server runtime.
        from redisgears import atomicCtx as native_atomic_ctx

        self._atomic = native_atomic_ctx()

    def __enter__(self):
        """Enter the native atomic context and return this wrapper."""
        native_ctx = self._atomic
        native_ctx.__enter__()
        return self

    def __exit__(self, *args, **kwargs):
        """Exit the native atomic context, forwarding all arguments.

        The native context's return value is intentionally not returned, so
        this wrapper never suppresses an exception raised inside the block.
        """
        native_ctx = self._atomic
        native_ctx.__exit__(*args, **kwargs)
def execute(command: str, *args) -> bytes:
    """Execute an arbitrary Redis command.

    Args:
        command (str):
            The command to execute.

        *args:
            Arguments of the command, forwarded as-is.

    Returns:
        bytes:
            Raw command response.
    """
    # Imported lazily: `redisgears` only exists in the Gears server runtime.
    from redisgears import executeCommand as native_execute

    response = native_execute(command, *args)
    return response
def hashtag() -> str:
    """Returns a hashtag that maps to the lowest hash slot served by the local
    engine's shard.

    Put differently, it is useful as a hashtag for partitioning in a cluster.

    Returns:
        str:
            A hashtag that maps to the lowest hash slot served by the local
            engine.
    """
    # Imported lazily: `redisgears` only exists in the Gears server runtime.
    from redisgears import getMyHashTag as native_hashtag

    tag = native_hashtag()
    return tag
def hashtag3() -> str:
    """Provides the same value as `hashtag`, but surrounded by curly braces.

    For example, if `hashtag()` generates "06S", then `hashtag3()` gives
    "{06S}".

    This is useful for creating slot-specific keys using f-strings, inside gear
    functions, as the braces are already escaped. Example::

        redgrease.cmd.set(f"{hashtag3()}", some_value)

    Returns:
        str:
            A braces-enclosed hashtag string.
    """
    # Doubled braces are literal braces; the single pair is the placeholder.
    return "{{{}}}".format(hashtag())
def log(message: str, level: str = "notice"):
    """Print a message to Redis' log.

    Args:
        message (str):
            The message to output.
            It is converted with `str` before being logged.

        level (str, optional):
            Message loglevel. Either::

                - 'debug'
                - 'verbose'
                - 'notice'
                - 'warning'

            Defaults to 'notice'.

    Returns:
        Whatever the native RedisGears ``log`` function returns.
    """
    # Imported lazily: `redisgears` only exists in the Gears server runtime.
    from redisgears import log as native_log

    text = str(message)
    return native_log(text, level=level)
def configGet(key: str) -> str:
    """Fetches the current value of a RedisGears configuration option.

    Args:
        key (str):
            The configuration option key.

    Returns:
        str:
            The value returned by the runtime's own ``configGet`` for the key.
    """
    # The native function is taken from the `__main__` namespace of the
    # Gears Python runtime, so it can only be resolved at call time.
    from __main__ import configGet as native_config_get

    value = native_config_get(key)
    return value
def gearsConfigGet(key: str, default=None) -> str:
    """Fetches the current value of a RedisGears configuration option and
    returns a default value if that key does not exist.

    Args:
        key (str):
            The configuration option key.

        default (Any, optional):
            A default value, handed to the runtime's ``gearsConfigGet`` as the
            fallback for a missing key.
            Defaults to None.

    Returns:
        str:
            The value returned by the runtime's own ``gearsConfigGet``.
    """
    # The native function is taken from the `__main__` namespace of the
    # Gears Python runtime, so it can only be resolved at call time.
    from __main__ import gearsConfigGet as redisGearsConfigGet

    # The default must be forwarded; dropping it would make this function
    # behave exactly like `configGet` and ignore the caller's fallback.
    return redisGearsConfigGet(key, default)
class gearsFuture:
    """The gearsFuture object allows another thread/process to process the
    record.

    Returning this object from a step's operation tells RedisGears to suspend
    execution until background processing had finished/failed.

    The gearsFuture object provides two control methods: ``continueRun()`` and
    ``continueFailed()``. Both methods are thread-safe and can be called at any
    time to signal that the background processing has finished.

    ``continueRun()`` signals success and its argument is a record for the main
    process. ``continueFailed()`` reports a failure to the main process and its
    argument is a string describing the failure.

    ``continueFail()`` is kept as a backward-compatible alias of
    ``continueFailed()``.

    Calling gearsFuture() is supported only from the context of the following
    operations:

    * :ref:`Map`
    * flatmap
    * filter
    * foreach
    * aggregate
    * aggregateby

    An attempt to create a ``gearsFuture`` object outside of the supported
    contexts will result in an exception.
    """

    def __init__(self):
        # Imported lazily: `redisgears` only exists in the Gears server runtime.
        from redisgears import gearsFutureCtx as redisGearsFuture

        self._gearsFuture = redisGearsFuture()

    def continueRun(self, record) -> None:
        """Signals success and its argument is a record for the main process.

        Args:
            record (Any):
                Record to yield to the blocked Gear function.
        """
        return self._gearsFuture.continueRun(record)

    def continueFailed(self, message: str):
        """Reports a failure to the main process and its argument is a string
        describing the failure.

        Args:
            message (str):
                Message describing the failure.
        """
        native = self._gearsFuture
        # Prefer the documented native method name, but fall back to the name
        # this wrapper historically called, so that either spelling works.
        signal_failure = getattr(native, "continueFailed", None)
        if signal_failure is None:
            signal_failure = native.continueFail
        return signal_failure(message)

    def continueFail(self, message: str):
        """Backward-compatible alias of :meth:`continueFailed`.

        Args:
            message (str):
                Message describing the failure.
        """
        return self.continueFailed(message)
def run(
    function: redgrease.gears.GearFunction,
    builder: GearsBuilder,
):
    """Transforms a RedGrease GearFunction into a native Gears function.

    Note:
        This function is specific to RedGrease and is NOT a standard Redis
        Gears runtime function.

    Args:
        function (redgrease.gears.GearFunction):
            A GearsFunction, as created with RedGrease constructs.

        builder (GearsBuilder):
            The Redis Gears native GearsBuilder class.

    Returns:
        GearsBuilder:
            Redis Gears native GearsBuilder object, reconstructed from the
            input.
    """
    # Walk the chain of input functions, from the last step back to the first.
    pipeline = [function]
    upstream = function.input_function
    while upstream:
        pipeline.append(upstream)
        upstream = upstream.input_function

    # Re-apply the operations in pipeline order: the first step is added to the
    # given builder, and every later step to the result of the previous one.
    native = builder
    for step in reversed(pipeline):
        native = step.operation.add_to(native)
    return native