Source code for indra_world.sources.dart.client

"""A client for accessing reader output from the DART system."""
__all__ = ['DartClient', 'prioritize_records']
import os
import tqdm
import json
import glob
import logging
import requests
import itertools
from typing import Optional
from datetime import datetime
from collections import defaultdict
from indra.config import get_config
from .api import get_unique_records


logger = logging.getLogger(__name__)

default_dart_url = ('https://wm-ingest-pipeline-rest-1.prod.dart.'
                    'worldmodelers.com/dart/api/v1')


[docs]class DartClient: """A client for the DART web service with optional local storage. Parameters ---------- storage_mode : Optional[str] If `web`, the configured DART URL and credentials are used to communicate with the DART web service. If `local`, a local storage is used to access and store reader outputs. dart_url : Optional[str] The DART service URL. If given, it overrides the DART_WM_URL configuration value. dart_uname : Optional[str] The DART service user name. If given, it overrides the DART_WM_USERNAME configuration value. dart_pwd : Optional[str] The DART service password. If given, it overrides the DART_WM_PASSWORD configuration value. local_storage : Optional[str] A path that points to a folder for local storage. If the storage_mode is `web`, this local_storage is used as a local cache. If the storage_mode is `local`, it is used as the primary location to access reader outputs. If given, it overrides the INDRA_WM_CACHE configuration value. """ def __init__(self, storage_mode='web', dart_url=None, dart_uname=None, dart_pwd=None, local_storage=None): self.storage_mode = storage_mode # We set the local storage in either mode, since even in web mode # it is used as a cache self.local_storage = local_storage if local_storage else \ get_config('INDRA_WM_CACHE') # In web mode, we try to get a URL, a username and a password. In order # of priority, we first take arguments provided directly, otherwise # we take configuration values. if self.storage_mode == 'web': if dart_url: self.dart_url = dart_url else: dart_config_url = get_config('DART_WM_URL') self.dart_url = dart_config_url if dart_config_url else \ default_dart_url # for backwards compatibility, make sure we are using the base # URL here. if self.dart_url and self.dart_url.endswith('/readers'): self.dart_url = self.dart_url[:-8] if dart_uname: self.dart_uname = dart_uname else: self.dart_uname = get_config('DART_WM_USERNAME') if dart_pwd: self.dart_pwd = dart_pwd else: self.dart_pwd = get_config('DART_WM_PASSWORD') if not self.dart_uname or not self.dart_pwd: logger.warning('DART is used in web mode but username or ' 'password were not provided or set in the ' 'DART_WM_USERNAME and DART_WM_PASSWORD ' 'configurations.') # In local mode, we need to have a local storage set else: self.dart_url = None if not self.local_storage: raise ValueError('DART client initialized in local mode ' 'without a local storage path.') # If the local storage doesn't exist, we try create the folder if self.local_storage and (not os.path.exists(self.local_storage)): logger.info('The local storage path %s for the DART client ' 'doesn\'t exist and will now be created' % self.local_storage) try: os.makedirs(self.local_storage) except Exception as e: logger.error('Could not create DART client local storage: %s' % e) logger.info('Running DART client in %s mode with local storage at %s' % (self.storage_mode, self.local_storage))
[docs] def get_outputs_from_records(self, records): """Return reader outputs corresponding to a list of records. Parameters ---------- records : list of dict A list of records returned from the reader output query. Returns ------- dict(str, dict) A two-level dict of reader output keyed by reader and then document id. """ # Loop document keys and get documents reader_outputs = defaultdict(dict) for record in tqdm.tqdm(records): reader_outputs[record['identity']][record['document_id']] = \ self.get_output_from_record(record) reader_outputs = dict(reader_outputs) return reader_outputs
[docs] def get_output_from_record(self, record): """Return reader output corresponding to a single record. Parameters ---------- record : dict A single DART record. Returns ------- str The reader output corresponding to the given record. """ storage_key = record['storage_key'] fname = self.get_local_storage_path(record) output = None if fname and os.path.exists(fname): with open(fname, 'r') as fh: output = fh.read() elif self.storage_mode == 'web': try: output = self.download_output(storage_key) except Exception as e: logger.warning('Error downloading %s: %s' % (storage_key, e)) return None try: if self.local_storage: with open(fname, 'w') as fh: fh.write(output) except Exception as e: logger.warning('Error storing %s: %s' % (storage_key, e)) else: logger.info('Record with storage key %s doesn\'t exist ' 'in local storage.' % storage_key) return output
[docs] def cache_record(self, record, overwrite=False): """Download and cache a given record in local storage. Parameters ---------- record : dict A DART record. """ fname = self.get_local_storage_path(record) if overwrite or not os.path.exists(fname): output = self.download_output(record['storage_key']) with open(fname, 'w') as fh: fh.write(output)
[docs] def cache_records(self, records, overwrite=False): """Download and cache a list of records in local storage. Parameters ---------- records : list[dict] A list of DART records. """ for record in tqdm.tqdm(records): self.cache_record(record, overwrite=overwrite)
[docs] def download_output(self, storage_key): """Return content from the DART web service based on its storage key. Parameters ---------- storage_key : str A DART storage key. Returns ------- str The content corresponding to the storage key. """ url = self.dart_url + '/readers/download/%s' % storage_key res = requests.get(url=url, auth=(self.dart_uname, self.dart_pwd)) res.raise_for_status() return res.text
[docs] def get_local_storage_path(self, record): """Return the local storage path for a DART record.""" if not self.local_storage: return None folder = os.path.join(self.local_storage, record['identity'], record['version']) if not os.path.exists(folder): os.makedirs(folder) fname = os.path.join(folder, record['document_id']) return fname
[docs] def get_reader_output_records(self, readers=None, versions=None, document_ids=None, timestamp=None, tenant=None, ontology_id=None, unique=False): """Return reader output metadata records by querying the DART API Query json structure: {"readers": ["MyAwesomeTool", "SomeOtherAwesomeTool"], "versions": ["3.1.4", "1.3.3.7"], "document_ids": ["qwerty1234", "poiuyt0987"], "timestamp": {"before": "yyyy-mm-ddThh:mm:ss", "after": "yyyy-mm-ddThh:mm:ss"}} Parameters ---------- readers : list A list of reader names versions : list A list of versions to match with the reader name(s) document_ids : list A list of document identifiers timestamp : dict("before"|"after",str) The timestamp string must be formatted "yyyy-mm-ddThh:mm:ss". tenant : Optional[str] Return only records for the given tenant. ontology_id : Optional[str] Return only records for the given ontology ID. unique : Optional[bool] If true, records that are duplicates are collapsed. Default: False. Returns ------- dict The JSON payload of the response from the DART API """ if self.storage_mode == 'web': query_data = _jsonify_query_data(readers, versions, document_ids, timestamp) if not query_data: return {} full_query_data = {'metadata': query_data} url = self.dart_url + '/readers/query' res = requests.post(url, data=full_query_data, auth=(self.dart_uname, self.dart_pwd)) res.raise_for_status() rj = res.json() # This handles both empty list and dict if not rj or 'records' not in rj: records = [] else: records = rj['records'] else: records = [] if readers: if versions: for reader, version in itertools.product(readers, versions): path = os.path.join(self.local_storage, reader, version, '*') for file in glob.glob(path): doc_id = os.path.basename(file) record = { 'identity': reader, 'version': version, 'doc_id': doc_id, 'storage_key': file } records.append(record) else: for reader in readers: path = os.path.join(self.local_storage, reader, '*') version_paths = glob.glob(path) for version_path in version_paths: version = os.path.basename(version_path) path = glob.glob(version_path, '*') for file in glob.glob(path): doc_id = os.path.basename(file) record = { 'identity': reader, 'version': version, 'doc_id': doc_id, 'storage_key': file } records.append(record) if document_ids: records = [r for r in records if r['doc_id'] in document_ids] else: raise ValueError('Must provide readers for searching in local ' 'mode.') if ontology_id: records = [r for r in records if r['output_version'] == ontology_id] if tenant: records = [r for r in records if r['tenants'] and tenant in r['tenants']] if unique: records = get_unique_records(records) return records
[docs] def get_reader_versions(self, reader): """Return the available versions for a given reader.""" records = self.get_reader_output_records([reader]) return {record['version'] for record in records}
[docs] def get_ontology(self, ontology_id: str): """Return the DART ontology record JSON for the given ontology ID.""" url = self.dart_url + '/ontologies' res = requests.get(url, params={'id': ontology_id}, auth=(self.dart_uname, self.dart_pwd)) return res.json()
[docs] def get_tenant_ontology(self, tenant_id: str, version: Optional[str] = None): """Return the DART ontology record JSON for the given tenant ID and optional version.""" params = {'tenant': tenant_id} if version: params['version'] = version url = self.dart_url + '/ontologies' res = requests.get(url, params=params, auth=(self.dart_uname, self.dart_pwd)) return res.json()
[docs] def get_ontology_graph(self, ontology_id: str): """Return the ontology graph for the given ontology ID.""" ont_json = self.get_ontology(ontology_id) return self._get_ontology_graph_from_json(ont_json)
[docs] def get_tenant_ontology_graph(self, tenant_id: str, version: Optional[str] = None): """Return the ontology graph for the given tenant ID and optional version.""" ont_json = self.get_tenant_ontology(tenant_id, version=version) return self._get_ontology_graph_from_json(ont_json)
@staticmethod def _get_ontology_graph_from_json(ont_json): import yaml from indra_world.ontology import WorldOntology ontology = \ WorldOntology(url=None, yml=yaml.load(ont_json['ontology'], Loader=yaml.FullLoader)) ontology.initialize() return ontology
[docs]def prioritize_records(records, priorities=None): """Return unique records per reader and document prioritizing by version. Parameters ---------- records : list of dict A list of records returned from the reader output query. priorities : dict of list A dict keyed by reader names (e.g., cwms, eidos) with values representing reader versions in decreasing order of priority. Returns ------- records : list of dict A list of records that are unique per reader and document, picked by version priority when multiple records exist for the same reader and document. """ priorities = {} if not priorities else priorities prioritized_records = [] key = lambda x: (x['identity'], x['document_id']) for (reader, doc_id), group in itertools.groupby(sorted(records, key=key), key=key): group_records = list(group) if len(group_records) == 1: prioritized_records.append(group_records[0]) else: reader_prio = priorities.get(reader) if reader_prio: first_rec = sorted( group_records, key=lambda x: reader_prio.index(x['version']))[0] prioritized_records.append(first_rec) else: logger.warning('Could not prioritize between records: %s' % str(group_records)) prioritized_records.append(group_records[0]) return prioritized_records
def _check_lists(lst): if not isinstance(lst, (list, tuple)): return False elif any(not isinstance(s, str) for s in lst): logger.warning('At least one object in list is not a string') return False return True def _check_timestamp_dict(ts_dict): """Check the timestamp dict Parameters ---------- ts_dict : dict Timestamp should be of format "yyyy-mm-ddThh:mm:ss". Returns ------- dict """ def _is_valid_ts(tstr): """ %Y - Year as Zero padded decimal %m - month as zero padded number %d - day as zero padded number %H - 24h hour as zero padded number %M - minute as zero padded number %S - second as zero padded number """ ts_long_fmt = '%Y-%m-%dT%H:%M:%S' try: dt = datetime.strptime(tstr, ts_long_fmt) except ValueError as err: raise ValueError( f'Timestamp "{tstr}" is not in a valid format. ' f'Format must be "%Y-%m-%dT%H:%M:%S"') from err try: if dt < datetime(1900, 1, 1): logger.warning('Timestamp is before 1900-JAN-01, ignoring') return False except (ValueError, OverflowError): logger.warning('Could not parse timestamp, ignoring') return False return True ek = {'before', 'after'} if sum(k in ek for k in ts_dict) > 0: ts = {k: v for k, v in ts_dict.items() if k in ek and _is_valid_ts(v)} else: raise ValueError(f'None of the allowed keys ' f'{", ".join(list(ek))} were provided') return ts def _jsonify_query_data(readers=None, versions=None, document_ids=None, timestamp=None): """Check and json.dumps the metadata dictionary Parameters ---------- readers : list The list of reading systems. versions : list Versions of reading systems. document_ids : list Document IDs. timestamp : dict("before"|"after",str) Reader output time stamp constraint. Returns ------- str The json.dumps representation of the query metadata """ if all(v is None for v in [readers, versions, document_ids, timestamp]): logger.warning('No query parameters were filled out') return '' pd = {} if readers and _check_lists(readers): pd['readers'] = readers if versions and _check_lists(versions): pd['versions'] = versions if document_ids and _check_lists(document_ids): pd['document_ids'] = document_ids if isinstance(timestamp, dict): pd['timestamp'] = _check_timestamp_dict(timestamp) elif timestamp is not None: raise ValueError('Argument "timestamp" must be of type dict') return json.dumps(pd)