|
| 1 | +# websocket.py |
| 2 | +# MicroPython WebSocketApp implementation for python-nostr port |
| 3 | +# Compatible with websocket-client's WebSocketApp API, using MicroPython aiohttp |
| 4 | + |
| 5 | +import uasyncio as asyncio |
| 6 | +import _thread |
| 7 | +import time |
| 8 | +import ucollections |
| 9 | +import aiohttp |
| 10 | +from aiohttp import WSMsgType, ClientWebSocketResponse |
| 11 | + |
| 12 | +# Simplified logging for MicroPython |
| 13 | +def _log_debug(msg): |
| 14 | + print(f"DEBUG: {msg}") |
| 15 | + |
| 16 | +def _log_error(msg): |
| 17 | + print(f"ERROR: {msg}") |
| 18 | + |
| 19 | +# Simplified ABNF for opcode compatibility |
| 20 | +class ABNF: |
| 21 | + OPCODE_TEXT = 1 |
| 22 | + OPCODE_BINARY = 2 |
| 23 | + OPCODE_CLOSE = 8 |
| 24 | + OPCODE_PING = 9 |
| 25 | + OPCODE_PONG = 10 |
| 26 | + |
| 27 | +# Exceptions |
| 28 | +class WebSocketException(Exception): |
| 29 | + pass |
| 30 | + |
| 31 | +class WebSocketConnectionClosedException(WebSocketException): |
| 32 | + pass |
| 33 | + |
| 34 | +class WebSocketTimeoutException(WebSocketException): |
| 35 | + pass |
| 36 | + |
| 37 | +# Queue for cross-thread callback dispatching |
| 38 | +_callback_queue = ucollections.deque((), 100) # Empty tuple, maxlen=100 |
| 39 | + |
| 40 | +def _run_callback(callback, *args): |
| 41 | + """Add callback to queue for main thread execution.""" |
| 42 | + try: |
| 43 | + _callback_queue.append((callback, args)) |
| 44 | + except IndexError: |
| 45 | + _log_error("Callback queue full, dropping callback") |
| 46 | + |
| 47 | +def _process_callbacks(): |
| 48 | + """Process queued callbacks in the main thread.""" |
| 49 | + while _callback_queue: |
| 50 | + try: |
| 51 | + callback, args = _callback_queue.popleft() |
| 52 | + if callback is not None: |
| 53 | + try: |
| 54 | + callback(*args) |
| 55 | + except Exception as e: |
| 56 | + _log_error(f"Error in callback {callback}: {e}") |
| 57 | + else: |
| 58 | + print("Not calling None callback") |
| 59 | + except IndexError: |
| 60 | + break # Queue is empty |
| 61 | + |
| 62 | +class WebSocketApp: |
| 63 | + def __init__( |
| 64 | + self, |
| 65 | + url, |
| 66 | + header=None, |
| 67 | + on_open=None, |
| 68 | + on_reconnect=None, |
| 69 | + on_message=None, |
| 70 | + on_error=None, |
| 71 | + on_close=None, |
| 72 | + on_ping=None, |
| 73 | + on_pong=None, |
| 74 | + on_cont_message=None, |
| 75 | + keep_running=True, # Ignored for compatibility |
| 76 | + get_mask_key=None, |
| 77 | + cookie=None, |
| 78 | + subprotocols=None, |
| 79 | + on_data=None, |
| 80 | + socket=None, |
| 81 | + ): |
| 82 | + self.url = url |
| 83 | + self.header = header if header is not None else {} |
| 84 | + self.cookie = cookie |
| 85 | + self.on_open = on_open |
| 86 | + self.on_reconnect = on_reconnect |
| 87 | + self.on_message = on_message |
| 88 | + self.on_data = on_data |
| 89 | + self.on_error = on_error |
| 90 | + self.on_close = on_close |
| 91 | + self.on_ping = on_ping |
| 92 | + self.on_pong = on_pong |
| 93 | + self.on_cont_message = on_cont_message |
| 94 | + self.get_mask_key = get_mask_key |
| 95 | + self.subprotocols = subprotocols |
| 96 | + self.prepared_socket = socket # Ignored, not supported |
| 97 | + self.ws = None |
| 98 | + self.session = None |
| 99 | + self.running = False |
| 100 | + self.thread = None |
| 101 | + self.ping_interval = 0 |
| 102 | + self.ping_timeout = None |
| 103 | + self.ping_payload = "" |
| 104 | + self.last_ping_tm = 0 |
| 105 | + self.last_pong_tm = 0 |
| 106 | + self.has_errored = False |
| 107 | + |
| 108 | + def send(self, data, opcode=ABNF.OPCODE_TEXT): |
| 109 | + """Send a message.""" |
| 110 | + if not self.ws or not self.running: |
| 111 | + raise WebSocketConnectionClosedException("Connection is already closed.") |
| 112 | + # Schedule send in async loop |
| 113 | + loop = asyncio.get_event_loop() |
| 114 | + asyncio.run_coroutine_threadsafe(self._send_async(data, opcode), loop) |
| 115 | + |
| 116 | + def send_text(self, text_data): |
| 117 | + """Send UTF-8 text.""" |
| 118 | + self.send(text_data, ABNF.OPCODE_TEXT) |
| 119 | + |
| 120 | + def send_bytes(self, data): |
| 121 | + """Send binary data.""" |
| 122 | + self.send(data, ABNF.OPCODE_BINARY) |
| 123 | + |
| 124 | + def close(self, **kwargs): |
| 125 | + """Close the WebSocket connection.""" |
| 126 | + self.running = False |
| 127 | + if self.ws: |
| 128 | + loop = asyncio.get_event_loop() |
| 129 | + asyncio.run_coroutine_threadsafe(self.ws.close(), loop) |
| 130 | + if self.session: |
| 131 | + loop = asyncio.get_event_loop() |
| 132 | + asyncio.run_coroutine_threadsafe(self.session.__aexit__(None, None, None), loop) |
| 133 | + |
| 134 | + def _start_ping_thread(self): |
| 135 | + """Simulate ping/pong in async loop.""" |
| 136 | + if self.ping_interval: |
| 137 | + loop = asyncio.get_event_loop() |
| 138 | + asyncio.run_coroutine_threadsafe(self._send_ping_async(), loop) |
| 139 | + |
| 140 | + def _stop_ping_thread(self): |
| 141 | + """No-op, ping handled in async loop.""" |
| 142 | + pass |
| 143 | + |
| 144 | + async def _send_ping_async(self): |
| 145 | + """Send periodic pings.""" |
| 146 | + while self.running and self.ping_interval: |
| 147 | + self.last_ping_tm = time.time() |
| 148 | + try: |
| 149 | + await self.ws.send_bytes(self.ping_payload.encode() if isinstance(self.ping_payload, str) else self.ping_payload) |
| 150 | + _log_debug("Sending ping") |
| 151 | + except Exception as e: |
| 152 | + _log_debug(f"Failed to send ping: {e}") |
| 153 | + await asyncio.sleep(self.ping_interval) |
| 154 | + |
| 155 | + def ready(self): |
| 156 | + """Check if connection is active.""" |
| 157 | + return self.ws is not None and self.running |
| 158 | + |
| 159 | + def run_forever( |
| 160 | + self, |
| 161 | + sockopt=None, |
| 162 | + sslopt=None, |
| 163 | + ping_interval=0, |
| 164 | + ping_timeout=None, |
| 165 | + ping_payload="", |
| 166 | + http_proxy_host=None, |
| 167 | + http_proxy_port=None, |
| 168 | + http_no_proxy=None, |
| 169 | + http_proxy_auth=None, |
| 170 | + http_proxy_timeout=None, |
| 171 | + skip_utf8_validation=False, |
| 172 | + host=None, |
| 173 | + origin=None, |
| 174 | + dispatcher=None, |
| 175 | + suppress_origin=False, |
| 176 | + proxy_type=None, |
| 177 | + reconnect=None, |
| 178 | + ): |
| 179 | + """Run the WebSocket event loop.""" |
| 180 | + if sockopt or http_proxy_host or http_proxy_port or http_no_proxy or http_proxy_auth or proxy_type: |
| 181 | + raise WebSocketException("Proxy and sockopt not supported in MicroPython") |
| 182 | + if dispatcher: |
| 183 | + raise WebSocketException("Custom dispatcher not supported") |
| 184 | + if ping_timeout is not None and ping_timeout <= 0: |
| 185 | + raise WebSocketException("Ensure ping_timeout > 0") |
| 186 | + if ping_interval is not None and ping_interval < 0: |
| 187 | + raise WebSocketException("Ensure ping_interval >= 0") |
| 188 | + if ping_timeout and ping_interval and ping_interval <= ping_timeout: |
| 189 | + raise WebSocketException("Ensure ping_interval > ping_timeout") |
| 190 | + |
| 191 | + self.ping_interval = ping_interval |
| 192 | + self.ping_timeout = ping_timeout |
| 193 | + self.ping_payload = ping_payload |
| 194 | + self.running = True |
| 195 | + |
| 196 | + # Start async event loop in a separate thread |
| 197 | + self.thread = _thread.start_new_thread(self._run_async_loop, ()) |
| 198 | + |
| 199 | + # Main thread processes callbacks |
| 200 | + try: |
| 201 | + while self.running: |
| 202 | + _process_callbacks() |
| 203 | + time.sleep(0.01) # Yield to other tasks |
| 204 | + except KeyboardInterrupt: |
| 205 | + self.close() |
| 206 | + return False |
| 207 | + return self.has_errored |
| 208 | + |
| 209 | + def _run_async_loop(self): |
| 210 | + """Run uasyncio event loop in a separate thread.""" |
| 211 | + loop = asyncio.get_event_loop() |
| 212 | + loop.run_until_complete(self._async_main()) |
| 213 | + loop.run_forever() |
| 214 | + |
| 215 | + async def _async_main(self): |
| 216 | + """Main async loop for WebSocket handling.""" |
| 217 | + reconnect = 0 # Default, as RECONNECT may not be defined |
| 218 | + try: |
| 219 | + from websocket import RECONNECT |
| 220 | + reconnect = RECONNECT |
| 221 | + except ImportError: |
| 222 | + pass |
| 223 | + if reconnect is not None: |
| 224 | + reconnect = reconnect |
| 225 | + |
| 226 | + while self.running: |
| 227 | + try: |
| 228 | + await self._connect_and_run() |
| 229 | + except Exception as e: |
| 230 | + print(f"_async_main got exception {e}") |
| 231 | + self.has_errored = True |
| 232 | + _run_callback(self.on_error, self, e) |
| 233 | + if not reconnect: |
| 234 | + break |
| 235 | + _log_debug(f"Reconnecting after error: {e}") |
| 236 | + await asyncio.sleep(reconnect) |
| 237 | + if self.on_reconnect: |
| 238 | + _run_callback(self.on_reconnect, self) |
| 239 | + |
| 240 | + # Cleanup |
| 241 | + self.running = False |
| 242 | + if self.ws: |
| 243 | + print("websocket.py: closing...") |
| 244 | + await self.ws.close() |
| 245 | + if self.session: |
| 246 | + await self.session.__aexit__(None, None, None) |
| 247 | + _run_callback(self.on_close, self, None, None) |
| 248 | + |
| 249 | + async def _connect_and_run(self): |
| 250 | + """Connect and handle WebSocket messages.""" |
| 251 | + ssl_context = None |
| 252 | + if self.url.startswith("wss://"): |
| 253 | + import ssl |
| 254 | + ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) |
| 255 | + ssl_context.verify_mode = ssl.CERT_NONE |
| 256 | + |
| 257 | + self.session = aiohttp.ClientSession(headers=self.header) |
| 258 | + async with self.session.ws_connect(self.url, ssl=ssl_context) as ws: |
| 259 | + self.ws = ws |
| 260 | + _run_callback(self.on_open, self) |
| 261 | + self._start_ping_thread() |
| 262 | + |
| 263 | + async for msg in ws: |
| 264 | + if not self.running: |
| 265 | + break |
| 266 | + |
| 267 | + # Handle ping/pong timeout |
| 268 | + if self.ping_timeout and self.last_ping_tm: |
| 269 | + if time.time() - self.last_ping_tm > self.ping_timeout: |
| 270 | + raise WebSocketTimeoutException("ping/pong timed out") |
| 271 | + |
| 272 | + # Process message |
| 273 | + if msg.type == WSMsgType.TEXT: |
| 274 | + data = msg.data |
| 275 | + _run_callback(self.on_data, self, data, ABNF.OPCODE_TEXT, True) |
| 276 | + _run_callback(self.on_message, self, data) |
| 277 | + elif msg.type == WSMsgType.BINARY: |
| 278 | + data = msg.data |
| 279 | + _run_callback(self.on_data, self, data, ABNF.OPCODE_BINARY, True) |
| 280 | + _run_callback(self.on_message, self, data) |
| 281 | + elif msg.type == WSMsgType.ERROR or ws.ws.closed: |
| 282 | + raise WebSocketConnectionClosedException("WebSocket closed") |
| 283 | + |
| 284 | + async def _send_async(self, data, opcode): |
| 285 | + """Async send implementation.""" |
| 286 | + try: |
| 287 | + if opcode == ABNF.OPCODE_TEXT: |
| 288 | + await self.ws.send_str(data) |
| 289 | + elif opcode == ABNF.OPCODE_BINARY: |
| 290 | + await self.ws.send_bytes(data) |
| 291 | + else: |
| 292 | + raise WebSocketException(f"Unsupported opcode: {opcode}") |
| 293 | + except Exception as e: |
| 294 | + _run_callback(self.on_error, self, e) |
| 295 | + |
| 296 | + def _callback(self, callback, *args): |
| 297 | + """Compatibility wrapper for callback execution.""" |
| 298 | + _run_callback(callback, self, *args) |
| 299 | + |
| 300 | + def _get_close_args(self, close_frame): |
| 301 | + """Extract close code and reason (simplified).""" |
| 302 | + return [None, None] # aiohttp doesn't provide close frame details |
| 303 | + |
| 304 | + def create_dispatcher(self, ping_timeout, dispatcher, is_ssl, handleDisconnect): |
| 305 | + """Not supported.""" |
| 306 | + raise WebSocketException("Custom dispatcher not supported") |
0 commit comments