Subscriptions
What this teaches: the graphql-transport-ws model, when to multiplex over a Session, and when to open a dedicated connection with WsClient::subscribe.
Mental model
WsClient is a cheap configuration value. Every live socket is a Session — an Arc-backed handle that owns a background tokio task driving one WebSocket. Subscriptions are tagged with protocol-level ids and routed back to per-stream channels.
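The routing idea in the paragraph above can be sketched with the standard library alone. This is a minimal illustration, not the SDK's implementation: `Frame`, `Router`, `register`, and `dispatch` are hypothetical names, and `std::sync::mpsc` stands in for whatever channel type the background task really uses.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical stand-in for a decoded frame: the protocol-level
/// subscription id plus an opaque payload.
struct Frame {
    id: String,
    payload: String,
}

/// Hypothetical router: maps subscription ids to per-stream senders.
#[derive(Default)]
struct Router {
    next_id: u64,
    routes: HashMap<String, Sender<String>>,
}

impl Router {
    /// Allocate a fresh id and hand back the receiving half of its channel.
    fn register(&mut self) -> (String, Receiver<String>) {
        self.next_id += 1;
        let id = self.next_id.to_string();
        let (tx, rx) = channel();
        self.routes.insert(id.clone(), tx);
        (id, rx)
    }

    /// Deliver a frame to its stream. Returns false when the id is unknown
    /// or the receiver is gone; a dead route is removed.
    fn dispatch(&mut self, frame: Frame) -> bool {
        let delivered = match self.routes.get(&frame.id) {
            Some(tx) => tx.send(frame.payload).is_ok(),
            None => return false,
        };
        if !delivered {
            self.routes.remove(&frame.id);
        }
        delivered
    }
}

fn main() {
    let mut router = Router::default();
    let (blocks_id, blocks_rx) = router.register();
    let (trades_id, trades_rx) = router.register();

    // Frames arrive interleaved on one socket; the id decides the destination.
    router.dispatch(Frame { id: trades_id, payload: "trade #1".into() });
    router.dispatch(Frame { id: blocks_id, payload: "block #1".into() });

    println!("blocks stream got: {}", blocks_rx.recv().unwrap());
    println!("trades stream got: {}", trades_rx.recv().unwrap());
}
```

Removing a route once its receiver is gone is one plausible way to let a dropped stream stop consuming frames; whether the SDK does exactly this is an assumption.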
The protocol is graphql-transport-ws (the graphql-ws rewrite that async-graphql speaks). The SDK sends a Ping every 15 seconds to stay under the server's 30-second keepalive_timeout.
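For orientation, here is a small sketch of the message kinds that graphql-transport-ws defines, together with the two timing values quoted above. The enum, its method names, and the constant names are hypothetical and are not part of the SDK; only the wire strings come from the protocol.

```rust
use std::time::Duration;

/// Timing values quoted in the text; the constant names are hypothetical.
const PING_INTERVAL: Duration = Duration::from_secs(15);
const KEEPALIVE_TIMEOUT: Duration = Duration::from_secs(30);

/// Message kinds defined by the graphql-transport-ws protocol.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum MessageKind {
    ConnectionInit,
    ConnectionAck,
    Ping,
    Pong,
    Subscribe,
    Next,
    Error,
    Complete,
}

impl MessageKind {
    /// Value of the JSON `type` field on the wire.
    fn wire_name(self) -> &'static str {
        match self {
            MessageKind::ConnectionInit => "connection_init",
            MessageKind::ConnectionAck => "connection_ack",
            MessageKind::Ping => "ping",
            MessageKind::Pong => "pong",
            MessageKind::Subscribe => "subscribe",
            MessageKind::Next => "next",
            MessageKind::Error => "error",
            MessageKind::Complete => "complete",
        }
    }

    /// Whether the message carries a subscription `id`, which is what
    /// makes multiplexing over one socket possible.
    fn carries_id(self) -> bool {
        matches!(
            self,
            MessageKind::Subscribe | MessageKind::Next | MessageKind::Error | MessageKind::Complete
        )
    }
}

fn main() {
    let kinds = [
        MessageKind::ConnectionInit,
        MessageKind::ConnectionAck,
        MessageKind::Ping,
        MessageKind::Pong,
        MessageKind::Subscribe,
        MessageKind::Next,
        MessageKind::Error,
        MessageKind::Complete,
    ];
    for kind in kinds {
        println!("{:<16} carries id: {}", kind.wire_name(), kind.carries_id());
    }

    // The ping interval leaves slack before the server-side timeout.
    let slack = KEEPALIVE_TIMEOUT.checked_sub(PING_INTERVAL);
    println!("slack before timeout: {slack:?}");
}
```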
Two entry points
| Entry point | Connections | When to use |
|---|---|---|
| Session::subscribe | One socket, many subscriptions | Long-lived process with several streams. Pay the handshake cost once. |
| WsClient::subscribe | One socket per subscription | One-shot stream, short-lived task, or trivial demo. |
Multiplexed Session
use {
    anyhow::Result,
    dango_sdk::{SubscribeBlock, SubscribeTrades, WsClient, subscribe_block, subscribe_trades},
    futures::StreamExt,
};
#[tokio::main]
async fn main() -> Result<()> {
    // One handshake; every subscription below shares this socket.
    let session = WsClient::new("wss://api-mainnet.dango.zone/graphql")?
        .connect()
        .await?;
    let mut blocks = session
        .subscribe::<SubscribeBlock>(subscribe_block::Variables {})
        .await?;
    let mut trades = session
        .subscribe::<SubscribeTrades>(subscribe_trades::Variables {
            base_denom: "dango".into(),
            quote_denom: "bridge/usdc".into(),
        })
        .await?;
    loop {
        tokio::select! {
            Some(item) = blocks.next() => println!("block: {item:?}"),
            Some(item) = trades.next() => println!("trade: {item:?}"),
            // Reached once both streams have ended.
            else => break,
        }
    }
    Ok(())
}
A Session and all of its clones share one connection. That connection closes only once the last clone and every stream derived from it have been dropped; see Session.
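The drop rule can be illustrated with plain reference counting. The sketch below uses only the standard library; `Connection`, `Handle`, and `Stream` are hypothetical stand-ins, and the SDK's real internals (including its background task) are not shown here.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for the state behind one socket.
struct Connection {
    closed: Arc<AtomicBool>,
}

impl Drop for Connection {
    fn drop(&mut self) {
        // Runs exactly once, when the last Arc to this value goes away.
        self.closed.store(true, Ordering::SeqCst);
    }
}

/// Hypothetical handle: cloning it only bumps a reference count.
#[derive(Clone)]
struct Handle {
    inner: Arc<Connection>,
}

/// Hypothetical stream: holds its own reference, so it keeps the
/// connection alive even after every handle is gone.
struct Stream {
    _conn: Arc<Connection>,
}

impl Handle {
    fn subscribe(&self) -> Stream {
        Stream { _conn: Arc::clone(&self.inner) }
    }
}

fn main() {
    let closed = Arc::new(AtomicBool::new(false));
    let handle = Handle {
        inner: Arc::new(Connection { closed: Arc::clone(&closed) }),
    };
    let clone = handle.clone();
    let stream = handle.subscribe();

    drop(handle);
    drop(clone);
    println!("handles dropped, closed = {}", closed.load(Ordering::SeqCst));

    drop(stream);
    println!("stream dropped,  closed = {}", closed.load(Ordering::SeqCst));
}
```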
Dedicated WsClient::subscribe
use {
    anyhow::Result,
    dango_sdk::{SubscribeBlock, WsClient, subscribe_block},
    futures::StreamExt,
};
#[tokio::main]
async fn main() -> Result<()> {
    let ws = WsClient::new("wss://api-mainnet.dango.zone/graphql")?;
    let mut stream = ws
        .subscribe::<SubscribeBlock>(subscribe_block::Variables {})
        .await?;
    while let Some(item) = stream.next().await {
        println!("{item:?}");
    }
    Ok(())
}
This is the convenience path: it connects, subscribes, and drops the session when the stream ends.
Sugar: SubscriptionVariables
Each of the 13 codegen Variables types implements SubscriptionVariables, letting you call vars.subscribe(&ws) instead of ws.subscribe::<Q>(vars):
use {
    anyhow::Result,
    dango_sdk::{SubscriptionVariables, WsClient, subscribe_block},
    futures::StreamExt,
};
#[tokio::main]
async fn main() -> Result<()> {
    let ws = WsClient::new("wss://api-mainnet.dango.zone/graphql")?;
    // Same call as ws.subscribe::<Q>(vars), written from the variables side.
    let mut stream = subscribe_block::Variables {}.subscribe(&ws).await?;
    while let Some(item) = stream.next().await {
        println!("{item:?}");
    }
    Ok(())
}
Stream item shape
type Item<T> = Result<graphql_client::Response<T>, dango_sdk::WsError>;
- Err(WsError): transport-level or subscription-protocol failure. Inspect the variant; some are terminal.
- Ok(Response { data, errors, .. }): a server-side payload. GraphQL errors live in errors; data may be None if every selection errored.
while let Some(item) = stream.next().await {
    match item {
        Ok(resp) => {
            if let Some(errors) = resp.errors {
                eprintln!("graphql: {errors:?}");
            }
            if let Some(data) = resp.data {
                println!("{data:?}");
            }
        }
        Err(err) => {
            eprintln!("ws: {err}");
            break;
        }
    }
}
// Ok::<(), anyhow::Error>(())
Limits
The server caps each connection at 30 concurrent subscriptions. When you need more, shard across multiple WsClient/Session pairs.
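Sharding comes down to chunking the list of desired subscriptions so that no group exceeds the cap. A minimal sketch, assuming each group would then be served by its own connection: the `shard` helper, the constant name, and the 70-pair workload are hypothetical.

```rust
/// Per-connection cap quoted in the text; the constant name is hypothetical.
const MAX_SUBSCRIPTIONS_PER_CONNECTION: usize = 30;

/// Split `items` into groups that each fit on one connection.
fn shard<T: Clone>(items: &[T], cap: usize) -> Vec<Vec<T>> {
    assert!(cap > 0, "cap must be positive");
    items.chunks(cap).map(|chunk| chunk.to_vec()).collect()
}

fn main() {
    // Hypothetical workload: 70 trading pairs, one subscription each.
    let pairs: Vec<String> = (0..70).map(|i| format!("pair-{i}")).collect();

    let shards = shard(&pairs, MAX_SUBSCRIPTIONS_PER_CONNECTION);
    for (n, group) in shards.iter().enumerate() {
        // In real code, each group would get its own connection here.
        println!("connection {n}: {} subscriptions", group.len());
    }
}
```

Chunking in order keeps related subscriptions together. If some streams are much busier than others, distributing them round-robin across connections may balance load better.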
Next
- Rate limits and quotas: server-side limits including the 30/WS cap.
- Session: lifecycle and Drop semantics.