To build a robust hedging service in the SYMMIO ecosystem, it's crucial for hedgers (PartyB) to monitor blockchain events triggered by PartyA's intents. This section shows a basic implementation of an event listener using ethers.js or web3.py.
Web3EventPoller
The Web3EventPoller class is a high-level utility designed to:
Connect to the Blockchain: Establish a connection to the blockchain network using web3.py.
Load Contract and ABI: Use the contract's address and ABI to interact with its events.
Monitor Events: Listen for specific on-chain events such as SendQuote or ForceCancelQuote.
Maintain State: Use an EventPoller instance to manage event polling, handle retries, and ensure no events are missed.
from web3 import Web3
from web3.middleware import geth_poa_middleware
import logging
from settings import RPC, ContractAddress, PollInterval, FromBlock
from event_poller import EventPoller
from handlers.send_request_handler import SendQuoteHandler
class Web3EventPoller:
@staticmethod
def run():
# 1. Initialize Web3 and contract
w3_instance = Web3(Web3.HTTPProvider(RPC))
w3_instance.middleware_onion.inject(geth_poa_middleware, layer=0)
contract_abi = [...] # Loaded from a JSON file
contract = w3_instance.eth.contract(address=ContractAddress, abi=contract_abi)
# 2. Create a poller instance
poller = EventPoller(
context=...,
from_block=FromBlock,
poll_interval=PollInterval
)
# 3. Register event + handler
poller.add_event("SendQuote", SendQuoteHandler().handle)
# ... add other events as needed ...
# 4. Start the poller
poller.start()
poller.wait()
Event Poller Internals
EventPoller: A utility script that tracks from_block progress (often persisted in something like Redis) and polls in intervals (poll_interval).
class EventPoller:
def __init__(self, context, from_block=0, poll_interval=10, block_chunk_size=10000):
# 1. Initialize context, default block, and interval settings.
# 2. Create data structures to track events and their states.
# 3. Set up a thread pool executor for concurrent event polling.
self.context = context
self.tracked_events = {}
self.poll_interval = poll_interval
self.block_chunk_size = block_chunk_size
self.stop_signal = False
self.default_from_block = from_block
self.futures = {}
self.event_from_blocks = {}
self.executor = ThreadPoolExecutor()
def add_event(self, event_name, callback=None):
# 1. Register the event name and callback in `tracked_events`.
# 2. Check Redis for the last processed block for the event.
# 3. Set `from_block` for the event based on Redis or the default value.
self.tracked_events[event_name] = callback
stored_from_block = REDIS_SERVER.get("event_poller_from_block_" + event_name)
self.event_from_blocks[event_name] = int(
stored_from_block) if stored_from_block is not None else self.default_from_block
def remove_event(self, event_name):
# 1. Remove the event and its block tracking state.
# 2. Stop the associated polling thread if it's running.
if event_name in self.tracked_events:
del self.tracked_events[event_name]
del self.event_from_blocks[event_name]
self.stop_event_thread(event_name)
def _fetch_event(self, event_name, callback, from_block, to_block):
# 1. Retrieve the event object from the contract.
# 2. Create a filter for logs between `from_block` and `to_block`.
# 3. Fetch all logs and update the last processed block in Redis.
# 4. Invoke the callback for each log if provided.
event = getattr(self.context.contract.events, event_name)
event_filter = event.create_filter(fromBlock=from_block, toBlock=to_block)
events = event_filter.get_all_entries()
REDIS_SERVER.set_permanent("event_poller_from_block_" + event_name, to_block)
self.event_from_blocks[event_name] = int(to_block) + 1
if callback:
for ev in events:
callback(self.context, ev)
return events
def _run_event(self, event_name, callback):
# 1. Poll the blockchain to get the current block number.
# 2. Fetch logs for the event in chunks until the current block.
# 3. Invoke the callback for each log and sleep for `poll_interval`.
while not self.stop_signal:
current_block = self.context.provider.eth.block_number
from_block = self.event_from_blocks[event_name]
while from_block < current_block:
end_block = min(from_block + self.block_chunk_size - 1, current_block)
self._fetch_event(event_name, callback, from_block, end_block)
from_block = end_block + 1
time.sleep(self.poll_interval)
def start(self):
# 1. Spawn a thread for each registered event using the executor.
# 2. Each thread invokes `_run_event` to monitor and process logs.
for event_name, callback in self.tracked_events.items():
future = self.executor.submit(self._run_event, event_name, callback)
self.futures[event_name] = future
def wait(self):
# 1. Block until all threads complete their tasks.
# 2. Ensures the application does not exit prematurely.
for future in self.futures.values():
future.result()
def stop(self):
# 1. Set `stop_signal` to True to gracefully terminate polling.
# 2. Used to shut down the poller when the application exits.
self.stop_signal = True
def stop_event_thread(self, event_name):
# 1. Cancel the thread associated with the specified event.
# 2. Remove the thread from the `futures` dictionary.
if event_name in self.futures:
self.futures[event_name].cancel()
del self.futures[event_name]
Send Quote Handler
The SendQuoteHandler processes SendQuote events from PartyA. It should validate the event data (e.g., whitelist checks, quote status) and construct and stores a structured payload in Redis for further use by the system.
import datetime
import logging
import simplejson as json
from settings import HedgerAddress
from share import REDIS_SERVER_EVENTS
class SendQuoteHandler:
def handle(self, context, args):
# 1. Extract event data and block information
event_values = args.get('args')
block = context.provider.eth.get_block(args.get("blockNumber"))
# 2. Fetch additional data for the symbol and quote
symbol = context.symbol_fm.map_fields(
context.view_provider.contract.functions.getSymbol(event_values.get("symbolId")).call()
)
quote = context.quote_fm.map_fields(
context.view_provider.contract.functions.getQuote(event_values.get("quoteId")).call()
)
# 3. Check if the quote has already been processed
quote_status = quote.get("quoteStatus")
if quote_status == 3:
return
# 4. Validate if the hedger is in the whitelist for this quote
whitelist = [context.provider.to_checksum_address(addr) for addr in event_values.get("partyBsWhiteList")]
if len(whitelist) > 0 and HedgerAddress not in whitelist:
return
# 5. Construct a payload with relevant event and blockchain data
value = {
"blockNumber": args.get("blockNumber"),
"cva": event_values.get("cva"),
"lf": event_values.get("lf"),
"partyAmm": event_values.get("partyAmm"),
"partyBmm": event_values.get("partyBmm"),
"quantity": event_values.get("quantity"),
"marketPrice": event_values.get("marketPrice"),
"requestedOpenPrice": event_values.get("price"),
"quoteStatus": quote.get("quoteStatus"),
"quoteId": event_values.get("quoteId"),
"symbolId": event_values.get("symbolId"),
"symbol": symbol.get("name"),
"action": "SendQuote",
"partyA": context.provider.to_checksum_address(event_values.get("partyA")),
"positionType": event_values.get("positionType"),
"orderTypeOpen": event_values.get("orderType"),
"maxFundingRate": event_values.get("maxFundingRate"),
"deadline": event_values.get("deadline"),
"timeStamp": block.get("timestamp"),
"timestampsSendQuoteTimeStamp": block.get("timestamp"),
"trHash": args.get("transactionHash").hex(),
"received_timestamp": datetime.datetime.utcnow().timestamp() // 1000,
"input_source": 'py_event_listener',
"type": 'quote'
}
# 6. Generate a unique key for the Redis store
key = f"{event_values.get('quoteId')}-{args.get('transactionHash').hex()}-{block.get('timestamp')}"
# 7. Log the event for debugging purposes
logging.info("SendQuote Received: " + key)
# 8. Store the event data in Redis if the key does not already exist
REDIS_SERVER_EVENTS.setnx(key, json.dumps(value))
Retrieves the event arguments (args) and the blockchain block details (block).
Fetch Additional Data for the Symbol and Quote
symbol = context.symbol_fm.map_fields(
context.view_provider.contract.functions.getSymbol(event_values.get("symbolId")).call()
)
quote = context.quote_fm.map_fields(
context.view_provider.contract.functions.getQuote(event_values.get("quoteId")).call()
)
Fetches detailed information about the symbol and quote using their IDs.
Maps the raw blockchain data into structured fields for easier access.
Check If the Quote Is Already Processed
quote_status = quote.get("quoteStatus")
if quote_status == 3:
return
Checks the status of the quote. If it has already been processed (status 3), the handler exits early.
Validate PartyB Whitelist
whitelist = [context.provider.to_checksum_address(addr) for addr in event_values.get("partyBsWhiteList")]
if len(whitelist) > 0 and HedgerAddress not in whitelist:
return
Confirms that the current hedger (PartyB) is authorized to act on this quote. If a whitelist exists and the hedger is not included, the handler skips processing.
Constructing the Payload
value = {
"blockNumber": args.get("blockNumber"),
...
"type": 'quote'
}
Creates a structured dictionary containing all relevant event and blockchain data. This includes trade details (e.g., quoteId, price), contextual metadata (e.g., blockNumber), and hedger-specific information (e.g., partyBmm).