Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
feat(parameterized flows): select job via JOB_FILE env var and window searches off the schedule's start time
  • Loading branch information
lmeyerov committed Oct 16, 2020
commit dc9a0d74edd9916f2ebbb0919299dccf7c939e8c
3 changes: 3 additions & 0 deletions infra/pipelines/docker/datastream-Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ RUN echo "ok6" && pip install git+https://github.com/TheDataRideAlongs/twint.git

COPY ./modules /app/ProjectDomino
COPY ./infra/pipelines/docker/jobs /app
COPY ./infra/pipelines/docker/datastream-entrypoint.sh /entrypoint.sh

ENTRYPOINT ["/entrypoint.sh"]

HEALTHCHECK --interval=60s --timeout=15s --start-period=20s \
CMD curl -sf --socks5-hostname localhost:9050 https://check.torproject.org | grep Congrat
Expand Down
1 change: 1 addition & 0 deletions infra/pipelines/docker/datastream-docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ services:
volumes:
- /home/codywebb/ProjectDomino/infra/pipelines/docker/jobs/neo4jcreds.json:/secrets/neo4jcreds.json:ro
environment:
JOB_FILE: search_by_date_job.py
PREFECT__SERVER__HOST: ${PREFECT__SERVER__HOST:-http://host.docker.internal}
PREFECT__SERVER__PORT: ${PREFECT__SERVER__PORT:-4200}
PREFECT__SERVER__UI__HOST: ${PREFECT__SERVER__UI__HOST:-http://host.docker.internal}
Expand Down
6 changes: 6 additions & 0 deletions infra/pipelines/docker/datastream-entrypoint.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#!/bin/bash
# Datastream container entrypoint: start tor (used by TwintPool's socks5
# proxy), then hand off to the job script selected by $JOB_FILE.
set -ex

service tor start

# exec so python3 replaces the shell as PID 1 and receives docker stop
# signals directly; quote the path and "$@" so args with spaces survive.
exec python3 "/app/${JOB_FILE}" "$@"
71 changes: 15 additions & 56 deletions infra/pipelines/docker/jobs/search_by_date_job.py
Original file line number Diff line number Diff line change
@@ -1,71 +1,33 @@
#!/usr/bin/env python
# coding: utf-8

# In[26]:





# In[27]:


import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO) #DEBUG, INFO, WARNING, ERROR, CRITICAL




# In[28]:


import json, pandas as pd
from ProjectDomino.Neo4jDataAccess import Neo4jDataAccess
from ProjectDomino.FirehoseJob import FirehoseJob
from ProjectDomino.TwintPool import TwintPool
from prefect.environments.storage import S3
from prefect import Flow,task
from prefect import context, Flow, task
from prefect.schedules import IntervalSchedule
from datetime import timedelta, datetime
from random import randrange
from prefect.engine.executors import DaskExecutor
import time
import random


# In[29]:





# In[30]:


S3_BUCKET = "wzy-project-domino"


# In[31]:


pd.set_option('display.max_colwidth', None)
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)


# ## task

# In[33]:


def random_date(start, end):
    """Return a uniformly random datetime in the half-open range [start, end).

    Resolution is one second (sub-second components of the delta are ignored,
    matching the original seconds-based computation).

    Edge case: when start == end the interval is empty; return start instead
    of letting randrange(0) raise ValueError.
    """
    delta = end - start
    int_delta = (delta.days * 24 * 60 * 60) + delta.seconds
    if int_delta == 0:
        # Empty interval: nothing to sample.
        return start
    random_second = randrange(int_delta)
    return start + timedelta(seconds=random_second)

def get_creds():
neo4j_creds = None
with open('/secrets/neo4jcreds.json') as json_file:
Expand All @@ -75,42 +37,39 @@ def get_creds():
@task(log_stdout=True, skip_on_upstream_skip=True)
def run_stream():
    """Prefect task: crawl one 30s tweet window and stream results into Neo4j.

    The window is anchored on the flow's scheduled start time —
    [scheduled_start - 60s, scheduled_start - 30s) — rather than wall-clock
    now(), so a retried or late-starting run still covers the same slice.
    """
    creds = get_creds()
    start = context.scheduled_start_time - timedelta(seconds=60)
    current = start + timedelta(seconds=30)
    tp = TwintPool(is_tor=True)
    fh = FirehoseJob(neo4j_creds=creds, PARQUET_SAMPLE_RATE_TIME_S=30, save_to_neo=True, writers={})
    try:
        search = "covid OR corona OR virus OR pandemic"
        job_name = "covid multi test"
        limit = 10000000
        # search_time_range expects timestamps in '%Y-%m-%d %H:%M:%S' form.
        fmt = "%Y-%m-%d %H:%M:%S"
        # Pass the named `limit` (it was previously defined but a literal
        # 10000000 was passed, leaving the variable dead).
        for df in fh.search_time_range(tp=tp, Search=search, Since=datetime.strftime(start, fmt), Until=datetime.strftime(current, fmt), job_name=job_name, Limit=limit, stride_sec=30):
            logger.info('got: %s', len(df) if not (df is None) else 'None')
            logger.info('proceed to next df')
    except Exception:
        logger.error("job exception", exc_info=True)
        # Bare raise preserves the original traceback (was `raise e`).
        raise
    logger.info("job finished")

# In[ ]:


# Fire the stream task every 30s; each run windows off its own scheduled
# start time (see run_stream), so consecutive runs cover adjacent slices.
schedule = IntervalSchedule(
    start_date=datetime(2020, 9, 5),
    interval=timedelta(seconds=30),
)
storage = S3(bucket=S3_BUCKET)

with Flow("covid-19 stream", schedule=schedule) as flow:
    run_stream()
flow.run()


# In[ ]:




10 changes: 9 additions & 1 deletion infra/pipelines/docker/search.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
#!/bin/bash
# Build and launch the datastream job container, then tail its logs.
# Usage: JOB_FILE=<job script> PROJECT_NAME=<compose project> ./search.sh
set -ex

FILE=${JOB_FILE:-search_by_date_job.py}
PROJECT=${PROJECT_NAME:-docker}


docker-compose -f datastream-docker-compose.yml build
# Pass the resolved $FILE through (it was previously hard-coded to
# search_by_date_job.py, which made the JOB_FILE override a no-op).
JOB_FILE="${FILE}" docker-compose -f datastream-docker-compose.yml -p "${PROJECT}" up -d data-stream
sleep 5
docker-compose -f datastream-docker-compose.yml -p "${PROJECT}" logs -f -t --tail=100
2 changes: 2 additions & 0 deletions modules/TwintPool.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ def __init__(self, is_tor=False):

def twint_loop(self, since, until, stride_sec=600, limit=None):
def get_unix_time(time_str):
    # Coerce to datetime: pass datetimes through unchanged, parse
    # '%Y-%m-%d %H:%M:%S' strings otherwise.
    if not isinstance(time_str, datetime):
        time_str = datetime.strptime(time_str, '%Y-%m-%d %H:%M:%S')
    return time_str

since = get_unix_time(since)
Expand Down