Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions infra/neo4j/scripts/neo4j-indexes.cypher
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ CREATE INDEX tweet_by_hydrated
FOR (n:Tweet)
ON (n.hydrated)

// Speeds lookups of accounts by hydration state (e.g. 'FULL' / 'PARTIAL'),
// mirroring the existing tweet_by_hydrated index above.
CREATE INDEX account_by_hydrated
FOR (n:Account)
ON (n.hydrated)

CREATE INDEX tweet_by_text
FOR (n:Tweet)
ON (n.text)
Expand All @@ -27,5 +31,7 @@ CREATE INDEX tweet_by_record_created_at
FOR (n:Tweet)
ON (n.record_created_at)

// Full-text indexes for keyword search over tweet hashtags and tweet text.
CALL db.index.fulltext.createNodeIndex("tweet_by_hashtags_fulltext",["Tweet"],["hashtags"])

CALL db.index.fulltext.createNodeIndex("tweet_by_text_fulltext",["Tweet"],["text"])

228 changes: 228 additions & 0 deletions modules/IngestDrugSynonymsSync.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
import requests
import pandas as pd
import re
import json
import sys
from pathlib import Path
import xlrd
import csv
import os
import tempfile
import numpy as np
from typing import Optional
from functools import partial
import logging
logger = logging.getLogger('ds')
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Non-blocker, for next time gardening: python world seems to be moving to tighter forms like (a) full-module imports all on one line (alphabetical) and (b) explicit imports per-line



