|
- #!/usr/bin/env python3
-
- # OctoTray Linux Qt System Tray OctoPrint client
- #
- # depends on:
- # - python-pyqt5
- #
- # see also:
- # https://doc.qt.io/qt-5/qtwidgets-widgets-imageviewer-example.html
- # https://stackoverflow.com/a/22618496
-
- import json
- import sys
- import os
- import time
- import urllib.parse
- import urllib.request
- from PyQt5 import QtWidgets, QtGui, QtCore, QtNetwork
- from PyQt5.QtWidgets import QSystemTrayIcon, QAction, QMenu, QMessageBox, QWidget, QLabel, QVBoxLayout, QHBoxLayout, QDesktopWidget, QSizePolicy, QSlider, QLayout, QTableWidget, QTableWidgetItem, QPushButton
- from PyQt5.QtGui import QIcon, QPixmap, QImageReader, QDesktopServices
- from PyQt5.QtCore import QCoreApplication, QSettings, QUrl, QTimer, QSize, Qt, QSettings
-
class SettingsWindow(QWidget):
    """Window with an editable two-column table of printers.

    Column 0 holds the host name and column 1 the API key of each printer.
    The initial rows come from ``parent.readSettings()``. When the window is
    closed with a modified list, the user is asked whether to store it via
    ``parent.writeSettings()``; on confirmation the application event loop
    is left with exit code 42.
    """

    def __init__(self, parent, *args, **kwargs):
        """Build the widgets and populate the table from the stored settings.

        parent is the owning tray object; its ``name``, ``icon``,
        ``readSettings``, ``writeSettings``, ``showDialog``, ``printers``
        and ``removeSettingsWindow`` members are used by this window.
        """
        super().__init__(*args, **kwargs)
        self.parent = parent

        self.setWindowTitle(parent.name + " Settings")
        self.setWindowIcon(parent.icon)

        layout = QVBoxLayout()
        self.setLayout(layout)

        # button row above the table
        buttonRow = QHBoxLayout()
        layout.addLayout(buttonRow, 0)

        self.add = QPushButton("&Add Printer")
        self.add.clicked.connect(self.addPrinter)
        buttonRow.addWidget(self.add)

        self.remove = QPushButton("&Remove Printer")
        self.remove.clicked.connect(self.removePrinter)
        buttonRow.addWidget(self.remove)

        # one table row per stored printer
        stored = self.parent.readSettings()
        self.rows = len(stored)
        self.table = QTableWidget(self.rows, 2)
        layout.addWidget(self.table, 1)

        for row, entry in enumerate(stored):
            for column in (0, 1):
                self.table.setItem(row, column, QTableWidgetItem(entry[column]))

        self.table.setSizeAdjustPolicy(QtWidgets.QAbstractScrollArea.AdjustToContents)
        self.table.resizeColumnsToContents()

    def tableToList(self):
        """Return the table contents as a list of [host, key] lists."""
        return [
            [self.table.item(row, 0).text(), self.table.item(row, 1).text()]
            for row in range(self.rows)
        ]

    def closeEvent(self, event):
        """Offer to save a changed printer list, then unregister the window."""
        before = [entry[0:2] for entry in self.parent.printers]
        after = self.tableToList()
        if before != after:
            confirmed = self.parent.showDialog(self.parent.name + " Settings Changed", "Do you want to save the new list of printers?", "This will restart the application!", True, False, False)
            if confirmed:
                self.parent.writeSettings(after)
                # exit code 42 is what the dialog text calls a restart
                QCoreApplication.exit(42)
        self.parent.removeSettingsWindow()

    def addPrinter(self):
        """Append a new row filled with placeholder texts."""
        self.rows += 1
        self.table.setRowCount(self.rows)
        newRow = self.rows - 1
        self.table.setItem(newRow, 0, QTableWidgetItem("HOSTNAME"))
        self.table.setItem(newRow, 1, QTableWidgetItem("API_KEY"))

    def removePrinter(self):
        """Delete the currently selected row, if there is one."""
        selected = self.table.currentRow()
        if not (0 <= selected < self.rows):
            return
        self.rows -= 1
        self.table.removeRow(selected)
