Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Brings Neo4jDataAccess changes in
  • Loading branch information
Barton Rhodes committed Apr 22, 2020
commit 9af0eca76eac8a87e179bf0279b39996bcb72849
170 changes: 109 additions & 61 deletions modules/Neo4jDataAccess.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,35 @@
import time
import re

import enum

from datetime import datetime
import pandas as pd
from neo4j import GraphDatabase, basic_auth
from urllib.parse import urlparse
import logging

from .DfHelper import DfHelper

logger = logging.getLogger('Neo4jDataAccess')


class Neo4jDataAccess:
class NodeLabel(enum.Enum):
    """Node labels present in the graph written by this data-access layer."""
    Tweet = 'Tweet'
    Url = 'Url'
    Account = 'Account'

class RelationshipLabel(enum.Enum):
    """Relationship types present in the graph written by this data-access layer."""
    TWEETED = 'TWEETED'
    MENTIONED = 'MENTIONED'
    QUOTED = 'QUOTED'
    # NOTE(review): the tweeted_rel Cypher in __init__ merges the relationship
    # as ":REPLYED" (sic) — confirm which spelling is the intended label.
    REPLIED = 'REPLIED'
    RETWEETED = 'RETWEETED'
    INCLUDES = 'INCLUDES'

class RoleType(enum.Enum):
    """Connection roles; the value selects which credential set is used."""
    READER = 'reader'
    WRITER = 'writer'

