Skip to content

Commit 463bf95

Browse files
committed
Some local_core schema fiddling
[skip ci]
1 parent 9988d63 commit 463bf95

1 file changed

Lines changed: 124 additions & 19 deletions

File tree

‎scripts/local_core.py‎

Lines changed: 124 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,12 @@
33
import requests
44
import sqlite3
55
import json
6+
import re
7+
from datetime import datetime, timezone
68
from pathlib import Path
79
import threading
810
from time import sleep
11+
from enum import IntEnum
912

1013
# Maybe a better place to put this mutual dependency?
1114
from local_gdrive import locked
@@ -20,15 +23,30 @@
2023
"ARXIV_ID",
2124
]
2225

26+
class TrackingQueryStatus(IntEnum):
    """Status values stored in the integer `status` column of the
    tracking_queries table (see register_query, which inserts UNTESTED).

    IntEnum so the members compare/serialize as plain ints for SQLite.
    """
    UNTESTED = 0
    INVALID = 1
    PAUSED = 2
    TRACKING = 3
31+
32+
2333
def call_api(subpath: str, params: dict, retries=3):
2434
url = "https://api.core.ac.uk/v3/" + subpath
25-
response = requests.get(
26-
url,
27-
headers={
28-
'Authorization': TOKEN,
29-
},
30-
params=params,
31-
)
35+
try:
36+
response = requests.get(
37+
url,
38+
headers={
39+
'Authorization': TOKEN,
40+
},
41+
params=params,
42+
)
43+
except requests.exceptions.ChunkedEncodingError as err:
44+
if retries > 0:
45+
print("CORE API response got cut off. Retrying in 4 seconds...")
46+
sleep(4)
47+
return call_api(subpath, params, retries=retries-1)
48+
else:
49+
raise err
3250
match response.status_code:
3351
case 200:
3452
return response.json()
@@ -38,8 +56,8 @@ def call_api(subpath: str, params: dict, retries=3):
3856
resp = response.json()
3957
if 'capacity' in resp.get('message', ''):
4058
if retries > 0:
41-
print("CORE API overloaded at the moment...waiting 5 secs and trying again...")
42-
sleep(5)
59+
print("CORE API overloaded at the moment...waiting 6 secs and trying again...")
60+
sleep(6)
4361
return call_api(subpath, params, retries=retries-1)
4462
else:
4563
raise ConnectionRefusedError("CORE API overloaded right now. Try again later")
@@ -49,6 +67,14 @@ def call_api(subpath: str, params: dict, retries=3):
4967
case _:
5068
raise NotImplementedError(f"Unknown status code {response.status_code}:\n\n{response.text}")
5169

70+
def api_timestring_to_timestamp(ts: str | None) -> int | None:
71+
"""The API returns timestamps as ISO-ish strings but requests them as ms timestamps"""
72+
if not ts:
73+
return None
74+
dt = datetime.strptime(ts, "%Y-%m-%dT%H:%M:%S")
75+
dt = dt.replace(tzinfo=timezone.utc)
76+
return int(dt.timestamp() * 1000)
77+
5278
class CoreAPIWorksCache:
5379
"""
5480
Manages a SQLite DB for "works" fetched from the Cambridge CORE API
@@ -65,10 +91,13 @@ class CoreAPIWorksCache:
6591
a multithreaded downloader.
6692
"""
6793

