Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 29 additions & 44 deletions modules/FirehoseJob.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
###




from collections import deque, defaultdict
import datetime, gc, os, string, sys, time, uuid
import numpy as np
Expand All @@ -12,6 +15,8 @@
from .Timer import Timer
from .TwarcPool import TwarcPool
from .Neo4jDataAccess import Neo4jDataAccess
from .StatusArrow import KNOWN_FIELDS
from .Twint import TwintPool

import logging
logger = logging.getLogger('fh')
Expand Down Expand Up @@ -103,50 +108,6 @@
# 'url': pa.string()
# }))})

### When dtype -> arrow ambiguious, override
KNOWN_FIELDS = [
[0, 'contributors', pa.string()],
[1, 'coordinates', pa.string()],
[2, 'created_at', pa.string()],

#[3, 'display_text_range', pa.list_(pa.int64())],
[3, 'display_text_range', pa.string()],

[4, 'entities', pa.string()],
[5, 'extended_entities', pa.string()], #extended_entities_t ],
[7, 'favorited', pa.bool_()],
[8, 'favorite_count', pa.int64()],
[9, 'full_text', pa.string()],
[10, 'geo', pa.string()],
[11, 'id', pa.int64() ],
[12, 'id_str', pa.string() ],
[13, 'in_reply_to_screen_name', pa.string() ],
[14, 'in_reply_to_status_id', pa.int64() ],
[15, 'in_reply_to_status_id_str', pa.string() ],
[16, 'in_reply_to_user_id', pa.int64() ],
[17, 'in_reply_to_user_id_str', pa.string() ],
[18, 'is_quote_status', pa.bool_() ],
[19, 'lang', pa.string() ],
[20, 'place', pa.string()],
[21, 'possibly_sensitive', pa.bool_()],
[22, 'quoted_status', pa.string()],
[23, 'quoted_status_id', pa.int64()],
[24, 'quoted_status_id_str', pa.string()],
[25, 'quoted_status_permalink', pa.string()],
[26, 'retweet_count', pa.int64()],
[27, 'retweeted', pa.bool_()],
[28, 'retweeted_status', pa.string()],
[29, 'scopes', pa.string()],
[30, 'source', pa.string()],
[31, 'truncated', pa.bool_()],
[32, 'user', pa.string()],

#[33, 'withheld_in_countries', pa.list_(pa.string())],
[33, 'withheld_in_countries', pa.string()],

#[34, 'followers', pa.struct({'followers': pa.bool_()})]
[34, 'followers', pa.string()]
]

#############################

Expand Down Expand Up @@ -741,3 +702,27 @@ def ingest_range(self, begin, end, job_name=None): # This method is where the m
for i in range(0, self.TWEETS_PER_PROCESS):
ids_to_process.append(self.queue.popleft())
self.process_ids(ids_to_process, job_name)

###############################

def search_time_range(self,
Search="COVID",
Since="2020-01-01 20:00:00",
Until="2020-01-01 21:00:00",
job_name=None,
**kwargs):
if job_name is None:
job_name = "search_%s" % Search
tp = TwintPool()
for df,t0,t1 in tp._get_term(Search=Search, Since=Since, Until=Until, **kwargs):
logger.debug('hits %s to %s: %s', t0, t1, len(df))
df2 = tp.twint_df_to_neo4j_df(df)
if self.save_to_neo:
logger.debug('writing to neo4j')
df3 = Neo4jDataAccess(self.debug, self.neo4j_creds).save_df_to_graph(df2, job_name)
logger.debug('wrote to neo4j, # ', len(df3))
yield df3
else:
yield df2
logger.debug('done')

49 changes: 49 additions & 0 deletions modules/StatusArrow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import pyarrow as pa

### When dtype -> arrow ambiguious, override
KNOWN_FIELDS = [
[0, 'contributors', pa.string()],
[1, 'coordinates', pa.string()],
[2, 'created_at', pa.string()],

#[3, 'display_text_range', pa.list_(pa.int64())],
[3, 'display_text_range', pa.string()],

[4, 'entities', pa.string()],
[5, 'extended_entities', pa.string()], #extended_entities_t ],
[7, 'favorited', pa.bool_()],
[8, 'favorite_count', pa.int64()],
[9, 'full_text', pa.string()],
[10, 'geo', pa.string()],
[11, 'id', pa.int64() ],
[12, 'id_str', pa.string() ],
[13, 'in_reply_to_screen_name', pa.string() ],
[14, 'in_reply_to_status_id', pa.int64() ],
[15, 'in_reply_to_status_id_str', pa.string() ],
[16, 'in_reply_to_user_id', pa.int64() ],
[17, 'in_reply_to_user_id_str', pa.string() ],
[18, 'is_quote_status', pa.bool_() ],
[19, 'lang', pa.string() ],
[20, 'place', pa.string()],
[21, 'possibly_sensitive', pa.bool_()],
[22, 'quoted_status', pa.string()],
[23, 'quoted_status_id', pa.int64()],
[24, 'quoted_status_id_str', pa.string()],
[25, 'quoted_status_permalink', pa.string()],
[26, 'retweet_count', pa.int64()],
[27, 'retweeted', pa.bool_()],
[28, 'retweeted_status', pa.string()],
[29, 'scopes', pa.string()],
[30, 'source', pa.string()],
[31, 'truncated', pa.bool_()],
[32, 'user', pa.string()],

#[33, 'withheld_in_countries', pa.list_(pa.string())],
[33, 'withheld_in_countries', pa.string()],

#[34, 'followers', pa.struct({'followers': pa.bool_()})]
[34, 'followers', pa.string()]
]