Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 65 additions & 16 deletions modules/Neo4jDataAccess.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def __init__(self, debug=False, neo4j_creds=None, batch_size=2000):
tweet.favorite_count = t.favorite_count,
tweet.retweet_count = t.retweet_count,
tweet.record_created_at = timestamp(),
tweet.job_name = t.job_name,
tweet.job_id = t.job_id,
tweet.hashtags = t.hashtags,
tweet.hydrated = 'FULL',
Expand All @@ -38,6 +39,7 @@ def __init__(self, debug=False, neo4j_creds=None, batch_size=2000):
tweet.favorite_count = t.favorite_count,
tweet.retweet_count = t.retweet_count,
tweet.record_updated_at = timestamp(),
tweet.job_name = t.job_name,
tweet.job_id = t.job_id,
tweet.hashtags = t.hashtags,
tweet.hydrated = 'FULL',
Expand All @@ -55,7 +57,8 @@ def __init__(self, debug=False, neo4j_creds=None, batch_size=2000):
user.user_profile_image_url = t.user_profile_image_url,
user.created_at = t.user_created_at,
user.record_created_at = timestamp(),
user.job_name = t.job_id
user.job_name = t.job_name,
user.job_id = t.job_id
ON MATCH SET
user.name = t.user_name,
user.screen_name = t.user_screen_name,
Expand All @@ -65,13 +68,15 @@ def __init__(self, debug=False, neo4j_creds=None, batch_size=2000):
user.location = t.user_location,
user.created_at = t.user_created_at,
user.record_updated_at = timestamp(),
user.job_name = t.job_id
user.job_name = t.job_name,
user.job_id = t.job_id

//Add Reply to tweets if needed
FOREACH(ignoreMe IN CASE WHEN t.tweet_type='REPLY' THEN [1] ELSE [] END |
MERGE (retweet:Tweet {id:t.reply_tweet_id})
ON CREATE SET retweet.id=t.reply_tweet_id,
retweet.record_created_at = timestamp(),
retweet.job_name = t.job_name,
retweet.job_id = t.job_id,
retweet.hydrated = 'PARTIAL'
)
Expand All @@ -81,6 +86,7 @@ def __init__(self, debug=False, neo4j_creds=None, batch_size=2000):
MERGE (quoteTweet:Tweet {id:t.quoted_status_id})
ON CREATE SET quoteTweet.id=t.quoted_status_id,
quoteTweet.record_created_at = timestamp(),
quoteTweet.job_name = t.job_name,
quoteTweet.job_id = t.job_id,
quoteTweet.hydrated = 'PARTIAL'
)
Expand All @@ -90,6 +96,7 @@ def __init__(self, debug=False, neo4j_creds=None, batch_size=2000):
MERGE (retweet:Tweet {id:t.retweet_id})
ON CREATE SET retweet.id=t.retweet_id,
retweet.record_created_at = timestamp(),
retweet.job_name = t.job_name,
retweet.job_id = t.job_id,
retweet.hydrated = 'PARTIAL'
)
Expand Down Expand Up @@ -127,7 +134,8 @@ def __init__(self, debug=False, neo4j_creds=None, batch_size=2000):
user.mentioned_name = t.name,
user.mentioned_screen_name = t.user_screen_name,
user.record_created_at = timestamp(),
user.job_name = t.job_id
user.job_name = t.job_name,
user.job_id = t.job_id
WITH user, tweet
MERGE (tweet)-[:MENTIONED]->(user)
"""
Expand All @@ -137,7 +145,8 @@ def __init__(self, debug=False, neo4j_creds=None, batch_size=2000):
MERGE (url:Url {full_url:t.url})
ON CREATE SET
url.full_url = t.url,
url.job_name = t.job_id,
url.job_name = t.job_name,
url.job_id = t.job_id,
url.record_created_at = timestamp(),
url.schema=t.scheme,
url.netloc=t.netloc,
Expand All @@ -158,6 +167,11 @@ def __init__(self, debug=False, neo4j_creds=None, batch_size=2000):
RETURN tweet.id, tweet.hydrated
"""

self.fetch_tweet = """UNWIND $ids AS i
MATCH (tweet:Tweet {id:i.id})
RETURN tweet
"""

def __get_neo4j_graph(self, role_type):
creds = None
logging.debug('role_type: %s', role_type)
Expand All @@ -176,13 +190,44 @@ def __get_neo4j_graph(self, role_type):
self.graph = None
return self.graph

def save_parquet_df_to_graph(self, df, job_name):
def get_from_neo(self, cypher, limit=100):
    """Run an arbitrary read-only Cypher query and return at most *limit* rows.

    Parameters
    ----------
    cypher : str
        Cypher statement executed against the 'reader' connection.
    limit : int, optional
        Maximum number of rows returned (client-side trim, default 100).

    Returns
    -------
    pandas.DataFrame with up to *limit* rows of the query result.
    """
    reader = self.__get_neo4j_graph('reader')
    # NOTE(review): the whole result set is materialized before trimming;
    # the limit is applied client-side, not in the query itself.
    result_df = reader.run(cypher).to_data_frame()
    return result_df.head(limit)