68-
def __init__(self, db_path: str | Path):
94+
def __init__(self, db_path: str | Path, page_size=100):
6995
"""
7096
Connects to the SQLite DB at `db_path`
7197
"""
98+
assert page_size == int(page_size), "Page size must be an int"
99+
assert page_size > 0, "Page size must be positive"
100+
self.page_size = page_size
72101
self.db_path = Path(db_path)
73102
self.conn = sqlite3.connect(db_path, check_same_thread=False)
74103
self._lock = threading.RLock()
@@ -81,16 +110,17 @@ def _create_tables(self):
81110
CREATE TABLE IF NOT EXISTS tracking_queries (
82111
id INTEGER PRIMARY KEY AUTOINCREMENT,
83112
query TEXT NOT NULL UNIQUE,
84-
last_updated TEXT
113+
up_to INTEGER, -- updated date of the latest work in epochal ms
114+
status INTEGER NOT NULL
85115
);
86116
"""
87117

88118
create_works_table_sql = """
89119
CREATE TABLE IF NOT EXISTS works (
90120
id TEXT PRIMARY KEY NOT NULL, -- CORE's own ID
91121
title TEXT NOT NULL,
92-
created_date TEXT NOT NULL, -- CORE added Date
93-
updated_date TEXT NOT NULL, -- CORE updated Date
122+
created_date INTEGER NOT NULL, -- CORE added Date
123+
updated_date INTEGER NOT NULL, -- CORE updated Date
94124
data_provider INTEGER NOT NULL, -- First provider id
95125
additional_data_providers TEXT, -- json if more than one
96126
abstract TEXT,
@@ -100,10 +130,10 @@ def _create_tables(self):
100130
document_type TEXT, -- from API, almost useless
101131
download_url TEXT,
102132
full_text TEXT,
103-
published_date TEXT, -- might not have the exact date
133+
published_date INTEGER, -- in ms lol
104134
publisher TEXT,
105135
-- End CORE fields, below are my fields
106-
downloaded_timestamp INTEGER -- negative means failed
136+
downloaded_date INTEGER -- negative means failed
107137
);
108138
"""
109139

@@ -125,20 +155,32 @@ def _create_tables(self):
125155
CREATE TABLE IF NOT EXISTS journals_works (
126156
work_id TEXT NOT NULL,
127157
journal_id TEXT NOT NULL, -- ISSN
128-
FOREIGN KEY(work_id) REFERENCES works(id)
158+
FOREIGN KEY(work_id) REFERENCES works(id),
159+
PRIMARY KEY (work_id, journal_id)
129160
);
130161
"""
131162
create_journal_works_indexes_sql = """
132163
CREATE INDEX IF NOT EXISTS idx_work_journal ON journals_works(work_id);
133164
CREATE INDEX IF NOT EXISTS idx_journal_work ON journals_works(journal_id);
134165
"""
135166

167+
create_query_works_join_table_sql = """
168+
CREATE TABLE IF NOT EXISTS query_works (
169+
query_id INTEGER NOT NULL,
170+
work_id TEXT NOT NULL,
171+
FOREIGN KEY(work_id) REFERENCES works(id),
172+
FOREIGN KEY(query_id) REFERENCES tracking_queries(id),
173+
PRIMARY KEY (query_id, work_id) ON CONFLICT IGNORE
174+
);
175+
"""
176+
136177
self.cursor.execute(create_tracking_table_sql)
137178
self.cursor.execute(create_works_table_sql)
138179
self.cursor.execute(create_identifiers_table_sql)
139180
self.cursor.execute(create_id_table_index_sql)
140181
self.cursor.execute(create_journals_join_table_sql)
141182
self.cursor.executescript(create_journal_works_indexes_sql)
183+
self.cursor.execute(create_query_works_join_table_sql)
142184
self.conn.commit()
143185

144186
@locked
@@ -148,13 +190,14 @@ def get_source_urls_for_work_id(self, work_id: str | int):
148190
return [row['id'] for row in rows]
149191

150192
@locked
151-
def upsert_work_from_api(self, api_obj: dict):
193+
def upsert_work_from_api(self, api_obj: dict, tracking_query_id: int | None=None):
152194
data_provider = api_obj['dataProviders'][0]['id']
153195
additional_data_providers = None
154196
if len(api_obj['dataProviders']) > 1:
155197
additional_data_providers = json.dumps([
156198
p['id'] for p in api_obj['dataProviders'][1:]
157199
])
200+
updated_time = api_timestring_to_timestamp(api_obj['updatedDate'])
158201
sql = f"""
159202
INSERT INTO works (id, title, created_date, updated_date, data_provider, additional_data_providers, abstract, authors, citation_count, contributors, document_type, download_url, full_text, published_date, publisher)
160203
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
@@ -174,13 +217,35 @@ def upsert_work_from_api(self, api_obj: dict):
174217
publisher = excluded.publisher
175218
WHERE excluded.updated_date > works.updated_date;
176219
"""
177-
self.cursor.execute(sql, (api_obj['id'], api_obj['title'], api_obj['createdDate'], api_obj['updatedDate'], data_provider, additional_data_providers, api_obj['abstract'], json.dumps(api_obj['authors']), api_obj['citationCount'], json.dumps(api_obj['contributors']), api_obj.get('documentType'), api_obj['downloadUrl'], api_obj['fullText'], api_obj['publishedDate'], api_obj['publisher']))
220+
self.cursor.execute(sql, (
221+
api_obj['id'],
222+
api_obj['title'],
223+
api_timestring_to_timestamp(api_obj['createdDate']),
224+
updated_time,
225+
data_provider,
226+
additional_data_providers,
227+
api_obj['abstract'],
228+
json.dumps(api_obj['authors']),
229+
api_obj['citationCount'],
230+
json.dumps(api_obj['contributors']),
231+
api_obj.get('documentType'),
232+
api_obj['downloadUrl'],
233+
api_obj['fullText'],
234+
api_timestring_to_timestamp(api_obj.get('publishedDate')),
235+
api_obj['publisher'],
236+
))
178237

