Source code for ms_graph_exporter.ms_graph.api

# -*- coding: utf-8 -*-
"""Module implements :class:`MsGraph` class to perform authenticated queries to the API.

:class:`MsGraph` obtains an OAuth 2.0 token from Azure AD with Service Principal
for subsequent non-interactive authentication with the API endpoint. It also maintains
persistent HTTPS session to the endpoint for efficient network communications.

Example
-------
Use the :class:`MsGraph` like this::

    from datetime import datetime, timedelta
    from os import environ
    from logging import Logger, getLogger
    from typing import Any, Dict, List

    from ms_graph_exporter.ms_graph.api import MsGraph
    from ms_graph_exporter.ms_graph.response import MsGraphResponse

    logger: Logger
    graph: MsGraph
    t_now: datetime
    response: MsGraphResponse
    batch: List
    record: Dict[str, Any]

    logger = getLogger(__name__)

    graph = MsGraph(
        client_id=environ.get("GRAPH_CLIENT_ID"),
        client_secret=environ.get("GRAPH_CLIENT_SECRET"),
        tenant=environ.get("GRAPH_TENANT"),
    )

    t_now = datetime.utcnow()

    response = graph.get_signins(
        user_id="badc0ffe42@cafe.com",
        timestamp_start=(t_now - timedelta(minutes=10)),
        timestamp_end=(t_now - timedelta(minutes=5)),
        page_size=50
    )

    for batch in response:
        for record in batch:
            logger.info(
                "%s: %s: %s",
                "signins",
                record["id"],
                record["ipAddress"],
            )

"""
from datetime import datetime
from http.client import responses
from logging import Logger, getLogger
from time import sleep
from typing import Any, Dict, List, Optional, Tuple
from uuid import uuid4

from adal.authentication_context import AuthenticationContext

from requests import Response, Session

from ms_graph_exporter.ms_graph import response as api_response


