Source code for redgrease.config

# -*- coding: utf-8 -*-
"""
Redis Gears Config
"""
__author__ = "Anders Åström"
__contact__ = "anders@lyngon.com"
__copyright__ = "2021, Lyngon Pte. Ltd."
__licence__ = """The MIT License
Copyright © 2021 Lyngon Pte. Ltd.

Permission is hereby granted, free of charge, to any person obtaining a copy of this
 software and associated documentation files (the “Software”), to deal in the Software
 without restriction, including without limitation the rights to use, copy, modify,
 merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
 permit persons to whom the Software is furnished to do so, subject to the following
 conditions:

The above copyright notice and this permission notice shall be included in all copies
 or substantial portions of the Software.

THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
 INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
 PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
 CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE
 OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""


import logging
from typing import Any, Dict, Union

import redis

from redgrease.typing import Constructor
from redgrease.utils import safe_str, to_list

# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)


class Config:
    """Redis Gears Config

    Thin wrapper around a Redis client that reads and writes RedisGears
    configuration options through the `RG.CONFIGGET` and `RG.CONFIGSET`
    commands. Each known option is also exposed as a property.
    """

    ValueTypes: Dict[str, Constructor] = {
        "MaxExecutions": int,
        "MaxExecutionsPerRegistration": int,
        "ProfileExecutions": bool,
        "PythonAttemptTraceback": bool,
        "DownloadDeps": bool,
        "DependenciesUrl": safe_str,
        "DependenciesSha256": safe_str,
        "PythonInstallationDir": safe_str,
        "CreateVenv": bool,
        "ExecutionThreads": int,
        "ExecutionMaxIdleTime": int,
        "PythonInstallReqMaxIdleTime": int,
        "SendMsgRetries": int,
    }
    """Mapping from config name to its corresponding value type, transformer or
    constructor.
    """

    # Single slot: the only per-instance state is the client connection.
    __slots__ = ("redis",)

    def __init__(self, redis: redis.Redis):
        """Instantiate a Redis Gears Config object

        Args:
            redis (redis.Redis):
                Redis client / connection for underlying communication.
        """
        self.redis = redis

    def get(self, *config_option: Union[bytes, str]) -> Dict[Union[bytes, str], Any]:
        """Get the value of one or more built-in configuration options or
        user-defined options.

        When called without arguments, every option named in
        `Config.ValueTypes` is requested.

        Args:
            *config_option (Union[bytes, str]):
                One or more names/keys of configurations to get.

        Returns:
            Dict[Union[bytes, str], Any]:
                Dict of the requested config options mapped to their
                corresponding values.
        """
        if config_option:
            # Normalise every requested name through `safe_str` before sending.
            conf = [safe_str(c) for c in config_option]
        else:
            conf = list(Config.ValueTypes)

        # The names are also handed over as `keys`; presumably the client's
        # response callback uses them to key the returned dict.
        return self.redis.execute_command("RG.CONFIGGET", *conf, keys=conf)

    def set(self, config_dict=None, **config_setting) -> bool:
        """Set a value of one or more built-in configuration options or
        user-defined options.

        This function offers two methods of providing the keys and values to
        set; either as a mapping/dict or by key-word arguments. If the same
        key is given both ways, the key-word argument takes precedence.

        Args:
            config_dict (Mapping[str, Any]):
                Mapping / dict of config values to set.

            **config_setting (Any):
                Key-word arguments to set as config values.

        Returns:
            bool:
                `True` if all was successful, `False` otherwise
        """
        settings = {}
        if config_dict:
            settings.update(config_dict)
        if config_setting:
            # Applied last, so key-word arguments override `config_dict`.
            settings.update(config_setting)

        return self.redis.execute_command("RG.CONFIGSET", *to_list(settings))

    def get_single(self, config_option: Union[bytes, str]) -> Any:
        """Get a single config value.

        Args:
            config_option (Union[bytes, str]):
                Name of the config to get.

        Returns:
            Any:
                The value of the config option.

        Raises:
            KeyError:
                If the option is not present in the reply.
        """
        values = self.get(config_option)
        try:
            return values[config_option]
        except KeyError:
            # `get` normalises names with `safe_str` before querying, so the
            # reply may be keyed by the normalised name rather than the raw
            # (e.g. bytes) name that was passed in.
            return values[safe_str(config_option)]

    @property
    def MaxExecutions(self) -> int:
        """Get the current value for the `MaxExecutions` config option.

        The MaxExecutions configuration option controls the maximum number of
        executions that will be saved in the executions list.
        Once this threshold value is reached, older executions will be deleted
        from the list by order of their creation (FIFO).

        Only executions that had finished (e.g. the 'done' or 'aborted' status)
        are deleted.
        """
        return self.get_single("MaxExecutions")

    @MaxExecutions.setter
    def MaxExecutions(self, value: int) -> None:
        self.set(MaxExecutions=value)

    @property
    def MaxExecutionsPerRegistration(self) -> int:
        """Get the current value for the `MaxExecutionsPerRegistration` config
        option.

        The MaxExecutionsPerRegistration configuration option controls the
        maximum number of executions that are saved in the list per
        registration.
        Once this threshold value is reached, older executions for that
        registration will be deleted from the list by order of their creation
        (FIFO).

        Only executions that had finished (e.g. the 'done' or 'aborted' status)
        are deleted.
        """
        return self.get_single("MaxExecutionsPerRegistration")

    @MaxExecutionsPerRegistration.setter
    def MaxExecutionsPerRegistration(self, value: int) -> None:
        self.set(MaxExecutionsPerRegistration=value)

    @property
    def ProfileExecutions(self) -> bool:
        """Get the current value for the `ProfileExecutions` config option.

        The ProfileExecutions configuration option controls whether executions
        are profiled.

        **Note: Profiling impacts performance**
        Profiling requires reading the server's clock, which is a costly
        operation in terms of performance.
        Execution profiling is recommended only for debugging purposes and
        should be disabled in production.
        """
        return self.get_single("ProfileExecutions")

    @ProfileExecutions.setter
    def ProfileExecutions(self, value: bool) -> None:
        self.set(ProfileExecutions=value)

    @property
    def PythonAttemptTraceback(self) -> bool:
        """Get the current value for the `PythonAttemptTraceback` config option.

        The PythonAttemptTraceback configuration option controls whether the
        engine tries producing stack traces for Python runtime errors.
        """
        return self.get_single("PythonAttemptTraceback")

    @PythonAttemptTraceback.setter
    def PythonAttemptTraceback(self, value: bool) -> None:
        self.set(PythonAttemptTraceback=value)

    @property
    def DownloadDeps(self) -> bool:
        """Get the current value for the `DownloadDeps` config option.

        The DownloadDeps configuration option controls whether or not
        RedisGears will attempt to download missing Python dependencies.
        """
        return self.get_single("DownloadDeps")

    # NOTE(review): setter left disabled, as in the original source; presumably
    # this option cannot be changed at runtime - confirm against the RedisGears
    # configuration documentation.
    # @DownloadDeps.setter
    # def DownloadDeps(self, value):
    #     self.set(DownloadDeps=value)

    @property
    def DependenciesUrl(self) -> str:
        """Get the current value for the `DependenciesUrl` config option.

        The DependenciesUrl configuration option controls the location from
        which RedisGears tries to download its Python dependencies.
        """
        return self.get_single("DependenciesUrl")

    # NOTE(review): setter left disabled, as in the original source; presumably
    # not runtime-configurable - confirm.
    # @DependenciesUrl.setter
    # def DependenciesUrl(self, value):
    #     self.set(DependenciesUrl=value)

    @property
    def DependenciesSha256(self) -> str:
        """Get the current value for the `DependenciesSha256` config option.

        The DependenciesSha256 configuration option specifies the SHA256 hash
        value of the Python dependencies.
        This value is verified after the dependencies have been downloaded and
        will stop the server's startup in case of a mismatch.
        """
        return self.get_single("DependenciesSha256")

    # NOTE(review): setter left disabled, as in the original source; presumably
    # not runtime-configurable - confirm.
    # @DependenciesSha256.setter
    # def DependenciesSha256(self, value):
    #     self.set(DependenciesSha256=value)

    @property
    def PythonInstallationDir(self) -> str:
        """Get the current value for the `PythonInstallationDir` config option.

        The PythonInstallationDir configuration option specifies the path for
        RedisGears' Python dependencies.
        """
        return self.get_single("PythonInstallationDir")

    # NOTE(review): setter left disabled, as in the original source; presumably
    # not runtime-configurable - confirm.
    # @PythonInstallationDir.setter
    # def PythonInstallationDir(self, value):
    #     self.set(PythonInstallationDir=value)

    @property
    def CreateVenv(self) -> bool:
        """Get the current value for the `CreateVenv` config option.

        The CreateVenv configuration option controls whether the engine will
        create a virtual Python environment.
        """
        return self.get_single("CreateVenv")

    # NOTE(review): setter left disabled, as in the original source; presumably
    # not runtime-configurable - confirm.
    # @CreateVenv.setter
    # def CreateVenv(self, value):
    #     self.set(CreateVenv=value)

    @property
    def ExecutionThreads(self) -> int:
        """Get the current value for the `ExecutionThreads` config option.

        The ExecutionThreads configuration option controls the number of
        threads that will run executions.
        """
        return self.get_single("ExecutionThreads")

    # NOTE(review): setter left disabled, as in the original source; presumably
    # not runtime-configurable - confirm.
    # @ExecutionThreads.setter
    # def ExecutionThreads(self, value):
    #     self.set(ExecutionThreads=value)

    @property
    def ExecutionMaxIdleTime(self) -> int:
        """Get the current value for the `ExecutionMaxIdleTime` config option.

        The ExecutionMaxIdleTime configuration option controls the maximal
        amount of idle time (in milliseconds) before execution is aborted.
        Idle time means no progress is made by the execution.

        The main reason for idle time is an execution that's blocked on waiting
        for records from another shard that had failed (i.e. crashed).

        In that case, the execution will be aborted after the specified time
        limit. The idle timer is reset once the execution starts progressing
        again.
        """
        return self.get_single("ExecutionMaxIdleTime")

    @ExecutionMaxIdleTime.setter
    def ExecutionMaxIdleTime(self, value: int) -> None:
        self.set(ExecutionMaxIdleTime=value)

    @property
    def PythonInstallReqMaxIdleTime(self) -> int:
        """Get the current value for the `PythonInstallReqMaxIdleTime` config
        option.

        The PythonInstallReqMaxIdleTime configuration option controls the
        maximal amount of idle time (in milliseconds) before Python's
        requirements installation is aborted.
        Idle time means that the installation makes no progress.

        The main reason for idle time is the same as for ExecutionMaxIdleTime.
        """
        return self.get_single("PythonInstallReqMaxIdleTime")

    @PythonInstallReqMaxIdleTime.setter
    def PythonInstallReqMaxIdleTime(self, value: int) -> None:
        self.set(PythonInstallReqMaxIdleTime=value)

    @property
    def SendMsgRetries(self) -> int:
        """Get the current value for the `SendMsgRetries` config option.

        The SendMsgRetries configuration option controls the maximum number of
        retries for sending a message between RedisGears' shards.
        When a message is sent and the shard disconnects before acknowledging
        it, or when it returns an error, the message will be resent until this
        threshold is met.

        Setting the value to 0 means unlimited retries.
        """
        return self.get_single("SendMsgRetries")

    @SendMsgRetries.setter
    def SendMsgRetries(self, value: int) -> None:
        self.set(SendMsgRetries=value)