ms_graph_exporter.celery package
Submodules
ms_graph_exporter.celery.app module
Module defines the Celery app.
Configures the app with additional user options available to the Celery worker.

class ms_graph_exporter.celery.app.CustomArgs(worker, **options)
Bases: celery.bootsteps.Step
Bootstep to handle custom options and setup of the Celery worker.

__init__(worker, **options)
Initialize class instance.
Passes user options defined by the worker to the GraphApiTask base class and schedules periodic data extraction from MS Graph API into the post-processing queue.

Note
By default, no config file is expected by the app. All options are taken either from the command-line interface (CLI) or the execution environment (ENV).
The config file takes precedence over the CLI, which in turn takes precedence over ENV.

name = 'ms_graph_exporter.celery.app.CustomArgs'
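The precedence rule in the note above (config file over CLI over ENV) can be illustrated with collections.ChainMap, which returns the value from the first mapping that contains a key. This is a minimal sketch, not the app's actual implementation: the helper name resolve_options and the convention that an unset CLI option arrives as None are assumptions made for illustration.

```python
from collections import ChainMap
from typing import Any, Dict, Mapping


def resolve_options(
    file_opts: Mapping[str, Any],
    cli_opts: Mapping[str, Any],
    env_opts: Mapping[str, Any],
) -> Dict[str, Any]:
    """Merge option sources: config file > CLI > ENV (hypothetical helper)."""
    # Assumption: options not given on the command line are passed as None,
    # so they must not shadow values coming from ENV.
    cli_set = {k: v for k, v in cli_opts.items() if v is not None}
    # ChainMap looks keys up left to right, so earlier mappings win.
    return dict(ChainMap(dict(file_opts), cli_set, dict(env_opts)))


env = {"streams": "5", "timelag": "120"}
cli = {"streams": "3", "timelag": None}
print(resolve_options({}, cli, env))  # {'streams': '3', 'timelag': '120'}
```

Filtering out None values keeps an omitted CLI flag from masking a value that was set in the environment.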
ms_graph_exporter.celery.config module
Module defines Celery configuration objects.

class ms_graph_exporter.celery.config.CeleryConfigBase
Bases: object
Baseline Celery app configuration.

beat_max_loop_interval = 300
beat_scheduler = 'redbeat.RedBeatScheduler'
include = ('ms_graph_exporter.celery.tasks',)
redbeat_key_prefix = 'ms_graph_exporter'
redbeat_lock_timeout = 1500
task_acks_late = True
task_ignore_result = True
task_routes = {'graph.fetch_stream': {'queue': 'graph.fetch'}, 'graph.map_streams': {'queue': 'graph.map'}, 'graph.store_records': {'queue': 'graph.store'}}
timezone = 'UTC'
worker_prefetch_multiplier = 1
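With task_routes defined as a plain dict, each task name maps to the queue its messages are sent to, so the map, fetch and store stages can be consumed independently. The sketch below only mimics that lookup to make the mapping explicit; Celery's router does this internally. The function queue_for is illustrative, and the fallback "celery" is Celery's default queue name for tasks without a route.

```python
from typing import Dict

# Copied from CeleryConfigBase.task_routes.
TASK_ROUTES: Dict[str, Dict[str, str]] = {
    "graph.fetch_stream": {"queue": "graph.fetch"},
    "graph.map_streams": {"queue": "graph.map"},
    "graph.store_records": {"queue": "graph.store"},
}


def queue_for(task_name: str, default_queue: str = "celery") -> str:
    """Return the queue a task would be routed to (illustrative helper)."""
    route = TASK_ROUTES.get(task_name, {})
    return route.get("queue", default_queue)


for name in ("graph.map_streams", "graph.fetch_stream", "graph.store_records"):
    print(name, "->", queue_for(name))
```

A worker started with Celery's -Q (--queues) option, e.g. -Q graph.fetch, would then consume only that stage.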
class ms_graph_exporter.celery.config.CeleryConfigDev
Bases: ms_graph_exporter.celery.config.CeleryConfigBase
Celery app configuration (DEV).

broker_url = 'redis://localhost:6379/0'

class ms_graph_exporter.celery.config.CeleryConfigProd
Bases: ms_graph_exporter.celery.config.CeleryConfigBase
Celery app configuration (PROD).

beat_max_loop_interval = 5
broker_url = 'redis://redis:6379/0'
redbeat_lock_timeout = 15

class ms_graph_exporter.celery.config.CeleryConfigTest
Bases: ms_graph_exporter.celery.config.CeleryConfigBase
Celery app configuration (TEST).

broker_url = 'redis://localhost:6379/0'
task_always_eager = True
ms_graph_exporter.celery.graph_api_base module
Module implements the base class GraphApiTask for data extraction Celery tasks.
GraphApiTask defines Redis and MS Graph API clients as attributes to be shared by all tasks within a Celery worker.

class ms_graph_exporter.celery.graph_api_base.GraphApiTask
Bases: celery.app.task.Task
Base class for MS Graph API data extraction Celery tasks.
Allows sharing instances of the Redis connection pool (BlockingConnectionPool) and MsGraph between all tasks executed within a single Celery worker.

Variables
_config (Dict[str, Any]) – Dictionary holding configuration parameters that are initialized by the worker on startup.
_logger (Logger) – Channel to be used for log output specific to the worker.
_ms_graph (MsGraph) – MS Graph API client instance to be used for queries.
_redis_pool (Redis) – Connection pool to be used by Redis clients for response data queuing.
_records_to_redis_naive(records)
Push records to Redis.
Push records into a Redis list or publish them in a channel. Does not apply any optimizations. Yields control to the Gevent Hub at each iteration.

_records_to_redis_pipe(records)
Push records to Redis with pipelining.
Push records into a Redis list or publish them in a channel. Uses pipelining to minimize connection overhead. Yields control to the Gevent Hub at each iteration.
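The difference between the naive and the pipelined strategy is the number of round trips: one per record versus one per batch. The sketch below is runnable without a Redis server because it uses a small in-memory FakeRedis that mimics redis-py's rpush() and pipeline()/execute() method names. FakeRedis, the key "graph:records" and the cooperative_yield hook are all hypothetical, and publishing to a channel is omitted.

```python
import json
from typing import Any, Callable, Dict, Iterable, List, Tuple


class FakePipeline:
    """Buffers RPUSH commands and applies them in one round trip on execute()."""

    def __init__(self, server: "FakeRedis") -> None:
        self._server = server
        self._commands: List[Tuple[str, Tuple[str, ...]]] = []

    def rpush(self, name: str, *values: str) -> "FakePipeline":
        self._commands.append((name, values))
        return self

    def execute(self) -> List[int]:
        self._server.round_trips += 1
        results = [self._server._push(n, v) for n, v in self._commands]
        self._commands.clear()
        return results


class FakeRedis:
    """In-memory stand-in using the rpush()/pipeline() method names of redis-py."""

    def __init__(self) -> None:
        self.lists: Dict[str, List[str]] = {}
        self.round_trips = 0

    def _push(self, name: str, values: Iterable[str]) -> int:
        self.lists.setdefault(name, []).extend(values)
        return len(self.lists[name])

    def rpush(self, name: str, *values: str) -> int:
        self.round_trips += 1
        return self._push(name, values)

    def pipeline(self) -> FakePipeline:
        return FakePipeline(self)


def no_yield() -> None:
    """Default no-op; in a Gevent worker one could pass lambda: gevent.sleep(0)."""


def records_to_redis_naive(client: Any, key: str, records: Iterable[Any],
                           cooperative_yield: Callable[[], None] = no_yield) -> None:
    for record in records:
        client.rpush(key, json.dumps(record))  # one round trip per record
        cooperative_yield()


def records_to_redis_pipe(client: Any, key: str, records: Iterable[Any],
                          cooperative_yield: Callable[[], None] = no_yield) -> None:
    pipe = client.pipeline()
    for record in records:
        pipe.rpush(key, json.dumps(record))  # buffered, nothing sent yet
        cooperative_yield()
    pipe.execute()  # one round trip for the whole batch


r = FakeRedis()
records_to_redis_naive(r, "graph:records", [{"id": 1}, {"id": 2}])
records_to_redis_pipe(r, "graph:records", [{"id": 3}, {"id": 4}])
print(r.round_trips)  # 3
```

Against a real server, a redis.Redis instance can be passed as client in place of FakeRedis, since it exposes the same two method names.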
classmethod config_update(**options)
Update internal configuration.
Accept an arbitrary set of keyword arguments and update the _config dict with all values whose keys were defined there initially.

Parameters
**options – Set of arbitrary keyword arguments used to update _config.

Return type
None
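Because _config lives on the class, an update made through the classmethod is visible to every task instance in the worker process, and keys that were not defined initially are ignored. The class SharedConfigTask and its default values are hypothetical, chosen to match the time slice example further below.

```python
from typing import Any, Dict


class SharedConfigTask:
    """Hypothetical stand-in for a task base class with class-level config."""

    # Class attribute: one dict shared by every instance in the process.
    # Illustrative defaults only (hypothetical values).
    _config: Dict[str, Any] = {
        "timelag": 60,
        "streams": 3,
        "stream_frame": 10,
    }

    @classmethod
    def config_update(cls, **options: Any) -> None:
        """Update only keys that already exist in _config; ignore the rest."""
        for key, value in options.items():
            if key in cls._config:
                cls._config[key] = value


task_a, task_b = SharedConfigTask(), SharedConfigTask()
SharedConfigTask.config_update(streams=5, unknown_option="ignored")
print(task_b._config["streams"])           # 5
print("unknown_option" in task_a._config)  # False
```

A file-based variant such as config_update_from_file could, for example, parse YAML with PyYAML's yaml.safe_load() and pass the resulting dict to the same method; that pairing is an assumption, not taken from the source.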
classmethod config_update_from_file(config_file=None)
Update internal configuration from file.
Read the YAML config_file and update the _config dict with all values whose keys were defined there initially.

ignore_result = True
priority = None
rate_limit = None

property redis_client
Provide an instance of Redis client.

Return type
Redis

property redis_conn_pool
Provide an instance of Redis connection pool.

Return type
ConnectionPool

redis_rm_queue()
Delete the Redis queue storing data records.

Returns
Flag indicating success or failure of the operation.

reject_on_worker_lost = None
request_stack = <celery.utils.threads._LocalStack object>
serializer = 'json'
store_errors_even_if_ignored = False
task_fetch_slice(slice_start=None, slice_end=None)
Fetch a time slice of records.
Request the time-domain data slice [slice_start - slice_end].

task_get_time_slices(timestamp=None)
Calculate time slices from the point in time.
Based on the available configuration, generates a series of consecutive time slices (streams) going back in time from timestamp.

Parameters
timestamp (Optional[datetime]) – Point in time from which to calculate time slices. Microseconds are zeroed, i.e. timestamp is truncated to the whole second. If not defined, uses the current time.

Note
The following _config options influence the result:
timelag - seconds to shift back from timestamp, adjusting the whole time-frame for the series.
streams - number of time slices to generate.
stream_frame - size of each time slice in seconds.

So, with timestamp=...-07T22:02:53.123, timelag=60, streams=3 and stream_frame=10, the following series is generated:
1. [...-07T22:01:44 - ...-07T22:01:53]
2. [...-07T22:01:34 - ...-07T22:01:43]
3. [...-07T22:01:24 - ...-07T22:01:33]
The timestamp is truncated to 22:02:53 and shifted back by 60 seconds to 22:01:53, which ends the first slice. Each slice covers stream_frame whole seconds with inclusive bounds, and each following slice ends one second before the previous one starts.
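The slicing rule above can be reproduced in a few lines. This is a sketch that matches the documented example, not the method's actual source; the default argument values are hypothetical (taken from the example), and the date part 2019-01-07 is arbitrary because the original example elides it.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Optional, Tuple


def get_time_slices(
    timestamp: Optional[datetime] = None,
    timelag: int = 60,      # hypothetical defaults, matching the example
    streams: int = 3,
    stream_frame: int = 10,
) -> List[Tuple[datetime, datetime]]:
    """Return `streams` consecutive [start, end] slices going back in time."""
    if timestamp is None:
        timestamp = datetime.now(timezone.utc)
    # Drop microseconds, then shift the whole series back by `timelag`.
    end = timestamp.replace(microsecond=0) - timedelta(seconds=timelag)
    slices = []
    for _ in range(streams):
        # Bounds are inclusive at one-second resolution, so N whole seconds
        # run from end - (N - 1) s up to end.
        start = end - timedelta(seconds=stream_frame - 1)
        slices.append((start, end))
        # The next (older) slice ends one second before this one starts.
        end = start - timedelta(seconds=1)
    return slices


# Date part is arbitrary; only the time of day follows the example above.
ts = datetime(2019, 1, 7, 22, 2, 53, 123000)
for i, (start, end) in enumerate(get_time_slices(ts), 1):
    print(f"{i}. [{start:%H:%M:%S} - {end:%H:%M:%S}]")
# 1. [22:01:44 - 22:01:53]
# 2. [22:01:34 - 22:01:43]
# 3. [22:01:24 - 22:01:33]
```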
task_redis_push_multi_imap(records, push_mode='pipe')
Push records to Redis in parallel.
Split the records list into smaller data sets to be pushed to Redis with multiple Greenlets. Parameter push_mode defines which data transfer strategy to use.

Note
See the note for task_redis_push_multi_spawn() for a detailed explanation of parameter push_mode.

task_redis_push_multi_spawn(records, push_mode='pipe')
Push records to Redis in parallel.
Split the records list into smaller data sets to be pushed to Redis with multiple Greenlets.

Note
Parameter push_mode defines which data transfer strategy to use by providing the suffix of the specific _records_to_redis_* method to be called. Currently accepts naive or pipe.
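The split-and-dispatch pattern can be sketched as follows: records are chunked, the push method is resolved from the push_mode suffix with getattr(), and chunks are pushed concurrently. As a plainly named swap, this sketch uses concurrent.futures.ThreadPoolExecutor.map instead of Gevent Greenlets so it runs with the standard library alone. The Pusher class, chunk_size and workers are hypothetical, and the push methods only record their batches.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Iterator, List, Sequence


def chunks(records: Sequence[Any], size: int) -> Iterator[Sequence[Any]]:
    """Yield consecutive slices of at most `size` records."""
    for i in range(0, len(records), size):
        yield records[i:i + size]


class Pusher:
    """Hypothetical stand-in for the push helpers of GraphApiTask."""

    def __init__(self) -> None:
        self.batches: List[List[Any]] = []

    def _records_to_redis_naive(self, records: Sequence[Any]) -> int:
        # A real implementation would send one command per record.
        self.batches.append(list(records))
        return len(records)

    def _records_to_redis_pipe(self, records: Sequence[Any]) -> int:
        # A real implementation would send one pipelined batch.
        self.batches.append(list(records))
        return len(records)

    def push_multi(self, records: Sequence[Any], push_mode: str = "pipe",
                   chunk_size: int = 100, workers: int = 4) -> int:
        # push_mode is the suffix of the _records_to_redis_* method to call;
        # an unknown value raises AttributeError.
        push = getattr(self, f"_records_to_redis_{push_mode}")
        with ThreadPoolExecutor(max_workers=workers) as pool:
            # Chunks are pushed concurrently; each call returns its record count.
            return sum(pool.map(push, chunks(records, chunk_size)))


p = Pusher()
print(p.push_multi(list(range(250)), push_mode="naive"))  # 250
print(len(p.batches))  # 3
```

In a Gevent worker, a gevent.pool.Pool would play the role of the thread pool; the difference between the imap and spawn variants is not detailed in the source.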
task_redis_push_single(records, push_mode='pipe')
Push records to Redis in a single batch.

Note
See the note for task_redis_push_multi_spawn() for a detailed explanation of parameter push_mode.

track_started = False
typing = True
ms_graph_exporter.celery.tasks module
Module defines parallel data retrieval tasks.
Tasks use GraphApiTask as a base for interaction with MS Graph API and Redis.

(task) ms_graph_exporter.celery.tasks.fetch_stream(slice_start, slice_end)
Get a slice of records.
Request the time-domain data slice [slice_start - slice_end] and paginate through the response, passing each page of records to a dedicated queuing task.
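MS Graph returns collections in pages: each JSON response carries the records in "value" and, while more data remains, a "@odata.nextLink" URL for the next page. The sketch below follows those links and hands each page to a callback. It is an illustration under assumptions: get_json stands in for the real HTTP client, the URLs are made up, and in the worker on_page would presumably enqueue the page (for example with Celery's store_records.delay(...)).

```python
from typing import Any, Callable, Dict, List, Optional


def paginate(
    first_url: str,
    get_json: Callable[[str], Dict[str, Any]],
    on_page: Callable[[List[Dict[str, Any]]], None],
) -> int:
    """Follow @odata.nextLink until exhausted; hand each page to on_page."""
    url: Optional[str] = first_url
    pages = 0
    while url:
        body = get_json(url)
        # Assumption: in the worker this would enqueue the page for storage.
        on_page(body.get("value", []))
        pages += 1
        url = body.get("@odata.nextLink")  # absent on the last page
    return pages


# Canned responses standing in for MS Graph; URLs are made up.
FAKE = {
    "https://graph.example/p1": {"value": [{"id": 1}, {"id": 2}],
                                 "@odata.nextLink": "https://graph.example/p2"},
    "https://graph.example/p2": {"value": [{"id": 3}]},
}
received = []
print(paginate("https://graph.example/p1", FAKE.__getitem__, received.append))  # 2
print(received)  # [[{'id': 1}, {'id': 2}], [{'id': 3}]]
```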
(task) ms_graph_exporter.celery.tasks.map_streams
Split extraction into streams.
When invoked by the scheduler, splits the past time-frame between the previous and current invocation into smaller time slices (streams) and initiates parallel data requests for each slice.

Note
The task should be executed every streams * stream_frame seconds in order to fully cover the elapsed time-frame between executions. Each invocation covers exactly streams * stream_frame seconds, so a longer interval leaves gaps and a shorter one fetches overlapping slices.

Return type
None
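The scheduling requirement can be checked numerically: run the documented slicing rule for several consecutive invocations and count the seconds covered. The parameter values (timelag=60, streams=3, stream_frame=10) and the start time are hypothetical, reused from the time slice example; get_time_slices here is a compact re-statement of the documented rule, not the library method.

```python
from datetime import datetime, timedelta
from typing import List, Set, Tuple


def get_time_slices(ts: datetime, timelag: int, streams: int,
                    frame: int) -> List[Tuple[datetime, datetime]]:
    """Compact re-statement of the slicing rule of task_get_time_slices()."""
    end = ts.replace(microsecond=0) - timedelta(seconds=timelag)
    slices = []
    for _ in range(streams):
        start = end - timedelta(seconds=frame - 1)
        slices.append((start, end))
        end = start - timedelta(seconds=1)
    return slices


def covered_seconds(t0: datetime, interval: int, runs: int, timelag: int = 60,
                    streams: int = 3, frame: int = 10) -> Set[datetime]:
    """Every whole second fetched by `runs` invocations spaced `interval` s apart."""
    seconds: Set[datetime] = set()
    for n in range(runs):
        run_at = t0 + timedelta(seconds=n * interval)
        for start, end in get_time_slices(run_at, timelag, streams, frame):
            t = start
            while t <= end:
                seconds.add(t)
                t += timedelta(seconds=1)
    return seconds


def span(seconds: Set[datetime]) -> int:
    """Length in seconds from the earliest to the latest covered second."""
    return int((max(seconds) - min(seconds)).total_seconds()) + 1


t0 = datetime(2019, 1, 7, 22, 2, 53)
for interval in (20, 30, 40):  # 30 == streams * stream_frame
    cov = covered_seconds(t0, interval, runs=4)
    print(interval, len(cov), span(cov))
# 20 90 90    -> overlapping slices, 30 seconds fetched twice
# 30 120 120  -> contiguous, no gaps or overlaps
# 40 120 150  -> 30 seconds never fetched
```

Only the 30-second interval yields as many distinct seconds as the covered span, with no second fetched twice.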
(task) ms_graph_exporter.celery.tasks.store_records(records)
Store records.
Use the Redis client to push records into a list, or publish them under a channel, as specified in the app config. If the queue client is not instantiated, just log the data.

Module contents
Package contains the following modules:
ms_graph_exporter.celery.app - Celery app and worker customizations.
ms_graph_exporter.celery.config - app config objects.
ms_graph_exporter.celery.graph_api_base - base class for data extraction tasks.
ms_graph_exporter.celery.tasks - parallel data retrieval tasks.