import os
import logging
import objectpath
from datetime import datetime, timedelta
from indra.statements import Concept, Event, Influence, TimeContext, \
RefContext, WorldContext, Evidence, QualitativeDelta, MovementContext, \
Migration, QuantitativeState
logger = logging.getLogger(__name__)
# List out relation types and their default (implied) polarities.
polarities = {'causation': 1, 'precondition': 1, 'catalyst': 1,
'mitigation': -1, 'prevention': -1,
'temporallyPrecedes': None}
[docs]class HumeJsonLdProcessor(object):
"""This processor extracts INDRA Statements from Hume JSON-LD output.
Parameters
----------
json_dict : dict
A JSON dictionary containing the Hume extractions in JSON-LD format.
Attributes
----------
tree : objectpath.Tree
The objectpath Tree object representing the extractions.
statements : list[indra.statements.Statement]
A list of INDRA Statements that were extracted by the processor.
"""
def __init__(self, json_dict):
self.tree = objectpath.Tree(json_dict)
self.statements = []
self.document_dict = {}
self.concept_dict = {}
self.relation_dict = {}
self.eid_stmt_dict = {}
self.extractions_by_id = {}
self._get_documents()
self.relation_subj_obj_ids = []
self._get_extractions_by_id()
def _get_extractions_by_id(self):
self.extractions_by_id = {
extr['@id']: extr for extr in
self.tree.execute("$.extractions[(@.@type is 'Extraction')]")
if '@id' in extr}
def extract_relations(self):
relations = self._find_relations()
for relation_type, relation in relations:
# Extract concepts and contexts.
subj = self._get_event_and_context(relation, arg_type='source')
obj = self._get_event_and_context(relation, arg_type='destination')
if not subj.concept or not obj.concept:
continue
# Apply the naive polarity from the type of statement. For the
# purpose of the multiplication here, if obj.delta.polarity is
# None to begin with, we assume it is positive
obj_pol = obj.delta.polarity
obj_pol = obj_pol if obj_pol is not None else 1
rel_pol = polarities[relation_type]
obj.delta.polarity = rel_pol * obj_pol if rel_pol else None
evidence = self._get_evidence(relation, get_states(relation))
st = Influence(subj, obj, evidence=evidence)
self.eid_stmt_dict[relation['@id']] = st
self.statements.append(st)
def extract_events(self):
events = self._find_events()
for event in events:
evidence = self._get_evidence(event, get_states(event))
stmt = self._get_event_and_context(event, eid=event['@id'],
evidence=evidence)
self.eid_stmt_dict[event['@id']] = stmt
self.statements.append(stmt)
def _find_events(self):
"""Find standalone events and return them in a list."""
# First populate self.concept_dict and self.relations_subj_obj_ids
if not self.relation_dict or not self.concept_dict or \
not self.relation_subj_obj_ids:
self._find_relations()
# Check if events are part of relations
events = []
for e in self.concept_dict.values():
label_set = set(e.get('labels', []))
if 'Event' in label_set:
if e['@id'] not in self.relation_subj_obj_ids:
events.append(e)
if not events:
logger.debug('No standalone events found.')
else:
logger.debug('%d standalone events found.' % len(events))
return events
def _find_relations(self):
"""Find all relevant relation elements and return them in a list."""
# Get relations from extractions
relations = []
for eid, e in self.extractions_by_id.items():
label_set = set(e.get('labels', []))
# If this is a DirectedRelation
if 'DirectedRelation' in label_set:
self.relation_dict[eid] = e
subtype = e.get('subtype')
if any(t in subtype for t in polarities.keys()):
relations.append((subtype, e))
# Save IDs of relation's subject and object
if e['arguments']:
for a in e['arguments']:
if a['type'] == 'source' or \
a['type'] == 'destination':
self.relation_subj_obj_ids.append(
a['value']['@id'])
# If this is an Event or an Entity
if {'Event', 'Entity'} & label_set:
self.concept_dict[e['@id']] = e
if not relations and not self.relation_dict:
logger.debug("No relations found.")
else:
logger.debug('%d relations of types %s found'
% (len(relations), ', '.join(polarities.keys())))
logger.debug('%d relations in dict.' % len(self.relation_dict))
logger.debug('%d concepts found.' % len(self.concept_dict))
return relations
def _get_documents(self):
"""Populate sentences attribute with a dict keyed by document id."""
documents = self.tree.execute("$.documents")
for doc in documents:
sentences = {s['@id']: s['text'] for s in doc.get('sentences', [])}
self.document_dict[doc['@id']] = {'sentences': sentences,
'location': doc.get('location')}
def _make_world_context(self, entity):
"""Get place and time info from the json for this entity."""
loc_context = None
time_context = None
# Look for time and place contexts.
for argument in entity["arguments"]:
if argument["type"] in {"has_location", "has_origin_location",
"has_destination_location",
"has_intermediate_location"}:
entity_id = argument["value"]["@id"]
loc_entity = self.concept_dict[entity_id]
loc_context = _resolve_geo(loc_entity)
if argument["type"] in {"has_time", "has_start_time",
"has_end_time"}:
entity_id = argument["value"]["@id"]
temporal_entity = self.concept_dict[entity_id]
time_context = _resolve_time(temporal_entity)
# Put context together
context = None
if loc_context or time_context:
context = WorldContext(time=time_context, geo_location=loc_context)
return context
def _make_movement_context(self, entity):
movement_locations = list()
time_context = None
# Use None for quantitative_state if no information found, default
# value will be assigned when creating a Statement
quantitative_state = None
for argument in entity['arguments']:
entity_id = argument["value"]["@id"]
hume_entity = self.concept_dict[entity_id]
if argument['type'] in {"has_actor", "has_affected_actor",
"has_active_actor"}:
for count in hume_entity.get('counts', list()):
quantitative_state = QuantitativeState(
entity="person", value=count['value'],
unit=count['unit'], modifier=count['modifier'])
if argument['type'] == "has_origin_location":
movement_locations.append(
{'location': _resolve_geo(hume_entity), 'role': 'origin'})
if argument['type'] == 'has_destination_location':
movement_locations.append(
{'location': _resolve_geo(hume_entity),
'role': 'destination'})
if argument['type'] in {"has_time", "has_start_time",
"has_end_time"}:
time_context = _resolve_time(hume_entity)
return MovementContext(locations=movement_locations,
time=time_context), quantitative_state
def _make_concept(self, entity):
"""Return Concept from a Hume entity."""
# Use the canonical name as the name of the Concept by default
name = self._sanitize(entity['canonicalName'])
# But if there is a trigger head text, we prefer that since
# it almost always results in a cleaner name
# This is removed for now since the head word seems to be too
# minimal for some concepts, e.g. it gives us only "security"
# for "food security".
"""
trigger = entity.get('trigger')
if trigger is not None:
head_text = trigger.get('head text')
if head_text is not None:
name = head_text
"""
# Save raw text and Hume scored groundings as db_refs
db_refs = self._get_grounding(entity)
concept = Concept(name, db_refs=db_refs)
metadata = {arg['type']: arg['value']['@id']
for arg in entity['arguments']}
return concept, metadata
def _get_bounds(self, ref_dicts):
minb = None
maxb = None
for ref_dict in ref_dicts:
bounds = ref_dict.pop('BOUNDS', None)
if bounds:
minb = min(bounds[0], minb if minb is not None else bounds[0])
maxb = max(bounds[1], maxb if maxb is not None else bounds[1])
return minb, maxb
def _get_event_and_context(self, event, eid=None, arg_type=None,
evidence=None):
"""Return an INDRA Event based on an event entry."""
if not eid:
eid = _choose_id(event, arg_type)
ev = self.concept_dict[eid]
concept, metadata = self._make_concept(ev)
is_migration_event = False
hume_grounding = {x[0] for x in concept.db_refs['WM']}
for grounding_en in hume_grounding:
if "wm/concept/causal_factor/social_and_political/migration" in \
grounding_en:
is_migration_event = True
if is_migration_event:
movement_context, quantitative_state = (
self._make_movement_context(ev))
event_obj = Migration(concept, delta=quantitative_state,
context=movement_context, evidence=evidence)
else:
ev_delta = QualitativeDelta(
polarity=get_polarity(ev), adjectives=None)
context = self._make_world_context(ev)
event_obj = Event(concept, delta=ev_delta, context=context,
evidence=evidence)
return event_obj
def _get_text_and_bounds(self, provenance):
# First try looking up the full sentence through provenance
doc_id = provenance['document']['@id']
sent_id = provenance['sentence']
text = self.document_dict[doc_id]['sentences'][sent_id]
text = self._sanitize(text)
if 'sentenceCharPositions' in provenance:
bounds = [provenance['sentenceCharPositions'][k]
for k in ['start', 'end']]
else:
bounds = []
return text, bounds
def _get_evidence(self, event, adjectives):
"""Return the Evidence object for the INDRA Statement."""
provenance = event.get('provenance')
# First try looking up the full sentence through provenance
text, bounds = self._get_text_and_bounds(provenance[0])
annotations = {
'found_by': event.get('rule'),
'provenance': provenance,
'event_type': os.path.basename(event.get('type')),
'adjectives': adjectives,
'bounds': bounds
}
ev = Evidence(source_api='hume', text=text, annotations=annotations)
return [ev]
def _get_grounding(self, entity):
"""Return Hume grounding."""
db_refs = {'TEXT': entity['text']}
groundings = entity.get('grounding')
if not groundings:
return db_refs
# Get rid of leading slash
groundings = [(x['ontologyConcept'][1:], x['value']) for x in
groundings]
grounding_entries = sorted(list(set(groundings)),
key=lambda x: (x[1], x[0].count('/'), x[0]),
reverse=True)
# We could get an empty list here in which case we don't add the
# grounding
if grounding_entries:
db_refs['WM'] = grounding_entries
return db_refs
@staticmethod
def _sanitize(text):
"""Return sanitized Hume text field for human readability."""
# TODO: any cleanup needed here?
if text is None:
return None
text = text.replace('\n', ' ')
return text
[docs]class HumeJsonLdProcessorCompositional(HumeJsonLdProcessor):
def _get_grounding(self, entity):
"""Return Hume grounding."""
db_refs = {}
txt = entity.get('text')
if txt:
db_refs['TEXT'] = txt
groundings = entity.get('grounding')
if not groundings:
return db_refs
# Get rid of leading slash
groundings = [(x['ontologyConcept'][1:], x['value']) for x in
groundings]
grounding_entries = sorted(list(set(groundings)),
key=lambda x: (x[1], x[0].count('/'), x[0]),
reverse=True)
if 'mentions' in entity:
prov = entity['mentions'][0]['provenance'][0]
else:
prov = entity['provenance'][0]
_, bounds = self._get_text_and_bounds(prov)
db_refs['BOUNDS'] = bounds
# We could get an empty list here in which case we don't add the
# grounding
if grounding_entries:
db_refs['WM'] = grounding_entries
return db_refs
def _get_event_and_context(self, event, eid=None, arg_type=None,
evidence=None):
"""Return an INDRA Event based on an event entry."""
if not eid:
eid = _choose_id(event, arg_type)
ev = self.concept_dict[eid]
concept, metadata = self._make_concept(ev)
property_id = _choose_id(ev, 'has_property')
theme_id = _choose_id(ev, 'has_theme')
property = self.extractions_by_id[property_id] \
if property_id else None
theme = self.extractions_by_id[theme_id] \
if theme_id else None
process_grounding = concept.db_refs
theme_grounding = self._get_grounding(theme) if theme else {}
property_grounding = self._get_grounding(property) if property else {}
minb, maxb = self._get_bounds([theme_grounding, process_grounding,
property_grounding])
event_sentence, _ = self._get_text_and_bounds(event['provenance'][0])
doc_id = event['provenance'][0]['document']['@id']
sent_id = event['provenance'][0]['sentence']
# If we successfully got within-sentence coordinates, we can use the
# entity text from there and overwrite the concept name as well as
# the context grounding TEXT entry
if minb is not None and maxb is not None:
entity_text = \
self.document_dict[doc_id]['sentences'][sent_id][minb:maxb+1]
concept.name = entity_text
concept.db_refs['TEXT'] = entity_text
process_grounding_wm = process_grounding.get('WM')
theme_grounding_wm = theme_grounding.get('WM')
property_grounding_wm = property_grounding.get('WM')
# FIXME: what do we do if there are multiple entries in
# theme/property grounding?
#assert process_grounding_wm is None or len(process_grounding_wm) == 1
assert property_grounding_wm is None or len(property_grounding_wm) == 1
assert theme_grounding_wm is None or len(theme_grounding_wm) == 1
property_grounding_wm = property_grounding_wm[0] \
if property_grounding_wm else None
theme_grounding_wm = theme_grounding_wm[0] \
if theme_grounding_wm else None
process_grounding_wm = process_grounding_wm[0] \
if process_grounding_wm else None
# For some reason the event's grounding is sometimes duplicated as
# property grounding (e.g., price), in this case we treat the grounding
# as a property
if process_grounding_wm and property_grounding_wm and \
process_grounding_wm[0] == property_grounding_wm[0]:
process_grounding_wm = None
# First case: we have a theme so we apply the property and the process
# to it
if theme_grounding:
compositional_grounding = [[theme_grounding_wm,
property_grounding_wm,
process_grounding_wm, None]]
# Second case: we don't have a theme so we take the process as the theme
# and apply any property to it
elif process_grounding_wm:
compositional_grounding = [[process_grounding_wm,
property_grounding_wm,
None, None]]
elif property_grounding_wm:
compositional_grounding = [[property_grounding_wm,
None, None, None]]
assert compositional_grounding[0][0]
concept.db_refs['WM'] = compositional_grounding
# Migrations turned off for now
#for grounding_en in process_grounding:
# if "wm/concept/causal_factor/social_and_political/migration" in \
# grounding_en:
# is_migration_event = True
#if is_migration_event:
# movement_context, quantitative_state = (
# self._make_movement_context(ev))
# event_obj = Migration(concept, delta=quantitative_state,
# context=movement_context, evidence=evidence)
#else:
ev_delta = QualitativeDelta(
polarity=get_polarity(ev))
context = self._make_world_context(ev)
event_obj = Event(concept, delta=ev_delta, context=context,
evidence=evidence)
return event_obj
def _choose_id(event, arg_type):
args = event.get('arguments', [])
obj_tag = [arg for arg in args if arg['type'] == arg_type]
if obj_tag:
obj_id = obj_tag[0]['value']['@id']
else:
obj_id = None
return obj_id
def get_states(event):
ret_list = []
if 'states' in event:
for state_property in event['states']:
if state_property['type'] != 'polarity':
ret_list.append(state_property['text'])
return ret_list
def get_polarity(event):
pol_map = {'Positive': 1, 'Negative': -1}
if 'states' in event:
for state_property in event['states']:
if state_property['type'] == 'polarity':
return pol_map[state_property['text']]
return None
def _resolve_geo(hume_loc_entity):
place = hume_loc_entity.get('canonicalName', hume_loc_entity.get('text'))
geo_id = hume_loc_entity.get('geoname_id', None)
if geo_id is not None:
return RefContext(name=place, db_refs={"GEOID": geo_id})
else:
return RefContext(place)
def _resolve_time(hume_temporal_entity):
if 'mentions' in hume_temporal_entity:
text = hume_temporal_entity['mentions'][0]['text']
else:
text = hume_temporal_entity['text']
if len(hume_temporal_entity.get("timeInterval", [])) < 1:
return TimeContext(text=text)
time = hume_temporal_entity["timeInterval"][0]
start = datetime.strptime(time['start'], '%Y-%m-%dT%H:%M')
end = datetime.strptime(time['end'], '%Y-%m-%dT%H:%M')
end = end + timedelta(minutes=1)
duration = int((end - start).total_seconds())
return TimeContext(text=text, start=start, end=end,
duration=duration)