Skip to content

Commit b1a3a46

Browse files
authored
Merge pull request #64 from TheDataRideAlongs/fasterNeo4jDrugInsertion
Faster neo4j drug, synonym and study insertion
2 parents d725d65 + 5ba15db commit b1a3a46

5 files changed

Lines changed: 264 additions & 173 deletions

File tree

‎.gitignore‎

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,3 +141,14 @@ cython_debug/
141141
# static files generated from Django application using `collectstatic`
142142
media
143143
static
144+
145+
# localKeys for dev
146+
localKeys
147+
148+
# local dev
149+
.Domino/
150+
.ipynb_checkpoints/
151+
.vscode/
152+
localKeys/
153+
modules/__pycache__/
154+

‎IngestDrugSynonymsWF.py‎

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,19 @@
44

55
import logging

logging.basicConfig(format='>>> %(message)s', level=logging.INFO)

# Fetch the raw datasets and derive the drug/study and url/study link tables.
drugSynonym = IngestDrugSynonyms()
drugSynonym.auto_get_and_clean_data()
drugSynonym.create_drug_study_links()
drugSynonym.create_url_study_links()

# Load nodes first so their Neo4j ids are cached before edges reference them.
neo4jBridge = DrugSynonymDataToNeo4j()

neo4jBridge.merge_drugs(drugSynonym.drugs)
neo4jBridge.merge_synonyms(drugSynonym.synonyms)
neo4jBridge.merge_drug_to_synonym_rels(drugSynonym.drug_synonym_rels)

neo4jBridge.merge_studies(drugSynonym.all_studies_df)

neo4jBridge.merge_drug_to_study_rels(drugSynonym.appeared_in_edges)

‎modules/DrugSynonymDataToNeo4j.py‎

Lines changed: 174 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from numpy import isnan
55
import logging
66
from urllib.parse import urlparse
7+
from progress.bar import Bar
78

89
logger = logging.getLogger('ds-neo4j')
910

@@ -29,26 +30,139 @@ def property_type_checker(property_value):
2930
def cypher_template_filler(cypher_template: str, data: dict) -> str:
    """Substitute `data` into a Cypher template and collapse it onto one line."""
    filled = cypher_template.format(**data)
    return filled.replace("\n", "")
3132

33+
def generate_unwind_property_cypher(properties: list, unwind_iterator_item_name: str) -> str:
    """Build an inline Cypher property map ("{k:item.k,...}") from the keys of
    the first dict in `properties`; empty string when there are no properties."""
    result: str = ""
    if properties != [] and properties[0] != {}:
        pairs = [
            "{key}:{item}.{key}".format(key=key, item=unwind_iterator_item_name)
            for key in properties[0].keys()
        ]
        result = "{" + ",".join(pairs) + "}"
    return result
41+
3242
class DrugSynonymDataToNeo4j(object):
3343

3444
def __init__(self, uri="bolt://localhost:7687", user="neo4j", password="letmein", encrypted=False):
    """Open a Neo4j bolt driver and initialise the node-id cache (id_store)."""
    # id_store maps node keys (drug/synonym names, trial_ids, urls) to Neo4j ids.
    self.id_store: dict = {}
    self._driver = GraphDatabase.driver(uri, auth=(user, password), encrypted=encrypted)
3947

4048
def reset_id_store(self):
    """Discard every cached Neo4j node id."""
    self.id_store = {}
4450

4551
def close(self):
    """Shut down the underlying Neo4j driver connection."""
    self._driver.close()
53+
54+
def batch_node_merge_handler(self, raw_data, generate_nodes_list, generate_node_data, node_type: str, chunk_size=1000):
    """Merge `node_type` nodes in chunks and cache each node's Neo4j id in id_store.

    `generate_nodes_list` yields the cache keys; `generate_node_data` yields the
    per-node property dicts (same length/order, so they can be zipped together).
    """
    logger.info("Merging '{}' Job is Started to merge {} nodes".format(node_type, len(raw_data)))
    merge_in_batch = self._batch_merge_nodes

    keys = generate_nodes_list(raw_data)
    payload = generate_node_data(raw_data)
    # Inline property map built from the first record's keys (assumes uniform keys).
    unwind_props = generate_unwind_property_cypher(payload, unwind_iterator_item_name="node")

    collected_ids: list = []
    with self._driver.session() as session:
        with Bar("Loading '{}' nodes".format(node_type), fill='@', suffix='%(percent)d%%', max=len(keys)) as bar:
            start = 0
            while start < len(payload):
                batch = payload[start:start + chunk_size]
                collected_ids.extend(session.write_transaction(merge_in_batch, node_type, batch, unwind_props))
                bar.next(chunk_size)
                start += chunk_size

    self.id_store.update(dict(zip(keys, collected_ids)))

    logger.info("Merging '{}' Job is >> Done << to merge {} nodes".format(node_type, len(collected_ids)))
