Skip to content

Commit f32ebbd

Browse files
websocket works better now, the issue was an exception related to the empty queue
1 parent 971e49f commit f32ebbd

File tree

3 files changed

+83
-38
lines changed

3 files changed

+83
-38
lines changed

internal_filesystem/lib/queue.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,12 @@ def get(self):
2525
with self._lock:
2626
if not self._queue:
2727
raise RuntimeError("Queue is empty")
28+
print("queue not empty, returning one object!!!")
2829
return self._queue.pop(0)
2930
else:
3031
if not self._queue:
3132
raise RuntimeError("Queue is empty")
33+
print("queue not empty, returning one object!!!")
3234
return self._queue.pop(0)
3335

3436
def qsize(self):

internal_filesystem/lib/threading.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, dae
1111
def start(self):
1212
# In MicroPython, _thread.start_new_thread doesn't support daemon threads directly
1313
# We store the daemon attribute for compatibility, but it may not affect termination
14+
#_thread.stack_size(32*1024)
1415
_thread.start_new_thread(self.run, ())
1516

1617
def run(self):

internal_filesystem/lib/websocket.py

Lines changed: 80 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,12 @@
88
import aiohttp
99
from aiohttp import WSMsgType, ClientWebSocketResponse
1010

11-
# Simplified logging for MicroPython
11+
# Simplified logging for MicroPython with timestamps
1212
def _log_debug(msg):
13-
print(f"DEBUG: {msg}")
13+
print(f"[DEBUG {time.ticks_ms()}] {msg}")
1414

1515
def _log_error(msg):
16-
print(f"ERROR: {msg}")
16+
print(f"[ERROR {time.ticks_ms()}] {msg}")
1717

1818
# Simplified ABNF for opcode compatibility
1919
class ABNF:
@@ -33,31 +33,36 @@ class WebSocketConnectionClosedException(WebSocketException):
3333
class WebSocketTimeoutException(WebSocketException):
3434
pass
3535

36-
# Queue for callback dispatching (in same thread)
36+
# Queue for callback dispatching
3737
_callback_queue = ucollections.deque((), 100) # Empty tuple, maxlen=100
3838

3939
def _run_callback(callback, *args):
4040
"""Add callback to queue for execution."""
4141
try:
4242
_callback_queue.append((callback, args))
43+
_log_debug(f"Queued callback {callback}, queue size: {len(_callback_queue)}")
4344
except IndexError:
4445
_log_error("Callback queue full, dropping callback")
4546

46-
def _process_callbacks():
47-
"""Process queued callbacks."""
48-
while _callback_queue:
49-
print("processing callbacks queue...")
50-
try:
51-
callback, args = _callback_queue.popleft()
52-
if callback is not None:
53-
try:
54-
callback(*args)
55-
except Exception as e:
56-
_log_error(f"Error in callback {callback}: {e}")
57-
else:
58-
print("Not calling None callback")
59-
except IndexError:
60-
break # Queue is empty
47+
async def _process_callbacks_async():
48+
"""Process queued callbacks asynchronously."""
49+
while True:
50+
while _callback_queue:
51+
_log_debug("Processing callbacks queue...")
52+
try:
53+
callback, args = _callback_queue.popleft()
54+
if callback is not None:
55+
_log_debug(f"Executing callback {callback} with args {args}")
56+
try:
57+
callback(*args)
58+
except Exception as e:
59+
_log_error(f"Error in callback {callback}: {e}")
60+
else:
61+
_log_debug("Skipping None callback")
62+
except IndexError:
63+
_log_debug("Callback queue empty")
64+
break
65+
await asyncio.sleep(0.01) # Yield to other tasks
6166

