Source code for servicex.cache

import hashlib
import json
import logging
import os
from contextlib import contextmanager
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple

from .utils import ServiceXException, _query_cache_hash, sanitize_filename

_ignore_cache = False


@contextmanager
def ignore_cache():
    """This will cause all caches to be ignored while it is invoked:

    ```
    with ignore_cache():
        ServiceXDataset(...).get_data...()
    ```

    If you want to do this globally, you can just use the `__enter__()` method.
    This is probably the only way to do this across cells in a notebook.

    ```
    i = ignore_cache()
    i.__enter__()
    ... Query code, jupyter notebook cells, etc. go here
    i.__exit__(None, None, None)
    ```

    Note:

    - The only time the cache is checked is when the query is actually made, not when
      the servicex dataset object is created!
    - Calls to this can be safely nested.
    - Note that calling this doesn't clear the cache or delete anything. It just
      prevents the cache lookup from working while it is in effect.
    """
    global _ignore_cache
    old_value = _ignore_cache
    _ignore_cache = True
    yield
    _ignore_cache = old_value


# Track where we should write out an analysis cache (and search up-stream).
# This contains the directory of the file.
_g_analysis_cache_location: Optional[Path] = None
_g_analysis_cache_filename: str = "servicex_query_cache.json"

# List of queries we know to be bad from this run
_g_bad_query_cache_ids: Set[str] = set()


def reset_local_query_cache():
    """Used to reset the analysis cache location.

    Normally called only during testing.
    """
    global _g_analysis_cache_location
    _g_analysis_cache_location = None
    global _g_analysis_cache_filename
    _g_analysis_cache_filename = "servicex_query_cache.json"
    global _g_bad_query_cache_ids
    _g_bad_query_cache_ids = set()


reset_local_query_cache()


def update_local_query_cache(analysis_cache: Optional[Path] = None):
    """Record analysis query hashes in an analysis cache.

    If this routine is not called, the current directory is searched for an analysis
    cache file. If found, it participates in the query lookup.

    After this routine is called, then when a query is made or looked up, an analysis
    cache file in the local directory is updated with new query request id's.

    This allows one user to send the cache file to another user who, with the same
    servicex backend configured in `servicex.yaml`, can fetch the same data (or share
    it on a similar machine).

    Args:
        analysis_cache (Optional[Path], optional): The directory or filename of the
            analysis cache file. If `None`, defaults to the file
            `servicex_query_cache.json` in the local directory. If only a directory is
            passed, then the `servicex_query_cache.json` in that directory is used.
            Defaults to None.
    """
    file_path = (
        Path(".")
        if analysis_cache is None
        else analysis_cache
        if analysis_cache.is_dir()
        else analysis_cache.parent
    )
    name = (
        "servicex_query_cache.json"
        if (analysis_cache is None or analysis_cache.is_dir())
        else analysis_cache.name
    )

    global _g_analysis_cache_filename
    global _g_analysis_cache_location

    if _g_analysis_cache_location is not None and _g_analysis_cache_filename != name:
        raise ServiceXException(
            "Updating local query cache called twice, with "
            f"{_g_analysis_cache_filename} and {name}."
        )
    _g_analysis_cache_filename = name

    if (
        _g_analysis_cache_location is not None
        and _g_analysis_cache_location != file_path
    ):
        raise ServiceXException(
            "Updating local query cache called twice, with "
            f"{_g_analysis_cache_location} and {file_path}."
        )
    _g_analysis_cache_location = file_path

    loc = _g_analysis_cache_location / _g_analysis_cache_filename
    if not loc.exists():
        loc.parent.mkdir(parents=True, exist_ok=True)
        loc.touch()
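

# Hedged usage sketch (not part of the library source): recording queries into a
# shareable analysis cache. The dataset name and paths below are placeholders.
#
#   from pathlib import Path
#   from servicex.cache import update_local_query_cache
#
#   update_local_query_cache(Path("servicex_query_cache.json"))
#   # ... run ServiceXDataset queries here; their request-ids are recorded in
#   # servicex_query_cache.json, which can be shared alongside servicex.yaml so a
#   # collaborator can look up the same request-ids.

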
class Cache:
    """
    Caching for all data returns from the system. It provides both in-memory and
    on-disk caching.

    TODO: Rename this to be an adaptor, unifying how we name things
    """

    _in_memory_cache = {}

    @classmethod
    def reset_cache(cls):  # pragma: no cover
        "Reset the internal cache, usually used for testing"
        cls._in_memory_cache = {}

    def __init__(
        self,
        cache_path: Path,
        ignore_cache: bool = False,
        analysis_query_key: str = "default",
    ):
        """
        Create the cache object

        Arguments:

            cache_path          The path to the cache directory. Only sub-directories
                                will be created in this path.
            ignore_cache        If true, then always ignore the cache for any queries
                                against this dataset.
            analysis_query_key  The key under which this dataset's queries are stored
                                in the shared analysis cache file.
        """
        self._path = cache_path
        self._ignore_cache = ignore_cache
        self._analysis_query_key = analysis_query_key

        # Old Windows versions have a file path limit of 260 characters.
        self.max_path_len = 200
        if os.name == "nt":
            self.max_path_len = self.max_path_len - 100

    @property
    def path(self) -> Path:
        "Return root path of cache directory"
        return self._path

    @contextmanager
    def ignore_cache(self):
        """Ignore the cache as long as we are held. Supports nesting."""
        old_ignore = self._ignore_cache
        self._ignore_cache = True
        yield
        self._ignore_cache = old_ignore

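    # Hedged usage sketch (assumed paths, not part of the library source): creating a
    # Cache rooted in a temporary directory and temporarily bypassing lookups.
    #
    #   from pathlib import Path
    #   c = Cache(Path("/tmp/servicex-cache"))
    #   with c.ignore_cache():
    #       ...  # lookup_query/lookup_inmem return None inside this block
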
    def _query_cache_file(self, json: Dict[str, str]) -> Path:
        "Return the query cache file"
        h = _query_cache_hash(json)
        return self._path / "query_cache" / h

    def _query_status_cache_file(self, request_id: str) -> Path:
        "Return the query status cache file"
        return self._path / "query_cache_status" / request_id

    def _files_cache_file(self, id: str) -> Path:
        "Return the file that contains the list of files"
        return self._path / "file_list_cache" / id

    def lookup_query(self, json: Dict[str, str]) -> Optional[str]:
        """Look up the request-id for a query.

        Checks the local query cache first and falls back to the analysis cache.
        Returns `None` if caching is being ignored or the query is unknown.
        """
        global _ignore_cache
        if _ignore_cache or self._ignore_cache:
            return None

        f = self._query_cache_file(json)
        if not f.exists():
            hash = _query_cache_hash(json)
            if hash in _g_bad_query_cache_ids:
                return None
            return self._lookup_analysis_query_cache(hash)

        with f.open("r") as i:
            request_id = i.readline().strip()
            logging.getLogger(__name__).debug(f"Found query '{request_id}' in cache")
            self._write_analysis_query_cache(json, request_id)
            return request_id

    def set_query(self, json: Dict[str, str], v: str):
        """Associate a query with a request-id. A hash is taken of the query.

        Args:
            json (Dict[str, str]): The query JSON
            v (str): The `request-id`
        """
        f = self._query_cache_file(json)
        f.parent.mkdir(parents=True, exist_ok=True)
        with f.open("w") as o:
            o.write(f"{v}\n")

        self._write_analysis_query_cache(json, v)

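    # Hedged usage sketch (made-up query and request-id): associating a query with a
    # request-id and reading it back.
    #
    #   query = {"did": "some.rucio.dataset", "selection": "(call ...)"}
    #   c.set_query(query, "1234-abcd-5678")
    #   assert c.lookup_query(query) == "1234-abcd-5678"
    #   c.remove_query(query)  # forget it again, locally and in the analysis cache
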
    def remove_query(self, json: Dict[str, Any]):
        """Remove the query from our local and analysis caches

        Args:
            json (Dict[str, Any]): The query to remove
        """
        f = self._query_cache_file(json)
        if f.exists():
            f.unlink()
        self._remove_from_analysis_cache(_query_cache_hash(json))

    def _load_full_analysis_query_cache(self) -> Optional[Dict[str, Dict[str, str]]]:
        """Safely load the analysis query cache, with all elements of the cache.

        - If there is no cache file, return None
        - If the file is empty, return an empty cache
        - Return the contents of the file.

        Returns:
            Optional[Dict[str, Dict[str, str]]]: Returns the full query cache if it exists
        """
        q_file = self._get_analysis_cache_file()
        if not q_file.exists():
            return None

        analysis_cache = {}
        with q_file.open("r") as input:
            text = input.read()
            if len(text) > 0:
                analysis_cache: Dict[str, Dict[str, str]] = json.loads(text)
        return analysis_cache

    def _load_analysis_query_cache(self) -> Optional[Dict[str, str]]:
        """Safely load the analysis query cache.

        - If there is no cache file, return None
        - If the file is empty, return an empty cache
        - Return the contents of the file.

        Returns:
            Optional[Dict[str, str]]: Returns the query cache if it exists
        """
        full_analysis_cache = self._load_full_analysis_query_cache()
        if full_analysis_cache is None:
            return None

        # Find "our" analysis cache
        analysis_cache = {}
        if self._analysis_query_key in full_analysis_cache:
            analysis_cache = full_analysis_cache[self._analysis_query_key]

        return analysis_cache

    def _save_analysis_query_cache(self, cache: Dict[str, str]):
        full_cache = self._load_full_analysis_query_cache()
        assert (
            full_cache is not None
        ), "Internal error, should have been checked already"
        full_cache[self._analysis_query_key] = cache

        cache_file = self._get_analysis_cache_file()
        with cache_file.open("w") as output:
            json.dump(full_cache, output)

    def _write_analysis_query_cache(self, query_info: Dict[str, str], request_id: str):
        """Write out a local analysis query hash to request-id association.

        Args:
            query_info (Dict[str, str]): The JSON of the request
            request_id (str): The `request-id`
        """
        analysis_cache = self._load_analysis_query_cache()
        if analysis_cache is None:
            return

        analysis_cache[_query_cache_hash(query_info)] = request_id
        self._save_analysis_query_cache(analysis_cache)

    def _remove_from_analysis_cache(self, query_hash: str):
        """Remove an item from the analysis cache if we are writing to it!

        Args:
            query_hash (str): The hash we will remove
        """
        if _g_analysis_cache_location is None:
            _g_bad_query_cache_ids.add(query_hash)
            return

        cache_contents = self._find_analysis_cached_query(query_hash)
        if cache_contents is not None:
            del cache_contents[query_hash]
            self._save_analysis_query_cache(cache_contents)

    def _lookup_analysis_query_cache(
        self,
        query_hash: str,
        filename: Optional[str] = None,
        location: Optional[Path] = None,
    ) -> Optional[str]:
        """Look at all possible query caches for this query.

        If `location` is `None`, then start from the global location searching for a
        query file. If `location` is specified, check that directory. In both cases, if
        the query hash isn't found, then move up one directory and try again.

        `filename` is the name of the file we should be looking for. If `None`, default
        to the global.

        Args:
            query_hash (str): The hash of the query we need to look up.
            filename (Optional[str]): The name of the file that contains the cache. If
                not specified, defaults to the global.
            location (Optional[Path]): Directory to start searching in. If not
                specified, defaults to the global. If that isn't specified, defaults to
                the current directory.

        Returns:
            Optional[str]: The `request-id` associated with the query hash, or `None`
                if it was not found.
        """
        cache_contents = self._find_analysis_cached_query(query_hash)
        if cache_contents is not None:
            return cache_contents[query_hash]
        return None

    def _get_analysis_cache_file(
        self, filename: Optional[str] = None, location: Optional[Path] = None
    ) -> Path:
        """Get our best guess as to where the analysis cache file will be.

        It will use the globally set defaults if nothing is specified.

        Args:
            filename (Optional[str], optional): Cache filename to use. Defaults to None.
            location (Optional[Path], optional): Cache location to use. Defaults to None.
        """
        c_filename = filename if filename is not None else _g_analysis_cache_filename
        c_location = (
            location
            if location is not None
            else _g_analysis_cache_location
            if _g_analysis_cache_location is not None
            else Path(".")
        )

        # Now see if we can find it, searching up the directory tree.
        loc = c_location.absolute() / c_filename
        while (not loc.exists()) and (len(loc.parts) >= 3):
            new_parent = loc.parent.parent
            loc = new_parent / c_filename

        return loc

    def _find_analysis_cached_query(
        self,
        query_hash: str,
        filename: Optional[str] = None,
        location: Optional[Path] = None,
    ) -> Optional[Dict[str, str]]:
        """Return the contents of the analysis cache file that contains a query hash.

        Args:
            query_hash (str): The hash of the query we are to find

        Returns:
            Optional[Dict[str, str]]: The contents of the analysis cache that contains
                the hash. `None` if the query was not found.
        """
        # If the cache is here, then see if it has a hit
        analysis_cache = self._load_analysis_query_cache()
        if analysis_cache is not None:
            if query_hash in analysis_cache:
                return analysis_cache

        return None

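    # For orientation (illustrative hash and request-id only): the analysis cache file
    # read and written by the helpers above is a two-level JSON mapping of analysis
    # query key to {query-hash: request-id}, e.g.
    #
    #   {
    #       "default": {
    #           "b58cee79...": "3a9d2c1e-1234-5678-9abc-def012345678"
    #       }
    #   }
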
    def set_query_status(self, query_info: Dict[str, str]):
        """Cache a query status (json dict).

        Args:
            query_info (Dict[str, str]): The info we should cache. Must contain
                `request_id`.
        """
        assert (
            "request_id" in query_info
        ), "Internal error - request_id should always be part of info returned"
        f = self._query_status_cache_file(query_info["request_id"])
        f.parent.mkdir(parents=True, exist_ok=True)
        with f.open("w") as o:
            json.dump(query_info, o)

    def lookup_query_status(self, request_id: str) -> Dict[str, str]:
        """Returns the info from the last time the query status was cached.

        Args:
            request_id (str): Request id we should look up.
        """
        f = self._query_status_cache_file(request_id)
        if not f.exists():
            raise ServiceXException(f"No cache information for query {request_id}")
        with f.open("r") as o:
            return json.load(o)

    def query_status_exists(self, request_id: str) -> bool:
        """Returns true if the query status file exists on the local machine.

        Args:
            request_id (str): The request-id to look up

        Returns:
            bool: True if present, false otherwise.
        """
        return self._query_status_cache_file(request_id).exists()

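    # Hedged usage sketch (made-up status dictionary): caching and retrieving a query
    # status by request-id.
    #
    #   status = {"request_id": "1234-abcd-5678", "files-remaining": "0"}
    #   c.set_query_status(status)
    #   if c.query_status_exists("1234-abcd-5678"):
    #       status = c.lookup_query_status("1234-abcd-5678")
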
    def set_files(self, id: str, files: List[Tuple[str, Path]]):
        """Cache the files for this request.

        Note: We do check to make sure all the files exist.

        Args:
            id (str): The request-id
            files (List[Tuple[str, Path]]): The minio bucket name and local file paths
        """
        # Make sure they exist
        assert all(f.exists() for _, f in files)
        f = self._files_cache_file(id)
        f.parent.mkdir(parents=True, exist_ok=True)
        with f.open("w") as o:
            json.dump([(n, str(f)) for n, f in files], o)

    def lookup_files(self, id: str) -> Optional[List[Tuple[str, Path]]]:
        """Return a list of files in the cache for a request id.

        - Returns None if there is nothing in the cache
        - Returns None if any of the files are missing

        Args:
            id (str): Request-id we are looking up

        Returns:
            Optional[List[Tuple[str, Path]]]: List of minio-bucket to local file mappings
        """
        f = self._files_cache_file(id)
        if not f.exists():
            return None
        with f.open("r") as i:
            list_of_cached_files = [(n, Path(p)) for n, p in json.load(i)]
            if all(f.exists() for _, f in list_of_cached_files):
                return list_of_cached_files
            return None

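    # Hedged usage sketch (paths are placeholders): caching the downloaded file list
    # for a request. Note that set_files asserts that every local file already exists.
    #
    #   files = [("bucket-object-1.root", Path("/tmp/data/file1.root"))]
    #   c.set_files("1234-abcd-5678", files)
    #   cached = c.lookup_files("1234-abcd-5678")  # None if any file has gone missing
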
    def set_inmem(self, id: str, v: Any):
        "Store a value in the in-memory cache under the given id"
        self._in_memory_cache[id] = v

    def lookup_inmem(self, id: str) -> Optional[Any]:
        "Return a cached in-memory value, or None if missing or the cache is ignored"
        global _ignore_cache
        if _ignore_cache or self._ignore_cache:
            return None

        if id not in self._in_memory_cache:
            return None
        return self._in_memory_cache[id]

    def data_file_location(self, request_id: str, data_name: str) -> Path:
        """
        Return the path to the file that should be written out for this data_name. This
        is where the output file should get stored. Truncate the leftmost characters
        from filenames to avoid throwing an OSError: [Errno 63] File name too long
        error. Use a hash string to make sure that the file names remain unique.
        """
        parent = self._path / "data" / request_id
        parent.mkdir(parents=True, exist_ok=True)
        sanitized = sanitize_filename(data_name)
        if (len(sanitized) + len(parent.name)) > self.max_path_len:
            hash = hashlib.md5(sanitized.encode())
            hash_string = hash.hexdigest()
            max_len = self.max_path_len - len(parent.name) - len(hash_string) - 1
            sanitized = f"{hash_string}-{sanitized[len(sanitized) - max_len:]}"

        return parent / sanitized

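    # Hedged usage sketch (made-up names): asking the cache where to write an output
    # file. Very long data names are shortened with an md5 prefix so the resulting
    # path stays under max_path_len.
    #
    #   p = c.data_file_location("1234-abcd-5678", "some_output_file.root")
    #   # p -> <cache_path>/data/1234-abcd-5678/some_output_file.root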