-
class AspectRatioPixmapLabel(QLabel):
    """QLabel that keeps its pixmap scaled to the label size.

    The unscaled pixmap is stored in ``self.pix``; whenever a new pixmap is
    set or the label is resized, a copy scaled with ``Qt.KeepAspectRatio``
    is handed to the underlying QLabel.
    """

    def __init__(self, *args, **kwargs):
        super(AspectRatioPixmapLabel, self).__init__(*args, **kwargs)
        self.setMinimumSize(1, 1)
        self.setScaledContents(False)
        # null pixmap until setPixmap() is called
        self.pix = QPixmap(0, 0)

    def setPixmap(self, p):
        """Remember the pixmap p and display it scaled to the current size."""
        self.pix = p
        super(AspectRatioPixmapLabel, self).setPixmap(self.scaledPixmap())

    def heightForWidth(self, width):
        """Return the integer height matching width at the pixmap's ratio.

        Falls back to the current widget height while no pixmap is set.
        """
        if self.pix.isNull():
            return self.height()
        # integer arithmetic: this overrides a Qt virtual that returns int,
        # so a float result from true division must not be handed back
        return (self.pix.height() * int(width)) // self.pix.width()

    def sizeHint(self):
        """Return the current width with the aspect-correct height."""
        w = self.width()
        return QSize(int(w), int(self.heightForWidth(w)))

    def scaledPixmap(self):
        """Return the stored pixmap smoothly scaled to fit the label."""
        return self.pix.scaled(self.size(), Qt.KeepAspectRatio, Qt.SmoothTransformation)

    def resizeEvent(self, e):
        """Re-scale the stored pixmap whenever the label size changes."""
        if not self.pix.isNull():
            super(AspectRatioPixmapLabel, self).setPixmap(self.scaledPixmap())