class IngestDrugSynonyms():
    """Scrape clinical-trial registries and a drug vocabulary, normalize them,
    and build drug/synonym/study link lists for graph loading.

    Data sources (configured via config.json / environment variables):
      * URL_INT       - WHO international trials export (.xls)
      * URL_USA       - ClinicalTrials.gov full-studies JSON API (paged)
      * URL_DRUGBANK  - DrugBank vocabulary (.csv.zip)
      * QUERY_KEYWORDS- comma-separated search keywords for the US API
    """

    # Shared logger for the class (the module previously used a bare `logger` global).
    _log = logging.getLogger("ds")

    def __init__(self, configPath: Path = Path("config.json")):
        """Load config into the environment, then read the source URLs/keywords.

        os.environ values are always str, so the old isinstance(...)-else-None
        guards could never yield None; `.get()` now returns None for a missing
        key instead of raising KeyError.
        """
        self.check_config(configPath)
        self.url_international: Optional[str] = os.environ.get("URL_INT")
        self.url_USA: Optional[str] = os.environ.get("URL_USA")
        self.url_drugbank: Optional[str] = os.environ.get("URL_DRUGBANK")
        keywords = os.environ.get("QUERY_KEYWORDS")
        # FIX: annotation was `: []` (a list literal, not a type).
        self.query_keywords: Optional[list] = keywords.split(
            ",") if isinstance(keywords, str) else None

    @staticmethod
    def check_config(configPath: Path = Path("config.json")):
        """Load a JSON config file into os.environ.

        Tries the file next to this module first, then configPath as given;
        logs a warning (without raising) when neither exists.
        """
        def load_config(path: Path):
            with open(path) as f:
                config = json.load(f)
            for key in config:
                os.environ[key] = config[key]

        rel_path: Path = Path(os.path.realpath(
            __file__)).parents[0] / configPath
        # BUG FIX: `.exists` (the bound method, always truthy) was used instead
        # of `.exists()`, so the fallback/warning branches were unreachable and
        # a missing file raised FileNotFoundError.
        if rel_path.exists():
            load_config(rel_path)
        elif configPath.exists():
            load_config(configPath)
        else:
            IngestDrugSynonyms._log.warning(
                "Could not load config file from: %s", configPath)

    @staticmethod
    def api(query, from_study, to_study, url):
        """GET `url` formatted with (query, from_study, to_study); return parsed JSON.

        Raises requests.HTTPError on a non-2xx response (previously a bad
        response surfaced as an opaque JSON decode error).
        """
        response = requests.get(url.format(query, from_study, to_study))
        response.raise_for_status()
        return response.json()

    def api_wrapper(self, query, from_study, span: int = 99):
        """Fetch one page of US studies covering [from_study, from_study + span]."""
        return self.api(query, from_study, from_study + span, self.url_USA)

    def getAllStudiesByQuery(self, query: str, page_size: int = 100) -> list:
        """Return every full study matching `query`, paging through the US API.

        page_size is the API page stride (default preserves the original 100).
        """
        self._log.info("> STARTING scraping with '%s' keyword", query)
        studies: list = []
        first_study = 1
        page = self.api_wrapper(query, first_study, page_size - 1)
        nstudies = page['FullStudiesResponse']['NStudiesFound']
        self._log.info("> %s studies found by '%s' keyword", nstudies, query)
        if nstudies > 0:
            studies = page['FullStudiesResponse']['FullStudies']
            for start in range(first_study + page_size, nstudies, page_size):
                page = self.api_wrapper(query, start, page_size - 1)
                studies.extend(page['FullStudiesResponse']['FullStudies'])

        return studies

    @staticmethod
    def xls_handler(r):
        """Convert an HTTP response carrying an .xls payload into a DataFrame.

        The workbook is round-tripped through a quoted CSV because the raw
        export has formatting quirks pandas' xls reader chokes on.
        """
        df = pd.DataFrame()
        with tempfile.NamedTemporaryFile("wb") as xls_file:
            xls_file.write(r.content)
            # BUG FIX: flush before xlrd reopens the file by name, otherwise
            # buffered bytes may not be on disk yet and the read is truncated.
            xls_file.flush()

            try:
                book = xlrd.open_workbook(
                    xls_file.name, encoding_override="utf-8")
            except Exception:
                # Fallback for exports that are not valid UTF-8.
                book = xlrd.open_workbook(
                    xls_file.name, encoding_override="cp1251")

            sheet = book.sheet_by_index(0)
            with tempfile.NamedTemporaryFile("w") as csv_file:
                writer = csv.writer(csv_file, quoting=csv.QUOTE_ALL)
                for rownum in range(sheet.nrows):
                    writer.writerow(sheet.row_values(rownum))
                csv_file.flush()  # same flush-before-reopen fix as above
                df = pd.read_csv(csv_file.name)
        return df

    @staticmethod
    def csvzip_handler(r):
        """Convert an HTTP response carrying a zipped CSV into a DataFrame."""
        with tempfile.NamedTemporaryFile("wb", suffix='.csv.zip') as zip_file:
            zip_file.write(r.content)
            zip_file.flush()  # BUG FIX: unflushed buffer could truncate the zip
            return pd.read_csv(zip_file.name)

    @staticmethod
    def urlToDF(url: str, respHandler) -> pd.DataFrame:
        """Download `url` and turn the response into a DataFrame via respHandler."""
        r = requests.get(url, allow_redirects=True)
        return respHandler(r)

    @staticmethod
    def _convert_US_studies(US_studies: dict) -> pd.DataFrame:
        """Flatten ClinicalTrials.gov full-study JSON into a DataFrame with the
        same columns as the international export (trial_id, study_url,
        intervention, study_type, target_size, public_title).

        Optional sections are missing from many studies; each falls back to ""
        (the bare `except:` clauses are narrowed to the lookup errors that
        actually occur when a section is absent).
        """
        rows: list = []
        for studies in US_studies.values():
            for study in studies:
                protocol = study["Study"]["ProtocolSection"]
                ident = protocol["IdentificationModule"]
                row: dict = {
                    "trial_id": ident["NCTId"],
                    "study_url": "https://clinicaltrials.gov/show/" + ident["NCTId"],
                }
                try:
                    row["intervention"] = protocol["ArmsInterventionsModule"][
                        "ArmGroupList"]["ArmGroup"][0]["ArmGroupInterventionList"]["ArmGroupInterventionName"][0]
                except (KeyError, IndexError, TypeError):
                    row["intervention"] = ""
                try:
                    row["study_type"] = protocol["DesignModule"]["StudyType"]
                except (KeyError, TypeError):
                    row["study_type"] = ""
                try:
                    row["target_size"] = protocol["DesignModule"]["EnrollmentInfo"]["EnrollmentCount"]
                except (KeyError, TypeError):
                    row["target_size"] = ""
                try:
                    if "OfficialTitle" in ident:
                        row["public_title"] = ident["OfficialTitle"]
                    else:
                        row["public_title"] = ident["BriefTitle"]
                except (KeyError, TypeError):
                    row["public_title"] = ""
                rows.append(row)
        return pd.DataFrame(rows)

    def _scrapeData(self):
        """Download the international export, drug vocabulary, and US studies
        (one API crawl per configured keyword)."""
        self.internationalstudies = self.urlToDF(
            self.url_international, self.xls_handler)
        self.drug_vocab_df = self.urlToDF(
            self.url_drugbank, self.csvzip_handler)
        self.all_US_studies_by_keyword: dict = {
            key: self.getAllStudiesByQuery(key) for key in self.query_keywords}

    def _filterData(self):
        """Normalize the scraped frames: align international columns with the
        US schema, build the drug->synonyms vocabulary, and concatenate all
        studies (deduplicated by trial_id)."""
        self.drug_vocab_reduced = self.drug_vocab_df[[
            'Common name', 'Synonyms']]
        # .copy() so the column rename below doesn't hit a SettingWithCopy view.
        self.internationalstudies_reduced = self.internationalstudies[[
            'TrialID', 'Intervention', 'Study type', 'web address', 'Target size', "Public title"]].copy()
        renames: dict = {
            "trialid": "trial_id",
            "web_address": "study_url",
        }
        normalized = [col.replace(" ", "_").lower()
                      for col in self.internationalstudies_reduced.columns]
        self.internationalstudies_reduced.columns = [
            renames.get(col, col) for col in normalized]

        # Common name -> list of synonyms (lowercased, != the common name,
        # at least 3 chars). Non-string Synonyms cells (NaN) pass through as-is.
        self.drug_vocab: dict = {}
        for row in self.drug_vocab_reduced.to_dict('records'):
            common = row['Common name'].lower().strip()
            synonyms = row["Synonyms"]
            if isinstance(synonyms, str):
                cleaned = (s.lower().strip() for s in synonyms.split("|"))
                self.drug_vocab[common] = [
                    s for s in cleaned if s != common and len(s) >= 3]
            else:
                self.drug_vocab[common] = synonyms

        self.US_studies_df = self._convert_US_studies(
            self.all_US_studies_by_keyword)

        self.all_studies_df = pd.concat(
            [self.US_studies_df, self.internationalstudies_reduced], sort=False, ignore_index=True)
        self.all_studies_df.drop_duplicates(subset="trial_id", inplace=True)
        self.all_studies_df.reset_index(drop=True, inplace=True)
        self.all_studies_df.fillna("", inplace=True)
        self.urls: list = list(self.all_studies_df["study_url"])
        self._log.info("> %s distinct studies found", len(self.all_studies_df))

    def save_data_to_file(self):
        """Saving data option for debug purposes"""
        self._log.warning("Only Use it for debug purposes!!!")
        self.internationalstudies.to_csv("internationalstudies.csv")
        self.drug_vocab_df.to_csv("drug_vocab.csv")
        with open('all_US_studies_by_keyword.json', 'w', encoding='utf-8') as f:
            json.dump(self.all_US_studies_by_keyword,
                      f, ensure_ascii=False, indent=4)

    def auto_get_and_clean_data(self):
        """Convenience entry point: scrape all sources, then normalize."""
        self._scrapeData()
        self._filterData()

    def create_drug_study_links(self):
        """Build drug/synonym lists and (drug, trial_id, {}) edges for every
        drug or synonym that appears as a whole word in a study's intervention."""
        drug_vocab = self.drug_vocab

        self.drugs = list(drug_vocab.keys())
        self.synonyms: list = [syn for syns in drug_vocab.values()
                               if isinstance(syns, list) for syn in syns]

        self.drug_synonym_rels = [(drug, syn, {}) for drug, syns in drug_vocab.items()
                                  if isinstance(syns, list) for syn in syns]

        drugs_and_syms: list = [item.lower() for item in self.drugs]
        drugs_and_syms.extend(item.lower() for item in self.synonyms)
        ids_and_interventions: list = [
            (row["trial_id"], row["intervention"].lower())
            for row in self.all_studies_df.to_dict('records')]

        self._log.info("Creating links between %s studies and %s drugs and synonyms",
                       len(ids_and_interventions), len(drugs_and_syms))
        # PERF FIX: compile each whole-word pattern once per drug; the old
        # comprehension recompiled the regex for every drug x study pair.
        self.appeared_in_edges: list = []
        for drug in drugs_and_syms:
            pattern = re.compile(r"\b%s\b" % re.escape(drug))
            self.appeared_in_edges.extend(
                (drug, trial_id, {})
                for trial_id, intervention in ids_and_interventions
                if pattern.search(intervention))

    def create_url_study_links(self):
        """Build (study_url, trial_id) pairs for every study row."""
        self.url_points_at_study_edges: list = [
            (row["study_url"], row["trial_id"]) for row in self.all_studies_df.to_dict('records')]
