Are you an LLM? Read llms.txt for a summary of the docs, or llms-full.txt for the full context.
Skip to content

subscribe_block

Stream every newly-finalized block.

Signature

def subscribe_block(self, callback: Callable[[Block], None]) -> int

Example

import time
 
from dango.info import Info
from dango.utils.constants import MAINNET_API_URL
 
info = Info(MAINNET_API_URL)
 
def on_block(block):
    if isinstance(block, dict) and "_error" in block:
        print("error:", block["_error"])
        return
    print(block["blockHeight"], "txs:", len(block["transactions"]))
 
sid = info.subscribe_block(on_block)
time.sleep(30)
info.unsubscribe(sid)
info.disconnect_websocket()

Parameters

callbackCallable[[Block], None]. Invoked once per block. The Block payload includes the full transaction list and the flattened event list (flattenEvents).

Returns

int — the subscription id.

See also