123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395 |
- #!/usr/bin/env python3
-
- # Uses the Gitea API to fetch the latest revision of the project from a repo.
- #
- # Inspired by:
- # https://github.com/olivergregorius/micropython_ota
- #
- # ----------------------------------------------------------------------------
- # "THE BEER-WARE LICENSE" (Revision 42):
- # <xythobuz@xythobuz.de> wrote this file. As long as you retain this notice
- # you can do whatever you want with this stuff. If we meet some day, and you
- # think this stuff is worth it, you can buy me a beer in return. Thomas Buck
- # ----------------------------------------------------------------------------
-
- import util
- import sys
- import os
- import gc
-
# Probe for the `machine` module to find out whether we are actually
# running on MicroPython. If the import fails for any reason the error
# is printed and the script continues in desktop test mode.
try:
    import machine
except Exception as e:
    on_pico = False
    print()
    if hasattr(sys, "print_exception"):
        # MicroPython offers a dedicated traceback printer
        sys.print_exception(e)
    else:
        print(e)
    print()
else:
    on_pico = True
-
class PicoOTA:
    """Over-the-air updater that pulls the files of a commit from a Gitea repo.

    Typical use: construct with the server URL and "owner/name" of the repo,
    optionally configure it with path() / exe() / ignore() / ui(), then call
    check() followed by update_to_commit().
    """

    def __init__(self, host, repo, branch=None):
        """Store the connection parameters.

        host:   base URL of the server, without trailing slash.
        repo:   repository path as used in the server URLs ("owner/name").
        branch: branch to follow; when None, check() asks the server for
                the default branch of the repo.
        """
        self.host = host
        self.repo = repo
        self.branch = branch

        # request function, set up lazily by fetch()
        self.get = None
        # directory that receives the downloaded files
        self.update_path = "."
        # repo file that is additionally written as main.py
        self.exe_path = ""
        # file inside update_path that stores the installed commit id
        self.version_file = "ota_version"
        # repo paths that are never downloaded
        self.blacklist = []

        # optional progress display, see ui()
        self.gui = None
        self.text = None

    @staticmethod
    def _print_exception(e):
        """Print an exception, using sys.print_exception where available."""
        print()
        if hasattr(sys, "print_exception"):
            sys.print_exception(e)
        else:
            print(e)
        print()

    def ui(self, g, t):
        """Enable progress output on a display.

        g must provide loop_start() / loop_end(), t must provide an `fg`
        attribute plus setText() and draw(). Both have to be set for any
        progress to be drawn.
        """
        self.gui = g
        self.text = t

    def path(self, p):
        """Set the directory that downloaded files are written to."""
        self.update_path = p

    def exe(self, e):
        """Set the repo file whose content is also written to main.py."""
        self.exe_path = e

    def ignore(self, path):
        """Exclude a repo path from updates. Duplicates are not stored twice."""
        if path not in self.blacklist:
            self.blacklist.append(path)

    def fetch(self, url):
        """GET a URL and return the response object, or None on failure.

        Any exception raised while requesting is printed and swallowed, so
        callers that dereference the result directly (.json() / .text) will
        fail with an AttributeError when the request did not succeed.
        """
        # lazily initialize WiFi
        if self.get is None:
            self.get = util.getRequests()
            if self.get is None:
                return None

        try:
            #print("GET " + url)
            r = self.get(url)

            # An explicit close on the Response object is normally not
            # needed, it is handled internally by r.content / r.text /
            # r.json(). To not rely on that automatic behaviour, access
            # r.content first so it gets cached in the response object,
            # then close the socket ourselves.
            _ = r.content
            raw = getattr(r, "raw", None)
            if raw is not None:
                raw.close()
                r.raw = None

            return r
        except Exception as e:
            self._print_exception(e)
            return None

    def get_stored_commit(self):
        """Return the commit id stored in the version file.

        Returns "unknown" (after printing the error) if the file can not
        be read, e.g. because no update has been installed yet.
        """
        try:
            # context manager closes the file even if readline() fails
            with open(self.update_path + "/" + self.version_file, "r") as f:
                return f.readline().strip()
        except Exception as e:
            self._print_exception(e)
            return "unknown"

    def get_previous_commit(self, commit):
        """Scrape the commit web page for a different commit id.

        Returns the first 40 character id found after "/commit/" in a line
        linking to a commit of this repo, if it differs from the given
        commit. Returns "unknown" if no such line exists.
        """
        page = self.fetch(self.host + "/" + self.repo + "/commit/" + commit).text
        marker = self.repo + "/commit/"
        for line in page.splitlines():
            if marker not in line:
                continue

            # cut out the 40 characters following the first "/commit/"
            start = line.find("/commit/") + len("/commit/")
            candidate = line[start : start + 40]
            if candidate != commit:
                return candidate
        return "unknown"

    def check(self, verbose=False):
        """Compare the latest commit of the branch to the stored commit.

        Resolves and remembers the default branch first if none was given.
        Returns a tuple (update_needed, latest_commit_id).
        """
        api = self.host + "/api/v1/repos/" + self.repo

        if self.branch is None:
            # get default branch
            self.branch = self.fetch(api).json()["default_branch"]

            if verbose:
                print("Selected default branch " + self.branch)

        # check for latest commit in branch
        commit = self.fetch(api + "/branches/" + self.branch).json()["commit"]["id"]

        if verbose:
            print("Latest commit is " + commit)

        current = self.get_stored_commit()

        if verbose:
            if current != commit:
                print("Current commit " + current + " is different!")
            else:
                print("No update required")

        return (current != commit, commit)

    def _show_progress(self, commit, action, action_color, path):
        """Draw the five-row progress screen, if a display was set via ui()."""
        if (self.gui is None) or (self.text is None):
            return

        rows = (
            ((255, 255, 0), "OTA"),
            ((255, 0, 255), commit[0 : 5]),
            (action_color, action),
            ((0, 255, 0), path[0 : 5]),
            # remainder of the file name keeps the previous color
            (None, path[5 : ]),
        )

        self.gui.loop_start()
        for row, (color, label) in enumerate(rows):
            if color is not None:
                self.text.fg = color
            self.text.setText(label, "bitmap6")
            self.text.draw(0, 6 * row, False)
        self.gui.loop_end()

    def update_to_commit(self, commit, verbose=False):
        """Download all non-ignored files of a commit into update_path.

        Existing files are overwritten. The file named by exe() is also
        written as main.py. After all files have been written, the commit
        id is stored in the version file.
        """
        # list all files for a commit
        tree = self.fetch(self.host + "/api/v1/repos/" + self.repo + "/git/trees/" + commit).json()["tree"]

        # TODO does not support sub-folders

        if verbose:
            if len(tree) > 0:
                print(str(len(tree)) + " files in repo:")
                for entry in tree:
                    if entry["path"] in self.blacklist:
                        print("  - (IGNORED) " + entry["path"])
                    else:
                        print("  - " + entry["path"])
            else:
                print("No files in repo?!")

        content = None
        for entry in tree:
            name = entry["path"]
            if name in self.blacklist:
                continue

            self._show_progress(commit, "Get", (0, 255, 255), name)

            # drop the previous file before collecting, to free its memory
            content = None
            gc.collect()
            if hasattr(gc, "mem_free"):
                print("Collected Garbage:", gc.mem_free())

            # get a file from a commit
            content = self.fetch(self.host + "/" + self.repo + "/raw/commit/" + commit + "/" + name).text

            if verbose:
                print("Writing " + name + " to " + self.update_path)

            self._show_progress(commit, "Write", (255, 0, 0), name)

            # overwrite existing file
            with open(self.update_path + "/" + name, "w") as out:
                out.write(content)

            if name == self.exe_path:
                if verbose:
                    print("Writing " + name + " to main.py")

                with open(self.update_path + "/main.py", "w") as out:
                    out.write(content)

        # Write new commit id to local file
        with open(self.update_path + "/" + self.version_file, "w") as out:
            out.write(commit + "\n")
