Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
81 commits
Select commit Hold shift + click to select a range
82a8994
Add files via upload
webcoderz Aug 6, 2020
3ebadee
Add files via upload
webcoderz Aug 25, 2020
1c29387
Delete Twint.py
webcoderz Aug 25, 2020
72a89bc
Add files via upload
webcoderz Aug 25, 2020
0375220
Update TwintPool.py
webcoderz Aug 25, 2020
8022be8
Update Neo4jDataAccess.py
webcoderz Aug 25, 2020
5dfff0c
Update Neo4jDataAccess.py
webcoderz Aug 25, 2020
b8716fc
Update Neo4jDataAccess.py
webcoderz Aug 25, 2020
18208d0
Update TwintPool.py
webcoderz Aug 26, 2020
728be4d
Update Neo4jDataAccess.py
webcoderz Aug 26, 2020
3444bd7
Update Neo4jDataAccess.py
webcoderz Aug 26, 2020
ffd8380
Update Neo4jDataAccess.py
webcoderz Aug 26, 2020
2592c86
Update Neo4jDataAccess.py
webcoderz Aug 26, 2020
3266160
Update Neo4jDataAccess.py
webcoderz Aug 26, 2020
03eed35
Update Neo4jDataAccess.py
webcoderz Aug 26, 2020
a763035
refining transforms and url//mention parser and combining all into on…
webcoderz Aug 27, 2020
26ed14e
refining transforms and url//mention parser and combining all into on…
webcoderz Aug 27, 2020
b447d3d
refining transforms and url//mention parser and combining all into on…
webcoderz Aug 27, 2020
c0f7abb
mention df user screen name changed to user screen name mentioned for…
webcoderz Aug 27, 2020
5b9c54b
mention parser fix to check for value
webcoderz Aug 27, 2020
7722c16
mention parser fix
webcoderz Aug 27, 2020
f38aabe
mention parser fix
webcoderz Aug 28, 2020
8a049bf
mention parser fix
webcoderz Aug 28, 2020
8cb94d3
writer/checker improvement
webcoderz Aug 28, 2020
37dd17f
writer/checker improvement
webcoderz Aug 28, 2020
3e94537
adding base code in with twint stuff.
webcoderz Aug 28, 2020
d6d8d2e
adding base code in with twint stuff.
webcoderz Aug 28, 2020
e131944
fixes
webcoderz Aug 28, 2020
ea59846
fixes
webcoderz Aug 28, 2020
6e7af81
fixes
webcoderz Aug 28, 2020
dc79b9c
writer fixes
webcoderz Aug 28, 2020
38891c7
moved checker to twint pool
webcoderz Aug 28, 2020
8ae31dc
batch twint writer
webcoderz Aug 28, 2020
3bcf695
batch twint writer
webcoderz Aug 28, 2020
0a5af00
batch twint writer
webcoderz Aug 28, 2020
0d7a113
batch twint writer
webcoderz Aug 28, 2020
7d2146d
added writer into save_twintdf_to_neo
webcoderz Aug 28, 2020
00ecf12
added writer into save_twintdf_to_neo
webcoderz Aug 29, 2020
731b56a
neo access into check hydrate func
webcoderz Aug 29, 2020
4f4bda6
neo access into check hydrate func
webcoderz Aug 29, 2020
a2ff5fc
neo access into check hydrate func
webcoderz Aug 29, 2020
21d6c60
neo access into check hydrate func
webcoderz Aug 29, 2020
8abdaf0
removed antipattern from writer
webcoderz Aug 29, 2020
4f96717
removed antipattern from writer
webcoderz Aug 29, 2020
d5ef5bf
removed antipattern from writer
webcoderz Aug 29, 2020
86660ac
removed antipattern from writer
webcoderz Aug 29, 2020
d01ee54
removed antipattern from writer
webcoderz Aug 29, 2020
ce0a846
removed antipattern from writer
webcoderz Aug 29, 2020
4c3b8b9
removed antipattern from writer
webcoderz Aug 29, 2020
18d6f62
writer fix
webcoderz Aug 29, 2020
02d5576
fixes
webcoderz Aug 29, 2020
4ead80e
fixes
webcoderz Aug 29, 2020
be81682
fixes
webcoderz Aug 29, 2020
97250f4
fixes
webcoderz Aug 29, 2020
be70e4d
added fxs for url and tweet enrichment dfs for format for save enrich…
webcoderz Aug 29, 2020
1fec64a
added fxs for url and tweet enrichment dfs for format for save enrich…
webcoderz Aug 29, 2020
0b7da2c
fixes for session to accept mentions params as a list
webcoderz Aug 29, 2020
887a471
fixes for session to accept mentions params as a list
webcoderz Aug 29, 2020
d12defb
fixes for session to accept mentions params as df.stack()
webcoderz Aug 29, 2020
6a53a36
fixes for session to accept mentions params as df.stack()
webcoderz Aug 29, 2020
c2c0209
fixes for session to accept mentions params as df.stack()
webcoderz Aug 29, 2020
8de17f9
fixes for session to accept mentions params as df.stack()
webcoderz Aug 29, 2020
7db3453
fixes for session to accept mentions params as df.stack()
webcoderz Aug 29, 2020
d38b875
fixes for session to accept mentions params as df.stack()
webcoderz Aug 29, 2020
e261299
cleanup
webcoderz Aug 30, 2020
1f8099a
cleanup
webcoderz Aug 30, 2020
c07b519
cleanup
webcoderz Aug 30, 2020
6dd9d06
cleanup
webcoderz Aug 30, 2020
4a1bfb2
cleanup
webcoderz Aug 30, 2020
1d6f613
cleanup
webcoderz Aug 30, 2020
80fed48
removed antipattern mentions parser
webcoderz Aug 30, 2020
7babff0
removed antipattern mentions parser
webcoderz Aug 30, 2020
0c60471
removed antipattern mentions parser
webcoderz Aug 30, 2020
c78211b
self.graph.session() to graph.session() in twint writer,
webcoderz Aug 30, 2020
d8a3afe
twint writer fix,
webcoderz Aug 30, 2020
fb62b3b
twint writer fix,
webcoderz Aug 31, 2020
b62a9aa
twint writer fix,
webcoderz Aug 31, 2020
b8712c5
twint writer fix,
webcoderz Aug 31, 2020
d2b8477
twint writer fix,
webcoderz Aug 31, 2020
6cef783
proxy config to none
webcoderz Aug 31, 2020
7e16b14
proxy config to none
webcoderz Aug 31, 2020
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 186 additions & 12 deletions modules/Neo4jDataAccess.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import json
import time
import re
from datetime import datetime

