Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
73adb1b
Merge pull request #10 from TheDataRideAlongs/fix(50m)
lmeyerov Mar 31, 2020
ec478f3
refactor(print): now via Python logger
lmeyerov Apr 1, 2020
eb4e7c6
fix(fh missing cols): handle
lmeyerov Apr 1, 2020
771c1fb
feat(fixed fh arrow schema): and jsonify nested cols bc pq writer can…
lmeyerov Apr 1, 2020
5c1d0ff
Merge pull request #11 from TheDataRideAlongs/dev/fix-fh
lmeyerov Apr 1, 2020
4bab6db
Changed the print() statements for actual Python logging statements
bechbd Apr 1, 2020
fc851da
Merge pull request #12 from TheDataRideAlongs/logging_fix
bechbd Apr 1, 2020
262e8d8
adding gitignore
007vasy Apr 2, 2020
f2b6543
Merge branch 'master' into Issue-#1
007vasy Apr 2, 2020
c331889
prints changed to logger
007vasy Apr 2, 2020
cecb571
Added methods to get_from_neo and get_tweets_by_id
bechbd Apr 2, 2020
1436b8d
Merge pull request #14 from TheDataRideAlongs/logging_fix
bechbd Apr 2, 2020
207d32c
Use pandas when cudf does not exist
ZiyaoWei Apr 2, 2020
14cb467
Merge pull request #15 from TheDataRideAlongs/wzy/no_gpu
ZiyaoWei Apr 2, 2020
f1426b6
Prototype rehydrate pipeline
ZiyaoWei Apr 2, 2020
f14dffe
Fix bug
ZiyaoWei Apr 2, 2020
325db35
docs(README): add calendar
lmeyerov Apr 2, 2020
c48ef17
Merge branch 'master' into Issue-#1
007vasy Apr 2, 2020
3c24774
Fixed issue with limit on get_from_neo, added timeout parameter, and …
bechbd Apr 2, 2020
a323358
Merge pull request #13 from TheDataRideAlongs/Issue-#1
007vasy Apr 2, 2020
83e7dcd
Merge branch 'master' into switch_python_neo_driver
bechbd Apr 2, 2020
ba2ea00
Merge pull request #17 from TheDataRideAlongs/switch_python_neo_driver
bechbd Apr 2, 2020
83a2294
docs(tightening)
lmeyerov Apr 3, 2020
328c997
Skip when there is no data
ZiyaoWei Apr 2, 2020
b72eb75
Fix logging, add parameter for saving to Neo4j
ZiyaoWei Apr 3, 2020
256df48
Merge pull request #20 from TheDataRideAlongs/wzy/rehydratePipeline
ZiyaoWei Apr 3, 2020
4c2b1ab
docs(issue tracker): link gh projects on README
lmeyerov Apr 3, 2020
a89528a
docs(volunteers): Add legal
lmeyerov Apr 3, 2020
8ec9061
docs(README): project tracker links
lmeyerov Apr 3, 2020
80c8622
Got initial version of the unit tests working for Neo
bechbd Apr 3, 2020
609b95a
Add docker-compose.yml for prefect UI
ZiyaoWei Apr 4, 2020
8e7a88b
Merge pull request #37 from TheDataRideAlongs/wzy/dockerizePrefect
ZiyaoWei Apr 4, 2020
dea4f03
fixed getting international trials, with utf8 encoding
007vasy Apr 6, 2020
55958d2
urls to config
007vasy Apr 6, 2020
51ab12f
removed console usage from scraping data
007vasy Apr 6, 2020
22d418a
Dockerize pipeline and add instructions
ZiyaoWei Apr 5, 2020
70f196e
Merge pull request #40 from TheDataRideAlongs/wzy/dockerizePipelines
ZiyaoWei Apr 6, 2020
07409e4
comfortable neo4j import setup
007vasy Apr 6, 2020
8cf111b
comfortable edge inserting into neo4j
007vasy Apr 6, 2020
4e6c3ec
flexible insertion into neo4j
007vasy Apr 6, 2020
a4aa10f
add config
007vasy Apr 6, 2020
f1831f6
Made minor tweaks to get the prefect ui stuff to run correctly on the…
bechbd Apr 7, 2020
4a4970e
Merge pull request #48 from TheDataRideAlongs/update-prefect-ui-files
bechbd Apr 7, 2020
c96adf3
data scraping into class
007vasy Apr 7, 2020
c9c89f1
updated gitignore
007vasy Apr 7, 2020
3f6f5c5
all drugs and synonyms are imported
007vasy Apr 7, 2020
52be179
cleanup
007vasy Apr 7, 2020
6e554ef
refactor
007vasy Apr 7, 2020
7a030d6
filtering international studies
007vasy Apr 7, 2020
31db102
docs(README.md): infra links
lmeyerov Apr 7, 2020
530c087
Merge pull request #52 from TheDataRideAlongs/add_neo_unit_tests
bechbd Apr 8, 2020
9b137f1
Added metrics configuration
bechbd Apr 8, 2020
d73186b
Added metrics configuration
bechbd Apr 8, 2020
0d7e2e5
drug analysis
007vasy Apr 8, 2020
6df6b15
table merging WIP
007vasy Apr 9, 2020
1567927
studies normalized into one table from 2 different sources
007vasy Apr 9, 2020
98c9e19
#39 - Added method to allow for adding enrichment properties to a node
bechbd Apr 9, 2020
2a9afb0
studies to neo4j is done
007vasy Apr 9, 2020
4c17c8c
WF
007vasy Apr 9, 2020
6c4bc2b
study import fix
007vasy Apr 9, 2020
46fdf2b
drug-study links
007vasy Apr 9, 2020
9aa0a13
cudf set up
007vasy Apr 9, 2020
212a9e2
dict to cypher property code merge for make it available for others
007vasy Apr 9, 2020
d3f626e
Merge pull request #56 from TheDataRideAlongs/DictToCypherProperties
bechbd Apr 9, 2020
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,4 @@ static
# Env files
.DominoEnv/
.vscode/
data/
48 changes: 31 additions & 17 deletions modules/Neo4jDataAccess.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import ast
import json
import time
import re

