Source code for ms_graph_exporter.celery.app

# -*- coding: utf-8 -*-
"""Module defines Celery app.

Configure app with additional user options available for the Celery worker.
"""
# TODO:VERIFY:20190827:01
#   Is Gevent monkey-patching below necessary when using `celery worker --pool gevent` ?
#
# fmt: off
from gevent.monkey import patch_all; patch_all()  # noqa: E702
# fmt: on
from os import environ  # noqa: I100, I201

from celery import Celery
from celery import bootsteps
from celery.utils.log import get_logger

from redbeat import RedBeatSchedulerEntry

from ms_graph_exporter.celery.config import (
    CeleryConfigDev,
    CeleryConfigProd,
    CeleryConfigTest,
)


class CustomArgs(bootsteps.Step):
    """Worker bootstep applying custom options and registering the periodic schedule."""

    def __init__(self, worker, **options):
        """Apply worker options and schedule the periodic stream-mapping task.

        Forwards the user options received from the worker to the
        :class:`GraphApiTask` base class and saves a RedBeat schedule entry
        which periodically triggers the ``graph.map_streams`` task.

        Note
        ----
        By default, no config file is expected by the app. All options are
        taken either from the command-line interface (CLI) or the execution
        environment (ENV). The config file is loaded only when the
        ``graph_app_config`` option is set, and it is applied after the
        CLI/ENV options.

        Parameters
        ----------
        worker
            Celery worker this bootstep is attached to; its ``app`` is used
            for the schedule entry.

        **options
            Options handed over by the worker, including the custom
            ``graph_*`` user options.

        """
        # Imported locally rather than at module level, as in the original code.
        from ms_graph_exporter.celery.graph_api_base import GraphApiTask

        GraphApiTask.config_update(**options)

        config_file = options["graph_app_config"]
        if config_file:
            GraphApiTask.config_update_from_file()

        log = get_logger(__name__)

        # The task fires once per (number of streams x frame size of a stream).
        task_config = GraphApiTask._config
        streams = task_config["graph_streams"]
        frame = task_config["graph_stream_frame"]
        period = streams * frame

        schedule_entry = RedBeatSchedulerEntry(
            "periodic_map_streams", "graph.map_streams", period, app=worker.app
        )

        log.info("adding schedule entry, %s", schedule_entry)

        schedule_entry.save()


# Environment values (compared lower-cased and stripped) that switch a flag off.
_FALSE_ENV_VALUES = frozenset({"", "0", "false", "no", "off"})


def _env_flag(name, default):
    """Return the boolean value of environment variable ``name``.

    Environment variables are always strings, so a plain truthiness test
    would treat ``"false"`` or ``"0"`` as enabled. The value is compared
    case-insensitively (ignoring surrounding whitespace) against a set of
    "off" spellings; any other value counts as enabled.

    Parameters
    ----------
    name
        Name of the environment variable to read.

    default
        Value returned when the variable is not set at all.

    """
    raw = environ.get(name)
    if raw is None:
        return default
    return raw.strip().lower() not in _FALSE_ENV_VALUES