def get_tweet_by_id(self, df, cols=None):
    """Fetch the full Tweet node for every id listed in *df*.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain an ``id`` column of tweet ids.
    cols : list of str, optional
        When given and non-empty, only these node properties are kept in
        the result. Default returns every property found on each node.
        (Was a mutable default ``[]`` — replaced with ``None`` to avoid the
        shared-mutable-default pitfall; behavior is unchanged.)

    Returns
    -------
    pandas.DataFrame with one row per matched tweet.

    Raises
    ------
    Exception
        If *df* does not have an ``id`` column.
    """
    if 'id' not in df:
        logging.debug('df columns %s', df.columns)
        raise Exception(
            'Parameter df must be a DataFrame with a column named "id" ')

    graph = self.__get_neo4j_graph('reader')
    # Parameter list shaped for the UNWIND in self.fetch_tweet.
    ids = [{'id': int(row['id'])} for _, row in df.iterrows()]
    res = graph.run(self.fetch_tweet, ids=ids).to_data_frame()

    logging.debug('Response info: %s rows, %s columns: %s' %
                  (len(res), len(res.columns), res.columns))

    # Collect plain dicts and build the frame once: DataFrame.append was
    # deprecated and removed in pandas 2.0, and appending in a loop is
    # quadratic.
    rows = []
    for _, r in res.iterrows():
        node = r['tweet']
        rows.append({k: node[k] for k in node.keys()
                     if not cols or k in cols})
    return pd.DataFrame(rows)

def save_parquet_df_to_graph(self, df, job_name, job_id=None):
    """Normalize a parquet-sourced DataFrame and persist it to Neo4j.

    Parameters
    ----------
    df : pandas.DataFrame
        Raw tweet data loaded from a parquet file.
    job_name : str
        Stored as ``job_name`` on every node written by this run.
    job_id : optional
        Stored as ``job_id`` alongside ``job_name``.
    """
    pdf = DfHelper().normalize_parquet_dataframe(df)
    logging.info('Saving to Neo4j')
    # Bug fix: job_id was accepted here but never forwarded, so
    # __save_df_to_graph always received its default (None) and every node
    # was written with an empty job_id even when the caller supplied one.
    self.__save_df_to_graph(pdf, job_name, job_id)

# Get the status of a DataFrame of Tweets by id. Returns a dataframe with the hydrated status
def get_tweet_hydrated_status_by_id(self, df, job_name='generic_job'):
def get_tweet_hydrated_status_by_id(self, df):
if 'id' in df:
graph = self.__get_neo4j_graph('reader')
ids = []
Expand All @@ -206,7 +251,7 @@ def get_tweet_hydrated_status_by_id(self, df, job_name='generic_job'):
'Parameter df must be a DataFrame with a column named "id" ')

# This saves the User and Tweet data right now
def __save_df_to_graph(self, df, job_name):
def __save_df_to_graph(self, df, job_name, job_id=None):
graph = self.__get_neo4j_graph('writer')
global_tic = time.perf_counter()
params = []
Expand All @@ -230,7 +275,8 @@ def __save_df_to_graph(self, df, job_name):
'favorite_count': row['favorite_count'],
'retweet_count': row['retweet_count'],
'tweet_type': tweet_type,
'job_id': job_name,
'job_id': job_id,
'job_name': job_name,
'hashtags': self.__normalize_hashtags(row['hashtags']),
'user_id': row['user_id'],
'user_name': row['user_name'],
Expand All @@ -251,7 +297,8 @@ def __save_df_to_graph(self, df, job_name):

# if there are urls then populate the url_params
if row['urls']:
url_params = self.__parse_urls(row, url_params, job_name)
url_params = self.__parse_urls(
row, url_params, job_name, job_id)
# if there are user_mentions then populate the mentions_params
if row['user_mentions']:
for m in row['user_mentions']:
Expand All @@ -260,13 +307,14 @@ def __save_df_to_graph(self, df, job_name):
'user_id': m['id'],
'user_name': m['name'],
'user_screen_name': m['screen_name'],
'job_id': job_name,
'job_id': job_id,
'job_name': job_name,
})
if index % self.batch_size == 0 and index > 0:
self.__write_to_neo(params, url_params, mention_params)
toc = time.perf_counter()
logging.info(
f"Neo4j Periodic Save Complete in {toc - tic:0.4f} seconds")
f'Neo4j Periodic Save Complete in {toc - tic:0.4f} seconds')
params = []
mention_params = []
url_params = []
Expand Down Expand Up @@ -302,14 +350,15 @@ def __normalize_hashtags(self, value):
else:
return None

def __parse_urls(self, row, url_params, job_name):
def __parse_urls(self, row, url_params, job_name, job_id=None):
for u in row['urls']:
try:
parsed = urlparse(u['expanded_url'])
url_params.append({
'tweet_id': row['status_id'],
'url': u['expanded_url'],
'job_id': job_name,
'job_id': job_id,
'job_name': job_name,
'schema': parsed.scheme,
'netloc': parsed.netloc,
'path': parsed.path,
Expand All @@ -322,8 +371,8 @@ def __parse_urls(self, row, url_params, job_name):
'port': parsed.port,
})
except Exception as inst:
print(type(inst)) # the exception instance
print(inst.args) # arguments stored in .args
logging.error(type(inst)) # the exception instance
logging.error(inst.args) # arguments stored in .args
# __str__ allows args to be printed directly,
print(inst)
logging.error(inst)
return url_params