Source code for indra_world.assembly.operations

__all__ = ['get_expanded_events_influences', 'remove_namespaces',
           'remove_raw_grounding', 'check_event_context', 'reground_stmts',
           'remove_hume_redundant', 'fix_wm_ontology', 'filter_context_date',
           'filter_groundings', 'deduplicate_groundings',
           'compositional_grounding_filter_stmt',
           'compositional_grounding_filter', 'standardize_names_compositional',
           'add_flattened_grounding_compositional', 'validate_grounding_format',
           'make_display_name', 'make_display_name_linear',
           'set_positive_polarities',
           'filter_out_long_words', 'concept_matches_compositional',
           'matches_compositional', 'location_matches_compositional',
           'event_compositional_refinement', 'compositional_refinement',
           'location_refinement_compositional',
           'make_compositional_refinement_filter',
           'make_default_compositional_refinement_filter',
           'CompositionalRefinementFilter', 'get_relevants_for_stmt',
           'listify', 'merge_deltas']
import os
import yaml
import copy
import logging
import itertools
import statistics
from datetime import datetime
from collections import defaultdict
import indra.tools.assemble_corpus as ac
from indra.pipeline import register_pipeline
from indra.statements import Influence, Association, Event
from indra.statements.concept import get_sorted_compositional_groundings
from indra_world.sources.eidos.client import reground_texts

from .matches import *
from .refinement import *

logger = logging.getLogger(__name__)


register_pipeline(datetime)


@register_pipeline
def get_expanded_events_influences(stmts):
    """Return a list of all standalone events from a list of statements."""
    events_influences = []
    for stmt_orig in stmts:
        stmt = copy.deepcopy(stmt_orig)
        if isinstance(stmt, Influence):
            for member in [stmt.subj, stmt.obj]:
                member.evidence = stmt.evidence[:]
                # Remove the context since it may be for the other member
                for ev in member.evidence:
                    ev.context = None
                events_influences.append(member)
            # We add the Influence too
            events_influences.append(stmt_orig)
        elif isinstance(stmt, Association):
            for member in stmt.members:
                member.evidence = stmt.evidence[:]
                # Remove the context since it may be for the other member
                for ev in member.evidence:
                    ev.context = None
                events_influences.append(member)
        elif isinstance(stmt, Event):
            events_influences.append(stmt)
    return events_influences


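# Illustrative usage (hypothetical input; `stmts` would typically come from
# one of the indra_world readers, and Concept would be imported from
# indra.statements). For a single Influence, the expanded list contains the
# subject Event, the object Event, and the original Influence itself:
#
#     >>> stmts = [Influence(Event(Concept('rainfall')),
#     ...                    Event(Concept('flooding')))]
#     >>> expanded = get_expanded_events_influences(stmts)
#     >>> len(expanded)
#     3

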
@register_pipeline
def remove_namespaces(stmts, namespaces):
    """Remove unnecessary namespaces from Concept grounding."""
    logger.info('Removing unnecessary namespaces')
    for stmt in stmts:
        for agent in stmt.agent_list():
            for namespace in namespaces:
                if namespace in copy.deepcopy(agent.db_refs):
                    agent.db_refs.pop(namespace, None)
    logger.info('Finished removing unnecessary namespaces')
    return stmts


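# Usage sketch (the namespace names here are hypothetical): after assembly
# one might drop groundings that are not needed in the final output, e.g.
#
#     >>> stmts = remove_namespaces(stmts, ['WM_FLAT', 'MESH'])
#
# which removes those keys from each agent's db_refs while leaving all
# other namespaces untouched.

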
@register_pipeline
def remove_raw_grounding(stmts):
    """Remove the raw_grounding annotation to decrease output size."""
    for stmt in stmts:
        for ev in stmt.evidence:
            if not ev.annotations:
                continue
            agents = ev.annotations.get('agents')
            if not agents:
                continue
            if 'raw_grounding' in agents:
                agents.pop('raw_grounding', None)
    return stmts


@register_pipeline
def check_event_context(events):
    for event in events:
        if not event.context and event.evidence[0].context:
            assert False, ('Event context issue', event, event.evidence)
        ej = event.to_json()
        if 'context' not in ej and 'context' in ej['evidence'][0]:
            assert False, ('Event context issue', event, event.evidence)


@register_pipeline
def reground_stmts(stmts, ont_manager, namespace, eidos_service=None,
                   overwrite=True, sources=None):
    ont_manager.initialize()
    if sources is None:
        sources = {'sofia', 'cwms'}
    if eidos_service is None:
        eidos_service = 'http://localhost:9000'
    logger.info(f'Regrounding {len(stmts)} statements')
    # Send the latest ontology and list of concept texts to Eidos
    yaml_str = yaml.dump(ont_manager.yml)
    concepts = []
    for stmt in stmts:
        # Skip statements from sources that shouldn't be regrounded
        if not any(ev.source_api in sources for ev in stmt.evidence):
            continue
        for concept in stmt.agent_list():
            concept_txt = concept.db_refs.get('TEXT')
            concepts.append(concept_txt)
    logger.info(f'Finding grounding for {len(concepts)} texts')
    groundings = reground_texts(concepts, yaml_str,
                                webservice=eidos_service)
    # Update the corpus with new groundings
    idx = 0
    logger.info(f'Setting new grounding for {len(stmts)} statements')
    for stmt in stmts:
        # Skip statements from sources that shouldn't be regrounded
        if not any(ev.source_api in sources for ev in stmt.evidence):
            continue
        for concept in stmt.agent_list():
            if overwrite:
                if groundings[idx]:
                    concept.db_refs[namespace] = groundings[idx]
                elif namespace in concept.db_refs:
                    concept.db_refs.pop(namespace, None)
            else:
                if (namespace not in concept.db_refs) and groundings[idx]:
                    concept.db_refs[namespace] = groundings[idx]
            idx += 1
    logger.info(f'Finished setting new grounding for {len(stmts)} statements')
    return stmts


@register_pipeline
def remove_hume_redundant(stmts, matches_fun):
    logger.info(f'Removing Hume redundancies on {len(stmts)} statements.')
    raw_stmt_groups = defaultdict(list)
    for stmt in stmts:
        sh = stmt.get_hash(matches_fun=matches_fun, refresh=True)
        eh = (stmt.evidence[0].pmid, stmt.evidence[0].text,
              stmt.subj.concept.name, stmt.obj.concept.name,
              stmt.evidence[0].annotations['adjectives'])
        key = str((sh, eh))
        raw_stmt_groups[key].append(stmt)
    new_stmts = list({group[0] for group in raw_stmt_groups.values()})
    logger.info(f'{len(new_stmts)} statements after filter.')
    return new_stmts


@register_pipeline
def fix_wm_ontology(stmts):
    for stmt in stmts:
        for concept in stmt.agent_list():
            if 'WM' in concept.db_refs:
                concept.db_refs['WM'] = [(entry.replace(' ', '_'), score)
                                         for entry, score
                                         in concept.db_refs['WM']]


@register_pipeline
def filter_context_date(stmts, from_date=None, to_date=None):
    logger.info(f'Filtering dates on {len(stmts)} statements')
    if not from_date and not to_date:
        return stmts
    new_stmts = []
    for stmt in stmts:
        doc_id = stmt.evidence[0].text_refs.get('DART')
        if isinstance(stmt, Influence):
            events = [stmt.subj, stmt.obj]
        elif isinstance(stmt, Association):
            events = stmt.members
        else:
            events = [stmt]
        for event in events:
            if event.context and event.context.time:
                if from_date and event.context.time.start and \
                        (event.context.time.start < from_date):
                    logger.info(f'Removing date {event.context.time.start} '
                                f'({event.context.time.text}) from {doc_id}')
                    event.context.time = None
                # The time context may have been removed above, so check
                # that it is still present before comparing against to_date
                if to_date and event.context.time and \
                        event.context.time.end and \
                        (event.context.time.end > to_date):
                    logger.info(f'Removing date {event.context.time.end} '
                                f'({event.context.time.text}) from {doc_id}')
                    event.context.time = None
        new_stmts.append(stmt)
    logger.info(f'{len(new_stmts)} statements after date filter')
    return new_stmts


@register_pipeline
def filter_groundings(stmts):
    excl_file = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                             os.pardir, 'resources',
                             'groundings_to_exclude.txt')
    with open(excl_file, 'r') as f:
        groundings_to_exclude = [l.strip() for l in f.readlines()]
    stmts = ac.filter_by_db_refs(
        stmts, 'WM', groundings_to_exclude, 'all', invert=True)
    return stmts


def deduplicate_groundings(groundings):
    groundings = get_sorted_compositional_groundings(groundings)
    # TODO: sometimes we have duplication which is not exact, rather,
    # the same grounding (after filtering) is present but with a different
    # score. We could eliminate these by retaining only the one with the
    # highest ranking.
    return list(gr for gr, _ in itertools.groupby(groundings))


def compositional_grounding_filter_stmt(stmt, score_threshold,
                                        groundings_to_exclude,
                                        remove_self_loops=False):
    stmt = copy.deepcopy(stmt)
    for concept in stmt.agent_list():
        if concept is not None and 'WM' in concept.db_refs:
            wm_groundings = copy.copy(concept.db_refs['WM'])
            new_groundings = []
            for idx, gr in enumerate(wm_groundings):
                for jdx, entry in enumerate(gr):
                    if entry is not None:
                        if (entry[0] in groundings_to_exclude or
                                entry[1] < score_threshold):
                            if isinstance(wm_groundings[idx], tuple):
                                wm_groundings[idx] = \
                                    list(wm_groundings[idx])
                            wm_groundings[idx][jdx] = None
                # Promote dangling property
                if wm_groundings[idx][0] is None and \
                        wm_groundings[idx][1] is not None:
                    wm_groundings[idx][0] = wm_groundings[idx][1]
                    wm_groundings[idx][1] = None
                # Promote process
                if wm_groundings[idx][0] is None and \
                        wm_groundings[idx][2] is not None:
                    wm_groundings[idx][0] = wm_groundings[idx][2]
                    wm_groundings[idx][2] = None
                    if wm_groundings[idx][3] is not None:
                        wm_groundings[idx][1] = wm_groundings[idx][3]
                        wm_groundings[idx][3] = None
                # Remove dangling process property
                if wm_groundings[idx][3] is not None and \
                        wm_groundings[idx][2] is None:
                    wm_groundings[idx][3] = None
                # If we have a theme and want to remove self loops
                # i.e., where both the theme and the process/property
                # are the same, we remove the process/property
                if remove_self_loops and wm_groundings[idx][0]:
                    # Theme and property are the same: remove property
                    if wm_groundings[idx][1] and \
                            (wm_groundings[idx][0][0] ==
                             wm_groundings[idx][1][0]):
                        wm_groundings[idx][1] = None
                    # Theme and process are the same: remove process
                    if wm_groundings[idx][2] and \
                            (wm_groundings[idx][0][0] ==
                             wm_groundings[idx][2][0]):
                        wm_groundings[idx][2] = None
                        wm_groundings[idx][3] = None
                if not all(entry is None for entry in wm_groundings[idx]):
                    new_groundings.append(wm_groundings[idx])
            new_groundings = deduplicate_groundings(new_groundings)
            concept.db_refs['WM'] = new_groundings
            # Get rid of all None tuples
            concept.db_refs['WM'] = [
                gr for gr in concept.db_refs['WM']
                if not all(g is None for g in gr)
            ]
            # Pop out the WM key if there is no grounding at all
            if not concept.db_refs['WM']:
                return None
        else:
            return None
    validate_grounding_format([stmt])
    return stmt


@register_pipeline
def compositional_grounding_filter(stmts, score_threshold,
                                   groundings_to_exclude=None,
                                   remove_self_loops=False):
    groundings_to_exclude = groundings_to_exclude \
        if groundings_to_exclude else []
    stmts_out = []
    for stmt in stmts:
        stmt_out = compositional_grounding_filter_stmt(
            stmt, score_threshold, groundings_to_exclude,
            remove_self_loops=remove_self_loops)
        if stmt_out:
            stmts_out.append(stmt_out)
    return stmts_out


@register_pipeline
def standardize_names_compositional(stmts):
    for stmt in stmts:
        for concept in stmt.agent_list():
            comp_grounding = concept.db_refs['WM'][0]
            disp_name = make_display_name(comp_grounding)
            concept.name = disp_name
    return stmts


@register_pipeline
def add_flattened_grounding_compositional(stmts):
    for stmt in stmts:
        for concept in stmt.agent_list():
            wm_flat = []
            for comp_grounding in concept.db_refs['WM']:
                theme_grounding = comp_grounding[0][0]
                other_groundings = [entry[0].split('/')[-1]
                                    for entry in comp_grounding[1:]
                                    if entry]
                flat_grounding = '_'.join([theme_grounding] +
                                          other_groundings)
                standard_name = make_display_name(comp_grounding)
                score = statistics.mean([entry[1]
                                         for entry in comp_grounding
                                         if entry is not None])
                wm_flat.append(
                    {
                        'grounding': flat_grounding,
                        'name': standard_name,
                        'score': score
                    }
                )
            concept.db_refs['WM_FLAT'] = wm_flat
    return stmts


@register_pipeline
def validate_grounding_format(stmts):
    for idx, stmt in enumerate(stmts):
        for concept in stmt.agent_list():
            if 'WM' not in concept.db_refs:
                continue
            wms = concept.db_refs['WM']
            assert isinstance(wms, list)
            wm = wms[0]
            assert len(wm) == 4
            assert wm[0] is not None
            if wm[2] is None:
                assert wm[3] is None, (idx, stmt,
                                       stmt.evidence[0].source_api,
                                       stmt.evidence[0].annotations,
                                       wm)
    return stmts


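# Usage sketch for the compositional grounding filter (the threshold value
# and excluded grounding below are hypothetical): entries scoring below the
# threshold or listed in `groundings_to_exclude` are set to None, dangling
# property/process entries are promoted into the theme slot, and statements
# left with no grounding at all are dropped from the output.
#
#     >>> filtered = compositional_grounding_filter(
#     ...     stmts, score_threshold=0.7,
#     ...     groundings_to_exclude=['wm/concept/entity'])

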
def make_display_name(comp_grounding):
    """Return display name from a compositional grounding with 'of' linkers."""
    entries = tuple(entry[0].split('/')[-1].replace('_', ' ')
                    if entry else None for entry in comp_grounding)
    entries_reversed = [entry for entry in entries[::-1]
                        if entry is not None]
    return ' of '.join(entries_reversed)


def make_display_name_linear(comp_grounding):
    """Return display name from compositional grounding with linear joining."""
    entries = tuple(entry[0].split('/')[-1].replace('_', ' ')
                    if entry else None for entry in comp_grounding)
    entries = [entry for entry in entries if entry is not None]
    return ' '.join(entries)


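# Example (the WM ontology paths here are hypothetical): for a compositional
# grounding with a theme and a property,
#
#     comp_grounding = (('wm/concept/agriculture/crop_production', 0.8),
#                       ('wm/property/price', 0.7), None, None)
#
# make_display_name yields 'price of crop production' (non-None entries
# joined in reverse order with 'of'), while make_display_name_linear yields
# 'crop production price' (entries joined in their original order).

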
@register_pipeline
def set_positive_polarities(stmts):
    for stmt in stmts:
        if isinstance(stmt, Influence):
            for event in [stmt.subj, stmt.obj]:
                if event.delta.polarity is None:
                    event.delta.polarity = 1
    return stmts


@register_pipeline
def filter_out_long_words(stmts, k=10):
    logger.info(f'Filtering to concepts with max {k} words on {len(stmts)}'
                f' statements.')

    def get_text(ag):
        return ag.db_refs['TEXT']

    def text_too_long(txt, k):
        if len(txt.split()) > k:
            return True
        return False

    new_stmts = []
    for stmt in stmts:
        if any(text_too_long(get_text(c), k) for c in stmt.agent_list()):
            continue
        new_stmts.append(stmt)
    logger.info(f'{len(new_stmts)} statements after filter.')
    return new_stmts


@register_pipeline
def sort_compositional_groundings(statements):
    for stmt in statements:
        for concept in stmt.agent_list():
            if 'WM' in concept.db_refs:
                concept.db_refs['WM'] = \
                    get_sorted_compositional_groundings(concept.db_refs['WM'])
    return statements


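# Usage sketch (the cutoff is hypothetical): drop statements in which any
# agent's raw text span is longer than five words, e.g.
#
#     >>> stmts = filter_out_long_words(stmts, k=5)

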
@register_pipeline
def merge_deltas(stmts_in):
    """Gather and merge original Influence delta information from evidence.

    This function is only applicable to Influence Statements that have
    subj and obj deltas. All other statement types are passed through
    unchanged. Polarities and adjectives for subjects and objects,
    respectively, are collected and merged by traversing all evidences of
    a Statement.

    Parameters
    ----------
    stmts_in : list[indra.statements.Statement]
        A list of INDRA Statements whose influence deltas should be merged.
        These Statements are meant to have been preassembled and
        potentially have multiple pieces of evidence.

    Returns
    -------
    stmts_out : list[indra.statements.Statement]
        The list of Statements now with deltas merged at the Statement
        level.
    """
    stmts_out = []
    for stmt in stmts_in:
        # This operation is only applicable to Influences
        if not isinstance(stmt, Influence):
            stmts_out.append(stmt)
            continue
        # At this point this is guaranteed to be an Influence
        deltas = {}
        for role in ('subj', 'obj'):
            for info in ('polarity', 'adjectives'):
                key = (role, info)
                deltas[key] = []
                for ev in stmt.evidence:
                    entry = ev.annotations.get('%s_%s' % key)
                    deltas[key].append(entry if entry else None)
        # POLARITY
        # For polarity we need to work in pairs
        polarity_pairs = list(zip(deltas[('subj', 'polarity')],
                                  deltas[('obj', 'polarity')]))
        # If we have some fully defined pairs, we take the most common one
        both_pols = [pair for pair in polarity_pairs
                     if pair[0] is not None and pair[1] is not None]
        if both_pols:
            subj_pol, obj_pol = max(set(both_pols), key=both_pols.count)
            stmt.subj.delta.polarity = subj_pol
            stmt.obj.delta.polarity = obj_pol
        # Otherwise we prefer the case when at least one entry of the
        # pair is given
        else:
            one_pol = [pair for pair in polarity_pairs
                       if pair[0] is not None or pair[1] is not None]
            if one_pol:
                subj_pol, obj_pol = max(set(one_pol), key=one_pol.count)
                stmt.subj.delta.polarity = subj_pol
                stmt.obj.delta.polarity = obj_pol
        # ADJECTIVES
        for attr, role in ((stmt.subj.delta, 'subj'),
                           (stmt.obj.delta, 'obj')):
            all_adjectives = []
            for adj in deltas[(role, 'adjectives')]:
                if isinstance(adj, list):
                    all_adjectives += adj
                elif adj is not None:
                    all_adjectives.append(adj)
            attr.adjectives = all_adjectives
        stmts_out.append(stmt)
    return stmts_out


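# Worked example of the polarity merge (annotation values are hypothetical):
# for an Influence with three evidences whose (subj_polarity, obj_polarity)
# annotations are (1, -1), (1, -1) and (None, -1), the fully defined pairs
# are [(1, -1), (1, -1)], so the most common one, (1, -1), is assigned to
# stmt.subj.delta.polarity and stmt.obj.delta.polarity. Only when no fully
# defined pair exists are partially defined pairs such as (None, -1)
# considered.

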
@register_pipeline
def listify(obj):
    if not isinstance(obj, list):
        return [obj]
    else:
        return obj