
Commit 9fc25c8

WIP
1 parent 8276c27 commit 9fc25c8

File tree

4 files changed: +455 −0 lines changed
Lines changed: 369 additions & 0 deletions
@@ -0,0 +1,369 @@
from __future__ import annotations
import json
from concurrent.futures import Future
from contextlib import contextmanager
from queue import Queue
from threading import Lock, Semaphore
from typing import Optional

import requests

from ravendb.exceptions.raven_exceptions import RavenException
from ravendb.http.server_node import ServerNode
from ravendb.http.raven_command import RavenCommand
from ravendb.documents.operations.misc import GetOperationStateOperation
from ravendb.documents.session.entity_to_json import EntityToJson
from ravendb.documents.session.document_info import DocumentInfo
from ravendb import DocumentStore, MetadataAsDictionary, constants
from ravendb.documents.commands.batches import CommandType
from ravendb.documents.commands.bulkinsert import GetNextOperationIdCommand, KillOperationCommand
from ravendb.exceptions.documents.bulkinsert import BulkInsertAbortedException
from ravendb.documents.identity.hilo import GenerateEntityIdOnTheClient

class BulkInsertOperation:
    class _BufferExposer:
        # Plain class: send_data() itself returns a generator, so the
        # collections.abc.Generator base is not needed (and subclassing the
        # ABC without implementing send/throw would make it uninstantiable).
        def __init__(self):
            self._data = {}
            self._done = Future()
            self.output_stream = Future()
            self._yield_buffer_semaphore = Semaphore(0)
            self._flushed_buffers_queue = Queue()

        def enqueue_buffer(self, buffer: bytearray):
            self._flushed_buffers_queue.put(buffer)
            self._yield_buffer_semaphore.release()  # let send_data() yield this buffer

        def send_data(self):  # todo: prepare a send loop, not just a single one
            self._yield_buffer_semaphore.acquire()
            yield self._flushed_buffers_queue.get()

        def is_done(self) -> bool:
            return self._done.done()

        def done(self):
            self._done.set_result(None)

        def error_on_processing_request(self, exception: Exception):
            self._done.set_exception(exception)

        def error_on_request_start(self, exception: Exception):
            self.output_stream.set_exception(exception)

    class _BulkInsertCommand(RavenCommand[requests.Response]):
        def __init__(self, key: int, buffer_exposer: BulkInsertOperation._BufferExposer, node_tag: str):
            super().__init__(requests.Response)
            self._buffer_exposer = buffer_exposer
            self._key = key
            self._selected_node_tag = node_tag
            self.use_compression = False

        def create_request(self, node: ServerNode) -> requests.Request:
            return requests.Request(
                "POST",
                f"{node.url}/databases/{node.database}/bulk_insert?id={self._key}",
                data=self._buffer_exposer.send_data(),
            )

        def set_response(self, response: Optional[str], from_cache: bool) -> None:
            raise NotImplementedError("Not Implemented")

        def is_read_request(self) -> bool:
            return False

        def send(self, session: requests.Session, request: requests.Request) -> requests.Response:
            try:
                return super().send(session, request)
            except Exception as e:
                self._buffer_exposer.error_on_request_start(e)
                raise  # don't swallow the failure and return None
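
A note on the streaming mechanism, not part of the commit: requests sends a generator passed as data= using chunked transfer encoding, which is what create_request relies on when it hands send_data() to the Request. A minimal standalone sketch, using httpbin.org purely as a stand-in endpoint:

    import requests

    def body_chunks():
        # each yielded bytes object becomes one chunk of a
        # Transfer-Encoding: chunked request body
        yield b'[{"Id":"users/1","Type":"PUT","Document":{}}'
        yield b"]"

    # illustrative endpoint only; the real command posts to
    # {node.url}/databases/{node.database}/bulk_insert?id=...
    response = requests.post("https://httpbin.org/post", data=body_chunks())
    print(response.status_code)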

    def __init__(self, database: str = None, store: DocumentStore = None):
        self.use_compression = False

        self._bulk_insert_execute_task: Optional[Future] = None  # created lazily on the first store
        self._first = True
        self._in_progress_command: Optional[CommandType] = None
        self._operation_id = -1
        self._node_tag = None
        self._concurrent_check = 0
        self._concurrent_check_lock = Lock()

        self._thread_pool_executor = store.thread_pool_executor
        self._conventions = store.conventions
        if not database or database.isspace():
            self._throw_no_database()
        self._request_executor = store.get_request_executor(database)

        self._async_write = Future()
        self._async_write.set_result(None)

        self._max_size_in_buffer = 1024 * 1024

        # start empty - bytearray(n) would prepend n zero bytes to the stream;
        # _max_size_in_buffer is only the threshold that triggers a flush
        self._current_data_buffer = bytearray()

        # todo: self._time_series_batch_size = self._conventions.time_series_batch_size
        self._buffer_exposer = BulkInsertOperation._BufferExposer()

        self._generate_entity_id_on_the_client = GenerateEntityIdOnTheClient(
            self._request_executor.conventions,
            lambda entity: self._request_executor.conventions.generate_document_id(database, entity),
        )

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._end_previous_command_if_needed()

        flush_ex = None

        if self._buffer_exposer.is_done():
            return

        if self._current_data_buffer:
            try:
                # close the JSON array opened in _ensure_execute_task
                self._current_data_buffer += bytearray("]", encoding="utf-8")
                self._async_write.result()

                buffer = self._current_data_buffer
                self._buffer_exposer.enqueue_buffer(buffer)
            except Exception as e:
                flush_ex = e

        self._buffer_exposer.done()

        if self._operation_id == -1:
            # closing without calling a single store
            return

        if self._bulk_insert_execute_task is not None:
            try:
                self._bulk_insert_execute_task.result()
            except Exception as e:
                self._throw_bulk_insert_aborted(e, flush_ex)

    def _throw_bulk_insert_aborted(self, e: Exception, flush_ex: Optional[Exception]):
        error_from_server = None
        try:
            error_from_server = self._get_exception_from_operation()
        except Exception:
            pass  # server is probably down, will propagate the original exception

        if error_from_server is not None:
            raise error_from_server

        raise BulkInsertAbortedException("Failed to execute bulk insert", e or flush_ex)

    def _throw_no_database(self):
        raise RuntimeError(
            "Cannot start bulk insert operation without specifying a name of a database to operate on. "
            "Database name can be passed as an argument when bulk insert is being created or "
            "default database can be defined using 'DocumentStore.database' property."
        )

    def _wait_for_id(self):
        if self._operation_id != -1:
            return

        bulk_insert_get_id_request = GetNextOperationIdCommand()
        self._request_executor.execute_command(bulk_insert_get_id_request)
        self._operation_id = bulk_insert_get_id_request.result
        self._node_tag = bulk_insert_get_id_request._node_tag
    def store_by_metadata(self, entity: object, metadata: Optional[MetadataAsDictionary] = None) -> str:
        key = (
            self._get_id(entity)
            if metadata is None or constants.Documents.Metadata.ID not in metadata
            else metadata[constants.Documents.Metadata.ID]
        )

        self.store(entity, key, metadata)
        return key

    def store(self, entity: object, key: str, metadata: Optional[MetadataAsDictionary] = None) -> None:
        try:
            with self._concurrency_check():
                self._verify_valid_id(key)

                self._execute_before_store()
                if metadata is None:
                    metadata = MetadataAsDictionary()

                if constants.Documents.Metadata.COLLECTION not in metadata:
                    collection = self._request_executor.conventions.get_collection_name(entity)
                    if collection is not None:
                        metadata[constants.Documents.Metadata.COLLECTION] = collection

                if constants.Documents.Metadata.RAVEN_PYTHON_TYPE not in metadata:
                    python_type = self._request_executor.conventions.get_python_class_name(entity.__class__)
                    if python_type is not None:
                        metadata[constants.Documents.Metadata.RAVEN_PYTHON_TYPE] = python_type

                self._end_previous_command_if_needed()

                try:
                    if not self._first:
                        self._write_comma()

                    self._first = False
                    self._in_progress_command = CommandType.NONE
                    self._current_data_buffer += bytearray('{"Id":"', encoding="utf-8")
                    self._write_string(key)
                    self._current_data_buffer += bytearray('","Type":"PUT","Document":', encoding="utf-8")

                    self._flush_if_needed()

                    document_info = DocumentInfo(metadata_instance=metadata)
                    json_dict = EntityToJson.convert_entity_to_json_internal_static(
                        entity, self._conventions, document_info, True
                    )

                    self._current_data_buffer += bytearray(json.dumps(json_dict), encoding="utf-8")
                    self._current_data_buffer += bytearray("}", encoding="utf-8")
                except Exception as e:
                    self._handle_errors(key, e)
        finally:
            with self._concurrent_check_lock:
                self._concurrent_check = 0
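
For reference, the framing that store() and _ensure_execute_task() build up: the stream opens with '[', each document is written as an Id/Type/Document object, commas separate entries, and __exit__ appends the closing ']'. With two stored documents the request body would look roughly like this (ids and fields are made up):

    [{"Id":"users/1","Type":"PUT","Document":{"name":"Alice","@metadata":{"@collection":"Users"}}},{"Id":"users/2","Type":"PUT","Document":{"name":"Bob","@metadata":{"@collection":"Users"}}}]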
    def _handle_errors(self, document_id: str, e: Exception) -> None:
        error = self._get_exception_from_operation()
        if error is not None:
            raise error

        self._throw_on_unavailable_stream(document_id, e)
    @contextmanager
    def _concurrency_check(self):
        # generator-based context manager, so store() can use
        # 'with self._concurrency_check():'
        with self._concurrent_check_lock:
            if self._concurrent_check != 0:
                raise RuntimeError("Bulk Insert store methods cannot be executed concurrently.")
            self._concurrent_check = 1
        try:
            yield
        finally:
            with self._concurrent_check_lock:
                if self._concurrent_check == 1:
                    self._concurrent_check = 0
    def _flush_if_needed(self) -> None:
        if len(self._current_data_buffer) > self._max_size_in_buffer or self._async_write.done():
            self._async_write.result()

            buffer = self._current_data_buffer
            # hand the filled buffer off and start a fresh one - clearing the
            # bytearray in place would also wipe the buffer just handed off
            self._current_data_buffer = bytearray()

            # todo: check if it's better to create a new bytearray of max size or clear it (possible dealloc)

            def __async_write():
                self._buffer_exposer.enqueue_buffer(buffer)
                return None

            self._async_write = self._thread_pool_executor.submit(__async_write)
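
The flush path above is a producer/consumer handoff: the writer swaps in a fresh buffer, a pool thread enqueues the full one, and the semaphore in _BufferExposer wakes the HTTP body generator. A self-contained sketch of that pattern (names and payload are illustrative):

    from queue import Queue
    from threading import Semaphore, Thread

    ready = Semaphore(0)
    buffers: "Queue[bytes]" = Queue()

    def producer():
        # enqueue a flushed buffer, then signal the consumer
        buffers.put(b'{"Id":"users/1"}')
        ready.release()

    def consume():
        # block until a buffer is available, then yield it to the caller
        ready.acquire()
        yield buffers.get()

    Thread(target=producer).start()
    print(list(consume()))  # [b'{"Id":"users/1"}']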
    def _end_previous_command_if_needed(self) -> None:
        if self._in_progress_command == CommandType.COUNTERS:
            pass  # todo: counters
        elif self._in_progress_command == CommandType.TIME_SERIES:
            pass  # todo: time series

    def _write_string(self, input_string: str) -> None:
        # escape unescaped double quotes so the id can be embedded in the JSON frame
        for i in range(len(input_string)):
            c = input_string[i]
            if '"' == c:
                if i == 0 or input_string[i - 1] != "\\":
                    self._current_data_buffer += bytearray("\\", encoding="utf-8")

            self._current_data_buffer += bytearray(c, encoding="utf-8")

    def _write_comma(self) -> None:
        self._current_data_buffer += bytearray(",", encoding="utf-8")
    def _execute_before_store(self) -> None:
        if self._bulk_insert_execute_task is None:
            self._wait_for_id()
            self._ensure_execute_task()

        # Future.exception() blocks until the task completes, so only ask
        # once the task is already done
        if self._bulk_insert_execute_task.done() and self._bulk_insert_execute_task.exception() is not None:
            try:
                self._bulk_insert_execute_task.result()
            except Exception as e:
                self._throw_bulk_insert_aborted(e, None)

    @staticmethod
    def _verify_valid_id(key: str) -> None:
        if not key or key.isspace():
            raise ValueError("Document id must have a non empty value")

        if key.endswith("|"):
            raise RuntimeError(f"Document ids cannot end with '|', but was called with {key}")
    def _get_exception_from_operation(self) -> Optional[BulkInsertAbortedException]:
        state_request = GetOperationStateOperation.GetOperationStateCommand(self._operation_id, self._node_tag)
        self._request_executor.execute_command(state_request)

        if "Faulted" != state_request.result["Status"]:
            return None

        result = state_request.result["Result"]

        if result["$type"].startswith("Raven.Client.Documents.Operations.OperationExceptionResult"):
            return BulkInsertAbortedException(result["Error"])

        return None
    def _ensure_execute_task(self) -> None:
        try:
            bulk_command = BulkInsertOperation._BulkInsertCommand(
                self._operation_id, self._buffer_exposer, self._node_tag
            )
            bulk_command.use_compression = self.use_compression

            def __core_async():
                self._request_executor.execute_command(bulk_command)
                return None

            self._bulk_insert_execute_task = self._thread_pool_executor.submit(__core_async)
            # open the JSON array that frames the stream of documents
            self._current_data_buffer += bytearray("[", encoding="utf-8")

        except Exception as e:
            raise RavenException("Unable to open bulk insert stream", e)

    def _throw_on_unavailable_stream(self, key: str, inner_ex: Exception) -> None:
        self._buffer_exposer.error_on_processing_request(
            BulkInsertAbortedException(f"Write to stream failed at document with id {key}", inner_ex)
        )
    def abort(self) -> None:
        if self._operation_id == -1:
            return  # nothing was done, nothing to kill

        self._wait_for_id()

        try:
            self._request_executor.execute_command(KillOperationCommand(self._operation_id, self._node_tag))
        except RavenException as e:
            raise BulkInsertAbortedException(
                "Unable to kill this bulk insert operation, because it was not found on the server", e
            )

    def _get_id(self, entity: object) -> str:
        success, key = self._generate_entity_id_on_the_client.try_get_id_from_instance(entity)
        if success:
            return key

        key = self._generate_entity_id_on_the_client.generate_document_key_for_storage(entity)

        self._generate_entity_id_on_the_client.try_set_identity(entity, key)
        return key

    # todo: attachments_for
    # todo: time_series_for
    # todo: CountersBulkInsert
    # todo: CountersBulkInsertOperation
    # todo: TimeSeriesBulkInsertBase
    # todo: TimeSeriesBulkInsert
    # todo: TypedTimeSeriesBulkInsert
    # todo: AttachmentsBulkInsert
    # todo: AttachmentsBulkInsertOperation