class MsGraph:
    """Class to maintain authenticated connection, and post queries to MS Graph API.

    Authenticates with Azure AD, maintains an OAuth 2.0 token and HTTP session
    with connection pool to interact with MS Graph API.

    Attributes
    ----------
    __api_endpoint : :obj:`str`
        MS Graph API endpoint to call.

    __api_version : :obj:`str`
        MS Graph API version to call.

    __logger : :obj:`~logging.Logger`
        Channel to be used for log output specific to the module.

    __http_session : :obj:`Session <requests:requests.Session>`
        Lazily created persistent HTTP session (see :attr:`http_session`).

    __throttling_retries : :obj:`int`
        Number of retries when getting API throttling response.

    _auth_context : :obj:`AuthenticationContext <adal:adal.AuthenticationContext>`
        Authentication context maintained by MS ADAL for Python. Manages
        OAuth 2.0 token cache and refreshes it if necessary.

    _authority_url : :obj:`str`
        A URL that identifies a token authority. Should be of the format
        ``https://login.microsoftonline.com/your_tenant``

    _client_id : :obj:`str`
        The OAuth client id of the calling application.
        (``appId`` part of the Service Principal)

    _client_secret : :obj:`str`
        The OAuth client secret of the calling application.
        (``password`` part of the Service Principal)

    _tenant : :obj:`str`
        The Azure AD tenant granting the token and where the calling
        application is registered. Can be in GUID or friendly name format.
        (``tenant`` part of the Service Principal)

    _uuid : :obj:`str`
        Universally unique identifier of the class instance to be used in logging.

    """

    __api_endpoint: str = "https://graph.microsoft.com"
    __api_version: str = "v1.0"
    __logger: Logger = getLogger(__name__)
    __http_session: Optional["Session"] = None
    __throttling_retries: int = 5

    _auth_context: "AuthenticationContext"
    _authority_url: str = "https://login.microsoftonline.com/{tenant}"
    _client_id: str = "<undefined>"
    _client_secret: str = "<undefined>"
    _tenant: str = "<undefined>"
    _uuid: str = "<undefined>"

    def __init__(  # noqa: S107
        self,
        client_id: str = "",
        client_secret: str = "",
        tenant: str = "",
        *args,
        **kwargs,
    ) -> None:
        """Initialize class instance.

        Parameters
        ----------
        client_id
            The OAuth client id of the calling application.
            (``appId`` part of the Service Principal)

        client_secret
            The OAuth client secret of the calling application.
            (``password`` part of the Service Principal)

        tenant
            The Azure AD tenant granting the token and where the calling
            application is registered. Can be in GUID or friendly name format.
            (``tenant`` part of the Service Principal)

        *args
            Variable length argument list for possible extension with subclass.

        **kwargs
            Arbitrary keyword arguments for possible extension with subclass.

        """
        self._uuid = str(uuid4())

        self._client_id = client_id
        self._client_secret = client_secret
        self._tenant = tenant

        # The class attribute holds the template; the instance attribute
        # shadows it with the tenant-specific URL.
        self._authority_url = self._authority_url.format(tenant=self._tenant)

        self._auth_context = AuthenticationContext(
            self._authority_url, validate_authority=True, verify_ssl=True
        )

        self.__logger.info("%s: Initialized with client_id[%s]", self, self._client_id)
        # Never write the secret itself to the log; only report whether it is set.
        self.__logger.debug(
            "%s: client_secret: %s",
            self,
            "<redacted>" if self._client_secret else "<empty>",
        )
        self.__logger.debug("%s: tenant: %s", self, self._tenant)

    def __repr__(self) -> str:
        """Return string representation of class instance."""
        return "MsGraph[{}]".format(self._uuid)

    @property
    def http_session(self) -> "Session":
        """Provide access to HTTP session instance.

        Open persistent HTTP session with connection pool,
        if one does not exist yet.

        Returns
        -------
        :obj:`Session <requests:requests.Session>`
            Persistent HTTP session instance.

        """
        if self.__http_session is None:
            self.__http_session = Session()

        return self.__http_session

    @property
    def token(self) -> Dict[str, Any]:
        """Token to interact with MS Graph API.

        Use client credentials within ADAL authentication context to get
        OAuth 2.0 token from cache if not expired or re-request token otherwise.

        Returns
        -------
        :obj:`~typing.Dict` [:obj:`str`, :obj:`~typing.Any`]
            Cached or newly (re-)issued OAuth 2.0 token.

            Token obtained with client credentials has the following structure:

            .. code-block::

                {
                    "tokenType": "Bearer",
                    "expiresIn": 3600,
                    "expiresOn": "2019-07-12 23:38:57.541597",
                    "resource": "https://graph.microsoft.com",
                    "accessToken": "{{token string}}",
                    "isMRRT": True,
                    "_clientId": "cafebabe-4242-4242-cafe-badc0ffebabe",
                    "_authority": "https://login.microsoftonline.com/cafebabe-4242-4242-cafe-badc0ffebabe"  # noqa: E501
                }

        """
        token: Dict[str, Any]

        token = self._auth_context.acquire_token_with_client_credentials(
            self.__api_endpoint, self._client_id, self._client_secret
        )

        if "expiresOn" in token:
            self.__logger.info(
                "%s: Acquired token expires on: %s", self, token["expiresOn"]
            )
        else:
            self.__logger.error("%s: No valid API token received", self)

        return token

    @staticmethod
    def _odata_quote(value: Any) -> str:
        """Return ``value`` as an OData string literal.

        The value is wrapped in single quotes and every embedded single quote
        is doubled, so the value cannot terminate the literal early.
        """
        return "'{}'".format(str(value).replace("'", "''"))

    @staticmethod
    def _odata_timestamp(timestamp: datetime, fraction: str) -> str:
        """Format ``timestamp`` as ``YYYY-MM-DDTHH:MM:SS.<fraction>Z``.

        Timezone-aware values are converted to UTC first; naive values are
        taken to be UTC already. Microseconds are dropped and replaced by the
        caller-supplied ``fraction`` digits.
        """
        offset = timestamp.utcoffset()

        if offset is not None:
            # Shift the wall clock to UTC, then drop tzinfo so that
            # ``isoformat()`` does not emit a ``+HH:MM`` suffix before ``Z``.
            timestamp = (timestamp - offset).replace(tzinfo=None)

        return "{}.{}Z".format(timestamp.replace(microsecond=0).isoformat(), fraction)

    @staticmethod
    def _retry_after_seconds(headers: Any, fallback: int) -> int:
        """Return the throttling delay announced in ``headers``.

        ``headers`` is any mapping with a ``get`` method. ``fallback`` is
        returned when ``Retry-After`` is missing or is not a whole number of
        seconds (e.g. an HTTP date).
        """
        try:
            return max(0, int(headers.get("Retry-After", fallback)))
        except (TypeError, ValueError):
            return fallback

    def _build_filter(
        self,
        filter_options: List[Tuple[str, str, str]] = None,
        filter_join_op: str = "and",
    ) -> str:
        """Build OData filter.

        Construct an OData filter from a list of tuples with filtering
        parameters expressions and values.

        Parameters
        ----------
        filter_options
            List of ``("option", "operand", "value")`` tuples to construct OData
            request filter by joining them with ``filter_join_op`` operator.

        filter_join_op
            Logic operator (i.e. ``or`` or ``and``) to join ``filter_options`` with.

        Note
        ----
        ``filter_options`` parameter follows this structure:

        .. code-block:: python

            [
                ("userPrincipalName", "eq", "badc0ffe42@cafe.com"),
                ("createdDateTime", "ge", "2019-07-26T02:02:02Z"),
                ("createdDateTime", "le", "2019-07-26T04:04:04Z"),
            ]

        So, ``startswith()`` expression must be presented as:

        .. code-block:: python

            ("startswith(userPrincipalName, 'badcafe')", "", "")

        Empty tuple members are skipped, so such function-style expressions
        do not pick up trailing whitespace.

        Returns
        -------
        :obj:`str`
            OData filter (empty string, if there are no ``filter_options``).

        """
        if not filter_options:
            return ""

        clauses = (
            " ".join(str(part) for part in option if part not in ("", None))
            for option in filter_options
        )

        return " {} ".format(filter_join_op).join(clauses)

    def _http_get_with_auth(
        self, api_url: str, params: Dict[str, Any] = None
    ) -> "Response":
        """Perform authenticated GET request.

        Request ``api_url`` with ``params`` using available OAuth 2.0 token
        and a connection pool from the established HTTP session.

        Note
        ----
        MS Graph sends ``HTTP 429`` code to signal `API throttling`_ with
        ``Retry-After`` HTTP header specifying the wait period in seconds.

        Throttling is handled by sleeping for ``Retry-After`` seconds and
        retrying again up to ``__throttling_retries`` times. If the header
        is missing or unparsable, an exponential back-off (1, 2, 4, ...
        seconds) is used instead. No sleep happens after the last attempt.

        .. _API throttling:
           https://docs.microsoft.com/en-us/graph/throttling

        Parameters
        ----------
        api_url
            URL to be requested with available OAuth 2.0 token.

        params
            URL parameters to supply with the request.

        Returns
        -------
        :obj:`Response <requests:requests.Response>`
            HTTP response to the authenticated GET request. The last received
            response is returned as is, whatever its status code.

        """
        request_id: str = str(uuid4())

        headers: Dict[str, str] = {
            "Authorization": "Bearer {}".format(self.token["accessToken"]),
            "Accept": "application/json",
            "Content-Type": "application/json",
            "client-request-id": request_id,
            "return-client-request-id": "true",
        }

        params_log: str = ""

        if params is not None:
            params_log = "&".join("{}={}".format(k, v) for k, v in params.items())

        retry: int = self.__throttling_retries

        while retry >= 0:
            self.__logger.info("%s: Sending HTTP Request[%s]", self, request_id)
            self.__logger.debug(
                "Request[%s][HTTP GET]: %s?%s", request_id, api_url, params_log
            )

            response: Response = self.http_session.get(
                api_url, headers=headers, params=params, timeout=(3, 30)
            )

            # The id is only echoed back when the endpoint honours
            # ``return-client-request-id``; fall back to the id we sent.
            response_id: str = response.headers.get("client-request-id", request_id)
            status: int = response.status_code

            self.__logger.info(
                "Request[%s][HTTP %s]: %s",
                response_id,
                status,
                responses.get(status, "Unknown"),
            )

            if status == 200:
                # Parse the body once instead of once per log statement.
                payload: Any = response.json()
                records: Any = (
                    payload.get("value", []) if isinstance(payload, dict) else []
                )

                for marker in ("@odata.context", "@odata.nextLink"):
                    self.__logger.debug(
                        "Request[%s][HTTP %s]: %s: %s",
                        response_id,
                        status,
                        marker,
                        "present" if marker in payload else "absent",
                    )

                self.__logger.info(
                    "Request[%s][HTTP %s]: size: %s", response_id, status, len(records)
                )
                break

            if status == 429:
                attempts_made: int = self.__throttling_retries - retry
                throttling_delay: int = self._retry_after_seconds(
                    response.headers, 2 ** attempts_made
                )

                if retry > 0:
                    self.__logger.warning(
                        "%s: Throttled for %s seconds (%s retries left)",
                        self,
                        throttling_delay,
                        retry,
                    )
                    sleep(throttling_delay)
                else:
                    self.__logger.warning(
                        "%s: Throttled and no retries left, giving up", self
                    )

                retry = retry - 1
                continue

            # Any other status: report and hand the response back to the caller.
            # Error bodies are not guaranteed to be JSON, so parse defensively.
            try:
                body: Any = response.json()
            except ValueError:
                body = None

            error: Any = body.get("error") if isinstance(body, dict) else None

            if not isinstance(error, dict):
                error = {}

            # ``error`` rather than ``exception``: no exception is being handled
            # here, so there is no traceback to attach.
            self.__logger.error(
                "Request[%s][HTTP %s]: Unexpected HTTP code [%s]: %s",
                response_id,
                status,
                error.get("code", "<unknown>"),
                error.get("message", "<no message>"),
            )
            self.__logger.debug("%s: Headers: %s", self, response.headers)
            break

        return response

    def _query_api(
        self,
        resource: str = None,
        odata_filter: str = None,
        page_size: int = None,
        cache_enabled: bool = False,
    ) -> "api_response.MsGraphResponse":
        """Query MS Graph API.

        Perform authenticated request to MS Graph ``resource`` with ``odata_filter``.
        Returns paginated response, if ``page_size`` is defined and greater than zero.

        Parameters
        ----------
        resource
            Resource/relationship to be queried from MS Graph API endpoint
            (e.g. ``me/messages``). Expected to start with resource name,
            but not with ``/``.

        odata_filter
            Query filter allowing to retrieve a subset of available data
            (e.g. ``createdDateTime le 2019-07-14T05:20:00``). ``None`` or an
            empty string means no ``$filter`` is sent at all.

        page_size
            Number of records to be returned in a single (paginated) response.
            See `paging in MS Graph API`_ for more details.

            .. _paging in MS Graph API:
               https://docs.microsoft.com/en-us/graph/paging

        cache_enabled
            Flag indicating if response data should be cached (``True``)
            or not (``False``).

        Note
        ----
        If all requested records do not fit into the initial response, iterating
        through :obj:`~ms_graph_exporter.ms_graph.response.MsGraphResponse`
        instance would be needed to retrieve all available records
        in batches of ``page_size`` size.

        Returns
        -------
        :obj:`~ms_graph_exporter.ms_graph.response.MsGraphResponse`
            Response which (depending on the ``page_size``) would either contain
            a full set of returned records, or just the first batch cached and
            an iterator to get all the subsequent paginated results.

        """
        query_api_url: str = "{api_endpoint}/{version}/{resource}".format(
            api_endpoint=self.__api_endpoint,
            version=self.__api_version,
            resource=resource,
        )

        query_api_params: Dict[str, Any] = {}

        if (page_size is not None) and (page_size > 0):
            query_api_params["$top"] = page_size

        # An empty ``$filter=`` is not a valid filter expression, so the
        # parameter is only sent when there is something to filter on.
        if odata_filter:
            query_api_params["$filter"] = odata_filter

        self.__logger.info("%s: Query '%s'", self, resource)
        self.__logger.debug("%s: odata_filter: '%s'", self, odata_filter)
        self.__logger.debug("%s: page_size: %s", self, page_size)

        response: Response = self._http_get_with_auth(
            api_url=query_api_url, params=query_api_params
        )

        return api_response.MsGraphResponse(
            ms_graph=self,
            initial_data=response.json() if response.status_code == 200 else None,
            initial_url=response.url,
            cache_enabled=cache_enabled,
        )

    def _query_api_time_domain(
        self,
        resource: str = None,
        filter_options: List[Tuple[str, str, str]] = None,
        filter_join_op: str = "and",
        timestamp_start: datetime = None,
        timestamp_end: datetime = None,
        page_size: int = None,
        cache_enabled: bool = False,
    ) -> "api_response.MsGraphResponse":
        """Query time-domain records from MS Graph API.

        Request ``resource`` for the time-frame starting at ``timestamp_start``
        and ending at ``timestamp_end``. Returns paginated response,
        if ``page_size`` is defined.

        Note
        ----
        * Queries all available data up to ``timestamp_end``,
          if ``timestamp_start`` is not defined.

        * Without ``timestamp_end`` defined, gets the data up to the moment of
          the query execution minus intrinsic (~2 minutes) data population
          delay of the API.

        * Timezone-aware timestamps are converted to UTC; naive timestamps
          are taken to be in UTC already.

        Parameters
        ----------
        resource
            Resource/relationship to be queried from MS Graph API endpoint
            (e.g. ``me/messages``). Expected to start with resource name,
            but not with ``/``.

        filter_options
            List of ``("option", "operand", "value")`` tuples to construct OData
            request filter by joining them with ``filter_join_op`` operator.
            The list is not modified.

        filter_join_op
            Logic operator (i.e. ``or`` or ``and``) to join ``filter_options`` with.

        timestamp_start
            Limit results to records with greater or equal ``createdDateTime`` values.
            See the timestamp precision note below for more details.

        timestamp_end
            Limit results to records with lower or equal ``createdDateTime`` values.

        page_size
            Number of records to be returned in a single batch (paginated) response.

        cache_enabled
            Flag indicating if response data should be cached (``True``)
            or not (``False``).

        Note
        ----
        MS Graph API v1.0 output below suggests that API operates with timestamps
        of ``0.1 microsecond precision``, resulting in **7 digits** after the point
        for ``createdDateTime``.

        .. code-block::

            {
                "@odata.context": "...snip...",
                "value": [
                    {
                        "id": "cafebabe-4242-4242-cafe-badc0ffebabe",
                        "createdDateTime": "2019-07-21T22:05:58.8424069Z",
                        "...snip...": "...snip..."
                    }
                ]
            }

        On the other hand, Python ``datetime`` module allows to define time down to
        ``1 microsecond precision``, resulting in **6 digits** after the point.

            >>> import datetime
            >>> timestamp_start = datetime.datetime(2019, 8, 24, 21, 10, 30, 9999999)
            Traceback (most recent call last):
              ...
            ValueError: microsecond must be in 0..999999
            >>> timestamp_start = datetime.datetime(2019, 8, 24, 21, 10, 30, 999999)
            >>> timestamp_start.isoformat()
            '2019-08-24T21:10:30.999999'

        To ensure no records are missed due to difference in timestamp precision
        between ``datetime`` module and the API, both ``timestamp_start`` and
        ``timestamp_end`` parameters are truncated to seconds (microseconds value
        is replaced by ``0``) for ``$filter`` construction. Then ``.0000000`` is
        added to the string representation of ``timestamp_start`` and
        ``.9999999`` is added to ``timestamp_end``.

        In other words, request time-frame always starts at the very beginning of
        the ``timestamp_start`` second and ends at the very end of the
        ``timestamp_end`` second, expressed with the precision of the
        MS Graph API v1.0 timestamps.

        Returns
        -------
        :obj:`~ms_graph_exporter.ms_graph.response.MsGraphResponse`
            A response which (depending on the ``page_size``) would either contain
            a full set of returned records, or just the first batch cached and
            an iterator to get all the subsequent paginated results.

        """
        # Work on a copy: appending the time bounds must not leak into the
        # list owned by the caller.
        query_filter: List[Tuple[str, str, str]] = list(filter_options or [])

        if timestamp_start is not None:
            query_filter.append(
                (
                    "createdDateTime",
                    "ge",
                    self._odata_timestamp(timestamp_start, "0000000"),
                )
            )

        if timestamp_end is not None:
            query_filter.append(
                (
                    "createdDateTime",
                    "le",
                    self._odata_timestamp(timestamp_end, "9999999"),
                )
            )

        response: "api_response.MsGraphResponse" = self._query_api(
            resource=resource,
            odata_filter=self._build_filter(query_filter, filter_join_op),
            page_size=page_size,
            cache_enabled=cache_enabled,
        )

        return response

    def get_signins(
        self,
        user_id: str = None,
        timestamp_start: datetime = None,
        timestamp_end: datetime = None,
        page_size: int = None,
        cache_enabled: bool = False,
    ) -> "api_response.MsGraphResponse":
        """Get Azure AD signin log records from MS Graph API.

        Request ``user_id`` login data for the time-frame starting at
        ``timestamp_start`` and ending at ``timestamp_end``. Returns paginated
        response, if ``page_size`` is defined.

        Parameters
        ----------
        user_id
            Limit results to records with ``userPrincipalName`` equal to ``user_id``.
            Single quotes in the value are escaped for the OData filter.

        timestamp_start
            Limit results to records with greater or equal ``createdDateTime`` values.
            See :meth:`_query_api_time_domain` for more details.

        timestamp_end
            Limit results to records with lower or equal ``createdDateTime`` values.

        page_size
            Number of records to be returned in a single batch (paginated) response.
            See :meth:`_query_api_time_domain` for more details.

        cache_enabled
            Flag indicating if response data should be cached (``True``)
            or not (``False``).

        Returns
        -------
        :obj:`~ms_graph_exporter.ms_graph.response.MsGraphResponse`
            A response which (depending on the ``page_size``) would either contain
            a full set of returned records, or just the first batch cached and
            an iterator to get all the subsequent paginated results.

        """
        filter_options: List[Tuple[str, str, str]] = []

        if user_id is not None:
            filter_options.append(
                ("userPrincipalName", "eq", self._odata_quote(user_id))
            )

        response: "api_response.MsGraphResponse" = self._query_api_time_domain(
            resource="auditLogs/signIns",
            filter_options=filter_options,
            timestamp_start=timestamp_start,
            timestamp_end=timestamp_end,
            page_size=page_size,
            cache_enabled=cache_enabled,
        )

        return response