Are you an LLM? Read llms.txt for a summary of the docs, or llms-full.txt for the full context.
Skip to content

Subscriptions

What this teaches: how the WebSocket layer is wired, the per-connection limits, and what the callback contract is.

One manager, one connection

Info constructs a WebsocketManager lazily on the first subscribe_* call. The manager owns exactly one WebSocket connection over the graphql-transport-ws protocol and multiplexes every subscription onto it. The manager itself is a daemon threading.Thread — it runs websocket-client's run_forever() and your callbacks fire on that thread.

Every subscribe call returns an int subscription id. Hold onto it; you need it to unsubscribe.

from dango.info import Info
from dango.utils.types import PairId
 
info = Info("https://api-mainnet.dango.zone")
sid = info.subscribe_perps_trades(PairId("perp/ethusd"), print)
# ...
info.unsubscribe(sid)
info.disconnect_websocket()

Connection lifecycle

The handshake follows the graphql-transport-ws spec:

  1. Client opens the WebSocket and sends connection_init.
  2. Server replies with connection_ack.
  3. Subscriptions sent before the ack are queued and flushed afterwards.

The manager pings every 15 seconds (protocol-level {"type": "ping"} JSON, not frame-level ping) to defeat the server's 30-second idle timeout. Server pings are answered with pong automatically.

Per-connection limit

The server caps every WebSocket connection at 30 simultaneous subscriptions. Past 30, the server rejects new subscribe frames with an error message.

To shard subscriptions across that limit, construct multiple Info instances. Each Info lazy-builds its own WebsocketManager and therefore its own connection:

from dango.info import Info
from dango.utils.types import PairId
 
info_a = Info("https://api-mainnet.dango.zone")
info_b = Info("https://api-mainnet.dango.zone")
 
for pair in pairs_1_to_30:
    info_a.subscribe_perps_trades(pair, on_trade)
 
for pair in pairs_31_to_60:
    info_b.subscribe_perps_trades(pair, on_trade)

The callback contract

Callbacks fire on the WebSocket reader thread. Keep them fast — long-running work should hand off to a queue or another thread. Exceptions inside a callback are NOT caught by the manager and will crash the reader thread.

Each subscribe_* method unwraps the GraphQL next payload before invoking your callback, so the argument is the inner node type:

MethodCallback argument
subscribe_perps_tradesTrade
subscribe_perps_candlesPerpsCandle
subscribe_user_eventsPerpsEvent
subscribe_blockBlock
subscribe_query_appdict[str, Any] (the contract response)

Errors arrive through the callback

Server-side errors do NOT raise. They arrive as {"_error": payload} through the same callback:

def on_trade(event):
    if "_error" in event:
        print("subscription failed:", event["_error"])
        return
    print(event["fillPrice"], event["fillSize"])

After an error, the subscription is terminal — the manager has already dropped the callback. To resume, call subscribe_* again with a new id.

Filtering user events

subscribe_user_events takes an optional event_types list. The filter rule is (type=A AND user=X) OR (type=B AND user=X) — the union of types intersected with the user:

from dango.utils.types import Addr
 
info.subscribe_user_events(
    Addr("0x..."),
    on_event,
    event_types=["order_persisted", "order_removed"],
)

Without event_types, every event for the user streams through.

Polling a query as a subscription

subscribe_query_app re-runs a queryApp request every N blocks (default 10, ~10 seconds at Dango's ~1s block time). Use it to poll contract state when no native subscription exists:

from dango.utils.types import Addr
 
request = {
    "wasm_smart": {
        "contract": Addr("0x..."),
        "msg": {"my_query": {}},
    },
}
 
info.subscribe_query_app(request, print, block_interval=1)

The callback receives the contract's response under payload["response"] (already unwrapped from the kind-keyed envelope).

Next