feat(s3 writes): add parquet_s3 output format that writes batches to S3 via s3fs, with DOMINO_S3_FILEPATH and DOMINO_COMPRESSION settings
lmeyerov committed Sep 5, 2022
commit 76782f589e46e8fe1477a8fdefd8df215f8a1a3b
2 changes: 1 addition & 1 deletion infra/pipelines/docker/datastream-Dockerfile
@@ -5,7 +5,7 @@ RUN apt-get update \
&& rm -rf /var/lib/apt/lists/*

RUN pip install prefect==0.10.1 simplejson twarc neo4j boto3==1.12.39 \
pandas pyarrow urlextract git+https://github.com/homm/yurl.git@1943161973aeb3b3cf2e1e9de6671673b8356161
pandas pyarrow s3fs urlextract git+https://github.com/homm/yurl.git@1943161973aeb3b3cf2e1e9de6671673b8356161

#RUN echo "ok6" && pip install git+https://github.com/TheDataRideAlongs/twint.git
RUN pip install git+https://github.com/graphistry/twint.git
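Since the S3 write path leans on s3fs alongside pyarrow, a quick smoke test inside the built image can confirm the new dependency resolves; a minimal check (hypothetical, not part of the commit):

# Hypothetical smoke test: confirm the s3fs/pyarrow stack added above imports cleanly in the image.
import pyarrow
import pyarrow.parquet  # noqa: F401  (parquet writer used by the datastream job)
import s3fs

print('pyarrow', pyarrow.__version__)
print('s3fs', s3fs.__version__)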
3 changes: 3 additions & 0 deletions infra/pipelines/docker/datastream-docker-compose.yml
@@ -11,6 +11,7 @@ version: '3'

services:
data-stream:
image: graphistry/datastream-agent
build:
context: ../../../
dockerfile: infra/pipelines/docker/datastream-Dockerfile
@@ -38,6 +39,8 @@ services:
DOMINO_JOB_NAME: ${DOMINO_JOB_NAME:-}
DOMINO_SEARCH: ${DOMINO_SEARCH:-}
DOMINO_WRITE_FORMAT: ${DOMINO_WRITE_FORMAT:-}
DOMINO_S3_FILEPATH: ${DOMINO_S3_FILEPATH:-}
DOMINO_COMPRESSION: ${DOMINO_COMPRESSION:-}
logging:
options:
tag: 'ImageName:{{.ImageName}}/Name:{{.Name}}/ID:{{.ID}}/ImageFullID:{{.ImageFullID}}'
30 changes: 20 additions & 10 deletions infra/pipelines/docker/search_historic.sh
@@ -8,17 +8,27 @@ DOMINO_STRIDE_SEC=${STRIDE_SEC:-30}
DOMINO_HISTORIC_STRIDE_SEC=${HISTORIC_STRIDE_SEC:-86400}
DOMINO_TWINT_STRIDE_SEC=${TWINT_STRIDE_SEC:-28800}
DOMINO_WRITE_FORMAT=${WRITE_FORMAT:-parquet}
DOMINO_S3_FILEPATH=${S3_FILEPATH:-dt-phase1}
DOMINO_COMPRESSION=${COMPRESSION:-snappy}
AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID:-}
AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY:-}

docker-compose -f datastream-docker-compose.yml down -v
docker-compose -f datastream-docker-compose.yml build # --no-cache
docker-compose -f datastream-docker-compose.yml -p ${DOMINO_JOB_NAME} down -v
docker-compose -f datastream-docker-compose.yml build # --no-cache
JOB_FILE="search_historic.py" \
DOMINO_JOB_NAME=$DOMINO_JOB_NAME \
DOMINO_SEARCH=$DOMINO_SEARCH \
DOMINO_START_DATE=$DOMINO_START_DATE \
DOMINO_STRIDE_SEC=$DOMINO_STRIDE_SEC \
DOMINO_HISTORIC_STRIDE_SEC=$DOMINO_HISTORIC_STRIDE_SEC \
DOMINO_TWINT_STRIDE_SEC=$TWINT_STRIDE_SEC \
DOMINO_WRITE_FORMAT=$DOMINO_WRITE_FORMAT \
DOMINO_JOB_NAME=$DOMINO_JOB_NAME \
DOMINO_SEARCH=$DOMINO_SEARCH \
DOMINO_START_DATE=$DOMINO_START_DATE \
DOMINO_STRIDE_SEC=$DOMINO_STRIDE_SEC \
DOMINO_HISTORIC_STRIDE_SEC=$DOMINO_HISTORIC_STRIDE_SEC \
DOMINO_TWINT_STRIDE_SEC=$DOMINO_TWINT_STRIDE_SEC \
DOMINO_WRITE_FORMAT=$DOMINO_WRITE_FORMAT \
DOMINO_S3_FILEPATH=$DOMINO_S3_FILEPATH \
DOMINO_COMPRESSION=$DOMINO_COMPRESSION \
AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID \
AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY \
docker-compose -f datastream-docker-compose.yml \
-p ${DOMINO_JOB_NAME} \
up --force-recreate data-stream $@
up \
data-stream \
$@
25 changes: 24 additions & 1 deletion jobs/search_historic.py
@@ -39,6 +39,12 @@ def env_non_empty(x: str):
search = os.environ['DOMINO_SEARCH'] if env_non_empty('DOMINO_SEARCH') else "covid OR corona OR virus OR pandemic"
write_format = os.environ['DOMINO_WRITE_FORMAT'] if env_non_empty('DOMINO_WRITE_FORMAT') else None

if write_format == 'parquet_s3':
s3_filepath = os.environ['DOMINO_S3_FILEPATH'] if env_non_empty('DOMINO_S3_FILEPATH') else None
AWS_ACCESS_KEY_ID = os.environ['AWS_ACCESS_KEY_ID']
AWS_SECRET_ACCESS_KEY = os.environ['AWS_SECRET_ACCESS_KEY']
compression = os.environ['DOMINO_COMPRESSION'] if env_non_empty('DOMINO_COMPRESSION') else 'snappy'

output_path = f'/output/{job_name}'
os.makedirs(output_path, exist_ok=True)
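One note on the parquet_s3 branch above: if DOMINO_WRITE_FORMAT is parquet_s3 but DOMINO_S3_FILEPATH is empty, s3_filepath stays None and the failure only surfaces later inside FirehoseJob. A fail-fast guard here is a possible addition (hypothetical, not in this commit):

# Hypothetical fail-fast check for the parquet_s3 configuration parsed above.
if write_format == 'parquet_s3' and not s3_filepath:
    raise ValueError('DOMINO_WRITE_FORMAT=parquet_s3 requires DOMINO_S3_FILEPATH to be set')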

@@ -65,7 +71,24 @@ def run_stream():
#2020-10-05 17:00:30-2020-10-05 17:01:00
# 2020-10-06 22:10:00 to 2020-10-06 22:10:30:
tp = TwintPool(is_tor=True)
fh = FirehoseJob(PARQUET_SAMPLE_RATE_TIME_S=30, save_to_neo=False, writers={}, write_to_disk=write_format)
fh = FirehoseJob(
PARQUET_SAMPLE_RATE_TIME_S=30,
save_to_neo=False,
writers={},
write_to_disk=write_format,
write_opts=(
{
's3_filepath': s3_filepath,
's3fs_options': {
'key': AWS_ACCESS_KEY_ID,
'secret': AWS_SECRET_ACCESS_KEY
},
'compression': compression
}
if write_format == 'parquet_s3' else
{}
)
)

try:
for df in fh.search_time_range(
68 changes: 55 additions & 13 deletions modules/FirehoseJob.py
@@ -1,12 +1,15 @@
###

from collections import deque, defaultdict
import datetime, gc, os, string, sys, time, uuid
from typing import Literal, Optional
from typing import Any, Literal, Optional
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.fs
import pyarrow.parquet as pq
import s3fs
import simplejson as json # nan serialization
from twarc import Twarc

@@ -123,7 +126,8 @@ class FirehoseJob:

def __init__(self, creds=[], neo4j_creds=None, TWEETS_PER_PROCESS=100, TWEETS_PER_ROWGROUP=5000, save_to_neo=False,
PARQUET_SAMPLE_RATE_TIME_S=None, debug=False, BATCH_LEN=100, writers={'snappy': None},
write_to_disk: Optional[Literal['csv', 'json']] = None
write_to_disk: Optional[Literal['csv', 'json', 'parquet', 'parquet_s3']] = None,
write_opts: Optional[Any] = None
):
self.queue = deque()
self.writers = writers
@@ -136,6 +140,7 @@ def __init__(self, creds=[], neo4j_creds=None, TWEETS_PER_PROCESS=100, TWEETS_PE
self.timer = Timer()
self.debug = debug
self.write_to_disk = write_to_disk
self.write_opts = write_opts

self.twarc_pool = TwarcPool([
Twarc(o['consumer_key'], o['consumer_secret'], o['access_token'], o['access_token_secret'])
@@ -354,7 +359,7 @@ def pq_writer(self, table, job_name='generic_job'):

def flush(self, job_name="generic_job"):
try:
if self.current_table is None or self.current_table.num_rows == 0:
if not hasattr(self, 'current_table') or self.current_table is None or self.current_table.num_rows == 0:
return
logger.debug('writing to parquet then clearing current_table..')
deferred_pq_exn = None
@@ -695,7 +700,13 @@ def ingest_range(self, begin, end, job_name=None): # This method is where the m

###############################

def _maybe_write_batch(self, df, write_to_disk: Optional[Literal['csv', 'json', 'parquet']] = None, id: Optional[str] = None):
def _maybe_write_batch(
self,
df,
write_to_disk: Optional[Literal['csv', 'json', 'parquet', 'parquet_s3']] = None,
id: Optional[str] = None,
write_opts = {}
):
write_to_disk = write_to_disk or self.write_to_disk

logger.info('_maybe_write_batch: write_to_disk=%s, id=%s', write_to_disk, id)
@@ -713,6 +724,31 @@ def _maybe_write_batch(self, df, write_to_disk: Optional[Literal['csv', 'json',
df.to_json(f'/output/{id}.json')
elif write_to_disk == 'parquet':
df.to_parquet(f'/output/{id}.parquet', compression='snappy')
elif write_to_disk == 'parquet_s3':

#s3_filepath = 'dt-phase1/data.parquet'
s3_filepath = write_opts['s3_filepath']
s3fs_options = write_opts['s3fs_options']  # e.g. {'key': <aws access key>, 'secret': <aws secret key>}
compression = (
write_opts['compression']
if 'compression' in write_opts and len(write_opts['compression']) > 0 else
'snappy'
)

s3fs_instance = s3fs.S3FileSystem(**s3fs_options)
filesystem = pyarrow.fs.PyFileSystem(pa.fs.FSSpecHandler(s3fs_instance))

df_cleaned = df.infer_objects()

pq.write_to_dataset(
pa.Table.from_pandas(df_cleaned),
f'{s3_filepath}/{id}.parquet',
filesystem=filesystem,
use_dictionary=True,
compression=compression,
version="2.4",
)

else:
raise ValueError(f'unknown write_to_disk format: {write_to_disk}')
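For context, the parquet_s3 branch above amounts to wrapping an s3fs filesystem for pyarrow and writing each batch as a parquet dataset. A minimal standalone sketch of that path, with placeholder bucket, prefix, and credentials:

# Minimal sketch of the s3fs + pyarrow write path; bucket, prefix, and credentials are placeholders.
import pandas as pd
import pyarrow as pa
import pyarrow.fs
import pyarrow.parquet as pq
import s3fs

s3 = s3fs.S3FileSystem(key='<AWS_ACCESS_KEY_ID>', secret='<AWS_SECRET_ACCESS_KEY>')
filesystem = pyarrow.fs.PyFileSystem(pyarrow.fs.FSSpecHandler(s3))

df = pd.DataFrame({'id': [1, 2], 'tweet': ['hello', 'world']})

# write_to_dataset creates a directory of part files under the given path.
pq.write_to_dataset(
    pa.Table.from_pandas(df.infer_objects()),
    'my-bucket/dt-phase1/example.parquet',
    filesystem=filesystem,
    use_dictionary=True,
    compression='snappy',
    version='2.4',
)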

@@ -730,6 +766,7 @@ def search_time_range(self,
if tp is None:
tp = TwintPool(is_tor=True)
logger.info('start search_time_range: %s -> %s', Since, Until)
t_prev = time.perf_counter()
for df, t0, t1 in tp._get_term(Search=Search, Since=Since, Until=Until, **kwargs):
logger.info('hits %s to %s: %s', t0, t1, len(df))
if self.save_to_neo:
@@ -744,16 +781,21 @@

res = Neo4jDataAccess(self.neo4j_creds).save_twintdf_to_neo(chkd, job_name, job_id=None)
# df3 = Neo4jDataAccess(self.debug, self.neo4j_creds).save_df_to_graph(df2, job_name)
toc = time.perf_counter()
logger.info(f'finished twint loop in: {toc - tic:0.4f} seconds')

logger.info('wrote to neo4j, # %s' % (len(res) if not (res is None) else 0))

self._maybe_write_batch(df, write_to_disk, f'{job_name}/{t0}_{t1}')
yield res
else:
self._maybe_write_batch(df, write_to_disk, f'{job_name}/{t0}_{t1}')
yield df

res = df
self._maybe_write_batch(
res,
write_to_disk,
f'{job_name}/{t0}_{t1}',
write_opts=kwargs.get('write_opts', self.write_opts)
)
t_iter = time.perf_counter()
logger.info(f'finished tp.get_term: {t_iter - t_prev:0.4f} seconds')
t_prev = t_iter
yield res

toc = time.perf_counter()
logger.info(f'finished twint loop in: {toc - tic:0.4f} seconds')
logger.info('done search_time_range')
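Since each batch is written as a dataset under {s3_filepath}/{job_name}/{t0}_{t1}.parquet/, one batch can be read back with pandas for a quick check; a hypothetical read, with placeholder bucket, job name, and credentials:

# Hypothetical read-back of one written batch; path and credentials are placeholders.
import pandas as pd

bucket_prefix = 'my-bucket/dt-phase1'                         # placeholder for DOMINO_S3_FILEPATH
batch_id = 'my_job/2020-10-05 17:00:30_2020-10-05 17:01:00'   # placeholder for f'{job_name}/{t0}_{t1}'

df = pd.read_parquet(
    f's3://{bucket_prefix}/{batch_id}.parquet',
    storage_options={'key': '<AWS_ACCESS_KEY_ID>', 'secret': '<AWS_SECRET_ACCESS_KEY>'},
)
print(len(df), 'rows')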