Source code for indra_world.sources.eidos.processor

import logging
import datetime
from indra.statements import Event, QualitativeDelta, WorldContext, \
    TimeContext, RefContext
from indra.sources.eidos.processor import EidosProcessor, EidosDocument


logger = logging.getLogger(__name__)


class EidosWorldProcessor(EidosProcessor):
    """Eidos processor that attaches time and location context to Events.

    Parameters
    ----------
    json_dict : dict
        The Eidos output, passed both to the parent processor and to an
        EidosWorldDocument that indexes time expressions and geolocations.
    grounding_ns : collection of str or None
        Upper-cased grounding names to keep in db_refs. If None, no
        filtering by name is applied.
    """

    def __init__(self, json_dict, grounding_ns):
        super().__init__(json_dict=json_dict)
        self.doc = EidosWorldDocument(json_dict)
        self.statements = []
        self.grounding_ns = grounding_ns

    def get_event(self, event):
        """Return an Event statement built from an Eidos event entry.

        The Event carries a QualitativeDelta built from the extracted
        polarity and adjectives, and a WorldContext only if a time or a
        geo context was found among the entry's states.
        """
        concept = self.get_concept(event)
        state_info = self.extract_entity_states(event.get('states', []))
        delta = QualitativeDelta(polarity=state_info.get('polarity'),
                                 adjectives=state_info.get('adjectives'))
        when = state_info.get('time_context', None)
        where = state_info.get('geo_context', None)
        if when or where:
            context = WorldContext(time=when, geo_location=where)
        else:
            # Without any time or location there is no context to attach
            context = None
        return Event(concept, delta=delta, context=context)

    def get_groundings(self, entity):
        """Return groundings as db_refs for an entity.

        The returned dict always has the entity's raw text under 'TEXT'.
        Each grounding with at least one usable scored value is added
        under its upper-cased name, except that 'WM_FLATTENED' and 'WM'
        are both stored under 'WM'.
        """
        def scored_entries(grounding):
            # Collect (ontology concept, score) pairs, dropping any value
            # for which either part is missing
            if not grounding:
                return None
            # The 'values' key may be present but hold None
            candidates = grounding.get('values', []) or []
            pairs = ((item.get('ontologyConcept'), item.get('value'))
                     for item in candidates)
            return [(concept, score) for concept, score in pairs
                    if concept is not None and score is not None]

        # The raw text is always kept, even if nothing is grounded
        db_refs = {'TEXT': entity['text']}
        groundings = entity.get('groundings')
        if not groundings:
            return db_refs
        for grounding in groundings:
            scored = scored_entries(grounding)
            # Groundings without any listed values are ignored
            if not scored:
                continue
            namespace = grounding['name'].upper()
            if self.grounding_ns is not None and \
                    namespace not in self.grounding_ns:
                continue
            if namespace == 'UN':
                # Spaces in UN concept names are replaced by underscores
                db_refs[namespace] = [(concept.replace(' ', '_'), score)
                                      for concept, score in scored]
            elif namespace in ('WM_FLATTENED', 'WM'):
                # Leading and trailing slashes are stripped from WM concepts
                db_refs['WM'] = [(concept.strip('/'), score)
                                 for concept, score in scored]
            else:
                db_refs[namespace] = scored
        return db_refs

    def time_context_from_ref(self, timex):
        """Return a time context object given a timex reference entry."""
        # A set value points at a DCT or a TimeExpression by its id,
        # e.g. "value": {"@id": "_:DCT_1"}; the actual TimeContext was
        # stashed on the document under that id
        reference = timex.get('value')
        if not reference:
            return None
        return self.doc.timexes.get(reference['@id'])

    def geo_context_from_ref(self, ref):
        """Return a ref context object given a location reference entry."""
        reference = ref.get('value')
        if not reference:
            return None
        # The RefContext was stashed on the document under the geoloc id
        return self.doc.geolocs.get(reference['@id'])

    def extract_entity_states(self, states):
        """Return the parent's extracted states extended with time/geo keys."""
        combined = super().extract_entity_states(states)
        world_states = self.extract_entity_time_loc_states(states)
        combined.update(world_states)
        return combined

    def extract_entity_time_loc_states(self, states):
        """Return a dict with the 'time_context' and 'geo_context' of states.

        Both values are None if states is None or if no matching state
        is found.
        """
        found = {'time_context': None, 'geo_context': None}
        if states is None:
            return found
        for state in states:
            state_type = state['type']
            if state_type == 'TIMEX':
                found['time_context'] = self.time_context_from_ref(state)
            elif state_type == 'LocationExp':
                # TODO: here we take only the first geo_context occurrence.
                # Eidos sometimes provides a list of locations, it may
                # make sense to break those up into multiple statements
                # each with one location
                if not found['geo_context']:
                    found['geo_context'] = self.geo_context_from_ref(state)
        return found