6267
class WebSocketApp:
6368
def __init__(
@@ -108,7 +113,9 @@ def __init__(
108113
def send(self, data, opcode=ABNF.OPCODE_TEXT):
109114
"""Send a message."""
110115
if not self.ws or not self.running:
116+
_log_error("Send failed: Connection closed or not running")
111117
raise WebSocketConnectionClosedException("Connection is already closed.")
118+
_log_debug(f"Scheduling send: opcode={opcode}, data={str(data)[:20]}...")
112119
asyncio.create_task(self._send_async(data, opcode))
113120

114121
def send_text(self, text_data):
@@ -121,43 +128,56 @@ def send_bytes(self, data):
121128

122129
def close(self, **kwargs):
123130
"""Close the WebSocket connection."""
131+
_log_debug("Close requested")
124132
self.running = False
125133
asyncio.create_task(self._close_async())
126134

127135
async def _close_async(self):
128136
"""Async close implementation."""
137+
_log_debug("Closing WebSocket connection")
129138
try:
130139
if self.ws and not self.ws.ws.closed:
140+
_log_debug("Sending WebSocket close frame")
131141
await self.ws.close()
142+
else:
143+
_log_debug("WebSocket already closed or not initialized")
132144
if self.session:
145+
_log_debug("Closing ClientSession")
133146
await self.session.__aexit__(None, None, None)
147+
else:
148+
_log_debug("No ClientSession to close")
134149
except Exception as e:
135150
_log_error(f"Error closing WebSocket: {e}")
136151

137-
def _start_ping_thread(self):
152+
def _start_ping_task(self):
138153
"""Start ping task."""
139154
if self.ping_interval:
140-
asyncio.create_task(self._send_ping_async())
155+
_log_debug(f"NOT Starting ping task with interval {self.ping_interval}s")
156+
#asyncio.create_task(self._send_ping_async())
141157

142158
def _stop_ping_thread(self):
143-
"""No-op, ping handled in async loop."""
159+
"""No-op, ping handled in async task."""
144160
pass
145161

146162
async def _send_ping_async(self):
147163
"""Send periodic pings."""
148164
while self.running and self.ping_interval:
149165
self.last_ping_tm = time.time()
150166
try:
151-
152-
#await self.ws.send_bytes(self.ping_payload.encode() if isinstance(self.ping_payload, str) else self.ping_payload)
153-
_log_debug("NOT Sending ping because it seems corrupt")
167+
if self.ws and not self.ws.ws.closed:
168+
_log_debug(f"Sending ping with payload: {self.ping_payload}")
169+
await self.ws.send_bytes(self.ping_payload.encode() if isinstance(self.ping_payload, str) else self.ping_payload)
170+
else:
171+
_log_debug("Skipping ping: WebSocket not connected")
154172
except Exception as e:
155-
_log_debug(f"Failed to send ping: {e}")
173+
_log_error(f"Failed to send ping: {e}")
156174
await asyncio.sleep(self.ping_interval)
157175

158176
def ready(self):
159177
"""Check if connection is active."""
160-
return self.ws is not None and self.running
178+
status = self.ws is not None and self.running
179+
_log_debug(f"Connection status: ready={status}")
180+
return status
161181

162182
def run_forever(
163183
self,
@@ -180,6 +200,7 @@ def run_forever(
180200
reconnect=None,
181201
):
182202
"""Run the WebSocket event loop in the main thread."""
203+
_log_debug("Starting run_forever")
183204
if sockopt or http_proxy_host or http_proxy_port or http_no_proxy or http_proxy_auth or proxy_type:
184205
raise WebSocketException("Proxy and sockopt not supported in MicroPython")
185206
if dispatcher:
@@ -200,17 +221,19 @@ def run_forever(
200221
try:
201222
self._loop.run_until_complete(self._async_main())
202223
except KeyboardInterrupt:
203-
print("run_forever got KeyboardInterrupt")
224+
_log_debug("run_forever got KeyboardInterrupt")
204225
self.close()
205226
return False
206227
except Exception as e:
207-
_log_error(f"run_forever got general exception: {e} - returning True")
228+
_log_error(f"run_forever got general exception: {e}")
208229
self.has_errored = True
209230
return True
231+
_log_debug("run_forever completed")
210232
return self.has_errored
211233

212234
async def _async_main(self):
213235
"""Main async loop for WebSocket handling."""
236+
_log_debug("Starting _async_main")
214237
reconnect = 0 # Default, as RECONNECT may not be defined
215238
try:
216239
from websocket import RECONNECT
@@ -219,57 +242,70 @@ async def _async_main(self):
219242
pass
220243
if reconnect is not None:
221244
reconnect = reconnect
245+
_log_debug(f"Reconnect interval set to {reconnect}s")
246+
247+
# Start callback processing task
248+
callback_task = asyncio.create_task(_process_callbacks_async())
249+
_log_debug("Started callback processing task")
222250

223251
while self.running:
224-
print("self.running")
225-
time.sleep(1)
252+
_log_debug("Main loop iteration: self.running=True")
226253
try:
227254
await self._connect_and_run()
228255
except Exception as e:
229256
_log_error(f"_async_main got exception: {e}")
230257
self.has_errored = True
231258
_run_callback(self.on_error, self, e)
232259
if not reconnect:
260+
_log_debug("No reconnect configured, breaking loop")
233261
break
234-
_log_debug(f"Reconnecting after error: {e}")
262+
_log_debug(f"Reconnecting after error in {reconnect}s")
235263
await asyncio.sleep(reconnect)
236264
if self.on_reconnect:
237265
_run_callback(self.on_reconnect, self)
238266

239267
# Cleanup
268+
_log_debug("Initiating cleanup")
240269
self.running = False
241-
_log_debug("websocket.py: closing...")
270+
callback_task.cancel() # Stop callback task
271+
try:
272+
await callback_task
273+
except asyncio.CancelledError:
274+
_log_debug("Callback task cancelled")
242275
await self._close_async()
243276
_run_callback(self.on_close, self, None, None)
277+
_log_debug("_async_main completed")
244278

245279
async def _connect_and_run(self):
246280
"""Connect and handle WebSocket messages."""
281+
_log_debug(f"Connecting to {self.url}")
247282
ssl_context = None
248283
if self.url.startswith("wss://"):
249284
import ssl
250285
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
251286
ssl_context.verify_mode = ssl.CERT_NONE
287+
_log_debug("Using SSL with no certificate verification")
252288

253289
self.session = aiohttp.ClientSession(headers=self.header)
254290
async with self.session.ws_connect(self.url, ssl=ssl_context) as ws:
255291
self.ws = ws
256-
print("running on_open callback...")
292+
_log_debug("WebSocket connected, running on_open callback")
257293
_run_callback(self.on_open, self)
258-
print("done running on_open callback.")
259-
self._start_ping_thread()
294+
self._start_ping_task()
260295

261296
async for msg in ws:
262-
print(f"websocket.py received msg: type {msg.type} - {msg.data[0:20]}")
297+
_log_debug(f"Received msg: type={msg.type}, data={str(msg.data)[:30]}...")
263298
if not self.running:
299+
_log_debug("Not running, breaking message loop")
264300
break
265301

266302
# Handle ping/pong timeout
267303
if self.ping_timeout and self.last_ping_tm:
268304
if time.time() - self.last_ping_tm > self.ping_timeout:
305+
_log_error("Ping/pong timed out")
269306
raise WebSocketTimeoutException("ping/pong timed out")
270307

271308
# Process message
272-
_process_callbacks() # Process callbacks in same thread
273309
if msg.type == WSMsgType.TEXT:
274310
data = msg.data
275311
_run_callback(self.on_data, self, data, ABNF.OPCODE_TEXT, True)
@@ -279,18 +315,22 @@ async def _connect_and_run(self):
279315
_run_callback(self.on_data, self, data, ABNF.OPCODE_BINARY, True)
280316
_run_callback(self.on_message, self, data)
281317
elif msg.type == WSMsgType.ERROR or ws.ws.closed:
318+
_log_error("WebSocket error or closed")
282319
raise WebSocketConnectionClosedException("WebSocket closed")
283320

284321
async def _send_async(self, data, opcode):
285322
"""Async send implementation."""
323+
_log_debug(f"Sending: opcode={opcode}, data={str(data)[:20]}...")
286324
try:
287325
if opcode == ABNF.OPCODE_TEXT:
288326
await self.ws.send_str(data)
289327
elif opcode == ABNF.OPCODE_BINARY:
290328
await self.ws.send_bytes(data)
291329
else:
292330
raise WebSocketException(f"Unsupported opcode: {opcode}")
331+
_log_debug("Send successful")
293332
except Exception as e:
333+
_log_error(f"Send failed: {e}")
294334
_run_callback(self.on_error, self, e)
295335

296336
def _callback(self, callback, *args):
@@ -299,8 +339,10 @@ def _callback(self, callback, *args):
299339

300340
def _get_close_args(self, close_frame):
301341
"""Extract close code and reason (simplified)."""
342+
_log_debug("Getting close args (not supported)")
302343
return [None, None] # aiohttp doesn't provide close frame details
303344

304345
def create_dispatcher(self, ping_timeout, dispatcher, is_ssl, handleDisconnect):
305346
"""Not supported."""
347+
_log_error("Custom dispatcher not supported")
306348
raise WebSocketException("Custom dispatcher not supported")

0 commit comments

Comments
 (0)