Builtin Runtime Functions

The RedisGears Python server runtime automagically exposes a number of functions into the scope of any Gear functions being executed. These “builtin” runtime functions can be used in Gear Functions without importing any module or package.

Note

With the exception of GearsBuilder, none of these functions can be used in normal application code outside Gear functions running in the RedisGears server runtime.

RedGrease exposes its own wrapped versions of these RedisGears runtime functions which, for the most part, behave exactly like the originals, but require you to import them, either from the top-level redgrease package or from the redgrease.runtime module.

But if these are the same, why would you bother with them?

The main reason to use the RedGrease versions is that they aid development by giving most Integrated Development Environments (IDEs) access to the doc-strings and type annotations that the RedGrease versions provide.

This alone can help greatly in developing gears faster (e.g. through auto-complete) and with fewer errors (e.g. with type checking).

Note

If you are only using these wrapped runtime functions in your Gear Functions, and no other RedGrease features, then you don’t actually need RedGrease to be installed on the RedisGears server runtime. Explicitly setting the enforce_redgrease argument to False when executing a function script with Gears.pyexecute() will not add any redgrease requirement to the function, and any explicit runtime imports will simply be ignored.

The Redgrease Extras Options section goes into more detail about the various RedGrease extras options and their limitations.

Another reason to use the functions from the RedGrease runtime is that it contains slightly enhanced variants of some of the defaults, such as log, as well as alternative versions, such as hashtag3.

And if you are going to use other RedGrease features, then you will have to load the top-level redgrease namespace anyway, which automatically exposes the runtime functions.

The RedGrease runtime functions can be imported in a few ways:

  • Directly from the package top-level, e.g:

    from redgrease import GearsBuilder, log, atomic, execute
    
  • Explicitly from the redgrease.runtime module:

    from redgrease.runtime import GearsBuilder, log, atomic, execute
    
  • By importing the redgrease.runtime module:

    import redgrease.runtime
    

It is possible to load all symbols using *, although this is generally not recommended practice, particularly for the top-level redgrease package.

execute

RedGrease’s version of runtime.execute() behaves just like the default.

This function executes an arbitrary Redis command inside Gear functions.

Note

For more information about Redis commands, refer to the Redis command reference.

Arguments

  • command : the command to execute

  • args : the command’s arguments

Example:

from redgrease import execute

# Pings the server (reply should be 'PONG')
reply = execute('PING')

In most cases, a more convenient approach is to use Serverside Redis Commands to execute Redis Commands inside Gear Functions.

Longer Example:

from redgrease import GearsBuilder, execute

def age(x):
   ''' Extracts the age from a person's record '''
   return int(x['value']['age'])

def cas(x):
   ''' Checks and sets the current maximum '''
   k = 'age:maximum'
   v = execute('GET', k)   # read key's current value
   v = int(v) if v else 0  # initialize to 0 if the key does not exist
   if x > v:               # if a new maximum found
      execute('SET', k, x)  # set key to new value

# Event handling function registration
gb = GearsBuilder()
gb.map(age)
gb.foreach(cas)
gb.register('person:*')

execute API Reference

redgrease.runtime.execute(command: str, *args) → bytes

Execute an arbitrary Redis command.

Parameters
  • command (str) – The command to execute

  • *args – The command’s arguments

Returns

Raw command response

Return type

bytes

atomic

RedGrease’s version of runtime.atomic() behaves just like the default.

Atomic provides a context manager that ensures that all operations in it are executed atomically by blocking the main Redis process.

Example:

from redgrease import atomic, execute, GB, hashtag

# Increments two keys atomically
def transaction(_):
   with atomic():
      execute('INCR', f'{{{hashtag()}}}:foo')
      execute('INCR', f'{{{hashtag()}}}:bar')

gb = GB('ShardsIDReader')
gb.foreach(transaction)
gb.run()

atomic API Reference

class redgrease.runtime.atomic

The atomic() Python context is imported to the runtime’s environment by default.

The context ensures that all operations in it are executed atomically by blocking the main Redis process.

configGet

RedGrease’s version of runtime.configGet() behaves just like the default.

This function fetches the current value of a RedisGears configuration option.

Example:

from redgrease import configGet

# Gets the current value for 'ProfileExecutions'
foo = configGet('ProfileExecutions')

configGet API Reference

redgrease.runtime.configGet(key: str) → str

Fetches the current value of a RedisGears configuration option.

Parameters

key (str) – The configuration option key

gearsConfigGet

RedGrease’s version of runtime.gearsConfigGet() behaves just like the default.

This function fetches the current value of a RedisGears configuration option, and returns a default value if that key does not exist.

Example:

from redgrease import gearsConfigGet

# Gets the 'foo' configuration option key and defaults to 'bar'
foo = gearsConfigGet('foo', default='bar')

gearsConfigGet API Reference

redgrease.runtime.gearsConfigGet(key: str, default=None) → str

Fetches the current value of a RedisGears configuration option and returns a default value if that key does not exist.

Parameters
  • key (str) – The configuration option key.

  • default (optional) – A default value. Defaults to None.

hashtag

RedGrease’s version of runtime.hashtag() behaves just like the default.

This function returns a hashtag that maps to the lowest hash slot served by the local engine’s shard. This makes it useful as a hashtag for partitioning data in a cluster, e.g. for creating keys that are stored on the local shard.
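
A hashtag is the part of a key between curly braces. When it is present and non-empty, Redis Cluster hashes only that part (CRC16 modulo 16384) to pick the key’s hash slot. The following pure-Python sketch re-implements that rule purely for illustration, to show why keys built around the same hashtag end up in the same slot, and therefore on the same shard. The helpers crc16_xmodem and key_slot and the tag value "06S" are hypothetical; inside a Gear function you would use the value returned by hashtag() instead.

```python
def crc16_xmodem(data: bytes) -> int:
    """CRC-16/XMODEM (polynomial 0x1021, initial value 0), the checksum Redis Cluster uses."""
    crc = 0
    for byte in data:
        crc ^= byte << 8
        for _ in range(8):
            if crc & 0x8000:
                crc = ((crc << 1) ^ 0x1021) & 0xFFFF
            else:
                crc = (crc << 1) & 0xFFFF
    return crc


def key_slot(key: str) -> int:
    """Return the hash slot (0-16383) of a key, honouring a non-empty {hashtag}."""
    raw = key.encode()
    start = raw.find(b"{")
    if start != -1:
        end = raw.find(b"}", start + 1)
        # Only the part between the first "{" and the next "}" is hashed, if non-empty
        if end > start + 1:
            raw = raw[start + 1:end]
    return crc16_xmodem(raw) % 16384


tag = "06S"  # hypothetical hashtag value, standing in for hashtag()
foo_slot = key_slot(f"{{{tag}}}:foo")
bar_slot = key_slot(f"{{{tag}}}:bar")
print(foo_slot == bar_slot)  # True: both keys hash only "06S"
print(key_slot("foo"))  # 12182
```

Since every key that embeds the shard-local hashtag lands in a slot served by that shard, such keys can be read and written without leaving the local engine.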

hashtag API Reference

redgrease.runtime.hashtag() → str

Returns a hashtag that maps to the lowest hash slot served by the local engine’s shard. This makes it useful as a hashtag for partitioning data in a cluster, e.g. for creating keys that are stored on the local shard.

Returns

A hashtag that maps to the lowest hash slot served by the local engine’s shard.

Return type

str

hashtag3

This function, runtime.hashtag3(), is not part of the default RedisGears runtime scope, and is introduced by RedGrease. It is a slightly modified version of runtime.hashtag() that adds enclosing curly braces ("{" and "}") to the hashtag, so it can be used directly inside Python f-strings.
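
The difference only concerns brace escaping inside f-strings. In the pure-Python sketch below, fake_hashtag() and fake_hashtag3() are hypothetical stand-ins that return a fixed example value, not a real shard-local hashtag; both ways of building the key give the same result.

```python
def fake_hashtag() -> str:
    """Hypothetical stand-in for hashtag(): returns a fixed example value."""
    return "06S"


def fake_hashtag3() -> str:
    """Hypothetical stand-in mirroring hashtag3(): the same value wrapped in braces."""
    return "{" + fake_hashtag() + "}"


# With hashtag(), literal braces must be escaped by doubling them in the f-string
key_a = f"{{{fake_hashtag()}}}:foo"
# With hashtag3(), the braces are already part of the returned value
key_b = f"{fake_hashtag3()}:foo"

print(key_a)  # {06S}:foo
print(key_b)  # {06S}:foo
```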

hashtag3 API Reference

redgrease.runtime.hashtag3() → str

Provides the same value as hashtag, but surrounded by curly braces.

For example, if hashtag() generates “06S”, then hashtag3() gives “{06S}”.

This is useful for creating slot-specific keys using f-strings inside gear functions, as the braces do not need to be escaped. Example:

redgrease.cmd.set(f"{hashtag3()}", some_value)

Returns

A braces-enclosed hashtag string

Return type

str

log

RedGrease’s version of runtime.log() behaves almost like the default. It prints a message to Redis’ log, but forces the argument to a string before logging it.

(The built-in default throws an error if the argument is not a string.)
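
The coercion described above amounts to a thin wrapper. In the pure-Python sketch below, builtin_log is a hypothetical stand-in for the RedisGears builtin (assumed here to raise TypeError for non-strings) that merely records messages instead of writing to Redis’ log; this is an illustration, not RedGrease’s actual implementation.

```python
from typing import Any, List, Tuple

# Messages "logged" by the stand-in, as (level, message) pairs
logged: List[Tuple[str, str]] = []


def builtin_log(message: str, level: str = "notice") -> None:
    """Hypothetical stand-in for the builtin log(): rejects non-string messages."""
    if not isinstance(message, str):
        raise TypeError("message must be a string")  # assumed error type
    logged.append((level, message))


def log(message: Any, level: str = "notice") -> None:
    """Forgiving wrapper: coerce the message to str, then delegate."""
    builtin_log(str(message), level=level)


log({"key": "person:1"}, level="debug")  # logged as the dict's str() form
log(42)  # would fail with the stand-in builtin_log, but works via the wrapper
print(logged)
```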

Example:

from redgrease import GB, log

# Dumps every datum in the DB to the log for "debug" purposes
GB().foreach(lambda x: log(str(x), level='debug')).run()

log API Reference

redgrease.runtime.log(message: str, level: str = 'notice')

Print a message to Redis’ log.

Parameters
  • message (str) – The message to output

  • level (str, optional) –

    Message loglevel. Either:

    - 'debug'
    - 'verbose'
    - 'notice'
    - 'warning'
    

    Defaults to ‘notice’.

gearsFuture

The gearsFuture object allows another thread/process to process the record.

Returning this object from a step’s operation tells RedisGears to suspend execution until background processing has finished or failed.

The gearsFuture object provides two control methods: gearsFuture.continueRun() and gearsFuture.continueFailed(). Both methods are thread-safe and can be called at any time to signal that the background processing has finished.

gearsFuture.continueRun() signals success and its argument is a record for the main process. gearsFuture.continueFailed() reports a failure to the main process and its argument is a string describing the failure.
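
The hand-off is similar to completing a standard-library concurrent.futures.Future from another thread. The sketch below uses that stdlib class purely as an analogy (it is not gearsFuture and does not run inside RedisGears): set_result() plays the role of continueRun() and set_exception() the role of continueFailed(). The function names step and background_work are hypothetical.

```python
import threading
import time
from concurrent.futures import Future


def background_work(record: int, future: Future) -> None:
    """Runs in another thread and signals the outcome through the future."""
    time.sleep(0.1)  # simulate slow background processing
    if record < 0:
        # Analogous to gearsFuture.continueFailed(message)
        future.set_exception(ValueError(f"negative record: {record}"))
    else:
        # Analogous to gearsFuture.continueRun(record)
        future.set_result(record * 2)


def step(record: int) -> Future:
    """Hands the record to a thread and returns the still-pending future at once."""
    future: Future = Future()
    threading.Thread(target=background_work, args=(record, future)).start()
    return future


ok = step(21)
bad = step(-1)
print(ok.result(timeout=5))  # 42
print(type(bad.exception(timeout=5)).__name__)  # ValueError
```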

Calling gearsFuture() is supported only from the context of the following operations: map, flatmap, filter, foreach, aggregate and aggregateby.

An attempt to create a gearsFuture object outside of the supported contexts will result in an exception.

Note

gearsFuture was introduced in RedisGears 1.0.2.

Example:

gearsFuture with Python Async Await

gearsFuture is also integrated seamlessly with Python’s async/await syntax, so it is possible to do the following:

import asyncio

from redgrease import GB


async def c(r):
    await asyncio.sleep(1)
    return r


GB("ShardsIDReader").map(c).run()

gearsFuture API Reference

class redgrease.runtime.gearsFuture

The gearsFuture object allows another thread/process to process the record.

Returning this object from a step’s operation tells RedisGears to suspend execution until background processing has finished or failed.

The gearsFuture object provides two control methods: continueRun() and continueFailed(). Both methods are thread-safe and can be called at any time to signal that the background processing has finished.

continueRun() signals success and its argument is a record for the main process. continueFailed() reports a failure to the main process and its argument is a string describing the failure.

Calling gearsFuture() is supported only from the context of the following operations:

  • map

  • flatmap

  • filter

  • foreach

  • aggregate

  • aggregateby

An attempt to create a gearsFuture object outside of the supported contexts will result in an exception.

continueRun(record) → None

Signals success and its argument is a record for the main process.

Parameters

record (Any) – Record to yield to the blocked Gear function.

continueFail(message: str)

Reports a failure to the main process and its argument is a string describing the failure.

Parameters

message (str) – Message describing the failure.

GearsBuilder

The runtime.GearsBuilder (as well as its short-form alias GB), behaves exactly like the default RedisGears version, with a couple of exceptions:

  1. It has a property gearfunction which gives access to the constructed GearFunction object at that point in the builder pipeline.

  2. Any additional arguments passed to its constructor, will be passed as defaults to the run() or register() action that terminates the build.

Note

The runtime.GearsBuilder objects are mutable with respect to the operations, whereas GearFunction objects are immutable and return a new function when an operation is applied.

This means that:

fun = GearsBuilder()
fun.map(...)
fun.aggregateby(...)

Creates one single function equivalent to:

fun = KeysReader().map(...).aggregateby(...)

Whereas:

sad = KeysReader()
sad.map(...)
sad.aggregateby(...)

Creates three functions: one named sad, which is just the KeysReader; one which is sad with a Map; and one which is sad with an AggregateBy. The latter two functions are also not bound to any variables in this example.
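
The difference between the two behaviors can be modeled with two toy classes. This is a hypothetical sketch for illustration only: ImmutableFunction stands in for a GearFunction and MutableBuilder for the GearsBuilder, and neither is RedGrease code.

```python
from typing import Callable, Tuple

Op = Tuple[str, Callable]


class ImmutableFunction:
    """Toy model of a GearFunction: each operation returns a new object."""

    def __init__(self, ops: Tuple[Op, ...] = ()):
        self.ops = ops

    def map(self, fn: Callable) -> "ImmutableFunction":
        return ImmutableFunction(self.ops + (("map", fn),))


class MutableBuilder:
    """Toy model of a GearsBuilder: operations replace an internal function."""

    def __init__(self):
        self.function = ImmutableFunction()

    def map(self, fn: Callable) -> "MutableBuilder":
        self.function = self.function.map(fn)
        return self  # returns itself, so chaining still works


fun = MutableBuilder()
fun.map(str)
fun.map(len)
print(len(fun.function.ops))  # 2: both operations were kept

sad = ImmutableFunction()
sad.map(str)
sad.map(len)
print(len(sad.ops))  # 0: each call built a new, discarded function
```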

GearsBuilder API Reference

class redgrease.runtime.GearsBuilder(reader: str = 'KeysReader', defaultArg: str = '*', desc: Optional[str] = None, *args, **kwargs)

The RedisGears GearsBuilder class is imported to the runtime’s environment by default, and this class is a RedGrease wrapper of it.

It exposes the functionality of the function’s context builder.

Warning

GearsBuilder is mutable with respect to the operations.

The GearsBuilder is a subclass of gears.OpenGearFunction, but unlike other OpenGearFunctions, the GearsBuilder mutates an internal GearFunction instead of creating a new one for each operation. This behavior is deliberate, in order to be consistent with the original GearsBuilder.

Gear function / process factory

Parameters
  • reader (str, optional) –

    Input records reader, defining where the input to the gear will come from. One of:

    • "KeysReader"

    • "KeysOnlyReader"

    • "StreamReader"

    • "PythonReader"

    • "ShardsIDReader"

    • "CommandReader"

    Defaults to ‘KeysReader’.

  • defaultArg (str, optional) – Additional arguments to the reader. These are usually a key’s name, prefix, glob-like or a regular expression. Its use depends on the function’s reader type and action. Defaults to ‘*’.

  • desc (str, optional) – An optional description. Defaults to None.

property gearfunction

The “open” GearFunction object at this step in the pipeline.

This GearFunction is itself immutable but can be built upon to create new GearFunctions, independently from the GearsBuilder.

Returns

The current GearFunction object.

Return type

redgrease.gears.OpenGearFunction

run(arg: Optional[str] = None, convertToStr: bool = True, collect: bool = True, requirements: Optional[Iterable[str]] = None, on=None, **kwargs) → redgrease.gears.ClosedGearFunction

Create a “closed” function to be run in “batch-mode”.

Batch functions are executed once and exit once the data is exhausted by their reader.

Parameters
  • arg (str, optional) –

    An optional argument that’s passed to the reader as its defaultArg. It means the following:

    • A glob-like pattern for the KeysReader and KeysOnlyReader readers.

    • A key name for the StreamReader reader.

    • A Python generator for the PythonReader reader.

    Defaults to None.

  • convertToStr (bool, optional) – When True, adds a map operation to the flow’s end that stringifies records. Defaults to True.

  • collect (bool, optional) – When True, adds a collect operation to the flow’s end. Defaults to True.

  • requirements (Iterable[str], optional) – Additional requirements / dependency Python packages. Defaults to None.

  • on (redis.Redis) – Immediately execute the function on this RedisGears system.

  • **kwargs – Additional parameters to the run operation.

Returns

A new closed batch function, if on is not specified. An execution result, if on is specified.

Return type

Union[ClosedGearFunction, redgrease.data.ExecutionResult]

Raises

TypeError – If the function does not support batch mode.

register(prefix: str = '*', convertToStr: bool = True, collect: bool = True, mode: Optional[str] = None, onRegistered: Optional[Callable[[], None]] = None, eventTypes: Optional[Iterable[str]] = None, keyTypes: Optional[Iterable[str]] = None, readValue: Optional[bool] = None, batch: Optional[int] = None, duration: Optional[int] = None, onFailedPolicy: Optional[str] = None, onFailedRetryInterval: Optional[int] = None, trimStream: Optional[bool] = None, trigger: Optional[str] = None, requirements: Optional[Iterable[str]] = None, on=None, **kwargs) → redgrease.gears.ClosedGearFunction

Create a “closed” function to be registered as an event-triggered function.

Event functions are executed each time an event arrives. Each time it is executed, the function operates on the event’s data and, once done, is suspended until it is invoked again by new events.

Parameters
  • prefix (str, optional) – Key prefix pattern to match on. Not relevant for ‘CommandReader’ readers (see ‘trigger’). Defaults to "*".

  • convertToStr (bool, optional) – When True adds a map operation to the flow’s end that stringifies records. Defaults to True.

  • collect (bool, optional) – When True, adds a collect operation to the flow’s end. Defaults to True.

  • mode (str, optional) –

    The execution mode of the function. Can be one of:

    • "async":

      Execution will be asynchronous across the entire cluster.

    • "async_local":

      Execution will be asynchronous and restricted to the handling shard.

    • "sync":

      Execution will be synchronous and local.

    Defaults to "async".

  • onRegistered (Registrator, optional) – A function that’s called on each shard upon function registration. It is a good place to initialize non-serializable objects such as network connections. Defaults to None.

  • eventTypes (Iterable[str], optional) –

    For KeysReader only. A whitelist of event types that trigger execution when the KeysReader is used. The list may contain one or more:

    • Any Redis or module command

    • Any Redis event

    Defaults to None.

  • keyTypes (Iterable[str], optional) –

    For KeysReader and KeysOnlyReader only. A whitelist of key types that trigger execution when using the KeysReader or KeysOnlyReader readers. The list may contain one or more from the following:

    • Redis core types:

      "string", "hash", "list", "set", "zset" or "stream"

    • Redis module types:

      "module"

    Defaults to None.

  • readValue (bool, optional) – For KeysReader only. When False the value will not be read, so the ‘type’ and ‘value’ of the record will be set to None. Defaults to True.

  • batch (int, optional) – For StreamReader only. The number of new messages that trigger execution. Defaults to 1.

  • duration (int, optional) – For StreamReader only. The time to wait before execution is triggered, regardless of the batch size (0 for no duration). Defaults to 0.

  • onFailedPolicy (str, optional) –

    For StreamReader only. The policy for handling execution failures. May be one of:

    • "continue":

      Ignores a failure and continues to the next execution. This is the default policy.

    • "abort":

      Stops further executions.

    • "retry":

      Retries the execution after an interval specified with onFailedRetryInterval (default is one second).

    Defaults to "continue".

  • onFailedRetryInterval (int, optional) – For StreamReader only. The interval (in milliseconds) in which to retry in case onFailedPolicy is ‘retry’. Defaults to 1.

  • trimStream (bool, optional) – For StreamReader only. When True, the stream will be trimmed after execution. Defaults to True.

  • trigger (str) – For ‘CommandReader’ only, and mandatory. The trigger string that will trigger the function.

  • requirements (Iterable[str], optional) – Additional requirements / dependency Python packages. Defaults to None.

  • on (redis.Redis) – Immediately execute the function on this RedisGears system.

  • **kwargs – Additional parameters to the register operation.

Returns

A new closed event function, if on is not specified. An execution result, if on is specified.

Return type

Union[ClosedGearFunction, redgrease.data.ExecutionResult]

Raises

TypeError – If the function does not support event mode.

map(op: Callable[[redgrease.typing.InputRecord], redgrease.typing.OutputRecord], requirements: Optional[Iterable[str]] = None, **kwargs) → redgrease.runtime.GearsBuilder

Instance-local Map operation that performs a one-to-one (1:1) mapping of records.

Parameters
  • op (redgrease.typing.Mapper) – Function to map on the input records. The function must take one argument as input (input record) and return something as an output (output record).

  • requirements (Iterable[str], optional) – Additional requirements / dependency Python packages. Defaults to None.

  • **kwargs – Additional parameters to the Map operation.

Returns

Itself, i.e. the same GearsBuilder, but with its internal gear function updated with a Map operation as last step.

Return type

GearsBuilder

flatmap(op: Optional[Callable[[redgrease.typing.InputRecord], Iterable[redgrease.typing.OutputRecord]]] = None, requirements: Optional[Iterable[str]] = None, **kwargs) → redgrease.runtime.GearsBuilder

Instance-local FlatMap operation that performs one-to-many (1:N) mapping of records.

Parameters
  • op (redgrease.typing.Expander, optional) – Function to map on the input records. The function must take one argument as input (input record) and return an iterable as an output (output records). Defaults to the ‘identity-function’, i.e. if the input is an iterable, it will be expanded.

  • requirements (Iterable[str], optional) – Additional requirements / dependency Python packages. Defaults to None.

  • **kwargs – Additional parameters to the FlatMap operation.

Returns

Itself, i.e. the same GearsBuilder, but with its internal gear function updated with a FlatMap operation as last step.

Return type

GearsBuilder
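
To make the 1:N cardinality and the identity default concrete, here is a local, pure-Python simulation over a plain list of records. The names flatmap and _identity are hypothetical, and this is not how RedisGears actually executes the operation.

```python
from itertools import chain
from typing import Any, Callable, Iterable, List, Optional


def _identity(record: Any) -> Any:
    """Default op: return the record itself (which must then be iterable)."""
    return record


def flatmap(records: Iterable, op: Optional[Callable[[Any], Iterable]] = None) -> List:
    """Local simulation of a 1:N FlatMap over a list of records."""
    op = op if op is not None else _identity
    # Each record expands into zero or more output records
    return list(chain.from_iterable(op(record) for record in records))


print(flatmap([[1, 2], [3]]))  # [1, 2, 3]
print(flatmap(["a b", "c"], op=str.split))  # ['a', 'b', 'c']
```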

foreach(op: Callable[[redgrease.typing.InputRecord], None], requirements: Optional[Iterable[str]] = None, **kwargs) → redgrease.runtime.GearsBuilder

Instance-local ForEach operation performs one-to-the-same (1=1) mapping.

Parameters
  • op (redgrease.typing.Processor) – Function to run on each of the input records. The function must take one argument as input (input record) and should not return anything.

  • requirements (Iterable[str], optional) – Additional requirements / dependency Python packages. Defaults to None.

  • **kwargs – Additional parameters to the ForEach operation.

Returns

Itself, i.e. the same GearsBuilder, but with its internal gear function updated with a ForEach operation as last step.

Return type

GearsBuilder
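
The “one-to-the-same” behavior means the operation is run for its side effects while the records themselves pass through. A minimal local simulation, where foreach is a hypothetical helper and not the RedisGears implementation:

```python
from typing import Any, Callable, Iterable, List


def foreach(records: Iterable, op: Callable[[Any], None]) -> List:
    """Local simulation of ForEach: run op for its side effects, pass records through."""
    out = []
    for record in records:
        op(record)  # any return value is ignored
        out.append(record)
    return out


seen: List[str] = []
result = foreach(["a", "b"], seen.append)
print(result)  # ['a', 'b']
print(seen)  # ['a', 'b']
```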

filter(op: Optional[Callable[[redgrease.typing.InputRecord], bool]] = None, requirements: Optional[Iterable[str]] = None, **kwargs) → redgrease.runtime.GearsBuilder

Instance-local Filter operation performs one-to-zero-or-one (1:bool) filtering of records.

Parameters
  • op (redgrease.typing.Filterer, optional) – Function to apply on the input records, to decide which ones to keep. The function must take one argument as input (input record) and return a bool. The input records evaluated to True will be kept as output records. Defaults to the ‘identity-function’, i.e. records are filtered based on their own truthiness or falsiness.

  • requirements (Iterable[str], optional) – Additional requirements / dependency Python packages. Defaults to None.

  • **kwargs – Additional parameters to the Filter operation.

Returns

Itself, i.e. the same GearsBuilder, but with its internal gear function updated with a Filter operation as last step.

Return type

GearsBuilder
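
The identity default simply keeps records that are truthy on their own. A local, pure-Python simulation (filter_records is a hypothetical helper, named so as not to shadow Python’s builtin filter):

```python
from typing import Any, Callable, Iterable, List, Optional


def filter_records(records: Iterable, op: Optional[Callable[[Any], bool]] = None) -> List:
    """Local simulation of Filter; without op, records are kept by their own truthiness."""
    if op is None:
        return [record for record in records if record]
    return [record for record in records if op(record)]


print(filter_records([0, 1, "", "x", None, [2]]))  # [1, 'x', [2]]
print(filter_records(range(6), op=lambda n: n % 2 == 0))  # [0, 2, 4]
```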

accumulate(op: Optional[Callable[[redgrease.runtime.T, redgrease.typing.InputRecord], redgrease.runtime.T]] = None, requirements: Optional[Iterable[str]] = None, **kwargs) → redgrease.runtime.GearsBuilder

Instance-local Accumulate operation performs many-to-one mapping (N:1) of records.

Parameters
  • op (redgrease.typing.Accumulator, optional) –

    Function to apply on the input records. The function must take two arguments as input:

    • An accumulator value, and

    • The input record.

    It should aggregate the input record into the accumulator variable, which stores the state between the function’s invocations. The function must return the accumulator’s updated value. Defaults to a list accumulator, i.e. the output will be a list of all inputs.

  • requirements (Iterable[str], optional) – Additional requirements / dependency Python packages. Defaults to None.

  • **kwargs – Additional parameters to the Accumulate operation.

Returns

Itself, i.e. the same GearsBuilder, but with its internal gear function updated with an Accumulate operation as last step.

Return type

GearsBuilder
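
The accumulator threads state through successive calls, one record at a time. The local, pure-Python sketch below assumes, for illustration, that the accumulator starts out as None (hence the `acc or 0` guard in add); accumulate, _list_accumulator and add are hypothetical helpers.

```python
from typing import Any, Callable, Iterable, Optional


def _list_accumulator(acc: Optional[list], record: Any) -> list:
    """Default accumulator: collect every input record into a list."""
    acc = acc if acc is not None else []
    acc.append(record)
    return acc


def accumulate(records: Iterable, op: Optional[Callable[[Any, Any], Any]] = None) -> Any:
    """Local simulation of Accumulate (N:1) over one shard's records."""
    op = op if op is not None else _list_accumulator
    acc = None  # in this sketch the accumulator starts out as None
    for record in records:
        acc = op(acc, record)
    return acc


def add(acc: Any, record: int) -> int:
    # Treat the initial None accumulator as 0
    return (acc or 0) + record


print(accumulate([1, 2, 3]))  # [1, 2, 3]
print(accumulate([1, 2, 3], op=add))  # 6
```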

localgroupby(extractor: Optional[Callable[[redgrease.typing.InputRecord], redgrease.typing.Key]] = None, reducer: Optional[Callable[[redgrease.typing.Key, redgrease.runtime.T, redgrease.typing.InputRecord], redgrease.runtime.T]] = None, requirements: Optional[Iterable[str]] = None, **kwargs) → redgrease.runtime.GearsBuilder

Instance-local LocalGroupBy operation performs many-to-less mapping (N:M) of records.

Parameters
  • extractor (redgrease.typing.Extractor, optional) – Function to apply on the input records, to extract the grouping key. The function must take one argument as input (input record) and return a string (key). The groups are defined by the value of the key. Defaults to the hash of the input.

  • reducer (redgrease.typing.Reducer, optional) – Function to apply on the records of each group, to reduce them to a single value (per group). The function must take (a) a key, (b) a variable that’s called an accumulator and (c) an input record, in that order. It performs similarly to the accumulator callback, with the difference being that it maintains an accumulator per reduced key / group. Defaults to a list accumulator, i.e. the output will be a list of all inputs, for each group.

  • requirements (Iterable[str], optional) – Additional requirements / dependency Python packages. Defaults to None.

  • **kwargs – Additional parameters to the LocalGroupBy operation.

Returns

Itself, i.e. the same GearsBuilder, but with its internal gear function updated with a LocalGroupBy operation as last step.

Return type

GearsBuilder
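
LocalGroupBy keeps one accumulator per extracted key on each shard. The following local, pure-Python simulation follows the reducer argument order in the signature above (key, accumulator, record). It returns a plain dict for readability; the actual output record format of the operation may differ. The helper names are hypothetical.

```python
from typing import Any, Callable, Dict, Hashable, Iterable, Optional


def _list_reducer(key: Hashable, acc: Optional[list], record: Any) -> list:
    """Default reducer: collect each group's records into a list."""
    return (acc or []) + [record]


def localgroupby(
    records: Iterable,
    extractor: Optional[Callable[[Any], Hashable]] = None,
    reducer: Optional[Callable[[Hashable, Any, Any], Any]] = None,
) -> Dict[Hashable, Any]:
    """Local simulation of LocalGroupBy (N:M): one accumulator per extracted key."""
    extractor = extractor if extractor is not None else hash
    reducer = reducer if reducer is not None else _list_reducer
    groups: Dict[Hashable, Any] = {}
    for record in records:
        key = extractor(record)
        groups[key] = reducer(key, groups.get(key), record)
    return groups


def count(key: Hashable, acc: Any, record: Any) -> int:
    return (acc or 0) + 1


people = [
    {"name": "ada", "city": "London"},
    {"name": "alan", "city": "London"},
    {"name": "grace", "city": "NYC"},
]
print(localgroupby(people, extractor=lambda p: p["city"], reducer=count))
# {'London': 2, 'NYC': 1}
```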

limit(length: int, start: int = 0, **kwargs) → redgrease.runtime.GearsBuilder

Instance-local Limit operation limits the number of records.

Parameters
  • length (int) – The maximum number of records.

  • start (int, optional) – The index of the first input record. Defaults to 0.

  • requirements (Iterable[str], optional) – Additional requirements / dependency Python packages. Defaults to None.

  • **kwargs – Additional parameters to the Limit operation.

Returns

Itself, i.e. the same GearsBuilder, but with its internal gear function updated with a Limit operation as last step.

Return type

GearsBuilder
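
On a single shard, the effect of length and start resembles slicing the record stream. A local sketch using itertools.islice (limit is a hypothetical helper); since the operation is instance-local, each shard would apply such a limit to its own records independently.

```python
from itertools import islice
from typing import Iterable, List


def limit(records: Iterable, length: int, start: int = 0) -> List:
    """Local simulation of Limit: at most `length` records, starting at index `start`."""
    return list(islice(records, start, start + length))


print(limit(range(10), 3))  # [0, 1, 2]
print(limit(range(10), 3, start=5))  # [5, 6, 7]
```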

collect(**kwargs) → redgrease.runtime.GearsBuilder

Cluster-global Collect operation collects the result records.

Parameters

**kwargs – Additional parameters to the Collect operation.

Returns

Itself, i.e. the same GearsBuilder, but with its internal gear function updated with a Collect operation as last step.

Return type

GearsBuilder

repartition(extractor: Callable[[redgrease.typing.InputRecord], Hashable], requirements: Optional[Iterable[str]] = None, **kwargs) → redgrease.runtime.GearsBuilder

Cluster-global Repartition operation repartitions the records by shuffling them between shards.

Parameters
  • extractor (redgrease.typing.Extractor) – Function that takes a record and calculates a key that is used to determine the hash slot, and consequently the shard, that the record should migrate to. The function must take one argument as input (input record) and return a string (key). The hash slot, and consequently the destination shard, is determined by the value of the key.

  • requirements (Iterable[str], optional) – Additional requirements / dependency Python packages. Defaults to None.

  • **kwargs – Additional parameters to the Repartition operation.

Returns

Itself, i.e. the same GearsBuilder, but with its internal gear function updated with a Repartition operation as last step.

Return type

GearsBuilder

aggregate(zero: Optional[redgrease.runtime.T] = None, seqOp: Optional[Callable[[redgrease.runtime.T, redgrease.typing.InputRecord], redgrease.runtime.T]] = None, combOp: Optional[Callable[[redgrease.runtime.T, redgrease.runtime.T], redgrease.runtime.T]] = None, requirements: Optional[Iterable[str]] = None, **kwargs) → redgrease.runtime.GearsBuilder

Distributed Aggregate operation that performs an aggregation on local data, then a global aggregation on the local aggregations.

Parameters
  • zero (Any, optional) – The initial / zero value of the accumulator variable. Defaults to an empty list.

  • seqOp (redgrease.typing.Accumulator, optional) –

    A function to be applied on each of the input records, locally per shard. It must take two parameters:

    • an accumulator value, from previous calls

    • an input record

    The function aggregates the input into the accumulator variable, which stores the state between the function’s invocations. The function must return the accumulator’s updated value. Defaults to addition if ‘zero’ is a number, and to a list accumulator if ‘zero’ is a list.

  • combOp (redgrease.typing.Accumulator, optional) –

    A function to be applied on each of the aggregated results of the local aggregation (i.e. the output of seqOp). It must take two parameters:

    • an accumulator value, from previous calls

    • a local aggregation result (an output of seqOp)

    The function aggregates the local result into the accumulator variable and must return the accumulator’s updated value. Defaults to re-using the seqOp function.

  • requirements (Iterable[str], optional) – Additional requirements / dependency Python packages. Defaults to None.

  • **kwargs – Additional parameters to the Aggregate operation.

Returns

Itself, i.e. the same GearsBuilder, but with its internal gear function updated with an Aggregate operation as last step.

Return type

GearsBuilder
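
The two phases can be simulated locally by treating each shard’s records as a separate list. In this pure-Python sketch (aggregate and the sample shard data are hypothetical), both phases start from a fresh copy of zero, which is an assumption made for illustration, not a description of the RedisGears internals.

```python
import copy
from typing import Any, Callable, List, Sequence


def aggregate(
    shards: Sequence[Sequence[Any]],
    zero: Any,
    seqOp: Callable[[Any, Any], Any],
    combOp: Callable[[Any, Any], Any],
) -> Any:
    """Simulate a distributed Aggregate, given each shard's records as a list."""
    # 1. Local phase: each shard folds its own records into a copy of `zero`
    local_results: List[Any] = []
    for records in shards:
        acc = copy.deepcopy(zero)
        for record in records:
            acc = seqOp(acc, record)
        local_results.append(acc)
    # 2. Global phase: the local results are combined, again starting from `zero`
    result = copy.deepcopy(zero)
    for local in local_results:
        result = combOp(result, local)
    return result


shards = [[1, 2, 3], [10, 20]]  # hypothetical records on two shards
total = aggregate(shards, 0, lambda acc, r: acc + r, lambda acc, local: acc + local)
print(total)  # 36

words = [["a", "bbb"], ["cc"]]
longest = aggregate(words, 0, lambda acc, w: max(acc, len(w)), max)
print(longest)  # 3
```

Separating seqOp from combOp matters when the accumulator type differs from the record type, as in the second call, where records are strings but accumulators are lengths.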

aggregateby(extractor: Optional[Callable[[redgrease.typing.InputRecord], redgrease.typing.Key]] = None, zero: Optional[redgrease.runtime.T] = None, seqOp: Optional[Callable[[redgrease.typing.Key, redgrease.runtime.T, redgrease.typing.InputRecord], redgrease.runtime.T]] = None, combOp: Optional[Callable[[redgrease.typing.Key, redgrease.runtime.T, redgrease.runtime.T], redgrease.runtime.T]] = None, requirements: Optional[Iterable[str]] = None, **kwargs) → redgrease.runtime.GearsBuilder

Distributed AggregateBy operation that behaves like aggregate, but aggregates separately for each key, as extracted by the extractor.
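
Per-key aggregation can be simulated locally in the same two phases, with one accumulator per key on each shard. The sketch follows the argument orders in the signature above, seqOp(key, accumulator, record) and combOp(key, accumulator, local_result), and returns a plain dict; the real output record format may differ. The function aggregateby and the sample data are hypothetical.

```python
import copy
from typing import Any, Callable, Dict, Hashable, List, Sequence


def aggregateby(
    shards: Sequence[Sequence[Any]],
    extractor: Callable[[Any], Hashable],
    zero: Any,
    seqOp: Callable[[Hashable, Any, Any], Any],
    combOp: Callable[[Hashable, Any, Any], Any],
) -> Dict[Hashable, Any]:
    """Simulate a distributed AggregateBy, given each shard's records as a list."""
    # 1. Local phase: per shard, one accumulator per extracted key
    local_results: List[Dict[Hashable, Any]] = []
    for records in shards:
        local: Dict[Hashable, Any] = {}
        for record in records:
            key = extractor(record)
            acc = local[key] if key in local else copy.deepcopy(zero)
            local[key] = seqOp(key, acc, record)
        local_results.append(local)
    # 2. Global phase: merge the per-shard accumulators of each key
    result: Dict[Hashable, Any] = {}
    for local in local_results:
        for key, acc in local.items():
            start = result[key] if key in result else copy.deepcopy(zero)
            result[key] = combOp(key, start, acc)
    return result


shards = [["apple", "avocado", "banana"], ["apricot", "blueberry", "cherry"]]
counts = aggregateby(
    shards,
    extractor=lambda word: word[0],
    zero=0,
    seqOp=lambda key, acc, word: acc + 1,
    combOp=lambda key, acc, local: acc + local,
)
print(counts)  # {'a': 3, 'b': 2, 'c': 1}
```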

Parameters
  • extractor (redgrease.typing.Extractor, optional) – Function to apply on the input records, to extract the grouping key. The function must take one argument as input (input record) and return a string (key). The groups are defined by the value of the key. Defaults to the hash of the input.

  • zero (Any, optional) – The initial / zero value of the accumulator variable. Defaults to an empty list.

  • seqOp (redgrease.typing.Accumulator, optional) – A function to be applied on each of the input records, locally per shard and group. It must take three parameters: (a) the group key, (b) an accumulator value, from previous calls, and (c) an input record. The function aggregates the input into the accumulator variable, which stores the state between the function’s invocations, and must return the accumulator’s updated value. Defaults to a list reducer.

  • combOp (redgrease.typing.Accumulator, optional) – A function to be applied on each of the aggregated results of the local aggregation (i.e. the output of seqOp). It must take three parameters: (a) the group key, (b) an accumulator value, from previous calls, and (c) a local aggregation result for that key. It must return the accumulator’s updated value. Defaults to re-using the seqOp function.

  • requirements (Iterable[str], optional) – Additional requirements / dependency Python packages. Defaults to None.

  • **kwargs – Additional parameters to the AggregateBy operation.

Returns

Itself, i.e. the same GearsBuilder, but with its internal gear function updated with an AggregateBy operation as the last step.

Return type

GearsBuilder
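A similar sketch for AggregateBy keeps one accumulator per key on each shard and then merges those partial results key by key with combOp. As before, simulate_aggregateby is a hypothetical helper, not RedGrease API. It ignores where the merging physically happens on a real cluster, and returning a plain dict is an assumption; the actual output records may be shaped differently.

```python
import copy
from typing import Any, Callable, Dict, Hashable, Iterable, List


def simulate_aggregateby(
    shards: List[Iterable[Any]],
    extractor: Callable[[Any], Hashable],
    zero: Any,
    seq_op: Callable[[Hashable, Any, Any], Any],
    comb_op: Callable[[Hashable, Any, Any], Any],
) -> Dict[Hashable, Any]:
    """Hypothetical local simulation of a distributed AggregateBy (not RedGrease API)."""
    # Phase 1: per shard, keep one accumulator per key, folded with seqOp(key, acc, record).
    local_results: List[Dict[Hashable, Any]] = []
    for records in shards:
        accs: Dict[Hashable, Any] = {}
        for record in records:
            key = extractor(record)
            if key not in accs:
                accs[key] = copy.deepcopy(zero)
            accs[key] = seq_op(key, accs[key], record)
        local_results.append(accs)

    # Phase 2: merge the per-shard accumulators key by key with combOp(key, acc, local).
    merged: Dict[Hashable, Any] = {}
    for accs in local_results:
        for key, local in accs.items():
            if key not in merged:
                merged[key] = copy.deepcopy(zero)
            merged[key] = comb_op(key, merged[key], local)
    return merged


# Example: total word length per first letter, over two "shards".
shards = [["apple", "avocado", "banana"], ["apricot", "blueberry"]]
totals = simulate_aggregateby(
    shards,
    extractor=lambda word: word[0],
    zero=0,
    seq_op=lambda key, acc, word: acc + len(word),
    comb_op=lambda key, acc, local: acc + local,
)
print(totals)  # {'a': 19, 'b': 15}
```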

groupby(extractor: Optional[Callable[[redgrease.typing.InputRecord], redgrease.typing.Key]] = None, reducer: Optional[Callable[[redgrease.typing.Key, redgrease.runtime.T, redgrease.typing.InputRecord], redgrease.runtime.T]] = None, requirements: Optional[Iterable[str]] = None, **kwargs) → redgrease.runtime.GearsBuilder

Cluster-local GroupBy operation performing a many-to-less (N:M) grouping of records.

Parameters
  • extractor (redgrease.typing.Extractor, optional) – Function to apply on the input records, to extract the grouping key. The function must take one argument as input (input record) and return a string (key). The groups are defined by the value of the key. Defaults to the hash of the input.

  • reducer (redgrease.typing.Reducer, optional) – Function to apply on the records of each group, to reduce them to a single value (per group). The function must take (a) a key, (b) a variable that’s called an accumulator, and (c) an input record, and return the updated accumulator. It performs similarly to the accumulator callback, with the difference being that it maintains an accumulator per reduced key / group. Defaults to a list reducer.

  • requirements (Iterable[str], optional) – Additional requirements / dependency Python packages. Defaults to None.

  • **kwargs – Additional parameters to the GroupBy operation.

Returns

Itself, i.e. the same GearsBuilder, but with its internal gear function updated with a GroupBy operation as the last step.

Return type

GearsBuilder
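GroupBy folds each group with the reducer as records arrive, keeping one accumulator per key. The sketch below models that in plain Python; simulate_groupby, sum_reducer and the orders data are hypothetical. It assumes the first reducer call for a key receives None as the accumulator, so the reducer initialises it itself; check the RedisGears documentation for the exact initial value.

```python
from typing import Any, Callable, Dict, Hashable, Iterable


def simulate_groupby(
    records: Iterable[Any],
    extractor: Callable[[Any], Hashable],
    reducer: Callable[[Hashable, Any, Any], Any],
) -> Dict[Hashable, Any]:
    """Hypothetical simulation of GroupBy: one accumulator per key (not RedGrease API)."""
    accumulators: Dict[Hashable, Any] = {}
    for record in records:
        key = extractor(record)
        # Assumption: the first call for a key receives None as its accumulator.
        accumulators[key] = reducer(key, accumulators.get(key), record)
    return accumulators


def sum_reducer(key, acc, record):
    # Initialise the accumulator on first use, then add this record's amount.
    acc = acc if acc is not None else 0
    return acc + record["amount"]


orders = [
    {"customer": "ann", "amount": 10},
    {"customer": "bob", "amount": 5},
    {"customer": "ann", "amount": 7},
]
totals_by_customer = simulate_groupby(orders, lambda r: r["customer"], sum_reducer)
print(totals_by_customer)  # {'ann': 17, 'bob': 5}
```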

batchgroupby(extractor: Optional[Callable[[redgrease.typing.InputRecord], redgrease.typing.Key]] = None, reducer: Optional[Callable[[redgrease.typing.Key, Iterable[redgrease.runtime.T]], redgrease.typing.InputRecord]] = None, requirements: Optional[Iterable[str]] = None, **kwargs) → redgrease.runtime.GearsBuilder

Cluster-local GroupBy operation, performing a many-to-less (N:M) grouping of records.

Note: Using this operation may cause a substantial increase in memory usage during runtime. Consider using the GroupBy operation instead.

Parameters
  • extractor (redgrease.typing.Extractor, optional) – Function to apply on the input records, to extract the grouping key. The function must take one argument as input (input record) and return a string (key). The groups are defined by the value of the key. Defaults to the hash of the input.

  • reducer (redgrease.typing.Reducer, optional) – Function to apply on the records of each group, to reduce them to a single value (per group). The function must take (a) a key and (b) the list of all input records in that group. Defaults to the length (len) of the list, i.e. the number of records in the group.

  • requirements (Iterable[str], optional) – Additional requirements / dependency Python packages. Defaults to None.

  • **kwargs – Additional parameters to the BatchGroupBy operation.

Returns

Itself, i.e. the same GearsBuilder, but with its internal gear function updated with a BatchGroupBy operation as the last step.

Return type

GearsBuilder
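Unlike GroupBy, BatchGroupBy hands the reducer the key together with the complete list of records in the group, which is why it can use much more memory. The hypothetical simulate_batchgroupby helper below mimics that contract, including the len default; it is an illustration, not the RedGrease implementation.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, Hashable, Iterable, List


def simulate_batchgroupby(
    records: Iterable[Any],
    extractor: Callable[[Any], Hashable],
    reducer: Callable[[Hashable, List[Any]], Any] = lambda key, group: len(group),
) -> Dict[Hashable, Any]:
    """Hypothetical simulation of BatchGroupBy (not RedGrease API)."""
    # Every record of every group is held in memory at the same time.
    groups: Dict[Hashable, List[Any]] = defaultdict(list)
    for record in records:
        groups[extractor(record)].append(record)
    # The reducer is called once per group, with the key and the full list of records.
    return {key: reducer(key, group) for key, group in groups.items()}


words = ["apple", "avocado", "banana", "apricot"]
print(simulate_batchgroupby(words, lambda w: w[0]))  # {'a': 3, 'b': 1}
print(simulate_batchgroupby(words, lambda w: w[0], lambda k, g: sorted(g)))
# {'a': ['apple', 'apricot', 'avocado'], 'b': ['banana']}
```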

sort(reverse: bool = True, requirements: Optional[Iterable[str]] = None, **kwargs) → redgrease.runtime.GearsBuilder

Sort the records.

Parameters
  • reverse (bool, optional) – Sort in descending order (higher to lower). Defaults to True.

  • requirements (Iterable[str], optional) – Additional requirements / dependency Python packages. Defaults to None.

  • **kwargs – Additional parameters to the Sort operation.

Returns

Itself, i.e. the same GearsBuilder, but with its internal gear function updated with a Sort operation as the last step.

Return type

GearsBuilder
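Because reverse defaults to True, Sort produces descending order unless told otherwise. This small sketch makes that default explicit; simulate_sort is hypothetical and, as a simplification, gathers the records from all shards before sorting them in one place.

```python
from typing import Any, Iterable, List


def simulate_sort(shards: List[Iterable[Any]], reverse: bool = True) -> List[Any]:
    """Hypothetical simulation of Sort (not RedGrease API)."""
    # Simplification: gather the records from every shard into a single list.
    gathered = [record for records in shards for record in records]
    # Same default as the operation: reverse=True, i.e. descending order.
    return sorted(gathered, reverse=reverse)


print(simulate_sort([[3, 1], [2, 5]]))                 # [5, 3, 2, 1]
print(simulate_sort([[3, 1], [2, 5]], reverse=False))  # [1, 2, 3, 5]
```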

distinct(**kwargs) → redgrease.runtime.GearsBuilder

Keep only the Distinct values in the data.

Parameters

**kwargs – Additional parameters to the Distinct operation.

Returns

Itself, i.e. the same GearsBuilder, but with its internal gear function updated with a Distinct operation as the last step.

Return type

GearsBuilder
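Distinct can be pictured as removing duplicates locally on each shard and then once more across shards. The hypothetical simulate_distinct below assumes the records are hashable so they can be stored in Python sets; it is an illustration of the idea, not the RedGrease implementation.

```python
from typing import Any, Iterable, List, Set


def simulate_distinct(shards: List[Iterable[Any]]) -> Set[Any]:
    """Hypothetical simulation of Distinct across shards (not RedGrease API)."""
    # Each shard first removes its own duplicates...
    local_sets = [set(records) for records in shards]
    # ...then the per-shard sets are merged, which removes cross-shard duplicates.
    return set().union(*local_sets)


print(sorted(simulate_distinct([["x", "y", "x"], ["y", "z"]])))  # ['x', 'y', 'z']
```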

count(**kwargs) → redgrease.runtime.GearsBuilder

Count the number of records in the execution.

Parameters

**kwargs – Additional parameters to the Count operation.

Returns

Itself, i.e. the same GearsBuilder, but with its internal gear function updated with a Count operation as the last step.

Return type

GearsBuilder

countby(extractor: Callable[[redgrease.typing.InputRecord], Hashable] = <function GearsBuilder.<lambda>>, requirements: Optional[Iterable[str]] = None, **kwargs) → redgrease.runtime.GearsBuilder

Distributed CountBy operation counting the records grouped by key.

Parameters
  • extractor (redgrease.typing.Extractor, optional) – Function to apply on the input records, to extract the grouping key. The function must take one argument as input (input record) and return a hashable key, such as a string. The groups are defined by the value of the key. Defaults to ‘lambda x: str(x)’.

  • requirements (Iterable[str], optional) – Additional requirements / dependency Python packages. Defaults to None.

  • **kwargs – Additional parameters to the CountBy operation.

Returns

Itself, i.e. the same GearsBuilder, but with its internal gear function updated with a CountBy operation as the last step.

Return type

GearsBuilder
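With the default extractor, ‘lambda x: str(x)’, records are grouped by their string form, so values such as 1 and "1" fall into the same group. The hypothetical simulate_countby below shows that effect using collections.Counter; returning a plain dict is an assumption, and the real output records may be shaped differently.

```python
from collections import Counter
from typing import Any, Callable, Dict, Hashable, Iterable


def simulate_countby(
    records: Iterable[Any],
    extractor: Callable[[Any], Hashable] = lambda x: str(x),
) -> Dict[Hashable, int]:
    """Hypothetical simulation of CountBy (not RedGrease API)."""
    # Count how many records map to each extracted key.
    return dict(Counter(extractor(record) for record in records))


print(simulate_countby([1, "1", 2, 1]))                            # {'1': 3, '2': 1}
print(simulate_countby(["red", "tan", "green"], extractor=len))    # {3: 2, 5: 1}
```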

avg(extractor: Callable[[redgrease.typing.InputRecord], float] = <function GearsBuilder.<lambda>>, requirements: Optional[Iterable[str]] = None, **kwargs) → redgrease.runtime.GearsBuilder

Distributed Avg operation, calculating the arithmetic average of the records.

Parameters
  • extractor (redgrease.typing.Extractor, optional) – Function to apply on the input records, to extract the numeric value to be averaged. The function must take one argument as input (input record) and return a float. Defaults to ‘lambda x: float(x)’.

  • requirements (Iterable[str], optional) – Additional requirements / dependency Python packages. Defaults to None.

  • **kwargs – Additional parameters to the Avg operation.

Returns

Itself, i.e. the same GearsBuilder, but with its internal gear function updated with an Avg operation as the last step. Note that, on a GearsBuilder, this method does not return a new GearFunction.

Return type

GearsBuilder
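Computing an average across shards needs more than per-shard averages: when shards hold different numbers of records, the average of the averages is wrong. A common approach, assumed here rather than taken from the RedGrease source, is for each shard to report a (sum, count) pair that is combined before dividing. simulate_avg is a hypothetical helper.

```python
from typing import Any, Callable, Iterable, List, Tuple


def simulate_avg(
    shards: List[Iterable[Any]],
    extractor: Callable[[Any], float] = lambda x: float(x),
) -> float:
    """Hypothetical simulation of a distributed Avg (not RedGrease API)."""
    # Each shard reports (sum, count) rather than its own average.
    partials: List[Tuple[float, int]] = []
    for records in shards:
        values = [extractor(record) for record in records]
        partials.append((sum(values), len(values)))
    total = sum(s for s, _ in partials)
    count = sum(c for _, c in partials)
    if count == 0:
        raise ValueError("no records to average")
    return total / count


print(simulate_avg([["1", "2", "3"], ["10"]]))  # 4.0
```

For the example data, the per-shard averages are 2.0 and 10.0, whose mean (6.0) differs from the true average of 4.0.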

property reader: Optional[str]

The reader type, generating the initial input records to the GearFunction.

Returns

Either "KeysReader", "KeysOnlyReader", "StreamReader", "PythonReader", "ShardsIDReader", "CommandReader" or None (If no reader is defined).

Return type

str

property supports_batch_mode: bool

Indicates if the function can run in Batch-mode, by closing it with a run action.

Returns

True if the function supports batch mode, False if not.

Return type

bool

property supports_event_mode: bool

Indicates if the function can run in Event-mode, by closing it with a register action.

Returns

True if the function supports event mode, False if not.

Return type

bool

Now we are finally ready to start building some Gear Functions.


Courtesy of: Lyngon Pte. Ltd.