33# Compatible with websocket-client's WebSocketApp API, using MicroPython aiohttp
44
55import uasyncio as asyncio
6- import _thread
76import time
87import ucollections
98import aiohttp
@@ -34,18 +33,18 @@ class WebSocketConnectionClosedException(WebSocketException):
3433class WebSocketTimeoutException (WebSocketException ):
3534 pass
3635
37- # Queue for cross-thread callback dispatching
36+ # Queue for callback dispatching (in same thread)
3837_callback_queue = ucollections .deque ((), 100 ) # Empty tuple, maxlen=100
3938
4039def _run_callback (callback , * args ):
41- """Add callback to queue for main thread execution."""
40+ """Add callback to queue for execution."""
4241 try :
4342 _callback_queue .append ((callback , args ))
4443 except IndexError :
4544 _log_error ("Callback queue full, dropping callback" )
4645
4746def _process_callbacks ():
48- """Process queued callbacks in the main thread ."""
47+ """Process queued callbacks."""
4948 while _callback_queue :
5049 try :
5150 callback , args = _callback_queue .popleft ()
@@ -97,21 +96,19 @@ def __init__(
9796 self .ws = None
9897 self .session = None
9998 self .running = False
100- self .thread = None
10199 self .ping_interval = 0
102100 self .ping_timeout = None
103101 self .ping_payload = ""
104102 self .last_ping_tm = 0
105103 self .last_pong_tm = 0
106104 self .has_errored = False
105+ self ._loop = asyncio .get_event_loop ()
107106
108107 def send (self , data , opcode = ABNF .OPCODE_TEXT ):
109108 """Send a message."""
110109 if not self .ws or not self .running :
111110 raise WebSocketConnectionClosedException ("Connection is already closed." )
112- # Schedule send in async loop
113- loop = asyncio .get_event_loop ()
114- asyncio .run_coroutine_threadsafe (self ._send_async (data , opcode ), loop )
111+ asyncio .create_task (self ._send_async (data , opcode ))
115112
116113 def send_text (self , text_data ):
117114 """Send UTF-8 text."""
@@ -124,18 +121,22 @@ def send_bytes(self, data):
124121 def close (self , ** kwargs ):
125122 """Close the WebSocket connection."""
126123 self .running = False
127- if self .ws :
128- loop = asyncio .get_event_loop ()
129- asyncio .run_coroutine_threadsafe (self .ws .close (), loop )
130- if self .session :
131- loop = asyncio .get_event_loop ()
132- asyncio .run_coroutine_threadsafe (self .session .__aexit__ (None , None , None ), loop )
124+ asyncio .create_task (self ._close_async ())
125+
126+ async def _close_async (self ):
127+ """Async close implementation."""
128+ try :
129+ if self .ws and not self .ws .ws .closed :
130+ await self .ws .close ()
131+ if self .session :
132+ await self .session .__aexit__ (None , None , None )
133+ except Exception as e :
134+ _log_error (f"Error closing WebSocket: { e } " )
133135
134136 def _start_ping_thread (self ):
135- """Simulate ping/pong in async loop ."""
137+ """Start ping task ."""
136138 if self .ping_interval :
137- loop = asyncio .get_event_loop ()
138- asyncio .run_coroutine_threadsafe (self ._send_ping_async (), loop )
139+ asyncio .create_task (self ._send_ping_async ())
139140
140141 def _stop_ping_thread (self ):
141142 """No-op, ping handled in async loop."""
@@ -176,7 +177,7 @@ def run_forever(
176177 proxy_type = None ,
177178 reconnect = None ,
178179 ):
179- """Run the WebSocket event loop."""
180+ """Run the WebSocket event loop in the main thread ."""
180181 if sockopt or http_proxy_host or http_proxy_port or http_no_proxy or http_proxy_auth or proxy_type :
181182 raise WebSocketException ("Proxy and sockopt not supported in MicroPython" )
182183 if dispatcher :
@@ -193,25 +194,19 @@ def run_forever(
193194 self .ping_payload = ping_payload
194195 self .running = True
195196
196- # Start async event loop in a separate thread
197- self .thread = _thread .start_new_thread (self ._run_async_loop , ())
198-
199- # Main thread processes callbacks
197+ # Run the event loop in the main thread
200198 try :
201- while self .running :
202- _process_callbacks ()
203- time .sleep (0.01 ) # Yield to other tasks
199+ self ._loop .run_until_complete (self ._async_main ())
204200 except KeyboardInterrupt :
201+ print ("run_forever got KeyboardInterrupt" )
205202 self .close ()
206203 return False
204+ except Exception as e :
205+ _log_error (f"run_forever got general exception: { e } - returning True" )
206+ self .has_errored = True
207+ return True
207208 return self .has_errored
208209
209- def _run_async_loop (self ):
210- """Run uasyncio event loop in a separate thread."""
211- loop = asyncio .get_event_loop ()
212- loop .run_until_complete (self ._async_main ())
213- loop .run_forever ()
214-
215210 async def _async_main (self ):
216211 """Main async loop for WebSocket handling."""
217212 reconnect = 0 # Default, as RECONNECT may not be defined
@@ -227,7 +222,7 @@ async def _async_main(self):
227222 try :
228223 await self ._connect_and_run ()
229224 except Exception as e :
230- print (f"_async_main got exception { e } " )
225+ _log_error (f"_async_main got exception: { e } " )
231226 self .has_errored = True
232227 _run_callback (self .on_error , self , e )
233228 if not reconnect :
@@ -239,11 +234,8 @@ async def _async_main(self):
239234
240235 # Cleanup
241236 self .running = False
242- if self .ws :
243- print ("websocket.py: closing..." )
244- await self .ws .close ()
245- if self .session :
246- await self .session .__aexit__ (None , None , None )
237+ _log_debug ("websocket.py: closing..." )
238+ await self ._close_async ()
247239 _run_callback (self .on_close , self , None , None )
248240
249241 async def _connect_and_run (self ):
@@ -270,6 +262,7 @@ async def _connect_and_run(self):
270262 raise WebSocketTimeoutException ("ping/pong timed out" )
271263
272264 # Process message
265+ _process_callbacks () # Process callbacks in same thread
273266 if msg .type == WSMsgType .TEXT :
274267 data = msg .data
275268 _run_callback (self .on_data , self , data , ABNF .OPCODE_TEXT , True )
0 commit comments