Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,4 @@ static
# Env files
.DominoEnv/
.vscode/
data/
48 changes: 31 additions & 17 deletions modules/Neo4jDataAccess.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import ast
import json
import time
import re

from datetime import datetime
import pandas as pd
from py2neo import Graph
from neo4j import GraphDatabase, basic_auth
from urllib.parse import urlparse
import logging

Expand All @@ -15,9 +16,10 @@

class Neo4jDataAccess:

def __init__(self, debug=False, neo4j_creds=None, batch_size=2000):
def __init__(self, debug=False, neo4j_creds=None, batch_size=2000, timeout="60s"):
self.creds = neo4j_creds
self.debug = debug
self.timeout = timeout
self.batch_size = batch_size
self.tweetsandaccounts = """
UNWIND $tweets AS t
Expand Down Expand Up @@ -184,25 +186,34 @@ def __get_neo4j_graph(self, role_type):
if len(res):
logging.debug("creds %s", res)
creds = res[0]["creds"]
self.graph = Graph(
host=creds['host'], port=creds['port'], user=creds['user'], password=creds['password'])
uri = f'bolt://{creds["host"]}:{creds["port"]}'
self.graph = GraphDatabase.driver(
uri, auth=basic_auth(creds['user'], creds['password']), encrypted=False)
else:
self.graph = None
return self.graph

def get_from_neo(self, cypher, limit=1000):
    """Run a Cypher query against the 'reader' graph and return a DataFrame.

    Args:
        cypher: Cypher query text. When it contains no LIMIT clause,
            ``LIMIT <limit>`` is appended so the query stays bounded.
        limit: Maximum number of rows to return. It is applied
            client-side as well, so a larger LIMIT inside ``cypher``
            is still truncated to ``limit`` rows.

    Returns:
        pandas.DataFrame with one row per record and one column per
        returned key, holding at most ``limit`` rows.
    """
    graph = self.__get_neo4j_graph('reader')
    # If the limit isn't set in the traversal then add it. Match LIMIT as
    # a whole word only, so identifiers such as `n.rate_limit` do not
    # suppress the safety limit.
    if not re.search(r'\bLIMIT\b', cypher, re.IGNORECASE):
        # Drop a trailing statement terminator first; "...; LIMIT n" is
        # not valid Cypher.
        cypher = cypher.rstrip('; \t\r\n') + " LIMIT " + str(limit)
    with graph.session() as session:
        # NOTE(review): in the neo4j driver, keyword arguments to
        # session.run() are query parameters, so this likely binds a
        # `$timeout` parameter rather than enforcing a transaction
        # timeout -- confirm against the driver docs before relying on it.
        result = session.run(cypher, timeout=self.timeout)
        # Materialise the records while the session is still open.
        df = pd.DataFrame([dict(record) for record in result])
    return df.head(limit)

def get_tweet_by_id(self, df, cols=[]):
if 'id' in df:
graph = self.__get_neo4j_graph('reader')
ids = []
for index, row in df.iterrows():
ids.append({'id': int(row['id'])})
res = graph.run(self.fetch_tweet, ids=ids).to_data_frame()

with graph.session() as session:
result = session.run(
self.fetch_tweet, ids=ids, timeout=self.timeout)
res = pd.DataFrame([dict(record) for record in result])
logging.debug('Response info: %s rows, %s columns: %s' %
(len(res), len(res.columns), res.columns))
pdf = pd.DataFrame()
Expand Down Expand Up @@ -233,8 +244,9 @@ def get_tweet_hydrated_status_by_id(self, df):
ids = []
for index, row in df.iterrows():
ids.append({'id': int(row['id'])})
res = graph.run(self.fetch_tweet_status, ids=ids).to_data_frame()

with graph.session() as session:
result = session.run(self.fetch_tweet_status, ids=ids)
res = pd.DataFrame([dict(record) for record in result])
logging.debug('Response info: %s rows, %s columns: %s' %
(len(res), len(res.columns), res.columns))
if len(res) == 0:
Expand Down Expand Up @@ -327,12 +339,14 @@ def __save_df_to_graph(self, df, job_name, job_id=None):

def __write_to_neo(self, params, url_params, mention_params):
try:
tx = self.graph.begin(autocommit=False)
tx.run(self.tweetsandaccounts, tweets=params)
tx.run(self.tweeted_rel, tweets=params)
tx.run(self.mentions, mentions=mention_params)
tx.run(self.urls, urls=url_params)
tx.commit()
with self.graph.session() as session:
session.run(self.tweetsandaccounts,
tweets=params, timeout=self.timeout)
session.run(self.tweeted_rel, tweets=params,
timeout=self.timeout)
session.run(self.mentions, mentions=mention_params,
timeout=self.timeout)
session.run(self.urls, urls=url_params, timeout=self.timeout)
except Exception as inst:
logging.error('Neo4j Transaction error')
logging.error(type(inst)) # the exception instance
Expand Down
65 changes: 65 additions & 0 deletions test_neo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import pandas as pd
import logging
from modules.Neo4jDataAccess import Neo4jDataAccess

# Manual smoke test for Neo4jDataAccess. It reads a local parquet file and
# calls the save/read methods, printing each result for visual inspection.

# Tweet ids used as lookup input by the read tests below. This frame is
# never reassigned, so every test starts from the same input.
df = pd.DataFrame({'id': [1219746154621165568,
                          1219746203652456448,
                          1219746235038474243,
                          1219746508955967488,
                          1219746544955453441]
                   })
# DEBUG, INFO, WARNING, ERROR, CRITICAL
logging.getLogger().setLevel(logging.WARNING)
pd.set_option('display.max_columns', 500)
pd.set_option('display.max_colwidth', None)


# Test save_parquet_df_to_graph
print('----')
print('Testing Parquet Save')
tdf = pd.read_parquet(
    './data/2020_03_22_02_b1.snappy2.parquet', engine='pyarrow')
Neo4jDataAccess().save_parquet_df_to_graph(tdf, 'dave')
print('----')

# Test get_tweet_hydrated_status_by_id
print('----')
print('Testing get_tweet_hydrated_status_by_id')
status_df = Neo4jDataAccess().get_tweet_hydrated_status_by_id(df)
print(status_df)

# Test get_tweet_by_id
print('----')
print('Testing get_tweet_by_id')
tweets_df = Neo4jDataAccess().get_tweet_by_id(df)
print(tweets_df)
print('----')
print('Testing get_tweet_by_id for cols')
cols_df = Neo4jDataAccess().get_tweet_by_id(
    df, cols=['id', 'job_name', 'created_at'])
# Print the label once followed by the column names; `str + Index` would
# instead prepend the label to every column name.
print('Column check:', list(cols_df.columns))
print('----')

# Test get_from_neo
print('----')
print('Testing get_from_neo with LIMIT 5 in the query')
limit_in_query_df = Neo4jDataAccess().get_from_neo(
    'MATCH (n:Tweet) WHERE n.hydrated=\'FULL\' RETURN n.id, n.text LIMIT 5')
print(limit_in_query_df)
print('----')

print('----')
print('Testing get_from_neo for Limit 1')
limit_one_df = Neo4jDataAccess().get_from_neo(
    'MATCH (n:Tweet) WHERE n.hydrated=\'FULL\' RETURN n.id, n.text LIMIT 5',
    limit=1)
print(limit_one_df)
print('----')

print('----')
print('Testing get_from_neo with limit only')
limit_only_df = Neo4jDataAccess().get_from_neo(
    'MATCH (n:Tweet) RETURN n', limit=1)
print(limit_only_df)
print('----')

print('Done')