-
def non_pico_ota_test(ota):
    """Exercise the updater when not running on MicroPython.

    Directs all downloads into a local "tmp" directory (created if it
    does not exist), runs a verbose update check, looks up the two
    preceding commits for testing purposes and finally installs the
    latest commit if it differs from the stored one.
    """
    target_dir = "tmp"
    if not os.path.exists(target_dir):
        os.makedirs(target_dir)
    ota.path(target_dir)

    print("Checking for updates")
    update_available, head = ota.check(True)
    print()

    # Just for testing: step back through history twice
    ancestor = head
    for label in ("Previous commit (-1):", "Previous commit (-2):"):
        ancestor = ota.get_previous_commit(ancestor)
        print(label, ancestor)
    print()

    if not update_available:
        print("No update required")
        return

    print("Updating")
    ota.update_to_commit(head, True)
-
if on_pico:
    from pico import PicoText

    # Module-level handles, also passed to PicoOTA.ui() by the script below.
    # `t` is used for loop_start() / loop_end(), `s` for fg / setText() / draw().
    # NOTE(review): presumably `i` is the input device and `t` the display
    # target - confirm against util.getInput() / util.getTarget().
    i = util.getInput()
    t = util.getTarget(i)
    s = PicoText(t)

    def pico_ota_run(ota):
        """Check for an update on the device, install it or start the app.

        If a newer commit is available it is installed and the board is
        soft-reset. Otherwise the application module `camp_pico` is
        imported; an exception raised by that import is printed.
        """

        def report_memory():
            # run the garbage collector and log the free heap afterwards
            gc.collect()
            print("Collected Garbage:", gc.mem_free())

        def show(rows):
            # rows: (color or None, label, row index); None keeps the color
            t.loop_start()
            for color, label, row in rows:
                if color is not None:
                    s.fg = color
                s.setText(label, "bitmap6")
                s.draw(0, 6 * row, False)
            t.loop_end()

        report_memory()

        show((
            ((255, 255, 0), "OTA", 0),
            ((0, 255, 255), "Check", 2),
        ))

        report_memory()

        print("Checking for updates")
        newer, commit = ota.check(True)

        report_memory()

        if newer:
            # show the first 20 characters of the commit id, 5 per row
            show((
                ((255, 255, 0), "OTA", 0),
                ((255, 0, 255), commit[ 0 :  5], 1),
                (None, commit[ 5 : 10], 2),
                (None, commit[10 : 15], 3),
                (None, commit[15 : 20], 4),
            ))

            print("Updating to:", commit)
            ota.update_to_commit(commit, True)

            print("Resetting")
            machine.soft_reset()
        else:
            app_file = "camp_pico.py"
            show((
                ((255, 255, 0), "OTA", 0),
                ((0, 255, 0), "Done!", 1),
                ((255, 0, 255), "Boot", 2),
                ((0, 255, 255), app_file[0 : 5], 3),
                (None, app_file[5 : ], 4),
            ))

            # only read by the fallback logic that is still a TODO below
            fallback = False

            try:
                report_memory()

                print("Starting Application")
                import camp_pico
            except Exception as e:
                print()
                if hasattr(sys, "print_exception"):
                    sys.print_exception(e)
                else:
                    print(e)
                print()

                print("Falling back to previous")
                fallback = True

            # TODO this would immediately cause another update on reboot
            # TODO set a flag to prevent updates after fallbacks?
            # TODO or better, blacklist failed commit_id!
            #if fallback:
            #    previous = ota.get_previous_commit(commit, True)
            #    ota.update_to_commit(previous, True)
            #    machine.soft_reset()
-
# Updater for the project repository; no branch is given, so the
# default branch reported by the server is used.
ota = PicoOTA("https://git.xythobuz.de", "thomas/rgb-matrix-visualizer")

# stuff not needed on Pico
for unneeded in (
    ".gitignore",
    "README.md",
    "copy.sh",
    "config.py",
    "fonts",
    "hardware",
    "images",
    "bdf.py",
    "camp_small.py",
    "gamepad.py",
    "pi.py",
    "test.py",
):
    ota.ignore(unneeded)

if on_pico:
    # on the device: this script is also installed as main.py and
    # reports its progress on the display
    ota.exe("pico_ota.py")
    ota.ui(t, s)
    pico_ota_run(ota)
else:
    # on a desktop: run the download test into ./tmp
    non_pico_ota_test(ota)
|