Skip to content

Commit 205566a

Browse files
authored
Merge pull request #92 from TheDataRideAlongs/dev/profiles
2 parents f82a6cf + 4dcbbdd commit 205566a

6 files changed

Lines changed: 323 additions & 27 deletions

File tree

‎infra/pipelines/docker/datastream-docker-compose.yml‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ services:
3838
DOMINO_END_DATE: ${DOMINO_END_DATE:-}
3939
DOMINO_JOB_NAME: ${DOMINO_JOB_NAME:-}
4040
DOMINO_SEARCH: ${DOMINO_SEARCH:-}
41+
DOMINO_USERNAMES: ${DOMINO_USERNAMES:-}
4142
DOMINO_FETCH_PROFILES: ${DOMINO_FETCH_PROFILES:-}
4243
DOMINO_WRITE_FORMAT: ${DOMINO_WRITE_FORMAT:-}
4344
DOMINO_S3_FILEPATH: ${DOMINO_S3_FILEPATH:-}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
#!/bin/bash
# Launch the data-stream docker-compose service, mapping caller-supplied
# env vars (JOB_NAME, USERNAMES, ...) onto the DOMINO_* vars the compose
# file expects, with sensible defaults. Extra args are forwarded to
# `docker-compose up`.
set -ex

DOMINO_JOB_NAME=${JOB_NAME:-historic_pfas_1}
DOMINO_USERNAMES=${USERNAMES:-"a,b,c"}
DOMINO_FETCH_PROFILES=${FETCH_PROFILES:-"false"}
DOMINO_STRIDE_SEC=${STRIDE_SEC:-30}
DOMINO_WRITE_FORMAT=${WRITE_FORMAT:-parquet}
DOMINO_S3_FILEPATH=${S3_FILEPATH:-dt-phase1}
DOMINO_COMPRESSION=${COMPRESSION:-snappy}
AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID:-}
AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY:-}

echo "Using usernames: $DOMINO_USERNAMES"

# Tear down any previous run of this job (including volumes), then rebuild.
docker-compose -f datastream-docker-compose.yml -p "${DOMINO_JOB_NAME}" down -v
docker-compose -f datastream-docker-compose.yml build # --no-cache

# FIX: all expansions (and "$@") are now quoted so job names / username
# lists containing spaces or glob characters pass through intact.
JOB_FILE="search_timelines.py" \
DOMINO_JOB_NAME="$DOMINO_JOB_NAME" \
DOMINO_USERNAMES="$DOMINO_USERNAMES" \
DOMINO_FETCH_PROFILES="$DOMINO_FETCH_PROFILES" \
DOMINO_STRIDE_SEC="$DOMINO_STRIDE_SEC" \
DOMINO_WRITE_FORMAT="$DOMINO_WRITE_FORMAT" \
DOMINO_S3_FILEPATH="$DOMINO_S3_FILEPATH" \
DOMINO_COMPRESSION="$DOMINO_COMPRESSION" \
AWS_ACCESS_KEY_ID="$AWS_ACCESS_KEY_ID" \
AWS_SECRET_ACCESS_KEY="$AWS_SECRET_ACCESS_KEY" \
docker-compose -f datastream-docker-compose.yml \
    -p "${DOMINO_JOB_NAME}" \
    up \
    data-stream \
    "$@"

‎jobs/search_pfas_timelines.sh‎

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
#!/bin/bash
# Build a comma-delimited username list from ./pfas_profiles and launch a
# pfas_timelines job (profiles on, parquet written to S3) via
# search_timelines.sh. Extra args are forwarded.
# NOTE(review): pfas_profiles is read relative to the invocation directory,
# before the cd below — confirm this script is always run from jobs/.

USERNAMES=""
# FIX: `|| [ -n "$LINE" ]` keeps the final line even when pfas_profiles has
# no trailing newline (plain `read` returns non-zero and would drop it).
while IFS= read -r LINE || [ -n "$LINE" ]; do
    USERNAMES="$USERNAMES,$LINE"
done < pfas_profiles

echo "Users: $USERNAMES"

#set -ex

# FIX: abort instead of running the job from the wrong directory.
cd ../infra/pipelines/docker/ || exit 1

