
Commit 256df48

Merge pull request #20 from TheDataRideAlongs/wzy/rehydratePipeline
Rehydrate pipeline
2 parents 83a2294 + b72eb75 commit 256df48

1 file changed

File tree

Pipeline.py

Lines changed: 202 additions & 0 deletions
@@ -0,0 +1,202 @@
from prefect import Flow, Client, task
from prefect.tasks.shell import ShellTask
import arrow, ast, graphistry, json, os, pprint
import pandas as pd
import numpy as np
from pathlib import Path
from modules.FirehoseJob import FirehoseJob
from datetime import timedelta, datetime
from prefect.schedules import IntervalSchedule
import prefect
from prefect.engine.signals import ENDRUN
from prefect.engine.state import Skipped

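# Overview (summarizing the tasks below): each run picks the tweet-ID file
# matching its scheduled hour, rehydrates the IDs into full tweets via
# FirehoseJob, then cleans and flattens the resulting DataFrame before sampling.
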
@task(log_stdout=True, skip_on_upstream_skip=True)
def load_creds():
    with open('twittercreds.json') as json_file:
        creds = json.load(json_file)
    print([{k: "zzz" for k in creds[0].keys()}])
    return creds

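# The access pattern above implies twittercreds.json holds a list of credential
# dicts; the print masks every value as "zzz" so logs show which keys are
# present without leaking secrets.
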
@task(log_stdout=True, skip_on_upstream_skip=True)
def load_path():
    data_dirs = ['COVID-19-TweetIDs/2020-01', 'COVID-19-TweetIDs/2020-02', 'COVID-19-TweetIDs/2020-03']

    timestamp = None
    if 'backfill_timestamp' in prefect.context:
        timestamp = arrow.get(prefect.context['backfill_timestamp'])
    else:
        timestamp = prefect.context['scheduled_start_time']
    print('TIMESTAMP = ', timestamp)
    suffix = timestamp.strftime('%Y-%m-%d-%H')

    for data_dir in data_dirs:
        if os.path.isdir(data_dir):
            for path in Path(data_dir).iterdir():
                if path.name.endswith('{}.txt'.format(suffix)):
                    print(path)
                    return str(path)
        else:
            print('WARNING: not a dir', data_dir)
    # TODO: (wzy) Figure out how to cancel this gracefully
    raise ENDRUN(state=Skipped())

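# Worked example: a run scheduled (or backfilled) for 2020-02-06 09:00 yields
# suffix '2020-02-06-09', so the task returns the first file under the month
# directories whose name ends in '2020-02-06-09.txt'; if none exists, the run
# is skipped via ENDRUN(Skipped()).
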
@task(log_stdout=True, skip_on_upstream_skip=True)
def clean_timeline_tweets(pdf):
    return pdf.rename(columns={'id': 'status_id', 'id_str': 'status_id_str'})

@task(log_stdout=True, skip_on_upstream_skip=True)
def clean_datetimes(pdf):
    print('cleaning datetimes...')
    pdf = pdf.assign(created_at=pd.to_datetime(pdf['created_at']))
    pdf = pdf.assign(created_date=pdf['created_at'].apply(lambda dt: dt.timestamp()))
    print('   ...cleaned')
    return pdf

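# created_at becomes a proper datetime column; created_date carries the same
# instant as Unix epoch seconds (float) for numeric downstream use.
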
# The raw 'retweeted' field is always False for some reason, so derive it from
# 'retweeted_status' instead; this seems to match full_text[:2] == 'RT'.
@task(log_stdout=True, skip_on_upstream_skip=True)
def clean_retweeted(pdf):
    return pdf.assign(retweeted=pdf['retweeted_status'] != 'None')

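# Note: comparing against the string 'None' implies retweeted_status arrives
# as a stringified dict (see try_load below), not a parsed object.
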
def update_to_type(row):
    if row['is_quote_status']:
        return 'retweet_quote'
    if row['retweeted']:
        return 'retweet'
    if row['in_reply_to_status_id'] is not None and row['in_reply_to_status_id'] > 0:
        return 'reply'
    return 'original'

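# Precedence: quote beats retweet beats reply, so a quoted retweet is tagged
# 'retweet_quote' even when the other flags are also set.
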
@task(log_stdout=True, skip_on_upstream_skip=True)
def tag_status_type(pdf):
    ## only materialize the fields update_to_type actually reads
    print('tagging status...')
    pdf2 = pdf\
        .assign(status_type=pdf[['is_quote_status', 'retweeted', 'in_reply_to_status_id']].apply(update_to_type, axis=1))
    print('  ...tagged')
    return pdf2

def try_load(s):
    try:
        out = ast.literal_eval(s)
        return {
            k if type(k) == str else str(k): out[k]
            for k in out.keys()
        }
    except:
        if s != 0.0:
            print('bad s', s)
        return {}

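# try_load parses a stringified-dict cell with ast.literal_eval, coercing any
# non-string keys to str; unparseable cells collapse to an empty dict and are
# logged, except 0.0 cells, which appear to be an expected fill value.
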
def flatten_status_col(pdf, col, status_type, prefix):
    print('flattening %s...' % col)
    print('  ', pdf.columns)
    # retweet_status -> hash -> lookup json for hash -> pull out id/created_at/user_id
    pdf_hashed = pdf.assign(hashed=pdf[col].apply(hash))
    retweets = pdf_hashed[pdf_hashed['status_type'] == status_type][['hashed', col]]\
        .drop_duplicates('hashed').reset_index(drop=True)
    retweets_flattened = pd.io.json.json_normalize(
        retweets[col].replace("(").replace(")")\
            .apply(try_load))
    print('   ... fixing dates')
    created_at_datetime = pd.to_datetime(retweets_flattened['created_at'])
    created_at = np.full_like(created_at_datetime, np.nan, dtype=np.float64)
    created_at[created_at_datetime.notnull()] = created_at_datetime[created_at_datetime.notnull()].apply(lambda dt: dt.timestamp())
    retweets_flattened = retweets_flattened.assign(
        created_at=created_at,
        user_id=retweets_flattened['user.id'])
    retweets = retweets[['hashed']]\
        .assign(**{
            prefix + c: retweets_flattened[c]
            for c in retweets_flattened if c in ['id', 'created_at', 'user_id']
        })
    print('   ... remerging')
    pdf_with_flat_retweets = pdf_hashed.merge(retweets, on='hashed', how='left').drop(columns='hashed')
    print('   ...flattened', pdf_with_flat_retweets.shape)
    return pdf_with_flat_retweets

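# Net effect for flatten_retweets below: rows whose status_type is 'retweet'
# gain retweet_id, retweet_created_at, and retweet_user_id columns,
# deduplicated by hashing the raw cell and merged back on that hash. Note the
# .replace("(")/.replace(")") calls appear to be pandas value-level
# replacements, not string edits; malformed cells are instead absorbed by
# try_load's except branch.
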
@task(log_stdout=True, skip_on_upstream_skip=True)
def flatten_retweets(pdf):
    print('flattening retweets...')
    pdf2 = flatten_status_col(pdf, 'retweeted_status', 'retweet', 'retweet_')
    print('   ...flattened', pdf2.shape)
    return pdf2

@task(log_stdout=True, skip_on_upstream_skip=True)
def flatten_quotes(pdf):
    print('flattening quotes...')
    pdf2 = flatten_status_col(pdf, 'quoted_status', 'retweet_quote', 'quote_')
    print('   ...flattened', pdf2.shape)
    return pdf2

@task(log_stdout=True, skip_on_upstream_skip=True)
def flatten_users(pdf):
    print('flattening users')
    pdf_user_cols = pd.io.json.json_normalize(pdf['user'].replace("(").replace(")").apply(ast.literal_eval))
    pdf2 = pdf.assign(**{
        'user_' + c: pdf_user_cols[c]
        for c in pdf_user_cols if c in [
            'id', 'screen_name', 'created_at', 'followers_count', 'friends_count', 'favourites_count',
            'utc_offset', 'time_zone', 'verified', 'statuses_count', 'profile_image_url',
            'name', 'description'
        ]})
    print('   ... fixing dates')
    pdf2 = pdf2.assign(user_created_at=pd.to_datetime(pdf2['user_created_at']).apply(lambda dt: dt.timestamp()))
    print('   ...flattened')
    return pdf2

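# Unlike the retweet/quote flatteners, this parses with a bare ast.literal_eval
# (no try_load guard), so a malformed user cell would raise; the selected
# fields land as user_-prefixed columns, with user_created_at converted to
# epoch seconds.
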
@task(log_stdout=True, skip_on_upstream_skip=True)
def load_tweets(creds, path):
    print(path)
    fh = FirehoseJob(creds, PARQUET_SAMPLE_RATE_TIME_S=30, save_to_neo=prefect.context.get('save_to_neo', False))
    cnt = 0
    data = []
    for arr in fh.process_id_file(path, job_name="500m_COVID-REHYDRATE"):
        data.append(arr.to_pandas())
        print('{}/{}'.format(len(data), len(arr)))
        cnt += len(arr)
    print('TOTAL: ' + str(cnt))
    if len(data) == 0:
        # skip before concatenating: pd.concat([]) raises on an empty list
        raise ENDRUN(state=Skipped())
    data = pd.concat(data, ignore_index=True, sort=False)
    if len(data) == 0:
        raise ENDRUN(state=Skipped())
    return data

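# FirehoseJob.process_id_file appears to yield Arrow batches (hence
# arr.to_pandas()); save_to_neo defaults to False unless set in
# prefect.context, and an empty hour skips all downstream tasks.
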
@task(log_stdout=True, skip_on_upstream_skip=True)
def sample(tweets):
    print('responses shape', tweets.shape)
    print(tweets.columns)
    print(tweets.sample(5))

schedule = IntervalSchedule(
    # start_date=datetime(2020, 1, 20),
    # interval=timedelta(hours=1),
    start_date=datetime.now() + timedelta(seconds=1),
    interval=timedelta(hours=1),
)

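# The hourly schedule starts one second after process launch, but note it is
# currently unused: the Flow below is constructed without schedule=schedule
# (the scheduled variant is left commented out).
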
# with Flow("Rehydration Pipeline", schedule=schedule) as flow:
with Flow("Rehydration Pipeline") as flow:
    creds = load_creds()
    path_list = load_path()
    tweets = load_tweets(creds, path_list)
    tweets = clean_timeline_tweets(tweets)
    tweets = clean_datetimes(tweets)
    tweets = clean_retweeted(tweets)
    tweets = tag_status_type(tweets)
    tweets = flatten_retweets(tweets)
    tweets = flatten_quotes(tweets)
    tweets = flatten_users(tweets)

    sample(tweets)

LOCAL_MODE = False

if LOCAL_MODE:
    with prefect.context(
        backfill_timestamp=datetime(2020, 2, 6, 9),
    ):
        flow.run()
else:
    flow.register(project_name="rehydrate")
    flow.run_agent()
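
# In the non-local branch, flow.register(project_name="rehydrate") assumes a
# Prefect backend with an existing "rehydrate" project, and flow.run_agent()
# then blocks, serving scheduled runs; flip LOCAL_MODE to True to run a single
# in-process backfill for the hour set in backfill_timestamp.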
