Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -164,4 +164,6 @@ lib
lib64
__pycache__

neo4jcreds.json
.idea

neo4jcreds.json
8 changes: 8 additions & 0 deletions .idea/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions .idea/ProjectDomino.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions .idea/inspectionProfiles/Project_Default.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/inspectionProfiles/profiles_settings.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/misc.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions .idea/modules.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion infra/pipelines/docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ services:
image: prefect-agent:0.0.5
build:
context: ../../../
dockerfile: ./infra/pipelines/docker/Dockerfile
dockerfile: ./infra/pipelines/docker/nonrapids-Dockerfile
container_name: prefect-agent
network_mode: 'bridge'
restart: unless-stopped
Expand Down
13 changes: 13 additions & 0 deletions infra/pipelines/docker/nonrapids-Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
FROM python:3.7
RUN apt-get update \
&& apt-get install -y --no-install-recommends supervisor \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*

COPY . .
RUN pip install prefect==0.10.1 simplejson twarc neo4j boto3==1.12.39 \
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

guessing we'll also need pandas/twint/bs4

pandas twint \
&& ( prefect agent install local > supervisord.conf )
RUN prefect backend server
RUN ["chmod","+x","./infra/pipelines/docker/nonrapids-entrypoint.sh"]
CMD ["./infra/pipelines/docker/nonrapids-entrypoint.sh"]
38 changes: 38 additions & 0 deletions infra/pipelines/docker/nonrapids-docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
########
#
# Run from git root's parent: with .env in local folder and ProjectDomino/ inside
#
# $ touch .env
# $ sudo docker-compose -f ./ProjectDomino/infra/pipelines/docker/docker-compose.yml up -d prefect-agent
#
########

version: '3.4'

services:
##############################################################################
tor:
build: ./tor
network_mode: 'bridge'
restart: always
ports:
- 127.0.0.1:9050:9050
##############################################################################
prefect-agent:
image: prefect-agent:0.0.5
build:
context: ../../../
dockerfile: ./infra/pipelines/docker/nonrapids-Dockerfile
container_name: prefect-agent
network_mode: 'bridge'
restart: unless-stopped
environment:
PREFECT__SERVER__HOST: ${PREFECT__SERVER__HOST:-http://host.docker.internal}
PREFECT__SERVER__PORT: ${PREFECT__SERVER__PORT:-4200}
PREFECT__SERVER__UI__HOST: ${PREFECT__SERVER__UI__HOST:-http://host.docker.internal}
PREFECT__SERVER__UI__PORT: ${PREFECT__SERVER__UI__PORT:-8080}
AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID:-}
AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY:-}
logging:
options:
tag: 'ImageName:{{.ImageName}}/Name:{{.Name}}/ID:{{.ID}}/ImageFullID:{{.ImageFullID}}'
4 changes: 4 additions & 0 deletions infra/pipelines/docker/nonrapids-entrypoint.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/bin/bash

echo "Starting prefect executor daemon in foreground"
supervisord --nodaemon
23 changes: 23 additions & 0 deletions infra/pipelines/docker/tor/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
FROM alpine

ARG TARGETPLATFORM
ARG BUILDPLATFORM
ARG BUILD_DATE
ARG VCS_REF
ARG VERSION



RUN apk add --no-cache curl tor && \
sed "1s/^/SocksPort 0.0.0.0:9050\n/" /etc/tor/torrc.sample > /etc/tor/torrc

EXPOSE 9050

HEALTHCHECK --interval=60s --timeout=15s --start-period=20s \
CMD curl -s --socks5 127.0.0.1:9050 'https://check.torproject.org/' | grep -qm1 Congratulations

VOLUME ["/var/lib/tor"]

USER tor

CMD ["tor"]
71 changes: 27 additions & 44 deletions modules/FirehoseJob.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
from .Timer import Timer
from .TwarcPool import TwarcPool
from .Neo4jDataAccess import Neo4jDataAccess
from .StatusArrow import KNOWN_FIELDS
from .TwintPool import TwintPool

import logging
logger = logging.getLogger('fh')
Expand Down Expand Up @@ -103,50 +105,6 @@
# 'url': pa.string()
# }))})

