Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions modules/Neo4jDataAccess.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,9 @@ def __save_df_to_graph(self, df, job_name, job_id=None):
for index, row in df.iterrows():
# determine the type of tweet
tweet_type = 'TWEET'
if row["in_reply_to_status_id"] is not None and row["in_reply_to_status_id"] > 0:
if row['tweet_type_twint']:
tweet_type = row['tweet_type_twint']
elif row["in_reply_to_status_id"] is not None and row["in_reply_to_status_id"] > 0:
tweet_type = "REPLY"
elif "quoted_status_id" in row and row["quoted_status_id"] is not None and row["quoted_status_id"] > 0:
tweet_type = "QUOTE_RETWEET"
Expand All @@ -347,8 +349,11 @@ def __save_df_to_graph(self, df, job_name, job_id=None):
'user_created_at': pd.Timestamp(row['user_created_at'], unit='s').to_pydatetime(),
'user_profile_image_url': row['user_profile_image_url'],
'reply_tweet_id': row['in_reply_to_status_id'],
'conversation_id': row['conversation_id'] if 'conversation_id' in row else None,
'quoted_status_id': row['quoted_status_id'],
'retweet_id': row['retweet_id'] if 'retweet_id' in row else None,
'geo': row['geo'] if 'geo' in row else None,
'ingest_method': row['ingest_method']
})
except Exception as e:
logging.error('params.append exn', e)
Expand Down Expand Up @@ -437,4 +442,4 @@ def __parse_urls(self, row, url_params, job_name, job_id=None):
logging.error(inst.args) # arguments stored in .args
# __str__ allows args to be printed directly,
logging.error(inst)
return url_params
return url_params
117 changes: 117 additions & 0 deletions modules/Twint.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import pyarrow as pa
import twint
from urlextract import URLExtractor
from datetime import datetime, timedelta


class TwintPool:
def __init__(self, fh_job=None, job_name="noname"):
self.fh = fh_job
self.config = twint.Config()
self.config.Limit = 100
self.config.Pandas = True
self.config.User_full = True
self.config.Hide_output = True

def twint_loop(self, since, until, stride_sec=600, limit=None):
def get_unix_time(time_str):
return datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S")

since = get_unix_time(since)
until = get_unix_time(until)
t = since
tweets_returned = 0

while t < until and (not tweets_returned or tweets_returned < limit):
t0 = t
t1 = t + timedelta(seconds=stride_sec)
self.config.Since = str(t0)
self.config.Until = str(t1)
twint.run.Search(self.config)
tweets_returned += len(twint.storage.panda.Tweets_df)
yield (twint.storage.panda.Tweets_df, t0, t1)
t = t1

def _get_term(
self,
Search="IngSoc",
Since="1984-04-20 13:00:00",
Until="1984-04-20 13:30:00",
stride_sec=600,
**kwargs
):
self.config.Search = Search
self.config.Retweets = True
for k, v in kwargs.items():
setattr(self.config, k, v)
# self.config.Search = term
for df, t0, t1 in self.twint_loop(Since, Until, stride_sec, self.config.Limit):
yield (df, t0, t1)


def _get_timeline(self, username="lmeyerov"):
self.config.Username = username
self.config.Retweets = True
#self.config.Search = term
twint.run.Search(self.config)
tweets_df = twint.storage.panda.Tweets_df
return tweets_df

def twint_df_to_neo4j_df(self, df):
neo4j_df = df.rename(
columns={
"id": "status_id",
"tweet": "full_text",
"created_at": "created_at", # needs to be datetime
"nlikes": "favorite_count",
"nretweets": "retweet_count",
"user_id_str": "user_id",
"username": "user_name",
"name": "user_screen_name",
}
)

def row_to_tweet_type(row):
if row["quote_url"] is None or row["quote_url"] == "":
return "QUOTE_RETWEET"
elif row["retweet"]:
return "RETWEET"
elif row["id"] == row["conversation_id"]:
return "TWEET"
elif row["id"] != row["conversation_id"]:
return "REPLY"
else:
raise ("wat")

def row_to_quoted_status_id(row):
if row["quote_url"] and len(row["quote_url"]) > 0:
return row["quote_url"].split("/")[-1]
else:
return None

def row_tweet_to_urls(row):
extractor = URLExtract()
return list(extractor.gen_urls(row["tweet"]))

neo4j_df["user_location"] = None
neo4j_df["tweet_type_twint"] = df.apply(row_to_tweet_type, axis=1)
neo4j_df["hashtags"] = df["hashtags"].apply(
lambda x: [{"text": ht} for ht in x]
)
neo4j_df["user_followers_count"] = None
neo4j_df["user_friends_count"] = None
neo4j_df["user_created_at"] = None
neo4j_df["user_profile_image_url"] = None
neo4j_df["in_reply_to_status_id"] = None
neo4j_df["user_mentions"] = [] # Todo
# neo4j_df['retweet_id'] is suspiciously empty (always)

neo4j_df["quoted_status_id"] = df.apply(row_to_quoted_status_id, axis=1)
neo4j_df["urls"] = df.apply(row_tweet_to_urls, axis=1)

neo4j_df["ingest_method"] = 'twint'

return neo4j_df

def to_arrow(self, tweets_df):
pass