Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions src/collect_rewards_lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,9 +303,9 @@ pub async fn collect_rewards<P: ProgressCallback>(
let address_hash = compute_address_hash(&wormhole_address_bytes);
let prefix = get_hash_prefix(&address_hash, 8);

let params = TransferQueryParams::new().with_limit(1000);
let params = TransferQueryParams::new();
let transfers = subsquid_client
.query_transfers_by_prefix(Some(vec![prefix]), None, params)
.query_all_transfers_by_prefix(Some(vec![prefix]), None, params)
.await?;

// Filter to only transfers TO our wormhole address
Expand Down Expand Up @@ -599,9 +599,9 @@ pub async fn query_pending_transfers(
let address_hash = compute_address_hash(&wormhole_secret.address);
let prefix = get_hash_prefix(&address_hash, 8); // 8 hex chars for good privacy

let params = TransferQueryParams::new().with_limit(1000);
let params = TransferQueryParams::new();
let transfers = subsquid_client
.query_transfers_by_prefix(Some(vec![prefix]), None, params)
.query_all_transfers_by_prefix(Some(vec![prefix]), None, params)
.await?;

// Filter to only transfers TO our wormhole address
Expand Down Expand Up @@ -652,9 +652,9 @@ pub async fn query_pending_transfers_for_address(
let address_hash = compute_address_hash(wormhole_address_bytes);
let prefix = get_hash_prefix(&address_hash, 8);

let params = TransferQueryParams::new().with_limit(1000);
let params = TransferQueryParams::new();
let transfers = subsquid_client
.query_transfers_by_prefix(Some(vec![prefix]), None, params)
.query_all_transfers_by_prefix(Some(vec![prefix]), None, params)
.await?;

// Filter to only transfers TO our wormhole address
Expand Down
71 changes: 71 additions & 0 deletions src/subsquid/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,67 @@ impl SubsquidClient {
Ok(data.transfers_by_hash_prefix.transfers)
}

/// Fetch every transfer matching the given prefixes, paginating by block range.
///
/// The server caps any single query at 1000 results and rejects larger result sets
/// with a "Query returned N results, which exceeds the limit of 1000" error. This
/// method handles that by binary-splitting the `[after_block, before_block]` range
/// whenever the cap is hit, then concatenating results.
///
/// `base_params.after_block` / `base_params.before_block` are honored as the initial
/// bounds; unset means `0` / `i32::MAX` (GraphQL `Int` is signed 32-bit so we can't
/// exceed that). Other filters (amount, offset) are forwarded unchanged. `limit` is
/// always set to the server max (1000) per sub-query.
pub async fn query_all_transfers_by_prefix(
&self,
to_prefixes: Option<Vec<String>>,
from_prefixes: Option<Vec<String>>,
base_params: TransferQueryParams,
) -> Result<Vec<Transfer>> {
const SERVER_MAX_LIMIT: u32 = 1000;
const LIMIT_EXCEEDED_MARKER: &str = "exceeds the limit";
const MAX_BLOCK_SENTINEL: u32 = i32::MAX as u32;

let initial_lo = base_params.after_block.unwrap_or(0);
let initial_hi = base_params.before_block.unwrap_or(MAX_BLOCK_SENTINEL);

if initial_lo > initial_hi {
return Ok(vec![]);
}

let mut all: Vec<Transfer> = Vec::new();
let mut stack: Vec<(u32, u32)> = vec![(initial_lo, initial_hi)];

while let Some((lo, hi)) = stack.pop() {
let params = base_params
.clone()
.with_after_block(lo)
.with_before_block(hi)
.with_limit(SERVER_MAX_LIMIT);

match self
.query_transfers_by_prefix(to_prefixes.clone(), from_prefixes.clone(), params)
.await
{
Ok(transfers) => all.extend(transfers),
Err(e) if e.to_string().contains(LIMIT_EXCEEDED_MARKER) => {
if lo == hi {
return Err(QuantusError::Generic(format!(
"More than {} transfers in single block {}: {}",
SERVER_MAX_LIMIT, lo, e
)));
}
let mid = lo + (hi - lo) / 2;
stack.push((mid + 1, hi));
stack.push((lo, mid));
},
Err(e) => return Err(e),
}
}

Ok(all)
}

/// Query transfers for a set of addresses using privacy-preserving hash prefixes.
///
/// This is a convenience method that:
Expand Down Expand Up @@ -376,4 +437,14 @@ mod tests {
assert_eq!(params.after_block, Some(1000));
assert_eq!(params.before_block, Some(2000));
}

// Guards the substring the paginator matches on. If the server ever changes this
// wording, `query_all_transfers_by_prefix` will stop triggering binary-split and
// this test will fail loudly.
#[test]
fn test_server_limit_error_marker() {
let server_message = "Query returned 1234 results, which exceeds the limit of 1000. \
Please use longer hash prefixes for more specific queries.";
assert!(server_message.contains("exceeds the limit"));
}
}
Loading