1 change: 1 addition & 0 deletions infra/pipelines/docker/datastream-docker-compose.yml
@@ -38,6 +38,7 @@ services:
      DOMINO_END_DATE: ${DOMINO_END_DATE:-}
      DOMINO_JOB_NAME: ${DOMINO_JOB_NAME:-}
      DOMINO_SEARCH: ${DOMINO_SEARCH:-}
      DOMINO_USERNAMES: ${DOMINO_USERNAMES:-}
      DOMINO_FETCH_PROFILES: ${DOMINO_FETCH_PROFILES:-}
      DOMINO_WRITE_FORMAT: ${DOMINO_WRITE_FORMAT:-}
      DOMINO_S3_FILEPATH: ${DOMINO_S3_FILEPATH:-}
32 changes: 32 additions & 0 deletions infra/pipelines/docker/search_timelines.sh
@@ -0,0 +1,32 @@
#!/bin/bash
set -ex

DOMINO_JOB_NAME=${JOB_NAME:-historic_pfas_1}
DOMINO_USERNAMES=${USERNAMES:-"a,b,c"}
DOMINO_FETCH_PROFILES=${FETCH_PROFILES:-"false"}
DOMINO_STRIDE_SEC=${STRIDE_SEC:-30}
DOMINO_WRITE_FORMAT=${WRITE_FORMAT:-parquet}
DOMINO_S3_FILEPATH=${S3_FILEPATH:-dt-phase1}
DOMINO_COMPRESSION=${COMPRESSION:-snappy}
AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID:-}
AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY:-}

echo "Using usernames: $DOMINO_USERNAMES"

docker-compose -f datastream-docker-compose.yml -p ${DOMINO_JOB_NAME} down -v
docker-compose -f datastream-docker-compose.yml build # --no-cache
JOB_FILE="search_timelines.py" \
DOMINO_JOB_NAME=$DOMINO_JOB_NAME \
DOMINO_USERNAMES=$DOMINO_USERNAMES \
DOMINO_FETCH_PROFILES=$DOMINO_FETCH_PROFILES \
DOMINO_STRIDE_SEC=$DOMINO_STRIDE_SEC \
DOMINO_WRITE_FORMAT=$DOMINO_WRITE_FORMAT \
DOMINO_S3_FILEPATH=$DOMINO_S3_FILEPATH \
DOMINO_COMPRESSION=$DOMINO_COMPRESSION \
AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID \
AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY \
docker-compose -f datastream-docker-compose.yml \
    -p ${DOMINO_JOB_NAME} \
    up \
    data-stream \
    "$@"
22 changes: 22 additions & 0 deletions jobs/search_pfas_timelines.sh
@@ -0,0 +1,22 @@
#!/bin/bash

USERNAMES=""
while IFS= read -r LINE; do
    USERNAMES="$USERNAMES,$LINE"
done < pfas_profiles

echo "Users: $USERNAMES"

#set -ex

cd ../infra/pipelines/docker/

JOB_NAME="pfas_timelines" \
FETCH_PROFILES="true" \
USERNAMES="$USERNAMES" \
STRIDE_SEC="$(python -c 'print(30 * 1)')" \
WRITE_FORMAT="parquet_s3" \
S3_FILEPATH="dt-phase1" \
AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} \
AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} \
./search_timelines.sh "$@"
137 changes: 137 additions & 0 deletions jobs/search_timelines.py
@@ -0,0 +1,137 @@
#!/usr/bin/env python
# coding: utf-8


import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)  # DEBUG, INFO, WARNING, ERROR, CRITICAL



import json
import os
import sys
from datetime import timedelta, datetime

import pandas as pd
import pendulum

from prefect import context, Flow, task
from prefect.engine.executors import DaskExecutor
from prefect.environments.storage import S3
from prefect.schedules import IntervalSchedule

from ProjectDomino.FirehoseJob import FirehoseJob
from ProjectDomino.Neo4jDataAccess import Neo4jDataAccess
from ProjectDomino.TwintPool import TwintPool


S3_BUCKET = "wzy-project-domino"

pd.set_option('display.max_colwidth', None)
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)

def env_non_empty(x: str):
    return x in os.environ and os.environ[x]

def str_to_bool(x: str) -> bool:
    if x in ['True', 'true', '1', 'TRUE']:
        return True
    elif x in ['False', 'false', '0', 'FALSE']:
        return False
    else:
        raise ValueError('Cannot convert to bool: ' + x)

stride_sec = int(os.environ['DOMINO_STRIDE_SEC']) if env_non_empty('DOMINO_STRIDE_SEC') else 30
job_name = os.environ['DOMINO_JOB_NAME'] if env_non_empty('DOMINO_JOB_NAME') else "covid"
write_format = os.environ['DOMINO_WRITE_FORMAT'] if env_non_empty('DOMINO_WRITE_FORMAT') else None
fetch_profiles = str_to_bool(os.environ['DOMINO_FETCH_PROFILES']) if env_non_empty('DOMINO_FETCH_PROFILES') else False
usernames_raw = os.environ['DOMINO_USERNAMES'] if env_non_empty('DOMINO_USERNAMES') else None
if usernames_raw is None:
    raise ValueError('DOMINO_USERNAMES is not set, expected comma-delimited str')
usernames = [x for x in usernames_raw.split(',') if len(x) > 0]
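Note the filter also drops the empty leading entry produced by jobs/search_pfas_timelines.sh, whose accumulation loop prefixes every name with a comma. A minimal sketch with hypothetical names:

# Hypothetical input mirroring the ",a,b,c"-style string built by search_pfas_timelines.sh
raw = ",alice,bob,carol"
names = [x for x in raw.split(',') if len(x) > 0]
assert names == ['alice', 'bob', 'carol']  # empty first entry dropped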

if write_format == 'parquet_s3':
    s3_filepath = os.environ['DOMINO_S3_FILEPATH'] if env_non_empty('DOMINO_S3_FILEPATH') else None
    AWS_ACCESS_KEY_ID = os.environ['AWS_ACCESS_KEY_ID']
    AWS_SECRET_ACCESS_KEY = os.environ['AWS_SECRET_ACCESS_KEY']
compression = os.environ['DOMINO_COMPRESSION'] if env_non_empty('DOMINO_COMPRESSION') else 'snappy'

output_path = f'/output/{job_name}'
os.makedirs(f'{output_path}/tweets', exist_ok=True)
os.makedirs(f'{output_path}/profiles', exist_ok=True)
os.makedirs(f'{output_path}/timelines', exist_ok=True)


# FIXME unsafe when distributed
usernames_queue = usernames.copy()
pending = 0

@task(log_stdout=True, skip_on_upstream_skip=True, max_retries=3, retry_delay=timedelta(seconds=30))
def run_stream():

    global pending

    if len(usernames_queue) == 0 and pending == 0:
        logger.info(f'Successfully processed all usernames ({len(usernames)}), exiting')
        sys.exit(0)

    if len(usernames_queue) == 0:
        logger.info(f'No more usernames to process, but {pending} jobs are still pending')
        return

    pending += 1
    username = usernames_queue.pop(0)

    try:

        tp = TwintPool(is_tor=True)
        fh = FirehoseJob(
            PARQUET_SAMPLE_RATE_TIME_S=30,
            save_to_neo=False,
            tp=tp,
            writers={},
            write_to_disk=write_format,
            write_opts=(
                {
                    's3_filepath': s3_filepath,
                    's3fs_options': {
                        'key': AWS_ACCESS_KEY_ID,
                        'secret': AWS_SECRET_ACCESS_KEY
                    },
                    'compression': compression
                }
                if write_format == 'parquet_s3' else
                {}
            )
        )

        try:
            for df in fh.get_timelines(
                usernames=[username],
                job_name=job_name,
                fetch_profiles=fetch_profiles
            ):
                print('got: %s' % (df.shape if df is not None else 'None'))
        except Exception as e:
            logger.error("job exception", exc_info=True)
            raise e
    except Exception:
        logger.error("task exception, reinserting user", exc_info=True)
        usernames_queue.insert(0, username)
    pending -= 1
    print("task finished")

schedule_opts = {
    'interval': timedelta(seconds=stride_sec),
    'start_date': pendulum.parse('2019-01-01 00:00:00')
}
logger.info(f'Schedule options: {schedule_opts}')
logger.info(f'Task settings: stride_sec={stride_sec}')

schedule = IntervalSchedule(**schedule_opts)
storage = S3(bucket=S3_BUCKET)

#with Flow("covid-19 stream-single") as flow:
#with Flow("covid-19 stream", storage=storage, schedule=schedule) as flow:
with Flow(f"{job_name} stream", schedule=schedule) as flow:
    run_stream()
flow.run()
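For reference, a hedged sketch of running this script directly (outside docker-compose) with the same environment variables the compose file forwards; the names come from above, the values are hypothetical, and AWS credentials are only needed for parquet_s3:

# Sketch: launch search_timelines.py with the env the compose service would pass.
import os
import subprocess

env = dict(
    os.environ,
    DOMINO_JOB_NAME='pfas_timelines',  # hypothetical
    DOMINO_USERNAMES='alice,bob',      # hypothetical
    DOMINO_FETCH_PROFILES='true',
    DOMINO_STRIDE_SEC='30',
    DOMINO_WRITE_FORMAT='parquet',
)
subprocess.run(['python', 'search_timelines.py'], env=env, check=True)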

81 changes: 77 additions & 4 deletions modules/FirehoseJob.py
@@ -1,8 +1,10 @@
###

from collections import deque, defaultdict
import datetime, gc, os, string, sys, time, uuid
from datetime import date
from typing import Any, Literal, Optional
import numpy as np
import pandas as pd
@@ -12,6 +14,7 @@
import s3fs
import simplejson as json # nan serialization
from twarc import Twarc
from twint.user import SuspendedUser

from .Timer import Timer
from .TwarcPool import TwarcPool
@@ -816,15 +819,25 @@ def search_user_info_by_name(self, df, tp = None) -> Optional[pd.DataFrame]:
        tp = tp or self.tp or TwintPool(is_tor=True)
        user_names = df[[col]].drop_duplicates()[col].to_list()
        unseen_user_names = [user_name for user_name in user_names if user_name not in self._enriched_users]

        lst = [tp._get_user_info(username=user) for user in unseen_user_names]

        if len(unseen_user_names) == 0:
            logger.debug('skipping search_user_info_by_name, all user names already enriched: %s / %s',
                len(unseen_user_names), len(user_names))
            return None

        lst = [tp._get_user_info(username=user, ignore_errors=True) for user in unseen_user_names]
        lst = [x for x in lst if x is not None]
        if len(lst) == 0 or all([len(x) == 0 for x in lst]):
            logger.debug('ending search_user_info_by_name, no user info found for search of %s of %s users', len(unseen_user_names), len(user_names))
            return None

        dfs = pd.concat(lst).drop_duplicates(subset=["id"])

        seen_user_names = dfs['username'].to_list()
        for user in seen_user_names:
            self._enriched_users.add(user)

        print('search_user_info_by_name cache hit rate (%s / %s) and twint hydration rate (%s / %s)' % (
        logger.debug('search_user_info_by_name cache hit rate (%s / %s) and twint hydration rate (%s / %s)' % (
            len(user_names) - len(seen_user_names), len(user_names),
            len(seen_user_names), len(unseen_user_names)
        ))
@@ -837,7 +850,7 @@ def search_time_range(self,
                          Until="2020-01-01 21:00:00",
                          job_name=None,
                          tp=None,
                          write_to_disk: Optional[Literal['csv', 'json']] = None,
                          write_to_disk: Optional[Literal['csv', 'json', 'parquet', 'parquet_s3']] = None,
                          fetch_profiles: bool = False,
                          **kwargs):
        tic = time.perf_counter()
@@ -892,3 +905,63 @@ def search_time_range(self,
logger.info(f'finished twint loop in: {toc - tic:0.4f} seconds')
logger.info('done search_time_range')

    def get_timelines(self,
                      usernames,
                      job_name=None,
                      tp=None,
                      write_to_disk: Optional[Literal['csv', 'json', 'parquet', 'parquet_s3']] = None,
                      fetch_profiles: bool = False,
                      **kwargs):
        tic = time.perf_counter()
        if job_name is None:
            job_name = "timeline_%s" % usernames[0]
        tp = tp or self.tp or TwintPool(is_tor=True)
        for user in usernames:
            logger.info('start user: %s', user)
            t_prev = time.perf_counter()
            now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            tp.reset_config()
            try:
                df = tp._get_timeline(username=user, **kwargs)
                user_exists = True
            except SuspendedUser:
                logger.info(f'User {user} is suspended')
                df = None
                user_exists = False
            if df is not None:
                self._maybe_write_batch(
                    df,
                    write_to_disk,
                    f'{job_name}/timelines/{user}',
                    write_opts=kwargs.get('write_opts', self.write_opts)
                )

            t_iter = time.perf_counter()
            logger.info(f'finished tp._get_timeline ({user}): {t_iter - t_prev:0.4f} seconds')
            t_prev = t_iter

            if fetch_profiles:
                if user_exists:
                    tp.reset_config()
                    users_df = self.search_user_info_by_name(pd.DataFrame({'username': [user]}), tp)
                else:
                    users_df = pd.DataFrame({'username': [user], 'suspended': [True]})
                if users_df is not None:
                    self._maybe_write_batch(
                        users_df,
                        write_to_disk,
                        f'{job_name}/profiles/{now}',
                        write_opts=kwargs.get('write_opts', self.write_opts)
                    )
                t_iter = time.perf_counter()
                logger.info(f'finished tp.search_user_info_by_name: {t_iter - t_prev:0.4f} seconds')
                t_prev = t_iter

            yield df

        toc = time.perf_counter()
        logger.info(f'finished twint loop in: {toc - tic:0.4f} seconds')
        logger.info('done get_timelines')
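A minimal usage sketch of the new generator, mirroring how jobs/search_timelines.py drives it (constructor arguments as used there; the username is hypothetical):

from ProjectDomino.FirehoseJob import FirehoseJob
from ProjectDomino.TwintPool import TwintPool

tp = TwintPool(is_tor=True)
fh = FirehoseJob(
    PARQUET_SAMPLE_RATE_TIME_S=30,
    save_to_neo=False,
    tp=tp,
    writers={},
    write_to_disk=None,  # or 'csv' / 'json' / 'parquet' / 'parquet_s3'
    write_opts={}
)
# Yields one DataFrame per username; None is yielded for suspended users.
for df in fh.get_timelines(usernames=['alice'], job_name='demo', fetch_profiles=False):
    print(None if df is None else df.shape)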