179238
for ID_TYPE in IDENTIFIERS_FIELD_TYPES:
180239
ids_of_type = [identif['identifier'] for identif in api_obj['identifiers'] if identif['type'] == ID_TYPE]
181240
self.cursor.execute("DELETE FROM identifiers WHERE work_id = ? AND id_type = ?;", (api_obj['id'], ID_TYPE))
182241
for ident in ids_of_type:
183-
self.cursor.execute("INSERT INTO identifiers (id, work_id, id_type) VALUES (?, ?, ?)", (ident, api_obj['id'], ID_TYPE))
242+
try:
243+
self.cursor.execute("INSERT INTO identifiers (id, work_id, id_type) VALUES (?, ?, ?)", (ident, api_obj['id'], ID_TYPE))
244+
except sqlite3.IntegrityError:
245+
self.cursor.execute("SELECT work_id FROM identifiers WHERE id = ?", (ident, ))
246+
other_work_id = self.cursor.fetchone()['work_id']
247+
self.conn.rollback()
248+
raise ValueError(f"Can't insert work {api_obj['id']} because {ID_TYPE} \"{ident}\" already exists associated with work {other_work_id}")
184249

185250
existing_source_urls = self.get_source_urls_for_work_id(api_obj['id'])
186251
missing_source_urls = set(api_obj['sourceFulltextUrls']) - set(existing_source_urls)
@@ -190,10 +255,50 @@ def upsert_work_from_api(self, api_obj: dict):
190255
self.cursor.execute("DELETE FROM journals_works WHERE work_id = ?", (api_obj['id'],))
191256
for journal in api_obj['journals']:
192257
for issn in journal['identifiers']:
258+
assert re.match(r'^[0-9]{4}-[0-9]{3}[0-9X]$', issn), f"Invalid ISSN: {issn}"
193259
self.cursor.execute("INSERT INTO journals_works (work_id, journal_id) VALUES (?, ?)", (api_obj['id'], issn, ))
194260

261+
if tracking_query_id:
262+
# Associate this work with this query and bump the query's up_to date
263+
self.cursor.execute("INSERT INTO query_works (query_id, work_id) VALUES (?, ?)", (tracking_query_id, api_obj['id'], ))
264+
self.cursor.execute("UPDATE tracking_queries SET up_to = ? WHERE id = ? AND (up_to IS NULL OR up_to < ?)", (updated_time, tracking_query_id, updated_time, ))
265+
266+
self.conn.commit()
267+
268+
@locked
def register_query(self, query: str):
    """Register a new query in tracking_queries and return its row id.

    The query text must not constrain updatedDate itself: that filter is
    appended automatically from the stored `up_to` watermark when the
    query is run (see load_one_page_from_query).

    Args:
        query: CORE search query string.

    Returns:
        The id of the newly inserted tracking_queries row.

    Raises:
        ValueError: if `query` already contains an updatedDate clause.
    """
    # Raise instead of assert: asserts are stripped under `python -O`,
    # and this is genuine input validation, not an internal invariant.
    if "updatedDate" in query:
        raise ValueError("Leave the updatedDate to me")
    self.cursor.execute(
        "INSERT INTO tracking_queries (query, status) VALUES (?, ?)",
        (query, TrackingQueryStatus.UNTESTED, ),
    )
    ret = self.cursor.lastrowid
    self.conn.commit()
    return ret
196275

276+
@locked
def get_query(self, query_id: int) -> dict:
    """Return the tracking_queries row with id `query_id` as a dict.

    NOTE(review): assumes the connection's row factory yields
    mapping-capable rows (e.g. sqlite3.Row) — confirm where the
    connection is configured; an unknown id makes dict(None) raise.
    """
    row = self.cursor.execute(
        "SELECT * FROM tracking_queries WHERE id = ?", (query_id, )
    ).fetchone()
    return dict(row)
280+
281+
def load_one_page_from_query(self, query_id: int) -> int:
282+
"""Returns the number added"""
283+
query_obj = self.get_query(query_id)
284+
query_str = query_obj['query']
285+
if query_obj['up_to']:
286+
query_str = f"({query_str}) AND updatedDate>{query_obj['up_to']}"
287+
one_page = call_api(
288+
'search/works',
289+
{
290+
'q': query_str,
291+
'limit': self.page_size,
292+
'sort': 'updatedDate:asc',
293+
},
294+
)
295+
print(f"Got {len(one_page['results'])} / {one_page['totalHits']} for \"{query_str}\"")
296+
ret = 0
297+
for result in one_page['results']:
298+
self.upsert_work_from_api(result, tracking_query_id=query_id)
299+
ret += 1
300+
return ret
301+
197302
@locked
198303
def close(self):
199304
if self.conn:

0 commit comments

Comments
 (0)