# -*- coding: utf-8 -*-
# from __future__ import annotations
"""
GearsFunction and Operation definitions
"""
__author__ = "Anders Åström"
__contact__ = "anders@lyngon.com"
__copyright__ = "2021, Lyngon Pte. Ltd."
__licence__ = """The MIT License
Copyright © 2021 Lyngon Pte. Ltd.
Permission is hereby granted, free of charge, to any person obtaining a copy of this
software and associated documentation files (the “Software”), to deal in the Software
without restriction, including without limitation the rights to use, copy, modify,
merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice shall be included in all copies
or substantial portions of the Software.
THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE
OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
import numbers
import operator
from typing import Any, Dict, Generic, Hashable, Iterable, Optional, Type, TypeVar
import redgrease.sugar as sugar
import redgrease.utils
from redgrease.typing import (
Accumulator,
BatchReducer,
Expander,
Extractor,
Filterer,
InputRecord,
Key,
Mapper,
OutputRecord,
Processor,
Reducer,
Registrator,
)
T = TypeVar("T")
################################################################################
# Default Operands #
################################################################################
def _default_accumulator(acc, r):
acc = acc if isinstance(acc, list) else [acc]
acc.append(r)
return acc
def _default_extractor(r):
return hash(r)
def _default_reducer(_, acc, r):
    """Default reducer: gather the records of a group into a list.

    The key is ignored; the work is delegated to `_default_accumulator`.

    Args:
        _ (Any):
            The group key. Unused.
        acc (Any):
            The value accumulated so far for the group.
        r (Any):
            The record to add to the group's accumulation.

    Returns:
        list:
            Whatever `_default_accumulator(acc, r)` returns.
    """
    return _default_accumulator(acc, r)
def _default_batch_reducer(_, records):
return len(records)
################################################################################
# Operations #
################################################################################
class Operation:
    """Abstract base class for Gear function operations.

    Operations are the building blocks of RedisGears functions.
    Different operation types can be used to achieve a variety of results to
    meet various data processing needs.

    Operations can have zero or more arguments that control their operation.
    Depending on the operation's type, arguments may be language-native data types
    and function callbacks.

    Attributes:
        kwargs (dict):
            Any left-over keyword arguments not explicitly consumed by the operation.
    """

    def __init__(self, **kwargs):
        """Instantiate an Operation, keeping any left-over keyword arguments.

        Args:
            **kwargs:
                Keyword arguments not consumed by the subclass constructor.
                They are stored, unmodified, in the `kwargs` attribute.
        """
        self.kwargs = kwargs

    def add_to(self, function):
        """Placeholder method for adding the operation to the end of a Gear function.

        This method must be implemented in each subclass, and raises a
        ``NotImplementedError`` if called directly on the `Operation` superclass.

        Args:
            function (Union[Type, OpenGearFunction]):
                The "open" gear function to append this operation to.
                If, and only if, the operation is a reader function (always and only
                the first operation in any Gear function), then the `function`
                argument must instead be a GearsBuilder class/type.

        Raises:
            NotImplementedError:
                If invoked on the `Operation` superclass, or on a subclass that
                does not override this method.
        """
        raise NotImplementedError(
            "Builder Operation has not implemented the `add_to` method: "
            f"'{self.__class__.__name__}'"
        )
class Nop(Operation):
    """No Operation.

    This Operation does nothing: "adding" it to a Gear function leaves the
    function unchanged.
    """

    def add_to(self, function: "OpenGearFunction") -> "OpenGearFunction":
        """Return the function, unmodified.

        Args:
            function (OpenGearFunction):
                The "open" gear function that this no-op is "added" to.

        Returns:
            OpenGearFunction:
                The very same `function` object that was passed in.
        """
        return function
class Reader(Operation):
    """Reader operation

    The Reader operation is always and only the first operation of any GearFunction.
    It defines which reader type to use and its arguments.

    Attributes:
        reader (str):
            The type of reader (https://oss.redislabs.com/redisgears/readers.html)

            - ``"KeysReader"``
            - ``"KeysOnlyReader"``
            - ``"StreamReader"``
            - ``"PythonReader"``
            - ``"ShardsIDReader"``
            - ``"CommandReader"``

        defaultArg (str):
            Argument that the reader may need. These are usually a key's name, prefix,
            glob-like or a regular expression. Its use depends on the function's reader
            type and action.

        desc (str):
            Function description.
    """

    def __init__(
        self,
        reader: str = sugar.ReaderType.KeysReader,
        defaultArg: str = "*",
        desc: Optional[str] = None,
        **kwargs,
    ) -> None:
        """Instantiate a Reader operation.

        Args:
            reader (str, optional):
                Reader type.
                Defaults to sugar.ReaderType.KeysReader.

            defaultArg (str, optional):
                Reader default arguments.
                Defaults to "*".

            desc (Optional[str], optional):
                Function description.
                Defaults to ``None``.

            **kwargs:
                Any further keyword arguments. They are kept in `kwargs` and
                forwarded to the builder constructor by `add_to`.
        """
        super().__init__(**kwargs)
        self.reader = reader
        self.defaultArg = defaultArg
        self.desc = desc

    def add_to(self, builder: Type) -> "OpenGearFunction":
        """Create a new gear function based on this reader information.

        Args:
            builder (Type):
                GearsBuilder class. Defines the constructor to use to create the
                Gear function. It is called with the reader type, the default
                argument, the description and the left-over keyword arguments.

        Returns:
            OpenGearFunction:
                Returns a minimal "open" gear function, consisting only of the
                reader.
        """
        return builder(self.reader, self.defaultArg, self.desc, **self.kwargs)
class Run(Operation):
    """Run action

    The Run action runs a Gear function as a batch.
    The function is executed once and exits once the data is exhausted by its reader.

    The Run action can only be the last operation of any GearFunction, and it
    effectively 'closes' it to further operations.

    Attributes:
        arg (str, optional):
            Argument that's passed to the reader, overriding its defaultArg.
            It means the following:

            - A glob-like pattern for the KeysReader and KeysOnlyReader readers.
            - A key name for the StreamReader reader.
            - A Python generator for the PythonReader reader.

        convertToStr (bool):
            When `True`, requests a map operation at the flow's end that
            stringifies records.
            The value is stored, but `add_to` does not forward it: it always
            passes ``False`` to ``run`` and appends its own serialising map step.

        collect (bool):
            When `True`, adds a collect operation to flow's end.
    """

    def __init__(
        self,
        arg: Optional[str] = None,
        convertToStr: bool = True,
        collect: bool = True,
        **kwargs,
    ) -> None:
        """Instantiate a Run action

        Args:
            arg (Optional[str], optional):
                Optional argument that's passed to the reader, overriding its
                defaultArg.
                It means the following:

                - A glob-like pattern for the KeysReader and KeysOnlyReader readers.
                - A key name for the StreamReader reader.
                - A Python generator for the PythonReader reader.

                Defaults to ``None``.

            convertToStr (bool, optional):
                When `True`, adds a map operation to the flow's end that stringifies
                records.
                Defaults to ``True``.

            collect (bool, optional):
                When `True`, adds a collect operation to flow's end.
                Defaults to ``True``.

            **kwargs:
                Any further keyword arguments. They are kept in `kwargs` and
                forwarded to the function's ``run`` call by `add_to`.
        """
        super().__init__(**kwargs)
        self.arg = arg
        self.convertToStr = convertToStr
        self.collect = collect

    def add_to(self, function: "OpenGearFunction") -> "ClosedGearFunction":
        """Closes a Gear function with the Run action.

        A final map step that serialises each record with ``cloudpickle``
        (pickle protocol 4) is appended before the function is closed.

        Args:
            function (OpenGearFunction):
                The "open" Gear function to close with the run action.

        Returns:
            ClosedGearFunction:
                A closed Gear batch function that is ready to run in RedisGears.
        """
        # Imported locally, so the module can be imported without cloudpickle.
        import cloudpickle

        # The second argument (convertToStr) is hard-coded to False: the records
        # are already bytes after the pickling map step.
        return function.map(lambda x: cloudpickle.dumps(x, protocol=4)).run(
            self.arg, False, self.collect, **self.kwargs
        )
class Register(Operation):
    """Register action

    The Register action registers a function as an event handler.
    The function is executed each time an event arrives.
    Each time it is executed, the function operates on the event's data and once done
    it is suspended until its future invocations by new events.

    The Register action can only be the last operation of any GearFunction, and it
    effectively 'closes' it to further operations.

    Attributes:
        prefix (str):
            Key prefix pattern to match on.
            Not relevant for 'CommandReader' readers (see 'trigger').

        convertToStr (bool):
            When `True`, requests a map operation at the flow's end that
            stringifies records.
            The value is stored, but `add_to` does not forward it: it always
            passes ``False`` to ``register`` and appends its own serialising map
            step.

        collect (bool):
            When `True`, adds a collect operation to flow's end.

        mode (str):
            The execution mode of the triggered function.

        onRegistered (Callable):
            A function callback that's called on each shard upon function
            registration.
    """

    def __init__(
        self,
        prefix: str = "*",
        convertToStr: bool = True,
        collect: bool = True,
        mode: str = sugar.TriggerMode.Async,
        onRegistered: Optional[Registrator] = None,
        **kwargs,
    ) -> None:
        """Instantiate a Register action

        Args:
            prefix (str, optional):
                Key prefix pattern to match on.
                Not relevant for 'CommandReader' readers (see 'trigger').
                Defaults to '*'.

            convertToStr (bool, optional):
                When ``True`` adds a map operation to the flow's end that stringifies
                records.
                Defaults to ``True``.

            collect (bool, optional):
                When ``True`` adds a collect operation to flow's end.
                Defaults to ``True``.

            mode (str, optional):
                The execution mode of the function.
                Can be one of:

                - ``"async"``:
                    Execution will be asynchronous across the entire cluster.
                - ``"async_local"``:
                    Execution will be asynchronous and restricted to the handling
                    shard.
                - ``"sync"``:
                    Execution will be synchronous and local.

                Defaults to `redgrease.TriggerMode.Async` (``"async"``)

            onRegistered (Registrator, optional):
                A function callback that's called on each shard upon function
                registration.
                It is a good place to initialize non-serializable objects such as
                network connections.
                Defaults to ``None``.

            **kwargs:
                Any further keyword arguments. They are kept in `kwargs` and
                forwarded to the function's ``register`` call by `add_to`.
        """
        super().__init__(**kwargs)
        self.prefix = prefix
        self.convertToStr = convertToStr
        self.collect = collect
        self.mode = mode
        self.onRegistered = onRegistered

    def add_to(self, function: "OpenGearFunction") -> "ClosedGearFunction":
        """Closes a Gear function with the Register action.

        A final map step that serialises each record with ``cloudpickle``
        (pickle protocol 4) is appended before the function is closed.

        Args:
            function (OpenGearFunction):
                The "open" Gear function to close with the register action.

        Returns:
            ClosedGearFunction:
                A closed "event-mode" Gear function that is ready to be registered on a
                RedisGears system.
        """
        # Imported locally, so the module can be imported without cloudpickle.
        import cloudpickle

        # The second argument (convertToStr) is hard-coded to False: the records
        # are already bytes after the pickling map step.
        return function.map(lambda x: cloudpickle.dumps(x, protocol=4)).register(
            self.prefix,
            False,
            self.collect,
            mode=self.mode,
            onRegistered=self.onRegistered,
            **self.kwargs,
        )
################################################################################
# Operations #
################################################################################
class Map(Operation):
    """The local Map operation performs the one-to-one (1:1) mapping of records.

    It requires one mapper function.

    Attributes:
        op (:data:`redgrease.typing.Mapper`):
            The mapper function to map on all input records.
    """

    def __init__(self, op: Mapper, **kwargs) -> None:
        """Instantiate a Map operation.

        Args:
            op (:data:`redgrease.typing.Mapper`):
                Function to map on the input records.
                The function must take one argument as input (input record) and
                return something as an output (output record).

            **kwargs:
                Any further keyword arguments. They are kept in `kwargs` and
                forwarded to the function's ``map`` call by `add_to`.
        """
        super().__init__(**kwargs)
        self.op = op

    def add_to(self, function: "OpenGearFunction") -> "OpenGearFunction":
        """Adds the operation to an "open" Gear function.

        Args:
            function (OpenGearFunction):
                The "open" gear function to add the operation to.

        Returns:
            OpenGearFunction:
                The function with the operation added to the end.
        """
        return function.map(self.op, **self.kwargs)
class FlatMap(Operation):
    """The local FlatMap operation performs one-to-many (1:N) mapping of records.

    It requires one expander function that maps a single input record to potentially
    multiple output records.

    FlatMap is nearly identical to the Map operation in purpose and use.
    Unlike regular mapping, however, when FlatMap returns a sequence / iterator,
    each element in the sequence is turned into a separate output record.

    Attributes:
        op (:data:`redgrease.typing.Expander`):
            The expander function to map on all input records.
    """

    def __init__(self, op: Expander, **kwargs) -> None:
        """Instantiate a FlatMap operation.

        Args:
            op (:data:`redgrease.typing.Expander`):
                Function to map on the input records.
                The function must take one argument as input (input record) and
                return an iterable as an output (output records).

            **kwargs:
                Any further keyword arguments. They are kept in `kwargs` and
                forwarded to the function's ``flatmap`` call by `add_to`.
        """
        super().__init__(**kwargs)
        self.op = op

    def add_to(self, function: "OpenGearFunction") -> "OpenGearFunction":
        """Adds the operation to an "open" Gear function.

        Args:
            function (OpenGearFunction):
                The "open" gear function to add the operation to.

        Returns:
            OpenGearFunction:
                The function with the operation added to the end.
        """
        return function.flatmap(self.op, **self.kwargs)
class ForEach(Operation):
    """The local ForEach operation performs one-to-the-same (1=1) mapping.

    It requires one processor function to perform some work that's related to the
    input record.

    Its output record is a copy of the input, which means anything the callback returns
    is discarded.

    Attributes:
        op (:data:`redgrease.typing.Processor`):
            Function to run on the input records.
    """

    def __init__(self, op: Processor, **kwargs) -> None:
        """Instantiate a ForEach operation.

        Args:
            op (:data:`redgrease.typing.Processor`):
                Function to run on each of the input records.
                The function must take one argument as input (input record) and
                should not return anything.

            **kwargs:
                Any further keyword arguments. They are kept in `kwargs` and
                forwarded to the function's ``foreach`` call by `add_to`.
        """
        super().__init__(**kwargs)
        self.op = op

    def add_to(self, function: "OpenGearFunction") -> "OpenGearFunction":
        """Adds the operation to an "open" Gear function.

        Args:
            function (OpenGearFunction):
                The "open" gear function to add the operation to.

        Returns:
            OpenGearFunction:
                The function with the operation added to the end.
        """
        return function.foreach(self.op, **self.kwargs)
class Filter(Operation):
    """The local Filter operation performs one-to-zero-or-one (1:(0|1)) filtering of
    records.

    It requires a filterer function.

    An input record that yields a falsehood will be discarded and only truthful ones
    will be output.

    Attributes:
        op (:data:`redgrease.typing.Filterer`):
            Predicate function to run on the input records.
    """

    def __init__(self, op: Filterer, **kwargs) -> None:
        """Instantiate a Filter operation.

        Args:
            op (:data:`redgrease.typing.Filterer`):
                Function to apply on the input records, to decide which ones to keep.
                The function must take one argument as input (input record) and
                return a bool. The input records evaluated to ``True`` will be kept as
                output records.

            **kwargs:
                Any further keyword arguments. They are kept in `kwargs` and
                forwarded to the function's ``filter`` call by `add_to`.
        """
        super().__init__(**kwargs)
        self.op = op

    def add_to(self, function: "OpenGearFunction") -> "OpenGearFunction":
        """Adds the operation to an "open" Gear function.

        Args:
            function (OpenGearFunction):
                The "open" gear function to add the operation to.

        Returns:
            OpenGearFunction:
                The function with the operation added to the end.
        """
        return function.filter(self.op, **self.kwargs)
class Accumulate(Operation):
    """The local Accumulate operation performs many-to-one mapping (N:1) of records.

    It requires one accumulator function.

    Once input records are exhausted its output is a single record consisting of the
    accumulator's value.

    Attributes:
        op (:data:`redgrease.typing.Accumulator`):
            Accumulation function to run on the input records.
    """

    def __init__(self, op: Accumulator, **kwargs) -> None:
        """Instantiate an Accumulate operation.

        Args:
            op (:data:`redgrease.typing.Accumulator`):
                Function to apply on the input records.
                The function must take two arguments as input, in the order used
                by this module's `_default_accumulator`:

                - an accumulator value, and
                - the input record.

                It should aggregate the input record into the accumulator variable,
                which stores the state between the function's invocations.
                The function must return the accumulator's updated value.

            **kwargs:
                Any further keyword arguments. They are kept in `kwargs` and
                forwarded to the function's ``accumulate`` call by `add_to`.
        """
        super().__init__(**kwargs)
        self.op = op

    def add_to(self, function: "OpenGearFunction") -> "OpenGearFunction":
        """Adds the operation to an "open" Gear function.

        Args:
            function (OpenGearFunction):
                The "open" gear function to add the operation to.

        Returns:
            OpenGearFunction:
                The function with the operation added to the end.
        """
        return function.accumulate(self.op, **self.kwargs)
class LocalGroupBy(Operation):
    """The local LocalGroupBy operation performs many-to-less mapping (N:M) of records.

    The operation requires two functions, an extractor and a reducer.

    The output records consist of the grouping key and its respective reduce value.

    Attributes:
        extractor (:data:`redgrease.typing.Extractor`):
            Function that extracts the key to group by from input records.

        reducer (:data:`redgrease.typing.Reducer`):
            Function that reduces the records in each group to an output record.
    """

    def __init__(
        self,
        extractor: Extractor,
        reducer: Reducer,
        **kwargs,
    ) -> None:
        """Instantiate a LocalGroupBy operator.

        Args:
            extractor (:data:`redgrease.typing.Extractor`):
                Function to apply on the input records, to extract the grouping key.
                The function must take one argument as input (input record) and
                return a string (key).
                The groups are defined by the value of the key.

            reducer (:data:`redgrease.typing.Reducer`):
                Function to apply on the records of each group, to reduce to a single
                value (per group).
                The function must take (a) a key, (b) a variable that's called an
                accumulator and (c) an input record, in the order used by this
                module's `_default_reducer`.
                It performs similarly to the accumulator callback, with the difference
                being that it maintains an accumulator per reduced key / group.

            **kwargs:
                Any further keyword arguments. They are kept in `kwargs` and
                forwarded to the function's ``localgroupby`` call by `add_to`.
        """
        super().__init__(**kwargs)
        self.extractor = extractor
        self.reducer = reducer

    def add_to(self, function: "OpenGearFunction") -> "OpenGearFunction":
        """Adds the operation to an "open" Gear function.

        Args:
            function (OpenGearFunction):
                The "open" gear function to add the operation to.

        Returns:
            OpenGearFunction:
                The function with the operation added to the end.
        """
        return function.localgroupby(self.extractor, self.reducer, **self.kwargs)
class Limit(Operation):
    """The local Limit operation limits the number of records.

    It accepts two numeric arguments: a starting position in the input records "array"
    and a maximal number of output records.

    Attributes:
        start (int):
            Starting index (0-based) of the input record to start from.

        length (int):
            The maximum number of records to let through.
    """

    def __init__(self, length: int, start: int = 0, **kwargs) -> None:
        """Instantiate a Limit operation

        Args:
            length (int):
                The maximum number of records.

            start (int, optional):
                The index of the first input record.
                Defaults to 0.

            **kwargs:
                Any further keyword arguments. They are kept in `kwargs` and
                forwarded to the function's ``limit`` call by `add_to`.
        """
        super().__init__(**kwargs)
        self.length = length
        self.start = start

    def add_to(self, function: "OpenGearFunction") -> "OpenGearFunction":
        """Adds the operation to an "open" Gear function.

        Args:
            function (OpenGearFunction):
                The "open" gear function to add the operation to.

        Returns:
            OpenGearFunction:
                The function with the operation added to the end.
        """
        # Note the argument order: length first, then start.
        return function.limit(self.length, self.start, **self.kwargs)
class Collect(Operation):
    """The global Collect operation collects the result records from all of the
    shards to the originating one.
    """

    def __init__(self, **kwargs):
        """Instantiate a Collect operation.

        Args:
            **kwargs:
                Any keyword arguments. They are kept in `kwargs` and forwarded to
                the function's ``collect`` call by `add_to`.
        """
        super().__init__(**kwargs)

    def add_to(self, function: "OpenGearFunction") -> "OpenGearFunction":
        """Adds the operation to an "open" Gear function.

        Args:
            function (OpenGearFunction):
                The "open" gear function to add the operation to.

        Returns:
            OpenGearFunction:
                The function with the operation added to the end.
        """
        return function.collect(**self.kwargs)
class Repartition(Operation):
    """The global Repartition operation repartitions the records by shuffling them
    between shards.

    It accepts a single key extractor function.
    The extracted key is used for computing the record's new placement in the cluster
    (i.e. hash slot).
    The operation then moves the record from its original shard to the new one.

    Attributes:
        extractor (:data:`redgrease.typing.Extractor`):
            A function deciding the destination shard of an input record.
    """

    def __init__(self, extractor: Extractor, **kwargs) -> None:
        """Instantiate a Repartition operation

        Args:
            extractor (:data:`redgrease.typing.Extractor`):
                Function that takes a record and calculates a key that is used to
                determine the hash slot, and consequently the shard, that the record
                should migrate to.
                The function must take one argument as input (input record) and
                return a string (key).
                The hash slot, and consequently the destination shard, is determined by
                the value of the key.

            **kwargs:
                Any further keyword arguments. They are kept in `kwargs` and
                forwarded to the function's ``repartition`` call by `add_to`.
        """
        super().__init__(**kwargs)
        self.extractor = extractor

    def add_to(self, function: "OpenGearFunction") -> "OpenGearFunction":
        """Adds the operation to an "open" Gear function.

        Args:
            function (OpenGearFunction):
                The "open" gear function to add the operation to.

        Returns:
            OpenGearFunction:
                The function with the operation added to the end.
        """
        return function.repartition(self.extractor, **self.kwargs)
class Aggregate(Operation):
    """The Aggregate operation performs many-to-one mapping (N:1) of records.

    Aggregate provides an alternative to the local accumulate operation as it takes
    the partitioning of data into consideration.
    Furthermore, because records are aggregated locally before collection,
    its performance is usually superior.

    It requires a zero value and two accumulator functions for computing the local
    and global aggregates.

    The operation is made of these steps:

    1. The local accumulator is executed locally and initialized with the zero value.
    2. A global collect moves all records to the originating engine.
    3. The global accumulator is executed locally by the originating engine.

    Its output is a single record consisting of the accumulator's global value.

    Attributes:
        zero (Any):
            The initial / zero value for the accumulator variable.

        seqOp (:data:`redgrease.typing.Accumulator`):
            A local accumulator function, applied locally on each shard.

        combOp (:data:`redgrease.typing.Accumulator`):
            A global accumulator function, applied on the results of the local
            accumulations.
    """

    def __init__(
        self,
        zero: Any,
        seqOp: Accumulator,
        combOp: Accumulator,
        **kwargs,
    ) -> None:
        """Instantiates an Aggregate operation.

        Args:
            zero (Any):
                The initial / zero value of the accumulator variable.

            seqOp (:data:`redgrease.typing.Accumulator`):
                A function to be applied on each of the input records, locally per
                shard.
                It must take two parameters:

                - an accumulator value, from previous calls
                - an input record

                The function aggregates the input into the accumulator variable,
                which stores the state between the function's invocations.
                The function must return the accumulator's updated value.

            combOp (:data:`redgrease.typing.Accumulator`):
                A function to be applied on each of the aggregated results of the local
                aggregation (i.e. the output of `seqOp`).
                It must take two parameters:

                - an accumulator value, from previous calls
                - an input record

                The function aggregates the input into the accumulator variable,
                which stores the state between the function's invocations.
                The function must return the accumulator's updated value.

            **kwargs:
                Any further keyword arguments. They are kept in `kwargs` and
                forwarded to the function's ``aggregate`` call by `add_to`.
        """
        super().__init__(**kwargs)
        self.zero = zero
        self.seqOp = seqOp
        self.combOp = combOp

    def add_to(self, function: "OpenGearFunction") -> "OpenGearFunction":
        """Adds the operation to an "open" Gear function.

        Args:
            function (OpenGearFunction):
                The "open" gear function to add the operation to.

        Returns:
            OpenGearFunction:
                The function with the operation added to the end.
        """
        return function.aggregate(self.zero, self.seqOp, self.combOp, **self.kwargs)
class AggregateBy(Operation):
    """AggregateBy operation performs many-to-less mapping (N:M) of records.

    It is similar to the Aggregate operation but aggregates per key.
    It requires an extractor callback, a zero value and two reducer callbacks for
    computing the local and global aggregates.

    The operation is made of these steps:

    1. Extraction of the groups using extractor.
    2. The local reducer is executed locally and initialized with the zero value.
    3. A global repartition operation that uses the extractor.
    4. The global reducer is executed on each shard once it is repartitioned with its
       relevant keys.

    Output list of records, one for each key. The output records consist of the
    grouping key and its respective reducer's value.

    Attributes:
        extractor (:data:`redgrease.typing.Extractor`):
            Function that extracts the key to group by from input records.

        zero (Any):
            The initial / zero value for the accumulator variable.

        seqOp (:data:`redgrease.typing.Reducer`):
            A local reducer function, applied locally on each shard.

        combOp (:data:`redgrease.typing.Reducer`):
            A global reducer function, applied on the results of the local
            reductions.
    """

    def __init__(
        self,
        extractor: Extractor,
        zero: Any,
        seqOp: Reducer,
        combOp: Reducer,
        **kwargs,
    ) -> None:
        """Instantiate an AggregateBy operation.

        Args:
            extractor (:data:`redgrease.typing.Extractor`):
                Function to apply on the input records, to extract the grouping key.
                The function must take one argument as input (input record) and
                return a string (key).
                The groups are defined by the value of the key.

            zero (Any):
                The initial / zero value of the accumulator variable.

            seqOp (:data:`redgrease.typing.Reducer`):
                A function to be applied on each of the input records, locally per
                shard and group.
                NOTE(review): annotated as a `Reducer`, so presumably it takes
                three parameters - confirm against `redgrease.typing.Reducer`:

                - the key of the group
                - an accumulator value, from previous calls
                - an input record

                The function aggregates the input into the accumulator variable,
                which stores the state between the function's invocations.
                The function must return the accumulator's updated value.

            combOp (:data:`redgrease.typing.Reducer`):
                A function to be applied on each of the aggregated results of the local
                aggregation (i.e. the output of `seqOp`).
                NOTE(review): annotated as a `Reducer`, so presumably it takes
                the same three parameters as `seqOp` - confirm.
                The function aggregates the input into the accumulator variable,
                which stores the state between the function's invocations.
                The function must return the accumulator's updated value.

            **kwargs:
                Any further keyword arguments. They are kept in `kwargs` and
                forwarded to the function's ``aggregateby`` call by `add_to`.
        """
        super().__init__(**kwargs)
        self.extractor = extractor
        self.zero = zero
        self.seqOp = seqOp
        self.combOp = combOp

    def add_to(self, function: "OpenGearFunction") -> "OpenGearFunction":
        """Adds the operation to an "open" Gear function.

        Args:
            function (OpenGearFunction):
                The "open" gear function to add the operation to.

        Returns:
            OpenGearFunction:
                The function with the operation added to the end.
        """
        return function.aggregateby(
            self.extractor, self.zero, self.seqOp, self.combOp, **self.kwargs
        )
class GroupBy(Operation):
    """GroupBy operation performs a many-to-less (N:M) grouping of records.

    It is similar to AggregateBy but uses only a global reducer.
    It can be used in cases where locally reducing the data isn't possible.

    The operation requires two functions; an extractor and a reducer.

    The operation is made of these steps:

    1. A global repartition operation that uses the extractor.
    2. The reducer is locally invoked.

    Output is a locally-reduced list of records, one for each key.
    The output records consist of the grouping key and its respective accumulator's
    value.

    Attributes:
        extractor (:data:`redgrease.typing.Extractor`):
            Function that extracts the key to group by from input records.

        reducer (:data:`redgrease.typing.Reducer`):
            Function that reduces the records of each group to a value.
    """

    def __init__(
        self,
        extractor: Extractor,
        reducer: Reducer,
        **kwargs,
    ) -> None:
        """Instantiate a GroupBy operation.

        Args:
            extractor (:data:`redgrease.typing.Extractor`):
                Function to apply on the input records, to extract the grouping key.
                The function must take one argument as input (input record) and
                return a string (key).
                The groups are defined by the value of the key.

            reducer (:data:`redgrease.typing.Reducer`):
                Function to apply on the records of each group, to reduce to a single
                value (per group).
                The function must take (a) a key, (b) a variable that's called an
                accumulator and (c) an input record, in the order used by this
                module's `_default_reducer`.
                It performs similarly to the accumulator callback, with the difference
                being that it maintains an accumulator per reduced key / group.

            **kwargs:
                Any further keyword arguments. They are kept in `kwargs` and
                forwarded to the function's ``groupby`` call by `add_to`.
        """
        super().__init__(**kwargs)
        self.extractor = extractor
        self.reducer = reducer

    def add_to(self, function: "OpenGearFunction") -> "OpenGearFunction":
        """Adds the operation to an "open" Gear function.

        Args:
            function (OpenGearFunction):
                The "open" gear function to add the operation to.

        Returns:
            OpenGearFunction:
                The function with the operation added to the end.
        """
        return function.groupby(self.extractor, self.reducer, **self.kwargs)
class BatchGroupBy(Operation):
    """BatchGroupBy operation performs a many-to-less (N:M) grouping of records.

    Prefer the GroupBy Operation
    ----------------------------

    Instead of using BatchGroupBy, prefer using the GroupBy operation as it is more
    efficient and performant. Only use BatchGroupBy when the reducer's logic
    requires the full list of records for each input key.

    The operation requires two functions; an extractor and a batch reducer.

    The operation is made of these steps:

    1. A global repartition operation that uses the extractor
    2. A local localgroupby operation that uses the batch reducer

    Once finished, the operation locally outputs a record for each key and its
    respective accumulator value.

    Increased memory consumption
    ----------------------------

    Using this operation may cause a substantial increase in memory usage during
    runtime.

    Attributes:
        extractor (:data:`redgrease.typing.Extractor`):
            Function that extracts the key to group by from input records.

        reducer (:data:`redgrease.typing.BatchReducer`):
            Function that reduces the records of each group to a value.
    """

    def __init__(
        self,
        extractor: Extractor,
        reducer: BatchReducer,
        **kwargs,
    ) -> None:
        """Instantiate a BatchGroupBy operation.

        Args:
            extractor (:data:`redgrease.typing.Extractor`):
                Function to apply on the input records, to extract the grouping key.
                The function must take one argument as input (input record) and
                return a string (key).
                The groups are defined by the value of the key.

            reducer (:data:`redgrease.typing.BatchReducer`):
                Function to apply on the records of each group, to reduce to a single
                value (per group).
                The function must take (a) a key and (b) the collection of all
                the records in the group, as this module's
                `_default_batch_reducer` does.
                Unlike a regular reducer it does not receive an accumulator.

            **kwargs:
                Any further keyword arguments. They are kept in `kwargs` and
                forwarded to the function's ``batchgroupby`` call by `add_to`.
        """
        super().__init__(**kwargs)
        self.extractor = extractor
        self.reducer = reducer

    def add_to(self, function: "OpenGearFunction") -> "OpenGearFunction":
        """Adds the operation to an "open" Gear function.

        Args:
            function (OpenGearFunction):
                The "open" gear function to add the operation to.

        Returns:
            OpenGearFunction:
                The function with the operation added to the end.
        """
        return function.batchgroupby(self.extractor, self.reducer, **self.kwargs)
class Sort(Operation):
    """Sort operation sorts the records.

    It allows control over the sort order.

    The operation is made of the following steps:

    1. A global aggregate operation collects and combines all records.
    2. A local sort is performed on the list.
    3. The list is flatmapped to records.

    Increased memory consumption
    ----------------------------

    Using this operation may cause an increase in memory usage during runtime due
    to the list being copied during the sorting operation.

    Attributes:
        reverse (bool):
            Defines if the sorting order is descending (``True``) or ascending
            (``False``).
    """

    def __init__(self, reverse: bool = True, **kwargs) -> None:
        """Instantiate a Sort operation.

        Args:
            reverse (bool, optional):
                Sort in descending order (higher to lower).
                Defaults to ``True``.

            **kwargs:
                Any further keyword arguments. They are kept in `kwargs` and
                forwarded to the function's ``sort`` call by `add_to`.
        """
        super().__init__(**kwargs)
        self.reverse = reverse

    def add_to(self, function: "OpenGearFunction") -> "OpenGearFunction":
        """Adds the operation to an "open" Gear function.

        Args:
            function (OpenGearFunction):
                The "open" gear function to add the operation to.

        Returns:
            OpenGearFunction:
                The function with the operation added to the end.
        """
        return function.sort(self.reverse, **self.kwargs)
class Distinct(Operation):
    """The Distinct operation returns distinct records.

    It requires no arguments.

    The operation is made of the following steps:

    1. An aggregate operation locally reduces the records to sets that are then
       collected and unionized globally.
    2. A local flatmap operation turns the set into records.
    """

    def __init__(self, **kwargs):
        """Instantiate a Distinct operation.

        Args:
            **kwargs:
                Any keyword arguments. They are kept in `kwargs` and forwarded to
                the function's ``distinct`` call by `add_to`.
        """
        super().__init__(**kwargs)

    def add_to(self, function: "OpenGearFunction") -> "OpenGearFunction":
        """Adds the operation to an "open" Gear function.

        Args:
            function (OpenGearFunction):
                The "open" gear function to add the operation to.

        Returns:
            OpenGearFunction:
                The function with the operation added to the end.
        """
        return function.distinct(**self.kwargs)
class Count(Operation):
    """Operation that counts the number of input records. It takes no arguments.

    The operation is described as an aggregate operation that counts locally
    and sums the local counts globally.
    """

    def __init__(self, **kwargs) -> None:
        """Create a Count operation.

        Args:
            **kwargs:
                Forwarded unchanged to the ``Operation`` initializer.
        """
        super().__init__(**kwargs)

    def add_to(self, function: "OpenGearFunction") -> "OpenGearFunction":
        """Append this operation to the end of an "open" Gear function.

        Args:
            function (OpenGearFunction):
                The "open" gear function that the operation is appended to.

        Returns:
            OpenGearFunction:
                Whatever ``function.count`` returns when called with the
                stored keyword arguments.
        """
        append_step = function.count
        return append_step(**self.kwargs)
class CountBy(Operation):
    """Operation that counts the records per group, grouped by an extracted key.

    It needs exactly one argument: the key extractor function.

    The operation is described as an aggregateby operation that counts locally
    and sums the local counts globally.

    Attributes:
        extractor (:data:`redgrease.typing.Extractor`):
            Function that extracts the key to group by from input records.
    """

    def __init__(self, extractor: Extractor, **kwargs) -> None:
        """Create a CountBy operation.

        Args:
            extractor (:data:`redgrease.typing.Extractor`):
                Function applied to each input record to extract its grouping
                key. It takes one argument (the input record) and returns the
                key (a string). Records with equal keys form one group.

            **kwargs:
                Forwarded unchanged to the ``Operation`` initializer.
        """
        super().__init__(**kwargs)
        self.extractor = extractor

    def add_to(self, function: "OpenGearFunction") -> "OpenGearFunction":
        """Append this operation to the end of an "open" Gear function.

        Args:
            function (OpenGearFunction):
                The "open" gear function that the operation is appended to.

        Returns:
            OpenGearFunction:
                Whatever ``function.countby`` returns when called with the
                stored extractor and keyword arguments.
        """
        append_step = function.countby
        return append_step(self.extractor, **self.kwargs)
class Avg(Operation):
    """Operation that returns the arithmetic average of the records.

    It takes a value extractor function.

    According to the operation's description it is composed of these steps::

        1. An aggregate operation locally reduces the records to tuples of sum
           and count that are globally combined.
        2. A local map operation calculates the average from the global tuple.

    Attributes:
        extractor (:data:`redgrease.typing.Extractor`):
            Extractor function handed to ``OpenGearFunction.avg``.
            NOTE(review): presumably maps a record to the value that is
            averaged; earlier documentation described it as a grouping key,
            which looks copy-pasted from ``CountBy`` - confirm against ``avg``.
    """

    def __init__(self, extractor: Extractor, **kwargs) -> None:
        """Create an Avg operation.

        Args:
            extractor (:data:`redgrease.typing.Extractor`):
                Function applied to each input record; it takes one argument
                (the input record). It is stored and later passed to ``avg``.
                NOTE(review): presumably returns the value to average rather
                than a grouping key - confirm.

            **kwargs:
                Forwarded unchanged to the ``Operation`` initializer.
        """
        super().__init__(**kwargs)
        self.extractor = extractor

    def add_to(self, function: "OpenGearFunction") -> "OpenGearFunction":
        """Append this operation to the end of an "open" Gear function.

        Args:
            function (OpenGearFunction):
                The "open" gear function that the operation is appended to.

        Returns:
            OpenGearFunction:
                Whatever ``function.avg`` returns when called with the stored
                extractor and keyword arguments.
        """
        append_step = function.avg
        return append_step(self.extractor, **self.kwargs)
################################################################################
# GearFunctions #
################################################################################
class GearFunction(Generic[T]):
    """Abstract base class for both "open" and closed Gear functions.

    The base `GearFunction` class is not intended to be instantiated by API users.

    A GearFunction is a chain of consecutive Operations.

    Attributes:
        operation (Operation):
            The last operation in the function's chain of operations.

        input_function (OpenGearFunction):
            The function (chain of operations) that provides the input records to
            the `operation`. Two different GearFunctions can share the same
            `input_function`.

        requirements (Set[str]):
            The requirements / dependencies (Python packages) given for this
            function, merged with those of the `input_function`, if any.
    """

    def __init__(
        self,
        operation: Operation,
        input_function: "OpenGearFunction" = None,
        requirements: Optional[Iterable[str]] = None,
    ) -> None:
        """Instantiate a GearFunction

        Args:
            operation (Operation):
                The last operation in the function's chain of operations.

            input_function (OpenGearFunction, optional):
                The function (chain of operations) that provides the input
                records to the `operation`.
                Defaults to ``None``.

            requirements (Optional[Iterable[str]], optional):
                Requirements / dependencies (Python packages) that the
                operation requires in order to execute.
                A single string is accepted as well and is treated as one
                requirement.
                Defaults to ``None``.
        """
        self.operation = operation
        self.input_function = input_function

        if isinstance(requirements, str):
            # A bare string is itself an iterable of characters: without this
            # guard ``set("numpy")`` would yield five one-letter "packages".
            # An empty string still means "no requirements", as before.
            requirements = [requirements] if requirements else []

        self.requirements = set(requirements or [])

        # Everything the upstream chain needs is needed by this function too.
        if input_function:
            self.requirements = self.requirements.union(input_function.requirements)

    @property
    def reader(self) -> Optional[str]:
        """The reader type, generating the initial input records to the GearFunction.

        The reader is taken from this function's own operation if that is a
        ``Reader``; otherwise the lookup is delegated to the `input_function`.

        Returns:
            str:
                Either ``"KeysReader"``, ``"KeysOnlyReader"``, ``"StreamReader"``,
                ``"PythonReader"``, ``"ShardsIDReader"``, ``"CommandReader"`` or
                ``None`` (If no reader is defined).
        """
        if isinstance(self.operation, Reader):
            return self.operation.reader

        if self.input_function:
            return self.input_function.reader

        return None

    @property
    def supports_batch_mode(self) -> bool:
        """Indicates if the function can run in Batch-mode, by closing it with a
        `run` action.

        Returns:
            bool:
                ``True`` if the function supports batch mode, ``False`` if not.
        """
        return self.reader in (
            sugar.ReaderType.KeysReader,
            sugar.ReaderType.KeysOnlyReader,
            sugar.ReaderType.StreamReader,
            sugar.ReaderType.PythonReader,
            sugar.ReaderType.ShardsIDReader,
        )

    @property
    def supports_event_mode(self) -> bool:
        """Indicates if the function can run in Event-mode, by closing it with a
        `register` action.

        Returns:
            bool:
                ``True`` if the function supports event mode, ``False`` if not.
        """
        return self.reader in (
            sugar.ReaderType.KeysReader,
            sugar.ReaderType.StreamReader,
            sugar.ReaderType.CommandReader,
        )
class ClosedGearFunction(GearFunction[T]):
    """A Gear function that has been "closed" with either a
    :ref:`op_action_run` action or a :ref:`op_action_register` action.

    No further :ref:`operations` can be added to a closed Gear function, but it
    can be executed in RedisGears.
    """

    def __init__(
        self,
        operation: Operation,
        input_function: "OpenGearFunction" = None,
        requirements: Optional[Iterable[str]] = None,
    ) -> None:
        """Instantiate a closed Gear function.

        All arguments are handed to the ``GearFunction`` initializer.

        Args:
            operation (Operation):
                The closing (last) operation of the function.

            input_function (OpenGearFunction, optional):
                The function providing the input records to `operation`.
                Defaults to ``None``.

            requirements (Optional[Iterable[str]], optional):
                Requirements / dependency Python packages.
                Defaults to ``None``.
        """
        super().__init__(operation, input_function, requirements)

    def on(
        self,
        gears_server,
        unblocking: bool = False,
        requirements: Iterable[str] = None,
        replace: bool = None,
        **kwargs,
    ):
        """Execute the function on a RedisGears server.

        This is equivalent to passing the function to `Gears.pyexecute`.

        Args:
            gears_server:
                Redis client / connection object. If it has a ``gears``
                attribute, that attribute is used; if the resulting object has
                no ``pyexecute`` attribute it is wrapped in
                ``redgrease.client.Gears``.

            unblocking (bool, optional):
                Execute function unblocking, i.e. asynchronous.
                Defaults to ``False``.

            requirements (Iterable[str], optional):
                Additional requirements / dependency Python packages.
                Defaults to ``None``.

            replace (bool, optional):
                What to do when execution fails with an exception whose class
                is named ``DuplicateTriggerError`` and the closing operation
                has a ``"trigger"`` keyword argument:

                - ``None``: Re-raise the error.
                - ``False``: Ignore the error and return the server's trigger
                  proxy for the existing trigger.
                - Anything else (e.g. ``True``): Unregister the previous
                  registration and execute again. The error is re-raised
                  unless exactly one registration matches the trigger.

                Defaults to ``None``.

            **kwargs:
                Additional arguments passed on to ``pyexecute``.

        Returns:
            redgrease.data.ExecutionResult:
                The result of the function, just as `Gears.pyexecute`
        """
        server = getattr(gears_server, "gears", gears_server)

        if not hasattr(server, "pyexecute"):
            # Imported lazily, at the point of use.
            from redgrease.client import Gears

            server = Gears(server)

        def execute():
            return server.pyexecute(
                self, unblocking=unblocking, requirements=requirements, **kwargs
            )

        try:
            return execute()
        except Exception as error:
            # TODO: This is ugly. just to keep 'redis' from being imported to "gears"
            # The error class is matched by name so that it need not be imported.
            if error.__class__.__name__ != "DuplicateTriggerError":
                raise

            # The trigger is already registered; 'replace' decides what to do.
            if replace is None:
                raise

            operation_kwargs = self.operation.kwargs
            if "trigger" not in operation_kwargs:
                raise

            trigger = operation_kwargs["trigger"]

            if replace is False:
                # Keep the existing registration and hand back a proxy to it.
                return server._trigger_proxy(trigger)

            # Replace: only safe when the registration is unambiguous.
            registrations = server.dumpregistrations(trigger=trigger)
            if len(registrations) != 1:
                raise

            server.unregister(registrations[0].id)

            # Second attempt, now that the old registration is gone.
            return execute()
[docs]class OpenGearFunction(GearFunction[InputRecord]):
"""An open Gear function is a Gear function that is not yet "closed" with a
:ref:`op_action_run` action or a :ref:`op_action_register` action.
Open Gear functions can be used to create new "open" gear functions by
applying :ref:`operations`, or it can create a closed Gear function by applying
either the :ref:`op_action_run` action or a :ref:`op_action_register` action.
"""
def __init__(
    self,
    operation: Operation,
    input_function: "OpenGearFunction" = None,
    requirements: Optional[Iterable[str]] = None,
) -> None:
    """Instantiate an "open" Gear function.

    All arguments are handed to the ``GearFunction`` initializer.

    Args:
        operation (Operation):
            The last operation in the function's chain of operations.

        input_function (OpenGearFunction, optional):
            The function providing the input records to `operation`.
            Defaults to ``None``.

        requirements (Optional[Iterable[str]], optional):
            Requirements / dependency Python packages.
            Defaults to ``None``.
    """
    super().__init__(operation, input_function, requirements)
def run(
    self,
    arg: str = None,  # TODO: This can also be a Python generator
    convertToStr: bool = True,
    collect: bool = True,
    # Helpers, all must be None
    # Other Redgrease args
    requirements: Iterable[str] = None,
    on=None,
    # Other Redis Gears args
    **kwargs,
    # TODO: Add all the Reader specific args here
) -> ClosedGearFunction[InputRecord]:
    """Close the function with a :ref:`op_action_run` action, for "batch-mode".

    Batch functions are executed once and exit when their reader has
    exhausted the data.

    Args:
        arg (str, optional):
            An optional argument that's passed to the reader as its defaultArg.
            It means the following:

            - A glob-like pattern for the KeysReader and KeysOnlyReader readers.
            - A key name for the StreamReader reader.
            - A Python generator for the PythonReader reader.

            Defaults to ``None``.

        convertToStr (bool, optional):
            When ``True``, adds a map operation to the flow's end that
            stringifies records.
            Defaults to ``True``.

        collect (bool, optional):
            When ``True`` adds a collect operation to flow's end.
            Defaults to ``True``.

        requirements (Iterable[str], optional):
            Additional requirements / dependency Python packages.
            Defaults to ``None``.

        on (redis.Redis):
            Immediately execute the function on this RedisGears system.

        **kwargs:
            Additional parameters to the run operation.

    Returns:
        Union[ClosedGearFunction, redgrease.data.ExecutionResult]:
            A new closed batch function, if `on` is **not** specified.
            An execution result, if `on` **is** specified.
            When ``redgrease.GEARS_RUNTIME`` is set, the result of
            ``redgrease.runtime.run`` is returned instead.

    Raises:
        TypeError:
            If the function does not support batch mode.
    """
    if not self.supports_batch_mode:
        raise TypeError(f"Batch mode (run) is not supported for '{self.reader}'")

    run_action = Run(arg=arg, convertToStr=convertToStr, collect=collect, **kwargs)
    closed_function = ClosedGearFunction[InputRecord](
        run_action,
        input_function=self,
        requirements=requirements,
    )

    # The runtime flag takes precedence over the 'on' argument.
    if redgrease.GEARS_RUNTIME:
        return redgrease.runtime.run(closed_function, redgrease.GearsBuilder)

    return closed_function.on(on) if on else closed_function
def register(
    self,
    prefix: str = "*",
    convertToStr: bool = True,
    collect: bool = True,
    # Helpers, all must be None
    mode: str = None,
    onRegistered: Registrator = None,
    eventTypes: Iterable[str] = None,
    keyTypes: Iterable[str] = None,
    readValue: bool = None,
    batch: int = None,
    duration: int = None,
    onFailedPolicy: str = None,
    onFailedRetryInterval: int = None,
    trimStream: bool = None,
    trigger: str = None,  # Reader Specific: CommandReader
    # Other Redgrease args
    requirements: Iterable[str] = None,
    on=None,
    # Other Redis Gears args
    **kwargs,
    # TODO: Add all the Reader specific args here
) -> ClosedGearFunction[InputRecord]:
    """Create a "closed" function to be :ref:`op_action_register` 'ed as an
    event-triggered function.

    Event functions are executed each time an event arrives.
    Each time it is executed, the function operates on the event's
    data and once done is suspended until its future invocations by
    new events.

    Reader-specific options that are left as ``None`` are not passed to the
    register operation at all; the defaults quoted below for those options
    are the documented RedisGears defaults.

    Args:
        prefix (str, optional):
            Key prefix pattern to match on.
            Not relevant for 'CommandReader' readers (see 'trigger').
            Defaults to ``"*"``.

        convertToStr (bool, optional):
            When ``True`` adds a map operation to the flow's end that
            stringifies records.
            Defaults to ``True``.

        collect (bool, optional):
            When ``True`` adds a collect operation to flow's end.
            Defaults to ``True``.

        mode (str, optional):
            The execution mode of the function.
            Can be one of:

            - ``"async"``:
                Execution will be asynchronous across the entire cluster.

            - ``"async_local"``:
                Execution will be asynchronous and restricted to the handling
                shard.

            - ``"sync"``:
                Execution will be synchronous and local.

            Defaults to ``"async"``.

        onRegistered (Registrator, optional):
            A function that's called on each shard upon function registration.
            It is a good place to initialize non-serializeable objects such as
            network connections.
            Defaults to ``None``.

        eventTypes (Iterable[str], optional):
            For KeysReader only.
            A whitelist of event types that trigger execution when the
            KeysReader are used. The list may contain one or more:

            - Any Redis or module command
            - Any Redis event

            Passed on as a ``list``.
            Defaults to ``None``.

        keyTypes (Iterable[str], optional):
            For KeysReader and KeysOnlyReader only.
            A whitelist of key types that trigger execution when using the
            KeysReader or KeysOnlyReader readers.
            The list may contain one or more from the following:

            - Redis core types:
                ``"string"``, ``"hash"``, ``"list"``, ``"set"``, ``"zset"`` or
                ``"stream"``

            - Redis module types:
                ``"module"``

            Passed on as a ``list``.
            Defaults to ``None``.

        readValue (bool, optional):
            For KeysReader only.
            When ``False`` the value will not be read, so the 'type' and
            'value' of the record will be set to ``None``.
            Defaults to ``True``.

        batch (int, optional):
            For StreamReader only.
            The number of new messages that trigger execution.
            Defaults to 1.

        duration (int, optional):
            For StreamReader only.
            The time to wait before execution is triggered, regardless of the
            batch size (0 for no duration).
            Defaults to 0.

        onFailedPolicy (str, optional):
            For StreamReader only.
            The policy for handling execution failures.
            May be one of:

            - ``"continue"``:
                Ignores a failure and continues to the next execution.
                This is the default policy.

            - ``"abort"``:
                Stops further executions.

            - ``"retry"``:
                Retries the execution after an interval specified with
                onFailedRetryInterval (default is one second).

            Defaults to ``"continue"``.

        onFailedRetryInterval (int, optional):
            For StreamReader only.
            The interval (in milliseconds) in which to retry in case
            onFailedPolicy is 'retry'.
            Defaults to 1.

        trimStream (bool, optional):
            For StreamReader only.
            When ``True`` the stream will be trimmed after execution
            Defaults to ``True``.

        trigger (str):
            For 'CommandReader' only, and mandatory.
            The trigger string that will trigger the function.

        requirements (Iterable[str], optional):
            Additional requirements / dependency Python packages.
            Defaults to ``None``.

        on (redis.Redis):
            Immediately execute the function on this RedisGears system.

        **kwargs:
            Additional parameters to the register operation.
            A ``replace`` entry, if present, is removed before the operation
            is created and is passed to ``ClosedGearFunction.on`` instead
            (only used when `on` is specified).

    Returns:
        Union[ClosedGearFunction, redgrease.data.ExecutionResult]:
            A new closed event function, if `on` is **not** specified.
            An execution result, if `on` **is** specified.
            When ``redgrease.GEARS_RUNTIME`` is set, the result of
            ``redgrease.runtime.run`` is returned instead.

    Raises:
        TypeError:
            If the function does not support event mode.
    """
    # Fail fast, before any of the (possibly lazy) iterables are consumed.
    if not self.supports_event_mode:
        raise TypeError(
            f"Event mode (register) is not supported for '{self.reader}'"
        )

    # Optional, reader-specific settings. Only the ones explicitly given are
    # forwarded, so that unspecified ones fall back to the server defaults.
    optional_settings = {
        "mode": mode,
        "onRegistered": onRegistered,
        "eventTypes": None if eventTypes is None else list(eventTypes),
        "keyTypes": None if keyTypes is None else list(keyTypes),
        "readValue": readValue,
        "batch": batch,
        "duration": duration,
        "onFailedPolicy": onFailedPolicy,
        "onFailedRetryInterval": onFailedRetryInterval,
        "trimStream": trimStream,
        "trigger": trigger,
    }
    kwargs.update(
        {name: value for name, value in optional_settings.items() if value is not None}
    )

    # 'replace' steers 'ClosedGearFunction.on'; it is not a register argument.
    replace = kwargs.pop("replace", None)

    gear_fun = ClosedGearFunction[InputRecord](
        Register(
            prefix=prefix,
            convertToStr=convertToStr,
            collect=collect,
            **kwargs,
        ),
        input_function=self,
        requirements=requirements,
    )

    # The runtime flag takes precedence over the 'on' argument.
    if redgrease.GEARS_RUNTIME:
        return redgrease.runtime.run(gear_fun, redgrease.GearsBuilder)

    if on:
        return gear_fun.on(on, replace=replace)

    return gear_fun
def map(
    self,
    op: Mapper[
        InputRecord,
        OutputRecord,
    ],
    # Other Redgrease args
    requirements: Iterable[str] = None,
    # Other Redis Gears args
    **kwargs,
) -> "OpenGearFunction[OutputRecord]":
    """Instance-local :ref:`op_map` operation: a one-to-one (1:1) mapping of
    records.

    Args:
        op (:data:`redgrease.typing.Mapper`):
            Function to map on the input records.
            It takes one argument (the input record) and returns the output
            record.
            It is passed through ``redgrease.utils.passfun`` before use.

        requirements (Iterable[str], optional):
            Additional requirements / dependency Python packages.
            Defaults to ``None``.

        **kwargs:
            Additional parameters to the :ref:`op_map` operation.

    Returns:
        OpenGearFunction:
            A new "open" gear function with a :ref:`op_map` operation as last step.
    """
    mapper = redgrease.utils.passfun(op)
    map_step = Map(op=mapper, **kwargs)
    return OpenGearFunction(
        map_step, input_function=self, requirements=requirements
    )
def flatmap(
    self,
    op: Expander[InputRecord, OutputRecord] = None,
    # Other Redgrease args
    requirements: Iterable[str] = None,
    # Other Redis Gears args
    **kwargs,
) -> "OpenGearFunction[Iterable[OutputRecord]]":
    """Instance-local :ref:`op_flatmap` operation: a one-to-many (1:N) mapping
    of records.

    Args:
        op (:data:`redgrease.typing.Expander`, optional):
            Function to map on the input records.
            It takes one argument (the input record) and returns an iterable
            of output records.
            When omitted, the fallback chosen by ``redgrease.utils.passfun``
            is used, documented as the 'identity-function', i.e. an iterable
            input is expanded into its elements.

        requirements (Iterable[str], optional):
            Additional requirements / dependency Python packages.
            Defaults to ``None``.

        **kwargs:
            Additional parameters to the :ref:`op_flatmap` operation.

    Returns:
        OpenGearFunction:
            A new "open" gear function with a :ref:`op_flatmap` operation as
            last step.
    """
    expander = redgrease.utils.passfun(op)
    flatmap_step = FlatMap(op=expander, **kwargs)
    return OpenGearFunction(
        flatmap_step, input_function=self, requirements=requirements
    )
def foreach(
    self,
    op: Processor[InputRecord],
    # Other Redgrease args
    requirements: Iterable[str] = None,
    # Other Redis Gears args
    **kwargs,
) -> "OpenGearFunction[InputRecord]":
    """Instance-local :ref:`op_foreach` operation: a one-to-the-same (1=1)
    mapping, i.e. the records themselves are passed on.

    Args:
        op (:data:`redgrease.typing.Processor`):
            Function to run on each of the input records.
            It takes one argument (the input record) and should not return
            anything.
            It is passed through ``redgrease.utils.passfun`` before use.

        requirements (Iterable[str], optional):
            Additional requirements / dependency Python packages.
            Defaults to ``None``.

        **kwargs:
            Additional parameters to the :ref:`op_foreach` operation.

    Returns:
        OpenGearFunction:
            A new "open" gear function with a :ref:`op_foreach` operation as
            last step.
    """
    processor = redgrease.utils.passfun(op)
    foreach_step = ForEach(op=processor, **kwargs)
    return OpenGearFunction(
        foreach_step, input_function=self, requirements=requirements
    )
def filter(
    self,
    op: Filterer[InputRecord] = None,
    # Other Redgrease args
    requirements: Iterable[str] = None,
    # Other Redis Gears args
    **kwargs,
) -> "OpenGearFunction[InputRecord]":
    """Instance-local :ref:`op_filter` operation: a one-to-zero-or-one (1:bool)
    filtering of records.

    Args:
        op (:data:`redgrease.typing.Filterer`, optional):
            Function applied to the input records to decide which to keep.
            It takes one argument (the input record) and returns a bool;
            records evaluated to ``True`` are kept as output records.
            When omitted, the fallback chosen by ``redgrease.utils.passfun``
            is used, documented as the 'identity-function', i.e. records are
            filtered on their own trueness or falseness.

        requirements (Iterable[str], optional):
            Additional requirements / dependency Python packages.
            Defaults to ``None``.

        **kwargs:
            Additional parameters to the :ref:`op_filter` operation.

    Returns:
        OpenGearFunction:
            A new "open" gear function with a :ref:`op_filter` operation as
            last step.
    """
    predicate = redgrease.utils.passfun(op)
    filter_step = Filter(op=predicate, **kwargs)
    return OpenGearFunction(
        filter_step, input_function=self, requirements=requirements
    )
def accumulate(
    self,
    op: Accumulator[T, InputRecord] = None,
    # Other Redgrease args
    requirements: Iterable[str] = None,
    # Other Redis Gears args
    **kwargs,
) -> "OpenGearFunction[T]":
    """Instance-local :ref:`op_accumulate` operation: a many-to-one (N:1)
    mapping of records.

    Args:
        op (:data:`redgrease.typing.Accumulator`, optional):
            Function to apply on the input records.
            It takes two arguments:

            - An accumulator value, and
            - The input record.

            It should aggregate the input record into the accumulator, which
            carries the state between invocations, and must return the
            accumulator's updated value.
            Defaults to a list accumulator, i.e. the output will be a list of
            all inputs.

        requirements (Iterable[str], optional):
            Additional requirements / dependency Python packages.
            Defaults to ``None``.

        **kwargs:
            Additional parameters to the :ref:`op_accumulate` operation.

    Returns:
        OpenGearFunction:
            A new "open" gear function with :ref:`op_accumulate` operation as
            last step.
    """
    accumulator = redgrease.utils.passfun(op, default=_default_accumulator)
    accumulate_step = Accumulate(op=accumulator, **kwargs)
    return OpenGearFunction(
        accumulate_step, input_function=self, requirements=requirements
    )
def localgroupby(
    self,
    extractor: Extractor[InputRecord, Key] = None,
    reducer: Reducer[Key, T, InputRecord] = None,
    # Other Redgrease args
    requirements: Iterable[str] = None,
    # Other Redis Gears args
    **kwargs,
) -> "OpenGearFunction[Dict[Key, T]]":
    """Instance-local :ref:`op_localgroupby` operation: a many-to-less (N:M)
    mapping of records.

    Args:
        extractor (:data:`redgrease.typing.Extractor`, optional):
            Function applied to each input record to extract its grouping
            key. It takes one argument (the input record) and returns the
            key (a string). Records with equal keys form one group.
            Defaults to the hash of the input.

        reducer (:data:`redgrease.typing.Reducer`, optional):
            Function applied to the records of each group, reducing them to
            a single value per group.
            It works like an accumulator callback, except that a separate
            accumulator is kept per key / group. The built-in default is
            called as ``(key, accumulator, record)``.
            Defaults to a list accumulator, i.e. the output will be a list of
            all inputs, for each group.

        requirements (Iterable[str], optional):
            Additional requirements / dependency Python packages.
            Defaults to ``None``.

        **kwargs:
            Additional parameters to the :ref:`op_localgroupby` operation.

    Returns:
        OpenGearFunction:
            A new "open" gear function with a :ref:`op_localgroupby` operation
            as last step.
    """
    key_of = redgrease.utils.passfun(extractor, default=_default_extractor)
    reduce_group = redgrease.utils.passfun(reducer, default=_default_reducer)
    group_step = LocalGroupBy(extractor=key_of, reducer=reduce_group, **kwargs)
    return OpenGearFunction(
        group_step, input_function=self, requirements=requirements
    )
def limit(
    self,
    length: int,
    start: int = 0,
    # Other Redgrease args
    requirements: Iterable[str] = None,
    # Other Redis Gears args
    **kwargs,
) -> "OpenGearFunction[InputRecord]":
    """Instance-local :ref:`op_limit` operation limits the number of records.

    Args:
        length (int):
            The maximum number of records.

        start (int, optional):
            The index of the first input record.
            Defaults to 0.

        requirements (Iterable[str], optional):
            Additional requirements / dependency Python packages.
            Attached to the resulting function, like for the other
            operations, instead of being forwarded to the limit operation.
            Defaults to ``None``.

        **kwargs:
            Additional parameters to the :ref:`op_limit` operation.

    Returns:
        OpenGearFunction:
            A new "open" gear function with a :ref:`op_limit` operation as last
            step.
    """
    return OpenGearFunction(
        Limit(length=length, start=start, **kwargs),
        input_function=self,
        requirements=requirements,
    )
def collect(self, **kwargs) -> "OpenGearFunction[InputRecord]":
    """Cluster-global :ref:`op_collect` operation that collects the result
    records.

    Args:
        **kwargs:
            Additional parameters to the :ref:`op_collect` operation.

    Returns:
        OpenGearFunction:
            A new "open" gear function with a :ref:`op_collect` operation as
            last step.
    """
    collect_step = Collect(**kwargs)
    return OpenGearFunction(collect_step, input_function=self)
def repartition(
    self,
    extractor: Extractor[InputRecord, Hashable],
    # Other Redgrease args
    requirements: Iterable[str] = None,
    # Other Redis Gears args
    **kwargs,
) -> "OpenGearFunction[InputRecord]":
    """Cluster-global :ref:`op_repartition` operation that repartitions the
    records by shuffling them between shards.

    Args:
        extractor (:data:`Extractor`):
            Function that takes a record and calculates a key. The key
            determines the hash slot, and consequently the shard, that the
            record should migrate to.
            It takes one argument (the input record) and returns the key
            (a string).
            The extractor is handed to the operation as-is.

        requirements (Iterable[str], optional):
            Additional requirements / dependency Python packages.
            Defaults to ``None``.

        **kwargs:
            Additional parameters to the :ref:`op_repartition` operation.

    Returns:
        OpenGearFunction:
            A new "open" gear function with a :ref:`op_repartition` operation
            as last step.
    """
    repartition_step = Repartition(extractor=extractor, **kwargs)
    return OpenGearFunction(
        repartition_step, input_function=self, requirements=requirements
    )
def aggregate(
    self,
    zero: T = None,
    seqOp: Accumulator[T, InputRecord] = None,
    combOp: Accumulator[T, T] = None,
    # Other Redgrease args
    requirements: Iterable[str] = None,
    # Other Redis Gears args
    **kwargs,
) -> "OpenGearFunction[T]":
    """Distributed :ref:`op_aggregate` operation perform an aggregation on local
    data then a global aggregation on the local aggregations.

    Args:
        zero (Any, optional):
            The initial / zero value of the accumulator variable.
            Defaults to an empty list.

        seqOp (:data:`redgrease.typing.Accumulator`, optional):
            A function to be applied on each of the input records, locally per
            shard.
            It must take two parameters:

            - an accumulator value, from previous calls
            - an input record

            The function aggregates the input into the accumulator variable,
            which stores the state between the function's invocations.
            The function must return the accumulator's updated value.
            Defaults to addition, if 'zero' is a number and to a list
            accumulator if 'zero' is a list.

        combOp (:data:`redgrease.typing.Accumulator`, optional):
            A function to be applied on each of the aggregated results of the
            local aggregation (i.e. the output of `seqOp`).
            It must take two parameters:

            - an accumulator value, from previous calls
            - an input record

            The function aggregates the input into the accumulator variable,
            which stores the state between the function's invocations.
            The function must return the accumulator's updated value.
            Defaults to re-use the `seqOp` function, except when `seqOp` is
            defaulted for a list 'zero', in which case it defaults to
            addition (list concatenation).

        requirements (Iterable[str], optional):
            Additional requirements / dependency Python packages.
            Defaults to ``None``.

        **kwargs:
            Additional parameters to the :ref:`op_aggregate` operation.

    Returns:
        OpenGearFunction:
            A new "open" gear function with a :ref:`op_aggregate` operation as
            last step.

    Raises:
        ValueError:
            If no `seqOp` is given and 'zero' is neither a number nor a list,
            so that no default operator can be deduced.
    """
    # A fresh list per call, so the default is never shared between calls.
    _zero = zero if zero is not None else []

    if not seqOp:
        # Deduce the operator(s) from the type of the zero value.
        if isinstance(_zero, numbers.Number):
            seqOp = operator.add
        elif isinstance(_zero, list):
            seqOp = _default_accumulator
            # Local results are lists, which are merged by concatenation.
            combOp = combOp or operator.add
        else:
            raise ValueError(
                "No operator provided, and unable to deduce a reasonable default."
            )

    seqOp = redgrease.utils.passfun(seqOp)
    combOp = redgrease.utils.passfun(combOp, default=seqOp)

    return OpenGearFunction(
        Aggregate(zero=_zero, seqOp=seqOp, combOp=combOp, **kwargs),
        input_function=self,
        requirements=requirements,
    )
def aggregateby(
    self,
    extractor: Extractor[InputRecord, Key] = None,
    zero: T = None,
    seqOp: Reducer[Key, T, InputRecord] = None,
    combOp: Reducer[Key, T, T] = None,
    # Other Redgrease args
    requirements: Iterable[str] = None,
    # Other Redis Gears args
    **kwargs,
) -> "OpenGearFunction[Dict[Key, T]]":
    """Distributed :ref:`op_aggregateby` operation. It behaves like aggregate,
    but separately for each key, as extracted by the extractor.

    Args:
        extractor (:data:`redgrease.typing.Extractor`, optional):
            Function applied to each input record to extract its grouping
            key. It takes one argument (the input record) and returns the
            key (a string). Records with equal keys form one group.
            Defaults to the hash of the input.

        zero (Any, optional):
            The initial / zero value of the accumulator variable.
            Defaults to an empty list.

        seqOp (:data:`redgrease.typing.Reducer`, optional):
            A function applied on each of the input records, locally per
            shard and group.
            It aggregates the input record into the accumulator, which
            carries the state between invocations, and must return the
            accumulator's updated value. The built-in default is called as
            ``(key, accumulator, record)``.
            Defaults to a list reducer.

        combOp (:data:`redgrease.typing.Reducer`, optional):
            A function applied on each of the aggregated results of the
            local aggregation (i.e. the output of `seqOp`).
            It aggregates its input into the accumulator in the same manner
            as `seqOp` and must return the accumulator's updated value.
            Defaults to re-use the `seqOp` function.

        requirements (Iterable[str], optional):
            Additional requirements / dependency Python packages.
            Defaults to ``None``.

        **kwargs:
            Additional parameters to the :ref:`op_aggregateby` operation.

    Returns:
        OpenGearFunction:
            A new "open" gear function with a :ref:`op_aggregateby` operation
            as last step.
    """
    # A fresh list per call, so the default is never shared between calls.
    initial = [] if zero is None else zero

    key_of = redgrease.utils.passfun(extractor, default=_default_extractor)
    local_reduce = redgrease.utils.passfun(seqOp, _default_reducer)
    global_reduce = redgrease.utils.passfun(combOp, local_reduce)

    aggregate_step = AggregateBy(
        extractor=key_of,
        zero=initial,
        seqOp=local_reduce,
        combOp=global_reduce,
        **kwargs,
    )
    return OpenGearFunction(
        aggregate_step, input_function=self, requirements=requirements
    )
def groupby(
    self,
    extractor: Extractor[InputRecord, Key] = None,
    reducer: Reducer[Key, T, InputRecord] = None,
    # Other Redgrease args
    requirements: Iterable[str] = None,
    # Other Redis Gears args
    **kwargs,
) -> "OpenGearFunction[Dict[Key, T]]":
    """Cluster-local :ref:`op_groupby` operation: a many-to-less (N:M)
    grouping of records.

    Args:
        extractor (:data:`redgrease.typing.Extractor`, optional):
            Function applied to each input record to extract its grouping
            key. It takes one argument (the input record) and returns the
            key (a string). Records with equal keys form one group.
            Defaults to the hash of the input.

        reducer (:data:`redgrease.typing.Reducer`, optional):
            Function applied to the records of each group, reducing them to
            a single value per group.
            It works like an accumulator callback, except that a separate
            accumulator is kept per key / group. The built-in default is
            called as ``(key, accumulator, record)``.
            Defaults to a list reducer.

        requirements (Iterable[str], optional):
            Additional requirements / dependency Python packages.
            Defaults to ``None``.

        **kwargs:
            Additional parameters to the :ref:`op_groupby` operation.

    Returns:
        OpenGearFunction:
            A new "open" gear function with a :ref:`op_groupby` operation as
            last step.
    """
    key_of = redgrease.utils.passfun(extractor, default=_default_extractor)
    reduce_group = redgrease.utils.passfun(reducer, default=_default_reducer)
    group_step = GroupBy(extractor=key_of, reducer=reduce_group, **kwargs)
    return OpenGearFunction(
        group_step, input_function=self, requirements=requirements
    )
def batchgroupby(
    self,
    extractor: Extractor[InputRecord, Key] = None,
    reducer: BatchReducer[Key, T, InputRecord] = None,
    # Other Redgrease args
    requirements: Iterable[str] = None,
    # Other Redis Gears args
    **kwargs,
) -> "OpenGearFunction[Dict[Key, T]]":
    """Cluster-local :ref:`op_groupby` operation, performing a many-to-less (N:M)
    grouping of records.

    Note: Using this operation may cause a substantial increase in memory usage
        during runtime. Consider using the :ref:`op_groupby` operation instead.

    Args:
        extractor (:data:`redgrease.typing.Extractor`, optional):
            Function applied to each input record to extract its grouping key.
            It takes a single argument (the input record) and returns the key.
            Records are grouped by the value of that key.
            Defaults to the hash of the input record.

        reducer (:data:`redgrease.typing.BatchReducer`, optional):
            Function applied to the records of each group, reducing them to a
            single value per group.
            Default is the length (`len`) of the input.

        requirements (Iterable[str], optional):
            Additional requirements / dependency Python packages.
            Defaults to ``None``.

        **kwargs:
            Additional parameters to the :ref:`op_groupby` operation.

    Returns:
        OpenGearFunction:
            A new "open" gear function with a :ref:`op_groupby` operation as
            last step.
    """
    # Resolve both callables through passfun, handing it the module-level
    # defaults as fallbacks.
    key_of = redgrease.utils.passfun(extractor, default=_default_extractor)
    reduce_batch = redgrease.utils.passfun(reducer, default=_default_batch_reducer)

    operation = BatchGroupBy(extractor=key_of, reducer=reduce_batch, **kwargs)
    return OpenGearFunction(
        operation,
        input_function=self,
        requirements=requirements,
    )
def sort(
    self,
    reverse: bool = True,
    # Other Redgrease args
    requirements: Iterable[str] = None,
    # Other Redis Gears args
    **kwargs,
) -> "OpenGearFunction[InputRecord]":
    """:ref:`op_sort` the records.

    Args:
        reverse (bool, optional):
            Sort in descending order (higher to lower).
            Defaults to ``True``.

        requirements (Iterable[str], optional):
            Additional requirements / dependency Python packages.
            Defaults to ``None``.

        **kwargs:
            Additional parameters to the :ref:`op_sort` operation.

    Returns:
        OpenGearFunction:
            A new "open" gear function with a :ref:`op_sort` operation as last
            step.
    """
    operation = Sort(reverse=reverse, **kwargs)
    return OpenGearFunction(
        operation,
        input_function=self,
        requirements=requirements,
    )
def distinct(self, **kwargs) -> "OpenGearFunction[InputRecord]":
    """Keep only the :ref:`op_distinct` values in the data.

    Args:
        **kwargs:
            Additional parameters to the :ref:`op_distinct` operation.
            All keyword arguments are forwarded to the operation as-is.

    Returns:
        OpenGearFunction:
            A new "open" gear function with a :ref:`op_distinct` operation as
            last step.
    """
    operation = Distinct(**kwargs)
    return OpenGearFunction(operation, input_function=self)
def count(self, **kwargs) -> "OpenGearFunction[int]":
    """:ref:`op_count` the number of records in the execution.

    Args:
        **kwargs:
            Additional parameters to the :ref:`op_count` operation.
            All keyword arguments are forwarded to the operation as-is.

    Returns:
        OpenGearFunction:
            A new "open" gear function with a :ref:`op_count` operation as last
            step.
    """
    operation = Count(**kwargs)
    return OpenGearFunction(operation, input_function=self)
def countby(
    self,
    extractor: Extractor[InputRecord, Hashable] = lambda x: str(x),
    # Other Redgrease args
    requirements: Iterable[str] = None,
    # Other Redis Gears args
    **kwargs,
) -> "OpenGearFunction[Dict[Hashable, int]]":
    """Distributed :ref:`op_countby` operation, counting the records grouped by
    key.

    Args:
        extractor (:data:`redgrease.typing.Extractor`, optional):
            Function applied to each input record to extract its grouping key.
            It takes a single argument (the input record) and returns the key.
            The groups are defined by the value of the key.
            Defaults to ``lambda x: str(x)``.

        requirements (Iterable[str], optional):
            Additional requirements / dependency Python packages.
            Defaults to ``None``.

        **kwargs:
            Additional parameters to the :ref:`op_countby` operation.

    Returns:
        OpenGearFunction:
            A new "open" gear function with a :ref:`op_countby` operation as
            last step.
    """
    operation = CountBy(extractor=extractor, **kwargs)
    return OpenGearFunction(
        operation,
        input_function=self,
        requirements=requirements,
    )
def avg(
    self,
    extractor: Extractor[InputRecord, float] = lambda x: float(
        x if isinstance(x, (int, float, str)) else str(x)
    ),
    # Other Redgrease args
    requirements: Iterable[str] = None,
    # Other Redis Gears args
    **kwargs,
) -> "OpenGearFunction[float]":
    """Distributed :ref:`op_avg` operation, calculating the arithmetic average
    of the records.

    Args:
        extractor (:data:`redgrease.typing.Extractor`, optional):
            Function applied to each input record to obtain the value that
            goes into the average.
            It takes a single argument (the input record) and is annotated as
            returning a ``float``.
            The default passes ``int``, ``float`` and ``str`` records directly
            to ``float``; any other record is converted with ``str`` first.

        requirements (Iterable[str], optional):
            Additional requirements / dependency Python packages.
            Defaults to ``None``.

        **kwargs:
            Additional parameters to the :ref:`op_avg` operation.

    Returns:
        OpenGearFunction:
            A new "open" gear function with an :ref:`op_avg` operation as last
            step.
    """
    operation = Avg(extractor=extractor, **kwargs)
    return OpenGearFunction(
        operation,
        input_function=self,
        requirements=requirements,
    )