Operations

This section describes the various operations available on an open GearFunction in more detail.

Map

class redgrease.gears.Map(op: Callable[[redgrease.typing.InputRecord], redgrease.typing.OutputRecord], **kwargs)

The local Map operation performs the one-to-one (1:1) mapping of records.

It requires one mapper function.

op

The mapper function to map on all input records.

Type

redgrease.typing.Mapper

Instantiate a Map operation.

Parameters

op (redgrease.typing.Mapper) – Function to map on the input records. The function must take one argument as input (input record) and return something as an output (output record).
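As a rough illustration, the sketch below defines a Mapper and applies it to a plain Python list, which mimics what Map does to the records of a single shard. It does not use redgrease itself, and the record layout (dicts with "key" and "value" fields) is a hypothetical choice made for the example.

```python
# Hypothetical record layout: {"key": ..., "value": ...}
def to_upper(record):
    """Mapper: exactly one output record per input record."""
    return {"key": record["key"], "value": record["value"].upper()}


records = [
    {"key": "greeting:1", "value": "hello"},
    {"key": "greeting:2", "value": "bye"},
]

# Map is 1:1, so a list comprehension reproduces its effect locally.
mapped = [to_upper(r) for r in records]
print(mapped)
```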

FlatMap

class redgrease.gears.FlatMap(op: Callable[[redgrease.typing.InputRecord], Iterable[redgrease.typing.OutputRecord]], **kwargs)

The local FlatMap operation performs one-to-many (1:N) mapping of records.

It requires one expander function that maps a single input record to potentially multiple output records.

FlatMap is nearly identical to the Map operation in purpose and use. Unlike regular mapping, however, when the expander function returns a sequence / iterator, each element in the sequence is turned into a separate output record.

op

The expander function to map on all input records.

Type

redgrease.typing.Expander

Instantiate a FlatMap operation.

Parameters

op (redgrease.typing.Expander) – Function to map on the input records. The function must take one argument as input (input record) and return an iterable as an output (output records).
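The difference between Map and FlatMap can be sketched in plain Python, without redgrease: the same Expander is applied once as a plain map (one list per record) and once flattened (one record per element). The sentence-splitting callback is a hypothetical example.

```python
def split_words(sentence):
    """Expander: returns an iterable; each element becomes its own record."""
    return sentence.split()


sentences = ["the quick fox", "jumps"]

# A Map would yield one list per input sentence ...
as_map = [split_words(s) for s in sentences]

# ... whereas FlatMap flattens the returned iterables into separate records.
as_flatmap = [word for s in sentences for word in split_words(s)]
print(as_map, as_flatmap)
```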

ForEach

class redgrease.gears.ForEach(op: Callable[[redgrease.typing.InputRecord], None], **kwargs)

The local ForEach operation performs one-to-the-same (1=1) mapping.

It requires one processor function to perform some work that’s related to the input record.

Its output record is a copy of the input, which means anything the callback returns is discarded.

op

The processor function to run on all input records.

Type

redgrease.typing.Processor

Instantiate a ForEach operation.

Parameters

op (redgrease.typing.Processor) – Function to run on each of the input records. The function must take one argument as input (input record) and should not return anything.
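A plain-Python sketch of the ForEach semantics described above: the Processor is called only for its side effect, and each input record is passed through unchanged. The `for_each` helper is hypothetical and merely emulates the operation locally.

```python
seen = []


def log_record(record):
    """Processor: performs a side effect and returns nothing."""
    seen.append(record)


def for_each(op, records):
    # Emulates ForEach: call op for its side effect,
    # then pass every record through unchanged.
    for record in records:
        op(record)
    return list(records)


output = for_each(log_record, [1, 2, 3])
print(output, seen)
```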

Filter

class redgrease.gears.Filter(op: Callable[[redgrease.typing.InputRecord], bool], **kwargs)

The local Filter operation performs one-to-zero-or-one (1:(0|1)) filtering of records.

It requires a filterer function.

Input records for which the filterer returns a falsy value are discarded; only those that evaluate to a truthy value are output.

op

The predicate function to run on all input records.

Type

redgrease.typing.Filterer

Instantiate a Filter operation.

Parameters

op (redgrease.typing.Filterer) – Function to apply on the input records, to decide which ones to keep. The function must take one argument as input (input record) and return a bool. The input records evaluated to True will be kept as output records.
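For illustration, a hypothetical Filterer applied with a list comprehension, which mirrors how Filter keeps or drops each record independently. The user records are made up for the example.

```python
def is_adult(user):
    """Filterer: returns True to keep the record, False to drop it."""
    return user["age"] >= 18


users = [
    {"name": "ann", "age": 34},
    {"name": "bob", "age": 12},
    {"name": "cy", "age": 18},
]

# Each record is kept (1) or discarded (0) on its own: 1:(0|1).
kept = [u for u in users if is_adult(u)]
print([u["name"] for u in kept])
```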

Accumulate

class redgrease.gears.Accumulate(op: Callable[[redgrease.typing.T, redgrease.typing.InputRecord], redgrease.typing.T], **kwargs)

The local Accumulate operation performs many-to-one mapping (N:1) of records.

It requires one accumulator function.

Once the input records are exhausted, its output is a single record consisting of the accumulator’s value.

op

The accumulation function to run on all input records.

Type

redgrease.typing.Accumulator

Instantiate an Accumulate operation.

Parameters

op (redgrease.typing.Accumulator) –

Function to apply on the input records. The function must take two arguments as input, in this order:

  • an accumulator value, and

  • the input record.

It should aggregate the input record into the accumulator variable, which stores the state between the function’s invocations. The function must return the accumulator’s updated value.
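The sketch below emulates Accumulate with `functools.reduce`, threading a single accumulator through all records. Note the argument order given in the signature: the accumulator comes first, the record second. The callback also guards against an empty (None) starting accumulator; that the accumulator starts out as None is an assumption of this sketch.

```python
from functools import reduce


def summer(acc, record):
    """Accumulator: (accumulator, record) -> updated accumulator."""
    acc = acc if acc is not None else 0  # guard for an assumed empty (None) start
    return acc + record


records = [3, 4, 5]

# reduce() with an initial None threads one accumulator through all records,
# and yields a single output value once the records are exhausted.
total = reduce(summer, records, None)
print(total)
```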

LocalGroupBy

class redgrease.gears.LocalGroupBy(extractor: Callable[[redgrease.typing.InputRecord], redgrease.typing.Key], reducer: Callable[[redgrease.typing.Key, redgrease.typing.T, redgrease.typing.InputRecord], redgrease.typing.T], **kwargs)

The local LocalGroupBy operation performs many-to-less mapping (N:M) of records.

The operation requires two functions, an extractor and a reducer.

The output records consist of the grouping key and its respective reduce value.

extractor

Function that extracts the key to group by from input records.

Type

redgrease.typing.Extractor

reducer

Function that reduces the records in each group to an output record.

Type

redgrease.typing.Reducer

Instantiate a LocalGroupBy operation.

Parameters
  • extractor (redgrease.typing.Extractor) – Function to apply on the input records, to extract the grouping key. The function must take one argument as input (input record) and return a string (key). The groups are defined by the value of the key.

  • reducer (redgrease.typing.Reducer) – Function to apply on the records of each group, to reduce them to a single value (per group). The function must take, in this order, (a) a key, (b) a variable that’s called an accumulator and (c) an input record. It performs similarly to the accumulator callback, with the difference being that it maintains an accumulator per reduced key / group.
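A local, single-shard emulation of LocalGroupBy, sketched in plain Python: one accumulator is kept per key, and the Reducer receives the key, the accumulator and the record, in the order given by the signature. The `local_group_by` helper, the sample items, and the `{"key": ..., "value": ...}` shape of the output records are assumptions made for illustration.

```python
def by_color(item):
    """Extractor: returns the grouping key."""
    return item["color"]


def add_qty(key, acc, item):
    """Reducer: (key, accumulator, record) -> updated accumulator for that key."""
    return (acc or 0) + item["qty"]


def local_group_by(extractor, reducer, records):
    # One accumulator per key, all on the local shard; no data moves between shards.
    accs = {}
    for r in records:
        k = extractor(r)
        accs[k] = reducer(k, accs.get(k), r)
    # Output records pair each key with its reduced value (hypothetical shape).
    return [{"key": k, "value": v} for k, v in accs.items()]


items = [
    {"color": "red", "qty": 2},
    {"color": "blue", "qty": 1},
    {"color": "red", "qty": 5},
]
groups = local_group_by(by_color, add_qty, items)
print(groups)
```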

Limit

class redgrease.gears.Limit(length: int, start: int = 0, **kwargs)

The local Limit operation limits the number of records.

It accepts two numeric arguments: a starting position in the input records “array” and a maximal number of output records.

start

Starting index (0-based) of the input record to start from.

Type

int

length

The maximum number of records to let through.

Type

int

Instantiate a Limit operation.

Parameters
  • length (int) – The maximum number of records.

  • start (int, optional) – The index of the first input record. Defaults to 0.
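Because Limit takes a start index and a maximum length, its effect on a sequence of records resembles Python slicing. The `limit` helper below is a hypothetical sketch of that behavior on a single shard only; since the text describes Limit as a local operation, on a cluster each shard would presumably apply it to its own records.

```python
def limit(records, length, start=0):
    # Skip `start` records, then let through at most `length` records.
    return records[start:start + length]


records = list(range(10))
print(limit(records, 3))           # first three records
print(limit(records, 3, start=8))  # fewer than `length` remain
```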

Collect

class redgrease.gears.Collect(**kwargs)

The global Collect operation collects the result records from all of the shards to the originating one.

Instantiate a Collect operation.

Repartition

class redgrease.gears.Repartition(extractor: Callable[[redgrease.typing.InputRecord], redgrease.typing.Key], **kwargs)

The global Repartition operation repartitions the records by shuffling them between shards.

It accepts a single key extractor function. The extracted key is used for computing the record’s new placement in the cluster (i.e. hash slot). The operation then moves the record from its original shard to the new one.

extractor

A function deciding the destination shard of an input record.

Type

redgrease.typing.Extractor

Instantiate a Repartition operation.

Parameters

extractor (redgrease.typing.Extractor) – Function that takes a record and calculates a key that is used to determine the hash slot, and consequently the shard, that the record should migrate to. The function must take one argument as input (input record) and return a string (key). The hash slot, and consequently the destination shard, is determined by the value of the key.
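The text says the extracted key determines the record’s hash slot. The Redis Cluster specification computes a key’s slot as CRC16(key) mod 16384, hashing only the part inside the first non-empty `{...}` hash tag when one is present. The sketch below implements that computation; that Repartition relies on exactly this function is an assumption based on the mention of hash slots above.

```python
def crc16_xmodem(data: bytes) -> int:
    # CRC-16/XMODEM: polynomial 0x1021, initial value 0, no reflection.
    crc = 0
    for byte in data:
        crc ^= byte << 8
        for _ in range(8):
            if crc & 0x8000:
                crc = ((crc << 1) ^ 0x1021) & 0xFFFF
            else:
                crc = (crc << 1) & 0xFFFF
    return crc


def key_slot(key: str) -> int:
    """Hash slot of a key per the Redis Cluster spec, honoring {hash tags}."""
    raw = key.encode()
    start = raw.find(b"{")
    if start != -1:
        end = raw.find(b"}", start + 1)
        if end != -1 and end != start + 1:  # non-empty tag: hash only the tag
            raw = raw[start + 1:end]
    return crc16_xmodem(raw) % 16384


# Keys sharing a hash tag land in the same slot, hence on the same shard.
print(key_slot("{user1000}.following"), key_slot("{user1000}.followers"))
```

An extractor that returns such tagged keys would therefore keep related records together after the repartition.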

Aggregate

class redgrease.gears.Aggregate(zero: Any, seqOp: Callable[[redgrease.typing.T, redgrease.typing.InputRecord], redgrease.typing.T], combOp: Callable[[redgrease.typing.T, redgrease.typing.InputRecord], redgrease.typing.T], **kwargs)

The Aggregate operation performs many-to-one mapping (N:1) of records.

Aggregate provides an alternative to the local accumulate operation as it takes the partitioning of data into consideration. Furthermore, because records are aggregated locally before collection, its performance is usually superior.

It requires a zero value and two accumulator functions for computing the local and global aggregates.

The operation is made of these steps:

  1. The local accumulator is executed locally and initialized with the zero value.

  2. A global collect moves all records to the originating engine.

  3. The global accumulator is executed locally by the originating engine.

Its output is a single record consisting of the accumulator’s global value.
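The three steps above can be sketched in plain Python by modelling each shard as a list of records. This is an emulation, not redgrease code; starting the global accumulation from the zero value as well, and the (sum, count) example, are assumptions made for illustration.

```python
import copy


def aggregate(shards, zero, seq_op, comb_op):
    # Step 1: each shard accumulates its own records, starting from a copy of zero.
    local_results = []
    for shard in shards:
        acc = copy.deepcopy(zero)
        for record in shard:
            acc = seq_op(acc, record)
        local_results.append(acc)
    # Step 2 (collect): all local results are now on the originating shard.
    # Step 3: the global accumulator combines them, also starting from zero.
    acc = copy.deepcopy(zero)
    for partial in local_results:
        acc = comb_op(acc, partial)
    return acc


# Hypothetical records spread over three shards.
shards = [[1, 2], [3], [4, 5, 6]]
result = aggregate(
    shards,
    (0, 0),
    lambda a, r: (a[0] + r, a[1] + 1),        # local: running (sum, count)
    lambda a, p: (a[0] + p[0], a[1] + p[1]),  # global: add the partial tuples
)
print(result)
```

The zero value is deep-copied before each use so that a mutable zero (such as a list or dict) is never shared between accumulations.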

zero

The initial / zero value for the accumulator variable.

Type

Any

seqOp

A local accumulator function, applied locally on each shard.

Type

redgrease.typing.Accumulator

combOp

A global accumulator function, applied on the results of the local accumulations.

Type

redgrease.typing.Accumulator

Instantiates an Aggregate operation.

Parameters
  • zero (Any) – The initial / zero value of the accumulator variable.

  • seqOp (redgrease.typing.Accumulator) – A function to be applied on each of the input records, locally per shard. It must take two parameters:

      • an accumulator value, from previous calls, and

      • an input record.

    The function aggregates the input into the accumulator variable, which stores the state between the function’s invocations. The function must return the accumulator’s updated value.

  • combOp (redgrease.typing.Accumulator) – A function to be applied on each of the aggregated results of the local aggregation (i.e. the output of seqOp). It must take two parameters:

      • an accumulator value, from previous calls, and

      • an input record (i.e. one local aggregation result).

    The function aggregates the input into the accumulator variable, which stores the state between the function’s invocations. The function must return the accumulator’s updated value.

AggregateBy

class redgrease.gears.AggregateBy(extractor: Callable[[redgrease.typing.InputRecord], redgrease.typing.Key], zero: Any, seqOp: Callable[[redgrease.typing.Key, redgrease.typing.T, redgrease.typing.InputRecord], redgrease.typing.T], combOp: Callable[[redgrease.typing.Key, redgrease.typing.T, redgrease.typing.InputRecord], redgrease.typing.T], **kwargs)

The AggregateBy operation performs many-to-less mapping (N:M) of records.

It is similar to the Aggregate operation but aggregates per key. It requires an extractor callback, a zero value and two reducer callbacks for computing the local and global aggregates.

The operation is made of these steps:

  1. Extraction of the groups using extractor.

  2. The local reducer is executed locally and initialized with the zero value.

  3. A global repartition operation that uses the extractor.

  4. The global reducer is executed on each shard once it is repartitioned with its relevant keys.

The output is a list of records, one for each key. The output records consist of the grouping key and its respective reducer’s value.
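The four steps above can be emulated in plain Python, again modelling each shard as a list of records. The `aggregate_by` helper, the word-count data and the output-record shape are hypothetical; both reducers follow the (key, accumulator, record) order of the signature, and the “records” seen by the global reducer are the local partial results.

```python
import copy
from collections import defaultdict


def aggregate_by(shards, extractor, zero, seq_op, comb_op):
    # Steps 1-2: on each shard, group by key and run the local reducer from zero.
    partials = []  # (key, local accumulator) pairs, one per key per shard
    for shard in shards:
        local = {}
        for r in shard:
            k = extractor(r)
            acc = local[k] if k in local else copy.deepcopy(zero)
            local[k] = seq_op(k, acc, r)
        partials.extend(local.items())
    # Step 3: "repartition", so all partials for a key end up in one place.
    by_key = defaultdict(list)
    for k, acc in partials:
        by_key[k].append(acc)
    # Step 4: the global reducer folds each key's partials, again from zero.
    out = []
    for k, accs in by_key.items():
        acc = copy.deepcopy(zero)
        for partial in accs:
            acc = comb_op(k, acc, partial)
        out.append({"key": k, "value": acc})
    return out


shards = [["a", "b", "a"], ["b", "c"]]
counts = aggregate_by(
    shards,
    lambda w: w,
    0,
    lambda k, a, r: a + 1,  # local: count records per key
    lambda k, a, r: a + r,  # global: sum the local counts
)
print(counts)
```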

extractor

Function that extracts the key to group by from input records.

Type

redgrease.typing.Extractor

zero

The initial / zero value for the accumulator variable.

Type

Any

seqOp

A local reducer function, applied locally on each shard, per group.

Type

redgrease.typing.Reducer

combOp

A global reducer function, applied on the results of the local reductions, per group.

Type

redgrease.typing.Reducer

Instantiate an AggregateBy operation.

Parameters
  • extractor (redgrease.typing.Extractor) – Function to apply on the input records, to extract the grouping key. The function must take one argument as input (input record) and return a string (key). The groups are defined by the value of the key.

  • zero (Any) – The initial / zero value of the accumulator variable.

  • seqOp (redgrease.typing.Reducer) – A function to be applied on each of the input records, locally per shard and group. It must take three parameters:

      • the key of the group,

      • an accumulator value, from previous calls, and

      • an input record.

    The function aggregates the input into the accumulator variable, which stores the state between the function’s invocations. The function must return the accumulator’s updated value.

  • combOp (redgrease.typing.Reducer) – A function to be applied on each of the aggregated results of the local aggregation (i.e. the output of seqOp), per group. It must take three parameters:

      • the key of the group,

      • an accumulator value, from previous calls, and

      • an input record (i.e. one local aggregation result).

    The function aggregates the input into the accumulator variable, which stores the state between the function’s invocations. The function must return the accumulator’s updated value.

GroupBy

class redgrease.gears.GroupBy(extractor: Callable[[redgrease.typing.InputRecord], redgrease.typing.Key], reducer: Callable[[redgrease.typing.Key, redgrease.typing.T, redgrease.typing.InputRecord], redgrease.typing.T], **kwargs)

The GroupBy operation performs a many-to-less (N:M) grouping of records. It is similar to AggregateBy but uses only a global reducer. It can be used in cases where locally reducing the data isn’t possible.

The operation requires two functions: an extractor and a reducer.

The operation is made of these steps:

1. A global repartition operation that uses the extractor.
2. The reducer is locally invoked.

The output is a locally reduced list of records, one for each key. The output records consist of the grouping key and its respective accumulator’s value.
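The two steps of GroupBy can be sketched as follows: all records are first moved so that each key’s records sit together (standing in for the repartition), and only then is the reducer applied. The `group_by` helper, the `longest` reducer and the output-record shape are hypothetical, and the None starting accumulator is an assumption of this sketch.

```python
from collections import defaultdict


def group_by(shards, extractor, reducer):
    # Step 1: repartition, every record moves to the place owning its key.
    partitions = defaultdict(list)
    for shard in shards:
        for r in shard:
            partitions[extractor(r)].append(r)
    # Step 2: the reducer runs record by record, one accumulator per key.
    out = []
    for k, records in partitions.items():
        acc = None  # assumed empty starting accumulator
        for r in records:
            acc = reducer(k, acc, r)
        out.append({"key": k, "value": acc})
    return out


def longest(key, acc, word):
    # Reducer: keeps the longest word seen so far for this key (first letter).
    return word if acc is None or len(word) > len(acc) else acc


shards = [["apple", "avocado"], ["banana", "apricot", "blueberry"]]
result = group_by(shards, lambda w: w[0], longest)
print(result)
```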

extractor

Function that extracts the key to group by from input records.

Type

redgrease.typing.Extractor

reducer

Function that reduces the records of each group to a value.

Type

redgrease.typing.Reducer

Instantiate a GroupBy operation.

Parameters
  • extractor (redgrease.typing.Extractor) – Function to apply on the input records, to extract the grouping key. The function must take one argument as input (input record) and return a string (key). The groups are defined by the value of the key.

  • reducer (redgrease.typing.Reducer) – Function to apply on the records of each group, to reduce them to a single value (per group). The function must take, in this order, (a) a key, (b) a variable that’s called an accumulator and (c) an input record. It performs similarly to the accumulator callback, with the difference being that it maintains an accumulator per reduced key / group.

BatchGroupBy

class redgrease.gears.BatchGroupBy(extractor: Callable[[redgrease.typing.InputRecord], redgrease.typing.Key], reducer: Callable[[redgrease.typing.Key, Iterable[redgrease.typing.InputRecord]], redgrease.typing.OutputRecord], **kwargs)

The BatchGroupBy operation performs a many-to-less (N:M) grouping of records.

Prefer the GroupBy operation over BatchGroupBy, as it is more efficient. Only use BatchGroupBy when the reducer’s logic requires the full list of records for each input key.

The operation requires two functions: an extractor and a batch reducer.

The operation is made of these steps:

1. A global repartition operation that uses the extractor
2. A local localgroupby operation that uses the batch reducer

Once finished, the operation locally outputs a record for each key and its respective accumulator value.

Using this operation may cause a substantial increase in memory usage during runtime.
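A sketch of when BatchGroupBy is warranted: a median cannot be updated one record at a time from a small running value, so the BatchReducer below receives the whole list of a group’s records. The helper, the data and the output-record shape are hypothetical, and the sketch runs in a single process rather than on a cluster.

```python
import statistics
from collections import defaultdict


def median_latency(key, records):
    """BatchReducer: receives the key and the full list of the group's records."""
    return statistics.median(latency for _, latency in records)


def batch_group_by(records, extractor, batch_reducer):
    # All records of every group are held in memory before any reduction,
    # which is where the extra memory usage comes from.
    groups = defaultdict(list)
    for r in records:
        groups[extractor(r)].append(r)
    return [{"key": k, "value": batch_reducer(k, rs)} for k, rs in groups.items()]


# Hypothetical (endpoint, latency in ms) records.
samples = [("GET", 5), ("GET", 9), ("POST", 20), ("GET", 7), ("POST", 30)]
medians = batch_group_by(samples, lambda s: s[0], median_latency)
print(medians)
```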

extractor

Function that extracts the key to group by from input records.

Type

redgrease.typing.Extractor

reducer

Function that reduces the records of each group to a value.

Type

redgrease.typing.BatchReducer

Instantiate a BatchGroupBy operation.

Parameters
  • extractor (redgrease.typing.Extractor) – Function to apply on the input records, to extract the grouping key. The function must take one argument as input (input record) and return a string (key). The groups are defined by the value of the key.

  • reducer (redgrease.typing.BatchReducer) – Function to apply on the records of each group, to reduce them to a single value (per group). The function must take (a) a key and (b) an iterable of all the input records of that group, and return the reduced value for the group.

Sort

class redgrease.gears.Sort(reverse: bool = True, **kwargs)

The Sort operation sorts the records.

The sort order can be controlled through the reverse argument.

The operation is made of the following steps:

1. A global aggregate operation collects and combines all records.
2. A local sort is performed on the list.
3. The list is flatmapped to records.

Using this operation may cause an increase in memory usage during runtime due to the list being copied during the sorting operation.
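The three steps above translate into a short plain-Python generator. The `sort_op` helper is hypothetical; `sorted` returns a new list, which reflects the copying mentioned above.

```python
def sort_op(shards, reverse=True):
    # 1. Global aggregate: gather every record from every shard into one list.
    gathered = [r for shard in shards for r in shard]
    # 2. Local sort of the gathered list (sorted() builds a new, copied list).
    ordered = sorted(gathered, reverse=reverse)
    # 3. Flatmap: the list is emitted as individual records again.
    for r in ordered:
        yield r


shards = [[3, 1], [5, 2, 4]]
print(list(sort_op(shards)))                 # descending, the default
print(list(sort_op(shards, reverse=False)))  # ascending
```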

reverse

Defines if the sorting order is descending (True) or ascending (False).

Type

bool

Instantiate a Sort operation.

Parameters

reverse (bool, optional) – Sort in descending order (higher to lower). Defaults to True.

Distinct

class redgrease.gears.Distinct(**kwargs)

The Distinct operation returns distinct records.

It requires no arguments.

The operation is made of the following steps:

1. An aggregate operation locally reduces the records to sets that are then collected and unionized globally.
2. A local flatmap operation turns the set into records.
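A plain-Python sketch of the two steps above. Because the records are collected into sets, this emulation requires them to be hashable; the `distinct` helper and the sample data are hypothetical.

```python
def distinct(shards):
    # 1a. Local step: each shard reduces its records to a set.
    local_sets = [set(shard) for shard in shards]
    # 1b. Global step: the collected sets are unioned.
    merged = set().union(*local_sets)
    # 2. Flatmap: each member of the set becomes a separate record.
    return list(merged)


shards = [["a", "b", "a"], ["b", "c"]]
print(sorted(distinct(shards)))
```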

Instantiate a Distinct operation.

Count

class redgrease.gears.Count(**kwargs)

The Count operation counts the number of input records.

It requires no arguments.

The operation is made of an aggregate operation that uses local counting and global summing accumulators.
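The local-count / global-sum split described above can be sketched with two nested sums. The `count` helper is hypothetical and models each shard as a list.

```python
def count(shards):
    # Local accumulator: count the records on each shard.
    local_counts = [sum(1 for _ in shard) for shard in shards]
    # Global accumulator: sum the per-shard counts.
    return sum(local_counts)


shards = [["x", "y"], [], ["z"]]
print(count(shards))
```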

Instantiate a Count operation.

CountBy

class redgrease.gears.CountBy(extractor: Callable[[redgrease.typing.InputRecord], redgrease.typing.Key], **kwargs)

The CountBy operation counts the records grouped by key.

It requires a single extractor function.

The operation is made of an aggregateby operation that uses local counting and global summing accumulators.
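Similarly, CountBy can be emulated with one `collections.Counter` per shard, summed globally. The `count_by` helper and the sample data are hypothetical.

```python
from collections import Counter


def count_by(shards, extractor):
    # Local step: count records per key on each shard.
    local = [Counter(extractor(r) for r in shard) for shard in shards]
    # Global step: sum the local counts of each key.
    total = Counter()
    for c in local:
        total.update(c)  # Counter.update adds counts rather than replacing them
    return dict(total)


shards = [["apple", "avocado"], ["banana", "apricot"]]
print(count_by(shards, lambda w: w[0]))
```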

extractor

Function that extracts the key to group by from input records.

Type

redgrease.typing.Extractor

Instantiate a CountBy operation.

Parameters

extractor (redgrease.typing.Extractor) – Function to apply on the input records, to extract the grouping key. The function must take one argument as input (input record) and return a string (key). The groups are defined by the value of the key.

Avg

class redgrease.gears.Avg(extractor: Callable[[redgrease.typing.InputRecord], redgrease.typing.Key], **kwargs)

The Avg operation returns the arithmetic average of records.

It has an optional value extractor function.

The operation is made of the following steps:

1. An aggregate operation locally reduces the records to tuples of sum and count that are globally combined.
2. A local map operation calculates the average from the global tuple.

extractor

Function that extracts the value to average from input records.

Type

redgrease.typing.Extractor

Instantiate an Avg operation.

Parameters

extractor (redgrease.typing.Extractor) – Function to apply on the input records, to extract the numeric value to be averaged. The function must take one argument as input (input record) and return the value to include in the average.
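The Avg steps can be sketched as follows. Combining (sum, count) tuples, rather than averaging the per-shard averages, keeps the result correct when shards hold different numbers of records. The `avg` helper, its default `float` extractor and the empty-input error are assumptions of this sketch, not documented redgrease behavior.

```python
def avg(shards, extractor=float):
    # 1a. Local step: each shard reduces its records to a (sum, count) tuple.
    partials = []
    for shard in shards:
        values = [extractor(r) for r in shard]
        partials.append((sum(values), len(values)))
    # 1b. Global step: the tuples are combined by adding them element-wise.
    total = sum(s for s, _ in partials)
    count = sum(n for _, n in partials)
    if count == 0:
        raise ValueError("no records to average")  # this sketch refuses empty input
    # 2. Local map: the average is computed from the global tuple.
    return total / count


# Hypothetical string values on two shards of unequal size.
shards = [["1", "2", "3"], ["6"]]
print(avg(shards))  # averaging the shard averages (2.0 and 6.0) would give 4.0
```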

Actions

Actions close open GearFunctions and indicate the running mode of the function, as follows:

Action      Execution Mode
Run         “batch-mode”
Register    “event-mode”

Run

class redgrease.gears.Run(arg: Optional[str] = None, convertToStr: bool = True, collect: bool = True, **kwargs)

Run action

The Run action runs a Gear function as a batch. The function is executed once and exits once the data is exhausted by its reader.

The Run action can only be the last operation of any GearFunction, and it effectively ‘closes’ it to further operations.

arg

Argument that’s passed to the reader, overriding its defaultArg. It means the following:

  • A glob-like pattern for the KeysReader and KeysOnlyReader readers.

  • A key name for the StreamReader reader.

  • A Python generator for the PythonReader reader.

Type

str, optional

convertToStr

When True, adds a map operation to the flow’s end that stringifies records.

Type

bool

collect

When True, adds a collect operation to the flow’s end.

Type

bool

Instantiate a Run action.

Parameters
  • arg (Optional[str], optional) –

    Optional argument that’s passed to the reader, overriding its defaultArg. It means the following:

    • A glob-like pattern for the KeysReader and KeysOnlyReader readers.

    • A key name for the StreamReader reader.

    • A Python generator for the PythonReader reader.

    Defaults to None.

  • convertToStr (bool, optional) – When True, adds a map operation to the flow’s end that stringifies records. Defaults to True.

  • collect (bool, optional) – When True, adds a collect operation to the flow’s end. Defaults to True.

Register

class redgrease.gears.Register(prefix: str = '*', convertToStr: bool = True, collect: bool = True, mode: str = 'async', onRegistered: Optional[Callable[[], None]] = None, **kwargs)

Register action

The Register action registers a function as an event handler. The function is executed each time an event arrives. Each time it is executed, the function operates on the event’s data and, once done, is suspended until it is invoked again by a new event.

The Register action can only be the last operation of any GearFunction, and it effectively ‘closes’ it to further operations.

prefix

Key prefix pattern to match on. Not relevant for ‘CommandReader’ readers (see ‘trigger’).

Type

str

convertToStr

When True, adds a map operation to the flow’s end that stringifies records.

Type

bool

collect

When True, adds a collect operation to the flow’s end.

Type

bool

mode

The execution mode of the triggered function.

Type

str

onRegistered

A function callback that’s called on each shard upon function registration.

Type

Callable

Instantiate a Register action.

Parameters
  • prefix (str, optional) – Key prefix pattern to match on. Not relevant for ‘CommandReader’ readers (see ‘trigger’). Defaults to ‘*’.

  • convertToStr (bool, optional) – When True, adds a map operation to the flow’s end that stringifies records. Defaults to True.

  • collect (bool, optional) – When True, adds a collect operation to the flow’s end. Defaults to True.

  • mode (str, optional) –

    The execution mode of the function. Can be one of:

    • "async":

      Execution will be asynchronous across the entire cluster.

    • "async_local":

      Execution will be asynchronous and restricted to the handling shard.

    • "sync":

      Execution will be synchronous and local.

    Defaults to redgrease.TriggerMode.Async ("async").

  • onRegistered (Registrator, optional) – A function callback that’s called on each shard upon function registration. It is a good place to initialize non-serializable objects such as network connections. Defaults to None.

Operation Callback Types

This section runs through the various type signatures that the function callbacks used in the operations must follow.

Registrator

redgrease.typing.Registrator

Type definition for Registrator functions.

I.e. callback functions that may be called on each shard upon function registration. Such functions provide a good place to initialize non-serializable objects such as network connections.

A function of Registrator type should take no arguments and return no value.

alias of Callable[[], None]

Extractor

redgrease.typing.Extractor

Type definition for Extractor functions.

Extractor functions are used in the following operations: LocalGroupBy, Repartition, AggregateBy, GroupBy, BatchGroupBy, CountBy and Avg.

Extractor functions extract or calculate the value that should be used as (grouping) key from an input record of the operation.

Parameters

(InputRecord) - A single input record, of the same type as the operation’s input type.

Returns

Any ‘Hashable’ value.

Return type

Key

Example - Count users per supervisor:

from redgrease import KeysReader

# Function of "Extractor" type.
# Extracts the "supervisor" of a user.
# If the user has no supervisor, the user is considered its own supervisor.
def supervisor(user):
    return user.get("supervisor", user["id"])

KeysReader("user:*").values().countby(supervisor).run()

alias of Callable[[redgrease.typing.InputRecord], redgrease.typing.Key]

Mapper

redgrease.typing.Mapper

Type definition for Mapper functions.

Mapper functions are used in the following operations: Map.

Mapper functions transform a value from the operation’s input into some new value.

Parameters

(InputRecord) - A single input record, of the same type as the operation’s input type.

Returns

Any value.

Return type

OutputRecord

alias of Callable[[redgrease.typing.InputRecord], redgrease.typing.OutputRecord]

Expander

redgrease.typing.Expander

Type definition for Expander functions.

Expander functions are used in the following operations: FlatMap.

Expander functions transform a value from the operation’s input into several new values.

Parameters

(InputRecord) - A single input record, of the same type as the operation’s input type.

Returns

An iterable sequence of values, for example a list, each of which becomes an input to the next operation.

Return type

Iterable[OutputRecord]

alias of Callable[[redgrease.typing.InputRecord], Iterable[redgrease.typing.OutputRecord]]

Processor

redgrease.typing.Processor

Type definition for Processor functions.

Processor functions are used in the following operations: ForEach.

Processor functions perform some side effect using a value from the operation’s input.

Parameters

(InputRecord) - A single input record, of the same type as the operation’s input type.

Returns

Nothing.

Return type

None

alias of Callable[[redgrease.typing.InputRecord], None]

Filterer

redgrease.typing.Filterer

Type definition for Filterer functions.

Filterer functions are used in the following operations: Filter.

Filterer functions evaluate a value from the operation’s input to either True or False.

Parameters

(InputRecord) - A single input record, of the same type as the operation’s input type.

Returns

Either True or False.

Return type

bool

alias of Callable[[redgrease.typing.InputRecord], bool]

Accumulator

redgrease.typing.Accumulator

Type definition for Accumulator functions.

Accumulator functions are used in the following operations: Accumulate and Aggregate.

Accumulator functions take a variable that’s also called an accumulator, as well as an input record. They aggregate inputs into the accumulator variable, which stores the state between the function’s invocations. The function must return the accumulator’s updated value after each call.

Parameters

  • ( T ) - An accumulator value.

  • (InputRecord) - A single input record, of the same type as the operation’s input type.

Returns

The updated accumulator value.

Return type

T

alias of Callable[[redgrease.typing.T, redgrease.typing.InputRecord], redgrease.typing.T]

Reducer

redgrease.typing.Reducer

Type definition for Reducer functions.

Reducer functions are used in the following operations: LocalGroupBy, AggregateBy and GroupBy.

Reducer functions receive a key, a variable that’s called an accumulator, and an input record. They perform similarly to the redgrease.typing.Accumulator callback, with the difference being that they maintain an accumulator per reduced key.

Parameters

  • (Key) - A key value for the group.

  • ( T ) - An accumulator value.

  • (InputRecord) - A single input record, of the same type as the operation’s input type.

Returns

The updated accumulator value.

Return type

T

alias of Callable[[redgrease.typing.Key, redgrease.typing.T, redgrease.typing.InputRecord], redgrease.typing.T]

BatchReducer

redgrease.typing.BatchReducer

Type definition for BatchReducer functions.

BatchReducer functions are used in the following operations: BatchGroupBy.

BatchReducer functions receive a key and a list of input records. They perform similarly to the redgrease.typing.Reducer callback, with the difference being that they receive a list of records instead of a single one. They are expected to return an accumulated value for these records.

Parameters

  • (Key) - A key value for the group.

  • (Iterable[InputRecord]) - A collection of input records, of the same type as the operation’s input type.

Returns

A reduced output value.

Return type

OutputRecord

alias of Callable[[redgrease.typing.Key, Iterable[redgrease.typing.InputRecord]], redgrease.typing.OutputRecord]


Courtesy of: Lyngon Pte. Ltd.