import enum

Expand All @@ -11,7 +12,7 @@
from urllib.parse import urlparse
import logging
from .DfHelper import DfHelper

from .TwintPool import TwintPool
logger = logging.getLogger('Neo4jDataAccess')


Expand Down Expand Up @@ -63,7 +64,6 @@ def __init__(self, debug=False, neo4j_creds=None, batch_size=2000, timeout="60s"
tweet.hashtags = t.hashtags,
tweet.hydrated = 'FULL',
tweet.type = t.tweet_type

//Add Account
MERGE (user:Account {id:t.user_id})
ON CREATE SET
Expand All @@ -77,6 +77,7 @@ def __init__(self, debug=False, neo4j_creds=None, batch_size=2000, timeout="60s"
user.created_at = t.user_created_at,
user.record_created_at = timestamp(),
user.job_name = t.job_name,
user.hydrated = 'FULL',
user.job_id = t.job_id
ON MATCH SET
user.name = t.user_name,
Expand All @@ -88,8 +89,8 @@ def __init__(self, debug=False, neo4j_creds=None, batch_size=2000, timeout="60s"
user.created_at = t.user_created_at,
user.record_updated_at = timestamp(),
user.job_name = t.job_name,
user.hydrated = 'FULL',
user.job_id = t.job_id

//Add Reply to tweets if needed
FOREACH(ignoreMe IN CASE WHEN t.tweet_type='REPLY' THEN [1] ELSE [] END |
MERGE (retweet:Tweet {id:t.reply_tweet_id})
Expand All @@ -99,7 +100,6 @@ def __init__(self, debug=False, neo4j_creds=None, batch_size=2000, timeout="60s"
retweet.job_id = t.job_id,
retweet.hydrated = 'PARTIAL'
)

//Add QUOTE_RETWEET to tweets if needed
FOREACH(ignoreMe IN CASE WHEN t.tweet_type='QUOTE_RETWEET' THEN [1] ELSE [] END |
MERGE (quoteTweet:Tweet {id:t.quoted_status_id})
Expand All @@ -109,7 +109,6 @@ def __init__(self, debug=False, neo4j_creds=None, batch_size=2000, timeout="60s"
quoteTweet.job_id = t.job_id,
quoteTweet.hydrated = 'PARTIAL'
)

//Add RETWEET to tweets if needed
FOREACH(ignoreMe IN CASE WHEN t.tweet_type='RETWEET' THEN [1] ELSE [] END |
MERGE (retweet:Tweet {id:t.retweet_id})
Expand All @@ -128,21 +127,16 @@ def __init__(self, debug=False, neo4j_creds=None, batch_size=2000, timeout="60s"
OPTIONAL MATCH (quoteTweet:Tweet {id:t.quoted_status_id})
OPTIONAL MATCH (retweet:Tweet {id:t.retweet_id})
WITH user, tweet, replied, quoteTweet, retweet

MERGE (user)-[r:TWEETED]->(tweet)

FOREACH(ignoreMe IN CASE WHEN tweet.type='REPLY' AND replied.id>0 THEN [1] ELSE [] END |
MERGE (tweet)-[:REPLYED]->(replied)
)

FOREACH(ignoreMe IN CASE WHEN tweet.type='QUOTE_RETWEET' AND quoteTweet.id>0 THEN [1] ELSE [] END |
MERGE (tweet)-[:QUOTED]->(quoteTweet)
)

FOREACH(ignoreMe IN CASE WHEN tweet.type='RETWEET' AND retweet.id>0 THEN [1] ELSE [] END |
MERGE (tweet)-[:RETWEETED]->(retweet)
)

"""

self.mentions = """UNWIND $mentions AS t
Expand All @@ -154,6 +148,7 @@ def __init__(self, debug=False, neo4j_creds=None, batch_size=2000, timeout="60s"
user.mentioned_screen_name = t.user_screen_name,
user.record_created_at = timestamp(),
user.job_name = t.job_name,
user.hydrated = 'PARTIAL',
user.job_id = t.job_id
WITH user, tweet
MERGE (tweet)-[:MENTIONED]->(user)
Expand Down Expand Up @@ -191,6 +186,11 @@ def __init__(self, debug=False, neo4j_creds=None, batch_size=2000, timeout="60s"
RETURN tweet
"""

self.fetch_account_status = """UNWIND $ids AS i
MATCH (user:Account {id:i.id})
RETURN user.id, user.hydrated
"""

def __get_neo4j_graph(self, role_type):
creds = None
logging.debug('role_type: %s', role_type)
Expand Down Expand Up @@ -255,7 +255,7 @@ def get_tweet_by_id(self, df: pd.DataFrame, cols=[]):
raise TypeError(
'Parameter df must be a DataFrame with a column named "id" ')

def save_enrichment_df_to_graph(self, label: NodeLabel, df: pd.DataFrame, job_name: str, job_id=None):
def __save_enrichment_df_to_graph(self, label: NodeLabel, df: pd.DataFrame, job_name: str, job_id=None):
if not isinstance(label, self.NodeLabel):
raise TypeError('The label parameter is not of type NodeType')

Expand Down Expand Up @@ -310,6 +310,33 @@ def get_tweet_hydrated_status_by_id(self, df: pd.DataFrame):
raise Exception(
'Parameter df must be a DataFrame with a column named "id" ')

# Get the status of a DataFrame of Tweets by id. Returns a dataframe with the hydrated status

# Get the status of a DataFrame of Account by id. Returns a dataframe with the hydrated status
def get_account_hydrated_status_by_id(self, df: pd.DataFrame):
    """Return the hydration status of the Accounts listed in *df*.

    Parameters
    ----------
    df : pd.DataFrame
        Must contain an 'id' column of account ids (coercible to int).

    Returns
    -------
    pd.DataFrame with columns ['id', 'hydrated']; 'hydrated' is None for
    any id that Neo4j does not know about.

    Raises
    ------
    Exception if *df* has no 'id' column (kept as Exception for
    backward compatibility with existing callers).
    """
    if 'id' in df:
        graph = self.__get_neo4j_graph('reader')
        # Build the $ids parameter list for the UNWIND query in one pass
        # (the original appended inside an iterrows loop).
        ids = [{'id': int(row['id'])} for _, row in df.iterrows()]
        with graph.session() as session:
            result = session.run(self.fetch_account_status, ids=ids)
            # Materialize inside the session: the result cursor is lazy
            # and must be consumed before the session closes.
            res = pd.DataFrame([dict(record) for record in result])
        logging.debug('Response info: %s rows, %s columns: %s' %
                      (len(res), len(res.columns), res.columns))
        if len(res) == 0:
            # Neo4j answered for none of the ids: all unknown.
            return df[['id']].assign(hydrated=None)
        else:
            res = res.rename(
                columns={'user.id': 'id', 'user.hydrated': 'hydrated'})
            # Left-merge ensures hydrated=None if Neo4j does not answer
            # for a given id.
            res = df[['id']].merge(res, how='left', on='id')
            return res
    else:
        logging.debug('df columns %s', df.columns)
        raise Exception(
            'Parameter df must be a DataFrame with a column named "id" ')

# This saves the User and Tweet data right now
def __save_df_to_graph(self, df, job_name, job_id=None):
graph = self.__get_neo4j_graph('writer')
Expand Down Expand Up @@ -442,4 +469,151 @@ def __parse_urls(self, row, url_params, job_name, job_id=None):
logging.error(inst.args) # arguments stored in .args
# __str__ allows args to be printed directly,
logging.error(inst)
return url_params
return url_params

def save_twintdf_to_neo(self, df, job_name, job_id=None):
    """Transform a twint-produced DataFrame and persist it to Neo4j.

    Converts the raw twint frame to the Neo4j column layout, builds the
    tweet / mention / url parameter frames, and delegates the writes to
    write_twint_enriched_tweetdf_to_neo.

    Parameters
    ----------
    df : pd.DataFrame
        Raw twint output (assumed to contain status_id, full_text,
        created_at, user_* and related columns -- TODO confirm against
        TwintPool.twint_df_to_neo4j_df).
    job_name : str
    job_id : optional job identifier stamped onto every record.
    """
    df = TwintPool().twint_df_to_neo4j_df(df)
    # Drop csv artifact columns (e.g. 'Unnamed: 0') left behind by
    # index round-trips upstream.
    df.drop(df.columns[df.columns.str.contains('unnamed', case=False)],
            axis=1, inplace=True)
    logger.debug('df columns %s', df.columns)
    params = []
    for index, row in df.iterrows():
        # Determine the type of tweet: trust twint's label when present,
        # otherwise infer it from the reply/quote/retweet id fields.
        tweet_type = 'TWEET'
        if row['tweet_type_twint']:
            tweet_type = row['tweet_type_twint']
        elif row['in_reply_to_status_id'] is not None and row['in_reply_to_status_id'] > 0:
            tweet_type = 'REPLY'
        elif 'quoted_status_id' in row and row['quoted_status_id'] is not None and row['quoted_status_id'] > 0:
            tweet_type = 'QUOTE_RETWEET'
        elif 'retweet_id' in row and row['retweet_id'] is not None and row['retweet_id'] > 0:
            tweet_type = 'RETWEET'
        try:
            # Collect plain dicts and build one DataFrame at the end
            # instead of concatenating one-row frames per tweet.
            params.append({
                'id': int(row['status_id']),
                'text': row['full_text'],
                'created_at': str(pd.to_datetime(row['created_at'])),
                'favorite_count': row['favorite_count'],
                'retweet_count': row['retweet_count'],
                'type': tweet_type,
                'job_id': job_id,
                'job_name': job_name,
                'hashtags': self.__normalize_hashtags(row['hashtags']),
                'user_id': row['user_id'],
                'user_name': row['user_name'],
                'user_location': row['user_location'] if 'user_location' in row else None,
                'user_screen_name': row['user_screen_name'],
                'user_followers_count': row['user_followers_count'] if 'user_followers_count' in row else None,
                'user_friends_count': row['user_friends_count'] if 'user_friends_count' in row else None,
                # BUG FIX: the original converted the whole column
                # (pd.to_datetime(df['user_created_at'])), storing a
                # Series in the parameter dict; use this row's value.
                'user_created_at': pd.to_datetime(row['user_created_at']) if 'user_created_at' in row else None,
                'user_profile_image_url': row['user_profile_image_url'] if 'user_profile_image_url' in row else None,
                'reply_tweet_id': row['in_reply_to_status_id'] if 'in_reply_to_status_id' in row else None,
                'conversation_id': row['conversation_id'] if 'conversation_id' in row else None,
                'quoted_status_id': row['quoted_status_id'] if 'quoted_status_id' in row else None,
                'retweet_id': row['retweet_id'] if 'retweet_id' in row else None,
                'geo': row['geo'] if 'geo' in row else None,
            })
        except Exception as e:
            # Lazy %-style args; the original passed e/row as stray
            # positional args which the logger drops.
            logger.error('params.append exn: %s', e)
            logger.error('row: %s', row)
            raise e
    params_df = self.__tweetdf_to_neodf(pd.DataFrame(params))
    url_df = self.__urldf_to_neodf(self.__parse_urls_twint(df, job_name, job_id))
    mention_df = self.__parse_mentions_twint(df, job_name, job_id)
    res = {"mentions": mention_df, "urls": url_df, "params": params_df}
    self.write_twint_enriched_tweetdf_to_neo(res, job_name, job_id)


def __parse_urls_twint(self, df, job_name, job_id):
    """Explode each tweet's 'urls' list into one parsed-URL record per url.

    Returns a DataFrame with one row per (tweet, url) pair holding the
    tweet status_id, the full url and its urlparse components.  An empty
    frame with the expected columns is returned when no row has urls
    (pd.concat on an empty list would previously have raised).
    """
    columns = ['id', 'full_url', 'job_id', 'job_name', 'schema', 'netloc',
               'path', 'params', 'query', 'fragment', 'username',
               'password', 'hostname', 'port']
    records = []
    try:
        for index, row in df.iterrows():
            if not row["urls"]:
                continue
            # BUG FIX: the original indexed urls with a counter that was
            # never incremented inside the loop (the increment sat after
            # the concat, before the return), so every tweet contributed
            # only urls[0].  Emit one record per url instead.
            for url in row["urls"]:
                parsed = urlparse(url)
                records.append({
                    'id': int(row["status_id"]),
                    'full_url': url,
                    'job_id': job_id,
                    'job_name': job_name,
                    'schema': parsed.scheme,
                    'netloc': parsed.netloc,
                    'path': parsed.path,
                    'params': parsed.params,
                    'query': parsed.query,
                    'fragment': parsed.fragment,
                    'username': parsed.username,
                    'password': parsed.password,
                    'hostname': parsed.hostname,
                    'port': parsed.port})
    except Exception as e:
        # Lazy %-style args (the original passed e/row as stray
        # positional args which the logger drops).
        logging.error('url parse exn: %s', e)
        logging.error('row: %s', row)
        raise e
    # Build the (possibly empty) frame directly so callers always get
    # the expected columns.
    return pd.DataFrame(records, columns=columns)

def __parse_mentions_twint(self, df, job_name, job_id=None):
    """Build one mention record per tweet from the 'user_mentions' column.

    Pairs each status_id with its user_mentions entry positionally
    (same semantics as the original counter-based loop).  Returns a
    DataFrame with columns id / user_screen_name / job_id / job_name;
    empty input yields an empty frame with those columns instead of the
    ValueError pd.concat raises on an empty list.
    """
    records = [{'id': status_id,
                'user_screen_name': mention,
                'job_id': job_id,
                'job_name': job_name}
               for status_id, mention in zip(df["status_id"].to_list(),
                                             df['user_mentions'].to_list())]
    return pd.DataFrame(
        records, columns=['id', 'user_screen_name', 'job_id', 'job_name'])

def __urldf_to_neodf(self, df):
    """Project the parsed-url frame onto the columns the url writer
    expects and stamp a record-creation time."""
    # .copy() prevents pandas SettingWithCopyWarning and a potentially
    # lost assignment: the original assigned into a column-slice view.
    neourldf = df[['id', 'full_url', 'job_name', 'schema', 'netloc',
                   'path', 'params', 'query', 'fragment', 'username',
                   'password', 'hostname', 'port']].copy()
    # NOTE(review): the space in 'record_created at' looks like a typo,
    # but downstream writers may depend on the key -- kept byte-identical.
    neourldf['record_created at'] = str(datetime.now())
    return neourldf

def __tweetdf_to_neodf(self, df):
    """Project the tweet parameter frame onto the columns the tweet
    writer expects, mark the rows FULL-hydrated, and stamp a
    record-creation time."""
    # .copy() prevents pandas SettingWithCopyWarning and a potentially
    # lost assignment: the original assigned into a column-slice view.
    neotweetdf = df[['id', 'text', 'created_at', 'favorite_count',
                     'retweet_count', 'job_name', 'hashtags', 'type',
                     'conversation_id']].copy()
    neotweetdf['hydrated'] = 'FULL'
    # NOTE(review): the space in 'record_created at' looks like a typo,
    # but downstream writers may depend on the key -- kept byte-identical.
    neotweetdf['record_created at'] = str(datetime.now())
    return neotweetdf

def write_twint_enriched_tweetdf_to_neo(self, res, job_name, job_id):
    """Write the prepared mention / url / tweet frames to Neo4j.

    Parameters
    ----------
    res : dict mapping 'mentions' / 'urls' / 'params' to DataFrames.
        Mentions run through the self.mentions Cypher directly; urls and
        tweet params go through the enrichment writer.
    job_name, job_id : stamped onto the enrichment writes.

    Failures are logged and swallowed so one bad batch does not abort
    the others (deliberate best-effort, preserved from the original).
    """
    graph = self.__get_neo4j_graph('writer')
    tic = time.perf_counter()
    # Iterate items directly instead of list(res.keys()) + lookup;
    # unused global_tic and dead commented-out batch check removed.
    for key, frame in res.items():
        try:
            if key == 'mentions':
                with graph.session() as session:
                    session.run(self.mentions,
                                mentions=frame.to_dict(orient='records'),
                                timeout=self.timeout)
            elif key == 'urls':
                self.__save_enrichment_df_to_graph(
                    self.NodeLabel.Url, frame, job_name, job_id)
            elif key == 'params':
                self.__save_enrichment_df_to_graph(
                    self.NodeLabel.Tweet, frame, job_name, job_id)
            toc = time.perf_counter()
            logging.info(f'Neo4j Periodic Save Complete in {toc - tic:0.4f} seconds')
            tic = time.perf_counter()
        except Exception as inst:
            logging.error(type(inst))   # the exception instance
            logging.error(inst.args)    # arguments stored in .args
            # __str__ allows args to be printed directly,
            logging.error(inst)
Loading