Source code for redgrease.client

# -*- coding: utf-8 -*-
"""
Client library module for Redis Gears servers, exposing the gears-specific commands
Can be instantiated in a few ways:

Examples:
    - As a redis client::

        import redgrease

        r = redgrease.RedisGears()  # Takes same arguments as redis.Redis
        r.gears.pyexecute(...)


    - As a separate Gears object, taking a redis.Redis as parameter::

        import redis
        import redgrease

        r = redis.Redis()
        g = redgrease.Gears(r)
        g.pyexecute(...)

"""
__author__ = "Anders Åström"
__contact__ = "anders@lyngon.com"
__copyright__ = "2021, Lyngon Pte. Ltd."
__licence__ = """The MIT License
Copyright © 2021 Lyngon Pte. Ltd.

Permission is hereby granted, free of charge, to any person obtaining a copy of this
 software and associated documentation files (the “Software”), to deal in the Software
 without restriction, including without limitation the rights to use, copy, modify,
 merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
 permit persons to whom the Software is furnished to do so, subject to the following
 conditions:

The above copyright notice and this permission notice shall be included in all copies
 or substantial portions of the Software.

THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
 INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
 PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
 CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE
 OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""

import ast
import fnmatch
import logging

# import warnings
from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Tuple, Union

import redis

import redgrease.config
import redgrease.data
import redgrease.exceptions
import redgrease.gearialization
import redgrease.gears
import redgrease.reader
import redgrease.requirements
import redgrease.runtime
from redgrease.utils import (
    CaseInsensitiveDict,
    bool_ok,
    dict_of,
    list_parser,
    safe_bool,
    safe_str,
    to_dict,
    to_redis_type,
)

log = logging.getLogger(__name__)

ExecutionID = Union[bytes, str, redgrease.data.ExecID, redgrease.data.ExecutionInfo]
"""Type alias for valid execution identifiers"""

RegistrationID = Union[bytes, str, redgrease.data.ExecID, redgrease.data.Registration]
"""Type alias for valid registration identifiers"""


get_python_version = """
import sys

GearsBuilder("ShardsIDReader").map(lambda shard: tuple(sys.version_info)).run()
"""

get_gears_version = """
import redgrease

