Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/object_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ async def main() -> None:
),
)
await store.put("test2.py", Path(__file__).read_bytes())
await store.put("test.py", str(Path(__file__)))
await store.put_file("test.py", Path(__file__))

async for obj in await store.list():
print(obj) # noqa: T201
Expand Down
27 changes: 24 additions & 3 deletions python/natsrpy/_natsrpy_rs/js/object_store.pyi
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from asyncio import Future
from datetime import datetime, timedelta
from os import PathLike
from typing import Any, final

from typing_extensions import Self, Writer
Expand Down Expand Up @@ -130,23 +131,43 @@ class ObjectStore:
def put(
self,
name: str,
value: bytes | str,
value: bytes | bytearray | memoryview,
chunk_size: int = ..., # 24MB
description: str | None = None,
headers: dict[str, str | list[str]] | None = None,
metadata: dict[str, str] | None = None,
) -> Future[None]:
"""Upload an object to the store.
"""Upload an object to the store from in-memory bytes.

:param name: name for the stored object.
:param value: object content.
:param value: object content as bytes.
:param chunk_size: size of upload chunks in bytes,
defaults to 24 MB.
:param description: human-readable object description.
:param headers: optional NATS headers.
:param metadata: optional custom key-value metadata.
"""

def put_file(
    self,
    name: str,
    path: str | PathLike[str],
    chunk_size: int | None = None,
    description: str | None = None,
    headers: dict[str, str | list[str]] | None = None,
    metadata: dict[str, str] | None = None,
) -> Future[None]:
    """Upload an object to the store by streaming from a file.

    Unlike :meth:`put`, the file is read and uploaded in chunks rather
    than loaded into memory first, so it is suitable for large files.

    :param name: name for the stored object.
    :param path: path to the file to upload; accepts a plain string or
        any :class:`os.PathLike` (e.g. :class:`pathlib.Path`).
    :param chunk_size: size of read and upload chunks in bytes,
        defaults to 200 KB.
    :param description: human-readable object description.
    :param headers: optional NATS headers.
    :param metadata: optional custom key-value metadata.
    :return: future that resolves to ``None`` once the upload completes.
    """

def delete(self, name: str) -> Future[None]:
"""Delete an object from the store.

