Source code for servicex.data_conversions

import asyncio
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Iterable, Optional

import pandas as pd
import awkward as ak

from .utils import ServiceXException

_conversion_pool = ThreadPoolExecutor(4)


[docs]class DataConverterAdaptor: """Methods to convert from one type of data to the other."""
[docs] def __init__(self, default_file_type: str): """Create a data converter adaptor. By default it will do the conversation as requested. Args: default_file_type (str): The default file type (`parquet` or `root`) """ self._default_file_type = default_file_type
[docs] async def convert_to_pandas(self, file: Path, file_type: Optional[str] = None): """Convert to a pandas DataFrame from data stored in a file of a particular file_type Args: file (Path): Path to the file file_type (str): What the file contains (root, parquet, etc) """ file_type = file_type if file_type is not None else self._default_file_type if file_type == "root-file": return await self._convert_root_to_pandas(file) elif file_type == "parquet": return await self._convert_parquet_to_pandas(file) else: raise ServiceXException( f"Conversion from {file_type} into an pandas DF is not " "yet supported" )
[docs] async def convert_to_awkward(self, file: Path, file_type: Optional[str] = None): """Convert to an awkward data array from data stored in a file of a particular file_type Args: file (Path): Path to the file file_type (str): What the file contains (root, parquet, etc) """ file_type = file_type if file_type is not None else self._default_file_type if file_type == "root-file": return await self._convert_root_to_awkward(file) elif file_type == "parquet": return await self._convert_parquet_to_awkward(file) else: raise ServiceXException( f"Conversion from {file_type} into an awkward array is not " "yet supported" )
[docs] def combine_pandas(self, dfs: Iterable[pd.DataFrame]) -> pd.DataFrame: """Combine many pandas DataFrame into a single one, in order. Args: dfs (Iterable[pd.DataFrame]): The list of DataFrames """ return pd.concat(dfs)
[docs] def combine_awkward(self, awks: Iterable[ak.Array]) -> ak.Array: """Combine many awkward arrays into a single one, in order. Args: awks (Iterable[ChunkedArray]): The input list of awkward arrays """ return ak.concatenate(awks) # type: ignore
[docs] async def _convert_root_to_pandas(self, file: Path): """ Convert the contents of a ROOT file to pandas. Arguments: file A `Path` to the file containing the pandas data Returns: DataFrame A pandas DataFrame Note: - Work is done on a second thread. - Pandas is only imported if this is called. """ from pandas import DataFrame def do_the_work(file: Path) -> DataFrame: import uproot as uproot with uproot.open(file) as f_in: r = f_in[f_in.keys()[0]] return r.arrays(library="pd") # type: ignore return await asyncio.wrap_future(_conversion_pool.submit(do_the_work, file))
[docs] async def _convert_parquet_to_pandas(self, file: Path): """ Convert the contents of a parquet file to pandas. Arguments: file A `Path` to the file containing the pandas data Returns: DataFrame A pandas DataFrame Note: - Work is done on a second thread. - Pandas is only imported if this is called. """ import pandas as pd def do_the_work(file: Path) -> pd.DataFrame: return pd.read_parquet(str(file)) return await asyncio.wrap_future(_conversion_pool.submit(do_the_work, file))
[docs] async def _convert_root_to_awkward(self, file: Path): """ Convert the contents of a ROOT file to an awkward dictionary. Arguments: file A `Path` to the file containing the pandas data Returns: DataFrame A pandas DataFrame Note: - Work is done on a second thread. - Awkward is only imported if this is called. - A LazyArray is returned, so it isn't completely loaded into memory. That also means this will leak file handles - as that has to be left open. """ def do_the_work(file: Path) -> ak.Array: import uproot as uproot with uproot.open(file) as f_in: tree_name = f_in.keys()[0] return uproot.lazy(f"{file}:{tree_name}") return await asyncio.wrap_future(_conversion_pool.submit(do_the_work, file))
[docs] async def _convert_parquet_to_awkward(self, file: Path): """ Convert the contents of a parquet file to an awkward dictionary. Arguments: file A `Path` to the file containing the pandas data Returns: DataFrame A pandas DataFrame Note: - Work is done on a second thread. - Pandas is only imported if this is called. """ def do_the_work(file: Path) -> ak.Array: # TODO: When we move to awkward1, make sure this becomes lazy return ak.from_parquet(str(file)) # type: ignore return await asyncio.wrap_future(_conversion_pool.submit(do_the_work, file))