async scrape speedup
007vasy committed Apr 17, 2020
commit effa6cd76e14b0db9bac5cf8577ebcb9a0cbad84
59 changes: 39 additions & 20 deletions modules/IngestDrugSynonyms.py
@@ -12,8 +12,16 @@
from typing import Optional
from functools import partial
import logging
import aiohttp
import asyncio
from aiohttp import ClientSession

logger = logging.getLogger('ds')

async def fetch_url_resp(url):
    async with ClientSession() as session:
        async with session.get(url, allow_redirects=True, headers={'User-Agent': 'python-requests/2.20.0'}) as resp:
            # read the payload while the connection is still open, so the cached
            # body (and resp.json()/resp.read()) stays usable after the session closes
            await resp.read()
            return resp
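fetch_url_resp opens a new ClientSession for every request. That works, but for many small requests (like the paged study queries below) sharing one session amortizes connection setup. A minimal sketch of that variant, assuming the module's aiohttp/asyncio imports; the helper name fetch_many is illustrative, not part of this commit:

async def fetch_many(urls):
    # hypothetical helper: one shared session for a batch of GET requests
    async with ClientSession() as session:
        async def fetch(url):
            async with session.get(url, allow_redirects=True) as resp:
                return await resp.json()
        return await asyncio.gather(*(fetch(u) for u in urls))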

class IngestDrugSynonyms():

@@ -40,28 +48,23 @@ def load_config(configPath:Path):
logger.warning("Could not load config file from: {}".format(configPath))

@staticmethod
def api(query,from_study,to_study,url):
async def api(query,from_study,to_study,url):
url = url.format(query,from_study,to_study)
response = requests.request("GET", url)
return response.json()
resp = await fetch_url_resp(url)
return await resp.json()

def api_wrapper(self,query,from_study):
return self.api(query,from_study,from_study+99,self.url_USA)
def getAllStudiesByQuery(self,query:str) -> list:
async def api_wrapper(self,query,from_study):
return await self.api(query,from_study,from_study+99,self.url_USA)

async def getAllStudiesByQuery(self,query:str) -> list:
logger.info("> STARTING scraping with '{}' keyword".format(query))
studies:list = []
studies_task:list = []
from_study = 1
temp = self.api_wrapper(query,from_study)
temp = await self.api_wrapper(query,from_study)
nstudies = temp['FullStudiesResponse']['NStudiesFound']
logger.info("> {} studies found by '{}' keyword".format(nstudies,query))
if nstudies > 0:
studies = temp['FullStudiesResponse']['FullStudies']
for study_index in range(from_study+100,nstudies,100):
temp = self.api_wrapper(query,study_index)
studies.extend(temp['FullStudiesResponse']['FullStudies'])

return studies
studies_task = [asyncio.create_task(self.api_wrapper(query,study_index)) for study_index in range(from_study,nstudies,100)]
return studies_task
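Because getAllStudiesByQuery now returns a list of pending page-fetch tasks rather than the studies themselves, a caller still has to await those tasks and unpack FullStudies. A minimal sketch of that consumption step; collect_studies is an illustrative name, not part of this commit:

async def collect_studies(self, query):
    # await every page task created by getAllStudiesByQuery and flatten the results
    page_tasks = await self.getAllStudiesByQuery(query)
    pages = await asyncio.gather(*page_tasks)
    studies:list = []
    for page in pages:
        studies.extend(page['FullStudiesResponse']['FullStudies'])
    return studies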

@staticmethod
def xls_handler(r):
@@ -96,12 +99,13 @@ def csvzip_handler(r):
return df

@staticmethod
def urlToDF(url:str,respHandler) -> pd.DataFrame:
r = requests.get(url, allow_redirects=True)
return respHandler(r)
async def urlToDF(url:str,respHandler) -> pd.DataFrame:
# respHandler now receives an aiohttp ClientResponse (body pre-read by
# fetch_url_resp) instead of a requests.Response
resp = await fetch_url_resp(url)
return respHandler(resp)

@staticmethod
def _convert_US_studies(US_studies:dict) -> pd.DataFrame:
# studies.extend(temp['FullStudiesResponse']['FullStudies'])
list_of_US_studies:list = []
for key in US_studies.keys():
for study in US_studies[key]:
@@ -134,11 +138,26 @@ def _convert_US_studies(US_studies:dict) -> pd.DataFrame:
return US_studies_df

def _scrapeData(self):
self.internationalstudies = self.urlToDF(self.url_international,self.xls_handler)
self.drug_vocab_df = self.urlToDF(self.url_drugbank,self.csvzip_handler)
logger.info("Scraping Started")
loop = asyncio.get_event_loop()
tasks:list = []

# asyncio.create_task requires a running event loop, so schedule these
# coroutines on the loop that run_until_complete drives below
tasks.append(loop.create_task(self.urlToDF(self.url_international,self.xls_handler)))
tasks.append(loop.create_task(self.urlToDF(self.url_drugbank,self.csvzip_handler)))

self.all_US_studies_by_keyword:dict = {}

for key in self.query_keywords:
self.all_US_studies_by_keyword[key] = self.getAllStudiesByQuery(key)

# asyncio.gather returns an awaitable future (not a list); run_until_complete resolves it
running_tasks = asyncio.gather(*tasks)
responses:list = loop.run_until_complete(running_tasks)

self.internationalstudies = responses[0]
self.drug_vocab_df = responses[1]


loop.close()
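As written above, all_US_studies_by_keyword stores un-awaited coroutines, and each of those resolves to a list of page tasks that also needs awaiting before loop.close(). One hedged way to drive everything to completion inside the same loop, reusing the illustrative collect_studies helper sketched earlier:

async def _resolve_all_keywords(self):
    # sketch only: run every keyword query to completion concurrently
    keys = list(self.query_keywords)
    keyword_results = await asyncio.gather(*(self.collect_studies(k) for k in keys))
    return dict(zip(keys, keyword_results))

# e.g. in _scrapeData, replacing the dict-of-coroutines assignment above:
# self.all_US_studies_by_keyword = loop.run_until_complete(self._resolve_all_keywords())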

def _filterData(self):
self.drug_vocab_reduced = self.drug_vocab_df[['Common name', 'Synonyms']]