def __init__(self, debug=False, neo4j_creds=None, batch_size=2000, timeout="60s"):
self.creds = neo4j_creds
Expand All @@ -23,9 +40,9 @@ def __init__(self, debug=False, neo4j_creds=None, batch_size=2000, timeout="60s"
self.batch_size = batch_size
self.tweetsandaccounts = """
UNWIND $tweets AS t
//Add the Tweet
//Add the Tweet
MERGE (tweet:Tweet {id:t.tweet_id})
ON CREATE SET
ON CREATE SET
tweet.text = t.text,
tweet.created_at = t.tweet_created_at,
tweet.favorite_count = t.favorite_count,
Expand All @@ -36,7 +53,7 @@ def __init__(self, debug=False, neo4j_creds=None, batch_size=2000, timeout="60s"
tweet.hashtags = t.hashtags,
tweet.hydrated = 'FULL',
tweet.type = t.tweet_type
ON MATCH SET
ON MATCH SET
tweet.text = t.text,
tweet.favorite_count = t.favorite_count,
tweet.retweet_count = t.retweet_count,
Expand All @@ -46,10 +63,10 @@ def __init__(self, debug=False, neo4j_creds=None, batch_size=2000, timeout="60s"
tweet.hashtags = t.hashtags,
tweet.hydrated = 'FULL',
tweet.type = t.tweet_type

//Add Account
MERGE (user:Account {id:t.user_id})
ON CREATE SET
MERGE (user:Account {id:t.user_id})
ON CREATE SET
user.id = t.user_id,
user.name = t.name,
user.screen_name = t.user_screen_name,
Expand All @@ -61,7 +78,7 @@ def __init__(self, debug=False, neo4j_creds=None, batch_size=2000, timeout="60s"
user.record_created_at = timestamp(),
user.job_name = t.job_name,
user.job_id = t.job_id
ON MATCH SET
ON MATCH SET
user.name = t.user_name,
user.screen_name = t.user_screen_name,
user.followers_count = t.user_followers_count,
Expand All @@ -71,31 +88,31 @@ def __init__(self, debug=False, neo4j_creds=None, batch_size=2000, timeout="60s"
user.created_at = t.user_created_at,
user.record_updated_at = timestamp(),
user.job_name = t.job_name,
user.job_id = t.job_id
user.job_id = t.job_id

//Add Reply to tweets if needed
FOREACH(ignoreMe IN CASE WHEN t.tweet_type='REPLY' THEN [1] ELSE [] END |
MERGE (retweet:Tweet {id:t.reply_tweet_id})
FOREACH(ignoreMe IN CASE WHEN t.tweet_type='REPLY' THEN [1] ELSE [] END |
MERGE (retweet:Tweet {id:t.reply_tweet_id})
ON CREATE SET retweet.id=t.reply_tweet_id,
retweet.record_created_at = timestamp(),
retweet.job_name = t.job_name,
retweet.job_id = t.job_id,
retweet.hydrated = 'PARTIAL'
)

//Add QUOTE_RETWEET to tweets if needed
FOREACH(ignoreMe IN CASE WHEN t.tweet_type='QUOTE_RETWEET' THEN [1] ELSE [] END |
MERGE (quoteTweet:Tweet {id:t.quoted_status_id})
FOREACH(ignoreMe IN CASE WHEN t.tweet_type='QUOTE_RETWEET' THEN [1] ELSE [] END |
MERGE (quoteTweet:Tweet {id:t.quoted_status_id})
ON CREATE SET quoteTweet.id=t.quoted_status_id,
quoteTweet.record_created_at = timestamp(),
quoteTweet.job_name = t.job_name,
quoteTweet.job_id = t.job_id,
quoteTweet.hydrated = 'PARTIAL'
)

//Add RETWEET to tweets if needed
FOREACH(ignoreMe IN CASE WHEN t.tweet_type='RETWEET' THEN [1] ELSE [] END |
MERGE (retweet:Tweet {id:t.retweet_id})
FOREACH(ignoreMe IN CASE WHEN t.tweet_type='RETWEET' THEN [1] ELSE [] END |
MERGE (retweet:Tweet {id:t.retweet_id})
ON CREATE SET retweet.id=t.retweet_id,
retweet.record_created_at = timestamp(),
retweet.job_name = t.job_name,
Expand All @@ -106,70 +123,70 @@ def __init__(self, debug=False, neo4j_creds=None, batch_size=2000, timeout="60s"

self.tweeted_rel = """UNWIND $tweets AS t
MATCH (user:Account {id:t.user_id})
MATCH (tweet:Tweet {id:t.tweet_id})
OPTIONAL MATCH (replied:Tweet {id:t.reply_tweet_id})
OPTIONAL MATCH (quoteTweet:Tweet {id:t.quoted_status_id})
OPTIONAL MATCH (retweet:Tweet {id:t.retweet_id})
MATCH (tweet:Tweet {id:t.tweet_id})
OPTIONAL MATCH (replied:Tweet {id:t.reply_tweet_id})
OPTIONAL MATCH (quoteTweet:Tweet {id:t.quoted_status_id})
OPTIONAL MATCH (retweet:Tweet {id:t.retweet_id})
WITH user, tweet, replied, quoteTweet, retweet

MERGE (user)-[r:TWEETED]->(tweet)
FOREACH(ignoreMe IN CASE WHEN tweet.type='REPLY' AND replied.id>0 THEN [1] ELSE [] END |

FOREACH(ignoreMe IN CASE WHEN tweet.type='REPLY' AND replied.id>0 THEN [1] ELSE [] END |
MERGE (tweet)-[:REPLYED]->(replied)
)
FOREACH(ignoreMe IN CASE WHEN tweet.type='QUOTE_RETWEET' AND quoteTweet.id>0 THEN [1] ELSE [] END |

FOREACH(ignoreMe IN CASE WHEN tweet.type='QUOTE_RETWEET' AND quoteTweet.id>0 THEN [1] ELSE [] END |
MERGE (tweet)-[:QUOTED]->(quoteTweet)
)
FOREACH(ignoreMe IN CASE WHEN tweet.type='RETWEET' AND retweet.id>0 THEN [1] ELSE [] END |

FOREACH(ignoreMe IN CASE WHEN tweet.type='RETWEET' AND retweet.id>0 THEN [1] ELSE [] END |
MERGE (tweet)-[:RETWEETED]->(retweet)
)

"""

self.mentions = """UNWIND $mentions AS t
self.mentions = """UNWIND $mentions AS t
MATCH (tweet:Tweet {id:t.tweet_id})
MERGE (user:Account {id:t.user_id})
ON CREATE SET
MERGE (user:Account {id:t.user_id})
ON CREATE SET
user.id = t.user_id,
user.mentioned_name = t.name,
user.mentioned_screen_name = t.user_screen_name,
user.record_created_at = timestamp(),
user.job_name = t.job_name,
user.job_id = t.job_id
WITH user, tweet
MERGE (tweet)-[:MENTIONED]->(user)
WITH user, tweet
MERGE (tweet)-[:MENTIONED]->(user)
"""

self.urls = """UNWIND $urls AS t
self.urls = """UNWIND $urls AS t
MATCH (tweet:Tweet {id:t.tweet_id})
MERGE (url:Url {full_url:t.url})
ON CREATE SET
MERGE (url:Url {full_url:t.url})
ON CREATE SET
url.full_url = t.url,
url.job_name = t.job_name,
url.job_id = t.job_id,
url.record_created_at = timestamp(),
url.schema=t.scheme,
url.netloc=t.netloc,
url.path=t.path,
url.params=t.params,
url.query=t.query,
url.fragment=t.fragment,
url.username=t.username,
url.password=t.password,
url.hostname=t.hostname,
url.schema=t.scheme,
url.netloc=t.netloc,
url.path=t.path,
url.params=t.params,
url.query=t.query,
url.fragment=t.fragment,
url.username=t.username,
url.password=t.password,
url.hostname=t.hostname,
url.port=t.port
WITH url, tweet
MERGE (tweet)-[:INCLUDES]->(url)
WITH url, tweet
MERGE (tweet)-[:INCLUDES]->(url)
"""

self.fetch_tweet_status = """UNWIND $ids AS i
self.fetch_tweet_status = """UNWIND $ids AS i
MATCH (tweet:Tweet {id:i.id})
RETURN tweet.id, tweet.hydrated
"""

self.fetch_tweet = """UNWIND $ids AS i
self.fetch_tweet = """UNWIND $ids AS i
MATCH (tweet:Tweet {id:i.id})
RETURN tweet
"""
Expand All @@ -193,18 +210,25 @@ def __get_neo4j_graph(self, role_type):
self.graph = None
return self.graph

def get_from_neo(self, cypher, limit=1000):
def get_neo4j_graph(self, role_type: RoleType):
    """Return a Neo4j driver/graph handle for the requested role.

    Public, type-checked wrapper around the private factory: *role_type*
    must be a RoleType member, whose value ('reader' or 'writer') selects
    the credentials used for the connection.

    Raises:
        TypeError: if *role_type* is not a RoleType member.
    """
    if isinstance(role_type, self.RoleType):
        return self.__get_neo4j_graph(role_type.value)
    raise TypeError('The role_type parameter is not of type RoleType')

def get_from_neo(self, cypher: str, limit=1000):
    """Run a read-only Cypher query and return the result as a DataFrame.

    If *limit* is truthy and the query text does not already contain the
    keyword LIMIT, " LIMIT <limit>" is appended and the returned frame is
    additionally truncated with DataFrame.head(limit).  If *limit* is
    falsy (None or 0), the query runs unmodified and the full result set
    is returned.

    Parameters:
        cypher: the Cypher traversal to execute (read role).
        limit: maximum number of rows to return, or a falsy value for all.

    Returns:
        pandas.DataFrame with one column per returned record field.
    """
    graph = self.__get_neo4j_graph('reader')
    # Append a LIMIT only when the caller asked for one and the query does
    # not already mention the keyword anywhere (case-insensitive; note this
    # also matches "LIMIT" inside string literals — acceptable here since a
    # spurious match merely skips the append).
    if limit and not re.search('LIMIT', cypher, re.IGNORECASE):
        cypher = f'{cypher} LIMIT {limit}'
    with graph.session() as session:
        result = session.run(cypher, timeout=self.timeout)
        df = pd.DataFrame([dict(record) for record in result])
    if not limit:
        return df
    return df.head(limit)

def get_tweet_by_id(self, df, cols=[]):
def get_tweet_by_id(self, df: pd.DataFrame, cols=[]):
if 'id' in df:
graph = self.__get_neo4j_graph('reader')
ids = []
Expand All @@ -228,17 +252,41 @@ def get_tweet_by_id(self, df, cols=[]):
pdf = pdf.append(props, ignore_index=True)
return pdf
else:
logging.debug('df columns %s', df.columns)
raise Exception(
raise TypeError(
'Parameter df must be a DataFrame with a column named "id" ')

def save_parquet_df_to_graph(self, df, job_name, job_id=None):
def save_enrichment_df_to_graph(self, label: NodeLabel, df: pd.DataFrame, job_name: str, job_id=None):
    """Bulk-upsert enrichment properties from *df* onto nodes of *label*.

    Builds a single parameterized UNWIND ... MERGE ... SET statement and
    runs it with the writer role.  Url nodes are keyed by ``full_url``;
    every other label is keyed by ``id``.  All non-key columns of *df*
    are written as node properties.

    Parameters:
        label: NodeLabel member naming the target node label.
        df: DataFrame containing the key column plus property columns.
        job_name: accepted for interface parity.
        job_id: accepted for interface parity.

    Raises:
        TypeError: if *label* is not a NodeLabel or *df* is not a DataFrame.
        ValueError: if *df* has no property columns besides the key.
    """
    # NOTE(review): job_name/job_id are accepted but never written to the
    # graph here, unlike the other writers in this module — confirm intent.
    if not isinstance(label, self.NodeLabel):
        # Message previously said 'NodeType'; the checked type is NodeLabel.
        raise TypeError('The label parameter is not of type NodeLabel')

    if not isinstance(df, pd.DataFrame):
        raise TypeError(
            'The df parameter is not of type Pandas.DataFrame')

    # Url nodes are merged on full_url; everything else on id.
    id_col_name = 'full_url' if label == self.NodeLabel.Url else 'id'

    props = [f' n.{column} = t.{column} '
             for column in df if column != id_col_name]
    if not props:
        # A bare trailing "SET" is invalid Cypher and would only fail
        # server-side; fail fast with a clear client-side error instead.
        raise ValueError(
            f'df must contain at least one column besides "{id_col_name}"')

    statement = ('UNWIND $rows AS t'
                 + ' MERGE (n:' + label.value
                 + ' {' + id_col_name + ':t.' + id_col_name + '}) '
                 + ' SET '
                 + ','.join(props))

    graph = self.__get_neo4j_graph('writer')
    with graph.session() as session:
        session.run(
            statement, rows=df.to_dict(orient='records'), timeout=self.timeout)

def save_parquet_df_to_graph(self, df: pd.DataFrame, job_name: str, job_id=None):
    """Normalize a parquet-sourced tweet DataFrame and persist it to Neo4j.

    Parameters:
        df: raw DataFrame as loaded from parquet.
        job_name: job identifier recorded on the created graph entities.
        job_id: accepted for interface parity.
    """
    # NOTE(review): job_id is accepted but not forwarded to
    # __save_df_to_graph — confirm whether it should be passed through.
    pdf = DfHelper().normalize_parquet_dataframe(df)
    # Use the module-level named logger (defined at the top of this file)
    # instead of the root logger, consistent with the rest of the module.
    logger.info('Saving to Neo4j')
    self.__save_df_to_graph(pdf, job_name)

# Get the status of a DataFrame of Tweets by id. Returns a dataframe with the hydrated status
def get_tweet_hydrated_status_by_id(self, df):
def get_tweet_hydrated_status_by_id(self, df: pd.DataFrame):
if 'id' in df:
graph = self.__get_neo4j_graph('reader')
ids = []
Expand Down Expand Up @@ -393,4 +441,4 @@ def __parse_urls(self, row, url_params, job_name, job_id=None):
logging.error(inst.args) # arguments stored in .args
# __str__ allows args to be printed directly,
logging.error(inst)
return url_params
return url_params