From 633351c6ebceb5e982d6082a43ae10ae95292b9d Mon Sep 17 00:00:00 2001 From: Pavel Kirilin Date: Sun, 12 Apr 2026 19:10:10 +0200 Subject: [PATCH] Split put and put_file methods. --- examples/object_store.py | 2 +- .../natsrpy/_natsrpy_rs/js/object_store.pyi | 27 ++++++++- .../natsrpy/instrumentation/object_store.py | 31 ++++++++++ python/tests/test_object_store.py | 25 ++++++++- src/js/object_store.rs | 56 ++++++++++++++----- 5 files changed, 120 insertions(+), 21 deletions(-) diff --git a/examples/object_store.py b/examples/object_store.py index a9779dc..74c32ab 100644 --- a/examples/object_store.py +++ b/examples/object_store.py @@ -21,7 +21,7 @@ async def main() -> None: ), ) await store.put("test2.py", Path(__file__).read_bytes()) - await store.put("test.py", str(Path(__file__))) + await store.put_file("test.py", Path(__file__)) async for obj in await store.list(): print(obj) # noqa: T201 diff --git a/python/natsrpy/_natsrpy_rs/js/object_store.pyi b/python/natsrpy/_natsrpy_rs/js/object_store.pyi index 72f75e3..ff3d1ee 100644 --- a/python/natsrpy/_natsrpy_rs/js/object_store.pyi +++ b/python/natsrpy/_natsrpy_rs/js/object_store.pyi @@ -1,5 +1,6 @@ from asyncio import Future from datetime import datetime, timedelta +from os import PathLike from typing import Any, final from typing_extensions import Self, Writer @@ -130,16 +131,16 @@ class ObjectStore: def put( self, name: str, - value: bytes | str, + value: bytes | bytearray | memoryview, chunk_size: int = ..., # 24MB description: str | None = None, headers: dict[str, str | list[str]] | None = None, metadata: dict[str, str] | None = None, ) -> Future[None]: - """Upload an object to the store. + """Upload an object to the store from in-memory bytes. :param name: name for the stored object. - :param value: object content. + :param value: object content as bytes. :param chunk_size: size of upload chunks in bytes, defaults to 24 MB. :param description: human-readable object description. @@ -147,6 +148,26 @@ class ObjectStore: :param metadata: optional custom key-value metadata. """ + def put_file( + self, + name: str, + path: str | PathLike[str], + chunk_size: int | None = None, + description: str | None = None, + headers: dict[str, str | list[str]] | None = None, + metadata: dict[str, str] | None = None, + ) -> Future[None]: + """Upload an object to the store by streaming from a file. + + :param name: name for the stored object. + :param path: path to the file to upload. + :param chunk_size: size of read and upload chunks in bytes, + defaults to 200 KB. + :param description: human-readable object description. + :param headers: optional NATS headers. + :param metadata: optional custom key-value metadata. + """ + def delete(self, name: str) -> Future[None]: """Delete an object from the store. diff --git a/python/natsrpy/instrumentation/object_store.py b/python/natsrpy/instrumentation/object_store.py index 4ba7444..3043997 100644 --- a/python/natsrpy/instrumentation/object_store.py +++ b/python/natsrpy/instrumentation/object_store.py @@ -56,6 +56,7 @@ def __init__( def instrument(self) -> None: """Setup instrumentation for all ObjectStore methods.""" self._instrument_put() + self._instrument_put_file() self._instrument_get() self._instrument_delete() self._instrument_seal() @@ -71,6 +72,7 @@ def uninstrument() -> None: """Remove instrumentation from all ObjectStore methods.""" for method in ( "put", + "put_file", "get", "delete", "seal", @@ -117,6 +119,35 @@ def decorator( wrap_function_wrapper(_OS_MODULE, "ObjectStore.put", decorator) + def _instrument_put_file(self) -> None: + tracer = self.tracer + + async def _wrapped( + wrapper: Any, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> Any: + if not is_instrumentation_enabled(): + return await wrapper(*args, **kwargs) + name: str = args[0] + span = ( + SpanBuilder(tracer, SpanKind.PRODUCER, SpanAction.OBJ_PUT) + .with_object_name(name) + .build() + ) + with trace.use_span(span, end_on_exit=True): + return await wrapper(*args, **kwargs) + + def decorator( + wrapper: Any, + _: ObjectStore, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> Any: + return _wrapped(wrapper, args, kwargs) + + wrap_function_wrapper(_OS_MODULE, "ObjectStore.put_file", decorator) + def _instrument_get(self) -> None: tracer = self.tracer diff --git a/python/tests/test_object_store.py b/python/tests/test_object_store.py index e32d2f7..4bb5302 100644 --- a/python/tests/test_object_store.py +++ b/python/tests/test_object_store.py @@ -511,7 +511,7 @@ async def test_object_store_put_with_metadata(js: JetStream) -> None: await js.object_store.delete(bucket) -async def test_object_store_put_from_file(js: JetStream) -> None: +async def test_object_store_put_file_str_path(js: JetStream) -> None: bucket = f"test-os-putfile-{uuid.uuid4().hex[:8]}" config = ObjectStoreConfig(bucket=bucket) store = await js.object_store.create(config) @@ -522,7 +522,7 @@ async def test_object_store_put_from_file(js: JetStream) -> None: tmp_path = tmp.name try: - await store.put("file-object", tmp_path) + await store.put_file("file-object", tmp_path) writer = io.BytesIO() await store.get("file-object", writer) assert writer.getvalue() == file_content @@ -532,6 +532,27 @@ async def test_object_store_put_from_file(js: JetStream) -> None: await js.object_store.delete(bucket) +async def test_object_store_put_file_path_object(js: JetStream) -> None: + bucket = f"test-os-putfilepath-{uuid.uuid4().hex[:8]}" + config = ObjectStoreConfig(bucket=bucket) + store = await js.object_store.create(config) + try: + file_content = b"file content via pathlib" + with tempfile.NamedTemporaryFile(delete=False) as tmp: + tmp.write(file_content) + tmp_path = Path(tmp.name) + + try: + await store.put_file("file-object", tmp_path) + writer = io.BytesIO() + await store.get("file-object", writer) + assert writer.getvalue() == file_content + finally: + tmp_path.unlink() + finally: + await js.object_store.delete(bucket) + + async def test_object_store_get_with_chunk_size(js: JetStream) -> None: bucket = f"test-os-getchunk-{uuid.uuid4().hex[:8]}" config = ObjectStoreConfig(bucket=bucket) diff --git a/src/js/object_store.rs b/src/js/object_store.rs index 2dbdd1a..0f45f2d 100644 --- a/src/js/object_store.rs +++ b/src/js/object_store.rs @@ -17,7 +17,7 @@ use crate::{ utils::{ headers::NatsrpyHeadermapExt, natsrpy_future, - py_types::{SendableValue, TimeValue, ToPyDate}, + py_types::{TimeValue, ToPyDate}, streamer::Streamer, }, }; @@ -253,7 +253,7 @@ impl ObjectStore { &self, py: Python<'py>, name: String, - value: SendableValue, + value: Vec, chunk_size: Option, description: Option, headers: Option>, @@ -269,19 +269,45 @@ impl ObjectStore { headers, }; natsrpy_future(py, async move { - match value { - SendableValue::Bytes(data) => { - let mut reader = tokio::io::BufReader::new(&*data); - ctx_guard.read().await.put(meta, &mut reader).await?; - } - SendableValue::String(filename) => { - let mut reader = tokio::io::BufReader::with_capacity( - chunk_size.unwrap_or(200 * 1024), - tokio::fs::File::open(filename).await?, - ); - ctx_guard.read().await.put(meta, &mut reader).await?; - } - } + let mut reader = tokio::io::BufReader::new(value.as_slice()); + ctx_guard.read().await.put(meta, &mut reader).await?; + Ok(()) + }) + } + + #[pyo3(signature=( + name, + path, + chunk_size=None, + description=None, + headers=None, + metadata=None, + ))] + pub fn put_file<'py>( + &self, + py: Python<'py>, + name: String, + path: std::path::PathBuf, + chunk_size: Option, + description: Option, + headers: Option>, + metadata: Option>, + ) -> NatsrpyResult> { + let ctx_guard = self.object_store.clone(); + let headers = headers.map(|val| HeaderMap::from_pydict(val)).transpose()?; + let meta = async_nats::jetstream::object_store::ObjectMetadata { + name, + chunk_size, + description, + metadata: metadata.unwrap_or_default(), + headers, + }; + natsrpy_future(py, async move { + let mut reader = tokio::io::BufReader::with_capacity( + chunk_size.unwrap_or(200 * 1024), + tokio::fs::File::open(path).await?, + ); + ctx_guard.read().await.put(meta, &mut reader).await?; Ok(()) }) }