Subscriptions
What this teaches: how the WebSocket layer is wired, the per-connection limits, and what the callback contract is.
One manager, one connection
Info constructs a WebsocketManager lazily on the first subscribe_* call. The manager owns exactly one WebSocket connection over the graphql-transport-ws protocol and multiplexes every subscription onto it. The manager itself is a daemon threading.Thread — it runs websocket-client's run_forever() and your callbacks fire on that thread.
Every subscribe call returns an int subscription id. Hold onto it; you need it to unsubscribe.
from dango.info import Info
from dango.utils.types import PairId
info = Info("https://api-mainnet.dango.zone")
sid = info.subscribe_perps_trades(PairId("perp/ethusd"), print)
# ...
info.unsubscribe(sid)
info.disconnect_websocket()Connection lifecycle
The handshake follows the graphql-transport-ws spec:
- Client opens the WebSocket and sends
connection_init. - Server replies with
connection_ack. - Subscriptions sent before the ack are queued and flushed afterwards.
The manager pings every 15 seconds (protocol-level {"type": "ping"} JSON, not frame-level ping) to defeat the server's 30-second idle timeout. Server pings are answered with pong automatically.
Per-connection limit
The server caps every WebSocket connection at 30 simultaneous subscriptions. Past 30, the server rejects new subscribe frames with an error message.
To shard subscriptions across that limit, construct multiple Info instances. Each Info lazy-builds its own WebsocketManager and therefore its own connection:
from dango.info import Info
from dango.utils.types import PairId
info_a = Info("https://api-mainnet.dango.zone")
info_b = Info("https://api-mainnet.dango.zone")
for pair in pairs_1_to_30:
info_a.subscribe_perps_trades(pair, on_trade)
for pair in pairs_31_to_60:
info_b.subscribe_perps_trades(pair, on_trade)The callback contract
Callbacks fire on the WebSocket reader thread. Keep them fast — long-running work should hand off to a queue or another thread. Exceptions inside a callback are NOT caught by the manager and will crash the reader thread.
Each subscribe_* method unwraps the GraphQL next payload before invoking your callback, so the argument is the inner node type:
| Method | Callback argument |
|---|---|
subscribe_perps_trades | Trade |
subscribe_perps_candles | PerpsCandle |
subscribe_user_events | PerpsEvent |
subscribe_block | Block |
subscribe_query_app | dict[str, Any] (the contract response) |
Errors arrive through the callback
Server-side errors do NOT raise. They arrive as {"_error": payload} through the same callback:
def on_trade(event):
if "_error" in event:
print("subscription failed:", event["_error"])
return
print(event["fillPrice"], event["fillSize"])After an error, the subscription is terminal — the manager has already dropped the callback. To resume, call subscribe_* again with a new id.
Filtering user events
subscribe_user_events takes an optional event_types list. The filter rule is (type=A AND user=X) OR (type=B AND user=X) — the union of types intersected with the user:
from dango.utils.types import Addr
info.subscribe_user_events(
Addr("0x..."),
on_event,
event_types=["order_persisted", "order_removed"],
)Without event_types, every event for the user streams through.
Polling a query as a subscription
subscribe_query_app re-runs a queryApp request every N blocks (default 10, ~10 seconds at Dango's ~1s block time). Use it to poll contract state when no native subscription exists:
from dango.utils.types import Addr
request = {
"wasm_smart": {
"contract": Addr("0x..."),
"msg": {"my_query": {}},
},
}
info.subscribe_query_app(request, print, block_interval=1)The callback receives the contract's response under payload["response"] (already unwrapped from the kind-keyed envelope).
Next
- Rate limits & quotas — sharding subscriptions, 167/10s HTTP cap, 30/WS sub cap
- Error handling — exception classes and when each is raised