123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525 |
- #!/usr/bin/env python3
-
- # OctoTray Linux Qt System Tray OctoPrint client
- #
- # depends on:
- # - python-pyqt5
- # - curl
- # - xdg-open
- #
- # see also:
- # https://doc.qt.io/qt-5/qtwidgets-widgets-imageviewer-example.html
- # https://stackoverflow.com/a/22618496
-
- import json
- import subprocess
- import sys
- import os
- import threading
- import time
- import urllib.parse
- from PyQt5 import QtWidgets, QtGui, QtCore, QtNetwork
- from PyQt5.QtWidgets import QSystemTrayIcon, QAction, QMenu, QMessageBox, QWidget, QLabel, QVBoxLayout, QHBoxLayout, QDesktopWidget, QSizePolicy, QSlider, QLayout
- from PyQt5.QtGui import QIcon, QPixmap, QImageReader
- from PyQt5.QtCore import QCoreApplication, QSettings, QUrl, QTimer, QSize, Qt
-
class AspectRatioPixmapLabel(QLabel):
    """QLabel that shows its pixmap scaled to the label size, keeping the aspect ratio.

    The unscaled pixmap is kept in ``self.pix``; the label itself always
    displays a copy scaled to the current widget size.
    """

    def __init__(self, *args, **kwargs):
        super(AspectRatioPixmapLabel, self).__init__(*args, **kwargs)
        self.setMinimumSize(1, 1)
        self.setScaledContents(False)
        # placeholder until setPixmap() is called; the isNull() checks below
        # guard every use of it
        self.pix = QPixmap(0, 0)

    def setPixmap(self, p):
        """Store the unscaled pixmap *p* and display a scaled copy of it."""
        self.pix = p
        super(AspectRatioPixmapLabel, self).setPixmap(self.scaledPixmap())

    def heightForWidth(self, width):
        """Return the integer height that keeps the pixmap's aspect ratio at *width*.

        Falls back to the current widget height while no pixmap is set.
        """
        if self.pix.isNull():
            return self.height()
        # integer division: the result is used as a pixel count, a float
        # (as produced by "/") is not a valid return value here
        return (self.pix.height() * width) // self.pix.width()

    def sizeHint(self):
        """Return the current width paired with the matching aspect-correct height."""
        w = self.width()
        return QSize(int(w), int(self.heightForWidth(w)))

    def scaledPixmap(self):
        """Return the stored pixmap smoothly scaled to the current label size."""
        return self.pix.scaled(self.size(), Qt.KeepAspectRatio, Qt.SmoothTransformation)

    def resizeEvent(self, e):
        """Rescale the displayed pixmap whenever the label is resized."""
        if self.pix.isNull():
            return
        super(AspectRatioPixmapLabel, self).setPixmap(self.scaledPixmap())
