44import time
55import _thread
66
7- import mpos .apps
7+ from mpos .apps import Activity
88import mpos .info
99import mpos .ui
1010
11- main_screen = None
1211
13- status_label = None
14- install_button = None
12+ class OSUpdate (Activity ):
13+
14+ status_label = None
15+ install_button = None
16+ main_screen = None
17+ keep_running = True
18+
19+ def onCreate (self ):
20+ self .main_screen = lv .obj ()
21+ install_button = lv .button (self .main_screen )
22+ install_button .align (lv .ALIGN .TOP_RIGHT , 0 , mpos .ui .NOTIFICATION_BAR_HEIGHT )
23+ install_button .add_flag (lv .obj .FLAG .HIDDEN ) # button will be shown if there is an update available
24+ install_button .set_size (lv .SIZE_CONTENT , lv .pct (20 ))
25+ install_label = lv .label (install_button )
26+ install_label .set_text ("Update OS" )
27+ install_label .center ()
28+ self .status_label = lv .label (self .main_screen )
29+ self .status_label .align (lv .ALIGN .TOP_LEFT ,0 ,mpos .ui .NOTIFICATION_BAR_HEIGHT )
30+ self .setContentView (self .main_screen )
31+
32+ def onStart (self , screen ):
33+ network_connected = True
34+ try :
35+ import network
36+ network_connected = network .WLAN (network .STA_IF ).isconnected ()
37+ except Exception as e :
38+ print ("Warning: could not check WLAN status:" , str (e ))
39+
40+ if not network_connected :
41+ self .status_label .set_text ("Error: WiFi is not connected." )
42+ time .sleep (10 )
43+ else :
44+ print ("Showing update info..." )
45+ self .show_update_info ()
46+
47+ print ("osupdate.py finished" )
48+
49+ def onStop ():
50+ self .keep_running = False # this is checked by the update_with_lvgl thread
51+
52+ def show_update_info (self ):
53+ self .status_label .set_text ("Checking for OS updates..." )
54+ # URL of the JSON file
55+ url = "http://demo.lnpiggy.com:2121/osupdate.json" # Adjust if the actual JSON URL differs
56+ print (f"osudpate.py: fetching { url } " )
57+ try :
58+ print ("doing requests.get()" )
59+ # Download the JSON
60+ response = requests .get (url )
61+ # Check if request was successful
62+ if response .status_code == 200 :
63+ # Parse JSON
64+ osupdate = ujson .loads (response .text )
65+ # Access attributes
66+ version = osupdate ["version" ]
67+ download_url = osupdate ["download_url" ]
68+ changelog = osupdate ["changelog" ]
69+ # Print the values
70+ print ("Version:" , version )
71+ print ("Download URL:" , download_url )
72+ print ("Changelog:" , changelog )
73+ self .handle_update_info (version , download_url , changelog )
74+ else :
75+ print ("Failed to download JSON. Status code:" , response .status_code )
76+ # Close response
77+ response .close ()
78+ except Exception as e :
79+ print ("Error:" , str (e ))
80+
81+ def handle_update_info (self , version , download_url , changelog ):
82+ label = f"Installed OS version: { mpos .info .CURRENT_OS_VERSION } \n "
83+ if compare_versions (version , mpos .info .CURRENT_OS_VERSION ):
84+ label += "Available new"
85+ self .install_button .remove_flag (lv .obj .FLAG .HIDDEN )
86+ self .install_button .add_event_cb (lambda e , u = download_url : self .install_button_click (u ), lv .EVENT .CLICKED , None )
87+ else :
88+ label += "isn't older than latest"
89+ label += f" version: { version } \n \n Details:\n \n { changelog } "
90+ self .status_label .set_text (label )
91+
92+
93+ def install_button_click (self , download_url ):
94+ print (f"install_button_click for url { download_url } " )
95+ try :
96+ _thread .stack_size (mpos .apps .good_stack_size ())
97+ _thread .start_new_thread (self .update_with_lvgl , (download_url ,))
98+ except Exception as e :
99+ print ("Could not start update_with_lvgl thread: " , e )
100+
101+ # Custom OTA update with LVGL progress
102+ def update_with_lvgl (self , url ):
103+ self .install_button .add_flag (lv .obj .FLAG .HIDDEN ) # or change to cancel button?
104+ self .status_label .set_text ("Update in progress.\n Navigate away to cancel." )
105+ import ota .update
106+ import ota .status
107+ ota .status .status ()
108+ from esp32 import Partition
109+ current_partition = Partition (Partition .RUNNING )
110+ print (f"Current partition: { current_partition } " )
111+ next_partition = current_partition .get_next_update ()
112+ print (f"Next partition: { next_partition } " )
113+ label = lv .label (self .main_screen )
114+ label .set_text ("OS Update: 0.00%" )
115+ label .align (lv .ALIGN .CENTER , 0 , - 30 )
116+ progress_bar = lv .bar (self .main_screen )
117+ progress_bar .set_size (200 , 20 )
118+ progress_bar .align (lv .ALIGN .BOTTOM_MID , 0 , - 50 )
119+ progress_bar .set_range (0 , 100 )
120+ progress_bar .set_value (0 , lv .ANIM .OFF )
121+ def progress_callback (percent ):
122+ print (f"OTA Update: { percent :.1f} %" )
123+ label .set_text (f"OTA Update: { percent :.2f} %" )
124+ progress_bar .set_value (int (percent ), lv .ANIM .ON )
125+ current = Partition (Partition .RUNNING )
126+ next_partition = current .get_next_update ()
127+ response = requests .get (url , stream = True )
128+ total_size = int (response .headers .get ('Content-Length' , 0 ))
129+ bytes_written = 0
130+ chunk_size = 4096
131+ i = 0
132+ print (f"Starting OTA update of size: { total_size } " )
133+ while self .keep_running : # stop if the user navigates away
134+ time .sleep_ms (100 ) # don't hog the CPU
135+ chunk = response .raw .read (chunk_size )
136+ if not chunk :
137+ print ("No chunk, breaking..." )
138+ break
139+ if len (chunk ) < chunk_size :
140+ print (f"Padding chunk { i } from { len (chunk )} to { chunk_size } bytes" )
141+ chunk = chunk + b'\xFF ' * (chunk_size - len (chunk ))
142+ print (f"Writing chunk { i } " )
143+ next_partition .writeblocks (i , chunk )
144+ bytes_written += len (chunk )
145+ i += 1
146+ if total_size :
147+ progress_callback (bytes_written / total_size * 100 )
148+ response .close ()
149+ if bytes_written >= total_size : # if the update was completely installed
150+ next_partition .set_boot ()
151+ import machine
152+ machine .reset ()
153+
154+
155+ # Non-class functions:
15156
16157def compare_versions (ver1 : str , ver2 : str ) -> bool :
17158 """Compare two version numbers (e.g., '1.2.3' vs '4.5.6').
@@ -33,137 +174,3 @@ def compare_versions(ver1: str, ver2: str) -> bool:
33174 return False
34175 print (f"Versions are equal or { ver1 } is not greater than { ver2 } " )
35176 return False
36-
37-
38- # Custom OTA update with LVGL progress
39- def update_with_lvgl (url ):
40- global install_button , status_label , main_screen
41- install_button .add_flag (lv .obj .FLAG .HIDDEN ) # or change to cancel button?
42- status_label .set_text ("Update in progress.\n Navigate away to cancel." )
43- import ota .update
44- import ota .status
45- ota .status .status ()
46- from esp32 import Partition
47- current_partition = Partition (Partition .RUNNING )
48- print (f"Current partition: { current_partition } " )
49- next_partition = current_partition .get_next_update ()
50- print (f"Next partition: { next_partition } " )
51- label = lv .label (main_screen )
52- label .set_text ("OS Update: 0.00%" )
53- label .align (lv .ALIGN .CENTER , 0 , - 30 )
54- progress_bar = lv .bar (main_screen )
55- progress_bar .set_size (200 , 20 )
56- progress_bar .align (lv .ALIGN .BOTTOM_MID , 0 , - 50 )
57- progress_bar .set_range (0 , 100 )
58- progress_bar .set_value (0 , lv .ANIM .OFF )
59- def progress_callback (percent ):
60- print (f"OTA Update: { percent :.1f} %" )
61- label .set_text (f"OTA Update: { percent :.2f} %" )
62- progress_bar .set_value (int (percent ), lv .ANIM .ON )
63- current = Partition (Partition .RUNNING )
64- next_partition = current .get_next_update ()
65- response = requests .get (url , stream = True )
66- total_size = int (response .headers .get ('Content-Length' , 0 ))
67- bytes_written = 0
68- chunk_size = 4096
69- i = 0
70- print (f"Starting OTA update of size: { total_size } " )
71- while main_screen == lv .screen_active (): # stop if the user navigates away
72- time .sleep_ms (100 ) # don't hog the CPU
73- chunk = response .raw .read (chunk_size )
74- if not chunk :
75- print ("No chunk, breaking..." )
76- break
77- if len (chunk ) < chunk_size :
78- print (f"Padding chunk { i } from { len (chunk )} to { chunk_size } bytes" )
79- chunk = chunk + b'\xFF ' * (chunk_size - len (chunk ))
80- print (f"Writing chunk { i } " )
81- next_partition .writeblocks (i , chunk )
82- bytes_written += len (chunk )
83- i += 1
84- if total_size :
85- progress_callback (bytes_written / total_size * 100 )
86- response .close ()
87- if bytes_written >= total_size : # if the update was completely installed
88- next_partition .set_boot ()
89- import machine
90- machine .reset ()
91-
92- def install_button_click (download_url ):
93- print (f"install_button_click for url { download_url } " )
94- try :
95- _thread .stack_size (mpos .apps .good_stack_size ())
96- _thread .start_new_thread (update_with_lvgl , (download_url ,))
97- except Exception as e :
98- print ("Could not start update_with_lvgl thread: " , e )
99-
100- def handle_update_info (version , download_url , changelog ):
101- global install_button , status_label
102- label = f"Installed OS version: { mpos .info .CURRENT_OS_VERSION } \n "
103- if compare_versions (version , mpos .info .CURRENT_OS_VERSION ):
104- label += "Available new"
105- install_button .remove_flag (lv .obj .FLAG .HIDDEN )
106- install_button .add_event_cb (lambda e , u = download_url : install_button_click (u ), lv .EVENT .CLICKED , None )
107- else :
108- label += "matches latest"
109- label += f" version: { version } \n \n Details:\n \n { changelog } "
110- status_label .set_text (label )
111-
112- def show_update_info ():
113- global status_label
114- status_label .set_text ("Checking for OS updates..." )
115- # URL of the JSON file
116- url = "http://demo.lnpiggy.com:2121/osupdate.json" # Adjust if the actual JSON URL differs
117- print (f"osudpate.py: fetching { url } " )
118- try :
119- print ("doing requests.get()" )
120- # Download the JSON
121- response = requests .get (url )
122- # Check if request was successful
123- if response .status_code == 200 :
124- # Parse JSON
125- osupdate = ujson .loads (response .text )
126- # Access attributes
127- version = osupdate ["version" ]
128- download_url = osupdate ["download_url" ]
129- changelog = osupdate ["changelog" ]
130- # Print the values
131- print ("Version:" , version )
132- print ("Download URL:" , download_url )
133- print ("Changelog:" , changelog )
134- handle_update_info (version , download_url , changelog )
135- else :
136- print ("Failed to download JSON. Status code:" , response .status_code )
137- # Close response
138- response .close ()
139- except Exception as e :
140- print ("Error:" , str (e ))
141-
142-
143- main_screen = lv .obj ()
144- install_button = lv .button (main_screen )
145- install_button .align (lv .ALIGN .TOP_RIGHT , 0 , mpos .ui .NOTIFICATION_BAR_HEIGHT )
146- install_button .add_flag (lv .obj .FLAG .HIDDEN ) # button will be shown if there is an update available
147- install_button .set_size (lv .SIZE_CONTENT , lv .pct (20 ))
148- install_label = lv .label (install_button )
149- install_label .set_text ("Update OS" )
150- install_label .center ()
151- status_label = lv .label (main_screen )
152- status_label .align (lv .ALIGN .TOP_LEFT ,0 ,mpos .ui .NOTIFICATION_BAR_HEIGHT )
153- mpos .ui .load_screen (main_screen )
154-
155- network_connected = True
156- try :
157- import network
158- network_connected = network .WLAN (network .STA_IF ).isconnected ()
159- except Exception as e :
160- print ("Warning: could not check WLAN status:" , str (e ))
161-
162- if not network_connected :
163- status_label .set_text ("Error: WiFi is not connected." )
164- time .sleep (10 )
165- else :
166- print ("Showing update info..." )
167- show_update_info ()
168-
169- print ("osupdate.py finished" )
0 commit comments