Skip to content
Next Next commit
feat(write to disk): even without neo4j
  • Loading branch information
lmeyerov committed Sep 5, 2022
commit a5029536394eb51d16e7fbf4f3355ca87453a825
26 changes: 25 additions & 1 deletion modules/FirehoseJob.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,9 @@ class FirehoseJob:
DROP_COLS = DROP_COLS

def __init__(self, creds=[], neo4j_creds=None, TWEETS_PER_PROCESS=100, TWEETS_PER_ROWGROUP=5000, save_to_neo=False,
PARQUET_SAMPLE_RATE_TIME_S=None, debug=False, BATCH_LEN=100, writers={'snappy': None}):
PARQUET_SAMPLE_RATE_TIME_S=None, debug=False, BATCH_LEN=100, writers={'snappy': None},
write_to_disk: Optional[Literal['csv', 'json']] = None
):
self.queue = deque()
self.writers = writers
self.last_write_epoch = ''
Expand All @@ -132,6 +134,7 @@ def __init__(self, creds=[], neo4j_creds=None, TWEETS_PER_PROCESS=100, TWEETS_PE
])
self.timer = Timer()
self.debug = debug
self.write_to_disk = write_to_disk

self.twarc_pool = TwarcPool([
Twarc(o['consumer_key'], o['consumer_secret'], o['access_token'], o['access_token_secret'])
Expand Down Expand Up @@ -691,12 +694,30 @@ def ingest_range(self, begin, end, job_name=None): # This method is where the m

###############################

def _maybe_write_batch(self, df, write_to_disk: Optional[Literal['csv', 'json']] = None, id: Optional[str] = None):
write_to_disk = write_to_disk or self.write_to_disk

logger.info('_maybe_write_batch: write_to_disk=%s, id=%s', write_to_disk, id)

if write_to_disk is None:
return
if id is None:
raise ValueError('need id to write to disk')

logger.info(f'writing batch {id} to disk: shape {df.shape}')

if write_to_disk == 'csv':
df.to_csv(f'/output/{id}.csv')
elif write_to_disk == 'json':
df.to_json(f'/output/{id}.json')

def search_time_range(self,
Search="COVID",
Since="2020-01-01 20:00:00",
Until="2020-01-01 21:00:00",
job_name=None,
tp=None,
write_to_disk: Optional[Literal['csv', 'json']] = None,
**kwargs):
tic = time.perf_counter()
if job_name is None:
Expand All @@ -723,8 +744,11 @@ def search_time_range(self,

logger.info('wrote to neo4j, # %s' % (len(res) if not (res is None) else 0))

self._maybe_write_batch(df, write_to_disk, f'{job_name}_{t0}_{t1}')
yield res
else:
self._maybe_write_batch(df, write_to_disk, f'{job_name}_{t0}_{t1}')
yield df

logger.info('done search_time_range')