diff --git a/python/natsrpy/instrumentation/__init__.py b/python/natsrpy/instrumentation/__init__.py index 96b4117..f3c2841 100644 --- a/python/natsrpy/instrumentation/__init__.py +++ b/python/natsrpy/instrumentation/__init__.py @@ -40,7 +40,9 @@ async def main() -> None: from .js_consumer import JSConsumerInstrumentation from .js_publish import JSPublishInstrumentation +from .kv import KVInstrumentation from .nats_core import NatsCoreInstrumentator +from .object_store import ObjectStoreInstrumentation try: import opentelemetry # noqa: F401 @@ -105,8 +107,18 @@ def _instrument(self, **kwargs: Any) -> None: capture_body=capture_body, capture_headers=capture_headers, ).instrument() + KVInstrumentation( + tracer, + capture_body=capture_body, + ).instrument() + ObjectStoreInstrumentation( + tracer, + capture_body=capture_body, + ).instrument() def _uninstrument(self, **kwargs: Any) -> None: NatsCoreInstrumentator.uninstrument() JSConsumerInstrumentation.uninstrument() JSPublishInstrumentation.uninstrument() + KVInstrumentation.uninstrument() + ObjectStoreInstrumentation.uninstrument() diff --git a/python/natsrpy/instrumentation/kv.py b/python/natsrpy/instrumentation/kv.py new file mode 100644 index 0000000..2a7b969 --- /dev/null +++ b/python/natsrpy/instrumentation/kv.py @@ -0,0 +1,413 @@ +from typing import Any + +from opentelemetry import trace +from opentelemetry.instrumentation.utils import is_instrumentation_enabled, unwrap +from opentelemetry.trace import SpanKind, Tracer +from typing_extensions import Self +from wrapt import ObjectProxy, wrap_function_wrapper + +from natsrpy.js import KeyValue, KVEntry, KVEntryIterator + +from .span_builder import SpanAction, SpanBuilder + +_KV_MODULE = "natsrpy._natsrpy_rs.js.kv" + + +class KVEntryIteratorProxy(ObjectProxy): # type: ignore + """Proxy for KVEntryIterator returned by watch and history methods.""" + + def __init__( + self, + wrapped: KVEntryIterator, + tracer: Tracer, + bucket: str, + capture_body: bool, + ) -> None: + super().__init__(wrapped) + self._self_tracer = tracer + self._self_bucket = bucket + self._self_capture_body = capture_body + + def __aiter__(self) -> Self: + return self + + async def __anext__(self) -> KVEntry: + entry: KVEntry = await anext(self.__wrapped__) + if not is_instrumentation_enabled(): + return entry + span = ( + SpanBuilder(self._self_tracer, SpanKind.CONSUMER, SpanAction.RECEIVE) + .with_kv_bucket(self._self_bucket) + .with_kv_key(entry.key) + .with_kv_value(entry.value, capture_body=self._self_capture_body) + .build() + ) + with trace.use_span(span, end_on_exit=True): + pass + return entry + + +class KVInstrumentation: + """Instrument KeyValue store operations.""" + + def __init__( + self, + tracer: Tracer, + capture_body: bool = False, + ) -> None: + self.tracer = tracer + self.capture_body = capture_body + + def instrument(self) -> None: + """Setup instrumentation for all KeyValue methods.""" + self._instrument_put() + self._instrument_create() + self._instrument_update() + self._instrument_delete() + self._instrument_purge() + self._instrument_get() + self._instrument_entry() + self._instrument_history() + self._instrument_watch_methods() + self._instrument_keys() + + @staticmethod + def uninstrument() -> None: + """Remove instrumentation from all KeyValue methods.""" + for method in ( + "put", + "create", + "update", + "delete", + "purge", + "get", + "entry", + "history", + "watch", + "watch_with_history", + "watch_all", + "watch_many", + "watch_many_with_history", + "keys", + ): + unwrap(KeyValue, method) + + def _instrument_put(self) -> None: + tracer = self.tracer + capture_body = self.capture_body + + async def _wrapped( + wrapper: Any, + instance: KeyValue, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> Any: + if not is_instrumentation_enabled(): + return await wrapper(*args, **kwargs) + key: str = args[0] + value: bytes | str = args[1] + span = ( + SpanBuilder(tracer, SpanKind.PRODUCER, SpanAction.KV_PUT) + .with_kv_bucket(instance.name) + .with_kv_key(key) + .with_kv_value(value, capture_body=capture_body) + .build() + ) + with trace.use_span(span, end_on_exit=True): + return await wrapper(*args, **kwargs) + + def decorator( + wrapper: Any, + instance: KeyValue, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> Any: + return _wrapped(wrapper, instance, args, kwargs) + + wrap_function_wrapper(_KV_MODULE, "KeyValue.put", decorator) + + def _instrument_create(self) -> None: + tracer = self.tracer + capture_body = self.capture_body + + async def _wrapped( + wrapper: Any, + instance: KeyValue, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> Any: + if not is_instrumentation_enabled(): + return await wrapper(*args, **kwargs) + key: str = args[0] + value: bytes | str = args[1] + span = ( + SpanBuilder(tracer, SpanKind.PRODUCER, SpanAction.KV_CREATE) + .with_kv_bucket(instance.name) + .with_kv_key(key) + .with_kv_value(value, capture_body=capture_body) + .build() + ) + with trace.use_span(span, end_on_exit=True): + return await wrapper(*args, **kwargs) + + def decorator( + wrapper: Any, + instance: KeyValue, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> Any: + return _wrapped(wrapper, instance, args, kwargs) + + wrap_function_wrapper(_KV_MODULE, "KeyValue.create", decorator) + + def _instrument_update(self) -> None: + tracer = self.tracer + capture_body = self.capture_body + + async def _wrapped( + wrapper: Any, + instance: KeyValue, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> Any: + if not is_instrumentation_enabled(): + return await wrapper(*args, **kwargs) + key: str = args[0] + value: bytes | str = args[1] + span = ( + SpanBuilder(tracer, SpanKind.PRODUCER, SpanAction.KV_UPDATE) + .with_kv_bucket(instance.name) + .with_kv_key(key) + .with_kv_value(value, capture_body=capture_body) + .build() + ) + with trace.use_span(span, end_on_exit=True): + return await wrapper(*args, **kwargs) + + def decorator( + wrapper: Any, + instance: KeyValue, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> Any: + return _wrapped(wrapper, instance, args, kwargs) + + wrap_function_wrapper(_KV_MODULE, "KeyValue.update", decorator) + + def _instrument_delete(self) -> None: + tracer = self.tracer + + async def _wrapped( + wrapper: Any, + instance: KeyValue, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> Any: + if not is_instrumentation_enabled(): + return await wrapper(*args, **kwargs) + key: str = args[0] + span = ( + SpanBuilder(tracer, SpanKind.PRODUCER, SpanAction.KV_DELETE) + .with_kv_bucket(instance.name) + .with_kv_key(key) + .build() + ) + with trace.use_span(span, end_on_exit=True): + return await wrapper(*args, **kwargs) + + def decorator( + wrapper: Any, + instance: KeyValue, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> Any: + return _wrapped(wrapper, instance, args, kwargs) + + wrap_function_wrapper(_KV_MODULE, "KeyValue.delete", decorator) + + def _instrument_purge(self) -> None: + tracer = self.tracer + + async def _wrapped( + wrapper: Any, + instance: KeyValue, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> Any: + if not is_instrumentation_enabled(): + return await wrapper(*args, **kwargs) + key: str = args[0] + span = ( + SpanBuilder(tracer, SpanKind.PRODUCER, SpanAction.KV_PURGE) + .with_kv_bucket(instance.name) + .with_kv_key(key) + .build() + ) + with trace.use_span(span, end_on_exit=True): + return await wrapper(*args, **kwargs) + + def decorator( + wrapper: Any, + instance: KeyValue, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> Any: + return _wrapped(wrapper, instance, args, kwargs) + + wrap_function_wrapper(_KV_MODULE, "KeyValue.purge", decorator) + + def _instrument_get(self) -> None: + tracer = self.tracer + + async def _wrapped( + wrapper: Any, + instance: KeyValue, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> Any: + if not is_instrumentation_enabled(): + return await wrapper(*args, **kwargs) + key: str = args[0] + span = ( + SpanBuilder(tracer, SpanKind.CLIENT, SpanAction.KV_GET) + .with_kv_bucket(instance.name) + .with_kv_key(key) + .build() + ) + with trace.use_span(span, end_on_exit=True): + return await wrapper(*args, **kwargs) + + def decorator( + wrapper: Any, + instance: KeyValue, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> Any: + return _wrapped(wrapper, instance, args, kwargs) + + wrap_function_wrapper(_KV_MODULE, "KeyValue.get", decorator) + + def _instrument_entry(self) -> None: + tracer = self.tracer + + async def _wrapped( + wrapper: Any, + instance: KeyValue, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> Any: + if not is_instrumentation_enabled(): + return await wrapper(*args, **kwargs) + key: str = args[0] + span = ( + SpanBuilder(tracer, SpanKind.CLIENT, SpanAction.KV_ENTRY) + .with_kv_bucket(instance.name) + .with_kv_key(key) + .build() + ) + with trace.use_span(span, end_on_exit=True): + return await wrapper(*args, **kwargs) + + def decorator( + wrapper: Any, + instance: KeyValue, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> Any: + return _wrapped(wrapper, instance, args, kwargs) + + wrap_function_wrapper(_KV_MODULE, "KeyValue.entry", decorator) + + def _instrument_history(self) -> None: + tracer = self.tracer + capture_body = self.capture_body + + async def _wrapped( + wrapper: Any, + instance: KeyValue, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> Any: + if not is_instrumentation_enabled(): + return await wrapper(*args, **kwargs) + iterator: KVEntryIterator = await wrapper(*args, **kwargs) + return KVEntryIteratorProxy(iterator, tracer, instance.name, capture_body) + + def decorator( + wrapper: Any, + instance: KeyValue, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> Any: + return _wrapped(wrapper, instance, args, kwargs) + + wrap_function_wrapper(_KV_MODULE, "KeyValue.history", decorator) + + def _instrument_watch_methods(self) -> None: + tracer = self.tracer + capture_body = self.capture_body + + def make_decorator() -> Any: + async def _wrapped( + wrapper: Any, + instance: KeyValue, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> Any: + if not is_instrumentation_enabled(): + return await wrapper(*args, **kwargs) + iterator: KVEntryIterator = await wrapper(*args, **kwargs) + return KVEntryIteratorProxy( + iterator, + tracer, + instance.name, + capture_body, + ) + + def decorator( + wrapper: Any, + instance: KeyValue, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> Any: + return _wrapped(wrapper, instance, args, kwargs) + + return decorator + + for method in ( + "watch", + "watch_with_history", + "watch_all", + "watch_many", + "watch_many_with_history", + ): + wrap_function_wrapper(_KV_MODULE, f"KeyValue.{method}", make_decorator()) + + def _instrument_keys(self) -> None: + tracer = self.tracer + + async def _wrapped( + wrapper: Any, + instance: KeyValue, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> Any: + if not is_instrumentation_enabled(): + return await wrapper(*args, **kwargs) + span = ( + SpanBuilder(tracer, SpanKind.CLIENT, SpanAction.KV_KEYS) + .with_kv_bucket(instance.name) + .build() + ) + with trace.use_span(span, end_on_exit=True): + return await wrapper(*args, **kwargs) + + def decorator( + wrapper: Any, + instance: KeyValue, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> Any: + return _wrapped(wrapper, instance, args, kwargs) + + wrap_function_wrapper(_KV_MODULE, "KeyValue.keys", decorator) diff --git a/python/natsrpy/instrumentation/object_store.py b/python/natsrpy/instrumentation/object_store.py new file mode 100644 index 0000000..4ba7444 --- /dev/null +++ b/python/natsrpy/instrumentation/object_store.py @@ -0,0 +1,366 @@ +from typing import Any + +from opentelemetry import trace +from opentelemetry.instrumentation.utils import is_instrumentation_enabled, unwrap +from opentelemetry.trace import SpanKind, Tracer +from typing_extensions import Self +from wrapt import ObjectProxy, wrap_function_wrapper + +from natsrpy.js import ObjectInfo, ObjectInfoIterator, ObjectStore + +from .span_builder import SpanAction, SpanBuilder + +_OS_MODULE = "natsrpy._natsrpy_rs.js.object_store" + + +class ObjectInfoIteratorProxy(ObjectProxy): # type: ignore + """Proxy for ObjectInfoIterator returned by watch and list methods.""" + + def __init__( + self, + wrapped: ObjectInfoIterator, + tracer: Tracer, + ) -> None: + super().__init__(wrapped) + self._self_tracer = tracer + + def __aiter__(self) -> Self: + return self + + async def __anext__(self) -> ObjectInfo: + info: ObjectInfo = await anext(self.__wrapped__) + if not is_instrumentation_enabled(): + return info + span = ( + SpanBuilder(self._self_tracer, SpanKind.CONSUMER, SpanAction.RECEIVE) + .with_object_name(info.name) + .with_object_size(info.size) + .build() + ) + with trace.use_span(span, end_on_exit=True): + pass + return info + + +class ObjectStoreInstrumentation: + """Instrument ObjectStore operations.""" + + def __init__( + self, + tracer: Tracer, + capture_body: bool = False, + ) -> None: + self.tracer = tracer + self.capture_body = capture_body + + def instrument(self) -> None: + """Setup instrumentation for all ObjectStore methods.""" + self._instrument_put() + self._instrument_get() + self._instrument_delete() + self._instrument_seal() + self._instrument_get_info() + self._instrument_watch() + self._instrument_list() + self._instrument_link_bucket() + self._instrument_link_object() + self._instrument_update_metadata() + + @staticmethod + def uninstrument() -> None: + """Remove instrumentation from all ObjectStore methods.""" + for method in ( + "put", + "get", + "delete", + "seal", + "get_info", + "watch", + "list", + "link_bucket", + "link_object", + "update_metadata", + ): + unwrap(ObjectStore, method) + + def _instrument_put(self) -> None: + tracer = self.tracer + capture_body = self.capture_body + + async def _wrapped( + wrapper: Any, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> Any: + if not is_instrumentation_enabled(): + return await wrapper(*args, **kwargs) + name: str = args[0] + value: bytes | str = args[1] + span = ( + SpanBuilder(tracer, SpanKind.PRODUCER, SpanAction.OBJ_PUT) + .with_object_name(name) + .with_object_size(len(value)) + .build() + ) + if capture_body: + span.set_attribute("nats.object_store.value", value) + with trace.use_span(span, end_on_exit=True): + return await wrapper(*args, **kwargs) + + def decorator( + wrapper: Any, + _: ObjectStore, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> Any: + return _wrapped(wrapper, args, kwargs) + + wrap_function_wrapper(_OS_MODULE, "ObjectStore.put", decorator) + + def _instrument_get(self) -> None: + tracer = self.tracer + + async def _wrapped( + wrapper: Any, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> Any: + if not is_instrumentation_enabled(): + return await wrapper(*args, **kwargs) + name: str = args[0] + span = ( + SpanBuilder(tracer, SpanKind.CLIENT, SpanAction.OBJ_GET) + .with_object_name(name) + .build() + ) + with trace.use_span(span, end_on_exit=True): + return await wrapper(*args, **kwargs) + + def decorator( + wrapper: Any, + _: ObjectStore, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> Any: + return _wrapped(wrapper, args, kwargs) + + wrap_function_wrapper(_OS_MODULE, "ObjectStore.get", decorator) + + def _instrument_delete(self) -> None: + tracer = self.tracer + + async def _wrapped( + wrapper: Any, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> Any: + if not is_instrumentation_enabled(): + return await wrapper(*args, **kwargs) + name: str = args[0] + span = ( + SpanBuilder(tracer, SpanKind.PRODUCER, SpanAction.OBJ_DELETE) + .with_object_name(name) + .build() + ) + with trace.use_span(span, end_on_exit=True): + return await wrapper(*args, **kwargs) + + def decorator( + wrapper: Any, + _: ObjectStore, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> Any: + return _wrapped(wrapper, args, kwargs) + + wrap_function_wrapper(_OS_MODULE, "ObjectStore.delete", decorator) + + def _instrument_seal(self) -> None: + tracer = self.tracer + + async def _wrapped( + wrapper: Any, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> Any: + if not is_instrumentation_enabled(): + return await wrapper(*args, **kwargs) + span = SpanBuilder(tracer, SpanKind.PRODUCER, SpanAction.OBJ_SEAL).build() + with trace.use_span(span, end_on_exit=True): + return await wrapper(*args, **kwargs) + + def decorator( + wrapper: Any, + _: ObjectStore, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> Any: + return _wrapped(wrapper, args, kwargs) + + wrap_function_wrapper(_OS_MODULE, "ObjectStore.seal", decorator) + + def _instrument_get_info(self) -> None: + tracer = self.tracer + + async def _wrapped( + wrapper: Any, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> Any: + if not is_instrumentation_enabled(): + return await wrapper(*args, **kwargs) + name: str = args[0] + span = ( + SpanBuilder(tracer, SpanKind.CLIENT, SpanAction.OBJ_INFO) + .with_object_name(name) + .build() + ) + with trace.use_span(span, end_on_exit=True): + return await wrapper(*args, **kwargs) + + def decorator( + wrapper: Any, + _: ObjectStore, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> Any: + return _wrapped(wrapper, args, kwargs) + + wrap_function_wrapper(_OS_MODULE, "ObjectStore.get_info", decorator) + + def _instrument_watch(self) -> None: + tracer = self.tracer + + async def _wrapped( + wrapper: Any, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> Any: + if not is_instrumentation_enabled(): + return await wrapper(*args, **kwargs) + iterator: ObjectInfoIterator = await wrapper(*args, **kwargs) + return ObjectInfoIteratorProxy(iterator, tracer) + + def decorator( + wrapper: Any, + _: ObjectStore, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> Any: + return _wrapped(wrapper, args, kwargs) + + wrap_function_wrapper(_OS_MODULE, "ObjectStore.watch", decorator) + + def _instrument_list(self) -> None: + tracer = self.tracer + + async def _wrapped( + wrapper: Any, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> Any: + if not is_instrumentation_enabled(): + return await wrapper(*args, **kwargs) + iterator: ObjectInfoIterator = await wrapper(*args, **kwargs) + return ObjectInfoIteratorProxy(iterator, tracer) + + def decorator( + wrapper: Any, + _: ObjectStore, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> Any: + return _wrapped(wrapper, args, kwargs) + + wrap_function_wrapper(_OS_MODULE, "ObjectStore.list", decorator) + + def _instrument_link_bucket(self) -> None: + tracer = self.tracer + + async def _wrapped( + wrapper: Any, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> Any: + if not is_instrumentation_enabled(): + return await wrapper(*args, **kwargs) + src_bucket: str = args[0] + dest: str = args[1] + span = ( + SpanBuilder(tracer, SpanKind.CLIENT, SpanAction.OBJ_LINK) + .with_object_name(dest) + .build() + ) + span.set_attribute("nats.object_store.link_source", src_bucket) + with trace.use_span(span, end_on_exit=True): + return await wrapper(*args, **kwargs) + + def decorator( + wrapper: Any, + _: ObjectStore, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> Any: + return _wrapped(wrapper, args, kwargs) + + wrap_function_wrapper(_OS_MODULE, "ObjectStore.link_bucket", decorator) + + def _instrument_link_object(self) -> None: + tracer = self.tracer + + async def _wrapped( + wrapper: Any, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> Any: + if not is_instrumentation_enabled(): + return await wrapper(*args, **kwargs) + src: str = args[0] + dest: str = args[1] + span = ( + SpanBuilder(tracer, SpanKind.CLIENT, SpanAction.OBJ_LINK) + .with_object_name(dest) + .build() + ) + span.set_attribute("nats.object_store.link_source", src) + with trace.use_span(span, end_on_exit=True): + return await wrapper(*args, **kwargs) + + def decorator( + wrapper: Any, + _: ObjectStore, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> Any: + return _wrapped(wrapper, args, kwargs) + + wrap_function_wrapper(_OS_MODULE, "ObjectStore.link_object", decorator) + + def _instrument_update_metadata(self) -> None: + tracer = self.tracer + + async def _wrapped( + wrapper: Any, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> Any: + if not is_instrumentation_enabled(): + return await wrapper(*args, **kwargs) + name: str = args[0] + span = ( + SpanBuilder(tracer, SpanKind.CLIENT, SpanAction.OBJ_UPDATE_METADATA) + .with_object_name(name) + .build() + ) + with trace.use_span(span, end_on_exit=True): + return await wrapper(*args, **kwargs) + + def decorator( + wrapper: Any, + _: ObjectStore, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ) -> Any: + return _wrapped(wrapper, args, kwargs) + + wrap_function_wrapper(_OS_MODULE, "ObjectStore.update_metadata", decorator) diff --git a/python/natsrpy/instrumentation/span_builder.py b/python/natsrpy/instrumentation/span_builder.py index b5f24bf..a62f0a7 100644 --- a/python/natsrpy/instrumentation/span_builder.py +++ b/python/natsrpy/instrumentation/span_builder.py @@ -30,6 +30,27 @@ class SpanAction(enum.Enum): PROGRESS = "progress" TERM = "term" NEXT = "next" + # KeyValue operations + KV_GET = "kv.get" + KV_PUT = "kv.put" + KV_CREATE = "kv.create" + KV_UPDATE = "kv.update" + KV_DELETE = "kv.delete" + KV_PURGE = "kv.purge" + KV_WATCH = "kv.watch" + KV_KEYS = "kv.keys" + KV_HISTORY = "kv.history" + KV_ENTRY = "kv.entry" + # ObjectStore operations + OBJ_PUT = "obj.put" + OBJ_GET = "obj.get" + OBJ_DELETE = "obj.delete" + OBJ_SEAL = "obj.seal" + OBJ_INFO = "obj.info" + OBJ_WATCH = "obj.watch" + OBJ_LIST = "obj.list" + OBJ_LINK = "obj.link" + OBJ_UPDATE_METADATA = "obj.update_metadata" class SpanBuilder: @@ -100,6 +121,35 @@ def with_js_message( self.with_headers(msg.headers) return self + def with_kv_bucket(self, bucket: str) -> Self: + """Set key-value bucket name.""" + self.attributes[MESSAGING_DESTINATION_NAME] = bucket + self.attributes["nats.kv.bucket"] = bucket + return self + + def with_kv_key(self, key: str) -> Self: + """Set key-value entry key.""" + self.attributes["nats.kv.key"] = key + return self + + def with_kv_value(self, value: bytes | str, capture_body: bool = False) -> Self: + """Set key-value entry value attributes.""" + self.attributes[MESSAGING_MESSAGE_BODY_SIZE] = len(value) + if capture_body: + self.attributes["nats.kv.value"] = value + return self + + def with_object_name(self, name: str) -> Self: + """Set object store object name.""" + self.attributes[MESSAGING_DESTINATION_NAME] = name + self.attributes["nats.object_store.name"] = name + return self + + def with_object_size(self, size: int) -> Self: + """Set object size in bytes.""" + self.attributes[MESSAGING_MESSAGE_BODY_SIZE] = size + return self + def with_links(self, links: Sequence[Link]) -> Self: """Attache linked spans.""" self.links = links