1+ import pyarrow as pa
2+ import twint
3+ from urlextract import URLExtract
4+ from datetime import datetime , timedelta
5+
import logging

# Module-scoped logger: records carry this module's name and can be filtered
# per module, while still propagating to whatever handlers are configured on
# the root logger.
logger = logging.getLogger(__name__)
8+
class TwintPool:
    """Run twint searches through one shared ``twint.Config`` and reshape results.

    The constructor creates a ``twint.Config`` with ``Limit=100000``,
    ``Pandas=True``, ``User_full=True`` and ``Hide_output=True``. Every search
    method mutates that same config object, so settings applied by one call
    (search term, username, extra keyword options) remain set for later calls.
    """

    # Timestamp layout expected for the ``since``/``until`` strings of twint_loop.
    _TIME_FORMAT = '%Y-%m-%d %H:%M:%S'

    def __init__(self, fh_job=None, job_name='noname'):
        """Store the job handle and build the shared twint configuration.

        Args:
            fh_job: Opaque handle kept on ``self.fh``; not used by this class.
            job_name: Accepted for interface compatibility; currently unused.
        """
        self.fh = fh_job
        self.config = twint.Config()
        self.config.Limit = 100000
        self.config.Pandas = True
        self.config.User_full = True
        self.config.Hide_output = True

    @staticmethod
    def _time_windows(start, end, stride_sec):
        """Yield consecutive ``(t0, t1)`` datetime windows covering ``[start, end)``.

        The final window is clamped to ``end`` so no window reaches past the
        requested range. Raises ``ValueError`` for a non-positive stride, which
        would otherwise never advance.
        """
        if stride_sec <= 0:
            raise ValueError('stride_sec must be positive, got %r' % (stride_sec,))
        stride = timedelta(seconds=stride_sec)
        t0 = start
        while t0 < end:
            t1 = min(t0 + stride, end)
            yield t0, t1
            t0 = t1

    @staticmethod
    def _row_to_tweet_type(row):
        """Classify one twint row as QUOTE_RETWEET, RETWEET, TWEET or REPLY.

        A row counts as a quote when ``quote_url`` is a non-empty string; this
        is the same test ``_row_to_quoted_status_id`` uses, so the two derived
        columns always agree.
        """
        quote_url = row['quote_url']
        if isinstance(quote_url, str) and quote_url:
            return "QUOTE_RETWEET"
        if row['retweet']:
            return "RETWEET"
        if row['id'] == row['conversation_id']:
            return "TWEET"
        return "REPLY"

    @staticmethod
    def _row_to_quoted_status_id(row):
        """Return the last path segment of ``quote_url``, or None when there is no quote."""
        quote_url = row['quote_url']
        if isinstance(quote_url, str) and quote_url:
            return quote_url.split('/')[-1]
        return None

    def twint_loop(self, since, until, stride_sec=600, limit=None):
        """Search the time range window by window, yielding each window with hits.

        Args:
            since: Range start as a ``'%Y-%m-%d %H:%M:%S'`` string.
            until: Range end (exclusive) in the same format.
            stride_sec: Width of each search window in seconds; must be positive.
            limit: Stop starting new windows once the cumulative hit count
                reaches this value. ``None`` means no cap. The check happens
                between windows, so the total yielded may exceed ``limit``.

        Yields:
            ``(tweets_df, t0, t1)`` where ``tweets_df`` is
            ``twint.storage.panda.Tweets_df`` after searching ``[t0, t1)``.
            Windows without hits are skipped.

        Raises:
            ValueError: If ``since``/``until`` do not match the expected format
                or ``stride_sec`` is not positive.
        """
        start = datetime.strptime(since, self._TIME_FORMAT)
        end = datetime.strptime(until, self._TIME_FORMAT)
        tweets_returned = 0

        for t0, t1 in self._time_windows(start, end, stride_sec):
            # Keep searching while nothing has been found yet; after that,
            # honour the cap only when one was actually given (limit may be None).
            if tweets_returned and limit is not None and tweets_returned >= limit:
                break
            self.config.Since = str(t0)
            self.config.Until = str(t1)
            logger.debug('Search step: %s-%s', t0, t1)
            twint.run.Search(self.config)
            tweets_df = twint.storage.panda.Tweets_df
            hits = len(tweets_df)
            tweets_returned += hits
            if hits > 0:
                logger.debug('Search hit, len %s', hits)
                yield (tweets_df, t0, t1)
            else:
                logger.debug('not hits on %s - %s, continuing', t0, t1)
        logger.debug('twint_loop done, hits: %s', tweets_returned)

    def _get_term(self, Search="IngSoc", Since="1984-04-20 13:00:00", Until="1984-04-20 13:30:00", stride_sec=600, **kwargs):
        """Search for a term over a time range, yielding ``(df, t0, t1)`` per window with hits.

        Args:
            Search: Search term assigned to ``config.Search``.
            Since: Range start, ``'%Y-%m-%d %H:%M:%S'``.
            Until: Range end, same format.
            stride_sec: Window width in seconds, forwarded to ``twint_loop``.
            **kwargs: Extra attributes set verbatim on the shared config
                (they stay set after this call).
        """
        self.config.Search = Search
        self.config.Retweets = True
        for option, value in kwargs.items():
            setattr(self.config, option, value)
        # Arguments ordered to match the message: "<since>-<until> of <term>".
        logger.debug('Search seq: %s-%s of %s', Since, Until, Search)
        yield from self.twint_loop(Since, Until, stride_sec, self.config.Limit)

    def _get_timeline(self, username="lmeyerov"):
        """Run one search restricted to ``username`` and return the resulting frame.

        NOTE(review): the shared config keeps any ``Search`` term or other
        options set by earlier calls - confirm that is intended for timelines.
        """
        self.config.Username = username
        self.config.Retweets = True
        twint.run.Search(self.config)
        tweets_df = twint.storage.panda.Tweets_df
        return tweets_df

    def twint_df_to_neo4j_df(self, df):
        """Return a copy of a twint tweets frame renamed and extended for neo4j.

        Renames the twint columns (``id`` -> ``status_id``, ``tweet`` ->
        ``full_text``, ...), derives ``tweet_type_twint``, ``hashtags``,
        ``user_mentions``, ``quoted_status_id``, ``is_quote_status`` and
        ``urls``, converts ``created_at`` to ``datetime``, and adds the
        remaining expected columns as ``None``/``False`` placeholders.

        Args:
            df: Frame with at least the columns ``id``, ``conversation_id``,
                ``tweet``, ``created_at``, ``hashtags``, ``quote_url`` and
                ``retweet``. It is not modified.

        Returns:
            A new DataFrame; ``df`` itself is left untouched.
        """
        neo4j_df = df.rename(columns={
            'id': 'status_id',
            'tweet': 'full_text',
            'nlikes': 'favorite_count',
            'nretweets': 'retweet_count',
            'user_id_str': 'user_id',
            'username': 'user_name',
            'name': 'user_screen_name',
        })

        # Build the extractor once per call instead of once per row.
        extractor = URLExtract()

        neo4j_df['user_location'] = None
        neo4j_df['tweet_type_twint'] = df.apply(self._row_to_tweet_type, axis=1)
        neo4j_df['hashtags'] = df['hashtags'].apply(lambda tags: [{'text': ht} for ht in tags])
        neo4j_df['user_followers_count'] = None
        neo4j_df['user_friends_count'] = None
        neo4j_df['user_created_at'] = None
        neo4j_df['user_profile_image_url'] = None
        neo4j_df['reply_tweet_id'] = None
        neo4j_df['user_mentions'] = df['tweet'].str.findall(r'@[\w]+')
        # neo4j_df['retweet_id'] is suspiciously empty (always)
        neo4j_df['retweeted_status'] = None

        # created_at is divided by 1000 before conversion (presumably epoch
        # milliseconds - TODO confirm); fromtimestamp yields naive local time.
        neo4j_df['created_at'] = (neo4j_df['created_at'] / 1000).apply(datetime.fromtimestamp)

        neo4j_df['quoted_status_id'] = df.apply(self._row_to_quoted_status_id, axis=1)
        # `series != None` is True for every element in pandas; notna() is the
        # element-wise "has a value" test that was intended.
        neo4j_df['is_quote_status'] = neo4j_df['quoted_status_id'].notna()
        neo4j_df['in_reply_to_status_id'] = False
        neo4j_df['urls'] = df['tweet'].apply(lambda text: list(extractor.gen_urls(text)))

        return neo4j_df

    def to_arrow(self, tweets_df):
        """Placeholder for Arrow conversion; not implemented, returns None."""
        pass
123+
124+
125+
0 commit comments