123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243 |
- #!/usr/bin/env python3
-
- # Uses the Gitea API to fetch the latest revision of the project from a repo.
- #
- # Inspired by:
- # https://github.com/olivergregorius/micropython_ota
- #
- # ----------------------------------------------------------------------------
- # "THE BEER-WARE LICENSE" (Revision 42):
- # <xythobuz@xythobuz.de> wrote this file. As long as you retain this notice
- # you can do whatever you want with this stuff. If we meet some day, and you
- # think this stuff is worth it, you can buy me a beer in return. Thomas Buck
- # ----------------------------------------------------------------------------
-
- import util
- import sys
- import os
-
- # to check if we're actually running on MicroPython
- on_pico = False
- try:
- import machine
- on_pico = True
- except Exception as e:
- print()
- if hasattr(sys, "print_exception"):
- sys.print_exception(e)
- else:
- print(e)
- print()
-
- class PicoOTA:
- def __init__(self, host, repo, branch = None):
- self.host = host
- self.repo = repo
- self.branch = branch
-
- self.get = None
- self.update_path = "."
- self.exe_path = ""
- self.version_file = "ota_version"
- self.blacklist = []
-
- def path(self, p):
- self.update_path = p
-
- def exe(self, e):
- self.exe_path = e
-
- def ignore(self, path):
- if not path in self.blacklist:
- self.blacklist.append(path)
-
- def fetch(self, url):
- # lazily initialize WiFi
- if self.get == None:
- self.get = util.getRequests()
- if self.get == None:
- return None
-
- try:
- #print("GET " + url)
- r = self.get(url)
- r.close()
- return r
- except Exception as e:
- print()
- if hasattr(sys, "print_exception"):
- sys.print_exception(e)
- else:
- print(e)
- print()
- return None
-
- def get_stored_commit(self):
- current = "unknown"
- try:
- f = open(os.path.join(self.update_path, self.version_file), "r")
- current = f.readline().strip()
- f.close()
- except Exception as e:
- print()
- if hasattr(sys, "print_exception"):
- sys.print_exception(e)
- else:
- print(e)
- print()
- return current
-
- def get_previous_commit(self, commit):
- r = self.fetch(self.host + "/" + self.repo + "/commit/" + commit).text
- for line in r.splitlines():
- if not (self.repo + "/commit/") in line:
- continue
-
- line = line[line.find("/commit/") : ][8 : ][ : 40]
- if line != commit:
- return line
- return "unknown"
-
- def check(self, verbose = False):
- if self.branch == None:
- # get default branch
- r = self.fetch(self.host + "/api/v1/repos/" + self.repo).json()
- self.branch = r["default_branch"]
-
- if verbose:
- print("Selected default branch " + self.branch)
-
- # check for latest commit in branch
- r = self.fetch(self.host + "/api/v1/repos/" + self.repo + "/branches/" + self.branch).json()
- commit = r["commit"]["id"]
-
- if verbose:
- print("Latest commit is " + commit)
-
- current = self.get_stored_commit()
-
- if verbose:
- if current != commit:
- print("Current commit " + current + " is different!")
- else:
- print("No update required")
-
- return (current != commit, commit)
-
- def update_to_commit(self, commit, verbose = False):
- # list all files for a commit
- r = self.fetch(self.host + "/api/v1/repos/" + self.repo + "/git/trees/" + commit).json()
-
- # TODO does not support sub-folders
-
- if verbose:
- if len(r["tree"]) > 0:
- print(str(len(r["tree"])) + " files in repo:")
- for f in r["tree"]:
- if f["path"] in self.blacklist:
- print(" - (IGNORED) " + f["path"])
- else:
- print(" - " + f["path"])
- else:
- print("No files in repo?!")
-
- for f in r["tree"]:
- if f["path"] in self.blacklist:
- continue
-
- # get a file from a commit
- r = self.fetch(self.host + "/" + self.repo + "/raw/commit/" + commit + "/" + f["path"]).text
-
- if verbose:
- print("Writing " + f["path"] + " to " + self.update_path)
-
- # overwrite existing file
- fo = open(os.path.join(self.update_path, f["path"]), "w")
- fo.write(r)
- fo.close()
-
- if f["path"] == self.exe_path:
- if verbose:
- print("Writing " + f["path"] + " to main.py")
-
- fo = open(os.path.join(self.update_path, "main.py"), "w")
- fo.write(r)
- fo.close()
-
- # Write new commit id to local file
- f = open(os.path.join(self.update_path, self.version_file), "w")
- f.write(commit + os.linesep)
- f.close()
-
- def non_pico_ota_test(ota):
- if not os.path.exists("tmp"):
- os.makedirs("tmp")
- ota.path("tmp")
-
- print("Checking for updates")
- newer, commit = ota.check(True)
- print()
-
- # Just for testing
- previous = ota.get_previous_commit(commit)
- print("Previous commit (-1):", previous)
- previous = ota.get_previous_commit(previous)
- print("Previous commit (-2):", previous)
- print()
-
- if newer:
- print("Updating")
- ota.update_to_commit(commit, True)
- else:
- print("No update required")
-
- def pico_ota_run(ota):
- newer, commit = ota.check(True)
-
- if newer:
- ota.update_to_commit(commit, True)
- machine.soft_reset()
-
- fallback = False
-
- try:
- import camp_pico
- except Exception as e:
- fallback = True
- print()
- if hasattr(sys, "print_exception"):
- sys.print_exception(e)
- else:
- print(e)
- print()
-
- # TODO this would immediately cause another update on reboot
- # TODO set a flag to prevent updates after fallbacks?
- # TODO or better, blacklist failed commit_id!
- #if fallback:
- # previous = ota.get_previous_commit(commit, True)
- # ota.update_to_commit(previous, True)
- # machine.soft_reset()
-
- if __name__ == "__main__":
- ota = PicoOTA("https://git.xythobuz.de", "thomas/rgb-matrix-visualizer")
-
- # stuff not needed on Pico
- ota.ignore(".gitignore")
- ota.ignore("README.md")
- ota.ignore("copy.sh")
- ota.ignore("config.py")
- ota.ignore("fonts")
- ota.ignore("images")
- ota.ignore("bdf.py")
- ota.ignore("camp_small.py")
- ota.ignore("gamepad.py")
- ota.ignore("pi.py")
- ota.ignore("test.py")
-
- ota.exe("camp_pico.py")
-
- if not on_pico:
- non_pico_ota_test(ota)
- else:
- pico_ota_run(ota)
|