Source code for tradehub.websocket_client

from typing import Optional, List, Callable
import websockets
import json


[docs]class DemexWebsocket: """ DemexWebsocket is a high-level async implementation off the official Tradehub Demex websocket and provides all functionalities described in the documentation. """ def __init__(self, uri: str, ping_interval: Optional[int] = 10, ping_timeout: Optional[int] = 30): """ Create a websocket which is complaint with the specification provided by the offical documentation. .. see:: https://docs.switcheo.org/#/?id=websocket :param uri: Websocket URI, starting with 'ws://' or 'wss://' e.g. 'ws://85.214.81.155:5000/ws' :param ping_interval: Interval for pinging the server in seconds. :param ping_timeout: Time after no response for pings are considered as timeout in seconds. """ self._uri: str = uri self._ping_interval: int = ping_interval self._ping_timeout: int = ping_timeout self._websocket: Optional[websockets.WebSocketClientProtocol] = None
[docs] async def subscribe(self, message_id: str, channels: List[str]): """ Subscribe to one or many channels. :param message_id: Identifier that will be included in the websocket message response to allow the subscriber to identify which channel the notification is originated from. :param channels: List with channels to join. :return: None """ await self.send({ "id": message_id, "method": "subscribe", "params": {"channels": channels} })
[docs] async def unsubscribe(self, message_id: str, channels: List[str]): """ Unsubscribe to one or many channels. :param message_id: Identifier that will be included in the websocket message response to allow the subscriber to identify which channel the notification is originated from. :param channels: List with channels to leave. :return: None """ await self.send({ "id": message_id, "method": "unsubscribe", "params": {"channels": channels} })
[docs] async def subscribe_leverages(self, message_id: str, swth_address: str): """ Subscribe to wallet specific leverages channel. .. warning:: This channel has not been tested yet. :param message_id: Identifier that will be included in the websocket message response to allow the subscriber to identify which channel the notification is originated from. :param swth_address: Tradehub wallet address starting with 'swth1' for mainnet and 'tswth1' for testnet. :return: None """ # TODO not tested yet channel_name: str = f"leverages.{swth_address}" await self.subscribe(message_id, [channel_name])
[docs] async def subscribe_market_stats(self, message_id: str): """ Subscribe to market stats. Example:: ws_client.subscribe_market_stats('market_stats') The initial channel message is expected as:: { 'id':'market_stats', 'result': ['market_stats'] } The subscription and channel messages are expected as follow:: { 'channel': 'market_stats', 'sequence_number': 484, 'result': { 'cel1_usdc1': { 'day_high': '5.97', 'day_low': '5.72', 'day_open': '5.86', 'day_close': '5.74', 'day_volume': '414.4', 'day_quote_volume': '2429.009', 'index_price': '0', 'mark_price': '0', 'last_price': '5.74', 'market': 'cel1_usdc1', 'market_type': 'spot', 'open_interest': '0' } ... } } :param message_id: Identifier that will be included in the websocket message response to allow the subscriber to identify which channel the notification is originated from. :return: None """ channel_name: str = "market_stats" await self.subscribe(message_id, [channel_name])
[docs] async def subscribe_books(self, message_id: str, market: str): """ Subscribe to book channel. Example:: ws_client.subscribe_books('orderbook', "swth_eth1') The initial channel message is expected as:: { 'id':'orderbook', 'result': ['books.eth1_usdc1', ...] } The initial subscription message is expected as:: { 'channel': 'books.eth1_usdc1', 'sequence_number': 924, 'result': [ { 'market': 'eth1_usdc1', 'price': '1797.1', 'quantity': '0.02', 'side': 'sell', 'type': 'new' }, ... { 'market': 'eth1_usdc1', 'price': '1790.1', 'quantity': '0.02', 'side': 'buy', 'type': 'new' } ... ] } The channel update messages are expected as:: { 'channel': 'books.eth1_usdc1', 'sequence_number': 924, 'result': [ { 'market': 'eth1_usdc1', 'price': '1797.1', 'quantity': '0', 'side': 'sell', 'type': 'delete' }, ... { 'market':'eth1_usdc1', 'price': '1800.18', 'quantity': '-0.43', 'side': 'sell', 'type': 'update' }, ... { 'market': 'eth1_usdc1', 'price': '1114.48', 'quantity': '182.716', 'side': 'buy', 'type': 'new' } ] } .. note:: The initial message is a snapshot of the current orderbook. The following messages are delta messages to the snapshot. Each message has a 'sequence_number'. Updates can contain update types: 'new', 'update' or 'delete'. The quantity in a 'update' message can be negative indicating a reduction, while positive value means an increment. All updates need to be processed in the provided order to maintain an consistent orderbook. .. warning:: The initial snapshot is a partial orderbook with a total of 100 entries! Expect receiving updates for orders outside the local managed orderbook. Ignore or reconnect to maintain the local orderbook. :param message_id: Identifier that will be included in the websocket message response to allow the subscriber to identify which channel the notification is originated from. :param market: Tradehub market identifier, e.g. 'swth_eth1' :return: None """ channel_name: str = f"books.{market}" await self.subscribe(message_id, [channel_name])
[docs] async def subscribe_orders(self, message_id: str, swth_address: str, market: Optional[str] = None): """ Subscribe to orders channel. .. note:: The market identifier is optional and acts as a filter. Example:: ws_client.subscribe_orders('orders', "swth1...abcd') The initial channel message is expected as:: { 'id':'orders', 'result': ['orders.swth1...abcd'] } The channel update messages are expected as:: { 'channel': 'orders.swth1...abcd', 'result': [ { 'order_id': '7CBBF51B75CF2E046726BB...56757D6D502B01F4BB62178DCF', 'block_height': 7375724, 'triggered_block_height': 0, 'address': 'swth1...abcd', 'market': 'eth1_wbtc1', 'side': 'sell', 'price': '0', 'quantity': '0.08', 'available': '0.08', 'filled': '0', 'order_status': 'pending', 'order_type': 'market', 'initiator': 'user', 'time_in_force': 'fok', 'stop_price': '0', 'trigger_type': '', 'allocated_margin_denom': 'eth1', 'allocated_margin_amount': '0', 'is_liquidation': False, 'is_post_only': False, 'is_reduce_only': False, 'type': 'new', 'block_created_at': '2021-02-11T20:36:02.244175356Z', 'username': '', 'id': '' } ... ] } :param message_id: Identifier that will be included in the websocket message response to allow the subscriber to identify which channel the notification is originated from. :param swth_address: Tradehub wallet address starting with 'swth1' for mainnet and 'tswth1' for testnet. :param market: Tradehub market identifier, e.g. 'swth_eth1' :return: None """ if market: channel_name: str = f"orders_by_market.{market}.{swth_address}" else: channel_name: str = f"orders.{swth_address}" await self.subscribe(message_id, [channel_name])
[docs] async def subscribe_positions(self, message_id: str, swth_address: str, market: Optional[str] = None): """ Subscribe to positions channel. .. note:: The market identifier is optional and acts as a filter. .. warning:: This channel is not tested yet. :param message_id: Identifier that will be included in the websocket message response to allow the subscriber to identify which channel the notification is originated from. :param swth_address: Tradehub wallet address starting with 'swth1' for mainnet and 'tswth1' for testnet. :param market: Tradehub market identifier, e.g. 'swth_eth1' :return: None """ # TODO not tested yet if market: channel_name: str = f"positions_by_market.{market}.{swth_address}" else: channel_name: str = f"positions.{swth_address}" await self.subscribe(message_id, [channel_name])
[docs] async def subscribe_recent_trades(self, message_id: str, market: str): """ Subscribe to recent trades. Example:: ws_client.subscribe_recent_trades('trades', "swth_eth1') The initial channel message is expected as:: { 'id': 'trades', 'result': ['recent_trades.swth_eth1'] } The channel update messages are expected as:: { 'channel': 'recent_trades.eth1_usdc1', 'sequence_number': 812, 'result': [ { 'id': '0', 'block_created_at': '2021-02-11T20:49:07.095418551Z', 'taker_id': '5FF349410F9CF59BED36D412D1223424835342274BC0E504ED0A17EE4B5B0856', 'taker_address': 'swth1vaavrkrm7usqg9hcwhqh2hev9m3nryw7aera8p', 'taker_fee_amount': '0.00002', 'taker_fee_denom': 'eth1', 'taker_side': 'buy', 'maker_id': '8334A9C97CAEFAF84774AAADB0D5666E7764BA023DF145C8AF90BB6A6862EA2E', 'maker_address': 'swth1wmcj8gmz4tszy5v8c0d9lxnmguqcdkw22275w5', 'maker_fee_amount': '-0.00001', 'maker_fee_denom': 'eth1', 'maker_side': 'sell', 'market': 'eth1_usdc1', 'price': '1797.1', 'quantity': '0.02', 'liquidation': '', 'taker_username': '', 'maker_username': '', 'block_height': '7376096' }, ... ] } .. warning:: The field 'id' is sometimes '0'. This endpoint/channel does not seem to work correct. :param message_id: Identifier that will be included in the websocket message response to allow the subscriber to identify which channel the notification is originated from. :param market: Tradehub market identifier, e.g. 'swth_eth1' :return: None """ channel_name: str = f"recent_trades.{market}" await self.subscribe(message_id, [channel_name])
[docs] async def subscribe_account_trades(self, message_id: str, swth_address: str, market: Optional[str] = None): """ Subscribe to account trades. Example:: ws_client.subscribe_account_trades('account', 'swth...abcd', 'eth1_usdc1') # or for all markets ws_client.subscribe_account_trades('account', "swth...abcd') The initial channel message is expected as:: { 'id': 'account', 'result': ['account_trades_by_market.eth1_usdc1.swth1...abcd'] } # or for all markets { 'id': 'account', 'result': ['account_trades.swth1...abcd'] } The channel update messages are expected as:: { 'channel': 'recent_trades.eth1_usdc1', 'sequence_number': 812, 'result': [ { 'id': '0', 'block_created_at': '2021-02-11T20:49:07.095418551Z', 'taker_id': '5FF349410F9CF59BED36D412D1223424835342274BC0E504ED0A17EE4B5B0856', 'taker_address': 'swth1...taker', 'taker_fee_amount': '0.00002', 'taker_fee_denom': 'eth1', 'taker_side': 'buy', 'maker_id': '8334A9C97CAEFAF84774AAADB0D5666E7764BA023DF145C8AF90BB6A6862EA2E', 'maker_address': 'swth1...maker', 'maker_fee_amount': '-0.00001', 'maker_fee_denom': 'eth1', 'maker_side': 'sell', 'market': 'eth1_usdc1', 'price': '1797.1', 'quantity': '0.02', 'liquidation': '', 'taker_username': '', 'maker_username': '', 'block_height': '7376096' }, ... ] } .. note:: The market identifier is optional and acts as a filter. .. warning:: The field 'id' is '0' all the time. This endpoint/channel does not seem to work correct. :param message_id: Identifier that will be included in the websocket message response to allow the subscriber to identify which channel the notification is originated from. :param swth_address: Tradehub wallet address starting with 'swth1' for mainnet and 'tswth1' for testnet. :param market: Tradehub market identifier, e.g. 'swth_eth1' :return: None """ if market: channel_name: str = f"account_trades_by_market.{market}.{swth_address}" else: channel_name: str = f"account_trades.{swth_address}" await self.subscribe(message_id, [channel_name])
[docs] async def subscribe_balances(self, message_id: str, swth_address: str): """ Subscribe to wallet specific balance channel. Example:: ws_client.subscribe_balances('balance', "swth1...abcd') The initial channel message is expected as:: { 'id': 'balance', 'result': ['balances.swth1...abcd'] } The subscription and channel messages are expected as follow:: { 'channel': 'balances.swth1vaavrkrm7usqg9hcwhqh2hev9m3nryw7aera8p', 'result': { 'eth1': { 'available': '0.83941506825', 'order': '0', 'position': '0', 'denom': 'eth1' }, ... } } :param message_id: Identifier that will be included in the websocket message response to allow the subscriber to identify which channel the notification is originated from. :param swth_address: Tradehub wallet address starting with 'swth1' for mainnet and 'tswth1' for testnet. :return: None """ channel_name: str = f"balances.{swth_address}" await self.subscribe(message_id, [channel_name])
[docs] async def subscribe_candlesticks(self, message_id: str, market: str, granularity: int): """ Subscribe to candlesticks channel. Example:: ws_client.subscribe_candlesticks('candle', "swth_eth1', 1) The initial channel message is expected as:: { 'id': 'candle', 'result': ['candlesticks.swth_eth1.1'] } The subscription and channel messages are expected as follow:: { 'channel': 'candlesticks.swth_eth1.1', 'sequence_number': 57, 'result': { 'id': 0, 'market':'swth_eth1', 'time': '2021-02-17T10:59:00Z', 'resolution': 1, 'open': '0.000018', 'close': '0.000018', 'high': '0.000018', 'low': '0.000018', 'volume': '5555', 'quote_volume': '0.09999' } } :param message_id: Identifier that will be included in the websocket message response to allow the subscriber to identify which channel the notification is originated from. :param market: Tradehub market identifier, e.g. 'swth_eth1' :param granularity: Define the candlesticks granularity. Allowed values: 1, 5, 15, 30, 60, 360, 1440. :return: None """ if granularity not in [1, 5, 15, 30, 60, 360, 1440]: raise ValueError(f"Granularity '{granularity}' not supported. Allowed values: 1, 5, 15, 30, 60, 360, 1440") channel_name: str = f"candlesticks.{market}.{granularity}" await self.subscribe(message_id, [channel_name])
[docs] async def get_order_history(self, message_id: str, swth_address: str, market: Optional[str] = None): """ Request up to 200 order histories. Example:: ws_client.get_order_history('order_history', "swth1vaavrkrm7usqg9hcwhqh2hev9m3nryw7aera8p") The expected return result for this function is as follows:: { "id": "order_history", "result": [ { "order_id": "C7D7DDDCFDC68DF2D078CBD8630B657148893AC24CF8DB8F2E23293C6EDC90AD", "block_height": 7561537, "triggered_block_height": 0, "address": "swth1vaavrkrm7usqg9hcwhqh2hev9m3nryw7aera8p", "market": "wbtc1_usdc1", "side": "sell", "price": "0", "quantity": "0.0011", "available": "0", "filled": "0.0011", "order_status": "filled", "order_type": "market", "initiator": "user", "time_in_force": "fok", "stop_price": "0", "trigger_type": "", "allocated_margin_denom": "wbtc1", "allocated_margin_amount": "0", "is_liquidation": false, "is_post_only": false, "is_reduce_only": false, "type": "", "block_created_at": "2021-02-16T08:31:13.225303Z", "username": "", "id": "2315998" }, ... ] } .. note:: The market identifier is optional and acts as a filter. :param message_id: Identifier that will be included in the websocket message response to allow the subscriber to identify which channel the notification is originated from. :param swth_address: Tradehub wallet address starting with 'swth1' for mainnet and 'tswth1' for testnet. :param market: Tradehub market identifier, e.g. 'swth_eth1' :return: None """ await self.send({ "id": message_id, "method": "get_order_history", "params": { "address": swth_address, "market": market } })
[docs] async def get_recent_trades(self, message_id: str, market: str): """ Request up to 100 recent trades for a market. Example:: ws_client.get_recent_trades('recent_trades', "swth_eth1") The expected return result for this function is as follows:: { "id": "recent_trades", "sequence_number": 3, "result": [ { "id": "0", "block_created_at": "2021-02-16T10:21:31.346041707Z", "taker_id": "3F71918F83D84639F505464335FD355105EE63E622CBB819AAFBBAC97368CC7A", "taker_address": "swth1ysezxr46dhd4dzjsswqte35wfm0ml5dxx97aqt", "taker_fee_amount": "3.2475", "taker_fee_denom": "swth", "taker_side": "buy", "maker_id": "343590CF4F54FEB1E2429F60B77CD3BED701A040418AEB914BB41D561E24E7DE", "maker_address": "swth1a5v8pyhkzjjmyw03mh9zqfakwyu0t5wkv0tf66", "maker_fee_amount": "-0.6495", "maker_fee_denom": "swth", "maker_side": "sell", "market": "swth_eth1", "price": "0.0000182", "quantity": "1299", "liquidation": "", "taker_username": "", "maker_username": "", "block_height": "7564715" }, ... ] } .. warning:: The field 'id' is sometimes '0'. This endpoint/channel does not seem to work correct. :param message_id: Identifier that will be included in the websocket message response to allow the subscriber to identify which channel the notification is originated from. :param market: Tradehub market identifier, e.g. 'swth_eth1' :return: None """ await self.send({ "id": message_id, "method": "get_recent_trades", "params": { "market": market } })
[docs] async def get_candlesticks(self, message_id: str, market: str, granularity: int, from_epoch: int, to_epoch: int): """ Requests candlesticks for market with granularity. Example:: ws_client.get_candlesticks('recent_trades', "swth_eth1") The subscription and channel messages are expected as follow:: { 'id': 'candlesticks.swth_eth1.1', 'sequence_number': 57, 'result': [ { 'id': 0, 'market':'swth_eth1', 'time': '2021-02-17T10:59:00Z', 'resolution': 1, 'open': '0.000018', 'close': '0.000018', 'high': '0.000018', 'low': '0.000018', 'volume': '5555', 'quote_volume': '0.09999' } ] } .. note:: Only candles with non empty volume will be returned. Expect almost none or just a few candles with a low granularity. :param message_id: Identifier that will be included in the websocket message response to allow the subscriber to identify which channel the notification is originated from. :param market: Tradehub market identifier, e.g. 'swth_eth1' :param granularity: Define the candlesticks granularity. Allowed values: 1, 5, 15, 30, 60, 360, 1440. :param from_epoch: Starting from epoch seconds. :param to_epoch: Ending to epoch seconds. :return: None """ if granularity not in [1, 5, 15, 30, 60, 360, 1440]: raise ValueError(f"Granularity '{granularity}' not supported. Allowed values: 1, 5, 15, 30, 60, 360, 1440") await self.send({ "id": message_id, "method": "get_candlesticks", "params": { "market": market, "resolution": str(granularity), "from": str(from_epoch), "to": str(to_epoch) } })
[docs] async def get_open_orders(self, message_id: str, swth_address: str, market: Optional[str] = None): """ Request open orders. Example:: ws_client.get_open_orders('open_orders', "swth1p5hjhag5glkpqaj0y0vn3au7x0vz33k0gxuejk") The expected return result for this function is as follows:: { "id": "open_orders", "result": [ { "order_id": "A7C488A6AE25249E90523FCD603236342025340E3DCAE6A6312133905C41794C", "block_height": 7564973, "triggered_block_height": 0, "address": "swth1p5hjhag5glkpqaj0y0vn3au7x0vz33k0gxuejk", "market": "swth_eth1", "side": "sell", "price": "0.0000181", "quantity": "58806", "available": "58806", "filled": "0", "order_status": "open", "order_type": "limit", "initiator": "amm", "time_in_force": "gtc", "stop_price": "0", "trigger_type": "", "allocated_margin_denom": "swth", "allocated_margin_amount": "0", "is_liquidation": false, "is_post_only": false, "is_reduce_only": false, "type": "", "block_created_at": "2021-02-16T10:30:27.079962Z", "username": "", "id": "2316597" }, ... ] } .. note:: The market identifier is optional and acts as a filter. :param message_id: Identifier that will be included in the websocket message response to allow the subscriber to identify which channel the notification is originated from. :param swth_address: Tradehub wallet address starting with 'swth1' for mainnet and 'tswth1' for testnet. :param market: Tradehub market identifier, e.g. 'swth_eth1' :return: None """ await self.send({ "id": message_id, "method": "get_open_orders", "params": { "address": swth_address, "market": market } })
[docs] async def get_account_trades(self, message_id: str, swth_address: str, market: Optional[str] = None, page: Optional[int] = None): """ Request up to 100 account trades. Example:: ws_client.get_account_trades('account_trades', 'swth1vaavrkrm7usqg9hcwhqh2hev9m3nryw7aera8p') The expected return result for this function is as follows:: { "id": "account_trades", "result": [ { "base_precision": 8, "quote_precision": 6, "fee_precision": 6, "order_id": "C7D7DDDCFDC68DF2D078CBD8630B657148893AC24CF8DB8F2E23293C6EDC90AD", "market": "wbtc1_usdc1", "side": "sell", "quantity": "0.0001", "price": "48745.12", "fee_amount": "0.004875", "fee_denom": "usdc1", "address": "swth1vaavrkrm7usqg9hcwhqh2hev9m3nryw7aera8p", "block_height": "7561537", "block_created_at": "2021-02-16T08:31:13.225303Z", "id": 289733 }, ... ] } .. note:: The market identifier is optional and acts as a filter. :param message_id: Identifier that will be included in the websocket message response to allow the subscriber to identify which channel the notification is originated from. :param swth_address: Tradehub wallet address starting with 'swth1' for mainnet and 'tswth1' for testnet. :param market: Tradehub market identifier, e.g. 'swth_eth1'. :param page: Used for pagination. :return: None """ await self.send({ "id": message_id, "method": "get_account_trades", "params": { "address": swth_address, "market": market, "page": str(page) if page else None } })
[docs] async def get_market_stats(self, message_id: str, market: Optional[str] = None): """ Request market stats. Example:: ws_client.get_market_stats('market_stats') The expected return result for this function is as follows:: { "id": "market_stats", "result": { "eth1_usdc1": { "day_high": "1818.51", "day_low": "1751.81", "day_open": "1760.07", "day_close": "1788.19", "day_volume": "36.503", "day_quote_volume": "65153.50224", "index_price": "0", "mark_price": "0", "last_price": "1788.19", "market": "eth1_usdc1", "market_type": "spot", "open_interest": "0" }, ... } } .. warning:: Parameter 'market' has no effect. Maybe not intended as parameter. Request will result in all market stats. :param message_id: Identifier that will be included in the websocket message response to allow the subscriber to identify which channel the notification is originated from. :param market: Tradehub market identifier, e.g. 'swth_eth1' :return: None """ # TODO market has no effect await self.send({ "id": message_id, "method": "get_market_stats", "params": { "market": market } })
[docs] async def get_leverages(self, message_id: str, swth_address: str, market: Optional[str] = None): """ Request leverages. .. note:: The market identifier is optional and acts as a filter. .. warning:: The request method has not been tested yet. :param message_id: Identifier that will be included in the websocket message response to allow the subscriber to identify which channel the notification is originated from. :param swth_address: Tradehub wallet address starting with 'swth1' for mainnet and 'tswth1' for testnet. :param market: Tradehub market identifier, e.g. 'swth_eth1'. :return: None """ # TODO not tested yet await self.send({ "id": message_id, "method": "get_leverages", "params": { "address": swth_address, "market": market } })
[docs] async def get_open_positions(self, message_id: str, swth_address: str, market: Optional[str] = None): """ Request open positions. .. note:: The market identifier is optional and acts as a filter. .. warning:: The request method has not been tested yet. :param message_id: Identifier that will be included in the websocket message response to allow the subscriber to identify which channel the notification is originated from. :param swth_address: Tradehub wallet address starting with 'swth1' for mainnet and 'tswth1' for testnet. :param market: Tradehub market identifier, e.g. 'swth_eth1'. :return: None """ # TODO not tested yet await self.send({ "id": message_id, "method": "get_open_positions", "params": { "address": swth_address, "market": market } })
[docs] async def get_closed_positions(self, message_id: str, swth_address: str, market: Optional[str] = None): """ Request closed positions. .. note:: The market identifier is optional and acts as a filter. .. warning:: The request method has not been tested yet. :param message_id: Identifier that will be included in the websocket message response to allow the subscriber to identify which channel the notification is originated from. :param swth_address: Tradehub wallet address starting with 'swth1' for mainnet and 'tswth1' for testnet. :param market: Tradehub market identifier, e.g. 'swth_eth1'. :return: None """ # TODO not tested yet await self.send({ "id": message_id, "method": "get_closed_positions", "params": { "address": swth_address, "market": market } })
[docs] async def send(self, data: dict): """ Send data to websocket server. Provided data will be translated to json. :param data: data as dictionary. :return: """ await self._websocket.send(json.dumps(data))
[docs] def open(self) -> bool: """ Check if the connection is open. :return: Bool """ if not self._websocket: return False return self._websocket.open
[docs] async def disconnect(self): """ Safely close the websocket connection. :return: """ if self._websocket: await self._websocket.close()
[docs] async def connect(self, on_receive_message_callback: Callable, on_connect_callback: Optional[Callable] = None, on_error_callback: Optional[Callable] = None): """ Connect to websocket server. .. warning:: Callbacks need to be NON-BLOCKING! Otherwise the PING-PONG coroutine is blocked and the server will close the connection. You will not receive any notification about this. :param on_receive_message_callback: async callback which is called with the received message as dict. :param on_connect_callback: async callback which is called if websocket is connected. :param on_error_callback: async callback which is called if websocket has an error. :return: None """ try: async with websockets.connect(self._uri, ping_interval=self._ping_interval, ping_timeout=self._ping_timeout) as websocket: self._websocket = websocket if on_connect_callback: await on_connect_callback() async for message in websocket: data = json.loads(message) await on_receive_message_callback(data) except Exception as e: if on_error_callback: await on_error_callback(e) else: raise e