### When dtype -> arrow ambiguious, override
KNOWN_FIELDS = [
[0, 'contributors', pa.string()],
[1, 'coordinates', pa.string()],
[2, 'created_at', pa.string()],

#[3, 'display_text_range', pa.list_(pa.int64())],
[3, 'display_text_range', pa.string()],

[4, 'entities', pa.string()],
[5, 'extended_entities', pa.string()], #extended_entities_t ],
[7, 'favorited', pa.bool_()],
[8, 'favorite_count', pa.int64()],
[9, 'full_text', pa.string()],
[10, 'geo', pa.string()],
[11, 'id', pa.int64() ],
[12, 'id_str', pa.string() ],
[13, 'in_reply_to_screen_name', pa.string() ],
[14, 'in_reply_to_status_id', pa.int64() ],
[15, 'in_reply_to_status_id_str', pa.string() ],
[16, 'in_reply_to_user_id', pa.int64() ],
[17, 'in_reply_to_user_id_str', pa.string() ],
[18, 'is_quote_status', pa.bool_() ],
[19, 'lang', pa.string() ],
[20, 'place', pa.string()],
[21, 'possibly_sensitive', pa.bool_()],
[22, 'quoted_status', pa.string()],
[23, 'quoted_status_id', pa.int64()],
[24, 'quoted_status_id_str', pa.string()],
[25, 'quoted_status_permalink', pa.string()],
[26, 'retweet_count', pa.int64()],
[27, 'retweeted', pa.bool_()],
[28, 'retweeted_status', pa.string()],
[29, 'scopes', pa.string()],
[30, 'source', pa.string()],
[31, 'truncated', pa.bool_()],
[32, 'user', pa.string()],

#[33, 'withheld_in_countries', pa.list_(pa.string())],
[33, 'withheld_in_countries', pa.string()],

#[34, 'followers', pa.struct({'followers': pa.bool_()})]
[34, 'followers', pa.string()]
]

#############################

Expand Down Expand Up @@ -741,3 +699,28 @@ def ingest_range(self, begin, end, job_name=None): # This method is where the m
for i in range(0, self.TWEETS_PER_PROCESS):
ids_to_process.append(self.queue.popleft())
self.process_ids(ids_to_process, job_name)

###############################

def search_time_range(self,
Search="COVID",
Since="2020-01-01 20:00:00",
Until="2020-01-01 21:00:00",
job_name=None,
**kwargs):
if job_name is None:
job_name = "search_%s" % Search
tp = TwintPool()
for df,t0,t1 in tp._get_term(Search=Search, Since=Since, Until=Until, **kwargs):
logger.debug('hits %s to %s: %s', t0, t1, len(df))
df2 = tp.twint_df_to_neo4j_df(df)
if self.save_to_neo:
logger.debug('writing to neo4j')
df3 = Neo4jDataAccess(self.debug, self.neo4j_creds).save_df_to_graph(df2, job_name)
logger.debug('wrote to neo4j, # ', len(df3))
yield df3
else:
yield df2
logger.debug('done')


49 changes: 49 additions & 0 deletions modules/StatusArrow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import pyarrow as pa

### When dtype -> arrow ambiguious, override
KNOWN_FIELDS = [
[0, 'contributors', pa.string()],
[1, 'coordinates', pa.string()],
[2, 'created_at', pa.string()],

#[3, 'display_text_range', pa.list_(pa.int64())],
[3, 'display_text_range', pa.string()],

[4, 'entities', pa.string()],
[5, 'extended_entities', pa.string()], #extended_entities_t ],
[7, 'favorited', pa.bool_()],
[8, 'favorite_count', pa.int64()],
[9, 'full_text', pa.string()],
[10, 'geo', pa.string()],
[11, 'id', pa.int64() ],
[12, 'id_str', pa.string() ],
[13, 'in_reply_to_screen_name', pa.string() ],
[14, 'in_reply_to_status_id', pa.int64() ],
[15, 'in_reply_to_status_id_str', pa.string() ],
[16, 'in_reply_to_user_id', pa.int64() ],
[17, 'in_reply_to_user_id_str', pa.string() ],
[18, 'is_quote_status', pa.bool_() ],
[19, 'lang', pa.string() ],
[20, 'place', pa.string()],
[21, 'possibly_sensitive', pa.bool_()],
[22, 'quoted_status', pa.string()],
[23, 'quoted_status_id', pa.int64()],
[24, 'quoted_status_id_str', pa.string()],
[25, 'quoted_status_permalink', pa.string()],
[26, 'retweet_count', pa.int64()],
[27, 'retweeted', pa.bool_()],
[28, 'retweeted_status', pa.string()],
[29, 'scopes', pa.string()],
[30, 'source', pa.string()],
[31, 'truncated', pa.bool_()],
[32, 'user', pa.string()],

#[33, 'withheld_in_countries', pa.list_(pa.string())],
[33, 'withheld_in_countries', pa.string()],

#[34, 'followers', pa.struct({'followers': pa.bool_()})]
[34, 'followers', pa.string()]
]



Loading