4777

4878
@staticmethod
49-
def _merge_node(tx, node_type, properties:Optional[dict] = None):
79+
def generate_drug_and_synonym_edge_props(raw_data:list) -> list:
80+
return [prop for fro,to,prop in raw_data]
81+
82+
@staticmethod
83+
def generate_drug_and_synonym_edge_list_data(raw_data:list,id_store:dict) -> list:
84+
return [dict({"from_id":id_store[fro],"to_id":id_store[to]},**prop) for fro,to,prop in raw_data]
85+
86+
def batch_edge_merge_handler(self, raw_data, generate_edge_data, generate_edge_props, edge_type: str, chunk_size=1000):
    """Merge `edge_type` relationships in chunks, resolving endpoints via id_store.

    `generate_edge_data` turns the raw triples into dicts with from_id/to_id
    (looked up in id_store); `generate_edge_props` supplies the property dicts
    used to build the UNWIND property map.
    """
    logger.info("Merging '{}' Job is Started to merge {} edges".format(edge_type, len(raw_data)))
    merge_in_batch = self._batch_merge_edges

    payload = generate_edge_data(raw_data, self.id_store)
    unwind_props = generate_unwind_property_cypher(generate_edge_props(raw_data), unwind_iterator_item_name="edge")

    with self._driver.session() as session:
        with Bar("Loading '{}' edges".format(edge_type), fill='@', suffix='%(percent)d%%', max=len(payload)) as bar:
            start = 0
            while start < len(payload):
                session.write_transaction(merge_in_batch, edge_type, payload[start:start + chunk_size], unwind_props)
                bar.next(chunk_size)
                start += chunk_size

    logger.info("Merging '{}' Job is >> Done << to merge {} edges".format(edge_type, len(payload)))
103+
104+
def merge_drug_to_synonym_rels(self, drug_synonym_rels):
    """Merge KNOWN_AS relationships between drugs and their synonyms."""
    self.batch_edge_merge_handler(
        drug_synonym_rels,
        self.generate_drug_and_synonym_edge_list_data,
        self.generate_drug_and_synonym_edge_props,
        edge_type="KNOWN_AS",
    )
106+
107+
@staticmethod
108+
def generate_drug_nodes_list(drugs:list) -> list:
109+
return drugs
110+
111+
@staticmethod
112+
def generate_drug_node_data(drugs:list) -> list:
113+
return [{"name":drug} for drug in drugs]
114+
115+
def merge_drugs(self, drug_vocab):
    """Merge one Drug node per entry of `drug_vocab`."""
    self.batch_node_merge_handler(
        drug_vocab,
        self.generate_drug_nodes_list,
        self.generate_drug_node_data,
        node_type="Drug",
    )
117+
118+
@staticmethod
119+
def generate_synonym_nodes_list(synonyms:list) -> list:
120+
return synonyms
121+
122+
@staticmethod
123+
def generate_synonym_node_data(synonyms:list) -> list:
124+
return [{"name":synonym} for synonym in synonyms]
125+
126+
def merge_synonyms(self, drug_vocab):
    """Merge one Synonym node per entry of `drug_vocab`."""
    self.batch_node_merge_handler(
        drug_vocab,
        self.generate_synonym_nodes_list,
        self.generate_synonym_node_data,
        node_type="Synonym",
    )
128+
129+
@staticmethod
def _batch_merge_nodes(tx, node_type, nodes_data_slice: dict, properties: str):
    """Run one UNWIND+MERGE transaction over a slice of node property dicts.

    `properties` is a pre-built inline Cypher map referencing `node.<key>`;
    returns the Neo4j internal ids of the merged nodes, in input order.
    """
    template = """
    UNWIND $nodes as node
    MERGE (n:{node_type} {properties})
    RETURN id(n) as id
    """
    filler = {
        "node_type": node_type,
        "properties": properties,
    }
    rows = tx.run(cypher_template_filler(template, filler), nodes=nodes_data_slice)
    return [int(row["id"]) for row in rows]
143+
144+
@staticmethod
def _batch_merge_edges(tx, edge_type, edges_data_slice: dict, properties: str, direction=">"):
    """Run one UNWIND/MATCH/MERGE transaction linking node pairs by internal id.

    Each edge dict must carry from_id/to_id; `direction` (default ">") makes the
    merged relationship directed from -> to. Returns the relationship ids.
    """
    template = """
    UNWIND $edges as edge
    MATCH (from)
    WHERE ID(from) = edge.from_id
    MATCH (to)
    WHERE ID(to) = edge.to_id
    MERGE (from)-[r:{edge_type} {properties}]-{direction}(to)
    RETURN id(r) as id
    """
    filler = {
        "edge_type": edge_type,
        "properties": properties,
        "direction": direction,
    }
    rows = tx.run(cypher_template_filler(template, filler), edges=edges_data_slice)
    return [int(row["id"]) for row in rows]