class EidosWorldDocument(EidosDocument):
    """Eidos document that also indexes time expressions and geolocations.

    Parameters
    ----------
    json_dict : dict
        The Eidos output, passed on to the parent document class.
    """

    def __init__(self, json_dict):
        # Maps a timex or DCT '@id' to the TimeContext built from it
        self.timexes = {}
        # Maps a geoloc '@id' to the RefContext built from it
        self.geolocs = {}
        # TimeContext of the last document creation time seen. Defaults to
        # None so that the attribute exists even if no document has a DCT
        self.dct = None
        # NOTE(review): the lookups are created before calling the parent
        # constructor, presumably because it triggers
        # _preprocess_extractions -- confirm against EidosDocument
        super().__init__(json_dict)

    def _preprocess_extractions(self):
        """Index DCTs, time expressions and geolocations by their '@id'."""
        super()._preprocess_extractions()
        # Build a dictionary of sentences and document creation times (DCTs)
        documents = self.tree.execute("$.documents[(@.@type is 'Document')]")
        for document in documents:
            dct = document.get('dct')
            # We stash the DCT here as a TimeContext object
            if dct is not None:
                self.dct = self.time_context_from_dct(dct)
                self.timexes[dct['@id']] = self.dct
            # The 'sentences' key may be present but hold None
            sentences = document.get('sentences') or []
            for sent in sentences:
                timexes = sent.get('timexes')
                if timexes:
                    for timex in timexes:
                        tc = time_context_from_timex(timex)
                        self.timexes[timex['@id']] = tc
                geolocs = sent.get('geolocs')
                if geolocs:
                    for geoloc in geolocs:
                        rc = ref_context_from_geoloc(geoloc)
                        self.geolocs[geoloc['@id']] = rc

    @staticmethod
    def time_context_from_dct(dct):
        """Return a time context object given a DCT entry."""
        time_text = dct.get('text')
        start = _get_time_stamp(dct.get('start'))
        end = _get_time_stamp(dct.get('end'))
        duration = _get_duration(start, end)
        tc = TimeContext(text=time_text, start=start, end=end,
                         duration=duration)
        return tc
