@@ -2,16 +2,19 @@
 
 import requests
 import sqlite3
+import tempfile
 import json
 import re
-from datetime import datetime, timezone
+from datetime import datetime, timezone, timedelta
 from pathlib import Path
 import threading
 from time import sleep
 from enum import IntEnum
 from language_detection import LANGUAGE_DETECTOR, Language
-from strutils import author_name_to_normal
+from strutils import author_name_to_normal, md5
 import nearestpdf
+from tqdm import tqdm
+from downloadutils import download, pdf_name_for_work
 
 # Maybe a better place to put this mutual dependency?
 from local_gdrive import locked
@@ -604,6 +607,139 @@ def match_gfiles_to_local_works(self): |
             found += 1
         print(f"Found Google Drive files for {found} works and added them to the DB")
 
+    @locked
+    def mark_download(self, work_id: str, success: bool, timestamp: int = None):
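+        # A successful attempt stores the timestamp as-is; a failed attempt stores
+        # it negated, so the work can be picked up again for retries later.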
+        if not timestamp:
+            timestamp = current_timestamp()
+        if not success:
+            timestamp = -timestamp
+        self.cursor.execute("""
+            UPDATE works SET downloaded_date = ? WHERE id = ?
+            """,
+            (timestamp, work_id,)
+        )
+        self.conn.commit()
+
+    def _attempt_to_download(self, work: dict | sqlite3.Row, to_folder: Path) -> Path | None:
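+        # Tries, in order: the work's SOURCE_URL identifier, the alternate URLs the
+        # CORE outputs API reports for it, that output's downloadUrl, and finally the
+        # work's own download_url. Returns the path of the saved PDF, or None.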
+        work = dict(work)
+        filename = pdf_name_for_work(work)
+        outpath = to_folder.joinpath(filename)
+        with self._lock:
+            source = self.cursor.execute("""
+                SELECT id FROM identifiers WHERE work_id = ? AND id_type = 'SOURCE_URL' LIMIT 1
+                """,
+                (work['id'], )
+            ).fetchone()
+        if source:
+            succ = download(source['id'], outpath, expected_type='pdf')
+            if succ:
+                return outpath
+        if work.get('download_url'):
+            match = re.fullmatch(
+                r'https://core\.ac\.uk/download/(?:pdf/)?([0-9]+)\.pdf',
+                work['download_url'],
+            )
+            output = call_api(f"outputs/{match.group(1)}", {}) if match else {}
+            for url in output.get('urls', []):
+                if source and url == source['id']:
+                    continue
+                succ = download(url, outpath, expected_type='pdf')
+                if succ:
+                    return outpath
+            if output.get('downloadUrl'):
+                succ = download(output['downloadUrl'], outpath, expected_type='pdf')
+                if succ:
+                    return outpath
+            if work['download_url'] != output.get('downloadUrl'):
+                succ = download(work['download_url'], outpath, expected_type='pdf')
+                if succ:
+                    return outpath
+        return None
+
+    def attempt_downloads_for_query(
+        self,
+        query_id: int,
+        to_folder: Path = None,
+        min_en_conf: float = 0.8,
+        min_drive_conf: float = 0.6,
+        retry_timedelta: int | timedelta = 15811200000,
+    ) -> int:
+        """
+        Args:
+            to_folder: If you'd like to keep the downloaded files, supply a folder.
+                Otherwise they are written to a temporary directory and discarded.
+            retry_timedelta: How long to wait before retrying a previously failed
+                download, as a timedelta or an integer number of milliseconds
+                (the default is 183 days).
+
+        Returns:
+            The number of PDFs that were newly imported (rather than matched to
+            files already on Drive).
+        """
+        works = self.get_local_works_for_query(query_id)
+        # Filter out non-English works
+        works = [work for work in works if work['en_confidence'] >= min_en_conf]
+        # Filter out works that we downloaded successfully or tried recently
+        if isinstance(retry_timedelta, timedelta):
+            retry_timedelta = int(retry_timedelta.total_seconds() * 1000)
+        since = -(current_timestamp() - retry_timedelta)
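+        # downloaded_date > 0 records a past success; a negative value records a
+        # failed attempt at -downloaded_date. Keep works that were never attempted,
+        # plus failures that happened more than retry_timedelta ago.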
+        works = [
+            work for work in works
+            if work.get('downloaded_date') is None
+            or (work['downloaded_date'] <= 0 and work['downloaded_date'] > since)
+        ]
+        # Filter out works we already have on Drive
+        with self._lock:
+            works = [
+                work for work in works if
+                self.cursor.execute(
+                    "SELECT * FROM work_gfiles WHERE work_id = ? AND pval > ? LIMIT 1",
+                    (work['id'], min_drive_conf, )
+                ).fetchone() is None
+            ]
+        print(f"Attempting to download {len(works)} works from query {query_id}...")
+        pbar = tqdm(works)
+        ret = 0
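+        # Imported here rather than at module level, presumably to avoid circular
+        # imports between this module and the Drive/import helpers.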
+        import pypdf.errors
+        from pdfutils import readpdf
+        from bulk_import import BulkPDFImporter, BulkPDFType
+        import gdrive
+        import nearestpdf
+        nearestpdf.load()
+        importer = BulkPDFImporter(BulkPDFType.CORE_API)
+        with tempfile.TemporaryDirectory() as temp_dir:
+            if not to_folder:
+                to_folder = Path(temp_dir)
+            for work in pbar:
+                succ = self._attempt_to_download(work, to_folder)
+                if succ:
+                    self.mark_download(work['id'], True)
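+                    # If a byte-identical copy (even a trashed one) is already on
+                    # Drive, reuse its file id instead of uploading again.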
+                    file_md5 = md5(succ)
+                    existing = gdrive.gcache.get_items_with_md5(file_md5)
+                    if not existing:
+                        existing = gdrive.gcache.get_trashed_items_with_md5(file_md5)
+                    if not existing:
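+                        # No exact copy found; look for near-duplicates on Drive by
+                        # title, normalized author names, and the downloaded PDF itself.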
+                        authors = work['authors']
+                        if isinstance(authors, str):
+                            authors = json.loads(authors)
+                        assert isinstance(authors, list)
+                        authors = [author_name_to_normal(author['name']) for author in authors]
+                        try:
+                            fuzzy_dupes = nearestpdf.find_matching_files(work['title'], authors, readpdf(succ))
+                        except (pypdf.errors.PdfReadError, pypdf.errors.PdfStreamError):
+                            pbar.write("Didn't get a valid PDF :(")
+                            self.mark_download(work['id'], False)
+                            continue
+                        if fuzzy_dupes:
+                            pbar.write(f"Found a fuzzy duplicate for \"{succ}\" on GDrive: \"{fuzzy_dupes[0][0]['name']}\"")
+                            skip_upload = False
+                            for dupe in fuzzy_dupes:
+                                self.register_gfile_for_work(work['id'], dupe[0]['id'], dupe[1])
+                                if dupe[1] > min_drive_conf:
+                                    skip_upload = True
+                            if skip_upload:
+                                pbar.write("  Uploading straight to old versions...")
+                                file_id = gdrive.gcache.upload_file(
+                                    succ,
+                                    folder_id=gdrive.OLD_VERSIONS_FOLDER_ID,
+                                )
+                                if file_id:
+                                    self.register_gfile_for_work(work['id'], file_id, 1)
+                                continue
+                    if existing:
+                        file_id = existing[0]['id']
+                    else:
+                        ret += 1
+                        file_id = importer.import_item(succ, True)
+                    assert file_id is not None, f"Failed to upload {succ}"
+                    self.register_gfile_for_work(work['id'], file_id, 1)
+                else:
+                    self.mark_download(work['id'], False)
+
+        return ret
+
     @locked
     def close(self):
         if self.conn:
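A minimal usage sketch of the new method (hypothetical names: `db` stands for an instance of this class, constructed however the rest of the module does it, and `42` for an existing query id; only `query_id` is required):

    from datetime import timedelta
    from pathlib import Path

    # Try to fetch PDFs for the English-looking works attached to query 42,
    # keep local copies in ./downloads, and retry old failures after 90 days.
    uploaded = db.attempt_downloads_for_query(
        42,
        to_folder=Path("downloads"),
        retry_timedelta=timedelta(days=90),
    )
    print(f"Imported {uploaded} new PDFs")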