ms_graph_exporter.celery package

Submodules

ms_graph_exporter.celery.app module

Module defines Celery app.

Configure app with additional user options available for the Celery worker.

class ms_graph_exporter.celery.app.CustomArgs(worker, **options)

Bases: celery.bootsteps.Step

Bootstep to handle custom options and setup of the Celery worker.

__init__(worker, **options)

Initialize the class instance.

Passes the user options defined by the worker to the GraphApiTask base class and schedules periodic data extraction from the MS Graph API into the post-processing queue.

Note

By default, the app expects no config file. All options are taken either from the command-line interface (CLI) or from the execution environment (ENV).

If a config file is provided, it takes precedence over the CLI, which in turn takes precedence over ENV.

name = 'ms_graph_exporter.celery.app.CustomArgs'
ms_graph_exporter.celery.app.add_worker_arguments(parser)

Configure custom options for the Celery worker.
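In Celery 4.x, a function like add_worker_arguments receives an argparse.ArgumentParser and is typically registered with app.user_options['worker'].add(...), while a bootstep such as CustomArgs is registered with app.steps['worker'].add(...) and then receives the parsed options as keyword arguments. The sketch below leaves Celery out and only illustrates the precedence described in the note above (config file over CLI over ENV). The option names streams and stream_frame, the environment variables GRAPH_STREAMS and GRAPH_STREAM_FRAME, and the resolve_options() helper are hypothetical.

```python
import argparse
import os
from typing import Any, Dict, List, Optional


def add_worker_arguments(parser: argparse.ArgumentParser) -> None:
    """Register hypothetical worker options whose defaults come from ENV."""
    # Taking the default from the environment lets an explicit CLI flag
    # override ENV, while ENV still overrides the hard-coded fallback.
    parser.add_argument(
        "--streams",
        type=int,
        default=int(os.environ.get("GRAPH_STREAMS", "3")),
    )
    parser.add_argument(
        "--stream_frame",
        type=int,
        default=int(os.environ.get("GRAPH_STREAM_FRAME", "60")),
    )


def resolve_options(
    cli_args: List[str], file_options: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """Merge options with the precedence: config file > CLI > ENV."""
    parser = argparse.ArgumentParser()
    add_worker_arguments(parser)
    options = vars(parser.parse_args(cli_args))
    # Config file values win, but only for keys the parser already knows.
    for key, value in (file_options or {}).items():
        if key in options:
            options[key] = value
    return options
```

For instance, resolve_options(["--streams", "5"], {"streams": 9}) yields streams == 9, because the config file value is applied last.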

ms_graph_exporter.celery.config module

Module defines Celery configuration objects.

class ms_graph_exporter.celery.config.CeleryConfigBase

Bases: object

Baseline Celery app configuration.

beat_max_loop_interval = 300
beat_scheduler = 'redbeat.RedBeatScheduler'
include = ('ms_graph_exporter.celery.tasks',)
redbeat_key_prefix = 'ms_graph_exporter'
redbeat_lock_timeout = 1500
task_acks_late = True
task_ignore_result = True
task_routes = {'graph.fetch_stream': {'queue': 'graph.fetch'}, 'graph.map_streams': {'queue': 'graph.map'}, 'graph.store_records': {'queue': 'graph.store'}}
timezone = 'UTC'
worker_prefetch_multiplier = 1
class ms_graph_exporter.celery.config.CeleryConfigDev

Bases: ms_graph_exporter.celery.config.CeleryConfigBase

Celery app configuration (DEV).

broker_url = 'redis://localhost:6379/0'
class ms_graph_exporter.celery.config.CeleryConfigProd

Bases: ms_graph_exporter.celery.config.CeleryConfigBase

Celery app configuration (PROD).

beat_max_loop_interval = 5
broker_url = 'redis://redis:6379/0'
redbeat_lock_timeout = 15
class ms_graph_exporter.celery.config.CeleryConfigTest

Bases: ms_graph_exporter.celery.config.CeleryConfigBase

Celery app configuration (TEST).

broker_url = 'redis://localhost:6379/0'
task_always_eager = True
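A Celery app can load one of these classes with app.config_from_object(). The sketch below mirrors a subset of the documented attributes to show how each environment class inherits the baseline and overrides only selected values. The CONFIGS mapping, the get_config() selector and the environment names 'dev', 'prod' and 'test' are hypothetical.

```python
from typing import Dict, Type


class CeleryConfigBase:
    """Subset of the documented baseline settings."""

    beat_max_loop_interval = 300
    beat_scheduler = "redbeat.RedBeatScheduler"
    redbeat_lock_timeout = 1500
    task_acks_late = True
    timezone = "UTC"


class CeleryConfigDev(CeleryConfigBase):
    broker_url = "redis://localhost:6379/0"


class CeleryConfigProd(CeleryConfigBase):
    # Shorter beat loop and lock timeout than the baseline.
    beat_max_loop_interval = 5
    broker_url = "redis://redis:6379/0"
    redbeat_lock_timeout = 15


class CeleryConfigTest(CeleryConfigBase):
    broker_url = "redis://localhost:6379/0"
    task_always_eager = True  # execute tasks locally, no worker needed


CONFIGS: Dict[str, Type[CeleryConfigBase]] = {
    "dev": CeleryConfigDev,
    "prod": CeleryConfigProd,
    "test": CeleryConfigTest,
}


def get_config(env: str) -> Type[CeleryConfigBase]:
    """Hypothetical selector; with Celery: app.config_from_object(get_config(env))."""
    return CONFIGS[env.lower()]
```

Keeping the shared settings in one base class means a new environment only has to state what differs, as CeleryConfigProd does for the beat loop and lock timeout.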

ms_graph_exporter.celery.graph_api_base module

Module implements the GraphApiTask base class for data extraction Celery tasks.

GraphApiTask defines Redis and MS Graph API clients as attributes to be shared by all tasks within a Celery worker.

class ms_graph_exporter.celery.graph_api_base.GraphApiTask

Bases: celery.app.task.Task

Base class for MS Graph API data extraction Celery tasks.

Allows sharing instances of the Redis connection pool (BlockingConnectionPool) and the MS Graph API client (MsGraph) among all tasks executed within a single Celery worker.

Variables
  • _config (Dict[str, Any]) – Dictionary holding configuration parameters that are initialized by the worker on startup.

  • _logger (Logger) – Channel to be used for log output specific to the worker.

  • _ms_graph (MsGraph) – MS Graph API client instance to be used for queries.

  • _redis_pool (Redis) – Connection pool to be used by Redis clients for response data queuing.

_records_to_redis_naive(records)

Push records to Redis.

Push records into a Redis list or publish them to a channel. Does not apply any optimizations. Yields control to the gevent hub at each iteration.

Parameters

records (List[Any]) – List of records to store.

Returns

Flag indicating success or failure of the operation.

Return type

bool

_records_to_redis_pipe(records)

Push records to Redis with pipelining.

Push records into a Redis list or publish them to a channel. Uses pipelining to minimize network round trips. Yields control to the gevent hub at each iteration.

Parameters

records (List[Any]) – List of records to store.

Returns

Flag indicating success or failure of the operation.

Return type

bool
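The difference between the naive and the pipelined strategy can be sketched with redis-py style calls (rpush(), pipeline(), execute()). To stay runnable without a Redis server, the sketch uses a hypothetical in-memory FakeRedis that only counts round trips. The key name, the JSON serialization, the exception-to-False mapping and the omission of the gevent yield (for example gevent.sleep(0)) are assumptions, not the project's code.

```python
import json
from typing import Any, Dict, List, Tuple


class FakeRedis:
    """Hypothetical in-memory stand-in for redis.Redis; counts round trips."""

    def __init__(self) -> None:
        self.lists: Dict[str, List[str]] = {}
        self.round_trips = 0

    def rpush(self, name: str, *values: str) -> int:
        self.round_trips += 1  # every direct command is one round trip
        self.lists.setdefault(name, []).extend(values)
        return len(self.lists[name])

    def pipeline(self, transaction: bool = True) -> "FakePipeline":
        return FakePipeline(self)


class FakePipeline:
    """Buffers commands and applies them in a single round trip on execute()."""

    def __init__(self, client: FakeRedis) -> None:
        self.client = client
        self.buffered: List[Tuple[str, Tuple[str, ...]]] = []

    def rpush(self, name: str, *values: str) -> "FakePipeline":
        self.buffered.append((name, values))
        return self

    def execute(self) -> List[int]:
        self.client.round_trips += 1
        results = []
        for name, values in self.buffered:
            self.client.lists.setdefault(name, []).extend(values)
            results.append(len(self.client.lists[name]))
        self.buffered = []
        return results


def records_to_redis_naive(client: Any, key: str, records: List[Any]) -> bool:
    """One RPUSH, and therefore one round trip, per record."""
    try:
        for record in records:
            client.rpush(key, json.dumps(record))
            # The documented method yields to the gevent hub at this point.
    except Exception:  # map client errors to the documented bool flag
        return False
    return True


def records_to_redis_pipe(client: Any, key: str, records: List[Any]) -> bool:
    """Buffer all RPUSH commands and send them in one round trip."""
    try:
        pipe = client.pipeline(transaction=False)
        for record in records:
            pipe.rpush(key, json.dumps(record))
        pipe.execute()
    except Exception:
        return False
    return True
```

With three records, the naive variant costs three round trips and the pipelined one costs a single round trip while storing the same data.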

classmethod config_update(**options)

Update internal configuration.

Accept an arbitrary set of keyword arguments and update the _config dict with all values whose keys are already defined there.

Parameters

**options – Set of arbitrary keyword arguments used to update _config.

Return type

None

classmethod config_update_from_file(config_file=None)

Update internal configuration from file.

Read the YAML config_file and update the _config dict with all values whose keys are already defined there.

Parameters

config_file (Optional[str]) – Path to the YAML config file. If not defined, tries to load the file specified in _config["graph_app_config"].

Return type

None
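A minimal sketch of the "update only known keys" behaviour described for config_update() and config_update_from_file(). The class name ConfigurableTask and the default values are hypothetical; the key names come from options mentioned elsewhere in this documentation. Reading YAML assumes PyYAML (yaml.safe_load()), imported only when a file is actually loaded.

```python
from typing import Any, Dict, Optional


class ConfigurableTask:
    """Hypothetical stand-in for GraphApiTask's configuration handling."""

    # Only keys present here can be updated; the defaults are illustrative.
    _config: Dict[str, Any] = {
        "graph_app_config": None,
        "timelag": 60,
        "streams": 3,
        "stream_frame": 10,
    }

    @classmethod
    def config_update(cls, **options: Any) -> None:
        """Copy over values whose keys already exist in _config."""
        for key, value in options.items():
            if key in cls._config:
                cls._config[key] = value

    @classmethod
    def config_update_from_file(cls, config_file: Optional[str] = None) -> None:
        """Load a YAML mapping and apply it through config_update()."""
        path = config_file or cls._config.get("graph_app_config")
        if not path:
            return  # no file given and none configured: nothing to load
        import yaml  # PyYAML, imported only when a file is actually read

        with open(path) as stream:
            file_options = yaml.safe_load(stream) or {}
        cls.config_update(**file_options)
```

Because _config lives on the class, an update made once by the worker at startup is visible to every task that shares the base class.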

property graph

Provide a shared instance of the MS Graph API client.

Return type

MsGraph

ignore_result = True
priority = None
rate_limit = None
property redis_client

Provide an instance of the Redis client.

Return type

Redis

property redis_conn_pool

Provide an instance of the Redis connection pool.

Return type

ConnectionPool
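Celery registers each task as a single global instance per worker process, so keeping the pool on the shared base class lets every task reuse one pool. A minimal sketch of such a lazily created, shared instance, assuming double-checked creation under a lock; object() stands in for a real pool (for example redis.BlockingConnectionPool(...), which a client would then wrap as redis.Redis(connection_pool=pool)), and all class names are hypothetical.

```python
import threading
from typing import Any, Optional


class SharedResourceTask:
    """Hypothetical base class caching one pool per worker process."""

    # Stored on the base class itself, so every task subclass sees the same pool.
    _redis_pool: Optional[Any] = None
    _lock = threading.Lock()

    @staticmethod
    def _create_pool() -> Any:
        # Placeholder for an expensive resource such as a Redis connection pool.
        return object()

    @property
    def redis_conn_pool(self) -> Any:
        """Create the pool on first access and reuse it afterwards."""
        if SharedResourceTask._redis_pool is None:
            with SharedResourceTask._lock:
                # Re-check inside the lock in case another thread created it first.
                if SharedResourceTask._redis_pool is None:
                    SharedResourceTask._redis_pool = self._create_pool()
        return SharedResourceTask._redis_pool


class FetchTask(SharedResourceTask):
    """Stands in for one task class built on the shared base."""


class StoreTask(SharedResourceTask):
    """Stands in for another task class built on the shared base."""
```

Assigning to SharedResourceTask._redis_pool rather than type(self)._redis_pool matters: the latter would give each task subclass its own pool.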

redis_rm_queue()

Delete the Redis queue that stores data records.

Returns

Flag indicating success or failure of the operation.

Return type

bool

reject_on_worker_lost = None
request_stack = <celery.utils.threads._LocalStack object>
serializer = 'json'
store_errors_even_if_ignored = False
task_fetch_slice(slice_start=None, slice_end=None)

Fetch a time slice of records.

Request the time-domain data slice [slice_start - slice_end].

Parameters
  • slice_start – Beginning of the data slice.

  • slice_end – End of the data slice.

Return type

MsGraphResponse

task_get_time_slices(timestamp=None)

Calculate time slices going back from a point in time.

Based on available configuration, generates a series of consecutive time slices (streams) going back in time from timestamp.

Parameters

timestamp (Optional[datetime]) – Point in time from which to calculate time slices. Microseconds are zeroed, i.e. the timestamp is truncated (rounded down) to the whole second. If not defined, uses the current time.

Note

The following _config options influence the result:

  • timelag - Number of seconds to shift back from timestamp, adjusting the whole time frame of the series.

  • streams - Number of time slices to generate.

  • stream_frame - Size of each time slice in seconds.

For example, with timestamp=...-07T22:02:53.123, timelag=60, streams=3 and stream_frame=10, the following series is generated:

1. [...-07T22:01:44 - ...-07T22:01:53]
2. [...-07T22:01:34 - ...-07T22:01:43]
3. [...-07T22:01:24 - ...-07T22:01:33]

Return type

List[Tuple[datetime, datetime]]
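A sketch that reproduces the documented example, assuming both slice bounds are inclusive, which is what the example implies (22:01:44 to 22:01:53 covers ten whole seconds). The function name and default arguments are hypothetical; the documented method reads timelag, streams and stream_frame from _config instead. The test below uses an arbitrary date, since the documentation elides it.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Optional, Tuple


def get_time_slices(
    timestamp: Optional[datetime] = None,
    timelag: int = 60,
    streams: int = 3,
    stream_frame: int = 10,
) -> List[Tuple[datetime, datetime]]:
    """Return `streams` consecutive slices going back in time from `timestamp`."""
    if timestamp is None:
        timestamp = datetime.now(timezone.utc)
    # Drop microseconds, then shift the whole series back by `timelag` seconds.
    end = timestamp.replace(microsecond=0) - timedelta(seconds=timelag)
    slices = []
    for _ in range(streams):
        # Bounds are inclusive: the slice covers `stream_frame` whole seconds.
        start = end - timedelta(seconds=stream_frame - 1)
        slices.append((start, end))
        end = start - timedelta(seconds=1)  # next slice ends just before this one
    return slices
```

The series produced this way is contiguous and non-overlapping, covering streams * stream_frame seconds in total.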

task_records_to_log(records)

Push records to the log.

Write records to the log output.

Parameters

records (List[Any]) – List of records to log.

Return type

None

task_redis_push_multi_imap(records, push_mode='pipe')

Push records to Redis in parallel.

Split the records list into smaller data sets to be pushed to Redis by multiple greenlets. The push_mode parameter defines which data transfer strategy to use.

Note

See the note for task_redis_push_multi_spawn() for a detailed explanation of the push_mode parameter.

Parameters
  • records (List[Any]) – List of records to store in Redis.

  • push_mode (str) – Specifies the data transfer strategy to use.

Returns

Flag indicating success or failure of the operation

Return type

bool

task_redis_push_multi_spawn(records, push_mode='pipe')

Push records to Redis in parallel.

Split the records list into smaller data sets to be pushed to Redis by multiple greenlets.

Note

The push_mode parameter selects the data transfer strategy by providing the suffix of the specific _records_to_redis_* method to call. Currently accepts naive or pipe.

Parameters
  • records (List[Any]) – List of records to store in Redis.

  • push_mode (str) – Specifies the data transfer strategy to use.

Returns

Flag indicating success or failure of the operation

Return type

bool

task_redis_push_single(records, push_mode='pipe')

Push records to Redis in a single batch.

Note

See the note for task_redis_push_multi_spawn() for a detailed explanation of the push_mode parameter.

Parameters
  • records (List[Any]) – List of records to queue.

  • push_mode (str) – Specifies the data transfer strategy to use.

Returns

Flag indicating success or failure of the operation

Return type

bool
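The push_mode dispatch and the single versus parallel push variants can be sketched as follows. The documented methods use greenlets (going by the names, presumably gevent's Pool.imap() and spawn() with a join); this dependency-free sketch swaps in concurrent.futures threads instead: executor.map() plays the role of imap, and submit() plus wait() plays the role of spawning and joining. The class PushSketch, the chunk size and the explicit push_mode validation are hypothetical, and the strategy methods only record what they would send.

```python
from concurrent.futures import ThreadPoolExecutor, wait
from typing import Any, Callable, List


class PushSketch:
    """Hypothetical stand-in showing push_mode dispatch and parallel pushes."""

    def __init__(self, chunk_size: int = 2, workers: int = 4) -> None:
        self.chunk_size = chunk_size
        self.workers = workers
        self.pushed: List[List[Any]] = []

    # Strategy methods; real ones would talk to Redis.
    def _records_to_redis_naive(self, records: List[Any]) -> bool:
        self.pushed.append(list(records))
        return True

    def _records_to_redis_pipe(self, records: List[Any]) -> bool:
        self.pushed.append(list(records))
        return True

    def _strategy(self, push_mode: str) -> Callable[[List[Any]], bool]:
        # push_mode is the suffix of the _records_to_redis_* method to call.
        if push_mode not in ("naive", "pipe"):
            raise ValueError(f"unsupported push_mode: {push_mode!r}")
        return getattr(self, f"_records_to_redis_{push_mode}")

    def _chunks(self, records: List[Any]) -> List[List[Any]]:
        size = self.chunk_size
        return [records[i:i + size] for i in range(0, len(records), size)]

    def push_single(self, records: List[Any], push_mode: str = "pipe") -> bool:
        return self._strategy(push_mode)(records)

    def push_multi_imap(self, records: List[Any], push_mode: str = "pipe") -> bool:
        # Like Pool.imap: results are consumed in input order.
        with ThreadPoolExecutor(max_workers=self.workers) as pool:
            return all(pool.map(self._strategy(push_mode), self._chunks(records)))

    def push_multi_spawn(self, records: List[Any], push_mode: str = "pipe") -> bool:
        # Like spawning one worker per chunk and joining them all.
        strategy = self._strategy(push_mode)
        with ThreadPoolExecutor(max_workers=self.workers) as pool:
            futures = [pool.submit(strategy, chunk) for chunk in self._chunks(records)]
            wait(futures)
        return all(f.result() for f in futures)
```

Every variant returns a single success flag, which is True only if every chunk was pushed successfully.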

track_started = False
typing = True

ms_graph_exporter.celery.tasks module

Module defines parallel data retrieval tasks.

Tasks use GraphApiTask as a base class for interacting with the MS Graph API and Redis.

(task)ms_graph_exporter.celery.tasks.fetch_stream(slice_start, slice_end)

Get a slice of records.

Request the time-domain data slice [slice_start - slice_end] and paginate through the response, passing each page of records to a dedicated queuing task.

Parameters
  • slice_start (str) – Beginning of the data slice (ISO 8601 formatted timestamp).

  • slice_end (str) – End of the data slice (ISO 8601 formatted timestamp).

Return type

None
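MS Graph returns collections page by page: each JSON page carries its records in "value" and, when more data is available, a "@odata.nextLink" URL for the next page. A minimal sketch of the pagination loop, assuming hypothetical get_page (one HTTP request per URL) and dispatch callables; in Celery, dispatch could be something like store_records.delay. The project's MsGraph client may implement this differently.

```python
from typing import Any, Callable, Dict, List, Optional


def paginate_slice(
    first_url: str,
    get_page: Callable[[str], Dict[str, Any]],
    dispatch: Callable[[List[Dict[str, Any]]], None],
) -> int:
    """Follow @odata.nextLink links and hand each page's records to dispatch."""
    pages = 0
    url: Optional[str] = first_url
    while url:
        page = get_page(url)
        records = page.get("value", [])
        if records:
            dispatch(records)  # e.g. queue the page for a storing task
        pages += 1
        url = page.get("@odata.nextLink")  # absent on the last page
    return pages
```

Handing off each page as soon as it arrives keeps memory bounded and lets storage run in parallel with further fetching.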

(task)ms_graph_exporter.celery.tasks.map_streams

Split extraction into streams.

When invoked by the scheduler, splits the time frame elapsed between the previous and current invocations into smaller time slices (streams) and initiates a parallel data request for each slice.

Note

The task should be executed every streams * stream_frame seconds to fully cover the time frame elapsed between executions.

Return type

None
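The scheduling rule in the note can be checked with integer arithmetic. The sketch below assumes that each run covers streams * stream_frame whole seconds ending timelag seconds before the run starts, and shows that a run interval equal to streams * stream_frame leaves neither gaps nor overlaps. The static beat_schedule entry is only an illustration (Celery's beat_schedule accepts a number of seconds as the schedule); per the CustomArgs documentation, this project schedules extraction at worker startup through RedBeat. Both helper names are hypothetical.

```python
from typing import Any, Dict, List


def beat_schedule_for(streams: int, stream_frame: int) -> Dict[str, Dict[str, Any]]:
    """Hypothetical beat_schedule entry running map_streams once per covered window."""
    return {
        "map-streams": {
            "task": "graph.map_streams",  # task name as used in task_routes
            "schedule": float(streams * stream_frame),  # seconds between runs
        }
    }


def covered_seconds(run_at: int, timelag: int, streams: int, stream_frame: int) -> List[int]:
    """Whole seconds (epoch-based) covered by one run started at `run_at`."""
    end = run_at - timelag
    return list(range(end - streams * stream_frame + 1, end + 1))
```

With streams=3 and stream_frame=10, runs 30 seconds apart cover adjacent windows; running them 40 seconds apart would leave a 10-second gap.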

(task)ms_graph_exporter.celery.tasks.store_records(records)

Store records.

Use the Redis client to push records into a list or publish them to a channel named in the app config. If the queue client is not instantiated, only log the data.

Parameters

records (List[Dict[str, Any]]) – List of records to queue.

Return type

None

Module contents

The package contains the following modules.