Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
d6dcaff
fixing search username function due to twint deprecation
webcoderz Aug 18, 2020
a80fd10
Merge branch 'master' of https://github.com/TheDataRideAlongs/Project…
webcoderz Aug 18, 2020
1952dec
Merge branch 'master' of https://github.com/TheDataRideAlongs/Project…
webcoderz Dec 26, 2020
dace204
user info, replies, and user timeline inline with twint and bugfix fo…
webcoderz Dec 26, 2020
a21425d
user info, replies, and user timeline inline with twint and bugfix fo…
webcoderz Dec 26, 2020
22703c3
user info, replies, and user timeline inline with twint and bugfix fo…
webcoderz Dec 26, 2020
1b25c1d
user info, replies, and user timeline inline with twint and bugfix fo…
webcoderz Dec 26, 2020
7be53b0
user info, replies, and user timeline inline with twint and bugfix fo…
webcoderz Dec 26, 2020
c15740e
changed user_Created_At twint df inferrence inline with twints user c…
webcoderz Dec 27, 2020
89ed4d6
changed user_Created_At twint df inferrence inline with twints user c…
webcoderz Dec 27, 2020
f6f2923
changed user_Created_At twint df inferrence inline with twints user c…
webcoderz Dec 27, 2020
7ea6ca5
adding quo
webcoderz Dec 28, 2020
53367f3
removed user created at from initial conversion
webcoderz Dec 28, 2020
b967545
Update Neo4jDataAccess.py
webcoderz Dec 28, 2020
604e5a6
enrich_user_tl_and_info function to grab a users timeline and info an…
webcoderz Dec 28, 2020
d57ee6d
enrich_user_tl_and_info function to grab a users timeline and info an…
webcoderz Dec 28, 2020
c5d6096
enrich_user_tl_and_info function to grab a users timeline and info an…
webcoderz Dec 28, 2020
dbfa95e
enrich_user_tl_and_info function to grab a users timeline and info an…
webcoderz Dec 28, 2020
08838d4
enrich_user_tl_and_info function to grab a users timeline and info an…
webcoderz Dec 28, 2020
a39bdc5
enrich_user_tl_and_info function to grab a users timeline and info an…
webcoderz Dec 28, 2020
91eeed0
timestamp to date time conversion for neo
webcoderz Dec 28, 2020
8b1ae3d
timestamp to date time conversion for neo
webcoderz Dec 28, 2020
6b81244
timestamp to date time conversion for neo
webcoderz Dec 28, 2020
cebae1f
timestamp to date time conversion for neo
webcoderz Dec 30, 2020
43c2ac8
timeline writer for enrich_user_tl_and_info functionality.
webcoderz Dec 30, 2020
72843ab
timeline writer for enrich_user_tl_and_info functionality.
webcoderz Dec 30, 2020
f6a89b8
timeline writer for enrich_user_tl_and_info functionality.
webcoderz Dec 31, 2020
f5b70c9
timeline writer for enrich_user_tl_and_info functionality.
webcoderz Jan 3, 2021
21cd16f
adding flag to pull retweets
webcoderz Jan 19, 2021
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 119 additions & 6 deletions modules/Neo4jDataAccess.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,104 @@ def __enrich_usr_info(self, df):
dfs = pd.concat(lst).drop_duplicates(subset=["id"])
return dfs

