ServiceXDataset#

class servicex.servicex.ServiceXDataset(dataset: ~typing.Union[str, ~typing.Iterable[str]], backend_name: ~typing.Optional[str] = None, image: ~typing.Optional[str] = None, max_workers: int = 20, result_destination: str = 'object-store', servicex_adaptor: ~typing.Optional[~servicex.servicex_adaptor.ServiceXAdaptor] = None, minio_adaptor: ~typing.Optional[~typing.Union[~servicex.minio_adaptor.MinioAdaptor, ~servicex.minio_adaptor.MinioAdaptorFactory]] = None, cache_adaptor: ~typing.Optional[~servicex.cache.Cache] = None, status_callback_factory: ~typing.Optional[~typing.Callable[[~typing.Union[str, ~typing.Iterable[str]], ~typing.Optional[str], bool], ~typing.Callable[[~typing.Optional[int], int, int, int], None]]] = <function _run_default_wrapper>, local_log: ~typing.Optional[~servicex.utils.log_adaptor] = None, session_generator: ~typing.Optional[~typing.Callable[[], ~typing.Awaitable[~aiohttp.client.ClientSession]]] = None, config_adaptor: ~typing.Optional[~servicex.servicex_config.ServiceXConfigAdaptor] = None, data_convert_adaptor: ~typing.Optional[~servicex.data_conversions.DataConverterAdaptor] = None, ignore_cache: bool = False)[source]#

Bases: ServiceXABC

Used to access an instance of ServiceX at an end point on the internet. Supports configuration via the config_adaptor object, or by passing in the adaptors defined in the __init__ method.

__init__(dataset: ~typing.Union[str, ~typing.Iterable[str]], backend_name: ~typing.Optional[str] = None, image: ~typing.Optional[str] = None, max_workers: int = 20, result_destination: str = 'object-store', servicex_adaptor: ~typing.Optional[~servicex.servicex_adaptor.ServiceXAdaptor] = None, minio_adaptor: ~typing.Optional[~typing.Union[~servicex.minio_adaptor.MinioAdaptor, ~servicex.minio_adaptor.MinioAdaptorFactory]] = None, cache_adaptor: ~typing.Optional[~servicex.cache.Cache] = None, status_callback_factory: ~typing.Optional[~typing.Callable[[~typing.Union[str, ~typing.Iterable[str]], ~typing.Optional[str], bool], ~typing.Callable[[~typing.Optional[int], int, int, int], None]]] = <function _run_default_wrapper>, local_log: ~typing.Optional[~servicex.utils.log_adaptor] = None, session_generator: ~typing.Optional[~typing.Callable[[], ~typing.Awaitable[~aiohttp.client.ClientSession]]] = None, config_adaptor: ~typing.Optional[~servicex.servicex_config.ServiceXConfigAdaptor] = None, data_convert_adaptor: ~typing.Optional[~servicex.data_conversions.DataConverterAdaptor] = None, ignore_cache: bool = False)[source]#

Create and configure a ServiceX object for a dataset.

Arguments

dataset
    Name of a dataset from which queries will be selected.

backend_name
    The type of backend. Used only if we need to find an end-point. If we do not have a servicex_adaptor, this will default to xaod, unless you have an endpoint listed in your servicex file. It will default to the best match there, or fail if a name has been given.

image
    Name of the transformer image to use to transform the data. If left as the default, None, the default image for the ServiceX backend will be used.

max_workers
    Maximum number of transformers to run simultaneously on ServiceX.

result_destination
    Where the transformers should write the results. Defaults to object-store, but can be used to save results to a posix volume.

servicex_adaptor
    Object to control communication with the ServiceX instance at a particular IP address with certain login credentials. Will be configured via the config_adaptor by default.

minio_adaptor
    Object to control communication with the minio servicex instance.

cache_adaptor
    Runs the caching for data and queries that are sent up and down.

status_callback_factory
    Factory to create a status notification callback for each query. One is created per query.

local_log
    Log adaptor for logging.

session_generator
    If you want to control the ClientSession object that is used for callbacks. Otherwise a single one is used for all ServiceX queries.

config_adaptor
    Controls how configuration options are read from a configuration file (e.g. servicex.yaml).

data_convert_adaptor
    Manages conversions between root, parquet, pandas, and awkward, including default settings for the datatypes expected from the backend.

ignore_cache
    Always ignore the cache on any query for this dataset. This is only meaningful if no cache adaptor is provided. Defaults to False; the cache is used if possible.

Notes:

  • The status_callback argument, by default, uses the tqdm library to render progress bars in a terminal window or a graphic in a Jupyter notebook (with proper jupyter extensions installed). If status_callback is specified as None, no updates will be rendered. A custom callback function can also be specified which takes (total_files, transformed, downloaded, skipped) as an argument. The total_files parameter may be None until the system knows how many files need to be processed (and some files can even be completed before that is known).

  • The full description of calling parameters is recorded in the local cache, including things like image name and tag.
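The callback signatures described above can be sketched in plain Python. Everything here is a hypothetical illustration, not part of the servicex package: make_print_callback plays the role of a status_callback_factory, and the callback it returns takes the (total_files, transformed, downloaded, skipped) arguments described in the notes.

```python
from typing import Iterable, Optional, Union

# Hypothetical sketch of a custom status callback factory, assuming the
# factory receives the dataset name, an optional title, and a flag, and
# returns a per-query callback taking (total_files, transformed,
# downloaded, skipped). None of these names exist in servicex itself.
def make_print_callback(dataset: Union[str, Iterable[str]],
                        title: Optional[str],
                        downloading: bool):
    def callback(total_files: Optional[int], transformed: int,
                 downloaded: int, skipped: int) -> None:
        # total_files may be None until the backend knows the file count.
        total_str = "?" if total_files is None else str(total_files)
        print(f"{dataset}: {transformed}/{total_str} transformed, "
              f"{downloaded} downloaded, {skipped} skipped")
    return callback

cb = make_print_callback("my_dataset", None, True)
cb(None, 1, 0, 0)   # before the total file count is known
cb(10, 5, 3, 0)
```

A factory shaped like this could be passed as status_callback_factory; passing None instead suppresses all progress updates.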

Attributes

dataset_as_name#

Return the dataset name as a string for “human” consumption.

Note that this can be very very long!

Returns:

str: The dataset name formatted as a string

Methods

_abc_impl()#

Internal state held by ABC machinery.

_build_json_query(selection_query: str, data_format: str, title: Optional[str]) Dict[str, Union[str, Iterable[str]]][source]#

Returns the JSON query dictionary to be sent to the ServiceX API for the given selection query.

Arguments:

selection_query
    The query to be sent to the ServiceX API.

data_format
    The output data format (parquet, root-file, etc.)

title
    The title to assign to this transform request.

Notes:
  • Internal routine.

async _data_return(selection_query: str, converter: Callable[[Path], Awaitable[Any]], title: Optional[str], data_format: str = 'root-file') List[Any][source]#

Given a query, return the data for that query, uniquely ordered (the same query will always return the same order).

For certain types of exceptions, the queries will be repeated. For example, if ServiceX indicates that it was restarted in the middle of the query, then the query will be re-submitted.

Arguments:

selection_query
    qastle data that makes up the selection request.

converter
    A Callable that will convert the data returned from ServiceX as a set of files.

title
    Title to send to the backend service.

data_format
    The data format that we want to render in.

Returns:

data
    Data converted to the “proper” format, depending on the converter call.
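The retry behaviour described above can be illustrated with a small self-contained sketch. QueryForgottenError, with_retries, and flaky are stand-ins invented here for illustration; they are not part of the servicex package.

```python
import asyncio

class QueryForgottenError(Exception):
    """Stand-in for a ServiceX 'request no longer exists' failure."""

async def with_retries(op, max_tries: int = 3):
    # Re-run an async operation when the recoverable error occurs,
    # giving up (and re-raising) after max_tries attempts.
    for attempt in range(max_tries):
        try:
            return await op()
        except QueryForgottenError:
            if attempt == max_tries - 1:
                raise

calls = {"n": 0}

async def flaky():
    # Fails twice, then succeeds - mimicking a restarted backend.
    calls["n"] += 1
    if calls["n"] < 3:
        raise QueryForgottenError()
    return "ok"

result = asyncio.run(with_retries(flaky))
print(result)  # ok
```

The real client applies this kind of loop internally; callers of _data_return do not need to retry themselves.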

async _download_a_file(stream: AsyncIterator[str], request_id: str, minio_adaptor: MinioAdaptor, notifier: _status_update_wrapper) AsyncIterator[Tuple[str, Awaitable[Path]]][source]#

Given an object name and request id, fetch the data locally from the minio bucket. The copy can take a while, so send it off to another thread - don’t pause queuing up other files to download.

async _file_return(selection_query: str, data_format: str, title: Optional[str])[source]#

Given a query, return the list of files that hold the data for the query, uniquely ordered (the same query will always return the same order).

For certain types of exceptions, the queries will be repeated. For example, if ServiceX indicates that it was restarted in the middle of the query, then the query will be re-submitted.

Arguments:

selection_query
    qastle data that makes up the selection request.

data_format
    The file-based data format (root or parquet).

title
    The title assigned to this transform request.

Returns:

data
    Data converted to the “proper” format, depending on the converter call.

async _get_cached_files(cached_files: List[Tuple[str, Path]], notifier: _status_update_wrapper)[source]#

Return, as an iterator, the list of files that we have pulled from the cache.

async _get_files(selection_query: str, data_format: str, notifier: _status_update_wrapper, title: Optional[str]) AsyncIterator[Tuple[str, Awaitable[Path]]][source]#

Return a list of files from ServiceX as they are downloaded to this machine. The return is an async iterator; each item contains an awaitable that will yield the path to the file.

For certain types of ServiceX failures we will automatically attempt a few retries:

  • When ServiceX forgets the query. This sometimes happens when a user submits a query, and then disconnects from the network, ServiceX is restarted, and then the user attempts to download the files from that “no-longer-existing” request.

Up to 3 re-tries are attempted automatically.

Arguments:

selection_query
    The query string to send to ServiceX.

data_format
    The type of data that we want to come back.

notifier
    Status callback to let our progress be advertised.

title
    Title to pass to the ServiceX backend.

Returns:

Awaitable[Path]
    An awaitable that is a path. When it completes, the path will be valid and point to an existing file. It is returned this way so that a number of downloads can run simultaneously.
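The (filename, awaitable-of-path) shape described above lets a consumer queue downloads without blocking on each one. The following self-contained sketch shows that consumption pattern with stubs (fake_download, fake_get_files) standing in for the real ServiceX transfer machinery.

```python
import asyncio
from pathlib import Path
from typing import AsyncIterator, Awaitable, Tuple

# Stand-in for the real download: yields control, then returns a path.
async def fake_download(name: str) -> Path:
    await asyncio.sleep(0)
    return Path("/tmp") / name

# Stand-in for a _get_files-shaped stream of (filename, awaitable) pairs.
async def fake_get_files() -> AsyncIterator[Tuple[str, Awaitable[Path]]]:
    for name in ("file1.root", "file2.root"):
        yield name, fake_download(name)

async def gather_all():
    pending = []
    async for name, path_awaitable in fake_get_files():
        pending.append(path_awaitable)  # queue without awaiting yet
    # All downloads can now run concurrently.
    return await asyncio.gather(*pending)

paths = asyncio.run(gather_all())
print(paths)
```

The key point is that iterating the stream and awaiting the paths are decoupled, so many downloads can be in flight at once.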

async _get_files_from_servicex(request_id: str, client: ClientSession, minio_adaptor: MinioAdaptor, notifier: _status_update_wrapper)[source]#

Fetch query result files from servicex. Given the request_id we will download files as they become available. We also coordinate caching.

async _get_minio_bucket_files_from_servicex(request_id: str, client: ClientSession, minio_adaptor: MinioAdaptor, notifier: _status_update_wrapper) AsyncIterator[str][source]#

Create an async stream of minio bucket/filenames from a request id.

Args:

request_id (str): The request id that we should be polling for updates.
client (aiohttp.ClientSession): The client connection to make API queries on.
minio_adaptor (MinioAdaptor): The minio adaptor we can use to connect to the minio bucket for new items.
notifier (_status_update_wrapper): Allows us to send updates of progress back to the user.

Yields:

str: The name of each file in the minio bucket, as it becomes available.

async _get_request_id(client: ClientSession, query: Dict[str, Any]) str[source]#

For this query, fetch the request id. If we have it cached, use that. Otherwise, query ServiceX for a new one (and cache it for later use).

async _stream_local_files(selection_query: str, title: Optional[str], data_format: str = 'root-file') AsyncGenerator[StreamInfoPath, None][source]#

Given a query, return the data as a list of paths pointing to local files that contain the results of the query. This is an async generator, and files are returned as they arrive.

For certain types of exceptions, the queries will be repeated. For example, if ServiceX indicates that it was restarted in the middle of the query, then the query will be re-submitted.

Arguments:

selection_query
    qastle data that makes up the selection request.

title
    Title to send to the backend service.

data_format
    The data format in which to render the results.

Returns:

An async generator of StreamInfoPath objects, one per result file, yielded as each arrives.

async _stream_return(selection_query: str, title: Optional[str], converter: Callable[[Path], Awaitable[Any]], data_format: str = 'root-file') AsyncIterator[StreamInfoData][source]#

Given a query, return the data, in the order it arrives back converted as appropriate.

For certain types of exceptions, the queries will be repeated. For example, if ServiceX indicates that it was restarted in the middle of the query, then the query will be re-submitted.

Arguments:

selection_query
    qastle data that makes up the selection request.

converter
    A Callable that will convert the data returned from ServiceX as a set of files.

Returns:

data
    Data converted to the “proper” format, depending on the converter call.

async _stream_url_buckets(selection_query: str, data_format: str, title: Optional[str], as_signed_url: Optional[bool]) AsyncGenerator[StreamInfoUrl, None][source]#

Get a list of files back for a request

Args:

selection_query (str): The selection query we are to run.
data_format (str): The requested file format.
title (Optional[str]): The title of the transform to pass to ServiceX.
as_signed_url (Optional[bool]): Return the URI as a presigned http URL?

Yields:

AsyncIterator[Dict[str, str]]: A dict describing the minio bucket and a file in that bucket, with entries for:

bucket: The minio bucket name.
file: The completed file in the bucket.

async _update_query_status(client: ClientSession, request_id: str)[source]#

Fetch the status from ServiceX and cache it locally, overwriting what was there before.

Args:

request_id (str): Request id of the status to fetch and cache.

first_supported_datatype(datatypes: Union[List[str], str]) Optional[str][source]#

Return the first datatype format that this dataset/servicex instance can return.

Different instances of ServiceX are capable of returning different datatypes. Pass in the datatypes that your app supports, and this will return the first one that the servicex backend can return.

Args:

datatypes (Union[List[str], str]): A single datatype, or a list of datatypes, that are supported by your app.

Returns:

str: The first datatype that is supported. If none of them are, then None is returned.
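The selection logic described above can be sketched as a plain function. This is a hypothetical stand-in for illustration only: backend_supported plays the role of the capability list that the real method obtains from the ServiceX instance.

```python
from typing import List, Optional, Union

# Sketch of first_supported_datatype's behaviour: given the datatypes
# your app supports (string or list), return the first one the backend
# can produce, or None if there is no overlap. `backend_supported` is a
# hypothetical stand-in for the backend's advertised capabilities.
def first_supported(datatypes: Union[List[str], str],
                    backend_supported: List[str]) -> Optional[str]:
    if isinstance(datatypes, str):
        datatypes = [datatypes]
    for dt in datatypes:
        if dt in backend_supported:
            return dt
    return None

choice = first_supported(["parquet", "root-file"], ["root-file"])
print(choice)  # root-file
```

Note that order matters: the first entry in your app's list that the backend supports wins, so list your preferred format first.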

async get_data_awkward_async(selection_query: str, title: Optional[str] = None) Dict[bytes, Array]#

Fetch query data from ServiceX matching selection_query and return it as dictionary of awkward arrays, an entry for each column. The data is uniquely ordered (the same query will always return the same order).

Arguments:

selection_query
    The qastle string specifying the data to be queried.

title
    Title reported to the ServiceX backend for status reporting.

Returns:

A dictionary of jagged arrays (as needed), one for each column. The dictionary keys are bytes to support possible unicode characters.

async get_data_awkward_stream(selection_query: str, title: Optional[str] = None) AsyncGenerator[StreamInfoData, None][source]#

Returns, as an async iterator, each completed batch of work from ServiceX as a separate awkward array. The data is returned in a StreamInfoData object.

Args:

selection_query (str): The qastle query for the data to retrieve.

Yields:

AsyncIterator[StreamInfoData]: As ServiceX completes the data and it is downloaded to the local machine, the async iterator returns a StreamInfoData which can be used to access the data that has been loaded from the file.
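The batch-by-batch consumption pattern for these streaming methods looks like an async-for loop over the iterator. The sketch below is self-contained and illustrative only: StreamBatch and fake_stream are stand-ins for StreamInfoData and the real ServiceX stream.

```python
import asyncio
from dataclasses import dataclass

# Hypothetical stand-in for StreamInfoData: one object per completed batch.
@dataclass
class StreamBatch:
    data: list

# Stand-in for get_data_awkward_stream: yields batches as they "complete".
async def fake_stream():
    for batch in ([1, 2], [3]):
        await asyncio.sleep(0)
        yield StreamBatch(batch)

async def total():
    s = 0
    async for info in fake_stream():
        s += sum(info.data)  # process each batch as soon as it arrives
    return s

result = asyncio.run(total())
print(result)  # 6
```

Processing batches as they arrive, rather than waiting for the full result, keeps memory bounded and lets downstream work overlap with the remaining transforms.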

async get_data_pandas_df_async(selection_query: str, title: Optional[str] = None) DataFrame#

Fetch query data from ServiceX matching selection_query and return it as a pandas DataFrame. The data is uniquely ordered (the same query will always return the same order).

Arguments:

selection_query
    The qastle string specifying the data to be queried.

title
    Title reported to the ServiceX backend for status reporting.

Returns:

df
    The pandas DataFrame.

Exceptions:

xxx
    If the data is not the correct shape (e.g. a flat, rectangular table).

async get_data_pandas_stream(selection_query: str, title: Optional[str] = None) AsyncGenerator[StreamInfoData, None][source]#

Returns, as an async iterator, each completed batch of work from ServiceX as a separate pandas.DataFrame. The data is returned in a StreamInfoData object.

Args:

selection_query (str): The qastle query for the data to retrieve.

Yields:

AsyncIterator[StreamInfoData]: As ServiceX completes the data and it is downloaded to the local machine, the async iterator returns a StreamInfoData which can be used to access the data that has been loaded from the file.

async get_data_parquet_async(selection_query: str, title: Optional[str] = None) List[Path]#

Fetch query data from ServiceX matching selection_query and return it as a list of parquet files. The files are uniquely ordered (the same query will always return the same order).

Arguments:

selection_query
    The qastle string specifying the data to be queried.

title
    Title reported to the ServiceX backend for status reporting.

Returns:

files
    The list of parquet files.

async get_data_parquet_stream(selection_query: str, title: Optional[str] = None) AsyncIterator[StreamInfoPath][source]#

Returns, as an async iterator, each completed batch of work from ServiceX. The StreamInfoPath contains a path where downstream consumers can directly access the data.

Args:

selection_query (str): The qastle query for the data to retrieve.

Yields:

AsyncIterator[StreamInfoPath]: As ServiceX completes the data and it is downloaded to the local machine, the async iterator returns a StreamInfoPath which can be used to access the file locally.

get_data_parquet_uri(selection_query: str, title: Optional[str] = None, as_signed_url: Optional[bool] = False) List[StreamInfoUrl]#

Returns a list of each of the files from the minio bucket.

Args:

selection_query (str): The ServiceX selection query.
title (str): Optional title for the transform request.
as_signed_url (bool): Return the URI as a presigned http URL?

async get_data_parquet_uri_async(selection_query: str, title: Optional[str] = None, as_signed_url: Optional[bool] = False) List[StreamInfoUrl][source]#

Returns a list of each of the files from the minio bucket.

Args:

selection_query (str): The ServiceX selection query.
title (str): Optional title for the transform request.
as_signed_url (bool): Return the URI as a presigned http URL?

async get_data_parquet_uri_stream(selection_query: str, title: Optional[str] = None, as_signed_url: Optional[bool] = False) AsyncIterator[StreamInfoUrl][source]#

Returns, as an async iterator, each of the files from the minio bucket, as the files are added there.

Args:

selection_query (str): The ServiceX selection query.
title (str): Optional title for the transform request.
as_signed_url (bool): Return the URI as a presigned http URL?

async get_data_rootfiles_async(selection_query: str, title: Optional[str] = None) List[Path]#

Fetch query data from ServiceX matching selection_query and return it as a list of root files. The files are uniquely ordered (the same query will always return the same order).

Arguments:

selection_query
    The qastle string specifying the data to be queried.

title
    Title reported to the ServiceX backend for status reporting.

Returns:

root_files
    The list of root files.

async get_data_rootfiles_stream(selection_query: str, title: Optional[str] = None) AsyncIterator[StreamInfoPath][source]#

Returns, as an async iterator, each completed batch of work from ServiceX. The StreamInfoPath contains a path where downstream consumers can directly access the data.

Args:

selection_query (str): The qastle query for the data to retrieve.

Yields:

AsyncIterator[StreamInfoPath]: As ServiceX completes the data and it is downloaded to the local machine, the async iterator returns a StreamInfoPath which can be used to access the file locally.

get_data_rootfiles_uri(selection_query: str, title: Optional[str] = None, as_signed_url: Optional[bool] = False) List[StreamInfoUrl]#

Returns a list of StreamInfoUrl entries containing a url for each output file from the transform, taken from get_data_rootfiles_uri_stream. The data that comes back includes a url that can be accessed to download the data.

Args:

selection_query (str): The ServiceX selection query.
title (str): Optional title for the transform request.
as_signed_url (bool): Return the URI as a presigned http URL?

async get_data_rootfiles_uri_async(selection_query: str, title: Optional[str] = None, as_signed_url: Optional[bool] = False) List[StreamInfoUrl][source]#

Returns a list of StreamInfoUrl entries containing a url for each output file from the transform, taken from get_data_rootfiles_uri_stream. The data that comes back includes a url that can be accessed to download the data.

Args:

selection_query (str): The ServiceX selection query.
title (str): Optional title for the transform request.
as_signed_url (bool): Return the URI as a presigned http URL?

async get_data_rootfiles_uri_stream(selection_query: str, title: Optional[str] = None, as_signed_url: Optional[bool] = False) AsyncIterator[StreamInfoUrl][source]#

Returns, as an async iterator, each completed batch of work from ServiceX. The data that comes back includes a url that can be accessed to download the data.

Args:

selection_query (str): The ServiceX selection query.
title (str): Optional title for the transform request.
as_signed_url (bool): Return the URI as a presigned http URL?

ignore_cache()[source]#

Return a context manager that, as long as it is held, will cause any queries against just this dataset to ignore any locally cached data.

Returns:

ContextManager: As long as this is held, the local query cache will be ignored.
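The hold-while-in-scope behaviour described above is the standard Python context-manager pattern. The sketch below is a hypothetical stand-alone illustration of how such a method can be built with contextlib, not the actual servicex implementation: _DS stands in for the dataset class, and _ignore_cache for whatever internal flag the real code consults.

```python
from contextlib import contextmanager

class _DS:
    """Hypothetical stand-in for a dataset object with a cache flag."""
    def __init__(self):
        self._ignore_cache = False

    @contextmanager
    def ignore_cache(self):
        # Set the flag for the duration of the with-block, then restore
        # the previous value even if an exception escapes.
        old = self._ignore_cache
        self._ignore_cache = True
        try:
            yield
        finally:
            self._ignore_cache = old

ds = _DS()
with ds.ignore_cache():
    assert ds._ignore_cache      # queries here would bypass the cache
assert not ds._ignore_cache      # cache use restored on exit
```

Because the flag is restored in a finally block, the cache comes back into use even if a query inside the with-block raises.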