from abc import ABC, abstractmethod
from pathlib import Path
from typing import Dict, List, Optional
from make_it_sync import make_sync
import awkward as ak
import pandas as pd
from .utils import (
DatasetType,
StatusUpdateFactory,
_null_progress_feedback,
_run_default_wrapper,
_status_update_wrapper,
dataset_as_name,
)
class ServiceXABC(ABC):
    """
    Abstract base class for accessing the ServiceX front-end for a particular dataset.

    A lightweight, mostly immutable, base class that holds basic configuration
    information for use with ServiceX file access, including the dataset name.
    Subclasses implement the various access methods (declared abstract here).
    Note that not all methods may be accessible!
    """

    def __init__(
        self,
        dataset: DatasetType,
        image: Optional[str] = None,
        max_workers: int = 20,
        result_destination: str = "object-store",
        status_callback_factory: Optional[StatusUpdateFactory] = _run_default_wrapper,
    ):
        """
        Create and configure a ServiceX object for a dataset.

        Arguments
            dataset                 Name of a dataset from which queries will be
                                    selected.
            image                   Name of transformer image to use to transform
                                    the data. If None the default implementation
                                    is used.
            max_workers             Maximum number of transformers to run
                                    simultaneously on ServiceX.
            result_destination     Where the transformers should write the
                                    results. Defaults to object-store, but could
                                    be used to save results to a posix volume.
            status_callback_factory Factory to create a status notification
                                    callback for each query. One is created per
                                    query.

        Notes:
            - The `status_callback_factory` argument, by default, uses the `tqdm`
              library to render progress bars in a terminal window or a graphic
              in a Jupyter notebook (with proper jupyter extensions installed).
              If it is specified as None, no updates will be rendered. A custom
              callback function can also be specified which takes
              `(total_files, transformed, downloaded, skipped)` as an argument.
              The `total_files` parameter may be `None` until the system knows
              how many files need to be processed (and some files can even be
              completed before that is known).
        """
        self._dataset = dataset
        self._image = image
        self._max_workers = max_workers
        self._result_destination = result_destination
        # We can't create the notifier until the actual query,
        # so only need to save the status update factory here.
        self._status_callback_factory = (
            status_callback_factory
            if status_callback_factory is not None
            else _null_progress_feedback
        )

    @property
    def dataset_as_name(self) -> str:
        """Return the dataset name as a string for "human" consumption.

        Note that this can be very very long!

        Returns:
            str: The dataset name formatted as a string
        """
        return dataset_as_name(self._dataset, max_len=None)

    def _create_notifier(
        self, title: Optional[str], downloading: bool
    ) -> _status_update_wrapper:
        "Internal method to create an updater from the status call-back factory"
        return _status_update_wrapper(
            self._status_callback_factory(self._dataset, title, downloading)
        )

    @abstractmethod
    async def get_data_rootfiles_async(
        self, selection_query: str, title: Optional[str] = None
    ) -> List[Path]:
        """
        Fetch query data from ServiceX matching `selection_query` and return it as
        a list of root files. The files are uniquely ordered (the same query will
        always return the same order).

        Arguments:
            selection_query     The `qastle` string specifying the data to be queried
            title               Title reported to the ServiceX backend for status
                                reporting

        Returns:
            root_files          The list of root files
        """

    @abstractmethod
    async def get_data_pandas_df_async(
        self, selection_query: str, title: Optional[str] = None
    ) -> pd.DataFrame:
        """
        Fetch query data from ServiceX matching `selection_query` and return it as
        a pandas DataFrame. The data is uniquely ordered (the same query will
        always return the same order).

        Arguments:
            selection_query     The `qastle` string specifying the data to be queried
            title               Title reported to the ServiceX backend for status
                                reporting

        Returns:
            df                  The pandas DataFrame

        Exceptions:
            xxx                 If the data is not the correct shape (e.g. a flat,
                                rectangular table).
        """

    @abstractmethod
    async def get_data_awkward_async(
        self, selection_query: str, title: Optional[str] = None
    ) -> Dict[bytes, ak.Array]:
        """
        Fetch query data from ServiceX matching `selection_query` and return it as
        a dictionary of awkward arrays, an entry for each column. The data is
        uniquely ordered (the same query will always return the same order).

        Arguments:
            selection_query     The `qastle` string specifying the data to be queried
            title               Title reported to the ServiceX backend for status
                                reporting

        Returns:
            a                   Dictionary of jagged arrays (as needed), one for
                                each column. The dictionary keys are `bytes` to
                                support possible unicode characters.
        """

    @abstractmethod
    async def get_data_parquet_async(
        self, selection_query: str, title: Optional[str] = None
    ) -> List[Path]:
        """
        Fetch query data from ServiceX matching `selection_query` and return it as
        a list of parquet files. The files are uniquely ordered (the same query
        will always return the same order).

        Arguments:
            selection_query     The `qastle` string specifying the data to be queried
            title               Title reported to the ServiceX backend for status
                                reporting

        Returns:
            root_files          The list of parquet files
        """

    # Define the synchronous versions of the async methods for ease of use
    get_data_rootfiles = make_sync(get_data_rootfiles_async)
    get_data_pandas_df = make_sync(get_data_pandas_df_async)
    get_data_awkward = make_sync(get_data_awkward_async)
    get_data_parquet = make_sync(get_data_parquet_async)