def enrich_user_tl_and_info(self, username, job_name, job_id, limit, include_profile_fetch=False):
    """Fetch a user's timeline through twint and write it to Neo4j.

    When ``include_profile_fetch`` is false, the raw timeline frame is
    handed straight to ``save_twintdf_to_neo``. When it is true, the
    timeline is converted with ``TwintPool.twint_df_to_neo4j_df``, each
    tweet row is joined with the account frame built from
    ``TwintPool._get_user_info``, URLs and mentions are parsed out of the
    converted frame, and the bundle is written with
    ``write_twint_enriched_tweetdf_to_neo``.

    Args:
        username: Account name passed to the twint timeline/info fetches.
        job_name: Stamped on every tweet row and passed to the writers.
        job_id: Stamped on every tweet row and passed to the writers.
        limit: Passed to the twint timeline fetch.
        include_profile_fetch: Also fetch account info and take the
            enriched write path.

    Returns:
        None.

    Raises:
        Exception: Re-raised unchanged when a timeline row cannot be
            turned into write parameters (the row is logged first).
    """

    def optional(row, column):
        # Not every column is guaranteed to exist on the converted frame.
        return row[column] if column in row else None

    def has_positive_id(row, column):
        # True only when the column exists and holds an id greater than zero.
        return column in row and row[column] is not None and row[column] > 0

    if not include_profile_fetch:
        self.save_twintdf_to_neo(TwintPool()._get_user_timeline(username=username, limit=limit),
                                 job_name=job_name, job_id=job_id)
        return

    twintdftic = time.perf_counter()
    timeline_df = TwintPool()._get_user_timeline(username=username, limit=limit)
    if (timeline_df is None) or (len(timeline_df) == 0):
        # Same policy as save_twintdf_to_neo: nothing fetched, nothing to write.
        # Without this guard pd.concat below fails on an empty list.
        logger.info('Empty df for neo conversion, skip')
        return
    df = TwintPool().twint_df_to_neo4j_df(timeline_df)

    df.drop(df.columns[df.columns.str.contains('unnamed', case=False)], axis=1, inplace=True)
    tic = time.perf_counter()
    logger.debug('df columns %s', df.columns)

    itertic = time.perf_counter()
    params = []
    for _, row in df.iterrows():
        # Determine the type of tweet; an explicit twint type wins over the
        # id-based heuristics.
        if row['tweet_type_twint']:
            tweet_type = row['tweet_type_twint']
        elif has_positive_id(row, 'in_reply_to_status_id'):
            tweet_type = 'REPLY'
        elif has_positive_id(row, 'quoted_status_id'):
            tweet_type = 'QUOTE_RETWEET'
        elif has_positive_id(row, 'retweet_id'):
            tweet_type = 'RETWEET'
        else:
            tweet_type = 'TWEET'
        try:
            # One single-row frame per tweet, concatenated afterwards. Kept
            # this way on purpose: building one frame from all dicts at once
            # may change column dtypes (e.g. None turning into NaN).
            params.append(pd.DataFrame([{
                'tweet_id': int(row['status_id']),
                'text': row['full_text'],
                'created_at': str(pd.to_datetime(row['created_at'])),
                'favorite_count': row['favorite_count'],
                'retweet_count': row['retweet_count'],
                'type': tweet_type,
                'job_id': job_id,
                'job_name': job_name,
                'hashtags': self.__normalize_hashtags(row['hashtags']),
                'user_id': row['user_id'],
                'user_name': row['user_name'],
                'user_location': optional(row, 'user_location'),
                'user_screen_name': row['user_screen_name'],
                'user_followers_count': optional(row, 'user_followers_count'),
                'user_friends_count': optional(row, 'user_friends_count'),
                'user_profile_image_url': optional(row, 'user_profile_image_url'),
                'reply_tweet_id': optional(row, 'in_reply_to_status_id'),
                'conversation_id': optional(row, 'conversation_id'),
                'quoted_status_id': optional(row, 'quoted_status_id'),
                'retweet_id': optional(row, 'retweet_id'),
                'geo': optional(row, 'geo'),
            }]))
        except Exception:
            # logger.exception records the traceback; the row is logged with
            # a real placeholder so the logging call itself cannot fail.
            logger.exception('params.append exn')
            logger.error('row %s', row)
            raise
    itertoc = time.perf_counter()
    logger.info(f'finished iterating twint df and determining tweet type {itertoc - itertic:0.4f} seconds')
    params_df = pd.concat(params, ignore_index=True, sort=False)

    # Account info: cross-join every tweet row with the account frame by
    # merging on a constant helper column.
    # NOTE(review): the account frame also carries a 'job_name' column, so
    # this merge yields 'job_name_x'/'job_name_y' instead of 'job_name' --
    # confirm the write queries expect that.
    usr_df = self.__tweetdf_to_neo_account_df(TwintPool()._get_user_info(username=username), job_name=job_name)
    params_df['tmp'] = 1
    usr_df['tmp'] = 1

    params_df = pd.merge(params_df, usr_df, on=['tmp'])
    params_df = params_df.drop('tmp', axis=1)
    twintdftoc = time.perf_counter()
    logger.info(f'finished twint dataframe to neo4j conversion stage 1 {twintdftoc - twintdftic:0.4f} seconds')

    # URL parser
    urltic = time.perf_counter()
    url_df = self.__urldf_to_neodf(self.__parse_urls_twint(df, job_name, job_id))
    urltoc = time.perf_counter()
    logger.info(f'finished parsing urls in: {urltoc - urltic:0.4f} seconds')

    # Mention parser
    menttic = time.perf_counter()
    mention_df = self.__parse_mentions_twint(df, job_name, job_id)
    menttoc = time.perf_counter()
    logger.info(f'finished parsing mentions in: {menttoc - menttic:0.4f} seconds')

    # Neo4j write
    # NOTE(review): 'hydrated' is set on df, but df itself is not part of
    # `res`, so the flag does not reach the writer -- confirm whether it was
    # meant to be set on params_df.
    df["hydrated"] = "FULL"
    res = {"mentions": mention_df, "urls": url_df, "params": params_df}
    toc = time.perf_counter()
    logger.info(f'finished data enrichments in: {toc - tic:0.4f} seconds writing to neo4j now..')
    self.write_twint_enriched_tweetdf_to_neo(res, job_name, job_id)