51163

164+
@staticmethod
165+
def _merge_node(tx, node_type, properties:Optional[dict] = None):
52166

53167
data:dict = {
54168
"node_type":node_type,
@@ -84,114 +198,51 @@ def _merge_edge(tx, from_id, to_id, edge_type, properties:Optional[dict] = None,
84198
"""
85199
result = tx.run(cypher_template_filler(base_cypher,data))
86200
return result.single()[0]
201+
202+
@staticmethod
203+
def generate_study_nodes_list(studies:DataFrame) -> list:
204+
return studies["trial_id"]
205+
206+
@staticmethod
207+
def generate_study_node_data(studies:DataFrame) -> list:
208+
return studies.to_dict('records')
87209

88210
def merge_studies(self, studies: DataFrame):
    """Merge one Study node per row of the studies DataFrame (keyed by trial_id)."""
    self.batch_node_merge_handler(
        studies,
        self.generate_study_nodes_list,
        self.generate_study_node_data,
        node_type="Study",
    )
212+
213+
@staticmethod
214+
def generate_drug_to_study_edge_props(raw_data:list) -> list:
215+
return [prop for fro,to,prop in raw_data]
216+
217+
@staticmethod
218+
def generate_drug_to_study_list_data(raw_data:list,id_store:dict) -> list:
219+
return [dict({"from_id":id_store[fro],"to_id":id_store[to]},**prop) for fro,to,prop in raw_data]
146220

147221
def merge_drug_to_study_rels(self, edges: list):
    """Merge APPEARED_IN relationships from drugs/synonyms to studies."""
    self.batch_edge_merge_handler(
        edges,
        self.generate_drug_to_study_list_data,
        self.generate_drug_to_study_edge_props,
        edge_type="APPEARED_IN",
    )
223+
224+
@staticmethod
225+
def generate_url_nodes_list(urls:list) -> list:
226+
return urls
227+
228+
def generate_url_node_data(self, urls: list) -> list:
    """Parse each url into the property dict produced by _parse_url."""
    return [self._parse_url(item) for item in urls]
230+
160231

161232
def merge_url(self, urls: list):
    """Merge one Url node per url, with properties parsed from the url string."""
    self.batch_node_merge_handler(
        urls,
        self.generate_url_nodes_list,
        self.generate_url_node_data,
        node_type="Url",
    )
234+
235+
@staticmethod
236+
def generate_url_to_study_edge_props(raw_data:list) -> list:
237+
return [prop for fro,to,prop in raw_data]
238+
239+
@staticmethod
240+
def generate_url_to_study_list_data(raw_data:list,id_store:dict) -> list:
241+
return [dict({"from_id":id_store[fro],"to_id":id_store[to]},**prop) for fro,to,prop in raw_data]
180242

181243
def merge_url_to_study_rels(self, edges: list):
    """Merge POINTS_AT relationships from urls to studies."""
    self.batch_edge_merge_handler(
        edges,
        self.generate_url_to_study_list_data,
        self.generate_url_to_study_edge_props,
        edge_type="POINTS_AT",
    )
245+
195246
@staticmethod
196247
def _parse_url(url:str):
197248
parsed = urlparse(url)
@@ -200,14 +251,19 @@ def _parse_url(url:str):
200251
'url': url,
201252
'job_id': '',
202253
'job_name': '',
203-
'schema': parsed.scheme,
204-
'netloc': parsed.netloc,
205-
'path': parsed.path,
206-
'params': parsed.params,
207-
'query': parsed.query,
208-
'fragment': parsed.fragment,
209-
'username': parsed.username,
210-
'password': parsed.password,
211-
'hostname': parsed.hostname,
212-
'port': parsed.port,
213-
}
254+
'schema': parsed.scheme if parsed.scheme else '',
255+
'netloc': parsed.netloc if parsed.netloc else '',
256+
'path': parsed.path if parsed.path else '',
257+
'params': parsed.params if parsed.params else '',
258+
'query': parsed.query if parsed.query else '',
259+
'fragment': parsed.fragment if parsed.fragment else '',
260+
'username': parsed.username if parsed.username else '',
261+
'password': parsed.password if parsed.password else '',
262+
'hostname': parsed.hostname if parsed.hostname else '',
263+
'port': parsed.port if parsed.port else '',
264+
}
265+
266+
267+
268+
269+

0 commit comments

Comments
 (0)