WebsocketClient#
- class starknet_py.net.websockets.websocket_client.WebsocketClient#
Starknet client for WebSocket API.
- __init__(node_url: str)#
- Parameters:
node_url – URL of the node providing the WebSocket API.
- async connect()#
Establishes the WebSocket connection.
- async disconnect()#
Closes the WebSocket connection.
- async subscribe_events(handler: Callable[[NewEventsNotification], Any], from_address: List[int] | int | None = None, keys: List[List[int]] | None = None, block_hash: int | str | Literal['latest'] | None = None, block_number: int | Literal['latest'] | None = None, finality_status: TransactionFinalityStatusWithoutL1 | None = None) str#
Creates a WebSocket stream which will fire events for new Starknet events with applied filters.
- Parameters:
handler – The function to call when a new event is received.
from_address – A contract address or a list of addresses from which events should originate.
keys – The keys to filter events by.
block_hash – Hash of the block to get notifications from or literal “latest”. Mutually exclusive with
block_numberparameter. If not provided, queries block “latest”.block_number – Number (height) of the block to get notifications from or literal “latest”.
finality_status – The finality status of the most recent events to include, default is ACCEPTED_ON_L2. If PRE_CONFIRMED finality is selected, events might appear multiple times, once for each finality status update.
- Returns:
The subscription ID.
Example
from starknet_py.net.websockets.models import NewEventsNotification # Create a handler function that will be called when a new event is emitted def handler(new_events_notification: NewEventsNotification): # Perform the necessary actions with the new event... # Subscribe to new events notifications subscription_id = await websocket_client.subscribe_events( handler=handler, from_address=account.address ) # Here you can put code which will keep the application running (e.g. using loop and `asyncio.sleep`) # ... # Unsubscribe from the notifications unsubscribe_result = await websocket_client.unsubscribe(subscription_id)
- async subscribe_new_heads(handler: Callable[[NewHeadsNotification], Any], block_hash: int | str | Literal['latest'] | None = None, block_number: int | Literal['latest'] | None = None) str#
Creates a WebSocket stream which will fire events for new block headers.
- Parameters:
handler – The function to call when a new block header is received.
block_hash – Hash of the block to get notifications from or literal “latest”. Mutually exclusive with
block_numberparameter. If not provided, queries block “latest”.block_number – Number (height) of the block to get notifications from or literal “latest”.
- Returns:
The subscription ID.
Example
from starknet_py.net.websockets.models import NewHeadsNotification # Create a handler function that will be called when a new block is created def handler(new_heads_notification: NewHeadsNotification): # Perform the necessary actions with the new block... # Subscribe to new heads notifications subscription_id = await websocket_client.subscribe_new_heads(handler=handler) # Here you can put code which will keep the application running (e.g. using loop and `asyncio.sleep`) # ... # Unsubscribe from the notifications unsubscribe_result = await websocket_client.unsubscribe(subscription_id)
- async subscribe_new_transaction_receipts(handler: Callable[[NewTransactionReceiptsNotification], Any], finality_status: List[TransactionFinalityStatusWithoutL1] | None = None, sender_address: List[int] | None = None) str#
Creates a WebSocket stream which will fire events when new transaction receipts are created. An event is fired for each finality status update. It is possible for receipts of pre-confirmed transactions to be received multiple times, or not at all.
- Parameters:
handler – The function to call when a new pending transaction is received.
finality_status – The finality statuses to filter transaction receipts by, default is [ACCEPTED_ON_L2].
sender_address – List of addresses to filter transactions by.
- Returns:
The subscription ID.
Example
from starknet_py.net.websockets.models import NewTransactionReceiptsNotification # Create a handler function that will be called when a new transaction is emitted def handler( new_transaction_receipts_notification: NewTransactionReceiptsNotification, ): # Perform the necessary actions with the new transaction receipts... # Subscribe to new transaction receipts notifications subscription_id = await websocket_client.subscribe_new_transaction_receipts( handler=handler, sender_address=[account.address], ) # Here you can put code which will keep the application running (e.g. using loop and `asyncio.sleep`) # ... # Unsubscribe from the notifications unsubscribe_result = await websocket_client.unsubscribe(subscription_id)
- async subscribe_new_transactions(handler: Callable[[NewTransactionNotification], Any], sender_address: List[int] | None = None, finality_status: List[TransactionStatusWithoutL1] | None = None, tags: List[SubscriptionTag] | None = None) str#
Creates a WebSocket stream which will fire events when a new pending transaction is added. While there is no mempool, this notifies of transactions in the pending block.
- Parameters:
handler – The function to call when a new pending transaction is received.
sender_address – List of sender addresses to filter transactions by.
finality_status – The finality statuses to filter transaction receipts by, default is [ACCEPTED_ON_L2].
tags – Tags that control what additional fields are included in transaction responses.
- Returns:
The subscription ID.
- async subscribe_transaction_status(handler: Callable[[TransactionStatusNotification], Any], transaction_hash: int) str#
Creates a WebSocket stream which at first fires an event with the current known transaction status, followed by events for every transaction status update.
- Parameters:
handler – The function to call when a new transaction status is received.
transaction_hash – The transaction hash to fetch status updates for.
- Returns:
The subscription ID.
Example
from starknet_py.net.websockets.models import TransactionStatusNotification # Create a handler function that will be called when a new transaction status is emitted def handler(transaction_status_notification: TransactionStatusNotification): # Perform the necessary actions with the new transaction status... # Subscribe to transaction status notifications subscription_id = await websocket_client.subscribe_transaction_status( handler=handler, transaction_hash=execute.transaction_hash ) # Here you can put code which will keep the application running (e.g. using loop and `asyncio.sleep`) # ... # Unsubscribe from the notifications unsubscribe_result = await websocket_client.unsubscribe(subscription_id)
- async unsubscribe(subscription_id: str) bool#
Close a previously opened WebSocket stream, with the corresponding subscription id.
- Parameters:
subscription_id – ID of the subscription to close.
- Returns:
True if the unsubscription was successful, False otherwise.
- async wait_closed_or_failed() None#
Awaits until the listener is canceled (on calling
disconnect()method) or WebsocketClient fails with an exception. Ifconnect()was never called or failure happened, this method returns immediately. Raises the original exception on failure.
- property is_connected: bool#
Checks if the WebSocket connection is established.
- Returns:
True if the connection is established, False otherwise.
- property on_chain_reorg: Callable[[ReorgNotification], Any] | None#
The notifications handler for reorganization of the chain. Will be called when subscribing to new heads, events or transaction status.
- Returns:
The handler for reorg notifications.