Skip to content

Commit 09a1ead

Browse files
authored
Merge pull request #90 from TheDataRideAlongs/dev/fix-twint
2 parents 1573349 + 31e584f commit 09a1ead

15 files changed

Lines changed: 416 additions & 23 deletions

‎.gitignore‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,3 +167,4 @@ __pycache__
167167
.idea
168168

169169
neo4jcreds.json
170+
output/

‎infra/pipelines/docker/Dockerfile‎

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
# Prefect pipeline agent image: runs the local Prefect agent under supervisord.
# The conda/rapids base was dropped, so no `source activate rapids` is needed.
#FROM graphistry/graphistry-blazing:v2.29.3
FROM python:3.8

# DEBIAN_FRONTEND is exported only for this RUN's shell (not baked into ENV).
# apt lists are removed in the same layer so they never reach the image.
RUN export DEBIAN_FRONTEND=noninteractive \
    && apt-get update \
    && apt-get install -y --no-install-recommends supervisor \
    && apt-get clean \
    && rm -rf /var/lib/apt/lists/*

# --no-cache-dir keeps pip's download cache out of the layer (hadolint DL3042).
# `prefect agent install local` emits a supervisord config used by the entrypoint.
RUN pip install --no-cache-dir prefect==0.10.1 simplejson twarc neo4j boto3==1.12.39 \
    && ( prefect agent install local > supervisord.conf )

# NOTE(review): COPY into `/` with no WORKDIR — consider WORKDIR /app; confirm
# entrypoint path assumptions before changing.
COPY . .

#TODO find cleaner way to avoid talking to cloud server
RUN prefect backend server

CMD ["./infra/pipelines/docker/entrypoint.sh"]

‎infra/pipelines/docker/datastream-Dockerfile‎

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
FROM python:3.8

# git: needed for pip VCS installs below; tor: used by the tor-routed scraper
# (TwintPool(is_tor=True) in the job scripts); vim: debugging convenience.
RUN apt-get update \
    && apt-get install -y --no-install-recommends git vim tor \
    && apt-get clean \
    && rm -rf /var/lib/apt/lists/*

# --no-cache-dir keeps pip's download cache out of the image layer (DL3042).
RUN pip install --no-cache-dir prefect==0.10.1 simplejson twarc neo4j boto3==1.12.39 \
    pandas pyarrow s3fs urlextract git+https://github.com/homm/yurl.git@1943161973aeb3b3cf2e1e9de6671673b8356161

#RUN echo "ok6" && pip install git+https://github.com/TheDataRideAlongs/twint.git
# NOTE(review): unpinned branch install — builds are not reproducible; consider
# pinning to a commit SHA like the yurl install above.
RUN pip install --no-cache-dir git+https://github.com/graphistry/twint.git
#git+https://github.com/lmeyerov/twint.git@patch-1#egg=twint
#RUN pip install git+https://github.com/himanshudabas/twint.git@twint-fixes#egg=twint

‎infra/pipelines/docker/datastream-docker-compose.yml‎

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,16 @@ version: '3'
1111

1212
services:
1313
data-stream:
14+
image: graphistry/datastream-agent
1415
build:
1516
context: ../../../
1617
dockerfile: infra/pipelines/docker/datastream-Dockerfile
1718
tty: true
1819
network_mode: 'bridge'
1920
volumes:
2021
- /home/lmeyerov2/neo4jcreds.json:/secrets/neo4jcreds.json:ro
21-
- ./../../../jobs:/app:ro
22+
- ./../../../jobs:/app:cached
23+
- ./../../../output:/output
2224
environment:
2325
JOB_FILE: ${JOB_FILE:-search_by_date_job.py}
2426
TOPIC: ${TOPIC:-covid}
@@ -28,6 +30,17 @@ services:
2830
PREFECT__SERVER__UI__PORT: ${PREFECT__SERVER__UI__PORT:-8080}
2931
AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID:-}
3032
AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY:-}
33+
DOMINO_START_DATE: ${DOMINO_START_DATE:-}
34+
DOMINO_DELAY_SEC: ${DOMINO_DELAY_SEC:-}
35+
DOMINO_STRIDE_SEC: ${DOMINO_STRIDE_SEC:-}
36+
DOMINO_HISTORIC_STRIDE_SEC: ${DOMINO_HISTORIC_STRIDE_SEC:-}
37+
DOMINO_TWINT_STRIDE_SEC: ${DOMINO_TWINT_STRIDE_SEC:-}
38+
DOMINO_END_DATE: ${DOMINO_END_DATE:-}
39+
DOMINO_JOB_NAME: ${DOMINO_JOB_NAME:-}
40+
DOMINO_SEARCH: ${DOMINO_SEARCH:-}
41+
DOMINO_WRITE_FORMAT: ${DOMINO_WRITE_FORMAT:-}
42+
DOMINO_S3_FILEPATH: ${DOMINO_S3_FILEPATH:-}
43+
DOMINO_COMPRESSION: ${DOMINO_COMPRESSION:-}
3144
logging:
3245
options:
3346
tag: 'ImageName:{{.ImageName}}/Name:{{.Name}}/ID:{{.ID}}/ImageFullID:{{.ImageFullID}}'

‎infra/pipelines/docker/datastream-entrypoint.sh‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#!/bin/bash
22
set -ex
33

4-
echo "Starting JOB_FILE ${JOB_FILE}, maybe TOPIC ${TOPIC}"
4+
echo "Starting JOB_FILE ${JOB_FILE}"
55

66
service tor start
77

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
#!/bin/bash
22

33
echo "Starting prefect executor daemon in foreground"
4-
source activate rapids && supervisord --nodaemon
4+
supervisord --nodaemon

‎infra/pipelines/docker/nonrapids-Dockerfile‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
FROM python:3.8
RUN apt-get update \
    && apt-get install -y --no-install-recommends supervisor \
    && apt-get clean \
    && rm -rf /var/lib/apt/lists/*

# COPY kept before pip install as in the original: `prefect agent install local`
# writes supervisord.conf into the workdir, so reordering could change which
# file wins — TODO confirm before optimizing layer caching.
COPY . .
# --no-cache-dir keeps pip's download cache out of the layer (DL3042).
# NOTE(review): `@origin/master` as a pip VCS ref is unusual (plain `@master`
# or a commit SHA is conventional) — verify pip resolves it.
RUN pip install --no-cache-dir prefect==0.10.1 simplejson twarc neo4j boto3==1.12.39 \
    pandas git+https://github.com/graphistry/twint.git@origin/master#egg=twint \
    && ( prefect agent install local > supervisord.conf )
RUN prefect backend server
RUN ["chmod","+x","./infra/pipelines/docker/nonrapids-entrypoint.sh"]
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
#!/bin/bash
# Launch the historic-search datastream container via docker-compose,
# defaulting every DOMINO_* setting from the corresponding short env var.
set -ex

DOMINO_JOB_NAME=${JOB_NAME:-historic_pfas_1}
DOMINO_SEARCH=${SEARCH:-"pfas"}
DOMINO_START_DATE=${START_DATE:-"2022-01-01 00:00:00"}
DOMINO_STRIDE_SEC=${STRIDE_SEC:-30}
DOMINO_HISTORIC_STRIDE_SEC=${HISTORIC_STRIDE_SEC:-86400}
DOMINO_TWINT_STRIDE_SEC=${TWINT_STRIDE_SEC:-28800}
DOMINO_WRITE_FORMAT=${WRITE_FORMAT:-parquet}
DOMINO_S3_FILEPATH=${S3_FILEPATH:-dt-phase1}
DOMINO_COMPRESSION=${COMPRESSION:-snappy}
AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID:-}
AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY:-}

# Tear down any prior run of this job (and its volumes), then rebuild.
docker-compose -f datastream-docker-compose.yml -p "${DOMINO_JOB_NAME}" down -v
docker-compose -f datastream-docker-compose.yml build # --no-cache

# BUG FIX: was `DOMINO_TWINT_STRIDE_SEC=$TWINT_STRIDE_SEC`, which forwarded the
# raw (possibly empty) env var and silently bypassed the 28800 default above.
# All expansions are quoted so values with spaces (e.g. START_DATE) survive.
JOB_FILE="search_historic.py" \
DOMINO_JOB_NAME="$DOMINO_JOB_NAME" \
DOMINO_SEARCH="$DOMINO_SEARCH" \
DOMINO_START_DATE="$DOMINO_START_DATE" \
DOMINO_STRIDE_SEC="$DOMINO_STRIDE_SEC" \
DOMINO_HISTORIC_STRIDE_SEC="$DOMINO_HISTORIC_STRIDE_SEC" \
DOMINO_TWINT_STRIDE_SEC="$DOMINO_TWINT_STRIDE_SEC" \
DOMINO_WRITE_FORMAT="$DOMINO_WRITE_FORMAT" \
DOMINO_S3_FILEPATH="$DOMINO_S3_FILEPATH" \
DOMINO_COMPRESSION="$DOMINO_COMPRESSION" \
AWS_ACCESS_KEY_ID="$AWS_ACCESS_KEY_ID" \
AWS_SECRET_ACCESS_KEY="$AWS_SECRET_ACCESS_KEY" \
docker-compose -f datastream-docker-compose.yml \
  -p "${DOMINO_JOB_NAME}" \
  up \
  data-stream \
  "$@"

‎jobs/search_historic.py‎

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
#!/usr/bin/env python
# coding: utf-8

"""Prefect job: walk a historic time range in daily strides, scraping tweets
for DOMINO_SEARCH via TwintPool/FirehoseJob and writing each stride to disk
(or S3 when DOMINO_WRITE_FORMAT == 'parquet_s3'). All settings come from
DOMINO_* environment variables with sensible defaults."""

import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)  # DEBUG, INFO, WARNING, ERROR, CRITICAL


import json, os, pandas as pd, pendulum
from ProjectDomino.Neo4jDataAccess import Neo4jDataAccess
from ProjectDomino.FirehoseJob import FirehoseJob
from ProjectDomino.TwintPool import TwintPool
from prefect.environments.storage import S3
from prefect import context, Flow, task
from prefect.schedules import IntervalSchedule
from datetime import timedelta, datetime
from prefect.engine.executors import DaskExecutor


S3_BUCKET = "wzy-project-domino"

pd.set_option('display.max_colwidth', None)
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)

def env_non_empty(x: str):
    # True only when the env var exists AND is non-empty (compose passes "" defaults).
    return x in os.environ and os.environ[x]

stride_sec = int(os.environ['DOMINO_STRIDE_SEC']) if env_non_empty('DOMINO_STRIDE_SEC') else 30
historic_stride_sec = int(os.environ['DOMINO_HISTORIC_STRIDE_SEC']) if env_non_empty('DOMINO_HISTORIC_STRIDE_SEC') else 60 * 60 * 24
twint_stride_sec = int(os.environ['DOMINO_TWINT_STRIDE_SEC']) if env_non_empty('DOMINO_TWINT_STRIDE_SEC') else round(historic_stride_sec/2)
delay_sec = int(os.environ['DOMINO_DELAY_SEC']) if env_non_empty('DOMINO_DELAY_SEC') else 60
job_name = os.environ['DOMINO_JOB_NAME'] if env_non_empty('DOMINO_JOB_NAME') else "covid"
# BUG FIX: was `datetime.datetime.now() - datetime.timedelta(days=365)`, which
# raises AttributeError under `from datetime import datetime, timedelta` above.
start_date = pendulum.parse(os.environ['DOMINO_START_DATE']) if env_non_empty('DOMINO_START_DATE') else datetime.now() - timedelta(days=365)
search = os.environ['DOMINO_SEARCH'] if env_non_empty('DOMINO_SEARCH') else "covid OR corona OR virus OR pandemic"
write_format = os.environ['DOMINO_WRITE_FORMAT'] if env_non_empty('DOMINO_WRITE_FORMAT') else None

# S3 credentials/options are only read (and only needed) for parquet_s3 output.
if write_format == 'parquet_s3':
    s3_filepath = os.environ['DOMINO_S3_FILEPATH'] if env_non_empty('DOMINO_S3_FILEPATH') else None
    AWS_ACCESS_KEY_ID = os.environ['AWS_ACCESS_KEY_ID']
    AWS_SECRET_ACCESS_KEY = os.environ['AWS_SECRET_ACCESS_KEY']
    compression = os.environ['DOMINO_COMPRESSION'] if env_non_empty('DOMINO_COMPRESSION') else 'snappy'

output_path = f'/output/{job_name}'
os.makedirs(output_path, exist_ok=True)


# FIXME unsafe when distributed: relies on one process incrementing a global
# to advance through the historic range one stride per scheduled run.
task_num = -1

@task(log_stdout=True, skip_on_upstream_skip=True, max_retries=3, retry_delay=timedelta(seconds=30))
def run_stream():
    """Scrape one historic stride [start, start + historic_stride_sec)."""
    global task_num
    task_num = task_num + 1

    start = start_date + timedelta(seconds=task_num * historic_stride_sec)
    current = start + timedelta(seconds=historic_stride_sec)
    print('------------------------')
    # BUG FIX: print() does not %-format its arguments like logger does.
    print('task %s with start %s: %s to %s' % (task_num, start_date, start, current))
    #start = datetime.strptime("2020-10-06 22:10:00", "%Y-%m-%d %H:%M:%S")
    #current = datetime.strptime("2020-10-10 16:08:00", "%Y-%m-%d %H:%M:%S")
    #current = datetime.strptime(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "%Y-%m-%d %H:%M:%S")
    tp = TwintPool(is_tor=True)
    fh = FirehoseJob(
        PARQUET_SAMPLE_RATE_TIME_S=30,
        save_to_neo=False,
        writers={},
        write_to_disk=write_format,
        # parquet_s3 output needs credentials/compression; other formats take no opts.
        write_opts=(
            {
                's3_filepath': s3_filepath,
                's3fs_options': {
                    'key': AWS_ACCESS_KEY_ID,
                    'secret': AWS_SECRET_ACCESS_KEY
                },
                'compression': compression
            }
            if write_format == 'parquet_s3' else
            {}
        )
    )

    try:
        for df in fh.search_time_range(
            tp=tp,
            Search=search,
            Since=datetime.strftime(start, "%Y-%m-%d %H:%M:%S"),
            Until=datetime.strftime(current, "%Y-%m-%d %H:%M:%S"),
            job_name=job_name,
            Limit=10000000,
            stride_sec=twint_stride_sec
        ):
            print('got: %s' % (df.shape if df is not None else 'None',))
    except Exception as e:
        logger.error("job exception", exc_info=True)
        raise e
    print("task finished")


schedule_opts = {
    'interval': timedelta(seconds=stride_sec),
    'start_date': pendulum.parse('2019-01-01 00:00:00')
}
logger.info(f'Schedule options: {schedule_opts}')
logger.info(f'Task settings: stride_sec={stride_sec}, \
historic_stride_sec={historic_stride_sec}, \
twint_stride_sec={twint_stride_sec} \
start_date={start_date}, \
search={search}')

schedule = IntervalSchedule(**schedule_opts)
storage = S3(bucket=S3_BUCKET)

#with Flow("covid-19 stream-single") as flow:
#with Flow("covid-19 stream", storage=storage, schedule=schedule) as flow:
with Flow(f"{job_name} stream", schedule=schedule) as flow:
    run_stream()
flow.run()

‎jobs/search_live.py‎

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
#!/usr/bin/env python
# coding: utf-8

"""Prefect job: live tweet scrape. Each scheduled run covers the stride_sec
window ending delay_sec before the scheduled start time, using TwintPool
(tor-routed) via FirehoseJob, writing results as json."""

import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)  # DEBUG, INFO, WARNING, ERROR, CRITICAL


import json, os, pandas as pd, pendulum
from ProjectDomino.Neo4jDataAccess import Neo4jDataAccess
from ProjectDomino.FirehoseJob import FirehoseJob
from ProjectDomino.TwintPool import TwintPool
from prefect.environments.storage import S3
from prefect import context, Flow, task
from prefect.schedules import IntervalSchedule
from datetime import timedelta, datetime
from prefect.engine.executors import DaskExecutor


S3_BUCKET = "wzy-project-domino"

pd.set_option('display.max_colwidth', None)
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)

def env_non_empty(x: str):
    # True only when the env var exists AND is non-empty (compose passes "" defaults).
    return x in os.environ and os.environ[x]

stride_sec = int(os.environ['DOMINO_STRIDE_SEC']) if env_non_empty('DOMINO_STRIDE_SEC') else 30
delay_sec = int(os.environ['DOMINO_DELAY_SEC']) if env_non_empty('DOMINO_DELAY_SEC') else 60
job_name = os.environ['DOMINO_JOB_NAME'] if env_non_empty('DOMINO_JOB_NAME') else "covid"
search = os.environ['DOMINO_SEARCH'] if env_non_empty('DOMINO_SEARCH') else "covid OR corona OR virus OR pandemic"

@task(log_stdout=True, skip_on_upstream_skip=True)
def run_stream():
    """Scrape the stride_sec window lagging delay_sec behind the schedule."""
    start = context.scheduled_start_time - timedelta(seconds=delay_sec + stride_sec)
    current = start + timedelta(seconds=stride_sec)
    #start = datetime.strptime("2020-10-06 22:10:00", "%Y-%m-%d %H:%M:%S")
    #current = datetime.strptime("2020-10-10 16:08:00", "%Y-%m-%d %H:%M:%S")
    #current = datetime.strptime(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "%Y-%m-%d %H:%M:%S")
    tp = TwintPool(is_tor=True)
    fh = FirehoseJob(PARQUET_SAMPLE_RATE_TIME_S=30, save_to_neo=False, writers={}, write_to_disk='json')

    try:
        for df in fh.search_time_range(
            tp=tp,
            Search=search,
            Since=datetime.strftime(start, "%Y-%m-%d %H:%M:%S"),
            Until=datetime.strftime(current, "%Y-%m-%d %H:%M:%S"),
            job_name=job_name,
            Limit=10000000,
            stride_sec=stride_sec
        ):
            logger.info('got: %s', len(df) if not (df is None) else 'None')
            logger.info('proceed to next df')
    except Exception as e:
        logger.error("job exception", exc_info=True)
        raise e
    logger.info("job finished")


schedule_opts = {
    'interval': timedelta(seconds=stride_sec)
}
# Consistency fix: use logger like the rest of this script (was a bare print).
logger.info(f'Schedule options: {schedule_opts}')

schedule = IntervalSchedule(**schedule_opts)
storage = S3(bucket=S3_BUCKET)

#with Flow("covid-19 stream-single") as flow:
#with Flow("covid-19 stream", storage=storage, schedule=schedule) as flow:
with Flow(f"{job_name} stream", schedule=schedule) as flow:
    run_stream()
flow.run()
85+

0 commit comments

Comments
 (0)