2. Seeing the Intent

To build a robust hedging service in the SYMMIO ecosystem, hedgers (PartyB) must monitor the on-chain events emitted when PartyA submits an intent. This section walks through a basic event listener built with web3.py.

Different types of communication between the hedger (PartyB) and PartyA

Web3EventPoller

The Web3EventPoller class is a high-level utility designed to:

  1. Connect to the Blockchain: Establish a connection to the blockchain network using web3.py.

  2. Load Contract and ABI: Use the contract's address and ABI to interact with its events.

  3. Monitor Events: Listen for specific on-chain events such as SendQuote or ForceCancelQuote.

  4. Handle Events: Delegate event handling to predefined handler classes (e.g., SendQuoteHandler).

  5. Maintain State: Use an EventPoller instance to manage event polling, handle retries, and ensure no events are missed.

from web3 import Web3
from web3.middleware import geth_poa_middleware
import logging
from settings import RPC, ContractAddress, PollInterval, FromBlock
from event_poller import EventPoller
from handlers.send_request_handler import SendQuoteHandler

class Web3EventPoller:
    @staticmethod
    def run():
        # 1. Initialize Web3 and contract
        w3_instance = Web3(Web3.HTTPProvider(RPC))
        w3_instance.middleware_onion.inject(geth_poa_middleware, layer=0)
        contract_abi = [...]  # Loaded from a JSON file
        contract = w3_instance.eth.contract(address=ContractAddress, abi=contract_abi)

        # 2. Create a poller instance
        poller = EventPoller(
            context=...,
            from_block=FromBlock,
            poll_interval=PollInterval
        )

        # 3. Register event + handler
        poller.add_event("SendQuote", SendQuoteHandler().handle)
        # ... add other events as needed ...

        # 4. Start the poller
        poller.start()
        poller.wait()
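
The `contract_abi = [...]` placeholder above stands for an ABI loaded from a JSON file. A minimal sketch of such a loader follows. The helper names `load_abi` and `event_names` are hypothetical and not part of the SYMMIO codebase, and the sketch assumes the file holds either a bare ABI array or a compiler artifact that nests the array under an `"abi"` key.

```python
import json
from pathlib import Path


def load_abi(path):
    """Return the ABI list stored in a JSON file (hypothetical helper)."""
    data = json.loads(Path(path).read_text(encoding="utf-8"))
    # Compiler artifacts usually wrap the ABI array in an object under "abi".
    if isinstance(data, dict):
        data = data["abi"]
    if not isinstance(data, list):
        raise ValueError(f"{path} does not contain an ABI array")
    return data


def event_names(abi):
    """List the event names declared in an ABI, sorted alphabetically."""
    return sorted(entry["name"] for entry in abi if entry.get("type") == "event")
```

Calling `event_names(load_abi(path))` before registering handlers is one way to confirm that an event such as `SendQuote` actually exists in the ABI being used.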

Event Poller Internals

EventPoller is a utility class that tracks each event's from_block progress (persisted in a store such as Redis) and polls for new logs every poll_interval seconds.

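import time
from concurrent.futures import ThreadPoolExecutor

from share import REDIS_SERVER  # assumption: exported alongside REDIS_SERVER_EVENTS


class EventPoller: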
    def __init__(self, context, from_block=0, poll_interval=10, block_chunk_size=10000):
        # 1. Initialize context, default block, and interval settings.
        # 2. Create data structures to track events and their states.
        # 3. Set up a thread pool executor for concurrent event polling.
        self.context = context
        self.tracked_events = {}
        self.poll_interval = poll_interval
        self.block_chunk_size = block_chunk_size
        self.stop_signal = False
        self.default_from_block = from_block
        self.futures = {}
        self.event_from_blocks = {}
        self.executor = ThreadPoolExecutor()

    def add_event(self, event_name, callback=None):
        # 1. Register the event name and callback in `tracked_events`.
        # 2. Check Redis for the last processed block for the event.
        # 3. Set `from_block` for the event based on Redis or the default value.
        self.tracked_events[event_name] = callback
        stored_from_block = REDIS_SERVER.get("event_poller_from_block_" + event_name)
        self.event_from_blocks[event_name] = int(
            stored_from_block) if stored_from_block is not None else self.default_from_block

    def remove_event(self, event_name):
        # 1. Remove the event and its block tracking state.
        # 2. Stop the associated polling thread if it's running.
        if event_name in self.tracked_events:
            del self.tracked_events[event_name]
            del self.event_from_blocks[event_name]
            self.stop_event_thread(event_name)

    def _fetch_event(self, event_name, callback, from_block, to_block):
        # 1. Retrieve the event object from the contract.
        # 2. Create a filter for logs between `from_block` and `to_block`.
        # 3. Fetch all logs and invoke the callback for each log if provided.
        # 4. Persist the next block to scan, so a restart resumes after
        #    `to_block` instead of reading it a second time.
        event = getattr(self.context.contract.events, event_name)
        event_filter = event.create_filter(fromBlock=from_block, toBlock=to_block)
        events = event_filter.get_all_entries()
        if callback:
            for ev in events:
                callback(self.context, ev)
        next_block = int(to_block) + 1
        REDIS_SERVER.set_permanent("event_poller_from_block_" + event_name, next_block)
        self.event_from_blocks[event_name] = next_block
        return events

    def _run_event(self, event_name, callback):
        # 1. Poll the blockchain to get the current block number.
        # 2. Fetch logs for the event in chunks until the current block.
        # 3. Invoke the callback for each log and sleep for `poll_interval`.
        while not self.stop_signal:
            current_block = self.context.provider.eth.block_number
            from_block = self.event_from_blocks[event_name]
            while from_block < current_block:
                end_block = min(from_block + self.block_chunk_size - 1, current_block)
                self._fetch_event(event_name, callback, from_block, end_block)
                from_block = end_block + 1
            time.sleep(self.poll_interval)

    def start(self):
        # 1. Spawn a thread for each registered event using the executor.
        # 2. Each thread invokes `_run_event` to monitor and process logs.
        for event_name, callback in self.tracked_events.items():
            future = self.executor.submit(self._run_event, event_name, callback)
            self.futures[event_name] = future

    def wait(self):
        # 1. Block until all threads complete their tasks.
        # 2. Ensures the application does not exit prematurely.
        for future in self.futures.values():
            future.result()

    def stop(self):
        # 1. Set `stop_signal` to True to gracefully terminate polling.
        # 2. Used to shut down the poller when the application exits.
        self.stop_signal = True

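    def stop_event_thread(self, event_name):
        # 1. Cancel the future for the specified event. `Future.cancel()` only
        #    succeeds if the task has not started yet; a poll loop that is
        #    already running continues until `stop_signal` is set.
        # 2. Remove the future from the `futures` dictionary.
        if event_name in self.futures:
            self.futures[event_name].cancel()
            del self.futures[event_name]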
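
The chunking and checkpoint logic can be easier to reason about when separated from web3 and Redis. The sketch below is an illustration under stated assumptions, not the project's implementation: `block_ranges` and `poll_once` are hypothetical helpers, `fetch_logs` stands in for the contract filter call, and a plain dict stands in for the Redis checkpoint. Unlike the `_run_event` loop above, this sketch also scans the head block itself.

```python
def block_ranges(from_block, current_block, chunk_size):
    """Yield inclusive (start, end) ranges covering from_block..current_block."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    start = from_block
    while start <= current_block:
        end = min(start + chunk_size - 1, current_block)
        yield start, end
        start = end + 1


def poll_once(fetch_logs, checkpoints, event_name, default_from,
              current_block, chunk_size=10_000):
    """Fetch one event's logs up to current_block and advance its checkpoint."""
    logs = []
    start = checkpoints.get(event_name, default_from)
    for lo, hi in block_ranges(start, current_block, chunk_size):
        logs.extend(fetch_logs(event_name, lo, hi))
        # Store the NEXT block to scan, so a restart neither skips nor repeats.
        checkpoints[event_name] = hi + 1
    return logs
```

For example, `list(block_ranges(0, 24, 10))` yields `[(0, 9), (10, 19), (20, 24)]`, and a second `poll_once` call at the same head block fetches nothing because the checkpoint has already moved past it.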

Send Quote Handler

The SendQuoteHandler processes SendQuote events emitted when PartyA sends a quote. It validates the event data (e.g., whitelist checks, quote status), then constructs a structured payload and stores it in Redis for further use by the system.

import datetime
import logging
import simplejson as json
from settings import HedgerAddress
from share import REDIS_SERVER_EVENTS


class SendQuoteHandler:

    def handle(self, context, args):
        # 1. Extract event data and block information
        event_values = args.get('args')
        block = context.provider.eth.get_block(args.get("blockNumber"))

        # 2. Fetch additional data for the symbol and quote
        symbol = context.symbol_fm.map_fields(
            context.view_provider.contract.functions.getSymbol(event_values.get("symbolId")).call()
        )
        quote = context.quote_fm.map_fields(
            context.view_provider.contract.functions.getQuote(event_values.get("quoteId")).call()
        )

        # 3. Check if the quote has already been processed
        quote_status = quote.get("quoteStatus")
        if quote_status == 3:
            return

        # 4. Validate if the hedger is in the whitelist for this quote
        whitelist = [context.provider.to_checksum_address(addr) for addr in event_values.get("partyBsWhiteList")]
        if len(whitelist) > 0 and HedgerAddress not in whitelist:
            return

        # 5. Construct a payload with relevant event and blockchain data
        value = {
            "blockNumber": args.get("blockNumber"),
            "cva": event_values.get("cva"),
            "lf": event_values.get("lf"),
            "partyAmm": event_values.get("partyAmm"),
            "partyBmm": event_values.get("partyBmm"),
            "quantity": event_values.get("quantity"),
            "marketPrice": event_values.get("marketPrice"),
            "requestedOpenPrice": event_values.get("price"),
            "quoteStatus": quote.get("quoteStatus"),
            "quoteId": event_values.get("quoteId"),
            "symbolId": event_values.get("symbolId"),
            "symbol": symbol.get("name"),
            "action": "SendQuote",
            "partyA": context.provider.to_checksum_address(event_values.get("partyA")),
            "positionType": event_values.get("positionType"),
            "orderTypeOpen": event_values.get("orderType"),
            "maxFundingRate": event_values.get("maxFundingRate"),
            "deadline": event_values.get("deadline"),
            "timeStamp": block.get("timestamp"),
            "timestampsSendQuoteTimeStamp": block.get("timestamp"),
            "trHash": args.get("transactionHash").hex(),
            # Unix time in seconds, the same unit as the block timestamp.
            "received_timestamp": int(datetime.datetime.now(datetime.timezone.utc).timestamp()),
            "input_source": 'py_event_listener',
            "type": 'quote'
        }

        # 6. Generate a unique key for the Redis store
        key = f"{event_values.get('quoteId')}-{args.get('transactionHash').hex()}-{block.get('timestamp')}"

        # 7. Log the event for debugging purposes
        logging.info("SendQuote Received: " + key)

        # 8. Store the event data in Redis if the key does not already exist
        REDIS_SERVER_EVENTS.setnx(key, json.dumps(value))

Extract Event Data and Block Information

event_values = args.get('args')
block = context.provider.eth.get_block(args.get("blockNumber"))
  • Retrieves the event arguments (args) and the blockchain block details (block).

Fetch Additional Data for the Symbol and Quote

symbol = context.symbol_fm.map_fields(
    context.view_provider.contract.functions.getSymbol(event_values.get("symbolId")).call()
)
quote = context.quote_fm.map_fields(
    context.view_provider.contract.functions.getQuote(event_values.get("quoteId")).call()
)
  • Fetches detailed information about the symbol and quote using their IDs.

  • Maps the raw blockchain data into structured fields for easier access.
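
The implementation of `map_fields` is not shown on this page. A minimal sketch of what such a mapper might look like follows, assuming it pairs the positional values returned by a contract call with field names. The class `FieldMapper` and the field names in the usage note are hypothetical, not the actual SYMMIO struct layout.

```python
class FieldMapper:
    """Map a positional tuple from a contract call to a dict (hypothetical)."""

    def __init__(self, field_names):
        self.field_names = list(field_names)

    def map_fields(self, values):
        # Fail loudly if the ABI and the field list drift apart.
        if len(values) != len(self.field_names):
            raise ValueError(
                f"expected {len(self.field_names)} values, got {len(values)}"
            )
        return dict(zip(self.field_names, values))
```

With illustrative field names, `FieldMapper(["symbolId", "name", "isValid"]).map_fields((1, "BTCUSDT", True))` returns a dict whose `"name"` entry is `"BTCUSDT"`, which is the access pattern the handler relies on with `symbol.get("name")`.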

Check If the Quote Is Already Processed

quote_status = quote.get("quoteStatus")
if quote_status == 3:
    return

Reads the quote's current status from the contract. Status 3 (CANCELED in SYMMIO's QuoteStatus enum) means there is nothing left to hedge, so the handler exits early.
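
Comparing against a bare `3` hides what the status means, and a named enum makes the check self-documenting. The member order below is an assumption based on the `QuoteStatus` enum in the SYMMIO contracts and should be confirmed against the deployed contract version. `should_skip` is a hypothetical helper.

```python
from enum import IntEnum


class QuoteStatus(IntEnum):
    # Assumed ordering; verify against the deployed SYMMIO contracts.
    PENDING = 0
    LOCKED = 1
    CANCEL_PENDING = 2
    CANCELED = 3
    OPENED = 4
    CLOSE_PENDING = 5
    CANCEL_CLOSE_PENDING = 6
    CLOSED = 7
    LIQUIDATED = 8
    EXPIRED = 9


def should_skip(quote_status):
    """Return True when the handler should ignore the quote."""
    return quote_status == QuoteStatus.CANCELED
```

Because `IntEnum` members compare equal to plain integers, `should_skip` accepts the raw value returned by the contract call without conversion.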

Validate PartyB Whitelist

whitelist = [context.provider.to_checksum_address(addr) for addr in event_values.get("partyBsWhiteList")]
if len(whitelist) > 0 and HedgerAddress not in whitelist:
    return

Confirms that the current hedger (PartyB) is authorized to act on this quote. If a whitelist exists and the hedger is not included, the handler skips processing.
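
The whitelist rule can be isolated as a pure function, which makes it easy to unit test. This is a sketch with one deliberate simplification: it compares lowercased hex strings instead of checksum addresses so that it runs without web3. `is_hedger_allowed` is a hypothetical helper name.

```python
def is_hedger_allowed(whitelist, hedger_address):
    """Return True if the hedger may act on a quote with this whitelist."""
    # An empty whitelist means any PartyB may respond.
    if not whitelist:
        return True
    normalized = {addr.lower() for addr in whitelist}
    return hedger_address.lower() in normalized
```

The handler's `return` corresponds to the case where this function returns `False`.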

Constructing the Payload

value = {
    "blockNumber": args.get("blockNumber"),
    ...
    "type": 'quote'
}

Creates a structured dictionary containing all relevant event and blockchain data. This includes trade details (e.g., quoteId, price), contextual metadata (e.g., blockNumber), and hedger-specific information (e.g., partyBmm).

Generate a Unique Key for Redis

key = f"{event_values.get('quoteId')}-{args.get('transactionHash').hex()}-{block.get('timestamp')}"

Builds the key from the quote ID, transaction hash, and block timestamp, so the same event read twice maps to the same Redis key and is not stored twice.

Logging the Event

logging.info("SendQuote Received: " + key)

Logs the event processing for debugging and traceability.

Store the Event Data in Redis

REDIS_SERVER_EVENTS.setnx(key, json.dumps(value))

Stores the payload in Redis using the unique key. The setnx method ensures that the event is only stored if the key does not already exist.
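
The effect of combining a deterministic key with set-if-absent can be seen without a Redis server. In the sketch below, `InMemoryStore` is a hypothetical stand-in that mimics only the `setnx` behavior (store the value and return `True` when the key is new, otherwise leave it untouched and return `False`), and `store_event` is a hypothetical helper that builds the key the same way the handler does.

```python
import json


class InMemoryStore:
    """Stand-in for a Redis client; mimics only set-if-absent semantics."""

    def __init__(self):
        self._data = {}

    def setnx(self, key, value):
        if key in self._data:
            return False
        self._data[key] = value
        return True

    def get(self, key):
        return self._data.get(key)


def store_event(store, quote_id, tx_hash, block_timestamp, payload):
    """Store the payload once per (quoteId, txHash, timestamp) key."""
    key = f"{quote_id}-{tx_hash}-{block_timestamp}"
    return store.setnx(key, json.dumps(payload))
```

If the poller re-reads a block range after a restart, the second `store_event` call for the same event returns `False` and the first payload stays in place, so downstream consumers see each quote once.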
