Skip to content

Commit 4a27a13

Browse files
websocket.py: remove run_coroutine_threadsafe for now
1 parent cf681c2 commit 4a27a13

File tree

1 file changed

+30
-37
lines changed

1 file changed

+30
-37
lines changed

internal_filesystem/lib/websocket.py

Lines changed: 30 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
# Compatible with websocket-client's WebSocketApp API, using MicroPython aiohttp
44

55
import uasyncio as asyncio
6-
import _thread
76
import time
87
import ucollections
98
import aiohttp
@@ -34,18 +33,18 @@ class WebSocketConnectionClosedException(WebSocketException):
3433
class WebSocketTimeoutException(WebSocketException):
3534
pass
3635

37-
# Queue for cross-thread callback dispatching
36+
# Queue for callback dispatching (in same thread)
3837
_callback_queue = ucollections.deque((), 100) # Empty tuple, maxlen=100
3938

4039
def _run_callback(callback, *args):
41-
"""Add callback to queue for main thread execution."""
40+
"""Add callback to queue for execution."""
4241
try:
4342
_callback_queue.append((callback, args))
4443
except IndexError:
4544
_log_error("Callback queue full, dropping callback")
4645

4746
def _process_callbacks():
48-
"""Process queued callbacks in the main thread."""
47+
"""Process queued callbacks."""
4948
while _callback_queue:
5049
try:
5150
callback, args = _callback_queue.popleft()
@@ -97,21 +96,19 @@ def __init__(
9796
self.ws = None
9897
self.session = None
9998
self.running = False
100-
self.thread = None
10199
self.ping_interval = 0
102100
self.ping_timeout = None
103101
self.ping_payload = ""
104102
self.last_ping_tm = 0
105103
self.last_pong_tm = 0
106104
self.has_errored = False
105+
self._loop = asyncio.get_event_loop()
107106

108107
def send(self, data, opcode=ABNF.OPCODE_TEXT):
109108
"""Send a message."""
110109
if not self.ws or not self.running:
111110
raise WebSocketConnectionClosedException("Connection is already closed.")
112-
# Schedule send in async loop
113-
loop = asyncio.get_event_loop()
114-
asyncio.run_coroutine_threadsafe(self._send_async(data, opcode), loop)
111+
asyncio.create_task(self._send_async(data, opcode))
115112

116113
def send_text(self, text_data):
117114
"""Send UTF-8 text."""
@@ -124,18 +121,22 @@ def send_bytes(self, data):
124121
def close(self, **kwargs):
125122
"""Close the WebSocket connection."""
126123
self.running = False
127-
if self.ws:
128-
loop = asyncio.get_event_loop()
129-
asyncio.run_coroutine_threadsafe(self.ws.close(), loop)
130-
if self.session:
131-
loop = asyncio.get_event_loop()
132-
asyncio.run_coroutine_threadsafe(self.session.__aexit__(None, None, None), loop)
124+
asyncio.create_task(self._close_async())
125+
126+
async def _close_async(self):
127+
"""Async close implementation."""
128+
try:
129+
if self.ws and not self.ws.ws.closed:
130+
await self.ws.close()
131+
if self.session:
132+
await self.session.__aexit__(None, None, None)
133+
except Exception as e:
134+
_log_error(f"Error closing WebSocket: {e}")
133135

134136
def _start_ping_thread(self):
135-
"""Simulate ping/pong in async loop."""
137+
"""Start ping task."""
136138
if self.ping_interval:
137-
loop = asyncio.get_event_loop()
138-
asyncio.run_coroutine_threadsafe(self._send_ping_async(), loop)
139+
asyncio.create_task(self._send_ping_async())
139140

140141
def _stop_ping_thread(self):
141142
"""No-op, ping handled in async loop."""
@@ -176,7 +177,7 @@ def run_forever(
176177
proxy_type=None,
177178
reconnect=None,
178179
):
179-
"""Run the WebSocket event loop."""
180+
"""Run the WebSocket event loop in the main thread."""
180181
if sockopt or http_proxy_host or http_proxy_port or http_no_proxy or http_proxy_auth or proxy_type:
181182
raise WebSocketException("Proxy and sockopt not supported in MicroPython")
182183
if dispatcher:
@@ -193,25 +194,19 @@ def run_forever(
193194
self.ping_payload = ping_payload
194195
self.running = True
195196

196-
# Start async event loop in a separate thread
197-
self.thread = _thread.start_new_thread(self._run_async_loop, ())
198-
199-
# Main thread processes callbacks
197+
# Run the event loop in the main thread
200198
try:
201-
while self.running:
202-
_process_callbacks()
203-
time.sleep(0.01) # Yield to other tasks
199+
self._loop.run_until_complete(self._async_main())
204200
except KeyboardInterrupt:
201+
print("run_forever got KeyboardInterrupt")
205202
self.close()
206203
return False
204+
except Exception as e:
205+
_log_error(f"run_forever got general exception: {e} - returning True")
206+
self.has_errored = True
207+
return True
207208
return self.has_errored
208209

209-
def _run_async_loop(self):
210-
"""Run uasyncio event loop in a separate thread."""
211-
loop = asyncio.get_event_loop()
212-
loop.run_until_complete(self._async_main())
213-
loop.run_forever()
214-
215210
async def _async_main(self):
216211
"""Main async loop for WebSocket handling."""
217212
reconnect = 0 # Default, as RECONNECT may not be defined
@@ -227,7 +222,7 @@ async def _async_main(self):
227222
try:
228223
await self._connect_and_run()
229224
except Exception as e:
230-
print(f"_async_main got exception {e}")
225+
_log_error(f"_async_main got exception: {e}")
231226
self.has_errored = True
232227
_run_callback(self.on_error, self, e)
233228
if not reconnect:
@@ -239,11 +234,8 @@ async def _async_main(self):
239234

240235
# Cleanup
241236
self.running = False
242-
if self.ws:
243-
print("websocket.py: closing...")
244-
await self.ws.close()
245-
if self.session:
246-
await self.session.__aexit__(None, None, None)
237+
_log_debug("websocket.py: closing...")
238+
await self._close_async()
247239
_run_callback(self.on_close, self, None, None)
248240

249241
async def _connect_and_run(self):
@@ -270,6 +262,7 @@ async def _connect_and_run(self):
270262
raise WebSocketTimeoutException("ping/pong timed out")
271263

272264
# Process message
265+
_process_callbacks() # Process callbacks in same thread
273266
if msg.type == WSMsgType.TEXT:
274267
data = msg.data
275268
_run_callback(self.on_data, self, data, ABNF.OPCODE_TEXT, True)

0 commit comments

Comments
 (0)