Skip to content

Commit a80fd10

Browse files
committed
2 parents d6dcaff + b28dd2d commit a80fd10

4 files changed

Lines changed: 300 additions & 0 deletions

File tree

‎infra/neo4j/scripts/neo4j-indexes.cypher‎

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@ CREATE INDEX tweet_by_hydrated
1515
FOR (n:Tweet)
1616
ON (n.hydrated)
1717

18+
CREATE INDEX account_by_hydrated
19+
FOR (n:Account)
20+
ON (n.hydrated)
21+
1822
CREATE INDEX tweet_by_text
1923
FOR (n:Tweet)
2024
ON (n.text)
@@ -27,5 +31,7 @@ CREATE INDEX tweet_by_record_created_at
2731
FOR (n:Tweet)
2832
ON (n.record_created_at)
2933

34+
CALL db.index.fulltext.createNodeIndex("tweet_by_hashtags_fulltext",["Tweet"],["hashtags"])
35+
3036
CALL db.index.fulltext.createNodeIndex("tweet_by_text_fulltext",["Tweet"],["text"])
3137

‎modules/IngestDrugSynonymsSync.py‎

Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
import requests
2+
import pandas as pd
3+
import re
4+
import json
5+
import sys
6+
from pathlib import Path
7+
import xlrd
8+
import csv
9+
import os
10+
import tempfile
11+
import numpy as np
12+
from typing import Optional
13+
from functools import partial
14+
import logging
15+
logger = logging.getLogger('ds')
16+
17+
18+
class IngestDrugSynonyms():
19+
20+
def __init__(self, configPath: Path = Path("config.json")):
21+
self.check_config(configPath)
22+
self.url_international: str = os.environ["URL_INT"] if isinstance(
23+
os.environ["URL_INT"], str) else None
24+
self.url_USA: str = os.environ["URL_USA"] if isinstance(
25+
os.environ["URL_USA"], str) else None
26+
self.url_drugbank: str = os.environ["URL_DRUGBANK"] if isinstance(
27+
os.environ["URL_DRUGBANK"], str) else None
28+
self.query_keywords: [] = os.environ["QUERY_KEYWORDS"].split(
29+
",") if isinstance(os.environ["QUERY_KEYWORDS"], str) else None
30+
31+
@staticmethod
32+
def check_config(configPath: Path = Path("config.json")):
33+
def load_config(configPath: Path):
34+
with open(configPath) as f:
35+
config = json.load(f)
36+
for key in config:
37+
os.environ[key] = config[key]
38+
rel_path: Path = Path(os.path.realpath(
39+
__file__)).parents[0] / configPath
40+
if rel_path.exists:
41+
load_config(rel_path)
42+
elif configPath.exists:
43+
load_config(configPath)
44+
else:
45+
logger.warning(
46+
"Could not load config file from: {}".format(configPath))
47+
48+
@staticmethod
49+
def api(query, from_study, to_study, url):
50+
url = url.format(query, from_study, to_study)
51+
response = requests.request("GET", url)
52+
return response.json()
53+
54+
def api_wrapper(self, query, from_study):
55+
return self.api(query, from_study, from_study+99, self.url_USA)
56+
57+
def getAllStudiesByQuery(self, query: str) -> list:
58+
logger.info("> STARTING scraping with '{}' keyword".format(query))
59+
studies: list = []
60+
from_study = 1
61+
temp = self.api_wrapper(query, from_study)
62+
nstudies = temp['FullStudiesResponse']['NStudiesFound']
63+
logger.info(
64+
"> {} studies found by '{}' keyword".format(nstudies, query))
65+
if nstudies > 0:
66+
studies = temp['FullStudiesResponse']['FullStudies']
67+
for study_index in range(from_study+100, nstudies, 100):
68+
temp = self.api_wrapper(query, study_index)
69+
studies.extend(temp['FullStudiesResponse']['FullStudies'])
70+
71+
return studies
72+
73+
@staticmethod
74+
def xls_handler(r):
75+
df = pd.DataFrame()
76+
with tempfile.NamedTemporaryFile("wb") as xls_file:
77+
xls_file.write(r.content)
78+
79+
try:
80+
book = xlrd.open_workbook(
81+
xls_file.name, encoding_override="utf-8")
82+
except:
83+
book = xlrd.open_workbook(
84+
xls_file.name, encoding_override="cp1251")
85+
86+
sh = book.sheet_by_index(0)
87+
with tempfile.NamedTemporaryFile("w") as csv_file:
88+
wr = csv.writer(csv_file, quoting=csv.QUOTE_ALL)
89+
90+
for rownum in range(sh.nrows):
91+
wr.writerow(sh.row_values(rownum))
92+
df = pd.read_csv(csv_file.name)
93+
csv_file.close()
94+
95+
xls_file.close()
96+
return df
97+
98+
@staticmethod
99+
def csvzip_handler(r):
100+
df = pd.DataFrame()
101+
with tempfile.NamedTemporaryFile("wb", suffix='.csv.zip') as file:
102+
file.write(r.content)
103+
df = pd.read_csv(file.name)
104+
file.close()
105+
return df
106+
107+
@staticmethod
108+
def urlToDF(url: str, respHandler) -> pd.DataFrame:
109+
r = requests.get(url, allow_redirects=True)
110+
return respHandler(r)
111+
112+
@staticmethod
113+
def _convert_US_studies(US_studies: dict) -> pd.DataFrame:
114+
list_of_US_studies: list = []
115+
for key in US_studies.keys():
116+
for study in US_studies[key]:
117+
temp_dict: dict = {}
118+
119+
temp_dict["trial_id"] = study["Study"]["ProtocolSection"]["IdentificationModule"]["NCTId"]
120+
temp_dict["study_url"] = "https://clinicaltrials.gov/show/" + \
121+
temp_dict["trial_id"]
122+
123+
try:
124+
temp_dict["intervention"] = study["Study"]["ProtocolSection"]["ArmsInterventionsModule"][
125+
"ArmGroupList"]["ArmGroup"][0]["ArmGroupInterventionList"]["ArmGroupInterventionName"][0]
126+
except:
127+
temp_dict["intervention"] = ""
128+
try:
129+
temp_dict["study_type"] = study["Study"]["ProtocolSection"]["DesignModule"]["StudyType"]
130+
except:
131+
temp_dict["study_type"] = ""
132+
try:
133+
temp_dict["target_size"] = study["Study"]["ProtocolSection"]["DesignModule"]["EnrollmentInfo"]["EnrollmentCount"]
134+
except:
135+
temp_dict["target_size"] = ""
136+
try:
137+
if "OfficialTitle" in study["Study"]["ProtocolSection"]["IdentificationModule"].keys():
138+
temp_dict["public_title"] = study["Study"]["ProtocolSection"]["IdentificationModule"]["OfficialTitle"]
139+
else:
140+
temp_dict["public_title"] = study["Study"]["ProtocolSection"]["IdentificationModule"]["BriefTitle"]
141+
except:
142+
temp_dict["public_title"] = ""
143+
list_of_US_studies.append(temp_dict)
144+
US_studies_df: pd.DataFrame = pd.DataFrame(list_of_US_studies)
145+
return US_studies_df
146+
147+
def _scrapeData(self):
148+
self.internationalstudies = self.urlToDF(
149+
self.url_international, self.xls_handler)
150+
self.drug_vocab_df = self.urlToDF(
151+
self.url_drugbank, self.csvzip_handler)
152+
self.all_US_studies_by_keyword: dict = {}
153+
for key in self.query_keywords:
154+
self.all_US_studies_by_keyword[key] = self.getAllStudiesByQuery(
155+
key)
156+
157+
def _filterData(self):
158+
self.drug_vocab_reduced = self.drug_vocab_df[[
159+
'Common name', 'Synonyms']]
160+
self.internationalstudies_reduced = self.internationalstudies[[
161+
'TrialID', 'Intervention', 'Study type', 'web address', 'Target size', "Public title"]]
162+
self.internationalstudies_reduced.columns = [col.replace(
163+
" ", "_").lower() for col in self.internationalstudies_reduced.columns]
164+
cols_to_replace: dict = {
165+
"trialid": "trial_id",
166+
"web_address": "study_url"
167+
}
168+
self.internationalstudies_reduced.columns = [cols_to_replace.get(
169+
n, n) for n in self.internationalstudies_reduced.columns]
170+
171+
self.drug_vocab: dict = {}
172+
for row in self.drug_vocab_reduced.to_dict('records'):
173+
self.drug_vocab[row['Common name'].lower().strip()] = list(
174+
filter(
175+
lambda x: x is not None and len(
176+
x) >= 3,
177+
list(map(lambda x: x.lower().strip() if x.lower().strip() != row['Common name'].lower().strip() else None,
178+
row["Synonyms"].split("|"))))) if isinstance(row["Synonyms"], str) else row["Synonyms"]
179+
180+
self.US_studies_df = self._convert_US_studies(
181+
self.all_US_studies_by_keyword)
182+
183+
self.all_studies_df = pd.concat(
184+
[self.US_studies_df, self.internationalstudies_reduced], sort=False, ignore_index=True)
185+
self.all_studies_df.drop_duplicates(subset="trial_id", inplace=True)
186+
self.all_studies_df.reset_index(drop=True, inplace=True)
187+
self.all_studies_df.fillna("", inplace=True)
188+
self.urls: list = list(self.all_studies_df["study_url"])
189+
logger.info("> {} distinct studies found".format(
190+
len(self.all_studies_df)))
191+
192+
def save_data_to_file(self):
193+
"""Saving data option for debug purposes"""
194+
logger.warning("Only Use it for debug purposes!!!")
195+
self.internationalstudies.to_csv("internationalstudies.csv")
196+
self.drug_vocab_df.to_csv("drug_vocab.csv")
197+
with open('all_US_studies_by_keyword.json', 'w', encoding='utf-8') as f:
198+
json.dump(self.all_US_studies_by_keyword,
199+
f, ensure_ascii=False, indent=4)
200+
201+
def auto_get_and_clean_data(self):
202+
self._scrapeData()
203+
self._filterData()
204+
205+
def create_drug_study_links(self):
206+
207+
drug_vocab = self.drug_vocab
208+
209+
self.drugs = list(drug_vocab.keys())
210+
drugs_and_syms: list = [item.lower() for item in self.drugs]
211+
self.synonyms: list = [item for key in drug_vocab.keys() if isinstance(
212+
drug_vocab[key], list) for item in drug_vocab[key]]
213+
214+
self.drug_synonym_rels = [(key, item, {}) for key in drug_vocab.keys(
215+
) if isinstance(drug_vocab[key], list) for item in drug_vocab[key]]
216+
217+
drugs_and_syms.extend(item.lower() for item in self.synonyms)
218+
ids_and_interventions: list = [(row["trial_id"], row["intervention"].lower(
219+
)) for row in self.all_studies_df.to_dict('records')]
220+
221+
logger.info("Creating links between {} studies and {} drugs and synonyms".format(
222+
len(ids_and_interventions), len(drugs_and_syms)))
223+
self.appeared_in_edges: list = [(drug, trial_id, {}) for drug in drugs_and_syms for trial_id, intervention in ids_and_interventions if bool(
224+
re.compile(r"\b%s\b" % re.escape(drug)).search(intervention))]
225+
226+
def create_url_study_links(self):
227+
self.url_points_at_study_edges: list = [
228+
(row["study_url"], row["trial_id"]) for row in self.all_studies_df.to_dict('records')]