-
class CamWindow(QWidget):
    """Window showing webcam snapshots and a status line for one printer.

    Snapshots are requested from ``http://<host>:8080/?action=snapshot``
    through the parent's QNetworkAccessManager. After each handled image
    the next request is scheduled with the delay chosen on the slider.
    The status line is refreshed every ``statusDelay`` milliseconds using
    the parent's ``getTemperatureString`` and ``getProgress``.
    """

    reloadDelayDefault = 1000 # in ms
    statusDelay = 10 * 1000 # in ms
    addSize = 100
    reloadOn = True

    def __init__(self, parent, printer, *args, **kwargs):
        """Build the window for printer and start image and status loading.

        parent is the owning tray object (its ``app``, ``manager``, ``name``
        and ``icon`` members are read here); printer is a list whose first
        two entries are the host name and the API key.
        """
        super(CamWindow, self).__init__(*args, **kwargs)
        self.app = parent.app
        self.manager = parent.manager
        # the manager is taken from the parent, so this slot receives every
        # reply it finishes; handleResponse() filters by URL
        self.manager.finished.connect(self.handleResponse)
        self.parent = parent
        self.printer = printer
        self.host = self.printer[0]
        self.url = "http://" + self.host + ":8080/?action=snapshot"

        self.setWindowTitle(parent.name + " Webcam Stream")
        self.setWindowIcon(parent.icon)

        box = QVBoxLayout()
        self.setLayout(box)

        label = QLabel(self.url)
        box.addWidget(label, 0)
        box.setAlignment(label, Qt.AlignHCenter)

        self.img = AspectRatioPixmapLabel()
        self.img.setPixmap(QPixmap(640, 480))
        box.addWidget(self.img, 1)

        slide = QHBoxLayout()
        box.addLayout(slide, 0)

        self.slider = QSlider(Qt.Horizontal)
        self.slider.setMinimum(0)
        self.slider.setMaximum(2000)
        self.slider.setTickInterval(100)
        self.slider.setPageStep(100)
        self.slider.setSingleStep(100)
        self.slider.setTickPosition(QSlider.TicksBelow)
        self.slider.setValue(self.reloadDelayDefault)
        self.slider.valueChanged.connect(self.sliderChanged)
        slide.addWidget(self.slider, 1)

        self.slideLabel = QLabel(str(self.reloadDelayDefault) + "ms")
        slide.addWidget(self.slideLabel, 0)

        self.statusLabel = QLabel("Status: unavailable")
        box.addWidget(self.statusLabel, 0)
        # the original aligned the URL label a second time here, which looks
        # like a copy-paste slip; centre the status label instead
        box.setAlignment(self.statusLabel, Qt.AlignHCenter)

        size = self.size()
        size.setHeight(size.height() + self.addSize)
        self.resize(size)

        self.loadImage()
        self.loadStatus()

    @staticmethod
    def _formatDuration(seconds):
        """Format a number of seconds as HH:MM:SS without wrapping at 24h."""
        minutes, secs = divmod(max(0, int(seconds)), 60)
        hours, minutes = divmod(minutes, 60)
        return "%02d:%02d:%02d" % (hours, minutes, secs)

    def getHost(self):
        """Return the host name of the printer shown in this window."""
        return self.host

    def sliderChanged(self):
        """Show the newly selected reload delay next to the slider."""
        self.slideLabel.setText(str(self.slider.value()) + "ms")

    def closeEvent(self, event):
        """Stop reloading and unregister this window from the parent."""
        self.reloadOn = False
        # with an empty URL no reply matches this window any more
        self.url = ""
        self.parent.removeWebcamWindow(self)

    def scheduleLoadImage(self):
        """Request the next snapshot after the delay set on the slider."""
        if self.reloadOn:
            QTimer.singleShot(self.slider.value(), self.loadImage)

    def scheduleLoadStatus(self):
        """Request the next status refresh after ``statusDelay``."""
        if self.reloadOn:
            QTimer.singleShot(self.statusDelay, self.loadStatus)

    def loadImage(self):
        """Start a GET request for the snapshot URL."""
        url = QUrl(self.url)
        request = QtNetwork.QNetworkRequest(url)
        self.manager.get(request)

    def loadStatus(self):
        """Rebuild the status line and schedule the next refresh."""
        s = "Status: "
        t = self.parent.getTemperatureString(self.host, self.printer[1])
        if len(t) > 0:
            s += t
        else:
            s += "Unknown"

        progress = self.parent.getProgress(self.host, self.printer[1])
        keys = ("completion", "printTime", "printTimeLeft")
        # getProgress() hands back a plain string when nothing could be read
        if isinstance(progress, dict) and all(progress.get(k) is not None for k in keys):
            s += " - %.1f%%" % progress["completion"]
            s += " - runtime "
            s += self._formatDuration(progress["printTime"])
            s += " - "
            s += self._formatDuration(progress["printTimeLeft"]) + " left"

        self.statusLabel.setText(s)
        self.scheduleLoadStatus()

    def handleResponse(self, reply):
        """Display the image carried by reply if it belongs to this window.

        After a decoded or undecodable image the next request is scheduled;
        after a network error no further request is scheduled.
        """
        if reply.url().url() != self.url:
            return

        # finished replies have to be released by the application; the
        # deletion is deferred, so the reply stays readable in this slot
        reply.deleteLater()

        if reply.error() != QtNetwork.QNetworkReply.NoError:
            print("Error loading image: " + reply.errorString())
            return

        reader = QImageReader(reply)
        reader.setAutoTransform(True)
        image = reader.read()
        if image.isNull():
            # read() signals failure with a null image, never with None
            print("Error decoding image: " + reader.errorString())
        else:
            # QColorSpace is not among the names imported from QtGui and is
            # missing in older Qt 5 releases, so look it up defensively
            if hasattr(QtGui, "QColorSpace") and image.colorSpace().isValid():
                image.convertToColorSpace(QtGui.QColorSpace.SRgb)
            self.img.setPixmap(QPixmap.fromImage(image))
        self.scheduleLoadImage()
