# Source code for servicex.servicex_adaptor
import asyncio
import logging
from datetime import datetime, timezone
from typing import AsyncIterator, Dict, Optional, Tuple

import aiohttp
from google.auth import jwt

from .utils import (
    ServiceXException,
    ServiceXFailedFileTransform,
    ServiceXFatalTransformException,
    ServiceXUnknownRequestID,
    TransformTuple,
)
# Number of seconds to wait between polls of ServiceX for the status of a
# transform job while waiting for it to finish. `transform_status_stream`
# sleeps for this long after every status report that does not show zero
# files remaining.
servicex_status_poll_time = 5.0
# Low level routines for interacting with a ServiceX instance via the WebAPI
class ServiceXAdaptor:
    """Low level access to a ServiceX instance through its web API.

    Every request is sent to a path below `endpoint`. When a refresh token
    was supplied, an access token is fetched (and re-fetched once it has
    expired) and sent as a `Bearer` authorization header with each request.
    """

    def __init__(self, endpoint, refresh_token=None):
        """
        Authenticated access to ServiceX

        Args:
            endpoint: Base address of the ServiceX web API; request paths
                are appended to it.
            refresh_token: Token presented to `{endpoint}/token/refresh` to
                obtain access tokens. When falsy, requests carry no
                authorization header.
        """
        self._endpoint = endpoint
        # Access token; fetched lazily by `_get_authorization`.
        self._token = None
        self._refresh_token = refresh_token

    async def _get_token(self, client: aiohttp.ClientSession):
        """Fetch a new access token with the refresh token and store it.

        Args:
            client (aiohttp.ClientSession): Client session on which to make
                the request.

        Raises:
            ServiceXException: If the endpoint answers with anything other
                than HTTP 200.
        """
        url = f"{self._endpoint}/token/refresh"
        headers = {"Authorization": f"Bearer {self._refresh_token}"}
        async with client.post(url, headers=headers, json=None) as response:
            status = response.status
            if status == 200:
                j = await response.json()
                self._token = j["access_token"]
            else:
                raise ServiceXException(
                    f"ServiceX access token request rejected: {status}"
                )

    async def _get_authorization(self, client: aiohttp.ClientSession):
        """Return the HTTP headers needed to authorize a request.

        Args:
            client (aiohttp.ClientSession): Client session used if a new
                access token has to be fetched.

        Returns:
            An empty dictionary when no refresh token was configured,
            otherwise a dictionary holding the `Authorization` header.
        """
        if not self._refresh_token:
            return {}

        # The JWT `exp` claim is in seconds since the epoch, so "now" must be
        # a true POSIX timestamp. An aware UTC datetime is required here:
        # `datetime.utcnow().timestamp()` interprets the naive value as local
        # time and is off by the machine's UTC offset.
        now = datetime.now(timezone.utc).timestamp()
        if not self._token or jwt.decode(self._token, verify=False)["exp"] - now < 0:
            await self._get_token(client)
        return {"Authorization": f"Bearer {self._token}"}

    async def submit_query(
        self, client: aiohttp.ClientSession, json_query: Dict[str, str]
    ) -> Dict[str, str]:
        """
        Submit a query to ServiceX, and return a request ID

        Args:
            client (aiohttp.ClientSession): Client session on which to make
                the request.
            json_query (Dict[str, str]): The transformation request, sent as
                the JSON body.

        Raises:
            ServiceXException: If ServiceX does not answer with HTTP 200.

        Returns:
            Dict[str, str]: The JSON dictionary returned by ServiceX.
        """
        headers = await self._get_authorization(client)

        async with client.post(
            f"{self._endpoint}/servicex/transformation",
            headers=headers,
            json=json_query,
        ) as response:
            status = response.status
            if status != 200:
                # This was an error at ServiceX, bubble it up so code above us can
                # handle as needed.
                t = await response.text()
                raise ServiceXException(
                    f"ServiceX rejected the transformation request: ({status}){t}"
                )
            return await response.json()

    async def get_query_status(
        self, client: aiohttp.ClientSession, request_id: str
    ) -> Dict[str, str]:
        """Returns the full query information from the endpoint.

        Args:
            client (aiohttp.ClientSession): Client session on which to make the request.
            request_id (str): The request id to return the transform status

        Raises:
            ServiceXException: If we fail to find the information.

        Returns:
            Dict[str, str]: The JSON dictionary of information returned from ServiceX
        """
        headers = await self._get_authorization(client)

        async with client.get(
            f"{self._endpoint}/servicex/transformation/{request_id}", headers=headers
        ) as response:
            status = response.status
            if status != 200:
                # This was an error at ServiceX, bubble it up so code above us can
                # handle as needed.
                t = await response.text()
                raise ServiceXException(
                    f"ServiceX rejected the transformation status fetch: ({status}){t}"
                )
            return await response.json()

    async def dump_query_errors(self, client: aiohttp.ClientSession, request_id: str):
        """Dumps to the logging system any error messages we find from ServiceX.

        Args:
            client (aiohttp.ClientSession): Client along which to send queries.
            request_id (str): Fetch all errors from there.

        Raises:
            ServiceXUnknownRequestID: If the request fails and the response
                text contains "Request not found".
            ServiceXException: If the request fails for any other reason.
        """
        headers = await self._get_authorization(client)

        async with client.get(
            f"{self._endpoint}/servicex/transformation/{request_id}/errors",
            headers=headers,
        ) as response:
            status = response.status
            if status != 200:
                t = await response.text()
                if "Request not found" in t:
                    raise ServiceXUnknownRequestID(
                        f"Unable to get errors for request {request_id}: {status} - {t}"
                    )
                raise ServiceXException(
                    f"Failed to get request errors for {request_id}: {status} - {t}"
                )

            errors = (await response.json())["errors"]

        # Dump the messages out to the logger if there are any!
        # Keep the output under control - only dump the first 20 errors (see #188)
        log = logging.getLogger(__name__)
        log.warning(f"Transform {request_id} had {len(errors)} errors:")
        for e in errors[:20]:
            log.warning(f' Error transforming file: {e["file"]}')
            for ln in e["info"].split("\n"):
                log.warning(f" -> {ln}")

    @staticmethod
    def _get_transform_stat(info: Dict[str, str], stat_name: str) -> Optional[int]:
        """Return the info from a servicex status reply, protecting against bad internet returns.

        Args:
            info (Dict[str, str]): The status reply.
            stat_name (str): Key of the statistic to extract.

        Returns:
            Optional[int]: The statistic as an integer, or `None` when the
            key is missing or its value is `None`.
        """
        value = info.get(stat_name)
        return None if value is None else int(value)

    async def get_transform_status(
        self, client: aiohttp.ClientSession, request_id: str
    ) -> Tuple[Optional[int], int, Optional[int]]:
        """
        Internal routine that queries for the current state of things. We expect the
        following things to come back:

            - files-processed
            - files-remaining
            - files-skipped
            - request-id
            - stats

        Arguments:

            client              Client session on which to make the request
            request_id          The id of the request to check up on

        Raises:

            ServiceXUnknownRequestID         If the status request does not return
                                             HTTP 200.
            ServiceXFatalTransformException  If the status returns `Fatal`, or a
                                             negative number of files remaining.

        Returns:

            files_remaining     How many files remain to be processed. None if the number
                                has not yet been determined
            files_processed     How many files have been successfully processed
                                by the system.
            files_failed        Number of files that were skipped
        """
        headers = await self._get_authorization(client)

        # Make the actual query
        async with client.get(
            f"{self._endpoint}/servicex/transformation/{request_id}/status",
            headers=headers,
        ) as response:
            status = response.status
            if status != 200:
                raise ServiceXUnknownRequestID(
                    f"Unable to get transform status for request id {request_id}"
                    f" - http error {status}"
                )
            info = await response.json()

        logging.getLogger(__name__).debug(
            f"Status response for {request_id}: {info}"
        )

        if info.get("status") == "Fatal":
            raise ServiceXFatalTransformException(
                f'Transform status for {request_id} is marked "Fatal".'
            )

        files_remaining = self._get_transform_stat(info, "files-remaining")
        files_failed = self._get_transform_stat(info, "files-skipped")
        files_processed = self._get_transform_stat(info, "files-processed")

        if files_remaining is not None and files_remaining < 0:
            raise ServiceXFatalTransformException(
                f"Transform {request_id} returned negative files remaining. Bug in ServiceX?"
            )

        # The declared return type promises an integer here; a reply without
        # `files-processed` is treated as a broken reply.
        assert files_processed is not None, (
            f"Status reply for {request_id} is missing files-processed"
        )
        return files_remaining, files_processed, files_failed