from datetime import datetime
import pandas as pd
from py2neo import Graph
from neo4j import GraphDatabase, basic_auth
from urllib.parse import urlparse
import logging

Expand All @@ -15,9 +16,10 @@

class Neo4jDataAccess:

def __init__(self, debug=False, neo4j_creds=None, batch_size=2000):
def __init__(self, debug=False, neo4j_creds=None, batch_size=2000, timeout="60s"):
self.creds = neo4j_creds
self.debug = debug
self.timeout = timeout
self.batch_size = batch_size
self.tweetsandaccounts = """
UNWIND $tweets AS t
Expand Down Expand Up @@ -184,25 +186,34 @@ def __get_neo4j_graph(self, role_type):
if len(res):
logging.debug("creds %s", res)
creds = res[0]["creds"]
self.graph = Graph(
host=creds['host'], port=creds['port'], user=creds['user'], password=creds['password'])
uri = f'bolt://{creds["host"]}:{creds["port"]}'
self.graph = GraphDatabase.driver(
uri, auth=basic_auth(creds['user'], creds['password']), encrypted=False)
else:
self.graph = None
return self.graph

def get_from_neo(self, cypher, limit=1000):
    """Run a read-only Cypher query and return the result as a DataFrame.

    Parameters
    ----------
    cypher : str
        Cypher query to execute. If it does not already contain a
        ``LIMIT`` clause, ``LIMIT <limit>`` is appended.
    limit : int, optional
        Maximum number of rows returned (default 1000). Enforced both by
        the appended ``LIMIT`` clause and by ``DataFrame.head`` on the
        result (the latter also caps queries that carried their own,
        larger, ``LIMIT``).

    Returns
    -------
    pandas.DataFrame
        At most ``limit`` rows; empty when the query matches nothing.
    """
    graph = self.__get_neo4j_graph('reader')
    # Append a LIMIT clause only when the caller did not supply one.
    # NOTE(review): this regex also matches 'LIMIT' occurring inside a
    # string literal or identifier in the query — acceptable for the
    # trusted queries used here, but not a general-purpose parser.
    if not re.search('LIMIT', cypher, re.IGNORECASE):
        cypher = cypher + " LIMIT " + str(limit)
    with graph.session() as session:
        # NOTE(review): 'timeout' here is forwarded by the neo4j driver as
        # a Cypher query parameter ($timeout), not as a driver-level
        # transaction timeout — confirm intent against the driver API.
        result = session.run(cypher, timeout=self.timeout)
        # Materialize inside the session so records are consumed before
        # the session closes.
        df = pd.DataFrame([dict(record) for record in result])
    return df.head(limit)

def get_tweet_by_id(self, df, cols=[]):
if 'id' in df:
graph = self.__get_neo4j_graph('reader')
ids = []
for index, row in df.iterrows():
ids.append({'id': int(row['id'])})
res = graph.run(self.fetch_tweet, ids=ids).to_data_frame()

with graph.session() as session:
result = session.run(
self.fetch_tweet, ids=ids, timeout=self.timeout)
res = pd.DataFrame([dict(record) for record in result])
logging.debug('Response info: %s rows, %s columns: %s' %
(len(res), len(res.columns), res.columns))
pdf = pd.DataFrame()
Expand Down Expand Up @@ -233,8 +244,9 @@ def get_tweet_hydrated_status_by_id(self, df):
ids = []
for index, row in df.iterrows():
ids.append({'id': int(row['id'])})
res = graph.run(self.fetch_tweet_status, ids=ids).to_data_frame()

with graph.session() as session:
result = session.run(self.fetch_tweet_status, ids=ids)
res = pd.DataFrame([dict(record) for record in result])
logging.debug('Response info: %s rows, %s columns: %s' %
(len(res), len(res.columns), res.columns))
if len(res) == 0:
Expand Down Expand Up @@ -327,12 +339,14 @@ def __save_df_to_graph(self, df, job_name, job_id=None):

def __write_to_neo(self, params, url_params, mention_params):
try:
tx = self.graph.begin(autocommit=False)
tx.run(self.tweetsandaccounts, tweets=params)
tx.run(self.tweeted_rel, tweets=params)
tx.run(self.mentions, mentions=mention_params)
tx.run(self.urls, urls=url_params)
tx.commit()
with self.graph.session() as session:
session.run(self.tweetsandaccounts,
tweets=params, timeout=self.timeout)
session.run(self.tweeted_rel, tweets=params,
timeout=self.timeout)
session.run(self.mentions, mentions=mention_params,
timeout=self.timeout)
session.run(self.urls, urls=url_params, timeout=self.timeout)
except Exception as inst:
logging.error('Neo4j Transaction error')
logging.error(type(inst)) # the exception instance
Expand Down
65 changes: 65 additions & 0 deletions test_neo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
"""Smoke-test script for Neo4jDataAccess.