def save_twintdf_to_neo(self, df, job_name, job_id=None):
if (df is None) or (len(df) == 0):
logger.info('Empty df for neo conversion, skip')
Expand Down Expand Up @@ -520,8 +618,6 @@ def save_twintdf_to_neo(self, df, job_name, job_id=None):
'user_followers_count'] if 'user_followers_count' in row else None,
'user_friends_count': row[
'user_friends_count'] if 'user_friends_count' in row else None,
'user_created_at': pd.to_datetime(
df['user_created_at']) if 'user_created_at' in row else None,
'user_profile_image_url': row[
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`join_datetime` appears in https://github.com/twintproject/twint/blob/master/twint/storage/panda.py#L125, but the rest of the fields here don't. I'm surprised this works, but I'm guessing the manual tests were fine, so I'll just note it here in case we need to revisit.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed it this morning. When I committed this, I forgot that I do the account enrichments at a different stage of the transformation.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to do a round trip into Neo4j today on everything.

'user_profile_image_url'] if 'user_profile_image_url' in row else None,
'reply_tweet_id': row[
Expand Down Expand Up @@ -667,9 +763,13 @@ def __tweetdf_to_neo_account_df(self, df, job_name):
acctdf['screen_name'] = df['username']
acctdf['friends_count'] = df["following"]
acctdf['followers_count'] = df["followers"]
acctdf['user_created_at'] = df["join_datetime"].apply(lambda n: str(pd.to_datetime(n)))

#acctdf['user_created_at'].apply(lambda n: datetime.fromtimestamp(n))
acctdf['job_name'] = str(job_name)
return acctdf


def write_twint_enriched_tweetdf_to_neo(self, res, job_name, job_id):
graph = self.__get_neo4j_graph('writer')
global_tic = time.perf_counter()
Expand All @@ -693,14 +793,27 @@ def write_twint_enriched_tweetdf_to_neo(self, res, job_name, job_id):
timeout=self.timeout)
logger.debug('writing tweet relationships')
session.run(self.tweeted_rel, tweets=df.to_dict(orient='records'), timeout=self.timeout)
toc = time.perf_counter()
logger.info(f'Neo4j Periodic Save Complete in {toc - tic:0.4f} seconds')
except Exception as inst:
logging.error('Neo4j Transaction error')
toc = time.perf_counter()
logger.info(f'Neo4j Periodic Save Complete in {toc - tic:0.4f} seconds')
except Exception as inst:
logging.error('//////////////')
logging.error('Neo4j Transaction error', exc_info=True)
logging.error(type(inst)) # the exception instance
logging.error(inst.args) # arguments stored in .args
# __str__ allows args to be printed directly,
logging.error(inst)
logging.error('--------------')
logging.error('KEY: %s', key)
if key == 'mentions':
logger.error('MENTIONS: %s', self.mentions)
logger.error('df_with_mentions: %s', df_with_mentions)
elif key == 'urls':
logger.error('URL: %s', self.NodeLabel.Url)
elif key == 'params':
logger.error('tweetsandaccounts: %s', self.tweetsandaccounts)
logger.error('tweeted_rel: %s', self.tweeted_rel)
logging.error('df: %s', df)
logging.error('//////////////')
raise inst


