-
Notifications
You must be signed in to change notification settings - Fork 13
Expand file tree
/
Copy pathDrugSynonymDataToNeo4j.py
More file actions
218 lines (184 loc) · 9.31 KB
/
DrugSynonymDataToNeo4j.py
File metadata and controls
218 lines (184 loc) · 9.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
from neo4j import GraphDatabase
from typing import Optional
from pandas import DataFrame
from numpy import isnan
import logging
from urllib.parse import urlparse
# Named module logger; DrugSynonymDataToNeo4j reports job progress through it.
logger: logging.Logger = logging.getLogger(name='ds-neo4j')
def dict_to_property_str(properties: Optional[dict] = None) -> str:
    """Render *properties* as an inline Cypher map literal such as ``{name:"x",n:1}``.

    Values are rendered as follows:

    * ints, floats and numpy integers are written verbatim;
    * float NaN (pandas' marker for a missing cell) becomes ``''`` like other
      missing values, instead of the bare token ``nan``;
    * strings are wrapped in double quotes, with backslashes and double quotes
      backslash-escaped so the literal cannot be terminated early;
    * other falsy values (``None``, empty containers) become ``''``;
    * anything else is written with its ``str()`` form, unquoted.

    Keys are inserted verbatim, so they must already be valid Cypher property
    names.

    :param properties: property name -> value. ``None`` or empty yields ``""``.
    :return: the map literal, or ``""`` when there is nothing to render.
    """
    import numbers  # numpy registers its integer types as numbers.Integral

    def property_type_checker(property_value):
        # NaN must be checked before the numeric pass-through below.
        if isinstance(property_value, float) and isnan(property_value):
            return "''"
        if isinstance(property_value, (int, float, numbers.Integral)):
            return property_value
        if isinstance(property_value, str):
            # Escape backslashes first so the quote escapes are not doubled.
            escaped = property_value.replace("\\", "\\\\").replace('"', '\\"')
            return '"' + escaped + '"'
        if not property_value:
            return "''"
        return property_value

    if not properties:
        return ""
    pairs = ",".join(
        "{key}:{value}".format(key=key, value=property_type_checker(value))
        for key, value in properties.items()
    )
    return "{" + pairs + "}"
def cypher_template_filler(cypher_template: str, data: dict) -> str:
    """Fill the ``{name}`` placeholders of *cypher_template* from *data* and flatten to one line.

    Newline characters are deleted (not replaced by spaces) after substitution.
    Tokens on adjacent template lines therefore stay separated only if those
    lines carry leading or trailing spaces. Newlines inside substituted values
    are deleted as well.

    :raises KeyError: if the template references a placeholder missing from *data*.
    """
    rendered = cypher_template.format(**data)
    return "".join(rendered.split("\n"))
