88import aiohttp
99from aiohttp import WSMsgType , ClientWebSocketResponse
1010
11- # Simplified logging for MicroPython
11+ # Simplified logging for MicroPython with timestamps
1212def _log_debug (msg ):
13- print (f"DEBUG: { msg } " )
13+ print (f"[ DEBUG { time . ticks_ms () } ] { msg } " )
1414
1515def _log_error (msg ):
16- print (f"ERROR: { msg } " )
16+ print (f"[ ERROR { time . ticks_ms () } ] { msg } " )
1717
1818# Simplified ABNF for opcode compatibility
1919class ABNF :
@@ -33,31 +33,36 @@ class WebSocketConnectionClosedException(WebSocketException):
3333class WebSocketTimeoutException (WebSocketException ):
3434 pass
3535
36- # Queue for callback dispatching (in same thread)
36+ # Queue for callback dispatching
3737_callback_queue = ucollections .deque ((), 100 ) # Empty tuple, maxlen=100
3838
3939def _run_callback (callback , * args ):
4040 """Add callback to queue for execution."""
4141 try :
4242 _callback_queue .append ((callback , args ))
43+ _log_debug (f"Queued callback { callback } , queue size: { len (_callback_queue )} " )
4344 except IndexError :
4445 _log_error ("Callback queue full, dropping callback" )
4546
46- def _process_callbacks ():
47- """Process queued callbacks."""
48- while _callback_queue :
49- print ("processing callbacks queue..." )
50- try :
51- callback , args = _callback_queue .popleft ()
52- if callback is not None :
53- try :
54- callback (* args )
55- except Exception as e :
56- _log_error (f"Error in callback { callback } : { e } " )
57- else :
58- print ("Not calling None callback" )
59- except IndexError :
60- break # Queue is empty
47+ async def _process_callbacks_async ():
48+ """Process queued callbacks asynchronously."""
49+ while True :
50+ while _callback_queue :
51+ _log_debug ("Processing callbacks queue..." )
52+ try :
53+ callback , args = _callback_queue .popleft ()
54+ if callback is not None :
55+ _log_debug (f"Executing callback { callback } with args { args } " )
56+ try :
57+ callback (* args )
58+ except Exception as e :
59+ _log_error (f"Error in callback { callback } : { e } " )
60+ else :
61+ _log_debug ("Skipping None callback" )
62+ except IndexError :
63+ _log_debug ("Callback queue empty" )
64+ break
65+ await asyncio .sleep (0.01 ) # Yield to other tasks
6166
6267class WebSocketApp :
6368 def __init__ (
@@ -108,7 +113,9 @@ def __init__(
108113 def send (self , data , opcode = ABNF .OPCODE_TEXT ):
109114 """Send a message."""
110115 if not self .ws or not self .running :
116+ _log_error ("Send failed: Connection closed or not running" )
111117 raise WebSocketConnectionClosedException ("Connection is already closed." )
118+ _log_debug (f"Scheduling send: opcode={ opcode } , data={ str (data )[:20 ]} ..." )
112119 asyncio .create_task (self ._send_async (data , opcode ))
113120
114121 def send_text (self , text_data ):
@@ -121,43 +128,56 @@ def send_bytes(self, data):
121128
122129 def close (self , ** kwargs ):
123130 """Close the WebSocket connection."""
131+ _log_debug ("Close requested" )
124132 self .running = False
125133 asyncio .create_task (self ._close_async ())
126134
127135 async def _close_async (self ):
128136 """Async close implementation."""
137+ _log_debug ("Closing WebSocket connection" )
129138 try :
130139 if self .ws and not self .ws .ws .closed :
140+ _log_debug ("Sending WebSocket close frame" )
131141 await self .ws .close ()
142+ else :
143+ _log_debug ("WebSocket already closed or not initialized" )
132144 if self .session :
145+ _log_debug ("Closing ClientSession" )
133146 await self .session .__aexit__ (None , None , None )
147+ else :
148+ _log_debug ("No ClientSession to close" )
134149 except Exception as e :
135150 _log_error (f"Error closing WebSocket: { e } " )
136151
137- def _start_ping_thread (self ):
152+ def _start_ping_task (self ):
138153 """Start ping task."""
139154 if self .ping_interval :
140- asyncio .create_task (self ._send_ping_async ())
155+ _log_debug (f"NOT Starting ping task with interval { self .ping_interval } s" )
156+ #asyncio.create_task(self._send_ping_async())
141157
142158 def _stop_ping_thread (self ):
143- """No-op, ping handled in async loop ."""
159+ """No-op, ping handled in async task ."""
144160 pass
145161
146162 async def _send_ping_async (self ):
147163 """Send periodic pings."""
148164 while self .running and self .ping_interval :
149165 self .last_ping_tm = time .time ()
150166 try :
151-
152- #await self.ws.send_bytes(self.ping_payload.encode() if isinstance(self.ping_payload, str) else self.ping_payload)
153- _log_debug ("NOT Sending ping because it seems corrupt" )
167+ if self .ws and not self .ws .ws .closed :
168+ _log_debug (f"Sending ping with payload: { self .ping_payload } " )
169+ await self .ws .send_bytes (self .ping_payload .encode () if isinstance (self .ping_payload , str ) else self .ping_payload )
170+ else :
171+ _log_debug ("Skipping ping: WebSocket not connected" )
154172 except Exception as e :
155- _log_debug (f"Failed to send ping: { e } " )
173+ _log_error (f"Failed to send ping: { e } " )
156174 await asyncio .sleep (self .ping_interval )
157175
158176 def ready (self ):
159177 """Check if connection is active."""
160- return self .ws is not None and self .running
178+ status = self .ws is not None and self .running
179+ _log_debug (f"Connection status: ready={ status } " )
180+ return status
161181
162182 def run_forever (
163183 self ,
@@ -180,6 +200,7 @@ def run_forever(
180200 reconnect = None ,
181201 ):
182202 """Run the WebSocket event loop in the main thread."""
203+ _log_debug ("Starting run_forever" )
183204 if sockopt or http_proxy_host or http_proxy_port or http_no_proxy or http_proxy_auth or proxy_type :
184205 raise WebSocketException ("Proxy and sockopt not supported in MicroPython" )
185206 if dispatcher :
@@ -200,17 +221,19 @@ def run_forever(
200221 try :
201222 self ._loop .run_until_complete (self ._async_main ())
202223 except KeyboardInterrupt :
203- print ("run_forever got KeyboardInterrupt" )
224+ _log_debug ("run_forever got KeyboardInterrupt" )
204225 self .close ()
205226 return False
206227 except Exception as e :
207- _log_error (f"run_forever got general exception: { e } - returning True " )
228+ _log_error (f"run_forever got general exception: { e } " )
208229 self .has_errored = True
209230 return True
231+ _log_debug ("run_forever completed" )
210232 return self .has_errored
211233
212234 async def _async_main (self ):
213235 """Main async loop for WebSocket handling."""
236+ _log_debug ("Starting _async_main" )
214237 reconnect = 0 # Default, as RECONNECT may not be defined
215238 try :
216239 from websocket import RECONNECT
@@ -219,57 +242,70 @@ async def _async_main(self):
219242 pass
220243 if reconnect is not None :
221244 reconnect = reconnect
245+ _log_debug (f"Reconnect interval set to { reconnect } s" )
246+
247+ # Start callback processing task
248+ callback_task = asyncio .create_task (_process_callbacks_async ())
249+ _log_debug ("Started callback processing task" )
222250
223251 while self .running :
224- print ("self.running" )
225- time .sleep (1 )
252+ _log_debug ("Main loop iteration: self.running=True" )
226253 try :
227254 await self ._connect_and_run ()
228255 except Exception as e :
229256 _log_error (f"_async_main got exception: { e } " )
230257 self .has_errored = True
231258 _run_callback (self .on_error , self , e )
232259 if not reconnect :
260+ _log_debug ("No reconnect configured, breaking loop" )
233261 break
234- _log_debug (f"Reconnecting after error: { e } " )
262+ _log_debug (f"Reconnecting after error in { reconnect } s " )
235263 await asyncio .sleep (reconnect )
236264 if self .on_reconnect :
237265 _run_callback (self .on_reconnect , self )
238266
239267 # Cleanup
268+ _log_debug ("Initiating cleanup" )
240269 self .running = False
241- _log_debug ("websocket.py: closing..." )
270+ callback_task .cancel () # Stop callback task
271+ try :
272+ await callback_task
273+ except asyncio .CancelledError :
274+ _log_debug ("Callback task cancelled" )
242275 await self ._close_async ()
243276 _run_callback (self .on_close , self , None , None )
277+ _log_debug ("_async_main completed" )
244278
245279 async def _connect_and_run (self ):
246280 """Connect and handle WebSocket messages."""
281+ _log_debug (f"Connecting to { self .url } " )
247282 ssl_context = None
248283 if self .url .startswith ("wss://" ):
249284 import ssl
250285 ssl_context = ssl .SSLContext (ssl .PROTOCOL_TLS_CLIENT )
251286 ssl_context .verify_mode = ssl .CERT_NONE
287+ _log_debug ("Using SSL with no certificate verification" )
252288
253289 self .session = aiohttp .ClientSession (headers = self .header )
254290 async with self .session .ws_connect (self .url , ssl = ssl_context ) as ws :
255291 self .ws = ws
256- print ( " running on_open callback... " )
292+ _log_debug ( "WebSocket connected, running on_open callback" )
257293 _run_callback (self .on_open , self )
258- print ("done running on_open callback." )
259- self ._start_ping_thread ()
294+ self ._start_ping_task ()
260295
261296 async for msg in ws :
262- print (f"websocket.py received msg: type { msg .type } - { msg .data [ 0 : 20 ] } " )
297+ _log_debug (f"Received msg: type= { msg .type } , data= { str ( msg .data )[: 30 ] } ... " )
263298 if not self .running :
299+ _log_debug ("Not running, breaking message loop" )
264300 break
265301
266302 # Handle ping/pong timeout
267303 if self .ping_timeout and self .last_ping_tm :
268304 if time .time () - self .last_ping_tm > self .ping_timeout :
305+ _log_error ("Ping/pong timed out" )
269306 raise WebSocketTimeoutException ("ping/pong timed out" )
270307
271308 # Process message
272- _process_callbacks () # Process callbacks in same thread
273309 if msg .type == WSMsgType .TEXT :
274310 data = msg .data
275311 _run_callback (self .on_data , self , data , ABNF .OPCODE_TEXT , True )
@@ -279,18 +315,22 @@ async def _connect_and_run(self):
279315 _run_callback (self .on_data , self , data , ABNF .OPCODE_BINARY , True )
280316 _run_callback (self .on_message , self , data )
281317 elif msg .type == WSMsgType .ERROR or ws .ws .closed :
318+ _log_error ("WebSocket error or closed" )
282319 raise WebSocketConnectionClosedException ("WebSocket closed" )
283320
284321 async def _send_async (self , data , opcode ):
285322 """Async send implementation."""
323+ _log_debug (f"Sending: opcode={ opcode } , data={ str (data )[:20 ]} ..." )
286324 try :
287325 if opcode == ABNF .OPCODE_TEXT :
288326 await self .ws .send_str (data )
289327 elif opcode == ABNF .OPCODE_BINARY :
290328 await self .ws .send_bytes (data )
291329 else :
292330 raise WebSocketException (f"Unsupported opcode: { opcode } " )
331+ _log_debug ("Send successful" )
293332 except Exception as e :
333+ _log_error (f"Send failed: { e } " )
294334 _run_callback (self .on_error , self , e )
295335
296336 def _callback (self , callback , * args ):
@@ -299,8 +339,10 @@ def _callback(self, callback, *args):
299339
300340 def _get_close_args (self , close_frame ):
301341 """Extract close code and reason (simplified)."""
342+ _log_debug ("Getting close args (not supported)" )
302343 return [None , None ] # aiohttp doesn't provide close frame details
303344
304345 def create_dispatcher (self , ping_timeout , dispatcher , is_ssl , handleDisconnect ):
305346 """Not supported."""
347+ _log_error ("Custom dispatcher not supported" )
306348 raise WebSocketException ("Custom dispatcher not supported" )
0 commit comments