Exercises: parquet save, hydration-status lookup, tweet fetch by id
(with and without a column filter), and ad-hoc Cypher reads via
get_from_neo. Requires a reachable Neo4j instance (credentials resolved
by Neo4jDataAccess) and the sample parquet file under ./data/.
"""
import pandas as pd
import logging
from modules.Neo4jDataAccess import Neo4jDataAccess

# Sample tweet ids expected to exist in the target graph.
df = pd.DataFrame({'id': [1219746154621165568,
                          1219746203652456448,
                          1219746235038474243,
                          1219746508955967488,
                          1219746544955453441]
                   })
# DEBUG, INFO, WARNING, ERROR, CRITICAL
logging.getLogger().setLevel(logging.WARNING)
pd.set_option('display.max_columns', 500)
pd.set_option('display.max_colwidth', None)


# Test save_parquet_df_to_graph
print('----')
print('Testing Parquet Save')
tdf = pd.read_parquet(
    './data/2020_03_22_02_b1.snappy2.parquet', engine='pyarrow')
Neo4jDataAccess().save_parquet_df_to_graph(tdf, 'dave')
print('----')

# Test get_tweet_hydrated_status_by_id
print('----')
print('Testing get_tweet_hydrated_status_by_id')
df = Neo4jDataAccess().get_tweet_hydrated_status_by_id(df)
print(df)

# Test get_tweet_by_id
print('----')
print('Testing get_tweet_by_id')
df = Neo4jDataAccess().get_tweet_by_id(df)
print(df)
print('----')
print('Testing get_tweet_by_id for cols')
df = Neo4jDataAccess().get_tweet_by_id(
    df, cols=['id', 'job_name', 'created_at'])
# str() the column list explicitly: 'str' + pandas.Index concatenates
# elementwise (one Index entry per column) instead of one label line.
print('Column check: ' + str(list(df.columns)))
print('----')

# Test get_from_neo
print('----')
print('Testing get_from_neo with LIMIT 5 in the query')
df = Neo4jDataAccess().get_from_neo(
    'MATCH (n:Tweet) WHERE n.hydrated=\'FULL\' RETURN n.id, n.text LIMIT 5')
print(df)
print('----')

print('----')
print('Testing get_from_neo with LIMIT 5 in the query and limit=1')
df = Neo4jDataAccess().get_from_neo(
    'MATCH (n:Tweet) WHERE n.hydrated=\'FULL\' RETURN n.id, n.text LIMIT 5', limit=1)
print(df)
print('----')

print('----')
print('Testing get_from_neo with limit only')
df = Neo4jDataAccess().get_from_neo(
    'MATCH (n:Tweet) RETURN n', limit=1)
print(df)
print('----')

print('Done')