Add opensearch-java module using java client #1868
- Cloned external/opensearch to external/opensearch-java to introduce the new client as a drop-in replacement.
- Updated Maven artifactId and names in the new local POMs (including the archetype).
- Registered the new module in the root POM.

This commit isolates the pure file duplication. The actual migration to the opensearch-java client will be done in the next commit to ensure a clean, readable Git diff for reviewers.
Introduces the external/opensearch-java module, replacing the deprecated RestHighLevelClient with the official opensearch-java client. Designed as a drop-in replacement for `external/opensearch` with identical configurations.

Key improvements:
- Implemented AsyncBulkProcessor (Semaphore + dedicated ThreadPool) to ensure strict backpressure and replace the legacy BulkProcessor.
- Fixed historical tuple-ack race conditions in IndexerBolt and DeletionBolt.
- Maintained RestClientTransport to seamlessly support the Sniffer and bypass the 100MB response buffer limit.
- Synced recent upstream bugfixes, adapting resource cleanup to the new async architecture.
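To illustrate the backpressure idea behind the "Semaphore + dedicated ThreadPool" design mentioned in this commit message, here is a minimal, JDK-only sketch. It is not the AsyncBulkProcessor from this PR: the class name `BoundedBulkSketch`, its constructor parameters, and the `sender` function (a stand-in for the client's asynchronous bulk call) are hypothetical, and timer-based flushing is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical sketch of semaphore-based backpressure; not the PR's AsyncBulkProcessor. */
final class BoundedBulkSketch<T> {
    private final int batchSize;
    private final int maxInFlight;
    private final Semaphore inFlight;
    // Dedicated pool so completion callbacks do not run on ForkJoinPool.commonPool().
    private final ExecutorService callbacks = Executors.newFixedThreadPool(2);
    // Stand-in for the client's asynchronous bulk call (assumption).
    private final Function<List<T>, CompletableFuture<Void>> sender;
    private List<T> buffer = new ArrayList<>();

    BoundedBulkSketch(int batchSize, int maxInFlight,
                      Function<List<T>, CompletableFuture<Void>> sender) {
        this.batchSize = batchSize;
        this.maxInFlight = maxInFlight;
        this.inFlight = new Semaphore(maxInFlight);
        this.sender = sender;
    }

    /** Buffers one operation and flushes once the batch size is reached. */
    synchronized void add(T op) {
        buffer.add(op);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Sends the buffered batch; blocks the caller while maxInFlight requests are pending. */
    synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<T> batch = buffer;
        buffer = new ArrayList<>();
        inFlight.acquireUninterruptibly(); // this blocking call is the backpressure
        try {
            // Release the permit on the dedicated pool once the request completes.
            sender.apply(batch).whenCompleteAsync((ok, err) -> inFlight.release(), callbacks);
        } catch (RuntimeException e) {
            inFlight.release(); // the request was never started
            throw e;
        }
    }

    /** Flushes the remainder, waits for all pending requests, then stops the pool. */
    void close() {
        flush();
        inFlight.acquireUninterruptibly(maxInFlight);
        inFlight.release(maxInFlight);
        callbacks.shutdown();
    }

    public static void main(String[] args) {
        AtomicInteger sent = new AtomicInteger();
        BoundedBulkSketch<String> processor = new BoundedBulkSketch<String>(2, 1, batch -> {
            sent.addAndGet(batch.size());
            return CompletableFuture.<Void>completedFuture(null);
        });
        processor.add("a");
        processor.add("b");
        processor.add("c");
        processor.close();
        System.out.println("operations sent: " + sent.get());
    }
}
```

Because `flush()` blocks on the semaphore, a slow cluster slows down the thread calling `add()` instead of letting batches pile up in memory, which is the property the commit message calls strict backpressure.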
|
thanks @dpol1 |
Will do - I focused on the core logic first (
They are identical to the existing module - removing them in this PR.
Correct - the only change is the artifactId from |
Would be good to fix that in the existing module as a separate PR. Am about to push a refactoring of that module though, so maybe best to do after that if still applies? see #1869 |
|
Thanks, by all means. I'll keep the race-condition fix as-is in this module for now. Once your PR on the legacy module lands, I'll double-check the legacy module and, if needed, apply the same changes to this module as well. |
Think there is only one PR left ( #1869 ) and we can move on. |
|
@dpol1 Think the blocker is gone. |
This commit aligns the opensearch-java module with recent legacy updates, completes the migration to HC5/API 3.x, and cleans up duplicated resources.

Refactors and Alignment:
- Ported DelegateRefresher for dynamic config reloading (apache#1870).
- Adopted Storm V2 metrics bridge via CrawlerMetrics (apache#1846).
- Aligned log messages and metric scopes to OpenSearch (apache#1871).
- Ported WaitAckCache extraction to centralize bulk-ack logic (apache#1869).
- Fixed a race condition in IndexerBolt by inverting the execution order, ensuring tuples are registered in waitAck before bulk dispatch.
- Refactored BulkItemResponseToFailedFlag to a Java record with a compact constructor for strict null-safety.

Maintenance and Cleanup:
- Removed duplicated archetype, dashboards, and opensearch-conf.yaml to prevent maintenance overhead.
- Updated README with a migration guide pointing to legacy resources.
- Removed dead rat-exclude in root pom.xml.
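The ordering fix described in this commit message (register the tuple in waitAck before handing the document to the bulk processor) can be illustrated with a small JDK-only sketch. Everything here is hypothetical: `AckOrderSketch` is not WaitAckCache, a `String` stands in for a Storm tuple, and the "processor" is a plain `Consumer` that fires its callback immediately, which is the worst case for the old ordering.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Hypothetical illustration of the register-before-dispatch ordering. */
final class AckOrderSketch {
    // docId -> tuple waiting for its bulk response (a String stands in for a Storm tuple)
    private final Map<String, String> waitAck = new ConcurrentHashMap<>();
    private final AtomicInteger acked = new AtomicInteger();
    private final AtomicInteger orphaned = new AtomicInteger();

    /** Bulk callback: acks the tuple registered for docId, if there is one. */
    void onBulkResponse(String docId) {
        if (waitAck.remove(docId) != null) {
            acked.incrementAndGet();
        } else {
            orphaned.incrementAndGet(); // response arrived before the tuple was registered
        }
    }

    /** Old, racy order: the callback may run before the tuple is registered. */
    void dispatchThenRegister(String docId, String tuple, Consumer<String> processor) {
        processor.accept(docId);
        waitAck.put(docId, tuple);
    }

    /** Fixed order: the tuple is always findable when the callback runs. */
    void registerThenDispatch(String docId, String tuple, Consumer<String> processor) {
        waitAck.put(docId, tuple);
        processor.accept(docId);
    }

    int acked() { return acked.get(); }
    int orphaned() { return orphaned.get(); }
    int pending() { return waitAck.size(); }

    public static void main(String[] args) {
        AckOrderSketch racy = new AckOrderSketch();
        racy.dispatchThenRegister("doc-1", "tuple-1", racy::onBulkResponse);
        System.out.println("racy:  acked=" + racy.acked() + " pending=" + racy.pending());

        AckOrderSketch fixed = new AckOrderSketch();
        fixed.registerThenDispatch("doc-1", "tuple-1", fixed::onBulkResponse);
        System.out.println("fixed: acked=" + fixed.acked() + " pending=" + fixed.pending());
    }
}
```

With the old order the response finds nothing to ack and the tuple stays in the map until it times out; with the inverted order the same callback acks it.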
de005ac to d530818
|
Thanks @dpol1 Looks good, here are a couple of possible issues flagged by Claude

1. Timestamp serialization format change (MAJOR, data-compat risk)

The new module writes timestamps as ISO-8601 strings where the legacy module writes epoch-millis longs:
Impact depends on index mapping:
Recommendation: either match legacy (toInstant().toEpochMilli()) or document the format change in the module README and make sure the example mappings under src/test/resources/{status,metrics,indexer}.mapping declare a format that

2. responseBufferSize config key silently dropped (MINOR regression)

Legacy external/opensearch/.../OpenSearchConnection.java:283 reads opensearch.*.responseBufferSize (default 100 MB) and sets it on the HTTP client. The new OpenSearchConnection.java does not reference it. Users who tuned this for large

Recommendation: either port the setting to the HC5 client builder or list it as a removed key in the new module's README. |
I tried switching to toEpochMilli(), but it immediately broke AggregationSpout (which expects a String) and caused the tests to fail against the date_optional_time mapping. I considered a hybrid approach that covers both, but I don't think it is solid. Maybe I missed something, but I've reverted to .toInstant().toString().
Yes, properly documented now, and for the sniffer as well! |
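For reference, the two serializations discussed in this thread differ as follows. This is a minimal JDK-only sketch; the class and method names are hypothetical, and it says nothing about how a given index mapping will treat either value.

```java
import java.time.Instant;
import java.util.Date;

/** Hypothetical helper contrasting the two timestamp encodings from the review. */
final class TimestampFormatSketch {

    /** ISO-8601 string, as produced by toInstant().toString(). */
    static String asIso8601(Date date) {
        return date.toInstant().toString();
    }

    /** Epoch milliseconds, as produced by toInstant().toEpochMilli(). */
    static long asEpochMillis(Date date) {
        return date.toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        Date date = Date.from(Instant.parse("2024-01-02T03:04:05Z"));
        String iso = asIso8601(date);
        long millis = asEpochMillis(date);
        System.out.println("ISO-8601:     " + iso);
        System.out.println("epoch millis: " + millis);
        // Both encodings carry the same instant; only the JSON type differs (string vs number).
        System.out.println("round trip ok: " + (Instant.parse(iso).toEpochMilli() == millis));
    }
}
```

The value is the same instant either way, but a consumer that casts the field to `String` only works with the first form, which matches the AggregationSpout breakage described above.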
|
@dpol1 did you run a crawl with this new module? |
|
Yep, ran a crawl locally. Injection worked fine, but the crawler blew up with a ClassCastException in AggregationSpout. I think I'll investigate in that direction. Suggestions are welcome :-) |
|
Can you post the full stacktrace?
482512c to 6fd51e4
- sourceAsMap(JsonData) helper forces Jackson round-trip via LinkedHashMap
to avoid JsonDataImpl#to short-circuit returning raw Parsson JsonObject
(Map<String,JsonValue>) → CCE on (String) value casts downstream.
- Dedicated daemon single-thread executor per spout for supplyAsync,
replacing ForkJoinPool.commonPool; orderly shutdown in close().
- AbstractSpoutTest covers the deserialization path.
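The second bullet above (a dedicated daemon single-thread executor for `supplyAsync`, shut down in `close()`) could look roughly like this. It is a JDK-only sketch under assumptions: the class name `SpoutQueryExecutorSketch`, the thread name, and the 5-second shutdown timeout are made up for illustration and are not taken from the PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical per-spout executor that keeps queries off ForkJoinPool.commonPool(). */
final class SpoutQueryExecutorSketch implements AutoCloseable {
    private final ExecutorService executor;

    SpoutQueryExecutorSketch(String threadName) {
        // Daemon thread: it never keeps the JVM alive on its own.
        this.executor = Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, threadName);
            thread.setDaemon(true);
            return thread;
        });
    }

    /** Runs the query on the dedicated thread instead of the common pool. */
    <T> CompletableFuture<T> submit(Supplier<T> query) {
        return CompletableFuture.supplyAsync(query, executor);
    }

    /** Orderly shutdown: let queued work finish, then force-stop after a timeout. */
    @Override
    public void close() {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        try (SpoutQueryExecutorSketch queries = new SpoutQueryExecutorSketch("spout-query")) {
            String threadName = queries.submit(() -> Thread.currentThread().getName()).join();
            System.out.println("query ran on: " + threadName);
        }
    }
}
```

A single thread per spout keeps queries for that spout strictly ordered, and a slow query can no longer occupy a shared common-pool worker.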
|
@dpol1 opensearch-java 3.x stores hit.source() as a jakarta.json.JsonObject (backed by Parsson). JsonData.to(Class) has this shortcut in JsonDataImpl:
Since Object.class/Map.class are assignable from JsonObject, hit.source().to(Object.class) returns the raw Parsson JsonObject whose values are JsonString/JsonNumber, not plain String. The (String) keyValues.get(...) then blows up. Could you check if the following does solve it? |
|
@rzo1 thanks for the analysis - that’s exactly what was blowing things up. I ended up applying the same
I think this approach is more solid and should not need refining later. I ran the crawler locally via Docker and it works perfectly, but please double check. |
|
Thanks for the update. I am going to test this with the current Storm release candidate soon |
rzo1 left a comment
Did run a crawl with the Storm RC 2.6.7 and it worked fine. Did not look into performance, but that is something we can tune later, if it turns out to be needed. Started from an archetype for the legacy opensearch client and just replaced the dependency. Did work fine.
|
Tested on a local crawl, no visible issues. Will merge shortly |
Summary
This PR brings `external/opensearch-java` (the new module targeting OpenSearch Java Client 3.x + Apache HttpClient 5) up to date with the recent refactors landed in `external/opensearch`, and completes the initial scaffolding by removing duplicated resources that only make sense in the legacy module.

This module migrates StormCrawler from the deprecated `RestHighLevelClient` to the official `opensearch-java` client (v3.8.0). Following the community suggestion, this is built as a separate module to act as a drop-in replacement. Users can migrate seamlessly by simply updating their `pom.xml` artifactId, with zero changes required for Flux topologies or YAML configuration keys.

The legacy `external/opensearch` module remains untouched in this PR to allow a gradual phase-out.

Architectural Decisions & Engineering
Since the new `opensearch-java` client introduces a completely different paradigm (fluent builders, strict JSON mappers) and removes several legacy utility classes, the following architectural decisions were made:

1. `AsyncBulkProcessor` & Backpressure

The legacy `BulkProcessor` was removed in the new client. To prevent `OutOfMemoryError`s and preserve Storm's backpressure, I implemented a custom `AsyncBulkProcessor`:

- […] `BulkOperation`s and flushes based on action count or a `ScheduledExecutorService` timer.
- […] `Semaphore` to limit concurrent in-flight HTTP requests.
- […] `ThreadPoolExecutor` with `CallerRunsPolicy` to process async callbacks without starving the JVM's `ForkJoinPool.commonPool()`.

2. Transport & Modernization (HC5)
I evaluated the transport layer and decided to fully adopt the Apache HttpClient 5 (HC5) via `ApacheHttpClient5TransportBuilder`.

- […] `opensearch-rest-client-sniffer` is tightly coupled with the legacy HC4 `RestClient`. Given the goal of modernizing the module, this was deemed an acceptable trade-off for this new 3.x-based implementation.

3. Concurrency & Race Condition Fixes
During the migration, I identified and fixed a race condition in `IndexerBolt` and `DeletionBolt` where tuples were added to the processor before being safely locked in the `waitAck` map. The locking order has been inverted to guarantee zero tuple loss during high-throughput flushes. [implemented in #1869 and aligned in this module as well]

4. Upstream Bugfixes Sync
This module is perfectly aligned with `main`. It incorporates the recent bugfixes applied to the legacy module, adapted for the new asynchronous paradigm:

- […] `AbstractSpout`.
- […] `OpenSearchConnection` during `Sniffer` initialization failures.
- […] `Timer` and `OpenSearchClient` memory leaks in `JSONResourceWrapper` and `JSONURLFilterWrapper` by properly implementing `cleanup()`.
- […] `OpenSearch_queries`), and fixed SLF4J placeholders.
- […] `CrawlerMetrics` bridge across all bolts and spouts to ensure compatibility with Storm 2.x metrics.
- […] `OpenSearchClient`.

5. Maintenance Cleanup
To keep the codebase DRY, I removed duplicated resources that are identical to the legacy module (`archetype/`, `dashboards/`, and `opensearch-conf.yaml`). The module `README.md` has been rewritten to guide users on how to reference the compatible legacy resources.

Test plan
- […] `IndexerBolt`, `StatusUpdaterBolt`, `DeletionBolt`, etc.) against a real OpenSearch instance using Testcontainers.
- […] `AsyncBulkProcessor` under load to ensure it correctly flushes based on size/time thresholds and strictly respects the `Semaphore` concurrency limits without dropping tuples.
- […] `nextFetchDate` and `timestamp` fields conform to ISO-8601 format to prevent OpenSearch mapping errors.
- […] `opensearch-java` dependency and its associated configurations.

Closes #1515