Creating the APIs and Websockets

Quote Services, Hedger API, Hedger Websockets

As a Solver integrating with SYMMIO, you’ll typically want:

  • REST APIs – To let frontends display data such as available symbols, open interest, and notional caps.

  • WebSocket Endpoints – To push real-time data (funding rates, user position updates, etc.) to front-ends.

  • Quote Streaming – Streams price quotes to the front-end.

REST APIs

Detailed documentation about REST API endpoint queries and returned data can be found here. It's recommended that solvers follow the same structure in their endpoints to integrate with frontends more easily.

Websocket Endpoints

Detailed documentation about websockets queries and returned data can be found here. It's recommended that solvers follow the same structure in their websockets to integrate with frontends more easily.

/upnl-ws

Streams unrealized profit/loss (uPnL) data for a given user sub-account address. The client sends an address (string) to subscribe. The server should validate this address (whitelist check) and then, in a loop, retrieve and send the user’s uPnL every 2 seconds.

async def send_upnl_data(websocket: WebSocket, address: str, q: asyncio.Queue):
    """
    Fetch user's uPnL and send it to the client.
    If there's new data in the queue (new address?), switch to that on next iteration.
    """
    if q.empty():
        content = UpnlCalculator.get_party_a_upnl(address) if address else {}
        await safe_send_data(websocket, content)
        await asyncio.sleep(2)
        return address
    else:
        return await q.get()
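
Because the handler returns the address to use on the next iteration, the caller has to keep feeding that value back in. The sketch below shows one way such a driver loop could look. It is not the solver's actual code: `fake_upnl`, `send_once`, and `drive` are hypothetical names, the websocket is replaced by a plain list, and the loop runs a fixed number of iterations so that it terminates.

```python
import asyncio

sent = []  # stands in for messages pushed over the websocket


def fake_upnl(address: str) -> dict:
    # Hypothetical stand-in for UpnlCalculator.get_party_a_upnl
    return {"address": address, "upnl": "0"}


async def send_once(address: str, q: asyncio.Queue) -> str:
    """Same shape as send_upnl_data: send for the current address, or switch."""
    if q.empty():
        sent.append(fake_upnl(address) if address else {})
        await asyncio.sleep(0)  # the handler above sleeps 2 seconds here
        return address
    return await q.get()


async def drive(iterations: int) -> None:
    q: asyncio.Queue = asyncio.Queue()
    address = ""
    await q.put("0xAAA")          # first subscription from the client
    for i in range(iterations):
        if i == 3:
            await q.put("0xBBB")  # client switches to another sub-account
        address = await send_once(address, q)


asyncio.run(drive(6))
print([m["address"] for m in sent])
```

An iteration that finds a new address in the queue only switches to it; the first message for that address goes out on the following iteration.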

The uPnL Calculator is designed to compute and store unrealized profit/loss (uPnL) for PartyA and PartyB accounts. It ensures that the system maintains a real-time view of user account health by leveraging database computations and caching these results in Redis for efficient access. Prices are retrieved using an external price-fetcher utility (e.g., Binance or other sources) and uPnL is calculated for each account.

/upnl-a

The /upnl-a route retrieves the precomputed uPnL value for a given PartyA account from Redis. The route queries Redis for the key corresponding to the given account. If the key exists, it returns the cached uPnL data as a JSON object; otherwise it responds with a 404 not-found error.

@router.get('/upnl-a')
def get_party_a_upnl(account_identifier: SingleAccountIdentifier = Depends()) -> SingleUpnlResponse:
    # Convert account identifier to checksum format
    account_identifier = account_identifier.get_checksum_object()
    
    # Retrieve uPnL data from Redis
    res = REDIS_SERVER_PNL.get(account_identifier.get_upnl_key())
    
    # Return cached result or raise an error
    if res is not None:
        return JsonWrapper.loads(res)
    raise ErrorCodeResponse(ErrorInfoContainer.not_found_error, status_code=404)
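
The route only reads from the cache, so something else has to write to it. The following sketch illustrates that write-then-read pattern with a plain dictionary standing in for Redis. The key layout, the field names, and the helpers `upnl_key`, `store_upnl`, and `read_upnl` are assumptions for illustration; lower-casing the address stands in for the checksum conversion done by the real route.

```python
import json

cache = {}  # in-memory stand-in for the Redis instance


def upnl_key(address: str) -> str:
    # Hypothetical key layout; the real one comes from get_upnl_key()
    return f"upnl:party_a:{address.lower()}"


def store_upnl(address: str, upnl: str, notional: str) -> None:
    """Writer side: the calculator caches its latest result as JSON."""
    cache[upnl_key(address)] = json.dumps({"upnl": upnl, "notional": notional})


def read_upnl(address: str):
    """Reader side: return the cached value, or None when nothing is cached."""
    raw = cache.get(upnl_key(address))
    return json.loads(raw) if raw is not None else None


store_upnl("0xAbC", "12.5", "1000")
print(read_upnl("0xabc"))  # same account, different letter case
print(read_upnl("0xdef"))  # never computed; the route would answer 404
```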

uPnL Calculation Flow

  • The system iterates through all PartyA accounts stored in the database. Each account is identified using a SingleAccountIdentifier.

  • Current market prices for each symbol are fetched using the PriceFetcher utility.

  • Open positions for each account are grouped by symbol. For each symbol, quantities and prices are aggregated. Signed and total quantities are calculated based on position types (LONG/SHORT).

For each account:

  • Unrealized profit/loss is computed based on the difference between current market prices and position opening prices.

  • Notional values are calculated as the total exposure per account.

  • Liquidation checks are performed to determine account solvency.

Calculation Snippet

for symbol, (signed_quantity_price, signed_quantity, total_quantity, count) in symbols_quantity.items():
    price = PriceFetcher.get_price(symbol)['markPrice']
    notional += total_quantity * price
    upnl += signed_quantity * price - signed_quantity_price
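
To make the aggregation concrete, here is a small worked example that builds `symbols_quantity` from a handful of positions and then applies the same two formulas. The positions and mark prices are made up, LONG is assumed to count as positive and SHORT as negative, and floats are used only for brevity; a production calculator would more likely use decimals.

```python
from collections import defaultdict

# Hypothetical open positions: (symbol, side, quantity, opening price)
positions = [
    ("BTCUSDT", "LONG", 2.0, 100.0),
    ("BTCUSDT", "SHORT", 0.5, 120.0),
    ("ETHUSDT", "SHORT", 10.0, 10.0),
]
mark_prices = {"BTCUSDT": 110.0, "ETHUSDT": 8.0}  # assumed current prices

# symbol -> [signed_quantity_price, signed_quantity, total_quantity, count]
symbols_quantity = defaultdict(lambda: [0.0, 0.0, 0.0, 0])
for symbol, side, quantity, open_price in positions:
    sign = 1 if side == "LONG" else -1
    agg = symbols_quantity[symbol]
    agg[0] += sign * quantity * open_price  # signed opening value
    agg[1] += sign * quantity               # net (signed) quantity
    agg[2] += quantity                      # gross quantity
    agg[3] += 1                             # number of positions

upnl = 0.0
notional = 0.0
for symbol, (signed_quantity_price, signed_quantity, total_quantity, count) in symbols_quantity.items():
    price = mark_prices[symbol]
    notional += total_quantity * price
    upnl += signed_quantity * price - signed_quantity_price

print(upnl, notional)  # 45.0 355.0
```

Checking by hand: the BTC long gains 20, the BTC short gains 5, and the ETH short gains 20, for a total uPnL of 45, while the notional is 2.5 × 110 + 10 × 8 = 355.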

/position-state-ws

This WebSocket lets clients (e.g., frontends or user dashboards) subscribe to specific addresses and receive real-time position state updates for them, so users are notified when their positions change.

Core Functions

initialize_listeners()

  • Purpose: Listens for incoming data from clients and validates addresses.

  • Flow:

    • Parse the address list sent by the client.

    • Validate addresses against a whitelist (CounterPartyWhiteList).

    • Register channels for listening to position state updates.

async def initialize_listeners():
    async for data in websocket.iter_json():
        # A message without an "address" field yields an empty list instead of None
        channel: List[str] = data.get("address") or []
        # Keep only whitelisted addresses
        channel = [ch for ch in channel if CounterPartyWhiteList.check_in_account_white_list(ch)]
        if channel:
            await pm.listen(channels=channel)
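
The validation step can be exercised in isolation. The sketch below is a hypothetical helper, not part of the solver code: `WHITELIST` is a hard-coded set standing in for CounterPartyWhiteList, and the case-insensitive comparison is an assumption. It shows how a subscription message might be reduced to the list of channels that are safe to listen on, including messages that omit the field or send a single string.

```python
from typing import Any, Dict, List, Set

# Hypothetical whitelist; the real check is CounterPartyWhiteList
WHITELIST: Set[str] = {"0xaaa", "0xbbb"}


def allowed_channels(message: Dict[str, Any]) -> List[str]:
    """Return the whitelisted addresses from a subscription message."""
    addresses = message.get("address") or []
    if isinstance(addresses, str):
        addresses = [addresses]  # tolerate a single address
    return [a for a in addresses if isinstance(a, str) and a.lower() in WHITELIST]


print(allowed_channels({"address": ["0xAAA", "0xccc"]}))
print(allowed_channels({}))
```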

send_position_data()

  • Purpose: Fetches position updates from the database or message queue and sends them to the client.

  • Flow:

    • Retrieve notifications from PsycopgManager.

    • Parse the notification payload for position state updates.

    • Send the parsed data back to the client.

async def send_position_data(websocket: WebSocket, pm: PsycopgManager):
    await asyncio.sleep(0.1)  # Throttle updates to avoid overwhelming clients
    msg = pm.get_notifies()
    if msg:
        msg_json = json.loads(msg.payload)
        msg_json['data'] = json.loads(msg_json['data'])
        if msg_json.get("should_send"):
            positions_state_msg = map_notification_to_positions_state(msg_json)
            await safe_send_data(websocket, positions_state_msg)
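
Note that the payload is decoded twice: once for the notification itself and once for its 'data' field, which arrives as a JSON string. The sketch below isolates that parsing step. The helper name `parse_notification` and the fields inside 'data' are assumptions; only 'data' and 'should_send' appear in the snippet above.

```python
import json
from typing import Optional


def parse_notification(payload: str) -> Optional[dict]:
    """Decode a notification whose 'data' field is itself a JSON string."""
    msg = json.loads(payload)
    msg["data"] = json.loads(msg["data"])
    return msg if msg.get("should_send") else None


# Hypothetical payloads; the inner field names are assumptions
inner = json.dumps({"quote_id": 42, "state": "OPENED"})
payload = json.dumps({"should_send": True, "data": inner})
muted = json.dumps({"should_send": False, "data": inner})

print(parse_notification(payload))
print(parse_notification(muted))
```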

get_data_and_send()

  • Purpose: Combines the above functions into a continuous data retrieval and sending loop.

  • Flow:

    • Call initialize_listeners() to set up subscriptions.

    • Continuously fetch and send position state data to clients.

async def get_data_and_send():
    try:
        while True:
            await safe_function_caller(websocket, send_position_data, websocket, pm)
    except (websockets.ConnectionClosedError, WebSocketDisconnect):
        pass

/funding-rate-ws

This WebSocket streams real-time funding rate updates to clients. Below are the structure and key features of the WebSocket implementation.

@websocket_router.websocket("/funding-rate-ws")
async def get_funding_rate(websocket: WebSocket):
    await websocket.accept()
    queue = asyncio.Queue()

    async def read_from_socket():
        async for data in websocket.iter_json():
            symbols = data.get("symbols", [])
            symbols = [s for s in symbols if s in symbol_whitelist]
            if symbols:
                await safe_function_caller(websocket, queue.put_nowait, symbols)

    async def get_data_and_send():
        while True:
            symbols = await queue.get()
            await safe_function_caller(websocket, send_funding_rate_data, websocket, symbols, queue)

    async def send_funding_rate_data(websocket: WebSocket, symbols: List[str], queue: asyncio.Queue):
        if queue.empty():
            content = get_funding_rates(symbols)  # Fetch funding rates for the symbols
            await safe_send_data(websocket, content)
            await asyncio.sleep(1)
        else:
            await queue.get()

    await run_tasks_until_complete([read_from_socket, get_data_and_send])
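
The helper run_tasks_until_complete is not shown here. One plausible reading is that it runs the reader and the sender concurrently and stops both as soon as either finishes, for example when the client disconnects. The sketch below implements that behavior under this assumption; `run_until_first_done`, `reader`, and `sender` are hypothetical names, and the real helper may behave differently.

```python
import asyncio
from typing import Awaitable, Callable, List


async def run_until_first_done(tasks: List[Callable[[], Awaitable[None]]]) -> None:
    """Run all coroutines concurrently; cancel the rest once one finishes."""
    running = [asyncio.create_task(t()) for t in tasks]
    done, pending = await asyncio.wait(running, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    # Wait for the cancelled tasks to unwind before returning
    await asyncio.gather(*pending, return_exceptions=True)
    for task in done:
        task.result()  # re-raise any exception from a finished task


events = []


async def reader():
    await asyncio.sleep(0.01)  # pretend the client disconnected
    events.append("reader done")


async def sender():
    try:
        while True:
            await asyncio.sleep(0.001)
    except asyncio.CancelledError:
        events.append("sender cancelled")
        raise


asyncio.run(run_until_first_done([reader, sender]))
print(events)
```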

Fetching Symbol Funding Details

The funding rate details are fetched from Binance or another external source.

@staticmethod
def get_symbol_funding_details() -> List[SymbolFundingDetail]:
    binance_markets = binance_client.futures_mark_price()
    symbol_details = []
    for symbol in session.scalars_all(Symbol.select().where(Symbol.is_valid.is_(true()))):
        binance_next_funding_time = binance_markets[symbol.title].nextFundingTime // 1000
        symbol_details.append(SymbolFundingDetail(
            symbol_id=symbol.symbol_id,
            binance_mark_price=binance_markets[symbol.title].markPrice,
            binance_last_funding_rate=binance_markets[symbol.title].lastFundingRate,
            binance_next_funding_time=binance_next_funding_time,
            funding_rate_window_time=symbol.funding_rate_window_time
        ))
    return symbol_details
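
The method above looks up `binance_markets` by symbol title, which suggests the raw response has already been keyed by symbol. Binance's futures premium-index endpoint returns a list of per-symbol rows with string-valued prices and a millisecond timestamp when no symbol is specified, so a normalization step along the following lines may be needed. The sample rows and the helper `index_markets` are illustrative assumptions.

```python
from typing import Dict, List

# Sample rows in the shape of Binance's futures premium-index response
raw_markets: List[dict] = [
    {"symbol": "BTCUSDT", "markPrice": "64000.10", "lastFundingRate": "0.00010000",
     "nextFundingTime": 1700006400000},
    {"symbol": "ETHUSDT", "markPrice": "3200.50", "lastFundingRate": "-0.00005000",
     "nextFundingTime": 1700006400000},
]


def index_markets(rows: List[dict]) -> Dict[str, dict]:
    """Key each row by symbol and convert the funding time from ms to seconds."""
    return {
        row["symbol"]: {
            "mark_price": row["markPrice"],
            "last_funding_rate": row["lastFundingRate"],
            "next_funding_time": int(row["nextFundingTime"]) // 1000,
        }
        for row in rows
    }


markets = index_markets(raw_markets)
print(markets["BTCUSDT"]["next_funding_time"])
```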

Calculating Funding Details

This method prepares the funding rate details required for applying funding rates to positions. The underlying query groups positions by party_a_address and calculates the funding rate for each.
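
The query itself is not reproduced here. As a rough illustration of the grouping step only, the sketch below collects per-quote rows into one record per PartyA address. The record has the same three fields (address, quote_ids, funding_rates) that the apply step reads from FundingDetail. `FundingDetailSketch`, `group_by_party_a`, and the sample rows are hypothetical, and the rates are taken as given since their calculation is not described in this section.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


@dataclass
class FundingDetailSketch:
    address: str
    quote_ids: List[int] = field(default_factory=list)
    funding_rates: List[str] = field(default_factory=list)


def group_by_party_a(rows: List[Tuple[str, int, str]]) -> Dict[str, FundingDetailSketch]:
    """Group (party_a_address, quote_id, funding_rate) rows per address."""
    grouped: Dict[str, FundingDetailSketch] = {}
    for address, quote_id, rate in rows:
        detail = grouped.setdefault(address, FundingDetailSketch(address))
        detail.quote_ids.append(quote_id)
        detail.funding_rates.append(rate)
    return grouped


# Made-up rows: (party_a_address, quote_id, funding_rate)
rows = [("0xA", 1, "0.0001"), ("0xB", 2, "0.0002"), ("0xA", 3, "0.0001")]
details = group_by_party_a(rows)
print(details["0xA"].quote_ids)
```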

Applying Funding Rates

Applies the calculated rates to positions and updates the system. You can read about charging funding rates here.

@classmethod
def apply_funding_rate(cls, funding_detail: FundingDetail, binance_funding_time: int):
    for i in range(0, len(funding_detail.quote_ids), FundingRateBatchSize):
        quote_ids = funding_detail.quote_ids[i:i + FundingRateBatchSize]
        funding_rates = funding_detail.funding_rates[i:i + FundingRateBatchSize]
        try:
            tx_receipt = master_agreement.charge_funding_rate(
                funding_detail.address,
                quote_ids,
                funding_rates,
                enable_gas_estimation=True
            )
            AppliedFundingRate.mark_quotes_as_funding_rate_applied(quote_ids)
            cls.set_update_db_events(funding_detail.address, quote_ids, funding_rates, binance_funding_time, tx_receipt)
        except Exception as e:
            logging.error(f"Failed to apply funding rate: {e}")