JOB_NAME="pfas_timelines" \
FETCH_PROFILES="true" \
USERNAMES="$USERNAMES" \
STRIDE_SEC="$(python -c 'print(30 * 1)')" \
WRITE_FORMAT="parquet_s3" \
S3_FILEPATH="dt-phase1" \
AWS_ACCESS_KEY_ID="${AWS_ACCESS_KEY_ID}" \
AWS_SECRET_ACCESS_KEY="${AWS_SECRET_ACCESS_KEY}" \
./search_timelines.sh "$@"

‎jobs/search_timelines.py‎

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
#!/usr/bin/env python
2+
# coding: utf-8
3+
4+
5+
import logging
6+
logger = logging.getLogger()
7+
logger.setLevel(logging.INFO) #DEBUG, INFO, WARNING, ERROR, CRITICAL
8+
9+
10+
11+
import json, os, pandas as pd, pendulum, sys
12+
from ProjectDomino.Neo4jDataAccess import Neo4jDataAccess
13+
from ProjectDomino.FirehoseJob import FirehoseJob
14+
from ProjectDomino.TwintPool import TwintPool
15+
from prefect.environments.storage import S3
16+
from prefect import context, Flow, task
17+
from prefect.schedules import IntervalSchedule
18+
from datetime import timedelta, datetime
19+
from prefect.engine.executors import DaskExecutor
20+
21+
22+
S3_BUCKET = "wzy-project-domino"
23+
24+
pd.set_option('display.max_colwidth', None)
25+
pd.set_option('display.max_rows', 500)
26+
pd.set_option('display.max_columns', 500)
27+
pd.set_option('display.width', 1000)
28+
29+
def env_non_empty(x: str):
    """Return the value of env var *x* if it is set and non-empty.

    Returns False when the variable is absent; returns the raw string
    (possibly '') when it is present, matching truthiness semantics at
    call sites.
    """
    if x not in os.environ:
        return False
    return os.environ[x]
31+
32+
def str_to_bool(x: str) -> bool:
    """Parse a boolean from an environment-variable string.

    Generalized (backward-compatibly) to be case-insensitive and to
    tolerate surrounding whitespace, so 'TRUE', ' true ', 'False', '0'
    etc. all parse.

    :raises ValueError: when *x* is not a recognizable boolean.
    """
    normalized = x.strip().lower()
    if normalized in ('true', '1'):
        return True
    if normalized in ('false', '0'):
        return False
    raise ValueError('Cannot convert to bool: ' + x)
39+
40+
# --- Job configuration, sourced from DOMINO_* environment variables ------
# os.environ.get(x) or <default> mirrors env_non_empty(): missing OR empty
# values fall back to the default.
stride_sec = int(os.environ.get('DOMINO_STRIDE_SEC') or 30)
job_name = os.environ.get('DOMINO_JOB_NAME') or "covid"
write_format = os.environ.get('DOMINO_WRITE_FORMAT') or None

_raw_fetch = os.environ.get('DOMINO_FETCH_PROFILES')
fetch_profiles = str_to_bool(_raw_fetch) if _raw_fetch else False

usernames_raw = os.environ.get('DOMINO_USERNAMES') or None
if usernames_raw is None:
    raise ValueError('DOMINO_USERNAMES is not set, expected comma-delimited str')
usernames = [name for name in usernames_raw.split(',') if name]

if write_format == 'parquet_s3':
    # S3 credentials are only required (and only read) for the S3 sink.
    s3_filepath = os.environ.get('DOMINO_S3_FILEPATH') or None
    AWS_ACCESS_KEY_ID = os.environ['AWS_ACCESS_KEY_ID']
    AWS_SECRET_ACCESS_KEY = os.environ['AWS_SECRET_ACCESS_KEY']
    compression = os.environ.get('DOMINO_COMPRESSION') or 'snappy'

# Local output layout: /output/<job>/{tweets,profiles,timelines}
output_path = f'/output/{job_name}'
for _sub in ('tweets', 'profiles', 'timelines'):
    os.makedirs(f'{output_path}/{_sub}', exist_ok=True)