Expand Down
30 changes: 23 additions & 7 deletions modules/TwintPool.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ def __init__(self, is_tor=False):
self.config.Verified = None
self.config.Username = None
self.config.User_full = None
self.config.Retweets = True
if is_tor:
self.config.Proxy_host = 'localhost'
self.config.Proxy_port = "9050"
Expand Down Expand Up @@ -71,18 +72,31 @@ def _get_term(self, Search="IngSoc", Since="1984-04-20 13:00:00", Until="1984-04
toc = time.perf_counter()
logger.info(f'finished searching for tweets in: {toc - tic:0.4f} seconds')

def _get_timeline(self, username, limit):

def _get_Replies_to_user(self, username, limit):
    """Run a twint search for ``username`` and return the tweets frame.

    Configures ``self.config`` (retweets on, pandas storage on, output
    hidden), runs ``twint.run.Search`` and returns
    ``twint.storage.panda.Tweets_df``.

    Args:
        username: Used verbatim as the twint search string.
        limit: Assigned to the twint ``Limit`` option.

    Returns:
        The ``twint.storage.panda.Tweets_df`` attribute as it stands after
        the search has run.
    """
    self.config.Retweets = True
    # The search string is the bare username; the earlier
    # '"from:" + username' assignment was overwritten before use and is gone.
    # NOTE(review): confirm a bare-username search is the intended way to
    # collect replies to the user.
    self.config.Search = username
    self.config.Limit = limit
    self.config.Pandas = True
    self.config.Hide_output = True
    twint.run.Search(self.config)
    return twint.storage.panda.Tweets_df

def _get_user_timeline(self,username,limit):
    """Run a twint profile scrape for ``username`` and return the tweets frame.

    Sets the profile options on ``self.config``, calls
    ``twint.run.Profile`` and returns ``twint.storage.panda.Tweets_df``.
    """
    # Applied in order; each entry is one attribute on the twint config.
    options = (
        ("Profile_full", True),
        ("Retweets", True),
        ("Limit", limit),
        ("Username", username),
        ("Pandas", True),
        ("Hide_output", True),
    )
    for option, value in options:
        setattr(self.config, option, value)
    twint.run.Profile(self.config)
    return twint.storage.panda.Tweets_df

def _get_user_info(self, username):
def _get_user_info(self,username):
    """Run a twint user lookup for ``username`` and return the user frame.

    Sets the lookup options on ``self.config``, calls ``twint.run.Lookup``
    and returns ``twint.storage.panda.User_df``.
    """
    cfg = self.config
    cfg.User_full = True
    cfg.Username = username
    # Pandas storage must be on for the lookup result to land in User_df;
    # NOTE(review): inferred from the attribute names -- confirm against twint.
    cfg.Pandas = True
    cfg.Hide_output = True
    twint.run.Lookup(cfg)
    return twint.storage.panda.User_df

Expand Down Expand Up @@ -147,14 +161,16 @@ def row_tweet_to_urls(row):
neo4j_df['hashtags'] = df['hashtags'].apply(lambda x: [{'text': ht} for ht in x])
neo4j_df['user_followers_count'] = None
neo4j_df['user_friends_count'] = None
# neo4j_df['user_created_at'] = None
#neo4j_df['date'] = df['date']
#neo4j_df['user_created_at'] = None
neo4j_df['user_profile_image_url'] = None
neo4j_df['reply_tweet_id'] = None
neo4j_df['user_mentions'] = df['tweet'].str.findall('@[\w]+')
# neo4j_df['retweet_id'] is suspiciously empty (always)
neo4j_df['retweeted_status'] = None
neo4j_df['conversation_id'] = df['conversation_id'] # FIXME no-op?
neo4j_df['created_at'] = (neo4j_df['created_at'] / 1000).apply(lambda n: datetime.fromtimestamp(n))
#neo4j_df['created_at'] = (df['created_at'] / 1000).apply(lambda n: datetime.fromtimestamp(n))
neo4j_df['created_at'] = df['date'].apply(lambda n: str(pd.to_datetime(n)))

# neo4j_df['quoted_status_id'] = df.apply(row_to_quoted_status_id, axis=1)
# neo4j_df['is_quote_status'] = neo4j_df['quoted_status_id'] != None
Expand Down