‎modules/Neo4jDataAccess.py‎

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ def __init__(self, debug=False, neo4j_creds=None, batch_size=2000, timeout="60s"
7777
user.created_at = t.user_created_at,
7878
user.record_created_at = timestamp(),
7979
user.job_name = t.job_name,
80+
user.hydrated = 'FULL',
8081
user.job_id = t.job_id
8182
ON MATCH SET
8283
user.name = t.user_name,
@@ -88,6 +89,7 @@ def __init__(self, debug=False, neo4j_creds=None, batch_size=2000, timeout="60s"
8889
user.created_at = t.user_created_at,
8990
user.record_updated_at = timestamp(),
9091
user.job_name = t.job_name,
92+
user.hydrated = 'FULL',
9193
user.job_id = t.job_id
9294
9395
//Add Reply to tweets if needed
@@ -154,6 +156,7 @@ def __init__(self, debug=False, neo4j_creds=None, batch_size=2000, timeout="60s"
154156
user.mentioned_screen_name = t.user_screen_name,
155157
user.record_created_at = timestamp(),
156158
user.job_name = t.job_name,
159+
user.hydrated = 'PARTIAL',
157160
user.job_id = t.job_id
158161
WITH user, tweet
159162
MERGE (tweet)-[:MENTIONED]->(user)
@@ -191,6 +194,11 @@ def __init__(self, debug=False, neo4j_creds=None, batch_size=2000, timeout="60s"
191194
RETURN tweet
192195
"""
193196

197+
self.fetch_account_status = """UNWIND $ids AS i
198+
MATCH (user:Account {id:i.id})
199+
RETURN user.id, user.hydrated
200+
"""
201+
194202
def __get_neo4j_graph(self, role_type):
195203
creds = None
196204
logging.debug('role_type: %s', role_type)
@@ -310,6 +318,33 @@ def get_tweet_hydrated_status_by_id(self, df: pd.DataFrame):
310318
raise Exception(
311319
'Parameter df must be a DataFrame with a column named "id" ')
312320

321+
# Get the status of a DataFrame of Tweets by id. Returns a dataframe with the hydrated status
322+
323+
# Get the status of a DataFrame of Account by id. Returns a dataframe with the hydrated status
324+
def get_account_hydrated_status_by_id(self, df: pd.DataFrame):
325+
if 'id' in df:
326+
graph = self.__get_neo4j_graph('reader')
327+
ids = []
328+
for index, row in df.iterrows():
329+
ids.append({'id': int(row['id'])})
330+
with graph.session() as session:
331+
result = session.run(self.fetch_account_status, ids=ids)
332+
res = pd.DataFrame([dict(record) for record in result])
333+
logging.debug('Response info: %s rows, %s columns: %s' %
334+
(len(res), len(res.columns), res.columns))
335+
if len(res) == 0:
336+
return df[['id']].assign(hydrated=None)
337+
else:
338+
res = res.rename(
339+
columns={'user.id': 'id', 'user.hydrated': 'hydrated'})
340+
# ensures hydrated=None if Neo4j does not answer for id
341+
res = df[['id']].merge(res, how='left', on='id')
342+
return res
343+
else:
344+
logging.debug('df columns %s', df.columns)
345+
raise Exception(
346+
'Parameter df must be a DataFrame with a column named "id" ')
347+
313348
# This saves the User and Tweet data right now
314349
def __save_df_to_graph(self, df, job_name, job_id=None):
315350
graph = self.__get_neo4j_graph('writer')

‎modules/tests/test_Neo4jDataAccess.py‎

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,26 @@ def setup_class(cls):
5555
except Exception as err:
5656
print(err)
5757

58+
# Now add some account data
59+
user_data = [{'account_id': 1, 'hydrated': 'FULL'},
60+
{'account_id': 2, 'hydrated': 'FULL'},
61+
{'account_id': 3},
62+
{'account_id': 4, 'hydrated': 'PARTIAL'},
63+
{'account_id': 5, 'hydrated': 'PARTIAL'},
64+
]
65+
66+
traversal = '''UNWIND $users AS u
67+
MERGE (account:Account {id:u.account_id})
68+
ON CREATE SET
69+
account.hydrated = u.hydrated
70+
'''
71+
try:
72+
with graph.session() as session:
73+
session.run(traversal, users=user_data)
74+
cls.ids = pd.DataFrame({'id': [1, 2, 3, 4, 5]})
75+
except Exception as err:
76+
print(err)
77+
5878
def test_get_tweet_hydrated_status_by_id(self):
5979
df = Neo4jDataAccess(
6080
neo4j_creds=self.creds).get_tweet_hydrated_status_by_id(self.ids)
@@ -66,6 +86,17 @@ def test_get_tweet_hydrated_status_by_id(self):
6686
assert df[df['id'] == 4]['hydrated'][3] == 'PARTIAL'
6787
assert df[df['id'] == 5]['hydrated'][4] == 'PARTIAL'
6888

89+
def test_get_account_hydrated_status_by_id(self):
90+
df = Neo4jDataAccess(
91+
neo4j_creds=self.creds).get_account_hydrated_status_by_id(self.ids)
92+
93+
assert len(df) == 5
94+
assert df[df['id'] == 1]['hydrated'][0] == 'FULL'
95+
assert df[df['id'] == 2]['hydrated'][1] == 'FULL'
96+
assert df[df['id'] == 3]['hydrated'][2] == None
97+
assert df[df['id'] == 4]['hydrated'][3] == 'PARTIAL'
98+
assert df[df['id'] == 5]['hydrated'][4] == 'PARTIAL'
99+
69100
def test_save_parquet_to_graph(self):
70101
filename = os.path.join(os.path.dirname(__file__),
71102
'data/2020_03_22_02_b1.snappy2.parquet')

0 commit comments

Comments
 (0)