To build a robust hedging service in the SYMMIO ecosystem, it's crucial for hedgers (PartyB) to monitor blockchain events triggered by PartyA's intents. This section shows a basic implementation of an event listener using ethers.js or web3.py.
Web3EventPoller
The Web3EventPoller class is a high-level utility designed to:
Connect to the Blockchain: Establish a connection to the blockchain network using web3.py.
Load Contract and ABI: Use the contract's address and ABI to interact with its events.
Monitor Events: Listen for specific on-chain events such as SendQuote or ForceCancelQuote.
Maintain State: Use an EventPoller instance to manage event polling, handle retries, and ensure no events are missed.
from web3 import Web3from web3.middleware import geth_poa_middlewareimport loggingfrom settings import RPC, ContractAddress, PollInterval, FromBlockfrom event_poller import EventPollerfrom handlers.send_request_handler import SendQuoteHandlerclassWeb3EventPoller:@staticmethoddefrun():# 1. Initialize Web3 and contract w3_instance =Web3(Web3.HTTPProvider(RPC)) w3_instance.middleware_onion.inject(geth_poa_middleware, layer=0) contract_abi = [...] # Loaded from a JSON file contract = w3_instance.eth.contract(address=ContractAddress, abi=contract_abi)# 2. Create a poller instance poller =EventPoller( context=..., from_block=FromBlock, poll_interval=PollInterval )# 3. Register event + handler poller.add_event("SendQuote", SendQuoteHandler().handle)# ... add other events as needed ...# 4. Start the poller poller.start() poller.wait()
Event Poller Internals
EventPoller: A utility script that tracks from_block progress (often persisted in something like Redis) and polls in intervals (poll_interval).
classEventPoller:def__init__(self,context,from_block=0,poll_interval=10,block_chunk_size=10000):# 1. Initialize context, default block, and interval settings.# 2. Create data structures to track events and their states.# 3. Set up a thread pool executor for concurrent event polling. self.context = context self.tracked_events ={} self.poll_interval = poll_interval self.block_chunk_size = block_chunk_size self.stop_signal =False self.default_from_block = from_block self.futures ={} self.event_from_blocks ={} self.executor =ThreadPoolExecutor()defadd_event(self,event_name,callback=None):# 1. Register the event name and callback in `tracked_events`.# 2. Check Redis for the last processed block for the event.# 3. Set `from_block` for the event based on Redis or the default value. self.tracked_events[event_name]= callback stored_from_block = REDIS_SERVER.get("event_poller_from_block_"+ event_name) self.event_from_blocks[event_name]=int( stored_from_block)if stored_from_block isnotNoneelse self.default_from_blockdefremove_event(self,event_name):# 1. Remove the event and its block tracking state.# 2. Stop the associated polling thread if it's running.if event_name in self.tracked_events:del self.tracked_events[event_name]del self.event_from_blocks[event_name] self.stop_event_thread(event_name)def_fetch_event(self,event_name,callback,from_block,to_block):# 1. Retrieve the event object from the contract.# 2. Create a filter for logs between `from_block` and `to_block`.# 3. Fetch all logs and update the last processed block in Redis.# 4. Invoke the callback for each log if provided. event =getattr(self.context.contract.events, event_name) event_filter = event.create_filter(fromBlock=from_block, toBlock=to_block) events = event_filter.get_all_entries() REDIS_SERVER.set_permanent("event_poller_from_block_"+ event_name, to_block) self.event_from_blocks[event_name]=int(to_block)+1if callback:for ev in events:callback(self.context, ev)return eventsdef_run_event(self,event_name,callback):# 1. Poll the blockchain to get the current block number.# 2. Fetch logs for the event in chunks until the current block.# 3. Invoke the callback for each log and sleep for `poll_interval`.whilenot self.stop_signal: current_block = self.context.provider.eth.block_number from_block = self.event_from_blocks[event_name]while from_block < current_block: end_block =min(from_block + self.block_chunk_size -1, current_block) self._fetch_event(event_name, callback, from_block, end_block) from_block = end_block +1 time.sleep(self.poll_interval)defstart(self):# 1. Spawn a thread for each registered event using the executor.# 2. Each thread invokes `_run_event` to monitor and process logs.for event_name, callback in self.tracked_events.items(): future = self.executor.submit(self._run_event, event_name, callback) self.futures[event_name]= futuredefwait(self):# 1. Block until all threads complete their tasks.# 2. Ensures the application does not exit prematurely.for future in self.futures.values(): future.result()defstop(self):# 1. Set `stop_signal` to True to gracefully terminate polling.# 2. Used to shut down the poller when the application exits. self.stop_signal =Truedefstop_event_thread(self,event_name):# 1. Cancel the thread associated with the specified event.# 2. Remove the thread from the `futures` dictionary.if event_name in self.futures: self.futures[event_name].cancel()del self.futures[event_name]
Send Quote Handler
The SendQuoteHandler processes SendQuote events from PartyA. It should validate the event data (e.g., whitelist checks, quote status) and construct and stores a structured payload in Redis for further use by the system.
import datetimeimport loggingimport simplejson as jsonfrom settings import HedgerAddressfrom share import REDIS_SERVER_EVENTSclassSendQuoteHandler:defhandle(self,context,args):# 1. Extract event data and block information event_values = args.get('args') block = context.provider.eth.get_block(args.get("blockNumber"))# 2. Fetch additional data for the symbol and quote symbol = context.symbol_fm.map_fields( context.view_provider.contract.functions.getSymbol(event_values.get("symbolId")).call() ) quote = context.quote_fm.map_fields( context.view_provider.contract.functions.getQuote(event_values.get("quoteId")).call() )# 3. Check if the quote has already been processed quote_status = quote.get("quoteStatus")if quote_status ==3:return# 4. Validate if the hedger is in the whitelist for this quote whitelist = [context.provider.to_checksum_address(addr)for addr in event_values.get("partyBsWhiteList")]iflen(whitelist)>0and HedgerAddress notin whitelist:return# 5. Construct a payload with relevant event and blockchain data value ={"blockNumber": args.get("blockNumber"),"cva": event_values.get("cva"),"lf": event_values.get("lf"),"partyAmm": event_values.get("partyAmm"),"partyBmm": event_values.get("partyBmm"),"quantity": event_values.get("quantity"),"marketPrice": event_values.get("marketPrice"),"requestedOpenPrice": event_values.get("price"),"quoteStatus": quote.get("quoteStatus"),"quoteId": event_values.get("quoteId"),"symbolId": event_values.get("symbolId"),"symbol": symbol.get("name"),"action":"SendQuote","partyA": context.provider.to_checksum_address(event_values.get("partyA")),"positionType": event_values.get("positionType"),"orderTypeOpen": event_values.get("orderType"),"maxFundingRate": event_values.get("maxFundingRate"),"deadline": event_values.get("deadline"),"timeStamp": block.get("timestamp"),"timestampsSendQuoteTimeStamp": block.get("timestamp"),"trHash": args.get("transactionHash").hex(),"received_timestamp": datetime.datetime.utcnow().timestamp()//1000,"input_source":'py_event_listener',"type":'quote'}# 6. Generate a unique key for the Redis store key =f"{event_values.get('quoteId')}-{args.get('transactionHash').hex()}-{block.get('timestamp')}"# 7. Log the event for debugging purposes logging.info("SendQuote Received: "+ key)# 8. Store the event data in Redis if the key does not already exist REDIS_SERVER_EVENTS.setnx(key, json.dumps(value))
Checks the status of the quote. If it has already been processed (status 3), the handler exits early.
Validate PartyB Whitelist
whitelist = [context.provider.to_checksum_address(addr)for addr in event_values.get("partyBsWhiteList")]iflen(whitelist)>0and HedgerAddress notin whitelist:return
Confirms that the current hedger (PartyB) is authorized to act on this quote. If a whitelist exists and the hedger is not included, the handler skips processing.
Constructing the Payload
value ={"blockNumber": args.get("blockNumber"), ..."type":'quote'}
Creates a structured dictionary containing all relevant event and blockchain data. This includes trade details (e.g., quoteId, price), contextual metadata (e.g., blockNumber), and hedger-specific information (e.g., partyBmm).