# Development

## Features
Implemented:

- Accepts a `qastle` formatted query.
- Exceptions are used to report back errors of all sorts from the service to the user's code.
- Data is returned in the following forms:
  - `pandas.DataFrame`: an in-process DataFrame of all the data requested.
  - `awkward`: an in-process `JaggedArray` or dictionary of `JaggedArray`s.
  - A list of ROOT files that can be opened with `uproot` and used as desired.
  - Not all output formats are compatible with all transformations.
- Complete returned data must fit in the process' memory.
- Runs in an async or a non-async environment; non-async methods will accommodate automatically (including `jupyter` notebooks).
- Supports up to 100 simultaneous queries from a laptop-like front end without overwhelming the local machine (hopefully ServiceX will be overwhelmed!).
- Starts downloading files as soon as they are ready (before ServiceX is done with the complete transform).
- It has been tested against 100 datasets with multiple simultaneous queries.
- It supports local caching of query data.
- It provides feedback on progress.
- Configuration files are supported so that user identification information does not have to be checked into repositories.
## Testing

This code has been tested in several environments:

- Windows, Linux, MacOS
- Python 3.7, 3.8, 3.9, 3.10
- Jupyter Notebooks (not automated), regular Python command-line invoked source files
## Non-standard backends

When doing backend development, ports 9000 and 5000 are often forwarded to the local machine, exposing the `minio` and `ServiceX_App` instances. In that case, you'll need to create a configuration file that has `http://localhost:5000` as the endpoint. No API token is necessary if the development `ServiceX` instance doesn't have authorization turned on.
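For example, a development configuration might look like the sketch below. The key names (`api_endpoints`, `endpoint`, `type`) are assumptions based on a typical `servicex.yaml` layout; check them against the layout your client version expects:

```yaml
# servicex.yaml - hypothetical local-development configuration.
# No token entry: the dev ServiceX instance has authorization turned off.
api_endpoints:
  - endpoint: http://localhost:5000
    type: xaod
```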
## API

Everything is based around the `ServiceXDataset` object. Below is the documentation for the most common parameters.
```python
ServiceXDataset(dataset: str,
                backend_name: Optional[str] = None,
                image: str = 'sslhep/servicex_func_adl_xaod_transformer:v0.4',
                max_workers: int = 20,
                result_destination = 'object-store',
                servicex_adaptor: ServiceXAdaptor = None,
                minio_adaptor: MinioAdaptor = None,
                cache_adaptor: Optional[Cache] = None,
                status_callback_factory: Optional[StatusUpdateFactory] = _run_default_wrapper,
                local_log: log_adaptor = None,
                session_generator: Callable[[], Awaitable[aiohttp.ClientSession]] = None,
                config_adaptor: ConfigView = None):
    '''
    Create and configure a ServiceX object for a dataset.

    Arguments

        dataset                  Name of a dataset from which queries will be selected.
        backend_name             The type of backend. Used only if we need to find an
                                 end-point. If we do not have a `servicex_adaptor` then this
                                 will default to xaod, unless you have any endpoint listed
                                 in your servicex file. It will default to best match there,
                                 in that case.
        image                    Name of the transformer image to use to transform the data.
        max_workers              Maximum number of transformers to run simultaneously on
                                 ServiceX.
        result_destination       Where the transformers should write the results.
                                 Defaults to object-store, but could be used to save
                                 results to a posix volume.
        servicex_adaptor         Object to control communication with the ServiceX instance
                                 at a particular IP address with certain login credentials.
                                 Default comes from the `config_adaptor`.
        minio_adaptor            Object to control communication with the minio servicex
                                 instance.
        cache_adaptor            Runs the caching for data and queries that are sent up and
                                 down.
        status_callback_factory  Factory to create a status notification callback for each
                                 query. One is created per query.
        local_log                Log adaptor for logging.
        session_generator        If you want to control the `ClientSession` object that
                                 is used for callbacks. Otherwise a single one for all
                                 `servicex` queries is used.
        config_adaptor           Control how configuration options are read from the
                                 configuration file (servicex.yaml, servicex.yml, .servicex).

    Notes:

        - The `status_callback` argument, by default, uses the `tqdm` library to render
          progress bars in a terminal window or a graphic in a Jupyter notebook (with proper
          jupyter extensions installed). If `status_callback` is specified as None, no
          updates will be rendered. A custom callback function can also be specified which
          takes `(total_files, transformed, downloaded, skipped)` as an argument. The
          `total_files` parameter may be `None` until the system knows how many files need to
          be processed (and some files can even be completed before that is known).
    '''
```
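In practice most parameters can be left at their defaults. A minimal construction sketch, using the dataset from the streaming example later in this section; per the docstring above, passing `None` for the status callback factory suppresses the default `tqdm` progress rendering:

```python
from servicex import ServiceXDataset

dataset = "mc15_13TeV:mc15_13TeV.361106.PowhegPythia8EvtGen_AZNLOCTEQ6L1_Zee.merge.DAOD_STDM3.e3601_s2576_s2132_r6630_r6264_p2363_tid05630052_00"

# Everything but the dataset name comes from servicex.yaml and the defaults.
ds = ServiceXDataset(dataset)

# The same dataset, with the default tqdm progress bars turned off.
quiet_ds = ServiceXDataset(dataset, status_callback_factory=None)
```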
To get the data, use one of the `get_data` methods. They all have the same API, differing only by what they return.
```text
 |  get_data_awkward_async(self, selection_query: str, title: Optional[str] = None) -> Dict[bytes, Union[awkward.array.jagged.JaggedArray, numpy.ndarray]]
 |      Fetch query data from ServiceX matching `selection_query` and return it as
 |      dictionary of awkward arrays, an entry for each column. The data is uniquely
 |      ordered (the same query will always return the same order). If specified, the
 |      optional title is passed to the backend and can be viewed on the status page.
 |
 |  get_data_awkward(self, selection_query: str, title: Optional[str] = None) -> Dict[bytes, Union[awkward.array.jagged.JaggedArray, numpy.ndarray]]
 |      Fetch query data from ServiceX matching `selection_query` and return it as
 |      dictionary of awkward arrays, an entry for each column. The data is uniquely
 |      ordered (the same query will always return the same order). If specified, the
 |      optional title is passed to the backend and can be viewed on the status page.
```
Each data type comes in a pair: an async version and a synchronous version (a usage sketch follows the list).

- `get_data_awkward_async`, `get_data_awkward` - Returns a dictionary of the requested data as `numpy` or `JaggedArray` objects.
- `get_data_rootfiles`, `get_data_rootfiles_async` - Returns a list of locally downloaded files (as `pathlib.Path` objects) containing the requested data. Suitable for opening with `ROOT::TFile` or `uproot`.
- `get_data_pandas_df`, `get_data_pandas_df_async` - Returns the data as a `pandas` `DataFrame`. This will fail if the data you've requested has any structure (e.g. is hierarchical, like a single entry for each event, where each event may have some number of jets).
- `get_data_parquet`, `get_data_parquet_async` - Returns a list of locally downloaded files that can be read by any parquet tools.
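For example, a minimal synchronous fetch might look like this sketch; the `query` and `dataset` strings are the same ones used in the streaming example below:

```python
from servicex import ServiceXDataset

query = "(call ResultTTree (call Select (call SelectMany (call EventDataset (list 'localds:bogus')) (lambda (list e) (call (attr e 'Jets') 'AntiKt4EMTopoJets'))) (lambda (list j) (/ (call (attr j 'pt')) 1000.0))) (list 'JetPt') 'analysis' 'junk.root')"
dataset = "mc15_13TeV:mc15_13TeV.361106.PowhegPythia8EvtGen_AZNLOCTEQ6L1_Zee.merge.DAOD_STDM3.e3601_s2576_s2132_r6630_r6264_p2363_tid05630052_00"

ds = ServiceXDataset(dataset)

# Blocks until the transform finishes and all files are downloaded.
df = ds.get_data_pandas_df(query)
print(df.head())

# The same data as a dictionary of awkward arrays, one entry per column.
arrays = ds.get_data_awkward(query)
```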
## Streaming Results

The `ServiceX` backend generates results file-by-file. The above API returns the list of files only once the transform has completed. For large transforms this can take some time: there is no need to wait until it is completely done before processing the files!

- `get_data_rootfiles_stream`, `get_data_parquet_stream`, `get_data_pandas_stream`, and `get_data_awkward_stream` return a stream of local file paths as each result from the backend is downloaded. All take just the `qastle` query text as a parameter and return a Python `AsyncIterator` of `StreamInfoData`. Note that files downloaded locally are cached - so when you re-run the same query it will immediately render all the `StreamInfoData` objects from the async stream with no waiting.
- `get_data_rootfiles_url_stream` and `get_data_parquet_url_stream` return a stream of URLs that allow direct access in the backend to the data as it is finished. Both take just the `qastle` query text as a parameter and return a Python `AsyncIterator` of `StreamInfoUrl`. These methods are probably most useful if you are working in the same data center where the `ServiceX` service is running.
The `StreamInfoUrl` contains a `bucket`, a `file`, and a `url` property. The `url` property can be used to access the requested data without authentication for about 24 hours (this depends on the `ServiceX` backend's configuration). Use the `file` property to understand what part of the starting dataset the data came from. And since this currently points, de facto, to a `minio` object store, the `bucket` can be used to find the host bucket name.
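A sketch of consuming the URL stream; opening the pre-signed URL over HTTP with `uproot` is an assumption here and depends on your `uproot` version and network setup:

```python
import asyncio

import uproot
from servicex import ServiceXDataset


async def peek_at_urls(ds: ServiceXDataset, query: str) -> None:
    # Each StreamInfoUrl arrives as soon as the backend finishes that file.
    async for info in ds.get_data_rootfiles_url_stream(query):
        print(f"{info.file} is ready in bucket {info.bucket}")
        # Assumption: this uproot version can read the pre-signed URL over HTTP.
        with uproot.open(info.url) as f:
            print(f.keys())

# Usage, with `dataset` and `query` as defined in the example below:
# asyncio.run(peek_at_urls(ServiceXDataset(dataset), query))
```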
The `StreamInfoData` contains a `file` and a `path` property. The `file` is as above, and the `path` is a `pathlib.Path` object that points to the file that has been downloaded into the local cache.
An example using the async interface that performs the same operation as the initial example above (the `async for` is wrapped in a coroutine so the script runs outside a notebook):

```python
import asyncio

from servicex import ServiceXDataset

query = "(call ResultTTree (call Select (call SelectMany (call EventDataset (list 'localds:bogus')) (lambda (list e) (call (attr e 'Jets') 'AntiKt4EMTopoJets'))) (lambda (list j) (/ (call (attr j 'pt')) 1000.0))) (list 'JetPt') 'analysis' 'junk.root')"
dataset = "mc15_13TeV:mc15_13TeV.361106.PowhegPythia8EvtGen_AZNLOCTEQ6L1_Zee.merge.DAOD_STDM3.e3601_s2576_s2132_r6630_r6264_p2363_tid05630052_00"


async def main():
    ds = ServiceXDataset(dataset)
    # Print each file's local path as soon as it is downloaded.
    async for f in ds.get_data_rootfiles_stream(query):
        print(f.path)

asyncio.run(main())
```
Notes:

- `ServiceX` might fail part way through the transformation - so be ready for an exception to bubble out of your `AsyncIterator`!
- If you are combining different queries whose filtering is identical, make sure to use the `file` property to match results - otherwise you won't have an event-to-event matching (see the sketch below)!
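A hypothetical sketch of that matching, pairing the results of two queries by their `file` property; the names `match_results`, `query_a`, and `query_b` are placeholders:

```python
from collections import defaultdict

from servicex import ServiceXDataset


async def match_results(ds: ServiceXDataset, query_a: str, query_b: str):
    # Collect each query's local paths, keyed by the source file they came from.
    paths = defaultdict(dict)
    async for info in ds.get_data_rootfiles_stream(query_a):
        paths[info.file]["a"] = info.path
    async for info in ds.get_data_rootfiles_stream(query_b):
        paths[info.file]["b"] = info.path
    # Only files present in both result sets line up event-to-event.
    return {f: p for f, p in paths.items() if "a" in p and "b" in p}
```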
## Development

For any changes please feel free to submit pull requests! We are using the gitlab workflow: the `master` branch represents the latest updates that pass all tests, working towards the next version of the software. Any PRs should be based off the most recent version of `master` if they are for new features. Each release is frozen on a dedicated release branch, e.g. `v2.0.0`. If a bug fix needs to be applied to an existing release, submit a PR to `master` mentioning the affected version(s). After the PR is merged to `master`, it will be applied to the relevant release branch(es) using `git cherry-pick`.
To do development, please set up your environment with the following steps:

1. A Python 3.7+ development environment
2. Fork/Pull down this package, XX
3. `python -m pip install -e .[test]`
4. `python -m pip install nox`
5. Run `nox --list` to list all session options.
6. Run the tests with `nox` to make sure everything is good: `nox --session tests`.
Then add tests as you develop. When you are done, submit a pull request with any required changes to the documentation and the online tests will run.
### To create a release branch

```bash
git checkout 2.0.0
git switch -c v2.0.0
git push --set-upstream origin v2.0.0
```