class DrugSynonymDataToNeo4j(object):
    """Write studies, drugs/synonyms and URLs into Neo4j and connect them.

    Every node and relationship is written with a Cypher ``MERGE`` in its own
    write transaction. The Neo4j internal ids returned by those merges are
    cached on the instance, so ``merge_drug_to_study_rels`` and
    ``merge_url_to_study_rels`` can link nodes merged by earlier calls.

    Instances can be used as a context manager; leaving the ``with`` block
    closes the driver.
    """

    def __init__(self, uri="bolt://localhost:7687", user="neo4j", password="letmein", encrypted=False):
        self._driver = GraphDatabase.driver(uri, auth=(user, password), encrypted=encrypted)
        # trial_id -> Neo4j id of its Study node (filled by merge_studies).
        self.study_triald_and_neo4j_id_pairs: dict = {}
        # drug or synonym name -> Neo4j id of its Drug/Synonym node (filled by
        # merge_drugs_synonyms_and_link_between). Both kinds share this dict,
        # so a synonym with the same name as a drug overwrites the drug's entry.
        self.drug_or_synonym_name_and_neo4j_id_pairs: dict = {}
        # url string -> Neo4j id of its Url node (filled by merge_url).
        self.url_and_neo4j_id_pairs: dict = {}

    def reset_id_store(self):
        """Forget every cached key -> Neo4j id mapping."""
        self.study_triald_and_neo4j_id_pairs = {}
        self.drug_or_synonym_name_and_neo4j_id_pairs = {}
        self.url_and_neo4j_id_pairs = {}

    def close(self):
        """Close the underlying Neo4j driver."""
        self._driver.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        # Never swallow an exception raised inside the with-block.
        return False

    @staticmethod
    def _merge_node(tx, node_type, properties: Optional[dict] = None):
        """MERGE one node labelled *node_type* with *properties*; return its Neo4j id.

        Sets ``record_created_at`` when the node is created and
        ``record_updated_at`` when an existing node is matched (both via
        Cypher ``timestamp()``). The label is interpolated verbatim, so it must
        be a trusted identifier; values are escaped by ``dict_to_property_str``.
        """
        data: dict = {
            "node_type": node_type,
            "properties": dict_to_property_str(properties),
        }
        base_cypher = """
            MERGE (n:{node_type} {properties})
            ON CREATE SET n.record_created_at=timestamp()
            ON MATCH SET n.record_updated_at=timestamp()
            RETURN id(n)
        """
        result = tx.run(cypher_template_filler(base_cypher, data))
        return result.single()[0]

    @staticmethod
    def _merge_edge(tx, from_id, to_id, edge_type, properties: Optional[dict] = None, direction=">"):
        """MERGE a relationship of type *edge_type* between two nodes given by Neo4j id.

        :param direction: ``">"`` for ``(from)-[r]->(to)`` or ``""`` for an
            undirected ``(from)-[r]-(to)`` pattern.
        :return: the Neo4j id of the relationship.
        :raises ValueError: if *direction* is neither ``">"`` nor ``""``.
        """
        if direction not in (">", ""):
            raise ValueError(
                "direction must be '>' (directed) or '' (undirected), got {!r}".format(direction))
        data: dict = {
            "from_id": int(from_id),
            "to_id": int(to_id),
            "edge_type": edge_type,
            "direction": direction,
            "properties": dict_to_property_str(properties),
        }
        base_cypher = """
            MATCH (from)
            WHERE ID(from) = {from_id}
            MATCH (to)
            WHERE ID(to) = {to_id}
            MERGE (from)-[r:{edge_type} {properties}]-{direction}(to)
            RETURN id(r)
        """
        result = tx.run(cypher_template_filler(base_cypher, data))
        return result.single()[0]

    def merge_studies(self, studies: DataFrame):
        """MERGE one ``Study`` node per row of *studies* and cache its id by ``trial_id``.

        Every column becomes a node property. Each row must contain a
        ``trial_id`` column, which is used as the cache key.
        """
        node_type = "Study"
        with self._driver.session() as session:
            logger.info("> Merging Studies Job is Started")
            count_node = 0
            prev_count_node = 0
            for study in studies.to_dict('records'):
                study_id = session.write_transaction(self._merge_node, node_type, study)
                self.study_triald_and_neo4j_id_pairs[study["trial_id"]] = study_id
                count_node += 1
                if count_node > prev_count_node + 1000:
                    prev_count_node = count_node
                    logger.info("> %d nodes already merged", count_node)
            logger.info("> Merging Studies Job is >> Done << with %d nodes merged", count_node)

    def merge_drugs_synonyms_and_link_between(self, drug_vocab):
        """MERGE Drug and Synonym nodes and a ``KNOWN_AS`` edge from each drug to its synonyms.

        :param drug_vocab: mapping of drug name -> list of synonym names. A
            value that is not a ``list`` is treated as "no synonyms".
        """
        with self._driver.session() as session:
            logger.info("> Merging Drugs and Synonyms Job is Started to merge %d drugs with synonyms", len(drug_vocab))
            count_node = 0
            count_edge = 0
            prev_count_node = 0
            prev_count_edge = 0
            for drug, synonyms in drug_vocab.items():
                drug_id = session.write_transaction(self._merge_node, "Drug", {"name": drug})
                self.drug_or_synonym_name_and_neo4j_id_pairs[drug] = drug_id
                count_node += 1
                if isinstance(synonyms, list):
                    for synonym in synonyms:
                        synonym_id = session.write_transaction(self._merge_node, "Synonym", {"name": synonym})
                        self.drug_or_synonym_name_and_neo4j_id_pairs[synonym] = synonym_id
                        count_node += 1
                        session.write_transaction(self._merge_edge, drug_id, synonym_id, "KNOWN_AS")
                        count_edge += 1
                # Progress is reported at most once per drug.
                if count_node > prev_count_node + 1000 or count_edge > prev_count_edge + 1000:
                    prev_count_node = count_node
                    prev_count_edge = count_edge
                    logger.info("> %d nodes and %d edges already merged", count_node, count_edge)
            logger.info("> Merging Drugs and Synonyms Job is >> Done << with %d nodes and %d edges merged",
                        count_node, count_edge)

    def merge_drug_to_study_rels(self, edges: list):
        """MERGE ``APPEARED_IN`` edges from Drug/Synonym nodes to Study nodes.

        :param edges: ``(drug_or_synonym_name, trial_id)`` pairs. Both keys must
            already be cached by earlier merge calls, otherwise ``KeyError`` is raised.
        """
        self._merge_edges_to_studies(
            edges, self.drug_or_synonym_name_and_neo4j_id_pairs, "APPEARED_IN", "Drugs&Synonyms")

    def merge_url(self, urls: list):
        """MERGE one ``Url`` node per url (properties from ``_parse_url``) and cache its id."""
        node_type = "Url"
        with self._driver.session() as session:
            logger.info("> Merging Urls Job is Started")
            count_node = 0
            prev_count_node = 0
            for url in urls:
                url_id = session.write_transaction(self._merge_node, node_type, self._parse_url(url))
                self.url_and_neo4j_id_pairs[url] = url_id
                count_node += 1
                if count_node > prev_count_node + 1000:
                    prev_count_node = count_node
                    logger.info("> %d nodes already merged", count_node)
            logger.info("> Merging Url Job is >> Done << with %d nodes merged", count_node)

    def merge_url_to_study_rels(self, edges: list):
        """MERGE ``POINTS_AT`` edges from Url nodes to Study nodes.

        :param edges: ``(url, trial_id)`` pairs. Both keys must already be
            cached by earlier merge calls, otherwise ``KeyError`` is raised.
        """
        self._merge_edges_to_studies(edges, self.url_and_neo4j_id_pairs, "POINTS_AT", "Urls")

    def _merge_edges_to_studies(self, edges, source_id_lookup, edge_type, source_label):
        """MERGE ``(source)-[:edge_type]->(study)`` for every ``(source_key, trial_id)`` pair.

        Shared by the drug and url linking methods. *source_label* is used only
        in log messages. If either id cache is empty, logs a warning and writes
        nothing.
        """
        study_id_lookup = self.study_triald_and_neo4j_id_pairs
        if not study_id_lookup or not source_id_lookup:
            logger.warning("No Neo4j ID information is available for merging connections of %s to Studies",
                           source_label)
            return
        with self._driver.session() as session:
            logger.info("> Merging connections of %s to Studies Job is Started with %d edges to merge",
                        source_label, len(edges))
            edge_ids = [
                session.write_transaction(
                    self._merge_edge, source_id_lookup[source_key], study_id_lookup[trial_id], edge_type)
                for source_key, trial_id in edges
            ]
            logger.info("> Merging connections of %s to Studies Job is Finished with %d edges merged",
                        source_label, len(edge_ids))

    @staticmethod
    def _parse_url(url: str) -> dict:
        """Split *url* with ``urlparse`` into the property dict stored on a ``Url`` node.

        ``tweet_id``, ``job_id`` and ``job_name`` are always empty here. A port
        that ``urlparse`` cannot convert to an int is stored as ``None``
        instead of aborting the whole job.
        """
        parsed = urlparse(url)
        try:
            port = parsed.port
        except ValueError:
            # Raised for non-numeric or out-of-range ports.
            port = None
        # NOTE(review): username/password embedded in a URL are persisted as
        # node properties; confirm that storing credentials is intended.
        return {
            'tweet_id': '',
            'url': url,
            'job_id': '',
            'job_name': '',
            'schema': parsed.scheme,
            'netloc': parsed.netloc,
            'path': parsed.path,
            'params': parsed.params,
            'query': parsed.query,
            'fragment': parsed.fragment,
            'username': parsed.username,
            'password': parsed.password,
            'hostname': parsed.hostname,
            'port': port,
        }