-
class CamWindow(QWidget):
    """Window showing a periodically refreshed webcam snapshot and a status line.

    Snapshots are fetched through the shared QNetworkAccessManager passed in
    as *manager*; the status text is obtained from *parent* via its
    getTemperatureString() and getProgress() methods.
    """

    reloadDelayDefault = 1000 # in ms
    statusDelay = 10 * 1000 # in ms
    addSize = 100
    reloadOn = True

    # keys that must be present and non-null for a job to count as running
    progressKeys = ("completion", "printTime", "printTimeLeft")

    def __init__(self, parent, name, icon, app, manager, printer, *args, **kwargs):
        """Build the window and start the image and status reload cycles.

        parent  -- object providing getTemperatureString(), getProgress()
                   and removeWebcamWindow()
        name    -- application name, used as window title prefix
        icon    -- window icon
        app     -- the application object (only stored)
        manager -- network access manager used for the snapshot requests
        printer -- printer entry; index 0 is the host, index 1 the API key
        """
        super(CamWindow, self).__init__(*args, **kwargs)
        self.app = app
        self.manager = manager
        self.manager.finished.connect(self.handleResponse)
        # NOTE: this attribute shadows QWidget.parent(); kept because
        # existing code accesses it under this name
        self.parent = parent
        self.printer = printer
        self.host = self.printer[0]
        self.url = "http://" + self.host + ":8080/?action=snapshot"

        self.setWindowTitle(name + " Webcam Stream")
        self.setWindowIcon(icon)

        box = QVBoxLayout()
        self.setLayout(box)

        label = QLabel(self.url)
        box.addWidget(label, 0)
        box.setAlignment(label, Qt.AlignHCenter)

        self.img = AspectRatioPixmapLabel()
        self.img.setPixmap(QPixmap(640, 480))
        box.addWidget(self.img, 1)

        slide = QHBoxLayout()
        box.addLayout(slide, 0)

        # slider selects the delay between two snapshot requests (0..2000 ms)
        self.slider = QSlider(Qt.Horizontal)
        self.slider.setMinimum(0)
        self.slider.setMaximum(2000)
        self.slider.setTickInterval(100)
        self.slider.setPageStep(100)
        self.slider.setSingleStep(100)
        self.slider.setTickPosition(QSlider.TicksBelow)
        self.slider.setValue(self.reloadDelayDefault)
        self.slider.valueChanged.connect(self.sliderChanged)
        slide.addWidget(self.slider, 1)

        self.slideLabel = QLabel(str(self.reloadDelayDefault) + "ms")
        slide.addWidget(self.slideLabel, 0)

        self.statusLabel = QLabel("Status: unavailable")
        box.addWidget(self.statusLabel, 0)
        # previously this call repeated the alignment of the URL label,
        # which was a no-op; align the widget that was just added instead
        box.setAlignment(self.statusLabel, Qt.AlignHCenter)

        size = self.size()
        size.setHeight(size.height() + self.addSize)
        self.resize(size)

        self.loadImage()
        self.loadStatus()

    def getHost(self):
        """Return the host name of the printer this window belongs to."""
        return self.host

    def sliderChanged(self):
        """Mirror the current slider value in the label next to it."""
        self.slideLabel.setText(str(self.slider.value()) + "ms")

    def closeEvent(self, event):
        """Stop both reload cycles and unregister this window from the parent."""
        self.reloadOn = False
        # with an empty URL no incoming reply matches in handleResponse()
        self.url = ""
        self.parent.removeWebcamWindow(self)

    def scheduleLoadImage(self):
        """Request the next snapshot after the delay selected with the slider."""
        if self.reloadOn:
            QTimer.singleShot(self.slider.value(), self.loadImage)

    def scheduleLoadStatus(self):
        """Refresh the status line again after statusDelay milliseconds."""
        if self.reloadOn:
            QTimer.singleShot(self.statusDelay, self.loadStatus)

    def loadImage(self):
        """Start an asynchronous request for the snapshot URL."""
        request = QtNetwork.QNetworkRequest(QUrl(self.url))
        self.manager.get(request)

    def loadStatus(self):
        """Query temperatures and job progress and show them in the status label."""
        host, key = self.host, self.printer[1]

        temperatures = self.parent.getTemperatureString(host, key)
        s = "Status: " + (temperatures if len(temperatures) > 0 else "Unknown")

        progress = self.parent.getProgress(host, key)
        # getProgress() may return something other than a dictionary when
        # the request failed, so check the type before looking up keys
        running = isinstance(progress, dict) and all(
            progress.get(k) is not None for k in self.progressKeys)
        if running:
            s += " - %.1f%%" % progress["completion"]
            s += " - runtime "
            s += time.strftime("%H:%M:%S", time.gmtime(progress["printTime"]))
            s += " - "
            s += time.strftime("%H:%M:%S", time.gmtime(progress["printTimeLeft"])) + " left"

        self.statusLabel.setText(s)
        self.scheduleLoadStatus()

    def handleResponse(self, reply):
        """Handle a finished network reply; show the image if it is ours.

        The manager is shared, so replies for other URLs are ignored.
        """
        if reply.url().url() != self.url:
            return

        if reply.error() != QtNetwork.QNetworkReply.NoError:
            print("Error loading image: " + reply.errorString())
            return

        reader = QImageReader(reply)
        reader.setAutoTransform(True)
        image = reader.read()

        # read() signals failure with a null image, never with None
        if image.isNull():
            print("Error decoding image: " + reader.errorString())
        else:
            if image.colorSpace().isValid():
                # QColorSpace is not imported by name in this file,
                # reach it through the QtGui module instead
                image.convertToColorSpace(QtGui.QColorSpace(QtGui.QColorSpace.SRgb))
            self.img.setPixmap(QPixmap.fromImage(image))

        # keep polling after a decode failure as well, as before
        self.scheduleLoadImage()