35 changes: 35 additions & 0 deletions modules/Neo4jDataAccess.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ def __init__(self, debug=False, neo4j_creds=None, batch_size=2000, timeout="60s"
user.created_at = t.user_created_at,
user.record_created_at = timestamp(),
user.job_name = t.job_name,
user.hydrated = 'FULL',
user.job_id = t.job_id
ON MATCH SET
user.name = t.user_name,
Expand All @@ -88,6 +89,7 @@ def __init__(self, debug=False, neo4j_creds=None, batch_size=2000, timeout="60s"
user.created_at = t.user_created_at,
user.record_updated_at = timestamp(),
user.job_name = t.job_name,
user.hydrated = 'FULL',
user.job_id = t.job_id

//Add Reply to tweets if needed
Expand Down Expand Up @@ -154,6 +156,7 @@ def __init__(self, debug=False, neo4j_creds=None, batch_size=2000, timeout="60s"
user.mentioned_screen_name = t.user_screen_name,
user.record_created_at = timestamp(),
user.job_name = t.job_name,
user.hydrated = 'PARTIAL',
user.job_id = t.job_id
WITH user, tweet
MERGE (tweet)-[:MENTIONED]->(user)
Expand Down Expand Up @@ -191,6 +194,11 @@ def __init__(self, debug=False, neo4j_creds=None, batch_size=2000, timeout="60s"
RETURN tweet
"""

self.fetch_account_status = """UNWIND $ids AS i
MATCH (user:Account {id:i.id})
RETURN user.id, user.hydrated
"""

def __get_neo4j_graph(self, role_type):
creds = None
logging.debug('role_type: %s', role_type)
Expand Down Expand Up @@ -310,6 +318,33 @@ def get_tweet_hydrated_status_by_id(self, df: pd.DataFrame):
raise Exception(
'Parameter df must be a DataFrame with a column named "id" ')

    # Get the status of a DataFrame of Accounts by id. Returns a dataframe with the hydrated status
def get_account_hydrated_status_by_id(self, df: pd.DataFrame):
if 'id' in df:
graph = self.__get_neo4j_graph('reader')
ids = []
for index, row in df.iterrows():
ids.append({'id': int(row['id'])})
with graph.session() as session:
result = session.run(self.fetch_account_status, ids=ids)
res = pd.DataFrame([dict(record) for record in result])
logging.debug('Response info: %s rows, %s columns: %s' %
(len(res), len(res.columns), res.columns))
if len(res) == 0:
return df[['id']].assign(hydrated=None)
else:
res = res.rename(
columns={'user.id': 'id', 'user.hydrated': 'hydrated'})
# ensures hydrated=None if Neo4j does not answer for id
res = df[['id']].merge(res, how='left', on='id')
return res
else:
logging.debug('df columns %s', df.columns)
raise Exception(
'Parameter df must be a DataFrame with a column named "id" ')

# This saves the User and Tweet data right now
def __save_df_to_graph(self, df, job_name, job_id=None):
graph = self.__get_neo4j_graph('writer')
Expand Down
31 changes: 31 additions & 0 deletions modules/tests/test_Neo4jDataAccess.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,26 @@ def setup_class(cls):
except Exception as err:
print(err)

# Now add some account data
user_data = [{'account_id': 1, 'hydrated': 'FULL'},
{'account_id': 2, 'hydrated': 'FULL'},
{'account_id': 3},
{'account_id': 4, 'hydrated': 'PARTIAL'},
{'account_id': 5, 'hydrated': 'PARTIAL'},
]

traversal = '''UNWIND $users AS u
MERGE (account:Account {id:u.account_id})
ON CREATE SET
account.hydrated = u.hydrated
'''
try:
with graph.session() as session:
session.run(traversal, users=user_data)
cls.ids = pd.DataFrame({'id': [1, 2, 3, 4, 5]})
except Exception as err:
print(err)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Double check the error handling... e.g., should we detect & rerun if error?


def test_get_tweet_hydrated_status_by_id(self):
df = Neo4jDataAccess(
neo4j_creds=self.creds).get_tweet_hydrated_status_by_id(self.ids)
Expand All @@ -66,6 +86,17 @@ def test_get_tweet_hydrated_status_by_id(self):
assert df[df['id'] == 4]['hydrated'][3] == 'PARTIAL'
assert df[df['id'] == 5]['hydrated'][4] == 'PARTIAL'

def test_get_account_hydrated_status_by_id(self):
df = Neo4jDataAccess(
neo4j_creds=self.creds).get_account_hydrated_status_by_id(self.ids)

assert len(df) == 5
assert df[df['id'] == 1]['hydrated'][0] == 'FULL'
assert df[df['id'] == 2]['hydrated'][1] == 'FULL'
assert df[df['id'] == 3]['hydrated'][2] == None
assert df[df['id'] == 4]['hydrated'][3] == 'PARTIAL'
assert df[df['id'] == 5]['hydrated'][4] == 'PARTIAL'

def test_save_parquet_to_graph(self):
filename = os.path.join(os.path.dirname(__file__),
'data/2020_03_22_02_b1.snappy2.parquet')
Expand Down