Source code for seeq.addons.azureml.backend._aml_online_endpoint_service

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from datetime import datetime, timedelta
from ._aml_response_models import OnlineDeployment, OnlineEndpoint, AmlModel
from seeq.addons.azureml.utils import AzureMLException

API_VERSION = "2021-03-01-preview"

class AmlOnlineEndpointService:
    """
    Provides a service to connect to Azure ML Studio and get endpoints that
    are tagged with `{Seeq: true}` and their associated deployments and models.

    Methods
    -------
    list_online_endpoints()
        Returns a list containing online endpoints tagged with `{Seeq: true}`
        in Azure ML Studio
    """

    # Public Azure hosts used to build the login and management URLs.
    _LOGIN_HOST = "https://login.microsoftonline.com"
    _MANAGEMENT_HOST = "https://management.azure.com"

    def __init__(self, tenant_id, app_id, app_secret, subscription_id,
                 resource_group, workspace_name) -> None:
        self._tenant_id = tenant_id
        self._app_id = app_id
        self._app_secret = app_secret
        self._subscription_id = subscription_id
        self._resource_group = resource_group
        self._workspace_name = workspace_name
        # Bearer token cache; populated lazily by _authorize().
        self._token = None
        self._token_expires_on = None

        # Retry transient failures (throttling and 5xx) on both verbs the
        # service uses.
        retry_strategy = Retry(
            total=3,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["POST", "GET"]
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self._http = requests.Session()
        self._http.mount("https://", adapter)
        self._http.mount("http://", adapter)

    def _token_request(self):
        """
        Private method to build the client-credentials token request

        Returns
        -------
        url, form: tuple (str, dict)
            Token endpoint URL and the form fields to post to it. The fields
            are returned un-encoded so that the HTTP layer can URL-encode
            them (a secret containing `&`, `+` or `%` would otherwise
            corrupt a hand-built body).
        """
        url = f"{self._LOGIN_HOST}/{self._tenant_id}/oauth2/token?api-version=1.0"
        form = {
            'client_secret': self._app_secret,
            'grant_type': 'client_credentials',
            'resource': f"{self._MANAGEMENT_HOST}/",
            'client_id': self._app_id,
        }
        return url, form

    def _authorize(self):
        """
        Private method to authenticate against Azure and cache the token

        Returns
        -------
        token: str
            Authentication token

        Raises
        ------
        AzureMLException
            If the login request does not return HTTP 200
        """
        token_is_fresh = (
            self._token is not None
            and self._token_expires_on is not None
            and datetime.now() < self._token_expires_on
        )
        if token_is_fresh:
            return self._token

        url, form = self._token_request()
        headers = {'Content-Type': 'application/x-www-form-urlencoded'}
        response = self._http.post(url, headers=headers, data=form)
        if response.status_code != 200:
            raise AzureMLException(code=response.status_code, reason=response.reason,
                                   message="Azure Login Failed")

        token_info = response.json()
        self._token = token_info['access_token']
        # The cached token is treated as valid for 59 minutes from now.
        self._token_expires_on = datetime.now() + timedelta(minutes=59)
        return self._token

    def _auth_headers(self):
        """
        Private method to build a fresh Authorization header dictionary

        Returns
        -------
        headers: dict
            New dictionary on every call, so callers may add keys to it
        """
        return {'Authorization': f'Bearer {self._authorize()}'}

    def _get_base_mgmt_url(self):
        """
        Private method to get the base management URL

        Returns
        -------
        url, headers: tuple (str, dict)
            URL (ending in `/`) and headers of the management service
        """
        url = f"{self._MANAGEMENT_HOST}/subscriptions/{self._subscription_id}/resourceGroups/" \
              f"{self._resource_group}/providers/Microsoft.MachineLearningServices/workspaces/" \
              f"{self._workspace_name}/"
        return url, self._auth_headers()

    def _get_regional_model_mgmt_url(self):
        """
        Private method to get the region management URL

        Returns
        -------
        url, headers: tuple (str, dict)
            URL and headers of the regional model management service

        Raises
        ------
        AzureMLException
            If the workspace details or the workspace discovery document
            cannot be retrieved
        """
        url_base, headers = self._get_base_mgmt_url()
        url = f"{url_base}?api-version={API_VERSION}"
        response = self._http.get(url, headers=headers)
        if response.status_code != 200:
            raise AzureMLException(code=response.status_code, reason=response.reason,
                                   message="Error getting workspace details")

        # The discovery document is fetched without an Authorization header.
        workspace_discovery_url = response.json()['properties']['discoveryUrl']
        response = self._http.get(workspace_discovery_url)
        if response.status_code != 200:
            raise AzureMLException(code=response.status_code, reason=response.reason,
                                   message="Error accessing workspace discovery")

        mgmt_url = f"{response.json()['modelmanagement']}/modelmanagement/v1.0/subscriptions/" \
                   f"{self._subscription_id}/resourceGroups/" \
                   f"{self._resource_group}/providers/Microsoft.MachineLearningServices/" \
                   f"workspaces/{self._workspace_name}/services/"
        return mgmt_url, self._auth_headers()

    def _get_models(self, deployment: "OnlineDeployment"):
        """
        Private method to get models associated with a given deployment in
        Azure ML Studio and attach them to the OnlineDeployment object

        Parameters
        ----------
        deployment: seeq.addons.azureml.backend.OnlineDeployment

        Returns
        -------
        -: None

        Raises
        ------
        AzureMLException
            If the model request does not return HTTP 200
        """
        path = deployment.modelId.replace('/versions/', ':')
        # NOTE(review): this relies on `deployment.location` already being a
        # full base URL (scheme + host). If it is only a region name, the
        # scheme/host must be added here -- confirm against OnlineDeployment.
        url = f"{deployment.location}/modelmanagement/v1.0{path}"
        response = self._http.get(url, headers=self._auth_headers())
        if response.status_code != 200:
            raise AzureMLException(code=response.status_code, reason=response.reason,
                                   message="Error getting models")

        deployment.model = AmlModel.deserialize_aml_model_response(response.json())

    def _add_deployments_to_endpoint(self, endpoint: "OnlineEndpoint"):
        """
        Private method to get the associated deployments for an online
        endpoint in Azure ML Studio and attach them to the OnlineEndpoint
        object

        Parameters
        ----------
        endpoint: seeq.addons.azureml.backend.OnlineEndpoint

        Returns
        -------
        endpoint: seeq.addons.azureml.backend.OnlineEndpoint
            The same object that was passed in, with deployments attached

        Raises
        ------
        AzureMLException
            If the deployments request does not return HTTP 200
        """
        url_base, headers = self._get_base_mgmt_url()
        url = f"{url_base}onlineEndpoints/{endpoint.name}/deployments?api-version={API_VERSION}"
        response = self._http.get(url, headers=headers)
        if response.status_code != 200:
            raise AzureMLException(code=response.status_code, reason=response.reason,
                                   message="Error getting deployments")

        deployments = OnlineDeployment.deserialize_aml_deployment_response(response.json(), "Managed")
        for d in deployments:
            self._get_models(d)
            endpoint.add_deployment(d)
        return endpoint

    def _add_keys_to_endpoint(self, endpoint: "OnlineEndpoint"):
        """
        Private method to attach the primary and secondary keys to a given
        OnlineEndpoint object

        Parameters
        ----------
        endpoint: seeq.addons.azureml.backend.OnlineEndpoint

        Returns
        -------
        -: None

        Raises
        ------
        AzureMLException
            If the listKeys request does not return HTTP 200
        """
        if endpoint.kind == "Managed":
            base_url, headers = self._get_base_mgmt_url()
            url = f"{base_url}onlineEndpoints/{endpoint.name}/listKeys?api-version={API_VERSION}"
        else:
            base_url, headers = self._get_regional_model_mgmt_url()
            url = f"{base_url}{endpoint.name}/listKeys"

        # listKeys is an action endpoint, so it is invoked with POST.
        response = self._http.post(url, headers=headers)
        if response.status_code != 200:
            raise AzureMLException(code=response.status_code, reason=response.reason,
                                   message=f"Error listing keys for endpoint. Endpoint name: {endpoint.name}")

        keys = response.json()
        endpoint.primaryKey = keys['primaryKey']
        endpoint.secondaryKey = keys['secondaryKey']

    def _get_unmanaged_online_endpoints(self):
        """
        Private method to get a list of endpoints that are deployed as an ACI
        compute type. This is a workaround due to the endpoints API not
        returning endpoints that have a compute type of ACI.

        Returns
        -------
        oes: list
            List of OnlineEndpoint objects with deployments and models
            attached to each object

        Raises
        ------
        AzureMLException
            If the ACI endpoints request does not return HTTP 200
        """
        url, headers = self._get_regional_model_mgmt_url()
        headers['computeType'] = "ACI"
        response = self._http.get(url, headers=headers)
        if response.status_code != 200:
            raise AzureMLException(code=response.status_code, reason=response.reason,
                                   message="Error getting ACI endpoints")

        oes = OnlineEndpoint.deserialize_unmanaged_endpoint_response(response.json())
        for oe in oes:
            self._add_keys_to_endpoint(oe)
            # unmanaged endpoints only have one deployment
            oe.deployment[0].modelId = f"/subscriptions/{self._subscription_id}/resourceGroups/" \
                                       f"{self._resource_group}/providers/Microsoft.MachineLearningService" \
                                       f"s/workspaces/{self._workspace_name}/models/" \
                                       f"{oe.deployment[0].model}/versions/" \
                                       f"{oe.deployment[0].model_version}"
            self._get_models(oe.deployment[0])
        return oes

    def _get_managed_online_endpoints(self):
        """
        Private method to get a list of endpoints tagged with `{Seeq: true}`
        in Azure ML Studio and attach the associated deployments and models
        in the endpoint to each OnlineEndpoint object

        Returns
        -------
        oes: list
            List of OnlineEndpoint objects with deployments and models
            attached to each object

        Raises
        ------
        AzureMLException
            If the endpoints request does not return HTTP 200
        """
        url_base, headers = self._get_base_mgmt_url()
        url = f"{url_base}onlineEndpoints?api-version={API_VERSION}"
        response = self._http.get(url, headers=headers)
        if response.status_code != 200:
            raise AzureMLException(code=response.status_code, reason=response.reason,
                                   message="Error getting endpoints")

        oes = OnlineEndpoint.deserialize_managed_endpoint_response(response.json())
        for oe in oes:
            self._add_keys_to_endpoint(oe)
            self._add_deployments_to_endpoint(oe)
        return oes

    def list_online_endpoints(self):
        """
        Public method to get a list of endpoints tagged with `{Seeq: true}`
        in Azure ML Studio and attach the associated deployments and models
        in the endpoint to each OnlineEndpoint object.

        Returns
        -------
        oes: list
            List of OnlineEndpoint objects with deployments and models
            attached to each object. Unmanaged (ACI) endpoints come first,
            followed by managed endpoints.
        """
        oes = self._get_unmanaged_online_endpoints()
        oes += self._get_managed_online_endpoints()
        return oes
def exception_message(message, code, reason):
    """
    Compose a single-line description of a failed request.

    Parameters
    ----------
    message: str
        Human-readable description of what failed
    code: object
        Return code of the failed call; rendered with `str()`
    reason: object
        Reason reported for the failure; rendered with `str()`

    Returns
    -------
    text: str
        `"<message>. Return code: <code> with reason: <reason>"`
    """
    code_part = f'Return code: {code!s}'
    reason_part = f'with reason: {reason!s}'
    return f'{message}. {code_part} {reason_part}'