From 63d9c122243ea67ff6674f8194b9f16c0194086d Mon Sep 17 00:00:00 2001 From: Koen Date: Mon, 9 Mar 2026 16:37:24 +0200 Subject: [PATCH] :zap: Pull data in parallel --- circulating-supply.py | 122 +++++++++++++++++++++++++----------------- 1 file changed, 72 insertions(+), 50 deletions(-) diff --git a/circulating-supply.py b/circulating-supply.py index 76fd307..99ad791 100644 --- a/circulating-supply.py +++ b/circulating-supply.py @@ -24,6 +24,7 @@ import os import sys import time +from concurrent.futures import ThreadPoolExecutor from datetime import datetime, timezone try: @@ -156,13 +157,17 @@ def multicall(calls: list[tuple[str, bytes]]) -> list[tuple[bool, bytes]]: def multicall_chunked(calls, chunk_size=1000): - """Execute calls in chunks to stay within gas limits.""" + """Execute calls in chunks to stay within gas limits, chunks run in parallel.""" if len(calls) == 0: return [] + chunks = [calls[i : i + chunk_size] for i in range(0, len(calls), chunk_size)] + if len(chunks) == 1: + return multicall(chunks[0]) + with ThreadPoolExecutor(max_workers=len(chunks)) as pool: + chunk_results = list(pool.map(multicall, chunks)) results = [] - for i in range(0, len(calls), chunk_size): - chunk = calls[i : i + chunk_size] - results.extend(multicall(chunk)) + for cr in chunk_results: + results.extend(cr) return results @@ -195,9 +200,17 @@ def discover_contract_addresses(): print(f" Current Rollup: {current_rollup}") print(f" Current RewardDist: {current_reward_dist}") - # Fetch historical rollups from CanonicalRollupUpdated events - print("\n Fetching historical rollups from CanonicalRollupUpdated events...") - rollup_logs = get_logs_safe(REGISTRY, [TOPIC_CANONICAL_ROLLUP_UPDATED], DEPLOYMENT_BLOCKS["REGISTRY"]) + # Fetch historical contracts from Registry events (in parallel) + print("\n Fetching historical contracts from Registry events...") + reg_start = DEPLOYMENT_BLOCKS["REGISTRY"] + with ThreadPoolExecutor(max_workers=3) as pool: + fut_rollup = pool.submit(get_logs_safe, REGISTRY, [TOPIC_CANONICAL_ROLLUP_UPDATED], reg_start) + fut_ownership = pool.submit(get_logs_safe, REGISTRY, [TOPIC_OWNERSHIP_TRANSFERRED], reg_start) + fut_reward = pool.submit(get_logs_safe, REGISTRY, [TOPIC_REWARD_DISTRIBUTOR_UPDATED], reg_start) + rollup_logs = fut_rollup.result() + ownership_logs = fut_ownership.result() + reward_logs = fut_reward.result() + all_rollups = [] for log in rollup_logs: if len(log["topics"]) > 1: @@ -208,9 +221,6 @@ def discover_contract_addresses(): all_rollups = [current_rollup] print(f" Found {len(all_rollups)} rollup(s): {all_rollups}") - # Fetch historical Governance from OwnershipTransferred events - print("\n Fetching historical Governance from OwnershipTransferred events...") - ownership_logs = get_logs_safe(REGISTRY, [TOPIC_OWNERSHIP_TRANSFERRED], DEPLOYMENT_BLOCKS["REGISTRY"]) all_governance = [] for log in ownership_logs: if len(log["topics"]) > 2: @@ -221,9 +231,6 @@ def discover_contract_addresses(): all_governance.append(current_governance) print(f" Found {len(all_governance)} Governance instance(s): {all_governance}") - # Fetch historical RewardDistributor from RewardDistributorUpdated events - print("\n Fetching historical RewardDistributor from events...") - reward_logs = get_logs_safe(REGISTRY, [TOPIC_REWARD_DISTRIBUTOR_UPDATED], DEPLOYMENT_BLOCKS["REGISTRY"]) all_reward_dists = [] for log in reward_logs: if len(log["topics"]) > 1: @@ -304,23 +311,27 @@ def get_logs_safe(address, topics, from_block=None): def fetch_atps(): """Fetch all ATP addresses from ATPCreated events across all factories.""" - atps = [] - for factory in FACTORIES: + def _fetch_factory(factory): logs = get_logs_safe(factory, [TOPIC_ATP_CREATED], DEPLOYMENT_BLOCKS["FACTORIES"]) - for log in logs: - atps.append( - { - "address": to_checksum_cached( - "0x" + log["topics"][2].hex()[-40:] - ), - "beneficiary": to_checksum_cached( - "0x" + log["topics"][1].hex()[-40:] - ), - "allocation": decode(["uint256"], bytes(log["data"]))[0], - "factory": factory, - } - ) - print(f" {factory}: {len(logs)} ATPs") + return factory, logs + + atps = [] + with ThreadPoolExecutor(max_workers=len(FACTORIES)) as pool: + for factory, logs in pool.map(_fetch_factory, FACTORIES): + for log in logs: + atps.append( + { + "address": to_checksum_cached( + "0x" + log["topics"][2].hex()[-40:] + ), + "beneficiary": to_checksum_cached( + "0x" + log["topics"][1].hex()[-40:] + ), + "allocation": decode(["uint256"], bytes(log["data"]))[0], + "factory": factory, + } + ) + print(f" {factory}: {len(logs)} ATPs") return atps @@ -543,14 +554,19 @@ def _bool(i): # and query WITHDRAWAL_TIMESTAMP from each withdrawal-capable implementation print(f" Found {len(impl_to_regs)} unique staker implementation(s), checking bytecode...") withdrawal_capable_impls = {} # impl_addr -> set of registries - for impl_addr, regs in impl_to_regs.items(): - code = retry(lambda a=impl_addr: w3.eth.get_code(to_checksum_cached(a))) - if SEL_WITHDRAW_ALL_TO_BENEFICIARY in bytes(code): - print(f" {impl_addr} has withdrawAllTokensToBeneficiary") - withdrawal_capable_impls[impl_addr] = regs - for f, r in factory_registries.items(): - if r in regs: - withdrawal_capable_factories.add(f.lower()) + + def _get_code(impl_addr): + return impl_addr, retry(lambda a=impl_addr: w3.eth.get_code(to_checksum_cached(a))) + + with ThreadPoolExecutor(max_workers=len(impl_to_regs) or 1) as pool: + for impl_addr, code in pool.map(_get_code, impl_to_regs.keys()): + regs = impl_to_regs[impl_addr] + if SEL_WITHDRAW_ALL_TO_BENEFICIARY in bytes(code): + print(f" {impl_addr} has withdrawAllTokensToBeneficiary") + withdrawal_capable_impls[impl_addr] = regs + for f, r in factory_registries.items(): + if r in regs: + withdrawal_capable_factories.add(f.lower()) if withdrawal_capable_factories: print(f" {len(withdrawal_capable_factories)} factory(ies) have withdrawal-capable stakers") @@ -604,15 +620,19 @@ def _bool(i): # Follow-up: query Slashed events from all historical rollup contracts # When slashing occurs, the slashed amount stays in the rollup contract permanently print(f"\n Querying Slashed events from {len(all_rollups)} rollup(s)...") - total_slashed_funds = 0 - for rollup_addr in all_rollups: + + def _fetch_slashed(rollup_addr): logs = get_logs_safe(rollup_addr, [TOPIC_SLASHED], DEPLOYMENT_BLOCKS["REGISTRY"]) - for log in logs: - # Slashed(address attester, uint256 amount) - # amount is in the data field - if len(log["data"]) >= 32: - amount = decode(["uint256"], bytes(log["data"]))[0] - total_slashed_funds += amount + return sum( + decode(["uint256"], bytes(log["data"]))[0] + for log in logs + if len(log["data"]) >= 32 + ) + + total_slashed_funds = 0 + with ThreadPoolExecutor(max_workers=len(all_rollups) or 1) as pool: + for amount in pool.map(_fetch_slashed, all_rollups): + total_slashed_funds += amount if total_slashed_funds > 0: print(f" Total slashed: {fmt(total_slashed_funds)} AZTEC from {len(all_rollups)} rollup(s)") @@ -1254,13 +1274,15 @@ def display(atps, data): def main(): - # Discover all contract addresses from Registry - contract_addrs = discover_contract_addresses() - + # discover_contract_addresses and fetch_atps are independent — run in parallel print("\n" + "=" * 70) - print(" FETCHING ATP CREATION EVENTS") + print(" DISCOVERING CONTRACTS + FETCHING ATP EVENTS (parallel)") print("=" * 70) - atps = fetch_atps() + with ThreadPoolExecutor(max_workers=2) as pool: + fut_addrs = pool.submit(discover_contract_addresses) + fut_atps = pool.submit(fetch_atps) + contract_addrs = fut_addrs.result() + atps = fut_atps.result() print(f" Found {len(atps)} ATPs total") print("\n" + "=" * 70)