async def transform_status_stream(
    sa: ServiceXAdaptor, client: aiohttp.ClientSession, request_id: str
) -> AsyncIterator[TransformTuple]:
    """
    Returns an async stream of `(files-remaining, files_processed, files_failed)` until the
    servicex `request_id` request is finished, against the servicex instance located at
    `sa`.

    Arguments:

        sa                  The servicex low level adaptor
        client              The aiohttp client session used to make the status requests
        request_id          The request id for this request

    Returns:

        remaining, processed, skipped       Returns an async stream triple of the
                                            status numbers. Every status fetched is
                                            sent on, including the final one that
                                            reports zero files remaining.

    The stream waits `servicex_status_poll_time` seconds between polls and ends
    after the first status whose files-remaining count is zero.
    """
    while True:
        status = await sa.get_transform_status(client, request_id)
        remaining, _, _ = status
        yield status

        # `remaining` may be None while the count is unknown; None != 0, so
        # polling continues in that case.
        if remaining == 0:
            return
        await asyncio.sleep(servicex_status_poll_time)
async def trap_servicex_failures(
    stream: AsyncIterator[TransformTuple],
) -> AsyncIterator[TransformTuple]:
    """
    Passes status tuples through while watching for failed files.

    Each `(remaining, processed, failed)` tuple from `stream` is yielded
    unchanged as long as its failed count is `None` or zero. The first tuple
    that reports a non-zero failed count is not yielded; the exception is
    raised immediately at that point.

    Raises:

        ServiceXFailedFileTransform     As soon as a status tuple reports one or
                                        more failed files.
    """
    async for status in stream:
        remain, processed, did_fail = status
        if did_fail is not None and did_fail != 0:
            raise ServiceXFailedFileTransform(
                f"ServiceX failed to transform {did_fail} "
                f"files - data incomplete (remaining: {remain}, "
                f"processed: {processed})."
            )
        yield status