import asyncio
import functools
import logging
import time
from datetime import timedelta
from pathlib import Path
from typing import (
Any,
AsyncGenerator,
AsyncIterator,
Awaitable,
Callable,
Dict,
Iterable,
List,
Optional,
Tuple,
Union,
)
import aiohttp
import backoff
import minio
from backoff import on_exception
from make_it_sync import make_sync
from servicex.servicex_config import ServiceXConfigAdaptor
from .cache import Cache
from .data_conversions import DataConverterAdaptor
from .minio_adaptor import MinioAdaptor, MinioAdaptorFactory, find_new_bucket_files
from .servicex_adaptor import (
ServiceXAdaptor,
transform_status_stream,
trap_servicex_failures,
)
from .servicex_utils import _wrap_in_memory_sx_cache
from .servicexabc import ServiceXABC
from .utils import (
DatasetType,
ServiceXException,
ServiceXFailedFileTransform,
ServiceXFatalTransformException,
ServiceXNoFilesInCache,
ServiceXUnknownDataRequestID,
ServiceXUnknownRequestID,
StatusUpdateFactory,
_run_default_wrapper,
_status_update_wrapper,
default_client_session,
get_configured_cache_path,
log_adaptor,
on_exception_itr,
stream_status_updates,
stream_unique_updates_only,
)
# The allowed file formats.
# You could modify this if you wanted to add new...
g_allowed_formats = ["parquet", "root-file"]
[docs]class StreamInfoBase:
"""Contains base information about results that are streamed back from
ServiceX.
"""
[docs] def __init__(self, file: str):
self._file = file
@property
def file(self) -> str:
"""Returns the ServiceX filename
This filename is unique in the dataset, and will be the same across different queries
against the dataset. It can be used as a key to sort results.
Notes:
- May contains non-file system characters
Returns:
str: servicex filename
"""
return self._file
[docs]class StreamInfoUrl(StreamInfoBase):
"""Contains information about results that are streamed back from ServiceX.
Used when a URL to access the data directly from ServiceX is requested.
"""
[docs] def __init__(self, file: str, url: str, bucket: str):
super().__init__(file)
self._url = url
self._bucket = bucket
@property
def url(self) -> str:
"""URL that can can be used to stream data back from ServiceX.
Returns:
str: The URL of the transformed data for this file.
"""
return self._url
@property
def bucket(self) -> str:
"""Returns the buck name - unique and constant across transformations.
Can be used to order the results
Returns:
str: The bucket name as produced by ServiceX
"""
return self._bucket
[docs]class StreamInfoPath(StreamInfoBase):
"""Contains information about results that are streamed back from ServiceX.
Used when the user has requested streaming, but copying the file locally first.
"""
[docs] def __init__(self, file: str, path: Path):
super().__init__(file)
self._path = path
@property
def path(self) -> Path:
"""Path the the local file of ServiceX data that represents this query.
Returns:
Path: The path object that points to the data requested.
"""
return self._path
@property
def url(self) -> str:
"""URI to the locally downloaded file.
Returns:
str: The URI of the transformed data for this file.
"""
return self._path.as_uri()
[docs]class StreamInfoData(StreamInfoBase):
"""Contains information about results that are streamed back from ServiceX.
Used when data (`pandas` or `awkward`) is requested. The data is downloaded from
ServiceX, converted into the requested format, and then streamed to the user in these
chunks. There is a single chunk per file.
"""
[docs] def __init__(self, file: str, data: Any):
super().__init__(file)
self._data = data
@property
def data(self) -> Any:
"""The `pandas.DataFrame` or `awkward` array return
Returns:
Any: The ServiceX transformed data for this file.
"""
return self._data
[docs]class ServiceXDataset(ServiceXABC):
"""
Used to access an instance of ServiceX at an end point on the internet. Support conversion
by configuration object `config_adaptor` or by creating the adaptors defined in the `__init__`
function.
"""
[docs] def __init__(
self,
dataset: DatasetType,
backend_name: Optional[str] = None,
image: Optional[str] = None,
max_workers: int = 20,
result_destination: str = "object-store",
servicex_adaptor: Optional[ServiceXAdaptor] = None,
minio_adaptor: Optional[Union[MinioAdaptor, MinioAdaptorFactory]] = None,
cache_adaptor: Optional[Cache] = None,
status_callback_factory: Optional[StatusUpdateFactory] = _run_default_wrapper,
local_log: Optional[log_adaptor] = None,
session_generator: Optional[
Callable[[], Awaitable[aiohttp.ClientSession]]
] = None,
config_adaptor: Optional[ServiceXConfigAdaptor] = None,
data_convert_adaptor: Optional[DataConverterAdaptor] = None,
ignore_cache: bool = False,
):
"""
Create and configure a ServiceX object for a dataset.
Arguments
dataset Name of a dataset from which queries will be selected.
backend_name The type of backend. Used only if we need to find an
end-point. If we do not have a `servicex_adaptor` then this
will default to xaod, unless you have any endpoint listed
in your servicex file. It will default to best match there,
or fail if a name has been given.
image Name of transformer image to use to transform the data. If
left as default, `None`, then the default image for the
ServiceX backend will be used.
max_workers Maximum number of transformers to run simultaneously on
ServiceX.
result_destination Where the transformers should write the results.
Defaults to object-store, but could be used to save
results to a posix volume
servicex_adaptor Object to control communication with the servicex instance
at a particular ip address with certain login credentials.
Will be configured via the `config_adaptor` by default.
minio_adaptor Object to control communication with the minio servicex
instance.
cache_adaptor Runs the caching for data and queries that are sent up and
down.
status_callback_factory Factory to create a status notification callback for each
query. One is created per query.
local_log Log adaptor for logging.
session_generator If you want to control the `ClientSession` object that
is used for callbacks. Otherwise a single one for all
`servicex` queries is used.
config_adaptor Control how configuration options are read from the
a configuration file (e.g. servicex.yaml).
data_convert_adaptor Manages conversions between root and parquet and `pandas`
and `awkward`, including default settings for expected
datatypes from the backend.
ignore_cache Always ignore the cache on any query for this dataset. This
is only meaningful if no cache adaptor is provided.
Defaults to false - the cache is used if possible.
Notes:
- The `status_callback` argument, by default, uses the `tqdm` library to render
progress bars in a terminal window or a graphic in a Jupyter notebook (with proper
jupyter extensions installed). If `status_callback` is specified as None, no
updates will be rendered. A custom callback function can also be specified which
takes `(total_files, transformed, downloaded, skipped)` as an argument. The
`total_files` parameter may be `None` until the system knows how many files need to
be processed (and some files can even be completed before that is known).
- The full description of calling parameters is recorded in the local cache, including
things like `image` name and tag.
"""
ServiceXABC.__init__(
self,
dataset,
image,
max_workers,
result_destination,
status_callback_factory,
)
# Get the local settings
config = (
config_adaptor if config_adaptor is not None else ServiceXConfigAdaptor()
)
# Establish the cache that will store all our queries
self._cache = (
Cache(
get_configured_cache_path(config.settings),
ignore_cache,
analysis_query_key=backend_name
if backend_name is not None
else "default",
)
if cache_adaptor is None
else cache_adaptor
)
if not servicex_adaptor:
# Given servicex adaptor is none, this should be ok. Fixes type checkers
end_point, token = config.get_servicex_adaptor_config(backend_name)
servicex_adaptor = ServiceXAdaptor(end_point, token)
self._servicex_adaptor = servicex_adaptor
if not minio_adaptor:
self._minio_adaptor = MinioAdaptorFactory()
else:
if isinstance(minio_adaptor, MinioAdaptor):
self._minio_adaptor = MinioAdaptorFactory(always_return=minio_adaptor)
else:
self._minio_adaptor = minio_adaptor
self._log = log_adaptor() if local_log is None else local_log
self._session_generator = (
session_generator
if session_generator is not None
else default_client_session
)
self._return_types = [config.get_default_returned_datatype(backend_name)]
self._converter = (
data_convert_adaptor
if data_convert_adaptor is not None
else DataConverterAdaptor(self._return_types[0])
)
[docs] def first_supported_datatype(
self, datatypes: Union[List[str], str]
) -> Optional[str]:
"""Return the first datatype format that this dataset/servicex instance can return.
Different instances of `ServiceX` are capable of returning different datatypes. Pass in
the datatypes that your app supports, and this will return the first one that the servicex
backend can return.
Args:
datatypes (Union[List[str], str]): A single or list of datatypes that are supported by
your app.
Returns:
str: The first datatype that is supported. If none of them are, then `None` is
returned.
"""
datatypes = [datatypes] if isinstance(datatypes, str) else datatypes
for dt in datatypes:
if dt in self._return_types:
return dt
return None
[docs] def ignore_cache(self):
"""Return a context manager that, as long as it is held, will cause any queries against
just this dataset to ignore any locally cached data.
Returns:
ContextManager: As long as this is held, the local query cache will be ignored.
"""
return self._cache.ignore_cache()
@functools.wraps(ServiceXABC.get_data_rootfiles_async, updated=())
@_wrap_in_memory_sx_cache
async def get_data_rootfiles_async(
self, selection_query: str, title: Optional[str] = None
) -> List[Path]:
return await self._file_return(selection_query, "root-file", title)
[docs] async def get_data_rootfiles_stream(
self, selection_query: str, title: Optional[str] = None
) -> AsyncIterator[StreamInfoPath]:
"""Returns, as an async iterator, each completed batch of work from Servicex.
The `StreamInfoPath` contains a path where downstream consumers can directly
access the data.
Args:
selection_query (str): The `qastle` query for the data to retrieve.
Yields:
AsyncIterator[StreamInfoPath]: As ServiceX completes the data, and it is downloaded
to the local machine, the async iterator returns
a `StreamInfoPath` which can be used to access the
file locally.
"""
async for f_info in self._stream_local_files(
selection_query, title, "root-file"
): # type: ignore
yield f_info
@functools.wraps(ServiceXABC.get_data_parquet_async, updated=())
@_wrap_in_memory_sx_cache
async def get_data_parquet_async(
self, selection_query: str, title: Optional[str] = None
) -> List[Path]:
return await self._file_return(selection_query, "parquet", title)
[docs] async def get_data_parquet_stream(
self, selection_query: str, title: Optional[str] = None
) -> AsyncIterator[StreamInfoPath]:
"""Returns, as an async iterator, each completed batch of work from Servicex.
The `StreamInfoPath` contains a path where downstream consumers can directly
access the data.
Args:
selection_query (str): The `qastle` query for the data to retrieve.
Yields:
AsyncIterator[StreamInfoPath]: As ServiceX completes the data, and it is downloaded
to the local machine, the async iterator returns
a `StreamInfoPath` which can be used to access the
file locally.
"""
async for f_info in self._stream_local_files(
selection_query, title, "parquet"
): # type: ignore
yield f_info
@functools.wraps(ServiceXABC.get_data_pandas_df_async, updated=())
@_wrap_in_memory_sx_cache
async def get_data_pandas_df_async(
self, selection_query: str, title: Optional[str] = None
):
data_format = self._return_types[0]
return self._converter.combine_pandas(
await self._data_return(
selection_query,
lambda f: self._converter.convert_to_pandas(f),
title,
data_format=data_format,
)
)
@functools.wraps(ServiceXABC.get_data_awkward_async, updated=())
@_wrap_in_memory_sx_cache
async def get_data_awkward_async(
self, selection_query: str, title: Optional[str] = None
):
data_format = self._return_types[0]
return self._converter.combine_awkward(
await self._data_return(
selection_query,
lambda f: self._converter.convert_to_awkward(f),
title,
data_format=data_format,
)
)
[docs] async def get_data_awkward_stream(
self, selection_query: str, title: Optional[str] = None
) -> AsyncGenerator[StreamInfoData, None]:
"""Returns, as an async iterator, each completed batch of work from Servicex
as a separate `awkward` array. The data is returned in a `StreamInfoData` object.
Args:
selection_query (str): The `qastle` query for the data to retrieve.
Yields:
AsyncIterator[StreamInfoData]: As ServiceX completes the data, and it is downloaded
to the local machine, the async iterator returns
a `StreamInfoData` which can be used to access the
data that has been loaded from the file.
"""
async for a in self._stream_return(
selection_query, title, lambda f: self._converter.convert_to_awkward(f)
):
yield a
[docs] async def get_data_pandas_stream(
self, selection_query: str, title: Optional[str] = None
) -> AsyncGenerator[StreamInfoData, None]:
"""Returns, as an async iterator, each completed batch of work from Servicex
as a separate `pandas.DataFrame` array. The data is returned in a `StreamInfoData` object.
Args:
selection_query (str): The `qastle` query for the data to retrieve.
Yields:
AsyncIterator[StreamInfoData]: As ServiceX completes the data, and it is downloaded
to the local machine, the async iterator returns
a `StreamInfoData` which can be used to access the
data that has been loaded from the file.
"""
async for a in self._stream_return(
selection_query, title, lambda f: self._converter.convert_to_pandas(f)
):
yield a
[docs] async def get_data_rootfiles_uri_stream(
self,
selection_query: str,
title: Optional[str] = None,
as_signed_url: Optional[bool] = False,
) -> AsyncIterator[StreamInfoUrl]:
"""Returns, as an async iterator, each completed batch of work from ServiceX.
The data that comes back includes a `url` that can be accessed to download the
data.
Args:
selection_query (str): The ServiceX Selection
title (str): Optional title for transform request
as_signed_url (bool): Return the uri as a presigned http url?
"""
async for f_info in self._stream_url_buckets(
selection_query, "root-file", title, as_signed_url
): # type: ignore
yield f_info
[docs] async def get_data_rootfiles_uri_async(
self,
selection_query: str,
title: Optional[str] = None,
as_signed_url: Optional[bool] = False,
) -> List[StreamInfoUrl]:
"""Returns a list of StreamInfoUrl entries containing a `url` for each output file
from the transform, taken from get_data_rootfiles_uri_stream.
The data that comes back includes a `url` that can be accessed to download the
data.
Args:
selection_query (str): The ServiceX Selection
title (str): Optional title for transform request
as_signed_url (bool): Return the uri as a presigned http url?
"""
return [
f
async for f in self.get_data_rootfiles_uri_stream(
selection_query, title=title, as_signed_url=as_signed_url
)
]
[docs] async def get_data_parquet_uri_stream(
self,
selection_query: str,
title: Optional[str] = None,
as_signed_url: Optional[bool] = False,
) -> AsyncIterator[StreamInfoUrl]:
"""Returns, as an async iterator, each of the files from the minio bucket,
as the files are added there.
Args:
selection_query (str): The ServiceX Selection
title (str): Optional title for transform request
as_signed_url (bool): Return the uri as a presigned http url?
"""
async for f_info in self._stream_url_buckets(
selection_query, "parquet", title, as_signed_url
): # type: ignore
yield f_info
[docs] async def get_data_parquet_uri_async(
self,
selection_query: str,
title: Optional[str] = None,
as_signed_url: Optional[bool] = False,
) -> List[StreamInfoUrl]:
"""Returns a list of each of the files from the minio bucket.
Args:
selection_query (str): The ServiceX Selection
title (str): Optional title for transform request
as_signed_url (bool): Return the uri as a presigned http url?
"""
return [
f_info
async for f_info in self.get_data_parquet_uri_stream(
selection_query, title, as_signed_url
)
]
[docs] async def _file_return(
self, selection_query: str, data_format: str, title: Optional[str]
):
"""
Given a query, return the list of files, in a unique order, that hold
the data for the query.
For certain types of exceptions, the queries will be repeated. For example,
if `ServiceX` indicates that it was restarted in the middle of the query, then
the query will be re-submitted.
Arguments:
selection_query `qastle` data that makes up the selection request.
data_format The file-based data format (root or parquet)
title The title assigned to this transform request.
Returns:
data Data converted to the "proper" format, depending
on the converter call.
"""
async def convert_to_file(f: Path) -> Path:
return f
return await self._data_return(
selection_query, convert_to_file, title, data_format
)
[docs] @on_exception_itr(
backoff.constant, ServiceXUnknownRequestID, interval=0.1, max_tries=3
)
@on_exception_itr(
backoff.constant,
(ServiceXUnknownDataRequestID, minio.error.NoSuchBucket),
interval=0.1,
max_tries=2,
)
async def _stream_url_buckets(
self,
selection_query: str,
data_format: str,
title: Optional[str],
as_signed_url: Optional[bool],
) -> AsyncGenerator[StreamInfoUrl, None]:
"""Get a list of files back for a request
Args:
selection_query (str): The selection query we are to do
data_format (str): The requested file format
title (Optional[str]): The title of transform to pass to ServiceX
as_signed_url (Optional[bool]): Return the uri as a presigned http url?
Yields:
AsyncIterator[Dict[str, str]]: A tuple of the minio bucket and file in that bucket.
The dict will have entries for:
bucket: The minio bucket name
file: the completed file in the bucket
"""
query = self._build_json_query(selection_query, data_format, title)
async with aiohttp.ClientSession() as client:
# Get a request id - which might be cached, but if not, submit it.
request_id = await self._get_request_id(client, query)
# Make sure cache status exists (user could have deleted, see #176)
if not self._cache.query_status_exists(request_id):
await self._update_query_status(client, request_id)
# Get the minio adaptor we are going to use for downloading.
minio_adaptor = self._minio_adaptor.from_best(
self._cache.lookup_query_status(request_id)
)
# Look up the cache, and then fetch an iterator going through the results
# from either servicex or the cache, depending.
try:
notifier = self._create_notifier(title, False)
minio_files = self._get_minio_bucket_files_from_servicex(
request_id, client, minio_adaptor, notifier
)
# Reflect the files back up a level.
async for object_name in minio_files:
uri = (
minio_adaptor.get_access_url(request_id, object_name)
if as_signed_url
else minio_adaptor.get_s3_uri(request_id, object_name)
)
yield StreamInfoUrl(object_name, uri, request_id)
# Cache the final status
await self._update_query_status(client, request_id)
except ServiceXUnknownRequestID as e:
self._cache.remove_query(query)
raise ServiceXUnknownDataRequestID(
"Expected the ServiceX backend to know about "
f"query {request_id}. It did not. Cleared local"
"cache. Please resubmit to trigger a new "
"query."
) from e
except ServiceXFatalTransformException as e:
transform_status = await self._servicex_adaptor.get_query_status(
client, request_id
)
self._cache.remove_query(query)
raise ServiceXFatalTransformException(
f'ServiceX Fatal Error: {transform_status["failure-info"]}'
) from e
except ServiceXFailedFileTransform as e:
self._cache.remove_query(query)
await self._servicex_adaptor.dump_query_errors(client, request_id)
raise ServiceXException(
f"Failed to transform all files in {request_id}"
) from e
[docs] @on_exception(backoff.constant, ServiceXUnknownRequestID, interval=0.1, max_tries=3)
@on_exception(
backoff.constant,
(ServiceXUnknownDataRequestID, minio.error.NoSuchBucket),
interval=0.1,
max_tries=2,
)
async def _data_return(
self,
selection_query: str,
converter: Callable[[Path], Awaitable[Any]],
title: Optional[str],
data_format: str = "root-file",
) -> List[Any]:
"""Given a query, return the data, in a unique order, that hold
the data for the query.
For certain types of exceptions, the queries will be repeated. For example,
if `ServiceX` indicates that it was restarted in the middle of the query, then
the query will be re-submitted.
Arguments:
selection_query `qastle` data that makes up the selection request.
converter A `Callable` that will convert the data returned from
`ServiceX` as a set of files.
title Title to send to the backend service
data_format The data format that we want to render in
Returns:
data Data converted to the "proper" format, depending
on the converter call.
"""
all_data = {
f.file: f.data
async for f in self._stream_return(
selection_query, title, converter, data_format
)
}
# Finally, we need them in the proper order so we append them
# all together
ordered_data = [all_data[k] for k in sorted(all_data.keys())]
return ordered_data
[docs] async def _stream_return(
self,
selection_query: str,
title: Optional[str],
converter: Callable[[Path], Awaitable[Any]],
data_format: str = "root-file",
) -> AsyncIterator[StreamInfoData]:
"""Given a query, return the data, in the order it arrives back
converted as appropriate.
For certain types of exceptions, the queries will be repeated. For example,
if `ServiceX` indicates that it was restarted in the middle of the query, then
the query will be re-submitted.
Arguments:
selection_query `qastle` data that makes up the selection request.
converter A `Callable` that will convert the data returned from
`ServiceX` as a set of files.
Returns:
data Data converted to the "proper" format, depending
on the converter call.
"""
as_data = (
StreamInfoData(f.file, await asyncio.ensure_future(converter(f.path)))
async for f in self._stream_local_files(selection_query, title, data_format)
) # type: ignore
async for r in as_data:
yield r
[docs] async def _stream_local_files(
self, selection_query: str, title: Optional[str], data_format: str = "root-file"
) -> AsyncGenerator[StreamInfoPath, None]:
"""
Given a query, return the data as a list of paths pointing to local files
that contain the results of the query. This is an async generator, and files
are returned as they arrive.
For certain types of exceptions, the queries will be repeated. For example,
if `ServiceX` indicates that it was restarted in the middle of the query, then
the query will be re-submitted.
Arguments:
selection_query `qastle` data that makes up the selection request.
Returns:
data Data converted to the "proper" format, depending
on the converter call.
"""
# Get a notifier to update anyone who wants to listen.
notifier = self._create_notifier(title, True)
# Get all the files
as_files = (
f
async for f in self._get_files(
selection_query, data_format, notifier, title
)
) # type: ignore
async for name, a_path in as_files:
yield StreamInfoPath(name, Path(await a_path))
[docs] async def _get_files(
self,
selection_query: str,
data_format: str,
notifier: _status_update_wrapper,
title: Optional[str],
) -> AsyncIterator[Tuple[str, Awaitable[Path]]]:
"""
Return a list of files from servicex as they have been downloaded to this machine. The
return type is an awaitable that will yield the path to the file.
For certain types of `ServiceX` failures we will automatically attempt a few retries:
- When `ServiceX` forgets the query. This sometimes happens when a user submits a
query, and then disconnects from the network, `ServiceX` is restarted, and then the
user attempts to download the files from that "no-longer-existing" request.
Up to 3 re-tries are attempted automatically.
Arguments:
selection_query The query string to send to ServiceX
data_format The type of data that we want to come back.
notifier Status callback to let our progress be advertised
title Title to pass to servicex backend.
Returns
Awaitable[Path] An awaitable that is a path. When it completes, the
path will be valid and point to an existing file.
This is returned this way so a number of downloads can run
simultaneously.
"""
query = self._build_json_query(selection_query, data_format, title)
async with aiohttp.ClientSession() as client:
# Get a request id - which might be cached, but if not, submit it.
request_id = await self._get_request_id(client, query)
# Make sure cache status exists (user could have deleted, see #176)
if not self._cache.query_status_exists(request_id):
await self._update_query_status(client, request_id)
# Get the minio adaptor we are going to use for downloading.
minio_adaptor = self._minio_adaptor.from_best(
self._cache.lookup_query_status(request_id)
)
# Look up the cache, and then fetch an iterator going through the results
# from either servicex or the cache, depending.
try:
cached_files = self._cache.lookup_files(request_id)
stream_local_files = (
self._get_cached_files(cached_files, notifier)
if cached_files is not None
else self._get_files_from_servicex(
request_id, client, minio_adaptor, notifier
)
)
# Reflect the files back up a level.
async for r in stream_local_files:
yield r
# Cache the final status
if cached_files is None:
await self._update_query_status(client, request_id)
except ServiceXUnknownRequestID as e:
self._cache.remove_query(query)
raise ServiceXUnknownDataRequestID(
"Expected the ServiceX backend to know about "
f"query {request_id}. It did not. Cleared local"
" cache. Please resubmit to trigger a new "
"query."
) from e
except ServiceXNoFilesInCache as e:
self._cache.remove_query(query)
raise ServiceXUnknownDataRequestID(
"Expected the ServiceX backend to have cached data "
f"from query {request_id}. It did not. Cleared local"
" cache. Please resubmit to trigger a new "
"query."
) from e
except ServiceXFatalTransformException as e:
transform_status = await self._servicex_adaptor.get_query_status(
client, request_id
)
self._cache.remove_query(query)
raise ServiceXFatalTransformException(
f'ServiceX Fatal Error: {transform_status["failure-info"]}'
) from e
except ServiceXFailedFileTransform as e:
self._cache.remove_query(query)
await self._servicex_adaptor.dump_query_errors(client, request_id)
raise ServiceXException(
f"Failed to transform all files in {request_id}"
) from e
[docs] async def _get_request_id(
self, client: aiohttp.ClientSession, query: Dict[str, Any]
) -> str:
"""
For this query, fetch the request id. If we have it cached, use that. Otherwise, query
ServiceX for a enw one (and cache it for later use).
"""
request_id = self._cache.lookup_query(query)
if request_id is None:
request_info = await self._servicex_adaptor.submit_query(client, query)
request_id = request_info["request_id"]
self._cache.set_query(query, request_id)
await self._update_query_status(client, request_id)
return request_id
[docs] async def _update_query_status(
self, client: aiohttp.ClientSession, request_id: str
):
"""Fetch the status from servicex and cache it locally, over
writing what was there before.
Args:
request_id (str): Request id of the status to fetch and cache.
"""
info = await self._servicex_adaptor.get_query_status(client, request_id)
self._cache.set_query_status(info)
[docs] async def _get_cached_files(
self, cached_files: List[Tuple[str, Path]], notifier: _status_update_wrapper
):
"""
Return the list of files as an iterator that we have pulled from the cache
"""
notifier.update(processed=len(cached_files), remaining=0, failed=0)
loop = asyncio.get_event_loop()
for f, p in cached_files:
path_future = loop.create_future()
path_future.set_result(p)
notifier.inc(downloaded=1)
yield f, path_future
[docs] async def _download_a_file(
self,
stream: AsyncIterator[str],
request_id: str,
minio_adaptor: MinioAdaptor,
notifier: _status_update_wrapper,
) -> AsyncIterator[Tuple[str, Awaitable[Path]]]:
"""
Given an object name and request id, fetch the data locally from the minio bucket.
The copy can take a while, so send it off to another thread - don't pause queuing up other
files to download.
"""
async def do_copy(final_path):
assert request_id is not None
await minio_adaptor.download_file(request_id, f, final_path)
notifier.inc(downloaded=1)
notifier.broadcast()
return final_path
file_object_list: List[Tuple[str, Path]] = []
async for f in stream:
copy_to_path = self._cache.data_file_location(request_id, f)
file_object_list.append((f, copy_to_path))
yield f, do_copy(copy_to_path)
self._cache.set_files(request_id, file_object_list)
[docs] async def _get_files_from_servicex(
self,
request_id: str,
client: aiohttp.ClientSession,
minio_adaptor: MinioAdaptor,
notifier: _status_update_wrapper,
):
"""
Fetch query result files from `servicex`. Given the `request_id` we will download
files as they become available. We also coordinate caching.
"""
start_time = time.monotonic()
good = True
try:
# Get the stream of minio bucket new files.
stream_new_object = self._get_minio_bucket_files_from_servicex(
request_id, client, minio_adaptor, notifier
)
# Next, download the files as they are found (and return them):
stream_downloaded = self._download_a_file(
stream_new_object, request_id, minio_adaptor, notifier
)
# Return the files to anyone that wants them!
async for info in stream_downloaded:
yield info
except Exception:
good = False
raise
finally:
end_time = time.monotonic()
run_time = timedelta(seconds=end_time - start_time)
logging.getLogger(__name__).info(
f"Running servicex query for " f"{request_id} took {run_time}"
)
self._log.write_query_log(
request_id,
notifier.total,
notifier.failed,
run_time,
good,
self._cache.path,
)
[docs] async def _get_minio_bucket_files_from_servicex(
self,
request_id: str,
client: aiohttp.ClientSession,
minio_adaptor: MinioAdaptor,
notifier: _status_update_wrapper,
) -> AsyncIterator[str]:
"""Create an async stream of `minio` bucket/filenames from a request id.
Args:
request_id (str): The request id that we should be polling for updates.
client (aiohttp.ClientSession): The client connection to make API queries on
minio_adaptor (MinioAdaptor): The minio adaptor we can use to connect to the minio
bucket for new items.
notifier (_status_update_wrapper): Allows us to send updates of progress
back to the user
Yields:
[type]: Returns xxx and yyy.
"""
start_time = time.monotonic()
try:
# Setup the status sequence from servicex
stream_status = transform_status_stream(
self._servicex_adaptor, client, request_id
)
stream_notified = stream_status_updates(stream_status, notifier)
stream_watched = trap_servicex_failures(stream_notified)
stream_unique = stream_unique_updates_only(stream_watched)
# Next, download the files as they are found (and return them):
stream_new_object = find_new_bucket_files(
minio_adaptor, request_id, stream_unique
)
# Return the minio information.
async for info in stream_new_object:
yield info
finally:
end_time = time.monotonic()
run_time = timedelta(seconds=end_time - start_time)
logging.getLogger(__name__).info(
f"Running servicex query for "
f"{request_id} took {run_time} (no files downloaded)"
)
[docs] def _build_json_query(
self, selection_query: str, data_format: str, title: Optional[str]
) -> Dict[str, Union[str, Iterable[str]]]:
"""
Returns a list of locally written files for a given selection query.
Arguments:
selection_query The query to be send into the ServiceX API
data_format What is the output data type (parquet, root-file, etc.)
Notes:
- Internal routine.
"""
assert data_format in g_allowed_formats
# Items that must always be present
json_query: Dict[str, Union[str, Iterable[str]]] = {
"selection": selection_query,
"result-destination": self._result_destination,
"result-format": data_format,
"chunk-size": "1000",
"workers": str(self._max_workers),
}
# Add the appropriate did.
# Capture full did as well as single item files (see #178)
if isinstance(self._dataset, str):
if self._dataset[0:7].lower() in ["root://", "http://"]:
json_query["file-list"] = [self._dataset]
else:
json_query["did"] = self._dataset
else:
json_query["file-list"] = self._dataset
# Optional items
if self._image is not None:
json_query["image"] = self._image
if title is not None:
json_query["title"] = title
logging.getLogger(__name__).debug(
f"JSON to be sent to servicex: {str(json_query)}"
)
return json_query
# Define the synchronous versions of the async methods for easy of use
get_data_rootfiles_uri = make_sync(get_data_rootfiles_uri_async)
get_data_parquet_uri = make_sync(get_data_parquet_uri_async)