diff --git a/internal_filesystem/lib/mpos/board/linux.py b/internal_filesystem/lib/mpos/board/linux.py index b8cf1495..a1f464b5 100644 --- a/internal_filesystem/lib/mpos/board/linux.py +++ b/internal_filesystem/lib/mpos/board/linux.py @@ -122,12 +122,9 @@ def adc_to_voltage(adc_value): # LightsManager will not be initialized (functions will return False) # === SENSOR HARDWARE === -# Note: Desktop builds have no sensor hardware from mpos import SensorManager -# Initialize with no I2C bus - will detect MCU temp if available -# (On Linux desktop, this will fail gracefully but set _initialized flag) -SensorManager.init(None) +SensorManager.init_iio() # === CAMERA HARDWARE === diff --git a/internal_filesystem/lib/mpos/sensor_manager.py b/internal_filesystem/lib/mpos/sensor_manager.py index fa5eab57..d5d3d7e1 100644 --- a/internal_filesystem/lib/mpos/sensor_manager.py +++ b/internal_filesystem/lib/mpos/sensor_manager.py @@ -19,6 +19,7 @@ """ import time +import os try: import _thread _lock = _thread.allocate_lock() @@ -28,6 +29,7 @@ # Sensor type constants (matching Android SensorManager) TYPE_ACCELEROMETER = 1 # Units: m/s² (meters per second squared) +TYPE_MAGNETIC_FIELD = 2 # Units: μT (micro teslas) TYPE_GYROSCOPE = 4 # Units: deg/s (degrees per second) TYPE_TEMPERATURE = 13 # Units: °C (generic, returns first available - deprecated) TYPE_IMU_TEMPERATURE = 14 # Units: °C (IMU chip temperature) @@ -101,6 +103,7 @@ class SensorManager: # Class-level constants TYPE_ACCELEROMETER = TYPE_ACCELEROMETER + TYPE_MAGNETIC_FIELD = TYPE_MAGNETIC_FIELD TYPE_GYROSCOPE = TYPE_GYROSCOPE TYPE_TEMPERATURE = TYPE_TEMPERATURE TYPE_IMU_TEMPERATURE = TYPE_IMU_TEMPERATURE @@ -120,7 +123,7 @@ def get(cls): if cls._instance is None: cls._instance = cls() return cls._instance - + def init(self, i2c_bus, address=0x6B, mounted_position=FACING_SKY): """Initialize SensorManager. MCU temperature initializes immediately, IMU initializes on first use. @@ -146,6 +149,43 @@ def init(self, i2c_bus, address=0x6B, mounted_position=FACING_SKY): self._initialized = True return True + + def init_iio(self): + self._imu_driver = _IIODriver() + self._sensor_list = [ + Sensor( + name="Accelerometer", + sensor_type=TYPE_ACCELEROMETER, + vendor="Linux IIO", + version=1, + max_range="?", + resolution="?", + power_ma=10 + ), + Sensor( + name="Gyroscope", + sensor_type=TYPE_GYROSCOPE, + vendor="Linux IIO", + version=1, + max_range="?", + resolution="?", + power_ma=10 + ), + Sensor( + name="Temperature", + sensor_type=TYPE_IMU_TEMPERATURE, + vendor="Linux IIO", + version=1, + max_range="?", + resolution="?", + power_ma=10 + ) + ] + + self._load_calibration() + + self._initialized = True + return True def _ensure_imu_initialized(self): """Perform IMU initialization on first use (lazy initialization). @@ -227,7 +267,35 @@ def get_default_sensor(self, sensor_type): if sensor.type == sensor_type: return sensor return None - + + def read_sensor_once(self, sensor): + if sensor.type == TYPE_ACCELEROMETER: + if self._imu_driver: + ax, ay, az = self._imu_driver.read_acceleration() + if self._mounted_position == FACING_EARTH: + az *= -1 + return (ax, ay, az) + elif sensor.type == TYPE_GYROSCOPE: + if self._imu_driver: + return self._imu_driver.read_gyroscope() + elif sensor.type == TYPE_IMU_TEMPERATURE: + if self._imu_driver: + return self._imu_driver.read_temperature() + elif sensor.type == TYPE_SOC_TEMPERATURE: + if self._has_mcu_temperature: + import esp32 + return esp32.mcu_temperature() + elif sensor.type == TYPE_TEMPERATURE: + # Generic temperature - return first available (backward compatibility) + if self._imu_driver: + temp = self._imu_driver.read_temperature() + if temp is not None: + return temp + if self._has_mcu_temperature: + import esp32 + return esp32.mcu_temperature() + return None + def read_sensor(self, sensor): """Read sensor data synchronously. @@ -258,32 +326,7 @@ def read_sensor(self, sensor): for attempt in range(max_retries): try: - if sensor.type == TYPE_ACCELEROMETER: - if self._imu_driver: - ax, ay, az = self._imu_driver.read_acceleration() - if self._mounted_position == FACING_EARTH: - az *= -1 - return (ax, ay, az) - elif sensor.type == TYPE_GYROSCOPE: - if self._imu_driver: - return self._imu_driver.read_gyroscope() - elif sensor.type == TYPE_IMU_TEMPERATURE: - if self._imu_driver: - return self._imu_driver.read_temperature() - elif sensor.type == TYPE_SOC_TEMPERATURE: - if self._has_mcu_temperature: - import esp32 - return esp32.mcu_temperature() - elif sensor.type == TYPE_TEMPERATURE: - # Generic temperature - return first available (backward compatibility) - if self._imu_driver: - temp = self._imu_driver.read_temperature() - if temp is not None: - return temp - if self._has_mcu_temperature: - import esp32 - return esp32.mcu_temperature() - return None + return self.read_sensor_once(sensor) except Exception as e: error_msg = str(e) # Retry if sensor data not ready, otherwise fail immediately @@ -292,6 +335,7 @@ def read_sensor(self, sensor): time.sleep_ms(retry_delay_ms) continue else: + print("Exception reading sensor:", error_msg) return None return None @@ -717,6 +761,138 @@ def set_calibration(self, accel_offsets, gyro_offsets): raise NotImplementedError +class _IIODriver(_IMUDriver): + """ + Read sensor data via Linux IIO sysfs. + + Typical base path: + /sys/bus/iio/devices/iio:device0 + """ + accel_path: str + + def __init__(self): + self.accel_path = self.find_iio_device_with_file("in_accel_x_raw") + print("path:", self.accel_path) + + def _p(self, name: str): + return self.accel_path + "/" + name + + def _exists(self, name): + try: + os.stat(name) + return True + except OSError: + return False + + def _is_dir(self, path): + # MicroPython: stat tuple, mode is [0] + try: + st = os.stat(path) + mode = st[0] + # directory bit (POSIX): 0o040000 + return (mode & 0o170000) == 0o040000 + except OSError: + return False + + def find_iio_device_with_file(self, filename, base_dir="/sys/bus/iio/devices/"): + """ + Returns full path to iio:deviceX that contains given filename, + e.g. "/sys/bus/iio/devices/iio:device0" + + Returns None if not found. + """ + + print("Is dir? ", self._is_dir(base_dir), base_dir) + try: + entries = os.listdir(base_dir) + except OSError: + print("Error listing dir") + return None + + for e in entries: + print("Entry:", e) + if not e.startswith("iio:device"): + continue + + print("Entry:", e) + + dev_path = base_dir + "/" + e + if not self._is_dir(dev_path): + continue + + if self._exists(dev_path + "/" + filename): + return dev_path + + return None + + def _read_text(self, name: str) -> str: + p = name + print("Read: ", p) + f = open(p, "r") + try: + return f.readline().strip() + finally: + f.close() + + def _read_float(self, name: str) -> float: + return float(self._read_text(name)) + + def _read_int(self, name: str) -> int: + return int(self._read_text(name), 10) + + def _read_raw_scaled(self, raw_name: str, scale_name: str) -> float: + raw = self._read_int(raw_name) + scale = self._read_float(scale_name) + return raw * scale + + # ---------------------------- + # Public API (replacing I2C) + # ---------------------------- + + def read_temperature(self) -> float: + """ + Tries common IIO patterns: + - in_temp_input (already scaled, usually millidegree C) + - in_temp_raw + in_temp_scale + """ + if False: # os.path.exists(self._p("in_temp_input")): + v = self._read_float(self.accel_path + "/" + "in_temp_input") + # Many drivers expose millidegree Celsius here. + if abs(v) > 200: # heuristic: 25000 means 25°C + return v / 1000.0 + return v + + # Fallback: raw + scale + return self._read_raw_scaled(self.accel_path + "/" + "in_temp_raw", self.accel_path + "/" + "in_temp_scale") + + def read_acceleration(self) -> tuple[float, float, float]: + """ + Returns acceleration in m/s^2 if the kernel driver uses standard IIO scale. + Common names: + in_accel_{x,y,z}_raw + in_accel_scale + """ + scale_name = self.accel_path + "/" + "in_accel_scale" + + ax = self._read_raw_scaled(self.accel_path + "/" + "in_accel_x_raw", scale_name) + ay = self._read_raw_scaled(self.accel_path + "/" + "in_accel_y_raw", scale_name) + az = self._read_raw_scaled(self.accel_path + "/" + "in_accel_z_raw", scale_name) + + return (ax, ay, az) + + def read_gyroscope(self) -> tuple[float, float, float]: + """ + Returns angular velocity in rad/s if the kernel driver uses standard IIO scale. + Common names: + in_anglvel_{x,y,z}_raw + in_anglvel_scale + """ + scale_name = self.accel_path + "/" + "in_anglvel_scale" + + gx = self._read_raw_scaled(self.accel_path + "/" + "in_anglvel_x_raw", scale_name) + gy = self._read_raw_scaled(self.accel_path + "/" + "in_anglvel_y_raw", scale_name) + gz = self._read_raw_scaled(self.accel_path + "/" + "in_anglvel_z_raw", scale_name) + + return (gx, gy, gz) + class _QMI8658Driver(_IMUDriver): """Wrapper for QMI8658 IMU (Waveshare board).""" @@ -918,8 +1094,8 @@ def set_calibration(self, accel_offsets, gyro_offsets): _original_methods = {} _methods_to_delegate = [ - 'init', 'is_available', 'get_sensor_list', 'get_default_sensor', - 'read_sensor', 'calibrate_sensor', 'check_calibration_quality', + 'init', 'init_iio', 'is_available', 'get_sensor_list', 'get_default_sensor', + 'read_sensor', 'read_sensor_once', 'calibrate_sensor', 'check_calibration_quality', 'check_stationarity' ]