ms_graph_exporter.celery package
Submodules
ms_graph_exporter.celery.app module
Module defines the Celery app and configures it with additional user options available for the Celery worker.
class ms_graph_exporter.celery.app.CustomArgs(worker, **options)
    Bases: celery.bootsteps.Step

    Bootstep to handle custom options and setup of the Celery worker.

    __init__(worker, **options)
        Initialize class instance.

        Passes the User Options defined by the worker to the GraphApiTask
        base class and schedules periodic data extraction from the MS Graph
        API into the post-processing queue.

        Note: By default, no config file is expected by the app. All options
        are taken either from the command-line interface (CLI) or the
        execution environment (ENV). A config file takes precedence over the
        CLI, which in turn takes precedence over ENV.

    name = 'ms_graph_exporter.celery.app.CustomArgs'
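The stated precedence (config file over CLI over ENV) amounts to a layered merge in which later sources overwrite earlier ones. A minimal sketch of that idea; the helper and the option names are illustrative, not part of the package:

```python
def resolve_options(env, cli, config_file):
    """Merge option sources; later updates win, so file > CLI > ENV."""
    merged = dict(env)          # lowest precedence: execution environment
    merged.update(cli)          # command-line options override ENV
    merged.update(config_file)  # config file overrides everything
    return merged

opts = resolve_options(
    env={"timelag": 30, "streams": 2},
    cli={"timelag": 45},
    config_file={"timelag": 60},
)
```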
ms_graph_exporter.celery.config module
Module defines Celery configuration objects.
class ms_graph_exporter.celery.config.CeleryConfigBase
    Bases: object

    Baseline Celery app configuration.

    beat_max_loop_interval = 300
    beat_scheduler = 'redbeat.RedBeatScheduler'
    include = ('ms_graph_exporter.celery.tasks',)
    redbeat_key_prefix = 'ms_graph_exporter'
    redbeat_lock_timeout = 1500
    task_acks_late = True
    task_ignore_result = True
    task_routes = {
        'graph.fetch_stream': {'queue': 'graph.fetch'},
        'graph.map_streams': {'queue': 'graph.map'},
        'graph.store_records': {'queue': 'graph.store'},
    }
    timezone = 'UTC'
    worker_prefetch_multiplier = 1
class ms_graph_exporter.celery.config.CeleryConfigDev
    Bases: ms_graph_exporter.celery.config.CeleryConfigBase

    Celery app configuration (DEV).

    broker_url = 'redis://localhost:6379/0'
class ms_graph_exporter.celery.config.CeleryConfigProd
    Bases: ms_graph_exporter.celery.config.CeleryConfigBase

    Celery app configuration (PROD).

    beat_max_loop_interval = 5
    broker_url = 'redis://redis:6379/0'
    redbeat_lock_timeout = 15
class ms_graph_exporter.celery.config.CeleryConfigTest
    Bases: ms_graph_exporter.celery.config.CeleryConfigBase

    Celery app configuration (TEST).

    broker_url = 'redis://localhost:6379/0'
    task_always_eager = True
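The environment-specific classes above differ only in the attributes they override, so selecting one at startup is a simple lookup. A sketch under that assumption; the `pick_config` helper and the `GRAPH_APP_ENV` variable name are illustrative, not part of the package:

```python
import os

class CeleryConfigBase:
    # Subset of the baseline options documented above.
    beat_scheduler = "redbeat.RedBeatScheduler"
    task_acks_late = True
    timezone = "UTC"

class CeleryConfigDev(CeleryConfigBase):
    broker_url = "redis://localhost:6379/0"

class CeleryConfigProd(CeleryConfigBase):
    beat_max_loop_interval = 5
    broker_url = "redis://redis:6379/0"

def pick_config(env_name):
    """Map an environment name to a config class (mapping is an assumption)."""
    return {"dev": CeleryConfigDev, "prod": CeleryConfigProd}.get(
        env_name, CeleryConfigDev
    )

config = pick_config(os.environ.get("GRAPH_APP_ENV", "dev"))
```

Celery consumes such an object via `app.config_from_object(config)`.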
ms_graph_exporter.celery.graph_api_base module
Module implements the base class GraphApiTask for data extraction Celery
tasks. GraphApiTask defines Redis and MS Graph API clients as attributes to
be shared by all tasks within a Celery worker.
class ms_graph_exporter.celery.graph_api_base.GraphApiTask
    Bases: celery.app.task.Task

    Base class for MS Graph API data extraction Celery tasks. Allows sharing
    of the Redis connection pool (BlockingConnectionPool) and MsGraph client
    instances between all tasks executed within a single Celery worker.

    Variables:
        _config (Dict[str, Any]) – Dictionary holding configuration
            parameters that are initialized by the worker on startup.
        _logger (Logger) – Channel to be used for log output specific to
            the worker.
        _ms_graph (MsGraph) – MS Graph API client instance to be used for
            queries.
        _redis_pool (Redis) – Connection pool to be used by Redis clients
            for response data queuing.
    _records_to_redis_naive(records)
        Push records to Redis.

        Push records into a Redis list or publish them in a channel. Does
        not apply any optimizations. Yields control to the Gevent Hub at
        each iteration.
    _records_to_redis_pipe(records)
        Push records to Redis with pipelining.

        Push records into a list or publish them in a channel. Utilizes
        pipelining to minimize connection overhead. Yields control to the
        Gevent Hub at each iteration.
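The pipelined strategy batches commands so that many `RPUSH` operations share one network round trip. A sketch assuming a redis-py style client (`pipeline()` / `rpush()` / `execute()`); the in-memory stand-in replaces a live Redis server so the example stays self-contained, and all names are illustrative:

```python
import json

class FakePipeline:
    """Buffers commands and applies them in one batch, like a Redis pipeline."""
    def __init__(self, store):
        self._store, self._ops = store, []
    def rpush(self, key, value):
        self._ops.append((key, value))
        return self
    def execute(self):
        # Flush all buffered commands; a real pipeline sends them in
        # a single network round trip here.
        for key, value in self._ops:
            self._store.setdefault(key, []).append(value)
        self._ops.clear()

class FakeRedis:
    """In-memory stand-in for redis.Redis; swap in a real client in production."""
    def __init__(self):
        self.store = {}
    def pipeline(self):
        return FakePipeline(self.store)

def records_to_redis_pipe(client, queue, records, batch=100):
    """Push records via a pipeline, flushing every `batch` commands."""
    pipe = client.pipeline()
    for i, record in enumerate(records, 1):
        pipe.rpush(queue, json.dumps(record))
        if i % batch == 0:
            pipe.execute()
            # gevent.sleep(0) would yield to the Hub here in the real task
    pipe.execute()  # flush the final partial batch

client = FakeRedis()
records_to_redis_pipe(client, "graph.records", [{"id": n} for n in range(5)], batch=2)
```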
    classmethod config_update(**options)
        Update internal configuration.

        Accept an arbitrary set of keyword arguments and update the _config
        dict with all values matching keys defined there initially.

        Parameters:
            **options – Set of arbitrary keyword arguments used to update
                _config.

        Return type: None
    classmethod config_update_from_file(config_file=None)
        Update internal configuration from file.

        Read the YAML config_file and update the _config dict with all
        values matching keys defined there initially.
    ignore_result = True
    priority = None
    rate_limit = None

    property redis_client
        Provide an instance of a Redis client.

        Return type: Redis

    property redis_conn_pool
        Provide an instance of a Redis connection pool.

        Return type: ConnectionPool

    redis_rm_queue()
        Delete the Redis queue storing data records.

        Returns: Flag indicating success or failure of the operation.
        Return type: bool

    reject_on_worker_lost = None
    request_stack = <celery.utils.threads._LocalStack object>
    serializer = 'json'
    store_errors_even_if_ignored = False
    task_fetch_slice(slice_start=None, slice_end=None)
        Fetch a time slice of records.

        Request the time-domain data slice [slice_start - slice_end].
    task_get_time_slices(timestamp=None)
        Calculate time slices from a point in time.

        Based on the available configuration, generates a series of
        consecutive time slices (streams) going back in time from timestamp.

        Parameters:
            timestamp (Optional[datetime]) – Point in time from which to
                calculate time slices. Microseconds are zeroed and timestamp
                is rounded up to a second. If not defined, uses the current
                time.

        Note: The following _config options influence the result:

            timelag – seconds to shift back from timestamp, adjusting the
                whole time-frame for the series.
            streams – number of time slices to generate.
            stream_frame – size of each time slice in seconds.

        So, with timestamp=...-07T22:02:53.123, timelag=60, streams=3 and
        stream_frame=10, the following series is generated:

            1. [...-07T22:01:44 - ...-07T22:01:53]
            2. [...-07T22:01:34 - ...-07T22:01:43]
            3. [...-07T22:01:24 - ...-07T22:01:33]
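The series above can be reproduced with straightforward datetime arithmetic. A sketch under the stated `timelag`/`streams`/`stream_frame` semantics; the function name and the concrete date are illustrative:

```python
from datetime import datetime, timedelta

def get_time_slices(timestamp, timelag, streams, stream_frame):
    """Yield (start, end) slices walking back in time from timestamp."""
    # Zero microseconds, then step back by timelag seconds to find the
    # end of the most recent slice (matches the worked example above).
    end = timestamp.replace(microsecond=0) - timedelta(seconds=timelag)
    for _ in range(streams):
        start = end - timedelta(seconds=stream_frame - 1)
        yield (start, end)
        end = start - timedelta(seconds=1)  # next slice ends one second earlier

ts = datetime(2020, 1, 7, 22, 2, 53, 123000)
slices = list(get_time_slices(ts, timelag=60, streams=3, stream_frame=10))
# slices[0] spans 22:01:44 - 22:01:53, as in the first entry of the series
```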
    task_redis_push_multi_imap(records, push_mode='pipe')
        Push records to Redis in parallel.

        Split the records list into smaller data sets to be pushed to Redis
        with multiple Greenlets. Parameter push_mode defines which data
        transfer strategy to use.

        Note: See the note for task_redis_push_multi_spawn() for a detailed
        explanation of the parameter push_mode.
    task_redis_push_multi_spawn(records, push_mode='pipe')
        Push records to Redis in parallel.

        Split the records list into smaller data sets to be pushed to Redis
        with multiple Greenlets.

        Note: Parameter push_mode defines which data transfer strategy to
        use by providing the suffix for the specific _records_to_redis_*
        method to be called. Currently accepts naive or pipe.
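The suffix dispatch described in the note can be sketched with `getattr`; the class and the method bodies below are illustrative stand-ins, not the package's actual implementations:

```python
class PushTask:
    """Dispatches to _records_to_redis_<push_mode> by attribute lookup."""
    def _records_to_redis_naive(self, records):
        # Stand-in for the one-command-per-record strategy.
        return ("naive", len(records))
    def _records_to_redis_pipe(self, records):
        # Stand-in for the pipelined strategy.
        return ("pipe", len(records))
    def push(self, records, push_mode="pipe"):
        # push_mode supplies the method-name suffix, as the note describes.
        handler = getattr(self, f"_records_to_redis_{push_mode}")
        return handler(records)

task = PushTask()
```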
    task_redis_push_single(records, push_mode='pipe')
        Push records to Redis in a single batch.

        Note: See the note for task_redis_push_multi_spawn() for a detailed
        explanation of the parameter push_mode.
    track_started = False
    typing = True
ms_graph_exporter.celery.tasks module
Module defines parallel data retrieval tasks. Tasks use GraphApiTask as a
base for interaction with the MS Graph API and Redis.
(task) ms_graph_exporter.celery.tasks.fetch_stream(slice_start, slice_end)
    Get a slice of records.

    Request the time-domain data slice [slice_start - slice_end] and
    paginate through the response, passing each page of records to a
    dedicated queuing task.
(task) ms_graph_exporter.celery.tasks.map_streams
    Split extraction into streams.

    When invoked by the scheduler, splits the past time-frame between the
    previous and current invocation into smaller time slices (streams) and
    initiates parallel data requests for each slice.

    Note: The task should be executed every streams * stream_frame seconds
    in order to fully cover the elapsed time-frame between executions.

    Return type: None
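Under Celery's standard `beat_schedule` setting, the scheduling rule from the note (run every `streams * stream_frame` seconds) could look like the following hypothetical entry; the entry name and the values are illustrative:

```python
# Illustrative values matching the example in task_get_time_slices.
streams, stream_frame = 3, 10

# Schedule map_streams so consecutive runs exactly tile the elapsed
# time-frame: 3 slices of 10 seconds per invocation, every 30 seconds.
beat_schedule = {
    "graph-map-streams": {
        "task": "graph.map_streams",
        "schedule": streams * stream_frame,  # seconds between invocations
    }
}
```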
(task) ms_graph_exporter.celery.tasks.store_records(records)
    Store records.

    Use a Redis client to push records into a list or publish them under a
    channel mentioned in the app config. Just log the data if the queue
    client is not instantiated.
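The push-or-log fallback can be sketched as below; the function, queue name, and in-memory client are illustrative stand-ins for the task and a redis-py client:

```python
import logging

def store_records(records, redis_client=None, queue="graph.records"):
    """Push records to Redis, or just log them if no client is configured."""
    if redis_client is None:
        logging.getLogger(__name__).info(
            "no queue client instantiated; logging %d records", len(records)
        )
        return 0
    for record in records:
        redis_client.rpush(queue, record)
    return len(records)

class _ListClient:
    """In-memory stand-in for a Redis client."""
    def __init__(self):
        self.items = []
    def rpush(self, queue, value):
        self.items.append((queue, value))

client = _ListClient()
stored = store_records(["a", "b"], redis_client=client)
```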
Module contents
Package contains the following modules:

    ms_graph_exporter.celery.app – Celery app and worker customizations.
    ms_graph_exporter.celery.config – app config objects.
    ms_graph_exporter.celery.graph_api_base – base class for data
        extraction tasks.
    ms_graph_exporter.celery.tasks – parallel data retrieval tasks.