def add_worker_arguments(parser):
    """Configure custom options for the Celery worker.

    Every option defaults to the value of the matching ``GRAPH_*``
    environment variable, falling back to a hard-coded default when the
    variable is not set.

    Parameters
    ----------
    parser
        ``argparse``-style parser the options are registered on. It must
        provide ``add_argument``, ``add_mutually_exclusive_group`` and
        ``set_defaults``.

    """
    # NOTE: string defaults taken from the environment for ``type=int``
    # options are converted by argparse itself, exactly as if they had been
    # given on the command line.
    parser.add_argument(
        "--app_config",
        dest="graph_app_config",
        type=str,
        default=environ.get("GRAPH_APP_CONFIG", ""),
        help=(
            "YAML-based configuration file. "
            "By default, no config file is expected by the app. "
            "All options are taken either from the command-line interface "
            "(CLI) or the execution environment (ENV). "
            "Config file directives are the same as the CLI options listed "
            "below. Corresponding ENV options are upper-case names of CLI "
            "options prefixed with 'GRAPH_'. "
            "Config file takes precedence over the CLI, which in turn takes "
            "precedence over ENV."
        ),
    )

    parser.add_argument(
        "--client_id",
        dest="graph_client_id",
        type=str,
        default=environ.get("GRAPH_CLIENT_ID", ""),
        help="ClientId of the Service Principal with access to MS Graph API.",
    )

    parser.add_argument(
        "--client_secret",
        dest="graph_client_secret",
        type=str,
        default=environ.get("GRAPH_CLIENT_SECRET", ""),
        help="ClientSecret of the Service Principal with access to MS Graph API.",
    )

    parser.add_argument(
        "--greenlets_count",
        dest="graph_greenlets_count",
        type=int,
        default=environ.get("GRAPH_GREENLETS_COUNT", 10),
        help="Maximum number of Greenlets to spawn for each data upload task.",
    )

    parser.add_argument(
        "--page_size",
        dest="graph_page_size",
        type=int,
        default=environ.get("GRAPH_PAGE_SIZE", 50),
        help="Number of records to request from the MS Graph API in a single response.",
    )

    parser.add_argument(
        "--queue_backend",
        dest="graph_queue_backend",
        type=str,
        choices=["redis", "log"],
        default=environ.get("GRAPH_QUEUE_BACKEND", "redis"),
        help="Backend type to store exported data.",
    )

    parser.add_argument(
        "--queue_type",
        dest="graph_queue_type",
        type=str,
        choices=["list", "channel"],
        default=environ.get("GRAPH_QUEUE_TYPE", "list"),
        help="Storage queue implementation type.",
    )

    parser.add_argument(
        "--queue_key",
        dest="graph_queue_key",
        type=str,
        default=environ.get("GRAPH_QUEUE_KEY", "ms_graph_exporter"),
        help="Name of the CHANNEL or LIST where extracted data is pushed.",
    )

    parser.add_argument(
        "--redis_url",
        dest="graph_redis_url",
        type=str,
        default=environ.get("GRAPH_REDIS_URL", "redis://localhost:6379?db=0"),
        help="Connection string for Redis client.",
    )

    # Paired on/off switches sharing one destination; the default comes from
    # the environment and must be a real boolean, not a raw string.
    group_redis_pool_block = parser.add_mutually_exclusive_group(required=False)
    group_redis_pool_block.add_argument(
        "--redis_pool_block",
        dest="graph_redis_pool_block",
        action="store_true",
        help="Enable thread-safe blocking in Redis connection pool.",
    )
    group_redis_pool_block.add_argument(
        "--no-redis_pool_block",
        dest="graph_redis_pool_block",
        action="store_false",
        help="Disable thread-safe blocking in Redis connection pool.",
    )
    parser.set_defaults(
        graph_redis_pool_block=_env_flag("GRAPH_REDIS_POOL_BLOCK", True)
    )

    group_redis_pool_gevent_queue = parser.add_mutually_exclusive_group(required=False)
    group_redis_pool_gevent_queue.add_argument(
        "--redis_pool_gevent_queue",
        dest="graph_redis_pool_gevent_queue",
        action="store_true",
        help="Enable `gevent.queue.LifoQueue` usage in Redis connection pool.",
    )
    group_redis_pool_gevent_queue.add_argument(
        "--no-redis_pool_gevent_queue",
        dest="graph_redis_pool_gevent_queue",
        action="store_false",
        help="Disable `gevent.queue.LifoQueue` usage in Redis connection pool.",
    )
    parser.set_defaults(
        graph_redis_pool_gevent_queue=_env_flag("GRAPH_REDIS_POOL_GEVENT_QUEUE", True)
    )

    parser.add_argument(
        "--redis_pool_max_connections",
        dest="graph_redis_pool_max_connections",
        type=int,
        default=environ.get("GRAPH_REDIS_POOL_MAX_CONNECTIONS", 15),
        help=(
            "Maximum number of reusable connections maintained by the "
            "connection pool of the Redis client."
        ),
    )

    parser.add_argument(
        "--redis_pool_timeout",
        dest="graph_redis_pool_timeout",
        type=int,
        default=environ.get("GRAPH_REDIS_POOL_TIMEOUT", 1),
        help=(
            "Time that blocking-enabled Redis client waits for connection "
            "to become available from the exhausted connection pool "
            "(seconds). Afterwards, raises Redis ConnectionError exception."
        ),
    )

    parser.add_argument(
        "--streams",
        dest="graph_streams",
        type=int,
        default=environ.get("GRAPH_STREAMS", 2),
        help="Number of parallel streams to fetch time-domain data.",
    )

    parser.add_argument(
        "--stream_frame",
        dest="graph_stream_frame",
        type=int,
        default=environ.get("GRAPH_STREAM_FRAME", 30),
        help="Time-domain size of the data request for each stream (seconds).",
    )

    parser.add_argument(
        "--tenant",
        dest="graph_tenant",
        type=str,
        default=environ.get("GRAPH_TENANT", ""),
        help=(
            "Azure AD tenant where the Service Principal with access rights "
            "to call the MS Graph API resides."
        ),
    )

    parser.add_argument(
        "--timelag",
        dest="graph_timelag",
        type=int,
        default=environ.get("GRAPH_TIMELAG", 120),
        help=(
            "Seconds to shift back the query time-frame for each of the "
            "periodic invocations of the parallelized extraction process."
        ),
    )


# Celery application object exposed by this module.
celery_app: Celery = Celery()

# Name of the runtime environment; selects which config class is loaded.
conf_type: str = environ.get("GRAPH_ENV", "development")

# Maps each recognised GRAPH_ENV value to its Celery configuration class.
_CONFIG_BY_ENV = {
    "development": CeleryConfigDev,
    "production": CeleryConfigProd,
    "testing": CeleryConfigTest,
}

# An unrecognised GRAPH_ENV value loads no config object at all.
_selected_config = _CONFIG_BY_ENV.get(conf_type)
if _selected_config is not None:
    celery_app.config_from_object(_selected_config)

# Register the custom command-line options and the bootstep consuming them.
celery_app.user_options["worker"].add(add_worker_arguments)
celery_app.steps["worker"].add(CustomArgs)

if __name__ == "__main__":
    celery_app.start()