# FIXME unsafe when distributed
usernames_queue = usernames.copy()
pending = 0
65+
66+
@task(log_stdout=True, skip_on_upstream_skip=True, max_retries=3, retry_delay=timedelta(seconds=30))
def run_stream():
    """Process one username from the shared queue per scheduled tick.

    Pops the next username, streams its timeline (and optionally profile)
    through FirehoseJob.get_timelines, and exits the process once the queue
    is drained with no pops still in flight. A failed username is pushed
    back onto the front of the queue for a later tick.
    """
    global pending

    # Everything processed and nothing in flight: stop the whole flow.
    if not usernames_queue and pending == 0:
        logger.info(f'Successfully processed all usernames ({len(usernames)}), exiting')
        sys.exit(0)

    if not usernames_queue:
        logger.info(f'No more usernames to process, but {pending} jobs are still pending')
        return

    pending += 1
    username = usernames_queue.pop(0)

    try:
        tp = TwintPool(is_tor=True)
        fh = FirehoseJob(
            PARQUET_SAMPLE_RATE_TIME_S=30,
            save_to_neo=False,
            tp=tp,
            writers={},
            write_to_disk=write_format,
            write_opts=(
                {
                    's3_filepath': s3_filepath,
                    's3fs_options': {
                        'key': AWS_ACCESS_KEY_ID,
                        'secret': AWS_SECRET_ACCESS_KEY
                    },
                    'compression': compression
                }
                if write_format == 'parquet_s3' else
                {}
            )
        )
        try:
            for df in fh.get_timelines(
                usernames=[username],
                job_name=job_name,
                fetch_profiles=fetch_profiles
            ):
                # FIX: original passed logging-style args to print(), which
                # printed the literal '%s' instead of the shape.
                print('got: %s' % (df.shape if df is not None else 'None'))
        except Exception:
            logger.error("job exception", exc_info=True)
            raise
    except Exception:
        # FIX: was a bare `except:` which would also swallow
        # KeyboardInterrupt / SystemExit. Requeue the user for retry.
        logger.error("task exception, reinserting user", exc_info=True)
        usernames_queue.insert(0, username)
    finally:
        # FIX: decrement in `finally` so `pending` cannot leak if an
        # unexpected BaseException escapes the handler above.
        pending -= 1
    print("task finished")
120+
121+
122+
# Run run_stream on a fixed interval; each tick consumes one username from
# the queue. start_date is in the past so the schedule fires immediately.
schedule_opts = {
    'interval': timedelta(seconds=stride_sec),
    'start_date': pendulum.parse('2019-01-01 00:00:00')
}
logger.info(f'Schedule options: {schedule_opts}')
logger.info(f'Task settings: stride_sec={stride_sec}')

schedule = IntervalSchedule(**schedule_opts)
# NOTE(review): this S3 storage object is constructed but never attached to
# the Flow below (the storage= kwarg was dropped in earlier iterations) —
# kept for behavioral parity; confirm whether it is still needed.
storage = S3(bucket=S3_BUCKET)

with Flow(f"{job_name} stream", schedule=schedule) as flow:
    run_stream()
flow.run()
137+

‎modules/FirehoseJob.py‎

Lines changed: 77 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
###
22

33
from asyncore import write
4+
from codecs import ignore_errors
45
from collections import deque, defaultdict
56
import datetime, gc, os, string, sys, time, uuid
7+
from datetime import date
68
from typing import Any, Literal, Optional
79
import numpy as np
810
import pandas as pd
@@ -12,6 +14,7 @@
1214
import s3fs
1315
import simplejson as json # nan serialization
1416
from twarc import Twarc
17+
from twint.user import SuspendedUser
1518

1619
from .Timer import Timer
1720
from .TwarcPool import TwarcPool
@@ -816,15 +819,25 @@ def search_user_info_by_name(self, df, tp = None) -> Optional[pd.DataFrame]:
816819
tp = tp or self.tp or TwintPool(is_tor=True)
817820
user_names = df[[col]].drop_duplicates()[col].to_list()
818821
unseen_user_names = [ user_name for user_name in user_names if user_name not in self._enriched_users ]
819-
820-
lst = [tp._get_user_info(username=user) for user in unseen_user_names]
822+
823+
if len(unseen_user_names) == 0:
824+
logger.debug('skipping search_user_info_by_name, all user names already enriched: %s / %s',
825+
len(unseen_user_names), len(user_names))
826+
return None
827+
828+
lst = [tp._get_user_info(username=user, ignore_errors=True) for user in unseen_user_names]
829+
lst = [x for x in lst if x is not None]
830+
if len(lst) == 0 or all([len(x) == 0 for x in lst]):
831+
logger.debug('ending search_user_info_by_name, no user info found for search of %s of %s users', len(unseen_user_names), len(user_names))
832+
return None
833+
821834
dfs = pd.concat(lst).drop_duplicates(subset=["id"])
822835