GearsBuilder("ShardsIDReader").map(lambda shard: redgrease.GEARS_RUNTIME or ()).run()
"""


class Gears:
    """Client class for Redis Gears commands.

    Attributes:
        redis (redis.Redis):
            Redis client / connection, used for the underlying communication with
            the server.

        config (redgrease.config.Config):
            Redis Gears Configuration 'client'
    """

    _RESPONSE_CALLBACKS: Dict[str, Callable] = {
        "RG.ABORTEXECUTION": bool_ok,
        "RG.CONFIGGET": dict_of(
            CaseInsensitiveDict(redgrease.config.Config.ValueTypes)
        ),
        "RG.CONFIGSET": lambda res: all(map(bool_ok, res)),
        "RG.DROPEXECUTION": bool_ok,
        "RG.DUMPEXECUTIONS": list_parser(redgrease.data.ExecutionInfo.from_redis),
        "RG.DUMPREGISTRATIONS": list_parser(redgrease.data.Registration.from_redis),
        "RG.GETEXECUTION": redgrease.data.ExecutionPlan.parse,
        "RG.GETRESULTS": redgrease.data.parse_execute_response,
        "RG.GETRESULTSBLOCKING": redgrease.data.parse_execute_response,
        "RG.INFOCLUSTER": redgrease.data.ClusterInfo.parse,
        "RG.PYEXECUTE": redgrease.data.parse_execute_response,
        "RG.PYSTATS": redgrease.data.PyStats.from_redis,
        "RG.PYDUMPREQS": list_parser(redgrease.data.PyRequirementInfo.from_redis),
        "RG.REFRESHCLUSTER": bool_ok,
        "RG.TRIGGER": redgrease.data.parse_trigger_response,
        "RG.UNREGISTER": bool_ok,
    }

    # NODES_FLAGS - (cluster only)
    # States how target node is selected for cluster commands:
    # - blocked : command is not allowed - Raises a RedisClusterException
    # - random : executed on one randomly selected node
    # - all-masters : executed on all master node
    # - all-nodes : executed on all nodes
    # - slot-id : executed on the node defined by the second argument
    _NODES_FLAGS = {
        "RG.INFOCLUSTER": "random",
        "RG.PYSTATS": "all-nodes",
        "RG.PYDUMPREQS": "random",
        "RG.REFRESHCLUSTER": "all-nodes",
        "RG.DUMPEXECUTIONS": "random",
        "RG.DUMPREGISTRATIONS": "random",
    }

    # RESULT_CALLBACKS - (cluster only)
    # Not to be confused with redis.Redis.RESPONSE_CALLBACKS
    # RESULT_CALLBACKS is special to rediscluster.RedisCluster.
    # It decides how results of commands defined in `NODES_FLAGS` are aggregated into
    # a final response, **after** redis.Redis.RESPONSE_CALLBACKS as been applied to
    # each response individually.
    _RESULT_CALLBACKS = {
        "RG.INFOCLUSTER": lambda _, res: next(iter(res.values())),
        "RG.PYSTATS": lambda _, res: res,
        "RG.PYDUMPREQS": lambda _, res: next(iter(res.values())),
        "RG.REFRESHCLUSTER": lambda _, res: all(res.values()),
    }

    def __init__(self, redis: redis.Redis):
        """Instantiate a Gears client object

        Args:
            redis (redis.Redis):
                redis.Redis client object, used for the underlying communication with
                the server.
        """
        for command, callback in Gears._RESPONSE_CALLBACKS.items():
            redis.set_response_callback(command, callback)

        # Node Flags is only set on `rediscluster.RedisCluster`
        node_flags: Dict = getattr(redis, "nodes_flags", dict())
        if node_flags:
            node_flags.update(Gears._NODES_FLAGS)
            setattr(redis, "node_flags", node_flags)

        # Result Callbacks is only set on `rediscluster.RedisCluster`
        result_callbacks: Dict = getattr(redis, "result_callbacks", dict())
        if result_callbacks:
            result_callbacks.update(Gears._RESULT_CALLBACKS),
            setattr(redis, "result_callbacks", result_callbacks)

        self._redis = redis
        self.config = redgrease.config.Config(redis)

        self._python_version: Optional[Tuple] = None
        self._gears_version: Optional[Tuple] = None

    def _ping(self) -> bool:
        """Test server liveness/connectivity

        Returns:
            bool:
                True if, and only if, the Redis server is responsive and is running the
                gears module
        """
        return self.config.MaxExecutions > 0

    def _trigger_proxy(self, trigger):
        def trigger_function(*args):
            return self.trigger(trigger, *args)

        return redgrease.data.ExecutionResult(trigger_function)

[docs] def abortexecution(self, id: ExecutionID) -> bool: """Abort the execution of a function mid-flight Args: id (Union[redgrease.data.ExecutionInfo, redgrease.data.ExecID, bytes, str]): The execution id to abort Returns: bool: True or an error if the execution does not exist or had already finished. """ if isinstance(id, redgrease.data.ExecutionInfo): id = id.executionId return self._redis.execute_command("RG.ABORTEXECUTION", to_redis_type(id))
[docs] def dropexecution(self, id: ExecutionID) -> bool: """Remove the execution of a function from the executions list. Args: id (Union[redgrease.data.ExecutionInfo, redgrease.data.ExecID, bytes, str]): Execution ID to remove Returns: bool: True if successful, or an error if the execution does not exist or is still running. """ if isinstance(id, redgrease.data.ExecutionInfo): id = id.executionId return self._redis.execute_command("RG.DROPEXECUTION", to_redis_type(id))
[docs] def dumpexecutions( self, status: Union[str, redgrease.data.ExecutionStatus] = None, registered: bool = None, ) -> List[redgrease.data.ExecutionInfo]: """Get list of function executions. The executions list's length is capped by the 'MaxExecutions' configuration option. Args: status (Union[str, redgrease.data.ExecutionStatus], optional): Only return executions that match this status. Either: "created", "running", "done", "aborted", "pending_cluster", "pending_run", "pending_receive" or "pending_termination". Defaults to None. registered (bool, optional): If `True`, only return registered executions. If `False`, only return non-registered executions. Defaults to None. Returns: List[redgrease.data.ExecutionInfo]: A list of ExecutionInfo, with an entry per execution. """ executions: List[redgrease.data.ExecutionInfo] = [] executions = self._redis.execute_command("RG.DUMPEXECUTIONS") if status or registered is not None: filtered_executions = [] for exe in executions: if status and safe_str(status) != safe_str(exe.status): continue if registered is not None and registered != safe_bool(exe.registered): continue filtered_executions.append(exe) executions = filtered_executions return executions
[docs] def dumpregistrations( self, reader: str = None, desc: str = None, mode: str = None, key: str = None, stream: str = None, trigger: str = None, ) -> List[redgrease.data.Registration]: """Get list of function registrations. Args: reader (str, optional): Only return registrations of this reader type. E.g: "StreamReader" Defaults to None. desc (str, optional): Only return registrations, where the description match this pattern. E.g: "transaction*log*" Defaults to None. mode (str, optional): Only return registrations, in this mode. Either "async", "async_local" or "sync". Defaults to None. key (str, optional): Only return (KeysReader) registrations, where the key pattern match this key. Defaults to None. stream (str, optional): Only return (StreamReader) registrations, where the stream pattern match this key. Defaults to None. trigger (str, optional): Only return (CommandReader) registrations, where the trigger pattern match this key. Defaults to None. Returns: List[redgrease.data.Registration]: A list of Registration, with one entry per registered function. """ registrations: List[redgrease.data.Registration] = [] registrations = self._redis.execute_command("RG.DUMPREGISTRATIONS") if reader or desc or mode or key or stream or trigger: filtered_registrations = [] for reg in registrations: if trigger and ( "trigger" not in reg.RegistrationData.args and safe_str(reg.RegistrationData.args["trigger"]) == trigger ): continue if stream and not fnmatch.fnmatch( stream, safe_str(reg.RegistrationData.args.get("stream", "")) ): continue if key and fnmatch.fnmatch( key, safe_str(reg.RegistrationData.args.get("regex", "")) ): continue if reader and reader != reg.reader: continue if desc and reg.desc and not fnmatch.fnmatch(safe_str(reg.desc), desc): continue if mode and mode != reg.RegistrationData.mode: continue filtered_registrations.append(reg) registrations = filtered_registrations return registrations
[docs] def getexecution( self, id: ExecutionID, locality: Optional[redgrease.data.ExecLocality] = None, ) -> Mapping[bytes, redgrease.data.ExecutionPlan]: """Get the execution plan details for a function in the execution list. Args: id (Union[redgrease.data.ExecutionInfo, redgrease.data.ExecID, bytes, str]): Execution identifier for the function to fetch execution plan for. locality (Optional[redgrease.data.ExecLocality], optional): Set to 'Shard' to get only local execution plan and set to 'Cluster' to collect executions from all shards. Defaults to 'Shard' in stand-alone mode, but "Cluster" in cluster mode. Returns: Mapping[bytes, redgrease.data.ExecutionPlan]: A dict, mapping cluster ID to ExecutionPlan """ if isinstance(id, redgrease.data.ExecutionInfo): id = id.executionId loc = [] if locality is None else [safe_str(locality).upper()] return self._redis.execute_command("RG.GETEXECUTION", to_redis_type(id), *loc)
[docs] def getresults( self, id: ExecutionID, ) -> redgrease.data.ExecutionResult: """Get the results of a function in the execution list. Args: id (Union[redgrease.data.ExecutionInfo, redgrease.data.ExecID, bytes, str]): Execution identifier for the function to fetch the output for. Returns: redgrease.data.ExecutionResult: Results and errors from the gears function, if, and only if, execution exists and is completed. Raises: redis.exceptions.ResponseError: If the the execution does not exist or is still running """ if isinstance(id, redgrease.data.ExecutionInfo): id = id.executionId return self._redis.execute_command("RG.GETRESULTS", to_redis_type(id))
def getresultsblocking(self, id: ExecutionID) -> redgrease.data.ExecutionResult: """Get the results and errors from the execution details of a function. If the execution is not finished, the call is blocked until execution ends. Args: id (Union[redgrease.data.ExecutionInfo, redgrease.data.ExecID, bytes, str]): Execution identifier for the function to fetch the results and errors for. Returns: redgrease.data.ExecutionResult: Results and errors from the gears function if the execution exists. Raises: redis.exceptions.ResponseError: If the the execution does not exist. """ if isinstance(id, redgrease.data.ExecutionInfo): id = id.executionId return self._redis.execute_command("RG.GETRESULTSBLOCKING", to_redis_type(id))
[docs] def infocluster(self) -> redgrease.data.ClusterInfo: """Gets information about the cluster and its shards. Returns: redgrease.data.ClusterInfo: Cluster information or None if not in cluster mode. """ return self._redis.execute_command("RG.INFOCLUSTER")
[docs] def pyexecute( self, gear_function: Union[ str, redgrease.runtime.GearsBuilder, redgrease.gears.GearFunction ] = "", unblocking=False, requirements: Optional[ Iterable[Union[str, redgrease.requirements.Requirement]] ] = None, enforce_redgrease: redgrease.requirements.PackageOption = None, ) -> redgrease.data.ExecutionResult: """Execute a gear function. Args: gear_function (Union[str, redgrease.gears.GearFunction], optional): Function to execute. Either: - A :ref:`raw function string <exe_gear_function_str>` containing a clear-text serialized Gears Python function as per the `examples in the official documentation <https://oss.redislabs.com/redisgears/intro.html#the-simplest-example>`_. - A :ref:`script file path <exe_gear_function_file>`. - A :ref:`GearFunction object <exe_gear_function_obj>`, e.g. :class:`.GearsBuilder` or either of the :ref:`gearfun_readers` types. .. note:: * Python version must match the Gear runtime. * If the function is not "closed" with a :meth:`run <.OpenGearFunction.run>` or :meth:`register <.OpenGearFunction.register>` operation, an ``run()`` operation without additional arguments will be assumed, and automatically added to the function to close it. * The default for ``enforce_redgrease`` is ``True``. Defaults to ``""``, i.e. no function. unblocking (bool, optional): Execute function without waiting for it to finish before returning. Defaults to ``False``. I.e. block until the function returns or fails. requirements (Iterable[Union[None, str, redgrease.requirements.Requirement]], optional): List of 3rd party package requirements needed to execute the function on the server. Defaults to ``None``. enforce_redgrease (redgrease.requirements.PackageOption, optional): Indicates if redgrease runtime package requirement should be added or not, and potentially which version and/or extras or source. It can take several optional types: - ``None`` : No enforcement. Requirements are passed through, with or without 'redgrease' runtime package. - ``True`` : Enforces latest ``"redgrease[runtime]"`` package on PyPi, - ``False`` : Enforces that *redgrease* is **not** in the requirements list, any *redgrease* requirements will be removed from the function's requirements. Note that it will **not** force *redgrease* to be uninstalled from the server runtime. - Otherwise, the argument's string-representation is evaluated, and interpreted as either: a. A specific version. E.g. ``"1.2.3"``. b. A version qualifier. E.g. ``">=1.0.0"``. c. Extras. E.g. ``"all"`` or ``"runtime"``. Will enforce the latest version on PyPi, with this/these extras. d. Full requirement qualifier or source. E.g: ``"redgrease[all]>=1.2.3"`` or ``"redgrease[runtime]@git+https://github.com/lyngon/redgrease.git@main"`` Defaults to ``None`` when ``gear_function`` is a :ref:`script file path <exe_gear_function_file>` or a :ref:`raw function string <exe_gear_function_str>`, but ``True`` when it is a :ref:`GearFunction object <exe_gear_function_obj>`. Returns: redgrease.data.ExecutionResult: The returned :class:`ExecutionResult <.data.ExecutionResult>` has two properties: ``value`` and ``errors``, containing the result value and any potential errors, respectively. The value contains the result of the function, unless: - When used in 'unblocking' mode, the value is set to the execution ID - If the function has no output (i.e. it is closed by `register()` or for some other reason is not closed by a `run()` action), the value is True (boolean). Any errors generated by the function are accumulated in the ``errors`` property, as a list. .. note:: The :class:`ExecutionResult <redgrease.data.ExecutionResult>` object itself behaves *for the most part* like its contained ``value``, so for many simple operations, such as checking `True-ness`, result length, iterate results etc, it can be used directly. But the the safest was to get the actual result is by means of the ``value`` property. Raises: redis.exceptions.ResponseError: If the function cannot be parsed. """ # noqa requirements = set(requirements if requirements else []) function_string, ctx = redgrease.gearialization.get_function_string( gear_function ) params = [] if unblocking: params.append("UNBLOCKING") # Resolve requirement conflicts, remove duplicates requirements = redgrease.requirements.resolve_requirements( requirements.union(ctx.get("requirements", set())), enforce_redgrease=ctx.get("enforce_redgrease", enforce_redgrease), ) if requirements: params.append("REQUIREMENTS") params += list(map(str, requirements)) try: command_response = self._redis.execute_command( "RG.PYEXECUTE", function_string, *params, ) except redis.exceptions.ResponseError as ex: ex.args = ast.literal_eval(ex.args[0]) if "trigger already registered" in ex.args[-1]: raise redgrease.exceptions.DuplicateTriggerError() from ex raise if command_response and "trigger" in ctx: return self._trigger_proxy(ctx["trigger"]) return command_response
[docs] def pystats(self) -> redgrease.data.PyStats: """Gets memory usage statisticy from the Python interpreter Returns: redgrease.data.PyStats: Python interpretere memory statistics, including total, peak and current amount of allocated memory, in bytes. """ return self._redis.execute_command("RG.PYSTATS")
[docs] def pydumpreqs( self, name: str = None, is_downloaded: bool = None, is_installed: bool = None ) -> List[redgrease.data.PyRequirementInfo]: """Gets all the python requirements available (with information about each requirement). Args: name (str, optional): Only return packages with this **base name**. I.e. it is not filtering on version number, extras etc. Defaults to None. is_downloaded (bool, optional): If `True`, only return requirements that have been downloaded. If `False`, only return requirements that have NOT been downloaded. Defaults to None. is_installed (bool, optional): If `True`, only return requirements that have been installed. If `False`, only return requirements that have NOT been installed. Defaults to None. Returns: List[redgrease.data.PyRequirementInfo]: List of Python requirement information objects. """ requirements: List[redgrease.data.PyRequirementInfo] = [] requirements = self._redis.execute_command("RG.PYDUMPREQS") if name or is_downloaded is not None or is_installed is not None: filtered_requirements = [] for req in requirements: if name and not redgrease.requirements.same_name(name, req.Name): continue if is_downloaded is not None and is_downloaded != safe_bool( req.IsDownloaded ): continue if is_installed is not None and is_installed != safe_bool( req.IsInstalled ): continue filtered_requirements.append(req) requirements = filtered_requirements return requirements
[docs] def refreshcluster(self) -> bool: """Refreshes the local node's view of the cluster topology. Returns: bool: True if successful. Raises: redis.exceptions.ResponseError: If not successful """ return self._redis.execute_command("RG.REFRESHCLUSTER")
[docs] def trigger(self, trigger_name: str, *args) -> List[Any]: """Trigger the execution of a registered 'CommandReader' function. Args: trigger_name (str): The registered 'trigger' name of the function *args (Any): Any additional arguments to the trigger Returns:6 List: A list of the functions output records. """ return self._redis.execute_command("RG.TRIGGER", safe_str(trigger_name), *args)
[docs] def unregister(self, id: RegistrationID) -> bool: """Removes the registration of a function Args: id (Union[redgrease.data.Registration, redgrease.data.ExecID, bytes, str]): Execution identifier for the function to unregister. Returns: bool: True if successful Raises: redis.exceptions.ResponseError: If the registration ID doesn't exist or if the function's reader doesn't support the unregister operation. """ if isinstance(id, redgrease.data.Registration): id = id.id return self._redis.execute_command("RG.UNREGISTER", to_redis_type(id))
def python_version(self) -> Optional[Tuple]: if self._python_version is None: ver = self.pyexecute(get_python_version) if isinstance(ver, list): ver = ver[0] self._python_version = ast.literal_eval(safe_str(ver)) return self._python_version def gears_version(self) -> Optional[Tuple]: if self._gears_version is None: module_list = [ to_dict( mod, key_transform=safe_str, val_transform=safe_str, ) for mod in self._redis.execute_command("MODULE", "LIST") ] for module_info in module_list: if module_info.get("name", None) == "rg": try: ver_int = int(module_info["ver"]) major, minor_build = divmod(ver_int, 10000) minor, build = divmod(minor_build, 100) self._gears_version = (major, minor, build) except (KeyError, ValueError): pass # ver = self.pyexecute(get_gears_version, enforce_redgrease=True) # if isinstance(ver, list): # ver = ver[0] # self._gears_version = ast.literal_eval(safe_str(ver)) return self._gears_version class RedisGearsModule: def __init__(self, **kwargs) -> None: self.connection = None self._gears = None @property def gears(self): """Gears client, exposing gears commands Returns: Gears: Gears client """ if not self._gears: self._gears = Gears(self) return self._gears class RedisModules(RedisGearsModule): def __init__(self, **kwargs) -> None: super().__init__(**kwargs) def module_list(self): return [ to_dict( mod, key_transform=safe_str, val_transform=safe_str, ) for mod in self.execute_command("MODULE", "LIST") ] class Redis(redis.Redis, RedisModules): def __init__( self, host="localhost", port=6379, db=0, password=None, socket_timeout=None, socket_connect_timeout=None, socket_keepalive=None, socket_keepalive_options=None, connection_pool=None, unix_socket_path=None, encoding="utf-8", encoding_errors="strict", charset=None, errors=None, decode_responses=False, retry_on_timeout=False, ssl=False, ssl_keyfile=None, ssl_certfile=None, ssl_cert_reqs="required", ssl_ca_certs=None, ssl_check_hostname=False, max_connections=None, single_connection_client=False, health_check_interval=0, client_name=None, username=None, **kwargs, ): redis.Redis.__init__( self, host=host, port=port, db=db, password=password, socket_timeout=socket_timeout, socket_connect_timeout=socket_connect_timeout, socket_keepalive=socket_keepalive, socket_keepalive_options=socket_keepalive_options, connection_pool=connection_pool, unix_socket_path=unix_socket_path, encoding=encoding, encoding_errors=encoding_errors, charset=charset, errors=errors, decode_responses=decode_responses, retry_on_timeout=retry_on_timeout, ssl=ssl, ssl_keyfile=ssl_keyfile, ssl_certfile=ssl_certfile, ssl_cert_reqs=ssl_cert_reqs, ssl_ca_certs=ssl_ca_certs, ssl_check_hostname=ssl_check_hostname, max_connections=max_connections, single_connection_client=single_connection_client, health_check_interval=health_check_interval, client_name=client_name, username=username, **kwargs, ) RedisModules.__init__(self) try: # If the `rediscluster` package is present, then # define `RedisCluster` as `rediscluster.RedisCluster` with `RedisModules` import rediscluster import rediscluster.exceptions class RedisCluster(rediscluster.RedisCluster, RedisModules): def __init__( self, host=None, port=None, startup_nodes=None, max_connections=None, max_connections_per_node=False, init_slot_cache=True, readonly_mode=False, reinitialize_steps=None, skip_full_coverage_check=False, nodemanager_follow_cluster=False, connection_class=None, read_from_replicas=False, cluster_down_retry_attempts=3, host_port_remap=None, **kwargs, ): self.connection = None rediscluster.RedisCluster.__init__( self, host=host, port=port, startup_nodes=startup_nodes, max_connections=max_connections, max_connections_per_node=max_connections_per_node, init_slot_cache=init_slot_cache, readonly_mode=readonly_mode, reinitialize_steps=reinitialize_steps, skip_full_coverage_check=skip_full_coverage_check, nodemanager_follow_cluster=nodemanager_follow_cluster, connection_class=connection_class, read_from_replicas=read_from_replicas, cluster_down_retry_attempts=cluster_down_retry_attempts, host_port_remap=host_port_remap, **kwargs, ) RedisModules.__init__(self) except ModuleNotFoundError as mnf_err: # If the `rediscluster` package is NOT present, then # define `RedisCluster` as a class that throws a `ModuleNotFoundError`. ex = mnf_err class RedisCluster: # type: ignore def __init__(self, *args, **kwargs) -> None: raise ex # Redis with Modules def RedisMods( host=None, port=None, db=None, password=None, socket_timeout=None, socket_connect_timeout=None, socket_keepalive=None, socket_keepalive_options=None, connection_pool=None, unix_socket_path=None, encoding=None, encoding_errors=None, charset=None, errors=None, decode_responses=None, retry_on_timeout=None, ssl=None, ssl_keyfile=None, ssl_certfile=None, ssl_cert_reqs=None, ssl_ca_certs=None, ssl_check_hostname=None, max_connections=None, single_connection_client=None, health_check_interval=None, client_name=None, username=None, # Redis Cluster startup_nodes=None, max_connections_per_node=None, init_slot_cache=None, readonly_mode=None, reinitialize_steps=None, skip_full_coverage_check=None, nodemanager_follow_cluster=None, connection_class=None, read_from_replicas=None, cluster_down_retry_attempts=None, host_port_remap=None, # Catchall **kwargs, ) -> Redis: default_args = { "db": db, "password": password, "socket_timeout": socket_timeout, "socket_connect_timeout": socket_connect_timeout, "socket_keepalive": socket_keepalive, "socket_keepalive_options": socket_keepalive_options, "connection_pool": connection_pool, "unix_socket_path": unix_socket_path, "encoding": encoding, "encoding_errors": encoding_errors, "charset": charset, "errors": errors, "decode_responses": decode_responses, "retry_on_timeout": retry_on_timeout, "ssl": ssl, "ssl_keyfile": ssl_keyfile, "ssl_certfile": ssl_certfile, "ssl_cert_reqs": ssl_cert_reqs, "ssl_ca_certs": ssl_ca_certs, "ssl_check_hostname": ssl_check_hostname, "max_connections": max_connections, "single_connection_client": single_connection_client, "health_check_interval": health_check_interval, "client_name": client_name, "username": username, } cluster_args = { "startup_nodes": startup_nodes, "max_connections_per_node": max_connections_per_node, "init_slot_cache": init_slot_cache, "readonly_mode": readonly_mode, "reinitialize_steps": reinitialize_steps, "skip_full_coverage_check": skip_full_coverage_check, "nodemanager_follow_cluster": nodemanager_follow_cluster, "connection_class": connection_class, "read_from_replicas": read_from_replicas, "cluster_down_retry_attempts": cluster_down_retry_attempts, "host_port_remap": host_port_remap, } if host is None and startup_nodes is None: host = "localhost" try: return RedisCluster( host=host, port=port, **{ **{k: v for k, v in cluster_args.items() if v is not None}, **{k: v for k, v in default_args.items() if v is not None}, **kwargs, }, ) except ( rediscluster.exceptions.RedisClusterException, ModuleNotFoundError, ): if port is None: port = 6379 return Redis( host=host, port=port, **{**{k: v for k, v in default_args.items() if v is not None}, **kwargs}, ) # Deprecated def RedisGears(*args, **kwargs): """Deprecated Use RedisMods instead """ # warnings.warn( # """Instantiation using `RedisGears` will be deprecated. # Please use `RedisMods` instead.""", # DeprecationWarning, # stacklevel=2, # ) return RedisMods(*args, **kwargs)