Expand Down
31 changes: 31 additions & 0 deletions python/natsrpy/instrumentation/object_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def __init__(
def instrument(self) -> None:
"""Setup instrumentation for all ObjectStore methods."""
self._instrument_put()
self._instrument_put_file()
self._instrument_get()
self._instrument_delete()
self._instrument_seal()
Expand All @@ -71,6 +72,7 @@ def uninstrument() -> None:
"""Remove instrumentation from all ObjectStore methods."""
for method in (
"put",
"put_file",
"get",
"delete",
"seal",
Expand Down Expand Up @@ -117,6 +119,35 @@ def decorator(

wrap_function_wrapper(_OS_MODULE, "ObjectStore.put", decorator)

def _instrument_put_file(self) -> None:
    """Wrap ``ObjectStore.put_file`` so each upload emits a PRODUCER span.

    Mirrors ``_instrument_put``: the span is named from the object name
    and ended when the wrapped coroutine finishes (or raises).
    """
    tracer = self.tracer

    async def _wrapped(
        wrapper: Any,
        args: tuple[Any, ...],
        kwargs: dict[str, Any],
    ) -> Any:
        if not is_instrumentation_enabled():
            return await wrapper(*args, **kwargs)
        # ``name`` may be passed positionally or as a keyword; the
        # original ``args[0]`` raised IndexError for keyword-only calls
        # such as ``put_file(name=..., path=...)``.
        name: str = args[0] if args else kwargs.get("name", "")
        span = (
            SpanBuilder(tracer, SpanKind.PRODUCER, SpanAction.OBJ_PUT)
            .with_object_name(name)
            .build()
        )
        with trace.use_span(span, end_on_exit=True):
            return await wrapper(*args, **kwargs)

    def decorator(
        wrapper: Any,
        _: ObjectStore,
        args: tuple[Any, ...],
        kwargs: dict[str, Any],
    ) -> Any:
        return _wrapped(wrapper, args, kwargs)

    wrap_function_wrapper(_OS_MODULE, "ObjectStore.put_file", decorator)

def _instrument_get(self) -> None:
tracer = self.tracer

Expand Down
25 changes: 23 additions & 2 deletions python/tests/test_object_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ async def test_object_store_put_with_metadata(js: JetStream) -> None:
await js.object_store.delete(bucket)


async def test_object_store_put_from_file(js: JetStream) -> None:
async def test_object_store_put_file_str_path(js: JetStream) -> None:
bucket = f"test-os-putfile-{uuid.uuid4().hex[:8]}"
config = ObjectStoreConfig(bucket=bucket)
store = await js.object_store.create(config)
Expand All @@ -522,7 +522,7 @@ async def test_object_store_put_from_file(js: JetStream) -> None:
tmp_path = tmp.name

try:
await store.put("file-object", tmp_path)
await store.put_file("file-object", tmp_path)
writer = io.BytesIO()
await store.get("file-object", writer)
assert writer.getvalue() == file_content
Expand All @@ -532,6 +532,27 @@ async def test_object_store_put_from_file(js: JetStream) -> None:
await js.object_store.delete(bucket)


async def test_object_store_put_file_path_object(js: JetStream) -> None:
    """put_file with a pathlib.Path source round-trips the file contents."""
    bucket = f"test-os-putfilepath-{uuid.uuid4().hex[:8]}"
    store = await js.object_store.create(ObjectStoreConfig(bucket=bucket))
    try:
        payload = b"file content via pathlib"
        with tempfile.NamedTemporaryFile(delete=False) as handle:
            handle.write(payload)
            source = Path(handle.name)

        try:
            await store.put_file("file-object", source)
            sink = io.BytesIO()
            await store.get("file-object", sink)
            assert sink.getvalue() == payload
        finally:
            # Temp file was created with delete=False; remove it ourselves.
            source.unlink()
    finally:
        await js.object_store.delete(bucket)


async def test_object_store_get_with_chunk_size(js: JetStream) -> None:
bucket = f"test-os-getchunk-{uuid.uuid4().hex[:8]}"
config = ObjectStoreConfig(bucket=bucket)
Expand Down
56 changes: 41 additions & 15 deletions src/js/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{
utils::{
headers::NatsrpyHeadermapExt,
natsrpy_future,
py_types::{SendableValue, TimeValue, ToPyDate},
py_types::{TimeValue, ToPyDate},
streamer::Streamer,
},
};
Expand Down Expand Up @@ -253,7 +253,7 @@ impl ObjectStore {
&self,
py: Python<'py>,
name: String,
value: SendableValue,
value: Vec<u8>,
chunk_size: Option<usize>,
description: Option<String>,
headers: Option<Bound<'py, PyDict>>,
Expand All @@ -269,19 +269,45 @@ impl ObjectStore {
headers,
};
natsrpy_future(py, async move {
match value {
SendableValue::Bytes(data) => {
let mut reader = tokio::io::BufReader::new(&*data);
ctx_guard.read().await.put(meta, &mut reader).await?;
}
SendableValue::String(filename) => {
let mut reader = tokio::io::BufReader::with_capacity(
chunk_size.unwrap_or(200 * 1024),
tokio::fs::File::open(filename).await?,
);
ctx_guard.read().await.put(meta, &mut reader).await?;
}
}
let mut reader = tokio::io::BufReader::new(value.as_slice());
ctx_guard.read().await.put(meta, &mut reader).await?;
Ok(())
})
}

/// Upload an object to the store by streaming it from a file on disk.
///
/// Python signature: `put_file(name, path, chunk_size=None, description=None,
/// headers=None, metadata=None)`; returns an awaitable resolving to `None`.
/// The file is read through a buffered reader so large files are never
/// loaded into memory whole.
#[pyo3(signature=(
    name,
    path,
    chunk_size=None,
    description=None,
    headers=None,
    metadata=None,
))]
pub fn put_file<'py>(
    &self,
    py: Python<'py>,
    name: String,
    path: std::path::PathBuf,
    chunk_size: Option<usize>,
    description: Option<String>,
    headers: Option<Bound<'py, PyDict>>,
    metadata: Option<HashMap<String, String>>,
) -> NatsrpyResult<Bound<'py, PyAny>> {
    // Clone the handle so the async block owns it independently of `self`.
    let ctx_guard = self.object_store.clone();
    // Convert the Python dict to a NATS HeaderMap; propagate conversion errors.
    let headers = headers.map(|val| HeaderMap::from_pydict(val)).transpose()?;
    let meta = async_nats::jetstream::object_store::ObjectMetadata {
        name,
        chunk_size,
        description,
        metadata: metadata.unwrap_or_default(),
        headers,
    };
    natsrpy_future(py, async move {
        // Buffer reads at the requested chunk size, defaulting to 200 KB.
        let mut reader = tokio::io::BufReader::with_capacity(
            chunk_size.unwrap_or(200 * 1024),
            tokio::fs::File::open(path).await?,
        );
        ctx_guard.read().await.put(meta, &mut reader).await?;
        Ok(())
    })
}
Expand Down
Loading