Skip to content

Commit cf681c2

Browse files
websocket.py: fix decue()
1 parent 9ff5dfa commit cf681c2

File tree

1 file changed

+306
-0
lines changed

1 file changed

+306
-0
lines changed
Lines changed: 306 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,306 @@
1+
# websocket.py
2+
# MicroPython WebSocketApp implementation for python-nostr port
3+
# Compatible with websocket-client's WebSocketApp API, using MicroPython aiohttp
4+
5+
import uasyncio as asyncio
6+
import _thread
7+
import time
8+
import ucollections
9+
import aiohttp
10+
from aiohttp import WSMsgType, ClientWebSocketResponse
11+
12+
# Simplified logging for MicroPython
13+
def _log_debug(msg):
14+
print(f"DEBUG: {msg}")
15+
16+
def _log_error(msg):
17+
print(f"ERROR: {msg}")
18+
19+
# Simplified ABNF for opcode compatibility
20+
class ABNF:
21+
OPCODE_TEXT = 1
22+
OPCODE_BINARY = 2
23+
OPCODE_CLOSE = 8
24+
OPCODE_PING = 9
25+
OPCODE_PONG = 10
26+
27+
# Exceptions
28+
class WebSocketException(Exception):
29+
pass
30+
31+
class WebSocketConnectionClosedException(WebSocketException):
32+
pass
33+
34+
class WebSocketTimeoutException(WebSocketException):
35+
pass
36+
37+
# Queue for cross-thread callback dispatching
38+
_callback_queue = ucollections.deque((), 100) # Empty tuple, maxlen=100
39+
40+
def _run_callback(callback, *args):
41+
"""Add callback to queue for main thread execution."""
42+
try:
43+
_callback_queue.append((callback, args))
44+
except IndexError:
45+
_log_error("Callback queue full, dropping callback")
46+
47+
def _process_callbacks():
48+
"""Process queued callbacks in the main thread."""
49+
while _callback_queue:
50+
try:
51+
callback, args = _callback_queue.popleft()
52+
if callback is not None:
53+
try:
54+
callback(*args)
55+
except Exception as e:
56+
_log_error(f"Error in callback {callback}: {e}")
57+
else:
58+
print("Not calling None callback")
59+
except IndexError:
60+
break # Queue is empty
61+
62+
class WebSocketApp:
63+
def __init__(
64+
self,
65+
url,
66+
header=None,
67+
on_open=None,
68+
on_reconnect=None,
69+
on_message=None,
70+
on_error=None,
71+
on_close=None,
72+
on_ping=None,
73+
on_pong=None,
74+
on_cont_message=None,
75+
keep_running=True, # Ignored for compatibility
76+
get_mask_key=None,
77+
cookie=None,
78+
subprotocols=None,
79+
on_data=None,
80+
socket=None,
81+
):
82+
self.url = url
83+
self.header = header if header is not None else {}
84+
self.cookie = cookie
85+
self.on_open = on_open
86+
self.on_reconnect = on_reconnect
87+
self.on_message = on_message
88+
self.on_data = on_data
89+
self.on_error = on_error
90+
self.on_close = on_close
91+
self.on_ping = on_ping
92+
self.on_pong = on_pong
93+
self.on_cont_message = on_cont_message
94+
self.get_mask_key = get_mask_key
95+
self.subprotocols = subprotocols
96+
self.prepared_socket = socket # Ignored, not supported
97+
self.ws = None
98+
self.session = None
99+
self.running = False
100+
self.thread = None
101+
self.ping_interval = 0
102+
self.ping_timeout = None
103+
self.ping_payload = ""
104+
self.last_ping_tm = 0
105+
self.last_pong_tm = 0
106+
self.has_errored = False
107+
108+
def send(self, data, opcode=ABNF.OPCODE_TEXT):
109+
"""Send a message."""
110+
if not self.ws or not self.running:
111+
raise WebSocketConnectionClosedException("Connection is already closed.")
112+
# Schedule send in async loop
113+
loop = asyncio.get_event_loop()
114+
asyncio.run_coroutine_threadsafe(self._send_async(data, opcode), loop)
115+
116+
def send_text(self, text_data):
117+
"""Send UTF-8 text."""
118+
self.send(text_data, ABNF.OPCODE_TEXT)
119+
120+
def send_bytes(self, data):
121+
"""Send binary data."""
122+
self.send(data, ABNF.OPCODE_BINARY)
123+
124+
def close(self, **kwargs):
125+
"""Close the WebSocket connection."""
126+
self.running = False
127+
if self.ws:
128+
loop = asyncio.get_event_loop()
129+
asyncio.run_coroutine_threadsafe(self.ws.close(), loop)
130+
if self.session:
131+
loop = asyncio.get_event_loop()
132+
asyncio.run_coroutine_threadsafe(self.session.__aexit__(None, None, None), loop)
133+
134+
def _start_ping_thread(self):
135+
"""Simulate ping/pong in async loop."""
136+
if self.ping_interval:
137+
loop = asyncio.get_event_loop()
138+
asyncio.run_coroutine_threadsafe(self._send_ping_async(), loop)
139+
140+
def _stop_ping_thread(self):
141+
"""No-op, ping handled in async loop."""
142+
pass
143+
144+
async def _send_ping_async(self):
145+
"""Send periodic pings."""
146+
while self.running and self.ping_interval:
147+
self.last_ping_tm = time.time()
148+
try:
149+
await self.ws.send_bytes(self.ping_payload.encode() if isinstance(self.ping_payload, str) else self.ping_payload)
150+
_log_debug("Sending ping")
151+
except Exception as e:
152+
_log_debug(f"Failed to send ping: {e}")
153+
await asyncio.sleep(self.ping_interval)
154+
155+
def ready(self):
156+
"""Check if connection is active."""
157+
return self.ws is not None and self.running
158+
159+
def run_forever(
160+
self,
161+
sockopt=None,
162+
sslopt=None,
163+
ping_interval=0,
164+
ping_timeout=None,
165+
ping_payload="",
166+
http_proxy_host=None,
167+
http_proxy_port=None,
168+
http_no_proxy=None,
169+
http_proxy_auth=None,
170+
http_proxy_timeout=None,
171+
skip_utf8_validation=False,
172+
host=None,
173+
origin=None,
174+
dispatcher=None,
175+
suppress_origin=False,
176+
proxy_type=None,
177+
reconnect=None,
178+
):
179+
"""Run the WebSocket event loop."""
180+
if sockopt or http_proxy_host or http_proxy_port or http_no_proxy or http_proxy_auth or proxy_type:
181+
raise WebSocketException("Proxy and sockopt not supported in MicroPython")
182+
if dispatcher:
183+
raise WebSocketException("Custom dispatcher not supported")
184+
if ping_timeout is not None and ping_timeout <= 0:
185+
raise WebSocketException("Ensure ping_timeout > 0")
186+
if ping_interval is not None and ping_interval < 0:
187+
raise WebSocketException("Ensure ping_interval >= 0")
188+
if ping_timeout and ping_interval and ping_interval <= ping_timeout:
189+
raise WebSocketException("Ensure ping_interval > ping_timeout")
190+
191+
self.ping_interval = ping_interval
192+
self.ping_timeout = ping_timeout
193+
self.ping_payload = ping_payload
194+
self.running = True
195+
196+
# Start async event loop in a separate thread
197+
self.thread = _thread.start_new_thread(self._run_async_loop, ())
198+
199+
# Main thread processes callbacks
200+
try:
201+
while self.running:
202+
_process_callbacks()
203+
time.sleep(0.01) # Yield to other tasks
204+
except KeyboardInterrupt:
205+
self.close()
206+
return False
207+
return self.has_errored
208+
209+
def _run_async_loop(self):
210+
"""Run uasyncio event loop in a separate thread."""
211+
loop = asyncio.get_event_loop()
212+
loop.run_until_complete(self._async_main())
213+
loop.run_forever()
214+
215+
async def _async_main(self):
216+
"""Main async loop for WebSocket handling."""
217+
reconnect = 0 # Default, as RECONNECT may not be defined
218+
try:
219+
from websocket import RECONNECT
220+
reconnect = RECONNECT
221+
except ImportError:
222+
pass
223+
if reconnect is not None:
224+
reconnect = reconnect
225+
226+
while self.running:
227+
try:
228+
await self._connect_and_run()
229+
except Exception as e:
230+
print(f"_async_main got exception {e}")
231+
self.has_errored = True
232+
_run_callback(self.on_error, self, e)
233+
if not reconnect:
234+
break
235+
_log_debug(f"Reconnecting after error: {e}")
236+
await asyncio.sleep(reconnect)
237+
if self.on_reconnect:
238+
_run_callback(self.on_reconnect, self)
239+
240+
# Cleanup
241+
self.running = False
242+
if self.ws:
243+
print("websocket.py: closing...")
244+
await self.ws.close()
245+
if self.session:
246+
await self.session.__aexit__(None, None, None)
247+
_run_callback(self.on_close, self, None, None)
248+
249+
async def _connect_and_run(self):
250+
"""Connect and handle WebSocket messages."""
251+
ssl_context = None
252+
if self.url.startswith("wss://"):
253+
import ssl
254+
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
255+
ssl_context.verify_mode = ssl.CERT_NONE
256+
257+
self.session = aiohttp.ClientSession(headers=self.header)
258+
async with self.session.ws_connect(self.url, ssl=ssl_context) as ws:
259+
self.ws = ws
260+
_run_callback(self.on_open, self)
261+
self._start_ping_thread()
262+
263+
async for msg in ws:
264+
if not self.running:
265+
break
266+
267+
# Handle ping/pong timeout
268+
if self.ping_timeout and self.last_ping_tm:
269+
if time.time() - self.last_ping_tm > self.ping_timeout:
270+
raise WebSocketTimeoutException("ping/pong timed out")
271+
272+
# Process message
273+
if msg.type == WSMsgType.TEXT:
274+
data = msg.data
275+
_run_callback(self.on_data, self, data, ABNF.OPCODE_TEXT, True)
276+
_run_callback(self.on_message, self, data)
277+
elif msg.type == WSMsgType.BINARY:
278+
data = msg.data
279+
_run_callback(self.on_data, self, data, ABNF.OPCODE_BINARY, True)
280+
_run_callback(self.on_message, self, data)
281+
elif msg.type == WSMsgType.ERROR or ws.ws.closed:
282+
raise WebSocketConnectionClosedException("WebSocket closed")
283+
284+
async def _send_async(self, data, opcode):
285+
"""Async send implementation."""
286+
try:
287+
if opcode == ABNF.OPCODE_TEXT:
288+
await self.ws.send_str(data)
289+
elif opcode == ABNF.OPCODE_BINARY:
290+
await self.ws.send_bytes(data)
291+
else:
292+
raise WebSocketException(f"Unsupported opcode: {opcode}")
293+
except Exception as e:
294+
_run_callback(self.on_error, self, e)
295+
296+
def _callback(self, callback, *args):
297+
"""Compatibility wrapper for callback execution."""
298+
_run_callback(callback, self, *args)
299+
300+
def _get_close_args(self, close_frame):
301+
"""Extract close code and reason (simplified)."""
302+
return [None, None] # aiohttp doesn't provide close frame details
303+
304+
def create_dispatcher(self, ping_timeout, dispatcher, is_ssl, handleDisconnect):
305+
"""Not supported."""
306+
raise WebSocketException("Custom dispatcher not supported")

0 commit comments

Comments
 (0)