-
class OctoTray():
    """System tray application controlling printers through the OctoPrint API.

    For every configured printer a sub-menu is built with PSU control or
    custom system commands (whichever the printer offers), a status dialog,
    a webcam window and a link to the web interface. Printers are stored
    with QSettings as pairs of host name and API key.
    """

    name = "OctoTray"
    vendor = "xythobuz"
    version = "0.2"

    iconPath = "/usr/share/pixmaps/"
    iconName = "octotray_icon.png"

    networkTimeout = 2.0 # in s

    # list of lists, inner lists contain printer data:
    # 0=host 1=key (2=system-commands 3=menu 4+=actions)
    printers = []

    statesWithWarning = [
        "Printing", "Pausing", "Paused"
    ]

    camWindows = []
    settingsWindow = None

    def __init__(self, app):
        """Query all configured printers and build the tray icon and menu.

        Exits the process after an error dialog when no system tray is
        available or the icon file cannot be found.
        """
        QCoreApplication.setApplicationName(self.name)
        self.app = app

        # per-instance state; the class-level list would otherwise be shared
        # with the instance created after a settings-triggered restart
        self.camWindows = []
        self.settingsWindow = None

        if not QSystemTrayIcon.isSystemTrayAvailable():
            self.showDialog("OctoTray Error", "System Tray is not available on this platform!", "", False, False, True)
            sys.exit(0)

        self.manager = QtNetwork.QNetworkAccessManager()
        self.menu = QMenu()
        self.printers = self.readSettings()

        unknownCount = 0
        for p in self.printers:
            method = self.getMethod(p[0], p[1])
            print("Printer " + p[0] + " has method " + method)
            if method == "unknown":
                # such printers get no menu and keep only host and key
                unknownCount += 1
                continue

            commands = self.getSystemCommands(p[0], p[1])
            p.append(commands)

            menu = QMenu(self.getName(p[0], p[1]))
            p.append(menu)
            self.menu.addMenu(menu)

            # every action is also appended to p, which keeps a Python
            # reference to the parentless QAction objects
            if method == "psucontrol":
                action = QAction("Turn On PSU")
                action.triggered.connect(lambda chk, x=p: self.printerOnAction(x))
                p.append(action)
                menu.addAction(action)

                action = QAction("Turn Off PSU")
                action.triggered.connect(lambda chk, x=p: self.printerOffAction(x))
                p.append(action)
                menu.addAction(action)

            for i, command in enumerate(commands):
                action = QAction(command.title())
                action.triggered.connect(lambda chk, x=p, y=i: self.printerSystemCommandAction(x, y))
                p.append(action)
                menu.addAction(action)

            action = QAction("Get Status")
            action.triggered.connect(lambda chk, x=p: self.printerStatusAction(x))
            p.append(action)
            menu.addAction(action)

            action = QAction("Show Webcam")
            action.triggered.connect(lambda chk, x=p: self.printerWebcamAction(x))
            p.append(action)
            menu.addAction(action)

            action = QAction("Open Web UI")
            action.triggered.connect(lambda chk, x=p: self.printerWebAction(x))
            p.append(action)
            menu.addAction(action)

        self.settingsAction = QAction("&Settings")
        self.settingsAction.triggered.connect(self.showSettingsAction)
        self.menu.addAction(self.settingsAction)

        self.quitAction = QAction("&Quit")
        self.quitAction.triggered.connect(self.exit)
        self.menu.addAction(self.quitAction)

        # prefer an icon in the working directory over the installed one
        self.iconPathName = ""
        if os.path.isfile(self.iconName):
            self.iconPathName = self.iconName
        elif os.path.isfile(self.iconPath + self.iconName):
            self.iconPathName = self.iconPath + self.iconName
        else:
            self.showDialog("OctoTray Error", "Icon file has not been found!", "", False, False, True)
            sys.exit(0)

        self.icon = QIcon()
        self.pic = QPixmap(32, 32)
        self.pic.load(self.iconPathName)
        self.icon = QIcon(self.pic)

        self.trayIcon = QSystemTrayIcon(self.icon)
        self.trayIcon.setToolTip(self.name + " " + self.version)
        self.trayIcon.setContextMenu(self.menu)
        self.trayIcon.setVisible(True)

    def readSettings(self):
        """Return the stored printers as a list of [host, key] lists."""
        settings = QSettings(self.vendor, self.name)
        printers = []
        count = settings.beginReadArray("printers")
        for i in range(count):
            settings.setArrayIndex(i)
            printers.append([settings.value("host"), settings.value("key")])
        settings.endArray()
        return printers

    def writeSettings(self, printers):
        """Replace the stored printer list with the host/key pairs given."""
        settings = QSettings(self.vendor, self.name)
        settings.remove("printers")
        settings.beginWriteArray("printers")
        for i, p in enumerate(printers):
            settings.setArrayIndex(i)
            settings.setValue("host", p[0])
            settings.setValue("key", p[1])
        settings.endArray()
        # NOTE(review): presumably dropped explicitly so the values are
        # written out before the application restarts - confirm
        del settings

    def openBrowser(self, url):
        """Open ``http://<url>`` with the desktop's default handler."""
        QDesktopServices.openUrl(QUrl("http://" + url))

    def showDialog(self, title, text1, text2 = "", question = False, warning = False, error = False):
        """Show a modal message box and return True only if Yes was chosen.

        The icon is picked in the order error, warning, question, falling
        back to information. Question dialogs offer Yes/No buttons, all
        others a single Ok button. text2 may be None to omit the
        informative text.
        """
        msg = QMessageBox()

        if error:
            msg.setIcon(QMessageBox.Critical)
        elif warning:
            msg.setIcon(QMessageBox.Warning)
        elif question:
            msg.setIcon(QMessageBox.Question)
        else:
            msg.setIcon(QMessageBox.Information)

        msg.setWindowTitle(title)
        msg.setText(text1)

        if text2 is not None:
            msg.setInformativeText(text2)

        if question:
            msg.setStandardButtons(QMessageBox.Yes | QMessageBox.No)
        else:
            msg.setStandardButtons(QMessageBox.Ok)

        return msg.exec_() == QMessageBox.Yes

    def sendRequest(self, host, headers, path, content = None):
        """Send a request to ``http://<host>/api/<path>``.

        With content set to None a GET request is sent, otherwise a POST
        with the ASCII-encoded content as body. Returns the raw response
        body, or an empty string when the request fails.
        """
        import http.client

        url = "http://" + host + "/api/" + path
        try:
            data = None if content is None else content.encode('ascii')
            request = urllib.request.Request(url, data, headers)
            with urllib.request.urlopen(request, None, self.networkTimeout) as response:
                return response.read()
        except (OSError, ValueError, http.client.HTTPException):
            # OSError covers URLError, HTTPError and socket timeouts; the
            # other two cover malformed URLs and broken HTTP responses
            return ""

    def sendPostRequest(self, host, key, path, content):
        """POST the JSON text content to path, authenticated with key."""
        headers = {
            "Content-Type": "application/json",
            "X-Api-Key": key
        }
        return self.sendRequest(host, headers, path, content)

    def sendGetRequest(self, host, key, path):
        """GET path, authenticated with key."""
        headers = {
            "X-Api-Key": key
        }
        return self.sendRequest(host, headers, path)

    def _getJson(self, host, key, path):
        """Return the decoded JSON response of a GET on path, or None."""
        raw = self.sendGetRequest(host, key, path)
        try:
            return json.loads(raw)
        except (TypeError, ValueError):
            # failed requests yield an empty string, which is not valid JSON
            return None

    @staticmethod
    def _formatDuration(seconds):
        """Format a number of seconds as HH:MM:SS without wrapping at 24h."""
        minutes, secs = divmod(max(0, int(seconds)), 60)
        hours, minutes = divmod(minutes, 60)
        return "%02d:%02d:%02d" % (hours, minutes, secs)

    @staticmethod
    def _formatTemperature(entry, prefix):
        """Format one temperature entry as ``<prefix>actual/target ``."""
        if not isinstance(entry, dict):
            return ""
        s = ""
        actual = entry.get("actual")
        target = entry.get("target")
        # values that are missing or null are left out instead of formatted
        if isinstance(actual, (int, float)):
            s += prefix + "%.1f" % actual
        if isinstance(target, (int, float)):
            s += "/" + "%.1f" % target
        return s + " "

    def getTemperatureString(self, host, key):
        """Return state text and bed/tool temperatures as one line.

        The result looks like ``Operational - B60.0/60.0 T210.0/210.0`` and
        is empty when the printer state could not be read.
        """
        rd = self._getJson(host, key, "printer")
        if not isinstance(rd, dict):
            return ""

        s = ""
        state = rd.get("state")
        if isinstance(state, dict) and ("text" in state):
            s += str(state["text"])
            if "temperature" in rd:
                s += " - "

        temperature = rd.get("temperature")
        if isinstance(temperature, dict):
            for sensor, prefix in (("bed", "B"), ("tool0", "T"), ("tool1", "T")):
                if sensor in temperature:
                    s += self._formatTemperature(temperature[sensor], prefix)
        return s.strip()

    def getState(self, host, key):
        """Return the ``state`` field of the job response, or "Unknown"."""
        rd = self._getJson(host, key, "job")
        if isinstance(rd, dict) and ("state" in rd):
            return rd["state"]
        return "Unknown"

    def getProgress(self, host, key):
        """Return the ``progress`` dict of the job response.

        The string "Unknown" is returned when it could not be read.
        """
        rd = self._getJson(host, key, "job")
        if isinstance(rd, dict) and isinstance(rd.get("progress"), dict):
            return rd["progress"]
        return "Unknown"

    def getName(self, host, key):
        """Return the name of the first printer profile, or host."""
        rd = self._getJson(host, key, "printerprofiles")
        if isinstance(rd, dict) and isinstance(rd.get("profiles"), dict):
            profiles = rd["profiles"]
            # a default avoids StopIteration on an empty profile map
            first = next(iter(profiles), None)
            if isinstance(profiles.get(first), dict) and ("name" in profiles[first]):
                return profiles[first]["name"]
        return host

    def getMethod(self, host, key):
        """Return how the printer can be controlled.

        "psucontrol" when the PSU control plugin answers, "system" when at
        least one custom system command exists, otherwise "unknown".
        """
        rd = self._getJson(host, key, "plugin/psucontrol")
        if isinstance(rd, dict) and ("isPSUOn" in rd):
            return "psucontrol"

        rd = self._getJson(host, key, "system/commands/custom")
        if isinstance(rd, list):
            for c in rd:
                if isinstance(c, dict) and ("action" in c):
                    # we have some custom commands and no psucontrol
                    # so lets try to use that instead of skipping
                    # the printer completely with 'unknown'
                    return "system"

        return "unknown"

    def getSystemCommands(self, host, key):
        """Return the action names of all custom system commands."""
        l = []
        rd = self._getJson(host, key, "system/commands/custom")
        if not isinstance(rd, list):
            return l

        if len(rd) > 0:
            print("system commands available for " + host + ":")

        for c in rd:
            if isinstance(c, dict) and ("action" in c):
                print(" - " + c["action"])
                l.append(c["action"])
        return l

    def setPSUControl(self, host, key, state):
        """Send the PSU control command for on (truthy state) or off."""
        cmd = "turnPSUOn" if state else "turnPSUOff"
        return self.sendPostRequest(host, key, "plugin/psucontrol", json.dumps({"command": cmd}))

    def setSystemCommand(self, host, key, cmd):
        """Trigger the custom system command named cmd."""
        cmd = urllib.parse.quote(cmd)
        return self.sendPostRequest(host, key, "system/commands/custom/" + cmd, '')

    def exit(self):
        """Quit the application event loop."""
        QCoreApplication.quit()

    def printerSystemCommandAction(self, item, index):
        """Run system command index of printer item exactly once.

        Commands containing "off" require confirmation while the printer
        is in one of ``statesWithWarning``.
        """
        command = item[2][index]
        if "off" in command.lower():
            state = self.getState(item[0], item[1])
            if state in self.statesWithWarning:
                if not self.showDialog("OctoTray Warning", "The printer seems to be running currently!", "Do you really want to run '" + command + "'?", True, True):
                    return
        # single call site, so a confirmed command is not sent twice
        self.setSystemCommand(item[0], item[1], command)

    def printerOnAction(self, item):
        """Turn on the PSU of printer item."""
        self.setPSUControl(item[0], item[1], True)

    def printerOffAction(self, item):
        """Turn off the PSU of printer item, asking first while it prints."""
        state = self.getState(item[0], item[1])
        if state in self.statesWithWarning:
            if not self.showDialog("OctoTray Warning", "The printer seems to be running currently!", "Do you really want to turn it off?", True, True):
                return
        self.setPSUControl(item[0], item[1], False)

    def printerWebAction(self, item):
        """Open the web interface of printer item in the browser."""
        self.openBrowser(item[0])

    def printerStatusAction(self, item):
        """Show a dialog with job progress and temperatures of item."""
        progress = self.getProgress(item[0], item[1])
        keys = ("completion", "printTime", "printTimeLeft")
        warning = False
        if isinstance(progress, dict) and all(k in progress for k in keys):
            if all(progress[k] is not None for k in keys):
                s = "%.1f%% Completion\n" % progress["completion"]
                s += "Printing since " + self._formatDuration(progress["printTime"]) + "\n"
                s += self._formatDuration(progress["printTimeLeft"]) + " left"
            else:
                # all fields present but at least one of them is null
                s = "No job is currently running"
        else:
            s = "Could not read printer status!"
            warning = True

        t = self.getTemperatureString(item[0], item[1])
        if len(t) > 0:
            s += "\n" + t
        self.showDialog("OctoTray Status", s, None, False, warning)

    def _centerOnScreen(self, window):
        """Move window to the centre of the screen, keeping its size."""
        screenGeometry = QDesktopWidget().screenGeometry()
        x = (screenGeometry.width() - window.width()) / 2
        y = (screenGeometry.height() - window.height()) / 2
        x += screenGeometry.x()
        y += screenGeometry.y()
        window.setGeometry(int(x), int(y), int(window.width()), int(window.height()))

    def printerWebcamAction(self, item):
        """Show the webcam window of item, creating it when necessary."""
        for cw in self.camWindows:
            if cw.getHost() == item[0]:
                cw.show()
                cw.activateWindow()
                return

        window = CamWindow(self, item)
        self.camWindows.append(window)

        window.show()
        window.activateWindow()
        self._centerOnScreen(window)

    def removeWebcamWindow(self, window):
        """Forget a webcam window; called by the window when it closes."""
        if window in self.camWindows:
            self.camWindows.remove(window)

    def showSettingsAction(self):
        """Show the settings window, creating it when necessary."""
        if self.settingsWindow is not None:
            self.settingsWindow.show()
            self.settingsWindow.activateWindow()
            return

        self.settingsWindow = SettingsWindow(self)
        self.settingsWindow.show()
        self.settingsWindow.activateWindow()
        self._centerOnScreen(self.settingsWindow)

    def removeSettingsWindow(self):
        """Forget the settings window; called by the window when it closes."""
        self.settingsWindow = None

    def closeAll(self):
        """Close all windows and hide the tray icon."""
        # iterate over a copy: closing a window removes it from camWindows
        for cw in list(self.camWindows):
            cw.close()

        if self.settingsWindow is not None:
            self.settingsWindow.close()

        self.trayIcon.setVisible(False)
-
if __name__ == "__main__":
    app = QtWidgets.QApplication(sys.argv)

    # exit code 42 is emitted after the settings were saved: tear down the
    # current tray and build a fresh one that reads the new printer list
    while True:
        tray = OctoTray(app)
        rc = app.exec_()
        if rc != 42:
            break
        tray.closeAll()

    sys.exit(rc)
|