Closed

Changes from all commits
63 commits
ae2eb1d  adding is_tor flag in and setting in twintpool in checkhydrate proper… (webcoderz, Sep 16, 2020)
ff45ea0  adding is_tor flag in and setting in twintpool in checkhydrate proper… (webcoderz, Sep 16, 2020)
3c279f1  adding is_tor flag in and setting in twintpool in checkhydrate proper… (webcoderz, Sep 16, 2020)
10b241e  adding is_tor flag in and setting in twintpool in checkhydrate proper… (webcoderz, Sep 16, 2020)
14017fd  updating job (webcoderz, Sep 16, 2020)
3d7c969  updating job (webcoderz, Sep 16, 2020)
43b02d9  updating job (webcoderz, Sep 16, 2020)
9230edc  updating job (webcoderz, Sep 16, 2020)
88ed1ce  setting up datastream-Dockerfile for twitterscraper library (webcoderz, Sep 24, 2020)
06e322c  setting up datastream-Dockerfile for twitterscraper library (webcoderz, Sep 24, 2020)
9580d40  setting up datastream-Dockerfile for twitterscraper library w/ geckod… (webcoderz, Sep 24, 2020)
c36852b  setting up datastream-Dockerfile for twitterscraper library w/ geckod… (webcoderz, Sep 24, 2020)
a8ac909  setting up datastream-Dockerfile for twitterscraper library w/ geckod… (webcoderz, Sep 24, 2020)
5ecbedd  setting up datastream-Dockerfile for twitterscraper library w/ geckod… (webcoderz, Sep 24, 2020)
df9bd0b  setting up datastream-Dockerfile for twitterscraper library w/ geckod… (webcoderz, Sep 24, 2020)
2f84bcb  setting up datastream-Dockerfile for twitterscraper library w/ geckod… (webcoderz, Sep 24, 2020)
047430e  setting up datastream-Dockerfile for twitterscraper library w/ geckod… (webcoderz, Sep 24, 2020)
59d512b  setting up datastream-Dockerfile for twitterscraper library w/ geckod… (webcoderz, Sep 24, 2020)
a282d8d  setting up datastream-Dockerfile for twitterscraper library w/ geckod… (webcoderz, Sep 24, 2020)
540efe2  setting up datastream-Dockerfile for twitterscraper library w/ geckod… (webcoderz, Sep 24, 2020)
5ea76b8  setting up datastream-Dockerfile for twitterscraper library w/ geckod… (webcoderz, Sep 24, 2020)
58a66c7  setting up datastream-Dockerfile for twitterscraper library w/ geckod… (webcoderz, Sep 24, 2020)
7461321  setting up datastream-Dockerfile for twitterscraper library w/ geckod… (webcoderz, Sep 24, 2020)
332a4d9  setting up datastream-Dockerfile for twitterscraper library w/ geckod… (webcoderz, Sep 24, 2020)
431af04  setting up datastream-Dockerfile for twitterscraper library w/ geckod… (webcoderz, Sep 24, 2020)
07a7181  setting up datastream-Dockerfile for twitterscraper library w/ geckod… (webcoderz, Sep 24, 2020)
9d25744  setting up datastream-Dockerfile for twitterscraper library w/ geckod… (webcoderz, Sep 24, 2020)
c1e0eb9  setting up datastream-Dockerfile for twitterscraper library w/ geckod… (webcoderz, Sep 24, 2020)
a6983a4  setting up datastream-Dockerfile for twitterscraper library w/ geckod… (webcoderz, Sep 24, 2020)
10e600b  setting up datastream-Dockerfile for twitterscraper library w/ geckod… (webcoderz, Sep 24, 2020)
fc18003  setting up datastream-Dockerfile for twitterscraper library w/ geckod… (webcoderz, Sep 24, 2020)
0bd26b6  setting up datastream-Dockerfile for twitterscraper library w/ geckod… (webcoderz, Sep 24, 2020)
47297c5  setting up datastream-Dockerfile for twitterscraper library w/ geckod… (webcoderz, Sep 24, 2020)
84060b6  setting up datastream-Dockerfile for twitterscraper library w/ geckod… (webcoderz, Sep 24, 2020)
fad3bce  setting up datastream-Dockerfile for twitterscraper library w/ geckod… (webcoderz, Sep 24, 2020)
15463b4  setting up datastream-Dockerfile for twitterscraper library w/ geckod… (webcoderz, Sep 25, 2020)
dec74eb  setting up datastream-Dockerfile for twitterscraper library w/ geckod… (webcoderz, Sep 25, 2020)
631eb57  setting up datastream-Dockerfile for twitterscraper library w/ geckod… (webcoderz, Sep 25, 2020)
1aa8191  setting up datastream-Dockerfile for twitterscraper library w/ geckod… (webcoderz, Sep 25, 2020)
580a697  setting up datastream-Dockerfile for twitterscraper library w/ geckod… (webcoderz, Sep 25, 2020)
779cc99  setting up datastream-Dockerfile for twitterscraper library w/ geckod… (webcoderz, Sep 25, 2020)
b572c8e  setting up datastream-Dockerfile for twitterscraper library w/ geckod… (webcoderz, Sep 25, 2020)
0f0290f  setting up datastream-Dockerfile for twitterscraper library w/ geckod… (webcoderz, Sep 25, 2020)
a292cfe  setting up datastream-Dockerfile for twitterscraper library w/ geckod… (webcoderz, Sep 25, 2020)
b6a8ef3  setting up datastream-Dockerfile for twitterscraper library w/ geckod… (webcoderz, Sep 25, 2020)
2fed016  setting up datastream-Dockerfile for twitterscraper library w/ geckod… (webcoderz, Sep 25, 2020)
e862eef  setting up datastream-Dockerfile for twitterscraper library w/ geckod… (webcoderz, Sep 25, 2020)
b8e9757  setting up datastream-Dockerfile for twitterscraper library w/ geckod… (webcoderz, Sep 25, 2020)
39bc1e9  setting up datastream-Dockerfile for twitterscraper library w/ geckod… (webcoderz, Sep 25, 2020)
badca9f  ublock extension (webcoderz, Sep 26, 2020)
b03e85b  ublock extension (webcoderz, Sep 26, 2020)
b269256  ublock extension (webcoderz, Sep 26, 2020)
2fa3400  fix (webcoderz, Sep 26, 2020)
e4ca790  fix (webcoderz, Oct 2, 2020)
dca3b1a  _get_user_timeline fix inline with twint updates (webcoderz, Dec 26, 2020)
c288c08  _get_user_timeline fix inline with twint updates (webcoderz, Dec 26, 2020)
31acfd9  _get_user_timeline fix inline with twint updates (webcoderz, Dec 26, 2020)
2aa9c2d  date time pipeline fix (webcoderz, Dec 26, 2020)
232c6b5  date time pipeline fix (webcoderz, Dec 26, 2020)
91b58b2  date time pipeline fix (webcoderz, Dec 26, 2020)
dee6da4  date time pipeline fix (webcoderz, Dec 26, 2020)
b39bfe3  added self.config.Hide_output=True (webcoderz, Dec 26, 2020)
798d9ee  user info inline with twint (webcoderz, Dec 26, 2020)
47 changes: 42 additions & 5 deletions infra/pipelines/docker/datastream-Dockerfile
@@ -1,21 +1,58 @@
FROM python:3.7

RUN apt-get update \
-    && apt-get install -y --no-install-recommends git vim tor \
+    && apt-get install -y --no-install-recommends git vim tor wget libgtk-3-dev libdbus-glib-1-2 \
    && apt-get clean \
    && rm -rf /var/lib/apt/lists/*

+#============
+# Firefox
+#============
+ARG FIREFOX_VERSION=latest
+RUN FIREFOX_DOWNLOAD_URL=$(if [ $FIREFOX_VERSION = "latest" ] || [ $FIREFOX_VERSION = "nightly-latest" ] || [ $FIREFOX_VERSION = "devedition-latest" ] || [ $FIREFOX_VERSION = "esr-latest" ]; then echo "https://download.mozilla.org/?product=firefox-$FIREFOX_VERSION-ssl&os=linux64&lang=en-US"; else echo "https://download-installer.cdn.mozilla.net/pub/firefox/releases/$FIREFOX_VERSION/linux-x86_64/en-US/firefox-$FIREFOX_VERSION.tar.bz2"; fi) \
+    && apt-get update -qqy \
+    && apt-get -qqy --no-install-recommends install libavcodec-extra \
+    && rm -rf /var/lib/apt/lists/* /var/cache/apt/* \
+    && wget --no-verbose -O /tmp/firefox.tar.bz2 $FIREFOX_DOWNLOAD_URL \
+    && tar -C /opt -xjf /tmp/firefox.tar.bz2 \
+    && rm /tmp/firefox.tar.bz2 \
+    && mv /opt/firefox /opt/firefox-$FIREFOX_VERSION \
+    && ln -fs /opt/firefox-$FIREFOX_VERSION/firefox /usr/bin/firefox
+
+#============
+# GeckoDriver
+#============
+ARG GECKODRIVER_VERSION=latest
+RUN GK_VERSION=$(if [ ${GECKODRIVER_VERSION:-latest} = "latest" ]; then echo "0.27.0"; else echo $GECKODRIVER_VERSION; fi) \
+    && echo "Using GeckoDriver version: "$GK_VERSION \
+    && wget --no-verbose -O /tmp/geckodriver.tar.gz https://github.com/mozilla/geckodriver/releases/download/v$GK_VERSION/geckodriver-v$GK_VERSION-linux64.tar.gz \
+    && rm -rf /opt/geckodriver \
+    && tar -C /opt -zxf /tmp/geckodriver.tar.gz \
+    && rm /tmp/geckodriver.tar.gz \
+    && mv /opt/geckodriver /opt/geckodriver-$GK_VERSION \
+    && cp /opt/geckodriver-$GK_VERSION /bin \
+    && chmod 755 /opt/geckodriver-$GK_VERSION \
+    && ln -fs /opt/geckodriver-$GK_VERSION /usr/bin/geckodriver \
+    && ln -fs /opt/geckodriver-$GK_VERSION /usr/bin/wires

RUN pip install prefect==0.10.1 simplejson twarc neo4j boto3==1.12.39 \
-    pandas pyarrow urlextract git+https://github.com/lmeyerov/twint.git@patch-1#egg=twint
+    pandas pyarrow urlextract git+https://github.com/lmeyerov/twint.git@patch-1#egg=twint \
+    git+https://github.com/lapp0/twitterscraper.git@selenium

#FIXME this should be part of entrypoint / service config?
-RUN service tor start
+#RUN service tor start

COPY ./modules /app/ProjectDomino
COPY ./infra/pipelines/docker/jobs /app

-HEALTHCHECK --interval=60s --timeout=15s --start-period=20s \
-    CMD curl -sf --socks5-hostname localhost:9050 https://check.torproject.org | grep Congrat
+#HEALTHCHECK --interval=60s --timeout=15s --start-period=20s \
+#    CMD curl -sf --socks5-hostname localhost:9050 https://check.torproject.org | grep Congrat

WORKDIR /app
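The commented-out "RUN service tor start" matches the existing FIXME: a RUN step starts tor only in a temporary build-time container, so the daemon is gone by the time the image actually runs. A minimal sketch of how this could move into the entrypoint (hypothetical; the script name and readiness loop are assumptions, not part of this PR):

#!/bin/sh
# entrypoint.sh (hypothetical): start tor at container start, wait until the
# SOCKS proxy answers, then hand off to the job
service tor start
until curl -sf --socks5-hostname localhost:9050 https://check.torproject.org > /dev/null; do
    sleep 1
done
exec python3 /app/search_by_date_job.py

with the Dockerfile gaining something like:

COPY ./infra/pipelines/docker/entrypoint.sh /entrypoint.sh
RUN chmod +x /entrypoint.sh
ENTRYPOINT ["/entrypoint.sh"]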
2 changes: 1 addition & 1 deletion infra/pipelines/docker/datastream-docker-compose.yml
@@ -16,7 +16,7 @@ services:
      dockerfile: infra/pipelines/docker/datastream-Dockerfile
    tty: true
    network_mode: 'bridge'
-    command: sh -c "pwd && ls && service tor start && python3 /app/search_by_date_job.py"
+    #command: sh -c "pwd && ls && service tor start && python3 /app/search_by_date_job.py"
    volumes:
      - /home/codywebb/ProjectDomino/infra/pipelines/docker/jobs/neo4jcreds.json:/secrets/neo4jcreds.json:ro
    environment:
6 changes: 3 additions & 3 deletions infra/pipelines/docker/jobs/search_by_date_job.py
@@ -75,13 +75,13 @@ def get_creds():
@task(log_stdout=True, skip_on_upstream_skip=True)
def run_stream():
    creds = get_creds()
-    start = datetime.strptime("2020-03-11 20:00:00", "%Y-%m-%d %H:%M:%S")
+    start = datetime.strptime("2020-04-25 20:00:00", "%Y-%m-%d %H:%M:%S")
    current = datetime.strptime(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "%Y-%m-%d %H:%M:%S")
    rand_dt=random_date(start, current)
    tp = TwintPool(is_tor=True)
    fh = FirehoseJob(neo4j_creds=creds, PARQUET_SAMPLE_RATE_TIME_S=30, save_to_neo=True, writers={})
    try:
-        for df in fh.search_time_range(tp=tp, Search="covid",Since=str(rand_dt),Until=str(current),job_name="covid stream"):
+        for df in fh.search_time_range(tp=tp, Search="covid",Since=str(rand_dt),Until=str(current),job_name="covid stream",Limit=350):
            logger.debug('got: %s', len(df))
    except:
        logger.debug("job finished")
@@ -93,7 +93,7 @@ def run_stream():

schedule = IntervalSchedule(
    start_date=datetime(2020, 9, 5),
-    interval=timedelta(seconds=10),
+    interval=timedelta(seconds=randrange(15)),
)
storage = S3(bucket=S3_BUCKET)

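One note on the schedule change: randrange(15) draws an integer from 0 through 14, so the interval can come out as zero seconds (which IntervalSchedule may reject), and it is evaluated once at registration rather than re-drawn per run. A safer variant (an untested suggestion, not part of this PR) keeps the jitter but bounds it away from zero:

from datetime import datetime, timedelta
from random import randrange
from prefect.schedules import IntervalSchedule

# randrange(1, 16) yields 1..15, so the schedule interval is never zero
schedule = IntervalSchedule(
    start_date=datetime(2020, 9, 5),
    interval=timedelta(seconds=randrange(1, 16)),
)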
105 changes: 105 additions & 0 deletions modules/FacebookScraper.py
@@ -0,0 +1,105 @@
from facebook_scraper import get_posts
import pandas as pd
from urllib.parse import urlparse

pd.set_option('display.max_colwidth', -1)
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)


class FacebookScrape:

    def __init__(self):
        self.pages = 5
        self.timeout = 5
        self.sleep = None  # (int) how long to sleep in between requests

    def get_posts_From_Page(self, account: str, pages: int, timeout: int, sleep: int):
        self.pages = pages
        self.timeout = timeout
        self.sleep = sleep
        df = pd.concat([pd.DataFrame([post]) for post in
                        get_posts(account=account, pages=pages, extra_info=True, timeout=timeout, sleep=sleep)],
                       ignore_index=True, sort=False)
        df["page_name"] = account
        df['text'] = df['text'].str.replace(r'\n', '')
        df['post_text'] = df['post_text'].str.replace(r'\n', '')
        df['shared_text'] = df['shared_text'].str.replace(r'\n', '')
        if 'reactions' in df:
            df = self.__normalize_reactions(df)
            urls = self.__parse_urls(df)
            dfs = pd.concat([df, urls], axis=1)
            return dfs
        else:
            urls = self.__parse_urls(df)
            dfs = pd.concat([df, urls], axis=1)
            return dfs

    def get_posts_From_Group(self, group: str, pages: int, timeout: int, sleep: int):
        self.pages = pages
        self.timeout = timeout
        self.sleep = sleep
        df = pd.concat([pd.DataFrame([post]) for post in
                        get_posts(group=group, pages=pages, extra_info=True, timeout=timeout, sleep=sleep)],
                       ignore_index=True, sort=False)
        df["group_name"] = group
        df['text'] = df['text'].str.replace(r'\n', '')
        df['post_text'] = df['post_text'].str.replace(r'\n', '')
        df['shared_text'] = df['shared_text'].str.replace(r'\n', '')
        if 'reactions' in df:
            df = self.__normalize_reactions(df)
            urls = self.__parse_urls(df)
            dfs = pd.concat([df, urls], axis=1)
            return dfs
        else:
            urls = self.__parse_urls(df)
            dfs = pd.concat([df, urls], axis=1)
            return dfs

    def __normalize_reactions(self, df):
        rx = df["reactions"].apply(lambda x: {} if pd.isna(x) else x)
        rx = pd.io.json.json_normalize(rx)
        df.drop('reactions', axis=1, inplace=True)
        return pd.concat([df, rx], axis=1)

    def __parse_urls(self, df):
        url_params = []
        for u in df['link']:
            try:
                parsed = urlparse(u)
                url_params.append({
                    'url': u,
                    'schema': parsed.scheme,
                    'netloc': parsed.netloc,
                    'path': parsed.path,
                    'params': parsed.params,
                    'query': parsed.query,
                    'fragment': parsed.fragment,
                    'username': parsed.username,
                    'password': parsed.password,
                    'hostname': parsed.hostname,
                    'port': parsed.port,
                })
            except Exception as e:
                raise e
        if len(url_params) == 0:
            return pd.DataFrame({
                'url': pd.Series([], dtype='object'),
                'schema': pd.Series([], dtype='object'),
                'netloc': pd.Series([], dtype='object'),
                'path': pd.Series([], dtype='object'),
                'params': pd.Series([], dtype='object'),
                'query': pd.Series([], dtype='object'),
                'fragment': pd.Series([], dtype='object'),
                'username': pd.Series([], dtype='object'),
                'password': pd.Series([], dtype='object'),
                'hostname': pd.Series([], dtype='object'),
                'port': pd.Series([], dtype='int64')
            })
        return pd.DataFrame(url_params)

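A quick usage sketch for the new scraper class (illustrative only: the page name is a placeholder, and it assumes the facebook_scraper dependency is installed and the module is importable):

from FacebookScraper import FacebookScrape

fb = FacebookScrape()
# fetch ~5 pages of posts from a public page with a 10s timeout and 2s sleep
# between requests; returns one DataFrame of post fields plus flattened
# reactions and parsed URL components
df = fb.get_posts_From_Page(account="SomePublicPage", pages=5, timeout=10, sleep=2)
print(df[['page_name', 'text', 'url']].head())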
4 changes: 2 additions & 2 deletions modules/FirehoseJob.py
@@ -702,13 +702,13 @@ def search_time_range(self,
        if job_name is None:
            job_name = "search_%s" % Search
        if tp is None:
-            tp = TwintPool()
+            tp = TwintPool(is_tor=True)
        for df, t0, t1 in tp._get_term(Search=Search, Since=Since, Until=Until, **kwargs):
            logger.debug('hits %s to %s: %s', t0, t1, len(df))
            if self.save_to_neo:
                logger.debug('writing to neo4j')
                hydratetic = time.perf_counter()
-                chkd = TwintPool().check_hydrate(df)
+                chkd = TwintPool(is_tor=True).check_hydrate(df)
                hydratetoc = time.perf_counter()
                logger.info(f'finished checking for hydrate: {hydratetoc - hydratetic:0.4f} seconds')

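For context, the TwintPool constructor itself is not in any of the hunks shown here; presumably the new is_tor flag routes twint's requests through the tor daemon the image installs. A rough sketch of what that flag might do (an assumption for illustration, not code from this PR):

import twint

class TwintPool:
    def __init__(self, is_tor=False):
        self.config = twint.Config()
        if is_tor:
            # point twint at the local tor SOCKS5 proxy (default port 9050)
            self.config.Proxy_host = "localhost"
            self.config.Proxy_port = 9050
            self.config.Proxy_type = "socks5"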
12 changes: 7 additions & 5 deletions modules/Neo4jDataAccess.py
@@ -197,7 +197,7 @@ def __get_neo4j_graph(self, role_type):
        if not (self.creds is None):
            creds = self.creds
        else:
-            with open('neo4jcreds.json') as json_file:
+            with open('/secrets/neo4jcreds.json') as json_file:
                creds = json.load(json_file)
        res = list(filter(lambda c: c["type"] == role_type, creds))
        if len(res):
@@ -502,6 +502,7 @@ def save_twintdf_to_neo(self, df, job_name, job_id=None):
                params.append(pd.DataFrame([{'tweet_id': int(row['status_id']),
                                             'text': row['full_text'],
                                             'created_at': str(pd.to_datetime(row['created_at'])),
+                                            'date': str(pd.to_datetime(row['date'])),
                                             'favorite_count': row['favorite_count'],
                                             'retweet_count': row['retweet_count'],
                                             'type': tweet_type,
@@ -663,6 +664,7 @@ def __tweetdf_to_neo_account_df(self, df, job_name):
        acctdf['screen_name'] = df['username']
        acctdf['friends_count'] = df["following"]
        acctdf['followers_count'] = df["followers"]
+        acctdf['created_at'] = df['created_at']
        acctdf['job_name'] = str(job_name)
        return acctdf

@@ -682,12 +684,12 @@ def write_twint_enriched_tweetdf_to_neo(self, res, job_name, job_id):
                logger.info("writing URL Nodes and Properties")
                self.save_enrichment_df_to_graph(self.NodeLabel.Url, df, job_name, job_id)
            elif key == 'params':
-                logger.info("writing tweets and accts")
-                with self.graph.session() as session:
-                    logger.debug('writing tweets and accounts')
+                #logger.info("writing tweets and accts")
+                with graph.session() as session:
+                    logger.info('writing tweets and accounts')
                    session.run(self.tweetsandaccounts, tweets=df.to_dict(orient='records'),
                                timeout=self.timeout)
-                    logger.debug('writing tweet relationships')
+                    logger.info('writing tweet relationships')
                    session.run(self.tweeted_rel, tweets=df.to_dict(orient='records'), timeout=self.timeout)
        toc = time.perf_counter()
        logger.info(f'Neo4j Periodic Save Complete in {toc - tic:0.4f} seconds')
27 changes: 20 additions & 7 deletions modules/TwintPool.py
@@ -68,18 +68,30 @@ def _get_term(self, Search="IngSoc", Since="1984-04-20 13:00:00", Until="1984-04
        toc = time.perf_counter()
        logger.info(f'finished searching for tweets in: {toc - tic:0.4f} seconds')

-    def _get_timeline(self, username, limit):
+    def _get_Replies_to_user(self,username, limit):
        self.config.Retweets = True
-        self.config.Search = "from:" + username
+        self.config.Search = username
        self.config.Limit = limit
        self.config.Pandas = True
+        self.config.Hide_output = True
        twint.run.Search(self.config)
-        tweets_df = twint.storage.panda.Tweets_df
-        return tweets_df
+        return twint.storage.panda.Tweets_df
+
+    def _get_user_timeline(self,username,limit):
+        self.config.Profile_full = True
+        self.config.Retweets = True
+        self.config.Limit = limit
+        self.config.Username = username
+        self.config.Pandas = True
+        self.config.Hide_output = True
+        twint.run.Profile(self.config)
+        return twint.storage.panda.Tweets_df

-    def _get_user_info(self, username):
+    def _get_user_info(self,username):
        self.config.User_full = True
        self.config.Username = username
        self.config.Pandas = True
+        self.config.Hide_output=True
        twint.run.Lookup(self.config)
        return twint.storage.panda.User_df

@@ -89,7 +101,7 @@ def check_hydrate(self, df):

        from .Neo4jDataAccess import Neo4jDataAccess
        neo4j_creds = None
-        with open('neo4jcreds.json') as json_file:
+        with open('/secrets/neo4jcreds.json') as json_file:
            neo4j_creds = json.load(json_file)

        # dft : df[[id:int64, hydrated: NaN | 'FULL' | 'PARTIAL'??]]
@@ -140,14 +152,15 @@ def row_tweet_to_urls(row):
        neo4j_df['hashtags'] = df['hashtags'].apply(lambda x: [{'text': ht} for ht in x])
        neo4j_df['user_followers_count'] = None
        neo4j_df['user_friends_count'] = None
+        neo4j_df['date'] = df['date']
        # neo4j_df['user_created_at'] = None
        neo4j_df['user_profile_image_url'] = None
        neo4j_df['reply_tweet_id'] = None
        neo4j_df['user_mentions'] = df['tweet'].str.findall('@[\w]+')
        # neo4j_df['retweet_id'] is suspiciously empty (always)
        neo4j_df['retweeted_status'] = None
        neo4j_df['conversation_id'] = df['conversation_id']  # FIXME no-op?
-        neo4j_df['created_at'] = (neo4j_df['created_at'] / 1000).apply(lambda n: datetime.fromtimestamp(n))
+        neo4j_df['created_at'] = (df['created_at'] / 1000).apply(lambda n: datetime.fromtimestamp(n))

        # neo4j_df['quoted_status_id'] = df.apply(row_to_quoted_status_id, axis=1)
        # neo4j_df['is_quote_status'] = neo4j_df['quoted_status_id'] != None
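A short usage sketch for the renamed and added helpers (illustrative only; the username is a placeholder):

tp = TwintPool(is_tor=True)

# replies/mentions: config.Search is now the bare username rather than "from:" + username
replies_df = tp._get_Replies_to_user("some_user", limit=100)

# the user's own timeline, fetched via twint.run.Profile with Profile_full
timeline_df = tp._get_user_timeline("some_user", limit=100)

# one-row profile lookup via twint.run.Lookup
info_df = tp._get_user_info("some_user")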