-
class OctoTray():
    """System tray application controlling OctoPrint instances via their HTTP API.

    Constructing an instance builds the tray menu for all configured
    printers and runs the Qt event loop; the constructor does not return.
    """

    name = "OctoTray"
    vendor = "xythobuz"
    version = "0.2"

    iconPath = "/usr/share/pixmaps/"
    iconName = "octotray_icon.png"

    # 0=host, 1=key
    # (2=system-commands, 3=menu, 4...=actions)
    printers = [
        [ "PRINTER_HOST_HERE", "PRINTER_API_KEY_HERE" ]
    ]

    statesWithWarning = [
        "Printing", "Pausing", "Paused"
    ]

    camWindows = []

    # keys of the job progress dictionary that are evaluated for the status
    progressKeys = ("completion", "printTime", "printTimeLeft")

    def __init__(self):
        """Build the tray icon and per-printer menus, then run the event loop."""
        self.app = QtWidgets.QApplication(sys.argv)
        QCoreApplication.setApplicationName(self.name)

        if not QSystemTrayIcon.isSystemTrayAvailable():
            self.showDialog("OctoTray Error", "System Tray is not available on this platform!", "", False, False, True)
            sys.exit(0)

        self.manager = QtNetwork.QNetworkAccessManager()
        self.menu = QMenu()

        unknownCount = 0
        for p in self.printers:
            method = self.getMethod(p[0], p[1])
            print("Printer " + p[0] + " has method " + method)
            if method == "unknown":
                unknownCount += 1
                continue

            # the entry is extended in place: index 2 holds the commands,
            # index 3 the menu and all following indices the actions
            commands = self.getSystemCommands(p[0], p[1])
            p.append(commands)

            menu = QMenu(self.getName(p[0], p[1]))
            p.append(menu)
            self.menu.addMenu(menu)

            if method == "psucontrol":
                self._addAction(p, menu, "Turn On PSU", lambda chk, x=p: self.printerOnAction(x))
                self._addAction(p, menu, "Turn Off PSU", lambda chk, x=p: self.printerOffAction(x))

            for i, command in enumerate(commands):
                self._addAction(p, menu, command.title(), lambda chk, x=p, y=i: self.printerSystemCommandAction(x, y))

            self._addAction(p, menu, "Get Status", lambda chk, x=p: self.printerStatusAction(x))
            self._addAction(p, menu, "Show Webcam", lambda chk, x=p: self.printerWebcamAction(x))
            self._addAction(p, menu, "Open Web UI", lambda chk, x=p: self.printerWebAction(x))

        if (len(self.printers) <= 0) or (unknownCount >= len(self.printers)):
            self.showDialog("OctoTray Error", "No printers available!", "", False, False, True)
            sys.exit(0)

        self.quitAction = QAction("&Quit")
        self.quitAction.triggered.connect(self.exit)
        self.menu.addAction(self.quitAction)

        # prefer an icon in the working directory over the installed one
        if os.path.isfile(self.iconName):
            iconPathName = self.iconName
        elif os.path.isfile(self.iconPath + self.iconName):
            iconPathName = self.iconPath + self.iconName
        else:
            self.showDialog("OctoTray Error", "Icon file has not been found!", "", False, False, True)
            sys.exit(0)

        pic = QPixmap(32, 32)
        pic.load(iconPathName)
        self.icon = QIcon(pic)

        trayIcon = QSystemTrayIcon(self.icon)
        trayIcon.setToolTip(self.name + " " + self.version)
        trayIcon.setContextMenu(self.menu)
        trayIcon.setVisible(True)

        sys.exit(self.app.exec_())

    def _addAction(self, printer, menu, title, handler):
        """Create a menu action, connect it to *handler* and store it in the printer entry."""
        action = QAction(title)
        action.triggered.connect(handler)
        printer.append(action)
        menu.addAction(action)

    def openBrowser(self, url):
        """Open http://<url> with xdg-open."""
        try:
            # argument list instead of a shell string, so the URL is passed
            # to xdg-open unmodified
            subprocess.run(["xdg-open", "http://" + url])
        except OSError as e:
            print("Error opening browser: " + str(e))

    def showDialog(self, title, text1, text2 = "", question = False, warning = False, error = False):
        """Show a modal message box.

        The icon is chosen by the first set flag of error, warning and
        question, in this order. With *question* set, Yes and No buttons are
        shown instead of Ok. Returns True only if Yes has been clicked.
        """
        if error:
            icon = QMessageBox.Critical
        elif warning:
            icon = QMessageBox.Warning
        elif question:
            icon = QMessageBox.Question
        else:
            icon = QMessageBox.Information

        msg = QMessageBox()
        msg.setIcon(icon)
        msg.setWindowTitle(title)
        msg.setText(text1)

        if text2 is not None:
            msg.setInformativeText(text2)

        if question:
            msg.setStandardButtons(QMessageBox.Yes | QMessageBox.No)
        else:
            msg.setStandardButtons(QMessageBox.Ok)

        return msg.exec_() == QMessageBox.Yes

    @staticmethod
    def _curlArguments(host, headers, path, content = None):
        """Return the curl argument list for an API request (POST if content is given)."""
        args = ["curl", "-s", "-m", "1"]
        for h in headers:
            args += ["-H", h]
        if content is None:
            args += ["-X", "GET"]
        else:
            args += ["-X", "POST", "-d", content]
        args.append("http://" + host + "/api/" + path)
        return args

    def sendRequest(self, host, headers, path, content = None):
        """Run curl against http://<host>/api/<path> and return its standard output.

        A GET request is sent without *content*, a POST request otherwise.
        Returns an empty string if curl could not be run or timed out, so
        callers see the same thing as for an empty response.
        """
        try:
            # no shell involved: header values, host and body reach curl
            # exactly as given
            r = subprocess.run(self._curlArguments(host, headers, path, content),
                               capture_output=True, timeout=10, text=True)
        except (OSError, subprocess.TimeoutExpired) as e:
            print("Error running curl: " + str(e))
            return ""
        return r.stdout

    def sendPostRequest(self, host, key, path, content):
        """Send *content* as JSON POST request, authenticated with the API *key*."""
        headers = [ "Content-Type: application/json",
                    "X-Api-Key: " + key ]
        return self.sendRequest(host, headers, path, content)

    def sendGetRequest(self, host, key, path):
        """Send a GET request, authenticated with the API *key*."""
        headers = [ "X-Api-Key: " + key ]
        return self.sendRequest(host, headers, path)

    def _getJson(self, host, key, path):
        """Return the decoded JSON response of a GET request, or None if it is not JSON."""
        try:
            return json.loads(self.sendGetRequest(host, key, path))
        except json.JSONDecodeError:
            return None

    @staticmethod
    def _formatTemperature(prefix, reading):
        """Format one sensor as '<prefix><actual>[/<target>] ', or '' without a usable value."""
        if not isinstance(reading, dict) or reading.get("actual") is None:
            return ""
        text = prefix + "%.1f" % reading["actual"]
        if reading.get("target") is not None:
            text += "/" + "%.1f" % reading["target"]
        return text + " "

    def getTemperatureString(self, host, key):
        """Return state text and bed/tool temperatures as one line.

        Example: "Operational - B60.0/60.0 T210.0/0.0". Returns an empty
        string if the printer could not be queried.
        """
        rd = self._getJson(host, key, "printer")
        if not isinstance(rd, dict):
            return ""

        s = ""
        state = rd.get("state")
        if isinstance(state, dict) and ("text" in state):
            s += state["text"]
            if "temperature" in rd:
                s += " - "

        temperature = rd.get("temperature")
        if isinstance(temperature, dict):
            for sensor, prefix in (("bed", "B"), ("tool0", "T"), ("tool1", "T")):
                s += self._formatTemperature(prefix, temperature.get(sensor))

        return s.strip()

    def getState(self, host, key):
        """Return the "state" field of the job API response, or "Unknown"."""
        rd = self._getJson(host, key, "job")
        if isinstance(rd, dict) and ("state" in rd):
            return rd["state"]
        return "Unknown"

    def getProgress(self, host, key):
        """Return the "progress" field of the job API response, or the string "Unknown"."""
        rd = self._getJson(host, key, "job")
        if isinstance(rd, dict) and ("progress" in rd):
            return rd["progress"]
        return "Unknown"

    def getName(self, host, key):
        """Return the name of the first printer profile, or *host* if there is none."""
        rd = self._getJson(host, key, "printerprofiles")
        if isinstance(rd, dict) and isinstance(rd.get("profiles"), dict):
            # an empty profile map yields None here instead of raising
            first = next(iter(rd["profiles"].values()), None)
            if isinstance(first, dict) and ("name" in first):
                return first["name"]
        return host

    def getMethod(self, host, key):
        """Return how the printer can be controlled.

        "psucontrol" if the psucontrol plugin answers, "system" if custom
        system commands are defined, "unknown" otherwise.
        """
        rd = self._getJson(host, key, "plugin/psucontrol")
        if isinstance(rd, dict) and ("isPSUOn" in rd):
            return "psucontrol"

        # we have some custom commands and no psucontrol
        # so lets try to use that instead of skipping
        # the printer completely with 'unknown'
        if len(self._commandActions(self._getJson(host, key, "system/commands/custom"))) > 0:
            return "system"

        return "unknown"

    @staticmethod
    def _commandActions(rd):
        """Return the "action" values of all entries in a custom command list response."""
        if not isinstance(rd, list):
            return []
        return [c["action"] for c in rd if isinstance(c, dict) and ("action" in c)]

    def getSystemCommands(self, host, key):
        """Return the action names of all custom system commands, printing them as well."""
        rd = self._getJson(host, key, "system/commands/custom")
        actions = self._commandActions(rd)

        if isinstance(rd, list) and len(rd) > 0:
            print("system commands available for " + host + ":")
        for action in actions:
            print(" - " + action)

        return actions

    def setPSUControl(self, host, key, state):
        """Turn the power supply on (state is true) or off via the psucontrol plugin."""
        cmd = "turnPSUOn" if state else "turnPSUOff"
        return self.sendPostRequest(host, key, "plugin/psucontrol", json.dumps({ "command": cmd }))

    def setSystemCommand(self, host, key, cmd):
        """Execute the custom system command with the action name *cmd*."""
        cmd = urllib.parse.quote(cmd)
        return self.sendPostRequest(host, key, "system/commands/custom/" + cmd, '')

    def exit(self):
        """Quit the application."""
        QCoreApplication.quit()

    def printerSystemCommandAction(self, item, index):
        """Run system command *index* of printer entry *item*.

        Commands with "off" in their name require confirmation while the
        printer is in one of the statesWithWarning.
        """
        command = item[2][index]
        if "off" in command.lower():
            state = self.getState(item[0], item[1])
            if state in self.statesWithWarning:
                confirmed = self.showDialog("OctoTray Warning", "The printer seems to be running currently!", "Do you really want to run '" + command + "'?", True, True)
                if not confirmed:
                    return
        # single call site, so a confirmed command is sent exactly once
        self.setSystemCommand(item[0], item[1], command)

    def printerOnAction(self, item):
        """Turn on the power supply of printer entry *item*."""
        self.setPSUControl(item[0], item[1], True)

    def printerOffAction(self, item):
        """Turn off the power supply of *item*, asking first if it seems to be printing."""
        state = self.getState(item[0], item[1])
        if state in self.statesWithWarning:
            if not self.showDialog("OctoTray Warning", "The printer seems to be running currently!", "Do you really want to turn it off?", True, True):
                return
        self.setPSUControl(item[0], item[1], False)

    def printerWebAction(self, item):
        """Open the web interface of printer entry *item* in the browser."""
        self.openBrowser(item[0])

    def printerStatusAction(self, item):
        """Show a dialog with job progress and temperatures of printer entry *item*."""
        progress = self.getProgress(item[0], item[1])

        # getProgress() returns a string when the request failed
        known = isinstance(progress, dict) and all(k in progress for k in self.progressKeys)
        running = known and all(progress[k] is not None for k in self.progressKeys)

        warning = False
        if running:
            s = "%.1f%% Completion\n" % progress["completion"]
            s += "Printing since " + time.strftime("%H:%M:%S", time.gmtime(progress["printTime"])) + "\n"
            s += time.strftime("%H:%M:%S", time.gmtime(progress["printTimeLeft"])) + " left"
        elif known:
            s = "No job is currently running"
        else:
            s = "Could not read printer status!"
            warning = True

        t = self.getTemperatureString(item[0], item[1])
        if len(t) > 0:
            s += "\n" + t
        self.showDialog("OctoTray Status", s, None, False, warning)

    def printerWebcamAction(self, item):
        """Show the webcam window of printer entry *item*, creating it if needed."""
        for cw in self.camWindows:
            if cw.getHost() == item[0]:
                cw.show()
                cw.activateWindow()
                return

        window = CamWindow(self, self.name, self.icon, self.app, self.manager, item)
        self.camWindows.append(window)

        # center the new window on the screen
        screenGeometry = QDesktopWidget().screenGeometry()
        x = (screenGeometry.width() - window.width()) / 2
        y = (screenGeometry.height() - window.height()) / 2
        window.setGeometry(int(x), int(y), int(window.width()), int(window.height()))

        window.show()
        window.activateWindow()

    def removeWebcamWindow(self, window):
        """Forget a webcam window; unknown windows are ignored."""
        if window in self.camWindows:
            self.camWindows.remove(window)
-
if __name__ == "__main__":
    # the OctoTray constructor builds the tray menu and runs the Qt event
    # loop itself, so nothing else has to be done here
    tray = OctoTray()
|