class EidosProcessorCompositional(EidosWorldProcessor):
    """Eidos world processor that reads compositional WM groundings."""

    def get_groundings(self, entity):
        """Return groundings as db_refs for an entity.

        Only groundings whose upper-cased name is 'WM_COMPOSITIONAL' are
        used. Their entries are stored under the 'WM' key as a list of
        4-element lists [theme, property, process, process property],
        where each slot is either None or an (ontology concept, score)
        tuple. The entity's raw text is always stored under 'TEXT'.
        """
        def get_grounding_entries_comp(grounding):
            if not grounding:
                return None

            entry_types = ['theme', 'themeProperties', 'themeProcess',
                           'themeProcessProperties']
            entries = []
            values = grounding.get('values', [])
            # Values could still have been a None entry here
            if values:
                for entry in values:
                    compositional_entry = [None, None, None, None]
                    for idx, entry_type in enumerate(entry_types):
                        val = entry.get(entry_type)
                        # Skip slots that are missing as well as ones that
                        # are present but empty, since indexing an empty
                        # list below would raise an IndexError
                        if not val:
                            continue
                        # FIXME: can there be multiple entries here?
                        val = val[0]
                        ont_concept = val.get('ontologyConcept')
                        score = val.get('value')
                        if ont_concept is None or score is None:
                            continue
                        # Drop a single trailing slash from the concept
                        if ont_concept.endswith('/'):
                            ont_concept = ont_concept[:-1]
                        compositional_entry[idx] = \
                            (ont_concept, score)
                    # Some special cases
                    # Promote process into theme
                    if compositional_entry[2] and not compositional_entry[0]:
                        compositional_entry[0] = compositional_entry[2]
                        compositional_entry[2] = None
                        # The process property follows the promoted process
                        # and becomes the theme property
                        if compositional_entry[3]:
                            compositional_entry[1] = compositional_entry[3]
                            compositional_entry[3] = None
                    # Promote dangling property
                    if compositional_entry[1] and not compositional_entry[0]:
                        compositional_entry[0] = compositional_entry[1]
                        compositional_entry[1] = None
                    # Promote theme process property into theme property
                    if compositional_entry[3] and compositional_entry[0] and \
                            not compositional_entry[2] \
                            and not compositional_entry[1]:
                        compositional_entry[1] = compositional_entry[3]
                        compositional_entry[3] = None
                    # If there is only a theme process property and nothing
                    # else, we promote it to be the theme
                    if compositional_entry[3] and not any(
                            compositional_entry[:-1]):
                        compositional_entry[0] = compositional_entry[3]
                        compositional_entry[3] = None
                    # Remove dangling theme process property if theme
                    # and property are available but there is no process
                    if compositional_entry[0] and compositional_entry[1] and \
                            not compositional_entry[2] \
                            and compositional_entry[3]:
                        compositional_entry[3] = None
                    # Entries with no usable slot at all are dropped
                    if any(compositional_entry):
                        entries.append(compositional_entry)
            return entries

        # Save raw text and Eidos scored groundings as db_refs
        db_refs = {'TEXT': entity['text']}
        groundings = entity.get('groundings')
        if not groundings:
            return db_refs
        for g in groundings:
            key = g['name'].upper()
            if key == 'WM_COMPOSITIONAL':
                entries = get_grounding_entries_comp(g)
                if entries:
                    db_refs['WM'] = entries
            else:
                continue
        return db_refs
def _get_time_stamp(entry):
    """Return datetime object from a timex constraint start/end entry.

    Example string format to convert: 2018-01-01T00:00

    Parameters
    ----------
    entry : str or None
        The start or end value of a time constraint.

    Returns
    -------
    datetime.datetime or None
        The parsed time stamp, or None if the entry is empty, is the
        string 'Undef', or cannot be parsed in the expected format.
    """
    if not entry or entry == 'Undef':
        return None
    try:
        return datetime.datetime.strptime(entry, '%Y-%m-%dT%H:%M')
    except (ValueError, TypeError):
        # ValueError: the string does not match the format; TypeError: the
        # entry is not a string. The entry is passed as a logging argument
        # so that formatting the message can never raise here.
        logger.debug('Could not parse %s format', entry)
        return None


def _get_duration(start, end):
    """Return the number of whole seconds between start and end.

    Parameters
    ----------
    start : datetime.datetime or None
        The beginning of the interval.
    end : datetime.datetime or None
        The end of the interval.

    Returns
    -------
    int or None
        The duration in seconds truncated to an integer, or None if
        either end point is missing or the difference cannot be computed.
    """
    if not start or not end:
        return None
    try:
        return int((end - start).total_seconds())
    except (TypeError, AttributeError, OverflowError):
        # Best effort: values that cannot be subtracted as datetimes
        # yield no duration rather than an error
        logger.debug('Failed to get duration from %s and %s', start, end)
        return None
def ref_context_from_geoloc(geoloc):
    """Build a RefContext from a geoloc entry.

    The entry's 'text' becomes the name of the context and its 'geoID'
    is stored under the 'GEOID' key of db_refs. Either value is None if
    the corresponding key is missing from the entry.
    """
    return RefContext(name=geoloc.get('text'),
                      db_refs={'GEOID': geoloc.get('geoID')})
def time_context_from_timex(timex):
    """Build a TimeContext from a timex entry.

    Only the first of the entry's intervals is used to determine the
    start, end and duration; without any intervals all three are None.
    """
    label = timex.get('text')
    intervals = timex.get('intervals')
    if intervals:
        first_interval = intervals[0]
        start = _get_time_stamp(first_interval.get('start'))
        end = _get_time_stamp(first_interval.get('end'))
        duration = _get_duration(start, end)
    else:
        start = None
        end = None
        duration = None
    return TimeContext(text=label, start=start, end=end, duration=duration)