12 changes: 6 additions & 6 deletions infra/pipelines/docker/datastream-Dockerfile
@@ -4,20 +4,20 @@ RUN apt-get update \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*



RUN pip install prefect==0.10.1 simplejson twarc neo4j boto3==1.12.39 \
pandas pyarrow urlextract git+https://github.com/homm/yurl.git@1943161973aeb3b3cf2e1e9de6671673b8356161

RUN echo "ok6" && pip install git+https://github.com/TheDataRideAlongs/twint.git
#RUN pip install git+https://github.com/twintproject/twint.git
#git+https://github.com/lmeyerov/twint.git@patch-1#egg=twint

#FIXME this should be part of entrypoint / service config?
#RUN service tor start
COPY ./modules /modules/ProjectDomino
COPY ./infra/pipelines/docker/datastream-entrypoint.sh /entrypoint.sh

ENV JOB_FILE=search_by_date_job.py

COPY ./modules /app/ProjectDomino
COPY ./infra/pipelines/docker/jobs /app
ENTRYPOINT ["/entrypoint.sh"]
#ENTRYPOINT ["/bin/bash", "-c", 'echo "ok"']

HEALTHCHECK --interval=60s --timeout=15s --start-period=20s \
CMD curl -sf --socks5-hostname localhost:9050 https://check.torproject.org | grep Congrat
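
The HEALTHCHECK exercises the Tor proxy end-to-end: curl tunnels through the SOCKS port and the check passes only if check.torproject.org reports a Tor exit ("Congratulations..."). A rough Python equivalent of the same probe, assuming requests with SOCKS support (requests[socks], not in the pip install above) were added to the image:

import requests  # hypothetical: assumes requests[socks] is installed

def tor_is_healthy(port=9050):
    # socks5h:// resolves hostnames on the proxy side, mirroring curl's
    # --socks5-hostname flag, so DNS lookups also go through Tor.
    proxies = {"https": "socks5h://localhost:%d" % port}
    try:
        resp = requests.get("https://check.torproject.org", proxies=proxies, timeout=15)
        return "Congrat" in resp.text
    except requests.RequestException:
        return False

if __name__ == "__main__":
    print(tor_is_healthy())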
5 changes: 3 additions & 2 deletions infra/pipelines/docker/datastream-docker-compose.yml
@@ -16,10 +16,11 @@ services:
dockerfile: infra/pipelines/docker/datastream-Dockerfile
tty: true
network_mode: 'bridge'
command: sh -c "pwd && ls && service tor start && python3 /app/search_by_date_job.py"
volumes:
- /home/codywebb/ProjectDomino/infra/pipelines/docker/jobs/neo4jcreds.json:/secrets/neo4jcreds.json:ro
- /home/lmeyerov2/neo4jcreds.json:/secrets/neo4jcreds.json:ro
- ./jobs:/app:ro
environment:
JOB_FILE: ${JOB_FILE:-search_by_date_job.py}
PREFECT__SERVER__HOST: ${PREFECT__SERVER__HOST:-http://host.docker.internal}
PREFECT__SERVER__PORT: ${PREFECT__SERVER__PORT:-4200}
PREFECT__SERVER__UI__HOST: ${PREFECT__SERVER__UI__HOST:-http://host.docker.internal}
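
Prefect 0.x reads configuration overrides from double-underscore environment variables, so PREFECT__SERVER__HOST maps onto config.server.host inside the container, and the ${VAR:-default} compose syntax falls back to host.docker.internal when nothing is exported on the host. A hypothetical sanity check from inside a running container:

import prefect

# Should print the compose default (http://host.docker.internal) unless
# PREFECT__SERVER__HOST was exported before `docker-compose up`.
print(prefect.config.server.host)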
10 changes: 10 additions & 0 deletions infra/pipelines/docker/datastream-entrypoint.sh
@@ -0,0 +1,10 @@
#!/bin/bash
set -ex

echo "Starting JOB_FILE ${JOB_FILE}"

service tor start

export PYTHONPATH="/modules:${PYTHONPATH}"

python3 "/app/${JOB_FILE}" "$@"
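
The entrypoint turns the image into a generic job runner: JOB_FILE picks the script under /app and any extra container arguments pass straight through to it. A rough Python sketch of the same dispatch, for illustration only:

import os
import subprocess
import sys

# Mirror the entrypoint: the default matches the ENV set in the Dockerfile.
job = os.environ.get("JOB_FILE", "search_by_date_job.py")
subprocess.run(["python3", "/app/%s" % job] + sys.argv[1:], check=True)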
71 changes: 15 additions & 56 deletions infra/pipelines/docker/jobs/search_by_date_job.py
@@ -1,71 +1,33 @@
#!/usr/bin/env python
# coding: utf-8

import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO) #DEBUG, INFO, WARNING, ERROR, CRITICAL

import json, pandas as pd
from ProjectDomino.Neo4jDataAccess import Neo4jDataAccess
from ProjectDomino.FirehoseJob import FirehoseJob
from ProjectDomino.TwintPool import TwintPool
from prefect.environments.storage import S3
from prefect import Flow,task
from prefect import context, Flow, task
from prefect.schedules import IntervalSchedule
from datetime import timedelta, datetime
from random import randrange
from prefect.engine.executors import DaskExecutor
import time
import random


S3_BUCKET = "wzy-project-domino"


pd.set_option('display.max_colwidth', None)
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)


# ## task

def random_date(start, end):
delta = end - start
int_delta = (delta.days * 24 * 60 * 60) + delta.seconds
random_second = randrange(int_delta)
return start + timedelta(seconds=random_second)

def get_creds():
neo4j_creds = None
with open('/secrets/neo4jcreds.json') as json_file:
@@ -75,42 +37,39 @@ def get_creds():
@task(log_stdout=True, skip_on_upstream_skip=True)
def run_stream():
creds = get_creds()
start = datetime.strptime("2020-03-11 20:00:00", "%Y-%m-%d %H:%M:%S")
current = datetime.strptime(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "%Y-%m-%d %H:%M:%S")
#rand_dt=random_date(start, current)

start = context.scheduled_start_time - timedelta(seconds=60)
current = start + timedelta(seconds=30)
#start = datetime.strptime("2020-10-06 22:10:00", "%Y-%m-%d %H:%M:%S")
#current = datetime.strptime("2020-10-10 16:08:00", "%Y-%m-%d %H:%M:%S")
#current = datetime.strptime(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "%Y-%m-%d %H:%M:%S")
#2020-10-10 16:07:30
#2020-10-11 06:29:00 to 2020-10-11 06:29:30:
#2020-10-11 06:29:00 to 2020-10-11 06:29:30:
#2020-10-11 18:45:00 to 2020-10-11 18:45:30:
#2020-10-05 17:00:30-2020-10-05 17:01:00
# 2020-10-06 22:10:00 to 2020-10-06 22:10:30:
tp = TwintPool(is_tor=True)
fh = FirehoseJob(neo4j_creds=creds, PARQUET_SAMPLE_RATE_TIME_S=30, save_to_neo=True, writers={})
try:
search = "covid OR corona OR virus OR pandemic"
job_name = "covid multi test"
limit = 10000000
for df in fh.search_time_range(tp=tp, Search=search, Since=str(start), Until=str(current), job_name=job_name, Limit=10000000, stride_sec=30):
for df in fh.search_time_range(tp=tp, Search=search, Since=datetime.strftime(start, "%Y-%m-%d %H:%M:%S"), Until=datetime.strftime(current, "%Y-%m-%d %H:%M:%S"), job_name=job_name, Limit=10000000, stride_sec=30):
logger.info('got: %s', len(df) if df is not None else 'None')
logger.info('proceed to next df')
except Exception as e:
logger.error("job exception", exc_info=True)
raise e
logger.info("job finished")

schedule = IntervalSchedule(
start_date=datetime(2020, 9, 5),
interval=timedelta(seconds=10),
interval=timedelta(seconds=30),
)
storage = S3(bucket=S3_BUCKET)

#with Flow("covid-19 stream-single") as flow:
#with Flow("covid-19 stream", storage=storage, schedule=schedule) as flow:
with Flow("covid-19 stream-single") as flow:
with Flow("covid-19 stream", schedule=schedule) as flow:
run_stream()
flow.run()
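
The rewritten task derives its search window from Prefect's scheduled start time instead of the wall clock, so a run that starts late or is retried still queries the 30-second slice it was scheduled for. A toy illustration of the window arithmetic, with a hypothetical timestamp standing in for context.scheduled_start_time:

from datetime import datetime, timedelta

scheduled = datetime(2020, 10, 11, 6, 30, 0)  # stand-in for context.scheduled_start_time
start = scheduled - timedelta(seconds=60)     # look back one minute
current = start + timedelta(seconds=30)       # 30s slice, matching stride_sec=30
print(start, "to", current)  # 2020-10-11 06:29:00 to 2020-10-11 06:29:30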


134 changes: 134 additions & 0 deletions infra/pipelines/docker/jobs/track_qanon1.py
@@ -0,0 +1,134 @@
#!/usr/bin/env python
# coding: utf-8


import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO) #DEBUG, INFO, WARNING, ERROR, CRITICAL



import json, pandas as pd
from ProjectDomino.Neo4jDataAccess import Neo4jDataAccess
from ProjectDomino.FirehoseJob import FirehoseJob
from ProjectDomino.TwintPool import TwintPool
from prefect.environments.storage import S3
from prefect import context, Flow, task
from prefect.schedules import IntervalSchedule
from datetime import timedelta, datetime
from prefect.engine.executors import DaskExecutor



S3_BUCKET = "wzy-project-domino"

pd.set_option('display.max_colwidth', None)
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)


def get_creds():
neo4j_creds = None
with open('/secrets/neo4jcreds.json') as json_file:
neo4j_creds = json.load(json_file)
return neo4j_creds

@task(log_stdout=True, skip_on_upstream_skip=True)
def run_stream():
creds = get_creds()

start = context.scheduled_start_time - timedelta(seconds=60)
current = start + timedelta(seconds=30)
#start = datetime.strptime("2020-10-06 22:10:00", "%Y-%m-%d %H:%M:%S")
#current = datetime.strptime("2020-10-10 16:08:00", "%Y-%m-%d %H:%M:%S")
#current = datetime.strptime(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "%Y-%m-%d %H:%M:%S")
#2020-10-10 16:07:30
#2020-10-11 06:29:00 to 2020-10-11 06:29:30:
#2020-10-11 18:45:00 to 2020-10-11 18:45:30:
#2020-10-05 17:00:30-2020-10-05 17:01:00
# 2020-10-06 22:10:00 to 2020-10-06 22:10:30:
terms = [
"17anon",
"adrenochromeharvesting",
"agenda21",
"C0vid-19",
"cannibalclub",
"exposepedogate",
"fake virus",
"great awakening",
"HisNameWasSethRich",
"outofshadows",
"Patriot Defender",
"pedovore",
"pedowood",
"qanon",
"qanonmap",
"qanons",
"qanonymous",
"qanontruth",
"qanon2020",
"qdrop",
"qanondrops",
"qanonposts",
"qanonpub",
"qsentme",
"QANONWORLDWIDE",
"QANON_WORLDWIDE",
"QAnonUS",
"QAnonUSA",
"QAnonUK",
"QAnonCanada",
"greatawakening",
"thegreateawakening",
"qmap_pub",
"Rothshilds",
"save the children",
"saveourchildren",
"savethechildren",
"saveourplanet",
"sheepnomore",
"silentnomore",
"taketheoath",
"digitalsoldiers",
"trumprussiamatrix",
"trusttheplan",
"weareq",
"weareallq",
"WWG1WGA",
"wwg",
"wga",
"wgaworldwide",
"wwg1wgaworldwide",
"wwg1wga_worldwide"
]

search_term = " OR ".join(['"' + t + '"' for t in terms[:20]])
logger.info('Using :20: %s', search_term)


tp = TwintPool(is_tor=True)
fh = FirehoseJob(neo4j_creds=creds, PARQUET_SAMPLE_RATE_TIME_S=30, save_to_neo=True, writers={})
try:
search = search_term
job_name = "qanon"
limit = 10000000
for df in fh.search_time_range(tp=tp, Search=search, Since=datetime.strftime(start, "%Y-%m-%d %H:%M:%S"), Until=datetime.strftime(current, "%Y-%m-%d %H:%M:%S"), job_name=job_name, Limit=10000000, stride_sec=30):
logger.info('got: %s', len(df) if df is not None else 'None')
logger.info('proceed to next df')
except Exception as e:
logger.error("job exception", exc_info=True)
raise e
logger.info("job finished")

schedule = IntervalSchedule(
interval=timedelta(seconds=30),
)
storage = S3(bucket=S3_BUCKET)

#with Flow("covid-19 stream-single") as flow:
#with Flow("covid-19 stream", storage=storage, schedule=schedule) as flow:
with Flow("qanon stream", schedule=schedule) as flow:
run_stream()
flow.run()
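
The query string quotes each term so multi-word phrases like "save the children" match exactly, joins them with OR into a single Twitter search, and caps the list at the first 20 terms (presumably to stay within search-query length limits). A small sketch of the construction:

# Abbreviated term list for illustration; the job above uses terms[:20].
terms = ["qanon", "save the children", "WWG1WGA"]
search_term = " OR ".join('"' + t + '"' for t in terms[:20])
print(search_term)  # "qanon" OR "save the children" OR "WWG1WGA"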