823836
seen_user_names = dfs['username'].to_list()
824837
for user in seen_user_names:
825838
self._enriched_users.add(user)
826839

827-
print('search_user_info_by_name cache hit rate (%s / %s) and twint hydration rate (%s / %s)' % (
840+
logger.debug('search_user_info_by_name cache hit rate (%s / %s) and twint hydration rate (%s / %s)' % (
828841
len(user_names) - len(seen_user_names), len(user_names),
829842
len(seen_user_names), len(unseen_user_names)
830843
))
@@ -837,7 +850,7 @@ def search_time_range(self,
837850
Until="2020-01-01 21:00:00",
838851
job_name=None,
839852
tp=None,
840-
write_to_disk: Optional[Literal['csv', 'json']] = None,
853+
write_to_disk: Optional[Literal['csv', 'json', 'parquet', 'parquet_s3']] = None,
841854
fetch_profiles: bool = False,
842855
**kwargs):
843856
tic = time.perf_counter()
@@ -892,3 +905,63 @@ def search_time_range(self,
892905
logger.info(f'finished twint loop in: {toc - tic:0.4f} seconds')
893906
logger.info('done search_time_range')
894907

908+
def get_timelines(self,
                  usernames,
                  job_name=None,
                  tp=None,
                  write_to_disk: Optional[Literal['csv', 'json', 'parquet', 'parquet_s3']] = None,
                  fetch_profiles: bool = False,
                  **kwargs):
    """Yield one timeline DataFrame per username (None for suspended users).

    FirehoseJob method. For each user: fetch the timeline via
    TwintPool._get_timeline, write it under {job_name}/timelines/{user}
    when present, optionally enrich and write the profile under
    {job_name}/profiles/{timestamp}, then yield the timeline frame.

    :param usernames: sequence of screen names to fetch
    :param job_name: output path prefix; defaults to timeline_<first user>
    :param tp: TwintPool override (falls back to self.tp, then a new pool)
    :param write_to_disk: output format handed to _maybe_write_batch
    :param fetch_profiles: also fetch and persist user profile info
    :param kwargs: forwarded to _get_timeline; may carry 'write_opts'
    """
    tic = time.perf_counter()
    if job_name is None:
        # FIX: original referenced the undefined name `user` here, raising
        # NameError whenever job_name was omitted. Use the first username.
        job_name = "timeline_%s" % (usernames[0] if usernames else "unknown")
    tp = tp or self.tp or TwintPool(is_tor=True)
    for user in usernames:
        logger.info('start user: %s', user)
        t_prev = time.perf_counter()
        # FIX: original used date.today(), so %H:%M:%S always rendered
        # 00:00:00 and all same-day profile writes collided on one path.
        now = datetime.datetime.now().strftime('%Y-%m-%d%H:%M:%S')
        tp.reset_config()
        try:
            df = tp._get_timeline(username=user, **kwargs)
            user_exists = True
        except SuspendedUser:
            logger.info(f'User {user} is suspended')
            df = None
            user_exists = False
        if df is not None:
            self._maybe_write_batch(
                df,
                write_to_disk,
                f'{job_name}/timelines/{user}',
                write_opts=kwargs.get('write_opts', self.write_opts)
            )

        t_iter = time.perf_counter()
        logger.info(f'finished tp._get_timeline ({user}): {t_iter - t_prev:0.4f} seconds')
        t_prev = t_iter

        if fetch_profiles:
            if user_exists:
                tp.reset_config()
                users_df = self.search_user_info_by_name(pd.DataFrame({'username': [user]}), tp)
            else:
                # Suspended users get a stub row so downstream consumers
                # still see a record for them.
                users_df = pd.DataFrame({'username': [user], 'suspended': [True]})
            if users_df is not None:
                self._maybe_write_batch(
                    users_df,
                    write_to_disk,
                    f'{job_name}/profiles/{now}',
                    write_opts=kwargs.get('write_opts', self.write_opts)
                )
            t_iter = time.perf_counter()
            logger.info(f'finished tp.search_user_info_by_name: {t_iter - t_prev:0.4f} seconds')
            t_prev = t_iter

        yield df

    toc = time.perf_counter()
    logger.info(f'finished twint loop in: {toc - tic:0.4f} seconds')
    logger.info('done get_timelines')
965+
966+
967+

0 commit comments

Comments
 (0)