Merged
1 change: 1 addition & 0 deletions .gitignore
@@ -167,3 +167,4 @@ __pycache__
.idea

neo4jcreds.json
+output/
9 changes: 5 additions & 4 deletions infra/pipelines/docker/Dockerfile
@@ -1,18 +1,19 @@
-FROM graphistry/graphistry-blazing:v2.29.3
+#FROM graphistry/graphistry-blazing:v2.29.3

+FROM python:3.8

RUN export DEBIAN_FRONTEND=noninteractive \
&& apt-get update \
&& apt-get install -y --no-install-recommends supervisor \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*

-RUN source activate rapids \
-&& pip install prefect==0.10.1 simplejson twarc neo4j boto3==1.12.39 \
+RUN pip install prefect==0.10.1 simplejson twarc neo4j boto3==1.12.39 \
&& ( prefect agent install local > supervisord.conf )

COPY . .

#TODO find cleaner way to avoid talking to cloud server
-RUN source activate rapids && prefect backend server
+RUN prefect backend server

CMD ["./infra/pipelines/docker/entrypoint.sh"]
6 changes: 3 additions & 3 deletions infra/pipelines/docker/datastream-Dockerfile
@@ -1,14 +1,14 @@
-FROM python:3.7
+FROM python:3.8
RUN apt-get update \
&& apt-get install -y --no-install-recommends git vim tor \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*

RUN pip install prefect==0.10.1 simplejson twarc neo4j boto3==1.12.39 \
-pandas pyarrow urlextract git+https://github.com/homm/yurl.git@1943161973aeb3b3cf2e1e9de6671673b8356161
+pandas pyarrow s3fs urlextract git+https://github.com/homm/yurl.git@1943161973aeb3b3cf2e1e9de6671673b8356161

#RUN echo "ok6" && pip install git+https://github.com/TheDataRideAlongs/twint.git
-RUN pip install git+https://github.com/twintproject/twint.git
+RUN pip install git+https://github.com/graphistry/twint.git
#git+https://github.com/lmeyerov/twint.git@patch-1#egg=twint
#RUN pip install git+https://github.com/himanshudabas/twint.git@twint-fixes#egg=twint

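The new s3fs dependency is what lets the jobs below hand parquet output straight to S3. A minimal sketch of the underlying capability, assuming pandas >= 1.2's storage_options passthrough; the bucket path and credentials are placeholders, and FirehoseJob's actual write path is not shown in this diff:

import pandas as pd

df = pd.DataFrame({"id": [1, 2], "text": ["a", "b"]})
df.to_parquet(
    "s3://some-bucket/dt-phase1/part-0.parquet",  # hypothetical destination
    compression="snappy",
    # forwarded to s3fs; mirrors the key/secret the jobs pass as s3fs_options
    storage_options={"key": "<AWS_ACCESS_KEY_ID>", "secret": "<AWS_SECRET_ACCESS_KEY>"},
)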
15 changes: 14 additions & 1 deletion infra/pipelines/docker/datastream-docker-compose.yml
@@ -11,14 +11,16 @@ version: '3'

services:
data-stream:
image: graphistry/datastream-agent
build:
context: ../../../
dockerfile: infra/pipelines/docker/datastream-Dockerfile
tty: true
network_mode: 'bridge'
volumes:
- /home/lmeyerov2/neo4jcreds.json:/secrets/neo4jcreds.json:ro
-      - ./../../../jobs:/app:ro
+      - ./../../../jobs:/app:cached
+      - ./../../../output:/output
environment:
JOB_FILE: ${JOB_FILE:-search_by_date_job.py}
TOPIC: ${TOPIC:-covid}
@@ -28,6 +30,17 @@ services:
PREFECT__SERVER__UI__PORT: ${PREFECT__SERVER__UI__PORT:-8080}
AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID:-}
AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY:-}
+      DOMINO_START_DATE: ${DOMINO_START_DATE:-}
+      DOMINO_DELAY_SEC: ${DOMINO_DELAY_SEC:-}
+      DOMINO_STRIDE_SEC: ${DOMINO_STRIDE_SEC:-}
+      DOMINO_HISTORIC_STRIDE_SEC: ${DOMINO_HISTORIC_STRIDE_SEC:-}
+      DOMINO_TWINT_STRIDE_SEC: ${DOMINO_TWINT_STRIDE_SEC:-}
+      DOMINO_END_DATE: ${DOMINO_END_DATE:-}
+      DOMINO_JOB_NAME: ${DOMINO_JOB_NAME:-}
+      DOMINO_SEARCH: ${DOMINO_SEARCH:-}
+      DOMINO_WRITE_FORMAT: ${DOMINO_WRITE_FORMAT:-}
+      DOMINO_S3_FILEPATH: ${DOMINO_S3_FILEPATH:-}
+      DOMINO_COMPRESSION: ${DOMINO_COMPRESSION:-}
logging:
options:
tag: 'ImageName:{{.ImageName}}/Name:{{.Name}}/ID:{{.ID}}/ImageFullID:{{.ImageFullID}}'
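Note that every DOMINO_* setting above defaults to the empty string (${VAR:-}), so inside the container an unset host variable is indistinguishable from an empty one. The jobs below therefore gate on non-empty values rather than plain key membership; a sketch mirroring the helper in jobs/search_historic.py:

import os

def env_non_empty(name: str):
    # compose's ${VAR:-} defaults inject "" for unset host variables,
    # so an empty string must count the same as "not configured"
    return name in os.environ and os.environ[name]

stride_sec = int(os.environ['DOMINO_STRIDE_SEC']) if env_non_empty('DOMINO_STRIDE_SEC') else 30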
2 changes: 1 addition & 1 deletion infra/pipelines/docker/datastream-entrypoint.sh
@@ -1,7 +1,7 @@
#!/bin/bash
set -ex

echo "Starting JOB_FILE ${JOB_FILE}, maybe TOPIC ${TOPIC}"
echo "Starting JOB_FILE ${JOB_FILE}"

service tor start

2 changes: 1 addition & 1 deletion infra/pipelines/docker/entrypoint.sh
@@ -1,4 +1,4 @@
#!/bin/bash

echo "Starting prefect executor daemon in foreground"
-source activate rapids && supervisord --nodaemon
+supervisord --nodaemon
4 changes: 2 additions & 2 deletions infra/pipelines/docker/nonrapids-Dockerfile
@@ -1,12 +1,12 @@
-FROM python:3.7
+FROM python:3.8
RUN apt-get update \
&& apt-get install -y --no-install-recommends supervisor \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*

COPY . .
RUN pip install prefect==0.10.1 simplejson twarc neo4j boto3==1.12.39 \
-pandas git+https://github.com/twintproject/twint.git@origin/master#egg=twint \
+pandas git+https://github.com/graphistry/twint.git@origin/master#egg=twint \
&& ( prefect agent install local > supervisord.conf )
RUN prefect backend server
RUN ["chmod","+x","./infra/pipelines/docker/nonrapids-entrypoint.sh"]
34 changes: 34 additions & 0 deletions infra/pipelines/docker/search_historic.sh
@@ -0,0 +1,34 @@
#!/bin/bash
set -ex

DOMINO_JOB_NAME=${JOB_NAME:-historic_pfas_1}
DOMINO_SEARCH=${SEARCH:-"pfas"}
DOMINO_START_DATE=${START_DATE:-2022-01-01 00:00:00}
DOMINO_STRIDE_SEC=${STRIDE_SEC:-30}
DOMINO_HISTORIC_STRIDE_SEC=${HISTORIC_STRIDE_SEC:-86400}
DOMINO_TWINT_STRIDE_SEC=${TWINT_STRIDE_SEC:-28800}
DOMINO_WRITE_FORMAT=${WRITE_FORMAT:-parquet}
DOMINO_S3_FILEPATH=${S3_FILEPATH:-dt-phase1}
DOMINO_COMPRESSION=${COMPRESSION:-snappy}
AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID:-}
AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY:-}

docker-compose -f datastream-docker-compose.yml -p ${DOMINO_JOB_NAME} down -v
docker-compose -f datastream-docker-compose.yml build # --no-cache
JOB_FILE="search_historic.py" \
DOMINO_JOB_NAME=$DOMINO_JOB_NAME \
DOMINO_SEARCH=$DOMINO_SEARCH \
DOMINO_START_DATE=$DOMINO_START_DATE \
DOMINO_STRIDE_SEC=$DOMINO_STRIDE_SEC \
DOMINO_HISTORIC_STRIDE_SEC=$DOMINO_HISTORIC_STRIDE_SEC \
DOMINO_TWINT_STRIDE_SEC=$DOMINO_TWINT_STRIDE_SEC \
DOMINO_WRITE_FORMAT=$DOMINO_WRITE_FORMAT \
DOMINO_S3_FILEPATH=$DOMINO_S3_FILEPATH \
DOMINO_COMPRESSION=$DOMINO_COMPRESSION \
AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID \
AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY \
docker-compose -f datastream-docker-compose.yml \
-p ${DOMINO_JOB_NAME} \
up \
data-stream \
"$@"
129 changes: 129 additions & 0 deletions jobs/search_historic.py
@@ -0,0 +1,129 @@
#!/usr/bin/env python
# coding: utf-8


import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO) #DEBUG, INFO, WARNING, ERROR, CRITICAL



import json, os, pandas as pd, pendulum
from ProjectDomino.Neo4jDataAccess import Neo4jDataAccess
from ProjectDomino.FirehoseJob import FirehoseJob
from ProjectDomino.TwintPool import TwintPool
from prefect.environments.storage import S3
from prefect import context, Flow, task
from prefect.schedules import IntervalSchedule
from datetime import timedelta, datetime
from prefect.engine.executors import DaskExecutor



S3_BUCKET = "wzy-project-domino"

pd.set_option('display.max_colwidth', None)
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)

def env_non_empty(x: str):
return x in os.environ and os.environ[x]

stride_sec = int(os.environ['DOMINO_STRIDE_SEC']) if env_non_empty('DOMINO_STRIDE_SEC') else 30
historic_stride_sec = int(os.environ['DOMINO_HISTORIC_STRIDE_SEC']) if env_non_empty('DOMINO_HISTORIC_STRIDE_SEC') else 60 * 60 * 24
twint_stride_sec = int(os.environ['DOMINO_TWINT_STRIDE_SEC']) if env_non_empty('DOMINO_TWINT_STRIDE_SEC') else round(historic_stride_sec/2)
delay_sec = int(os.environ['DOMINO_DELAY_SEC']) if env_non_empty('DOMINO_DELAY_SEC') else 60
job_name = os.environ['DOMINO_JOB_NAME'] if env_non_empty('DOMINO_JOB_NAME') else "covid"
start_date = pendulum.parse(os.environ['DOMINO_START_DATE']) if env_non_empty('DOMINO_START_DATE') else datetime.now() - timedelta(days=365)
search = os.environ['DOMINO_SEARCH'] if env_non_empty('DOMINO_SEARCH') else "covid OR corona OR virus OR pandemic"
write_format = os.environ['DOMINO_WRITE_FORMAT'] if env_non_empty('DOMINO_WRITE_FORMAT') else None

if write_format == 'parquet_s3':
s3_filepath = os.environ['DOMINO_S3_FILEPATH'] if env_non_empty('DOMINO_S3_FILEPATH') else None
AWS_ACCESS_KEY_ID = os.environ['AWS_ACCESS_KEY_ID']
AWS_SECRET_ACCESS_KEY = os.environ['AWS_SECRET_ACCESS_KEY']
compression = os.environ['DOMINO_COMPRESSION'] if env_non_empty('DOMINO_COMPRESSION') else 'snappy'

output_path = f'/output/{job_name}'
os.makedirs(output_path, exist_ok=True)


# FIXME unsafe when distributed
task_num = -1

@task(log_stdout=True, skip_on_upstream_skip=True, max_retries=3, retry_delay=timedelta(seconds=30))
def run_stream():

global task_num
task_num = task_num + 1

start = start_date + timedelta(seconds=task_num * historic_stride_sec)
current = start + timedelta(seconds=historic_stride_sec)
print('------------------------')
    print('task %s with start %s: %s to %s' % (task_num, start_date, start, current))
#start = datetime.strptime("2020-10-06 22:10:00", "%Y-%m-%d %H:%M:%S")
#current = datetime.strptime("2020-10-10 16:08:00", "%Y-%m-%d %H:%M:%S")
#current = datetime.strptime(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "%Y-%m-%d %H:%M:%S")
#2020-10-10 16:07:30
#2020-10-11 06:29:00 to 2020-10-11 06:29:30:
#2020-10-11 18:45:00 to 2020-10-11 18:45:30:
#2020-10-05 17:00:30-2020-10-05 17:01:00
# 2020-10-06 22:10:00 to 2020-10-06 22:10:30:
tp = TwintPool(is_tor=True)
fh = FirehoseJob(
PARQUET_SAMPLE_RATE_TIME_S=30,
save_to_neo=False,
writers={},
write_to_disk=write_format,
write_opts=(
{
's3_filepath': s3_filepath,
's3fs_options': {
'key': AWS_ACCESS_KEY_ID,
'secret': AWS_SECRET_ACCESS_KEY
},
'compression': compression
}
if write_format == 'parquet_s3' else
{}
)
)

try:
for df in fh.search_time_range(
tp=tp,
Search=search,
Since=datetime.strftime(start, "%Y-%m-%d %H:%M:%S"),
Until=datetime.strftime(current, "%Y-%m-%d %H:%M:%S"),
job_name=job_name,
Limit=10000000,
stride_sec=twint_stride_sec
):
            print('got: %s' % (df.shape if df is not None else 'None'))
except Exception as e:
logger.error("job exception", exc_info=True)
raise e
print("task finished")


schedule_opts = {
'interval': timedelta(seconds=stride_sec),
'start_date': pendulum.parse('2019-01-01 00:00:00')
}
logger.info(f'Schedule options: {schedule_opts}')
logger.info(f'Task settings: stride_sec={stride_sec}, '
            f'historic_stride_sec={historic_stride_sec}, '
            f'twint_stride_sec={twint_stride_sec}, '
            f'start_date={start_date}, '
            f'search={search}')

schedule = IntervalSchedule(**schedule_opts)
storage = S3(bucket=S3_BUCKET)

#with Flow("covid-19 stream-single") as flow:
#with Flow("covid-19 stream", storage=storage, schedule=schedule) as flow:
with Flow(f"{job_name} stream", schedule=schedule) as flow:
run_stream()
flow.run()

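For reference, the windowing above in isolation: run k of the schedule covers [start_date + k*H, start_date + (k+1)*H), and each window is scraped in T-second sub-strides. A small standalone sketch; the values are this script's defaults, not output from a real run:

from datetime import datetime, timedelta

start_date = datetime(2022, 1, 1)
H = 86400   # DOMINO_HISTORIC_STRIDE_SEC: one day per scheduled run
T = 28800   # DOMINO_TWINT_STRIDE_SEC: 8h sub-strides within each window

for k in range(3):
    lo = start_date + timedelta(seconds=k * H)
    hi = lo + timedelta(seconds=H)
    print('run %s: %s to %s in %s sub-strides of %ss' % (k, lo, hi, H // T, T))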
85 changes: 85 additions & 0 deletions jobs/search_live.py
@@ -0,0 +1,85 @@
#!/usr/bin/env python
# coding: utf-8


import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO) #DEBUG, INFO, WARNING, ERROR, CRITICAL



import json, os, pandas as pd, pendulum
from ProjectDomino.Neo4jDataAccess import Neo4jDataAccess
from ProjectDomino.FirehoseJob import FirehoseJob
from ProjectDomino.TwintPool import TwintPool
from prefect.environments.storage import S3
from prefect import context, Flow, task
from prefect.schedules import IntervalSchedule
from datetime import timedelta, datetime
from prefect.engine.executors import DaskExecutor



S3_BUCKET = "wzy-project-domino"

pd.set_option('display.max_colwidth', None)
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)

def env_non_empty(x: str):
return x in os.environ and os.environ[x]

stride_sec = int(os.environ['DOMINO_STRIDE_SEC']) if env_non_empty('DOMINO_STRIDE_SEC') else 30
delay_sec = int(os.environ['DOMINO_DELAY_SEC']) if env_non_empty('DOMINO_DELAY_SEC') else 60
job_name = os.environ['DOMINO_JOB_NAME'] if env_non_empty('DOMINO_JOB_NAME') else "covid"
search = os.environ['DOMINO_SEARCH'] if env_non_empty('DOMINO_SEARCH') else "covid OR corona OR virus OR pandemic"

@task(log_stdout=True, skip_on_upstream_skip=True)
def run_stream():

start = context.scheduled_start_time - timedelta(seconds=delay_sec + stride_sec)
current = start + timedelta(seconds=stride_sec)
#start = datetime.strptime("2020-10-06 22:10:00", "%Y-%m-%d %H:%M:%S")
#current = datetime.strptime("2020-10-10 16:08:00", "%Y-%m-%d %H:%M:%S")
#current = datetime.strptime(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "%Y-%m-%d %H:%M:%S")
#2020-10-10 16:07:30
#2020-10-11 06:29:00 to 2020-10-11 06:29:30:
#2020-10-11 18:45:00 to 2020-10-11 18:45:30:
#2020-10-05 17:00:30-2020-10-05 17:01:00
# 2020-10-06 22:10:00 to 2020-10-06 22:10:30:
tp = TwintPool(is_tor=True)
fh = FirehoseJob(PARQUET_SAMPLE_RATE_TIME_S=30, save_to_neo=False, writers={}, write_to_disk='json')

try:
for df in fh.search_time_range(
tp=tp,
Search=search,
Since=datetime.strftime(start, "%Y-%m-%d %H:%M:%S"),
Until=datetime.strftime(current, "%Y-%m-%d %H:%M:%S"),
job_name=job_name,
Limit=10000000,
stride_sec=stride_sec
):
            logger.info('got: %s', len(df) if df is not None else 'None')
logger.info('proceed to next df')
except Exception as e:
logger.error("job exception", exc_info=True)
raise e
logger.info("job finished")


schedule_opts = {
'interval': timedelta(seconds=stride_sec)
}
print(f'Schedule options: {schedule_opts}')

schedule = IntervalSchedule(**schedule_opts)
storage = S3(bucket=S3_BUCKET)

#with Flow("covid-19 stream-single") as flow:
#with Flow("covid-19 stream", storage=storage, schedule=schedule) as flow:
with Flow(f"{job_name} stream", schedule=schedule) as flow:
run_stream()
flow.run()

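A note on the offsets above: each scheduled run queries a stride_sec-wide window that ended delay_sec before the run fired, presumably so the search index has time to catch up before it is read. The arithmetic in isolation, with the script's defaults and a hypothetical clock:

from datetime import datetime, timedelta

delay_sec, stride_sec = 60, 30
scheduled_start_time = datetime.utcnow()  # prefect supplies this via context

start = scheduled_start_time - timedelta(seconds=delay_sec + stride_sec)
current = start + timedelta(seconds=stride_sec)
# the queried window [start, current) ends exactly delay_sec before the run
assert current == scheduled_start_time - timedelta(seconds=delay_sec)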
16 changes: 16 additions & 0 deletions jobs/search_pfas.sh
@@ -0,0 +1,16 @@
#!/bin/bash
set -ex

cd ../infra/pipelines/docker/

JOB_NAME="pfas_pfoa_pfos" \
SEARCH='pfas OR pfoa OR pfos' \
START_DATE="2022-01-01 00:00:00" \
HISTORIC_STRIDE_SEC="`python -c 'print(60 * 60 * 24)'`" \
TWINT_STRIDE_SEC="`python -c 'print(60 * 60 * 8)'`" \
STRIDE_SEC="`python -c 'print(30 * 1)'`" \
WRITE_FORMAT="parquet_s3" \
S3_FILEPATH="dt-phase1" \
AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} \
AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} \
./search_historic.sh "$@"