diff --git a/src/collect_rewards_lib.rs b/src/collect_rewards_lib.rs
index 9402d65..eb601ed 100644
--- a/src/collect_rewards_lib.rs
+++ b/src/collect_rewards_lib.rs
@@ -303,9 +303,9 @@ pub async fn collect_rewards(
 	let address_hash = compute_address_hash(&wormhole_address_bytes);
 	let prefix = get_hash_prefix(&address_hash, 8);
 
-	let params = TransferQueryParams::new().with_limit(1000);
+	let params = TransferQueryParams::new();
 	let transfers = subsquid_client
-		.query_transfers_by_prefix(Some(vec![prefix]), None, params)
+		.query_all_transfers_by_prefix(Some(vec![prefix]), None, params)
 		.await?;
 
 	// Filter to only transfers TO our wormhole address
@@ -599,9 +599,9 @@ pub async fn query_pending_transfers(
 	let address_hash = compute_address_hash(&wormhole_secret.address);
 	let prefix = get_hash_prefix(&address_hash, 8); // 8 hex chars for good privacy
 
-	let params = TransferQueryParams::new().with_limit(1000);
+	let params = TransferQueryParams::new();
 	let transfers = subsquid_client
-		.query_transfers_by_prefix(Some(vec![prefix]), None, params)
+		.query_all_transfers_by_prefix(Some(vec![prefix]), None, params)
 		.await?;
 
 	// Filter to only transfers TO our wormhole address
@@ -652,9 +652,9 @@ pub async fn query_pending_transfers_for_address(
 	let address_hash = compute_address_hash(wormhole_address_bytes);
 	let prefix = get_hash_prefix(&address_hash, 8);
 
-	let params = TransferQueryParams::new().with_limit(1000);
+	let params = TransferQueryParams::new();
 	let transfers = subsquid_client
-		.query_transfers_by_prefix(Some(vec![prefix]), None, params)
+		.query_all_transfers_by_prefix(Some(vec![prefix]), None, params)
 		.await?;
 
 	// Filter to only transfers TO our wormhole address
diff --git a/src/subsquid/client.rs b/src/subsquid/client.rs
index 614c1af..45b9b2f 100644
--- a/src/subsquid/client.rs
+++ b/src/subsquid/client.rs
@@ -166,6 +166,67 @@ impl SubsquidClient {
 		Ok(data.transfers_by_hash_prefix.transfers)
 	}
 
+	/// Fetch every transfer matching the given prefixes, paginating by block range.
+	///
+	/// The server caps any single query at 1000 results and rejects larger result sets
+	/// with a "Query returned N results, which exceeds the limit of 1000" error. This
+	/// method handles that by binary-splitting the `[after_block, before_block]` range
+	/// whenever the cap is hit, then concatenating results.
+	///
+	/// `base_params.after_block` / `base_params.before_block` are honored as the initial
+	/// bounds; unset means `0` / `i32::MAX` (GraphQL `Int` is signed 32-bit so we can't
+	/// exceed that). Other filters (amount, offset) are forwarded unchanged. `limit` is
+	/// always set to the server max (1000) per sub-query.
+	pub async fn query_all_transfers_by_prefix(
+		&self,
+		to_prefixes: Option<Vec<String>>,
+		from_prefixes: Option<Vec<String>>,
+		base_params: TransferQueryParams,
+	) -> Result<Vec<Transfer>> {
+		const SERVER_MAX_LIMIT: u32 = 1000;
+		const LIMIT_EXCEEDED_MARKER: &str = "exceeds the limit";
+		const MAX_BLOCK_SENTINEL: u32 = i32::MAX as u32;
+
+		let initial_lo = base_params.after_block.unwrap_or(0);
+		let initial_hi = base_params.before_block.unwrap_or(MAX_BLOCK_SENTINEL);
+
+		if initial_lo > initial_hi {
+			return Ok(vec![]);
+		}
+
+		let mut all: Vec<Transfer> = Vec::new();
+		let mut stack: Vec<(u32, u32)> = vec![(initial_lo, initial_hi)];
+
+		while let Some((lo, hi)) = stack.pop() {
+			let params = base_params
+				.clone()
+				.with_after_block(lo)
+				.with_before_block(hi)
+				.with_limit(SERVER_MAX_LIMIT);
+
+			match self
+				.query_transfers_by_prefix(to_prefixes.clone(), from_prefixes.clone(), params)
+				.await
+			{
+				Ok(transfers) => all.extend(transfers),
+				Err(e) if e.to_string().contains(LIMIT_EXCEEDED_MARKER) => {
+					if lo == hi {
+						return Err(QuantusError::Generic(format!(
+							"More than {} transfers in single block {}: {}",
+							SERVER_MAX_LIMIT, lo, e
+						)));
+					}
+					let mid = lo + (hi - lo) / 2;
+					stack.push((mid + 1, hi));
+					stack.push((lo, mid));
+				},
+				Err(e) => return Err(e),
+			}
+		}
+
+		Ok(all)
+	}
+
 	/// Query transfers for a set of addresses using privacy-preserving hash prefixes.
 	///
 	/// This is a convenience method that:
@@ -376,4 +437,14 @@ mod tests {
 		assert_eq!(params.after_block, Some(1000));
 		assert_eq!(params.before_block, Some(2000));
 	}
+
+	// Guards the substring the paginator matches on. If the server ever changes this
+	// wording, `query_all_transfers_by_prefix` will stop triggering binary-split and
+	// this test will fail loudly.
+	#[test]
+	fn test_server_limit_error_marker() {
+		let server_message = "Query returned 1234 results, which exceeds the limit of 1000. \
+			Please use longer hash prefixes for more specific queries.";
+		assert!(server_message.contains("exceeds the limit"));
+	}
 }