Operations¶
This section goes through the various operations available to Open GearFunction in more detail.
Map¶
- class redgrease.gears.Map(op: Callable[[redgrease.typing.InputRecord], redgrease.typing.OutputRecord], **kwargs)[source]¶
The local Map operation performs the one-to-one (1:1) mapping of records.
It requires one mapper function.
- op¶
The mapper function to map on all input records.
Instantiate a Map operation.
- Parameters
op (
redgrease.typing.Mapper
) – Function to map on the input records. The function must take one argument as input (input record) and return something as an output (output record).
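The one-to-one behaviour can be illustrated without a Redis server. The following is a minimal plain-Python sketch of the semantics only, not the redgrease implementation; `run_map` and `name_length` are hypothetical names introduced for illustration.

```python
# Plain-Python emulation of a 1:1 map step (not the redgrease API itself).
from typing import Any, Callable, Iterable, List


def run_map(records: Iterable[Any], op: Callable[[Any], Any]) -> List[Any]:
    """Apply the mapper once per input record; one output per input."""
    return [op(record) for record in records]


def name_length(user: dict) -> int:
    # Hypothetical mapper: takes one input record, returns one output record.
    return len(user["name"])


users = [{"name": "ada"}, {"name": "grace"}]
lengths = run_map(users, name_length)
```

The number of output records always equals the number of input records.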
FlatMap¶
- class redgrease.gears.FlatMap(op: Callable[[redgrease.typing.InputRecord], Iterable[redgrease.typing.OutputRecord]], **kwargs)[source]¶
The local FlatMap operation performs one-to-many (1:N) mapping of records.
It requires one expander function that maps a single input record to potentially multiple output records.
FlatMap is nearly identical to the Map operation in purpose and use. Unlike regular mapping, however, when FlatMap returns a sequence / iterator, each element in the sequence is turned into a separate output record.
- op¶
The expander function to apply on all input records.
Instantiate a FlatMap operation.
- Parameters
op (
redgrease.typing.Expander
) – Function to map on the input records. The function must take one argument as input (input record) and return an iterable as an output (output records).
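As a rough illustration of the one-to-many behaviour, the sketch below emulates it in plain Python. It is not the redgrease implementation; `run_flatmap` and `split_words` are hypothetical names.

```python
# Plain-Python emulation of a 1:N flatmap step (not the redgrease API itself).
from itertools import chain
from typing import Any, Callable, Iterable, List


def run_flatmap(records: Iterable[Any], op: Callable[[Any], Iterable[Any]]) -> List[Any]:
    """Each element of the iterable returned by op becomes its own output record."""
    return list(chain.from_iterable(op(record) for record in records))


def split_words(line: str) -> List[str]:
    # Hypothetical expander: one line in, zero or more words out.
    return line.split()


lines = ["hello world", "", "one two three"]
words = run_flatmap(lines, split_words)
```

An input record for which the expander returns an empty iterable, such as the empty line here, contributes no output records.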
ForEach¶
- class redgrease.gears.ForEach(op: Callable[[redgrease.typing.InputRecord], None], **kwargs)[source]¶
The local ForEach operation performs one-to-the-same (1=1) mapping.
It requires one processor function to perform some work that’s related to the input record.
Its output record is a copy of the input, which means anything the callback returns is discarded.
- Parameters
op (
redgrease.typing.Processor
) – Function to run on the input records.
Instantiate a ForEach operation.
- Parameters
op (
redgrease.typing.Processor
) – Function to run on each of the input records. The function must take one argument as input (input record) and should not return anything.
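The following plain-Python sketch emulates the behaviour described above: the processor runs for its side effect and its return value is dropped. It is an illustration only; `run_foreach`, `audit` and `audit_log` are hypothetical names.

```python
# Plain-Python emulation of a foreach step (not the redgrease API itself).
from typing import Any, Callable, Iterable, List

audit_log: List[str] = []


def run_foreach(records: Iterable[Any], op: Callable[[Any], None]) -> List[Any]:
    output = []
    for record in records:
        op(record)  # whatever op returns is discarded
        output.append(record)  # the input record is passed on
    return output


def audit(record: Any) -> str:
    # Hypothetical processor with a side effect; its return value is ignored.
    audit_log.append(f"saw {record}")
    return "this value is ignored"


passed_on = run_foreach(["a", "b"], audit)
```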
Filter¶
- class redgrease.gears.Filter(op: Callable[[redgrease.typing.InputRecord], bool], **kwargs)[source]¶
The local Filter operation performs one-to-zero-or-one (1:(0|1)) filtering of records.
It requires a filterer function.
An input record that yields a falsehood will be discarded and only truthful ones will be output.
- Parameters
op (
redgrease.typing.Filterer
) – Predicate function to run on the input records.
Instantiate a Filter operation.
- Parameters
op (
redgrease.typing.Filterer
) – Function to apply on the input records, to decide which ones to keep. The function must take one argument as input (input record) and return a bool. The input records evaluated to True will be kept as output records.
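A minimal plain-Python sketch of the filtering semantics follows. It is not the redgrease implementation; `run_filter` and `is_adult` are hypothetical names.

```python
# Plain-Python emulation of a filter step (not the redgrease API itself).
from typing import Any, Callable, Iterable, List


def run_filter(records: Iterable[Any], op: Callable[[Any], bool]) -> List[Any]:
    """Keep only the records for which the predicate is truthy."""
    return [record for record in records if op(record)]


def is_adult(user: dict) -> bool:
    # Hypothetical filterer: one input record in, a bool out.
    return user["age"] >= 18


users = [{"name": "ada", "age": 36}, {"name": "tim", "age": 12}]
adults = run_filter(users, is_adult)
```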
Accumulate¶
- class redgrease.gears.Accumulate(op: Callable[[redgrease.typing.T, redgrease.typing.InputRecord], redgrease.typing.T], **kwargs)[source]¶
The local Accumulate operation performs many-to-one mapping (N:1) of records.
It requires one accumulator function.
Once input records are exhausted its output is a single record consisting of the accumulator’s value.
- Parameters
op (
redgrease.typing.Accumulator
) – Accumulation function to run on the input records.
Instantiate an Accumulate operation.
- Parameters
op (
redgrease.typing.Accumulator
) – Function to apply on the input records. The function must take two arguments as input:
an accumulator value, and
the input record.
It should aggregate the input record into the accumulator variable, which stores the state between the function’s invocations. The function must return the accumulator’s updated value.
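The many-to-one behaviour can be sketched in plain Python as below. This is an emulation, not the redgrease implementation. It assumes the accumulator is `None` on the first invocation, which the text above does not state; `run_accumulate` and `total` are hypothetical names.

```python
# Plain-Python emulation of an accumulate step (not the redgrease API itself).
from typing import Any, Callable, Iterable, List


def run_accumulate(records: Iterable[Any], op: Callable[[Any, Any], Any]) -> List[Any]:
    acc = None  # assumption: no accumulator value exists before the first call
    for record in records:
        acc = op(acc, record)  # accumulator first, input record second
    return [acc]  # a single output record holding the accumulator's value


def total(acc: Any, record: int) -> int:
    # Hypothetical accumulator function: must return the updated accumulator.
    acc = 0 if acc is None else acc
    return acc + record


summed = run_accumulate([1, 2, 3, 4], total)
```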
LocalGroupBy¶
- class redgrease.gears.LocalGroupBy(extractor: Callable[[redgrease.typing.InputRecord], redgrease.typing.Key], reducer: Callable[[redgrease.typing.Key, redgrease.typing.T, redgrease.typing.InputRecord], redgrease.typing.T], **kwargs)[source]¶
The local LocalGroupBy operation performs many-to-fewer mapping (N:M) of records.
The operation requires two functions, an extractor and a reducer.
The output records consist of the grouping key and its respective reduce value.
- extractor¶
Function that extracts the key to group by from input records.
- reducer¶
Function that reduces the records in each group to an output record.
Instantiate a LocalGroupBy operator.
- Parameters
extractor (
redgrease.typing.Extractor
) – Function to apply on the input records, to extract the grouping key. The function must take one argument as input (input record) and return a string (key). The groups are defined by the value of the key.
reducer (
redgrease.typing.Reducer
) – Function to apply on the records of each group, to reduce to a single value (per group). The function must take (a) a key, (b) a variable that’s called an accumulator and (c) an input record. It performs similarly to the accumulator callback, with the difference being that it maintains an accumulator per reduced key / group.
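The per-key accumulator idea can be sketched in plain Python as follows. This is an emulation only. It assumes each group's accumulator starts as `None`, and it represents output records as `(key, value)` tuples, which is a choice made for this sketch; `run_localgroupby`, `first_letter` and `count_words` are hypothetical names.

```python
# Plain-Python emulation of a local group-by step (not the redgrease API itself).
from typing import Any, Callable, Dict, Iterable, List, Tuple


def run_localgroupby(
    records: Iterable[Any],
    extractor: Callable[[Any], Any],
    reducer: Callable[[Any, Any, Any], Any],
) -> List[Tuple[Any, Any]]:
    accumulators: Dict[Any, Any] = {}
    for record in records:
        key = extractor(record)
        # One accumulator per key; None (assumption) before the first call.
        accumulators[key] = reducer(key, accumulators.get(key), record)
    return list(accumulators.items())


def first_letter(word: str) -> str:
    return word[0]


def count_words(key: str, acc: Any, record: str) -> int:
    # Reducer argument order: key, accumulator, input record.
    return (acc or 0) + 1


groups = run_localgroupby(["apple", "avocado", "banana"], first_letter, count_words)
```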
Limit¶
- class redgrease.gears.Limit(length: int, start: int = 0, **kwargs)[source]¶
The local Limit operation limits the number of records.
It accepts two numeric arguments: a starting position in the input records “array” and a maximal number of output records.
- start¶
Starting index (0-based) of the input record to start from
- Type
int
- length¶
The maximum number of records to let through.
- Type
int
Instantiate a Limit operation
- Parameters
length (int) – The maximum number of records.
start (int, optional) – The index of the first input record. Defaults to 0.
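The effect of `length` and `start` is similar to slicing. A minimal plain-Python sketch, not the redgrease implementation, with `run_limit` as a hypothetical helper name:

```python
# Plain-Python emulation of a limit step (not the redgrease API itself).
from itertools import islice
from typing import Any, Iterable, List


def run_limit(records: Iterable[Any], length: int, start: int = 0) -> List[Any]:
    """Skip `start` records, then let at most `length` records through."""
    return list(islice(records, start, start + length))


window = run_limit(range(10), length=3, start=2)
short_input = run_limit(range(2), length=5)
```

When fewer than `length` records are available, all of them are let through.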
Collect¶
Repartition¶
- class redgrease.gears.Repartition(extractor: Callable[[redgrease.typing.InputRecord], redgrease.typing.Key], **kwargs)[source]¶
The global Repartition operation repartitions the records by shuffling them between shards.
It accepts a single key extractor function. The extracted key is used for computing the record’s new placement in the cluster (i.e. hash slot). The operation then moves the record from its original shard to the new one.
- Attributes:
- extractor (redgrease.typing.Extractor):
A function deciding the destination shard of an input record.
Instantiate a Repartition operation
- Parameters
extractor (
redgrease.typing.Extractor
) – Function that takes a record and calculates a key that is used to determine the hash slot, and consequently the shard, that the record should migrate to. The function must take one argument as input (input record) and return a string (key).
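To make the key-to-shard idea concrete, the sketch below computes a hash slot the way Redis Cluster does (CRC16 of the key modulo 16384) and distributes records accordingly. Several things here are assumptions not stated above: that the engine uses exactly this slot function, that hash tags can be ignored, and that slots are split into equal contiguous ranges per shard. `hash_slot` and `run_repartition` are hypothetical names.

```python
# Plain-Python sketch of key-based repartitioning (not the redgrease API itself).
from binascii import crc_hqx
from typing import Any, Callable, Iterable, List

SLOT_COUNT = 16384  # number of hash slots in Redis Cluster


def hash_slot(key: str) -> int:
    # CRC16 (XMODEM variant) modulo 16384; hash tags ("{...}") are ignored here.
    return crc_hqx(key.encode("utf-8"), 0) % SLOT_COUNT


def run_repartition(
    records: Iterable[Any], extractor: Callable[[Any], str], shard_count: int
) -> List[List[Any]]:
    shards: List[List[Any]] = [[] for _ in range(shard_count)]
    for record in records:
        slot = hash_slot(extractor(record))
        # Assumption: slots are split into equal contiguous ranges per shard.
        shards[slot * shard_count // SLOT_COUNT].append(record)
    return shards


orders = [
    {"customer": "c1", "total": 10},
    {"customer": "c2", "total": 5},
    {"customer": "c1", "total": 7},
]
shards = run_repartition(orders, lambda order: order["customer"], shard_count=3)
```

Records that share a key always end up on the same shard, which is what makes a subsequent local per-key reduction possible.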
Aggregate¶
- class redgrease.gears.Aggregate(zero: Any, seqOp: Callable[[redgrease.typing.T, redgrease.typing.InputRecord], redgrease.typing.T], combOp: Callable[[redgrease.typing.T, redgrease.typing.InputRecord], redgrease.typing.T], **kwargs)[source]¶
The Aggregate operation performs many-to-one mapping (N:1) of records.
Aggregate provides an alternative to the local accumulate operation as it takes the partitioning of data into consideration. Furthermore, because records are aggregated locally before collection, its performance is usually superior.
It requires a zero value and two accumulator functions for computing the local and global aggregates.
The operation is made of these steps:
1. The local accumulator is executed locally and initialized with the zero value.
2. A global collect moves all records to the originating engine.
3. The global accumulator is executed locally by the originating engine.
Its output is a single record consisting of the accumulator’s global value.
- zero¶
The initial / zero value for the accumulator variable.
- Type
Any
- seqOp¶
A local accumulator function, applied locally on each shard.
- combOp¶
A global accumulator function, applied on the results of the local accumulations.
Instantiates an Aggregate operation.
- Parameters
zero (Any) – The initial / zero value of the accumulator variable.
seqOp (
redgrease.typing.Accumulator
) – A function to be applied on each of the input records, locally per shard. It must take two parameters:
- an accumulator value, from previous calls
- an input record
The function aggregates the input into the accumulator variable, which stores the state between the function’s invocations. The function must return the accumulator’s updated value.
combOp (
redgrease.typing.Accumulator
) – A function to be applied on each of the aggregated results of the local aggregation (i.e. the output of seqOp). It must take two parameters:
- an accumulator value, from previous calls
- an input record
The function aggregates the input into the accumulator variable, which stores the state between the function’s invocations. The function must return the accumulator’s updated value.
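The local-then-global flow can be sketched in plain Python, modelling each shard as a list. This is an emulation, not the redgrease implementation. It assumes the global accumulator also starts from the zero value; `run_aggregate` is a hypothetical helper name.

```python
# Plain-Python emulation of an aggregate step over several shards.
from typing import Any, Callable, List


def run_aggregate(
    shards: List[List[Any]],
    zero: Any,
    seqOp: Callable[[Any, Any], Any],
    combOp: Callable[[Any, Any], Any],
) -> List[Any]:
    # Local step: each shard folds its own records, starting from zero.
    local_results = []
    for shard in shards:
        acc = zero
        for record in shard:
            acc = seqOp(acc, record)
        local_results.append(acc)
    # Collect step is implicit: local_results is now in one place.
    # Global step: fold the local results (assumption: also starting from zero).
    total = zero
    for local in local_results:
        total = combOp(total, local)
    return [total]


shards = [["a", "b", "c"], ["d", "e"], ["f"]]
# Count records: seqOp counts locally, combOp sums the local counts.
record_count = run_aggregate(
    shards,
    zero=0,
    seqOp=lambda acc, record: acc + 1,
    combOp=lambda acc, local_count: acc + local_count,
)
```

Counting shows why two functions are needed: the local function sees raw records, while the global function sees the already-aggregated local values.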
AggregateBy¶
- class redgrease.gears.AggregateBy(extractor: Callable[[redgrease.typing.InputRecord], redgrease.typing.Key], zero: Any, seqOp: Callable[[redgrease.typing.Key, redgrease.typing.T, redgrease.typing.InputRecord], redgrease.typing.T], combOp: Callable[[redgrease.typing.Key, redgrease.typing.T, redgrease.typing.InputRecord], redgrease.typing.T], **kwargs)[source]¶
The AggregateBy operation performs many-to-fewer mapping (N:M) of records.
It is similar to the Aggregate operation but aggregates per key. It requires an extractor callback, a zero value and two reducer callbacks for computing the local and global aggregates.
The operation is made of these steps:
1. Extraction of the groups using extractor.
2. The local reducer is executed locally and initialized with the zero value.
3. A global repartition operation that uses the extractor.
4. The global reducer is executed on each shard once it is repartitioned with its relevant keys.
The output is a list of records, one for each key. The output records consist of the grouping key and its respective reducer’s value.
- extractor¶
Function that extracts the key to group by from input records.
- zero¶
The initial / zero value for the accumulator variable.
- Type
Any
- seqOp¶
A local accumulator function, applied locally on each shard.
- combOp¶
A global accumulator function, applied on the results of the local accumulations.
Instantiate an AggregateBy operation.
- Parameters
extractor (
redgrease.typing.Extractor
) – Function to apply on the input records, to extract the grouping key. The function must take one argument as input (input record) and return a string (key). The groups are defined by the value of the key.
zero (Any) – The initial / zero value of the accumulator variable.
seqOp (
redgrease.typing.Reducer
) – A function to be applied on each of the input records, locally per shard and group. As the class signature shows, it must take three parameters:
- the key of the group
- an accumulator value, from previous calls
- an input record
The function aggregates the input into the accumulator variable, which stores the state between the function’s invocations. The function must return the accumulator’s updated value.
combOp (
redgrease.typing.Reducer
) – A function to be applied on each of the aggregated results of the local aggregation (i.e. the output of seqOp). As the class signature shows, it must take three parameters:
- the key of the group
- an accumulator value, from previous calls
- an input record
The function aggregates the input into the accumulator variable, which stores the state between the function’s invocations. The function must return the accumulator’s updated value.
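The per-key variant can be emulated in plain Python in the same spirit, with shards modelled as lists. This is a sketch of the semantics only. It assumes that both reducers start from the zero value for every key and that the global reducer receives the locally reduced value as its input; `run_aggregateby` is a hypothetical helper name.

```python
# Plain-Python emulation of an aggregate-by step over several shards.
from typing import Any, Callable, Dict, List, Tuple


def run_aggregateby(
    shards: List[List[Any]],
    extractor: Callable[[Any], Any],
    zero: Any,
    seqOp: Callable[[Any, Any, Any], Any],
    combOp: Callable[[Any, Any, Any], Any],
) -> List[Tuple[Any, Any]]:
    # Local step: per shard, reduce the records of each key starting from zero.
    local_groups: List[Dict[Any, Any]] = []
    for shard in shards:
        local: Dict[Any, Any] = {}
        for record in shard:
            key = extractor(record)
            local[key] = seqOp(key, local.get(key, zero), record)
        local_groups.append(local)
    # Repartition + global step: bring the local values of each key together
    # and combine them (assumption: again starting from zero).
    combined: Dict[Any, Any] = {}
    for local in local_groups:
        for key, value in local.items():
            combined[key] = combOp(key, combined.get(key, zero), value)
    return sorted(combined.items())


sales = [[("a", 1), ("b", 2), ("a", 3)], [("a", 4), ("b", 5)]]
totals = run_aggregateby(
    sales,
    extractor=lambda sale: sale[0],
    zero=0,
    seqOp=lambda key, acc, sale: acc + sale[1],
    combOp=lambda key, acc, local_total: acc + local_total,
)
```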
GroupBy¶
- class redgrease.gears.GroupBy(extractor: Callable[[redgrease.typing.InputRecord], redgrease.typing.Key], reducer: Callable[[redgrease.typing.Key, redgrease.typing.T, redgrease.typing.InputRecord], redgrease.typing.T], **kwargs)[source]¶
The GroupBy operation performs a many-to-fewer (N:M) grouping of records. It is similar to AggregateBy but uses only a global reducer. It can be used in cases where locally reducing the data isn’t possible.
The operation requires two functions: an extractor and a reducer.
The operation is made of these steps:
1. A global repartition operation that uses the extractor.
2. The reducer is locally invoked.
Output is a locally-reduced list of records, one for each key. The output records consist of the grouping key and its respective accumulator’s value.
- extractor¶
Function that extracts the key to group by from input records.
- reducer¶
Function that reduces the records of each group to a value
Instantiate a GroupBy operation.
- Parameters
extractor (
redgrease.typing.Extractor
) – Function to apply on the input records, to extract the grouping key. The function must take one argument as input (input record) and return a string (key). The groups are defined by the value of the key.
reducer (
redgrease.typing.Reducer
) – Function to apply on the records of each group, to reduce to a single value (per group). The function must take (a) a key, (b) a variable that’s called an accumulator and (c) an input record. It performs similarly to the accumulator callback, with the difference being that it maintains an accumulator per reduced key / group.
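A plain-Python sketch of the repartition-then-reduce flow follows, with shards modelled as lists. It is not the redgrease implementation. It assumes each group's accumulator starts as `None`; `run_groupby` and `longest` are hypothetical names.

```python
# Plain-Python emulation of a group-by step over several shards.
from typing import Any, Callable, Dict, List


def run_groupby(
    shards: List[List[Any]],
    extractor: Callable[[Any], Any],
    reducer: Callable[[Any, Any, Any], Any],
) -> Dict[Any, Any]:
    # Step 1 (repartition, emulated): bring records with the same key together.
    by_key: Dict[Any, List[Any]] = {}
    for shard in shards:
        for record in shard:
            by_key.setdefault(extractor(record), []).append(record)
    # Step 2: the reducer is invoked once per record, one accumulator per key.
    output: Dict[Any, Any] = {}
    for key, records in by_key.items():
        acc = None  # assumption: no accumulator before the first call
        for record in records:
            acc = reducer(key, acc, record)
        output[key] = acc
    return output


def longest(key: str, acc: Any, record: str) -> str:
    # Keep the longest word seen so far for this key.
    return record if acc is None or len(record) > len(acc) else acc


shards = [["apple", "bob"], ["avocado", "bee"]]
longest_by_letter = run_groupby(shards, lambda word: word[0], longest)
```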
BatchGroupBy¶
- class redgrease.gears.BatchGroupBy(extractor: Callable[[redgrease.typing.InputRecord], redgrease.typing.Key], reducer: Callable[[redgrease.typing.Key, Iterable[redgrease.typing.InputRecord]], redgrease.typing.OutputRecord], **kwargs)[source]¶
The BatchGroupBy operation performs a many-to-fewer (N:M) grouping of records.
Instead of using BatchGroupBy, prefer using the GroupBy operation as it is more efficient and performant. Only use BatchGroupBy when the reducer’s logic requires the full list of records for each input key.
The operation requires two functions: an extractor and a batch reducer.
The operation is made of these steps:
1. A global repartition operation that uses the extractor.
2. A local localgroupby operation that uses the batch reducer.
Once finished, the operation locally outputs a record for each key and its respective accumulator value.
Using this operation may cause a substantial increase in memory usage during runtime.
- extractor¶
Function that extracts the key to group by from input records.
- reducer¶
Function that reduces the records of each group to a value
Instantiate a BatchGroupBy operation.
- Parameters
extractor (
redgrease.typing.Extractor
) – Function to apply on the input records, to extract the grouping key. The function must take one argument as input (input record) and return a string (key). The groups are defined by the value of the key.
reducer (
redgrease.typing.BatchReducer
) – Function to apply on the records of each group, to reduce to a single value (per group). As the class signature shows, the function must take (a) a key and (b) an iterable of all the input records in that group, and return the reduced output value for the group.
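A median is a typical case where the reducer needs every record of a group at once. The plain-Python sketch below emulates that behaviour; it is not the redgrease implementation, and `run_batchgroupby` and `median_amount` are hypothetical names.

```python
# Plain-Python emulation of a batch group-by step (not the redgrease API itself).
from statistics import median
from typing import Any, Callable, Dict, Iterable, List


def run_batchgroupby(
    records: Iterable[Any],
    extractor: Callable[[Any], Any],
    reducer: Callable[[Any, Iterable[Any]], Any],
) -> Dict[Any, Any]:
    # All records of a group are held in memory before the reducer runs.
    batches: Dict[Any, List[Any]] = {}
    for record in records:
        batches.setdefault(extractor(record), []).append(record)
    return {key: reducer(key, batch) for key, batch in batches.items()}


def median_amount(key: str, batch: Iterable[Any]) -> float:
    # Batch reducer argument order: key, then the full collection of records.
    return median([amount for _, amount in batch])


payments = [("a", 1), ("a", 5), ("a", 3), ("b", 10), ("b", 20)]
medians = run_batchgroupby(payments, lambda payment: payment[0], median_amount)
```

Holding each full batch in memory is what drives the increased memory usage mentioned above.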
Sort¶
- class redgrease.gears.Sort(reverse: bool = True, **kwargs)[source]¶
Sort operation sorts the records.
It allows control over the sort order.
The operation is made of the following steps:
1. A global aggregate operation collects and combines all records.
2. A local sort is performed on the list.
3. The list is flatmapped to records.
Using this operation may cause an increase in memory usage during runtime due to the list being copied during the sorting operation.
- reverse¶
Defines if the sorting order is descending (True) or ascending (False).
- Type
bool
Instantiate a Sort operation.
- Parameters
reverse (bool, optional) – Sort in descending order (higher to lower). Defaults to True.
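The three steps of the Sort operation can be emulated in plain Python as below, with shards modelled as lists. This is a sketch of the described flow, not the redgrease implementation; `run_sort` is a hypothetical helper name.

```python
# Plain-Python emulation of a sort step over several shards.
from typing import Any, List


def run_sort(shards: List[List[Any]], reverse: bool = True) -> List[Any]:
    # Step 1: aggregate all records into a single list.
    combined: List[Any] = []
    for shard in shards:
        combined = combined + list(shard)
    # Step 2: sort the list; sorted() builds a new list (a copy).
    ordered = sorted(combined, reverse=reverse)
    # Step 3: flatmap the list back into individual records.
    return [record for record in ordered]


shards = [[3, 1], [2, 5], [4]]
descending = run_sort(shards)
ascending = run_sort(shards, reverse=False)
```

Note that the default order is descending, because `reverse` defaults to `True`.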
Distinct¶
- class redgrease.gears.Distinct(**kwargs)[source]¶
The Distinct operation returns distinct records.
It requires no arguments.
The operation is made of the following steps:
1. An aggregate operation locally reduces the records to sets that are then collected and unioned globally.
2. A local flatmap operation turns the set into records.
Instantiate a Distinct operation.
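The set-based steps of Distinct can be sketched in plain Python, with shards modelled as lists. This is an emulation only, not the redgrease implementation; `run_distinct` is a hypothetical helper name.

```python
# Plain-Python emulation of a distinct step over several shards.
from typing import Any, Hashable, List


def run_distinct(shards: List[List[Hashable]]) -> List[Any]:
    # Step 1a: each shard reduces its records to a set.
    local_sets = [set(shard) for shard in shards]
    # Step 1b: the local sets are collected and unioned.
    merged = set().union(*local_sets)
    # Step 2: the set is turned back into individual records (order not defined).
    return list(merged)


shards = [[1, 2, 2], [2, 3], [3, 3, 4]]
unique = sorted(run_distinct(shards))
```

Because sets are used, the records need to be hashable and the output order is not defined; the result is sorted here only to make it deterministic.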
Count¶
CountBy¶
- class redgrease.gears.CountBy(extractor: Callable[[redgrease.typing.InputRecord], redgrease.typing.Key], **kwargs)[source]¶
The CountBy operation counts the records grouped by key.
It requires a single extractor function.
The operation is made of an aggregateby operation that uses local counting and global summing accumulators.
- extractor¶
Function that extracts the key to group by from input records.
Instantiate a CountBy operation.
- Parameters
extractor (
redgrease.typing.Extractor
) – Function to apply on the input records, to extract the grouping key. The function must take one argument as input (input record) and return a string (key). The groups are defined by the value of the key.
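The local counting and global summing can be emulated in plain Python as follows, with shards modelled as lists. This is a sketch, not the redgrease implementation; `run_countby` is a hypothetical helper, and the sample user records are made up for illustration.

```python
# Plain-Python emulation of a count-by step over several shards.
from collections import Counter
from typing import Any, Callable, Dict, List


def run_countby(shards: List[List[Any]], extractor: Callable[[Any], Any]) -> Dict[Any, int]:
    # Local counting: one Counter per shard.
    local_counts = [Counter(extractor(record) for record in shard) for shard in shards]
    # Global summing: add up the local counts per key.
    total: Counter = Counter()
    for counts in local_counts:
        total.update(counts)
    return dict(total)


def supervisor(user: dict) -> str:
    # A user without a supervisor is considered its own supervisor.
    return user.get("supervisor", user["id"])


shards = [
    [{"id": "u1", "supervisor": "u9"}, {"id": "u9"}],
    [{"id": "u2", "supervisor": "u9"}, {"id": "u3", "supervisor": "u2"}],
]
users_per_supervisor = run_countby(shards, supervisor)
```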
Avg¶
- class redgrease.gears.Avg(extractor: Callable[[redgrease.typing.InputRecord], redgrease.typing.Key], **kwargs)[source]¶
The Avg operation returns the arithmetic average of records.
It has an optional value extractor function.
The operation is made of the following steps:
1. An aggregate operation locally reduces the records to tuples of sum and count that are globally combined.
2. A local map operation calculates the average from the global tuple.
- extractor¶
Function that extracts the value to average from input records.
Instantiate an Avg operation.
- Parameters
extractor (
redgrease.typing.Extractor
) – Function to apply on the input records, to extract the value to average. The function must take one argument as input (input record) and return the value to include in the average.
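The sum-and-count approach can be sketched in plain Python, with shards modelled as lists. This is an emulation of the described steps, not the redgrease implementation; `run_avg` is a hypothetical helper, and returning no record for empty input is a choice made for this sketch.

```python
# Plain-Python emulation of an avg step over several shards.
from typing import Any, Callable, List, Tuple


def run_avg(
    shards: List[List[Any]], extractor: Callable[[Any], float] = lambda record: record
) -> List[float]:
    # Local step: each shard reduces its records to a (sum, count) tuple.
    local: List[Tuple[float, int]] = []
    for shard in shards:
        total, count = 0, 0
        for record in shard:
            total += extractor(record)
            count += 1
        local.append((total, count))
    # Global step: combine the tuples, then map the result to the average.
    global_total = sum(t for t, _ in local)
    global_count = sum(c for _, c in local)
    if global_count == 0:
        return []  # sketch choice: no output record when there is no input
    return [global_total / global_count]


shards = [[{"ms": 10}, {"ms": 20}, {"ms": 30}], [{"ms": 40}, {"ms": 50}]]
average = run_avg(shards, extractor=lambda record: record["ms"])
```

Carrying the sum and the count separately matters: averaging the two local averages (20 and 45) would give 32.5 rather than the correct 30.0.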
Actions¶
Actions close open GearFunctions and indicate the running mode of the function, as follows:
Action | Execution Mode |
---|---|
Run | “batch-mode” |
Register | “event-mode” |
Run¶
- class redgrease.gears.Run(arg: Optional[str] = None, convertToStr: bool = True, collect: bool = True, **kwargs)[source]¶
Run action
The Run action runs a Gear function as a batch. The function is executed once and exits once the data is exhausted by its reader.
The Run action can only be the last operation of any GearFunction, and it effectively ‘closes’ it to further operations.
- arg¶
Argument that’s passed to the reader, overriding its defaultArg. It means the following:
A glob-like pattern for the KeysReader and KeysOnlyReader readers.
A key name for the StreamReader reader.
A Python generator for the PythonReader reader.
- Type
str, optional
- convertToStr¶
When True, adds a map operation to the flow’s end that stringifies records.
- Type
bool
- collect¶
When True, adds a collect operation to flow’s end.
- Type
bool
Instantiate a Run action
- Parameters
arg (Optional[str], optional) –
Optional argument that’s passed to the reader, overriding its defaultArg. It means the following:
A glob-like pattern for the KeysReader and KeysOnlyReader readers.
A key name for the StreamReader reader.
A Python generator for the PythonReader reader.
Defaults to None.
convertToStr (bool, optional) – When True, adds a map operation to the flow’s end that stringifies records. Defaults to True.
collect (bool, optional) – When True, adds a collect operation to the flow’s end. Defaults to True.
Register¶
- class redgrease.gears.Register(prefix: str = '*', convertToStr: bool = True, collect: bool = True, mode: str = 'async', onRegistered: Optional[Callable[[], None]] = None, **kwargs)[source]¶
Register action
The Register action registers a function as an event handler. The function is executed each time an event arrives. Each time it is executed, the function operates on the event’s data and once done it is suspended until its future invocations by new events.
The Register action can only be the last operation of any GearFunction, and it effectively ‘closes’ it to further operations.
- prefix¶
Key prefix pattern to match on. Not relevant for ‘CommandReader’ readers (see ‘trigger’).
- Type
str
- convertToStr¶
When True, adds a map operation to the flow’s end that stringifies records.
- Type
bool
- collect¶
When True, adds a collect operation to flow’s end.
- Type
bool
- mode¶
The execution mode of the triggered function.
- Type
str
- onRegistered¶
A function callback that’s called on each shard upon function registration.
- Type
Callable
Instantiate a Register action
- Parameters
prefix (str, optional) – Key prefix pattern to match on. Not relevant for ‘CommandReader’ readers (see ‘trigger’). Defaults to ‘*’.
convertToStr (bool, optional) – When True, adds a map operation to the flow’s end that stringifies records. Defaults to True.
collect (bool, optional) – When True, adds a collect operation to the flow’s end. Defaults to True.
mode (str, optional) –
The execution mode of the function. Can be one of:
"async": Execution will be asynchronous across the entire cluster.
"async_local": Execution will be asynchronous and restricted to the handling shard.
"sync": Execution will be synchronous and local.
Defaults to redgrease.TriggerMode.Async ("async").
onRegistered (Registrator, optional) – A function callback that’s called on each shard upon function registration. It is a good place to initialize non-serializable objects such as network connections. Defaults to None.
Operation Callback Types¶
This section runs through the various type signatures that the function callbacks used in the operations must follow.
Registrator¶
- redgrease.typing.Registrator¶
Type definition for Registrator functions.
I.e. callback functions that may be called on each shard upon function registration. Such functions provide a good place to initialize non-serializable objects such as network connections.
A function of Registrator type should take no arguments, nor return any value.
alias of Callable[[], None]
Extractor¶
- redgrease.typing.Extractor¶
Type definition for Extractor functions.
Extractor functions are used in the following operations:
Extractor functions extract or calculate the value that should be used as (grouping) key, from an input record of the operation.
- Parameters
(InputRecord) - A single input-record, of the same type as the operation’s input type.
- Returns
Any ‘Hashable’ value.
- Return type
Key
Example - Count users per supervisor:
    from redgrease import KeysReader

    # Function of "Extractor" type
    # Extracts the "supervisor" for a user.
    # If the user has no supervisor, then the user is considered its own supervisor.
    def supervisor(user):
        return user.get("supervisor", user["id"])

    KeysReader("user:*").values().countby(supervisor).run()
alias of Callable[[redgrease.typing.InputRecord], redgrease.typing.Key]
Mapper¶
- redgrease.typing.Mapper¶
Type definition for Mapper functions.
Mapper functions are used in the following operations:
Mapper functions transform a value from the operation’s input to some new value.
- Parameters
(InputRecord) - A single input-record, of the same type as the operation’s input type.
- Returns
Any value.
- Return type
OutputRecord
alias of Callable[[redgrease.typing.InputRecord], redgrease.typing.OutputRecord]
Expander¶
- redgrease.typing.Expander¶
Type definition for Expander functions.
Expander functions are used in the following operations:
Expander functions transform a value from the operation’s input into several new values.
- Parameters
(InputRecord) - A single input-record, of the same type as the operation’s input type.
- Returns
An iterable sequence of values, for example a list, each of which becomes an input to the next operation.
- Return type
Iterable[OutputRecord]
alias of Callable[[redgrease.typing.InputRecord], Iterable[redgrease.typing.OutputRecord]]
Processor¶
- redgrease.typing.Processor¶
Type definition for Processor functions.
Processor functions are used in the following operations:
Processor functions perform some side effect using a value from the operation’s input.
- Parameters
(InputRecord) - A single input-record, of the same type as the operation’s input type.
- Returns
Nothing.
- Return type
None
alias of Callable[[redgrease.typing.InputRecord], None]
Filterer¶
- redgrease.typing.Filterer¶
Type definition for Filterer functions.
Filterer functions are used in the following operations:
Filterer functions evaluate a value from the operation’s input to either True or False.
- Parameters
(InputRecord) - A single input-record, of the same type as the operation’s input type.
- Returns
Either True or False.
- Return type
bool
alias of Callable[[redgrease.typing.InputRecord], bool]
Accumulator¶
- redgrease.typing.Accumulator¶
Type definition for Accumulator functions.
Accumulator functions are used in the following operations:
Accumulator functions take a variable that’s also called an accumulator, as well as an input record. They aggregate inputs into the accumulator variable, which stores the state between the function’s invocations. The function must return the accumulator’s updated value after each call.
- Parameters
( T ) - An accumulator value.
(InputRecord) - A single input-record, of the same type as the operation’s input type.
- Returns
The updated accumulator value.
- Return type
T
alias of Callable[[redgrease.typing.T, redgrease.typing.InputRecord], redgrease.typing.T]
Reducer¶
- redgrease.typing.Reducer¶
Type definition for Reducer functions.
Reducer functions are used in the following operations:
Reducer functions receive a key, a variable that’s called an accumulator and an input record. They perform similarly to the redgrease.typing.Accumulator callback, with the difference being that they maintain an accumulator per reduced key.
- Parameters
(Key) - A key value for the group.
( T ) - An accumulator value.
(InputRecord) - A single input-record, of the same type as the operation’s input type.
- Returns
The updated accumulator value.
- Return type
T
alias of Callable[[redgrease.typing.Key, redgrease.typing.T, redgrease.typing.InputRecord], redgrease.typing.T]
BatchReducer¶
- redgrease.typing.BatchReducer¶
Type definition for BatchReducer functions.
BatchReducer functions are used in the following operations:
BatchReducer functions receive a key and a list of input records. They perform similarly to the redgrease.typing.Reducer callback, with the difference being that they are given a list of records instead of a single one. They are expected to return an accumulator value for these records.
- Parameters
(Key) - A key value for the group.
(Iterable[InputRecord]) - A collection of input-records, of the same type as the operation’s input type.
- Returns
A reduced output value.
- Return type
OutputRecord
alias of Callable[[redgrease.typing.Key, Iterable[redgrease.typing.InputRecord]], redgrease.typing.OutputRecord]