S&B Volcano vaporizer remote control with Pi Pico W
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

state_scan.py 5.0KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. #!/usr/bin/env python3
  2. # ----------------------------------------------------------------------------
  3. # Copyright (c) 2023 Thomas Buck (thomas@xythobuz.de)
  4. #
  5. # This program is free software: you can redistribute it and/or modify
  6. # it under the terms of the GNU General Public License as published by
  7. # the Free Software Foundation, either version 3 of the License, or
  8. # (at your option) any later version.
  9. #
  10. # This program is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. #
  15. # See <http://www.gnu.org/licenses/>.
  16. # ----------------------------------------------------------------------------
  17. import uasyncio as asyncio
  18. from scan import ble_scan
  19. import time
  20. class StateScan:
  21. def __init__(self, lcd):
  22. self.lcd = lcd
  23. self.lock = asyncio.Lock()
  24. def enter(self, val = None):
  25. n = None
  26. self.results = []
  27. self.current = None
  28. self.menuOff = 0
  29. self.scanner = asyncio.create_task(self.scan())
  30. def exit(self):
  31. self.scanner.cancel()
  32. if self.lock.locked():
  33. self.lock.release()
  34. return self.results[self.current][4]
  35. async def scan(self):
  36. while True:
  37. new = await ble_scan(None, None, 0.25)
  38. for n in new:
  39. name = n.name()
  40. mac = n.device.addr_hex()
  41. rssi = n.rssi
  42. value = [name, mac, rssi, time.time(), n]
  43. async with self.lock:
  44. found = False
  45. for i in range(0, len(self.results)):
  46. if self.results[i][1] == mac:
  47. found = True
  48. self.results[i][0] = name
  49. self.results[i][2] = rssi
  50. self.results[i][3] = time.time()
  51. self.results[i][4] = n
  52. break
  53. if found == False:
  54. self.results.append(value)
  55. def draw_list(self):
  56. if len(self.results) <= 0:
  57. self.lcd.textC("No devices found yet", int(self.lcd.width / 2), int(self.lcd.height / 2), self.lcd.white)
  58. return
  59. for i, d in enumerate(self.results):
  60. if i < self.menuOff:
  61. continue
  62. off = (i - self.menuOff) * 25 + 30
  63. if off >= (self.lcd.height - 10):
  64. break
  65. selection = " "
  66. if self.current == i:
  67. selection = "->"
  68. name, mac, rssi, timeout, device = self.results[i]
  69. age = int(time.time() - timeout)
  70. s1 = "{}: {}".format(i + 1, name)
  71. s2 = "{} [{}] {} {}".format(selection, mac, rssi, age)
  72. c1 = self.lcd.white
  73. if name == "S&B VOLCANO H":
  74. c1 = self.lcd.green
  75. if self.current == None:
  76. self.current = i
  77. elif name == "STORZ&BICKEL":
  78. c1 = self.lcd.yellow
  79. elif self.current == i:
  80. c1 = self.lcd.red
  81. c2 = self.lcd.white
  82. if self.current == i:
  83. c2 = self.lcd.red
  84. self.lcd.hline(0, off, self.lcd.width, self.lcd.blue)
  85. self.lcd.text(s1, 0, off + 2, c1)
  86. self.lcd.text(s2, 0, off + 12, c2)
  87. async def draw(self):
  88. self.lcd.text("Scanning for Bluetooth devices", 0, 10, self.lcd.red)
  89. keys = self.lcd.buttons()
  90. async with self.lock:
  91. if keys.once("enter") or keys.once("a"):
  92. if self.current < len(self.results):
  93. return 2
  94. elif keys.once("up"):
  95. if self.current == None:
  96. self.current = len(self.results) - 1
  97. else:
  98. self.current -= 1
  99. elif keys.once("down"):
  100. if self.current == None:
  101. self.current = 0
  102. else:
  103. self.current += 1
  104. elif keys.once("y"):
  105. return 0
  106. if self.current != None:
  107. while self.current < 0:
  108. self.current += len(self.results)
  109. while self.current >= len(self.results):
  110. self.current -= len(self.results)
  111. while self.current < self.menuOff:
  112. self.menuOff -= 1
  113. while self.current >= (self.menuOff + int((self.lcd.height - 30 - 10) / 25)):
  114. self.menuOff += 1
  115. # remove entries after timeout
  116. self.results = [x for x in self.results if (time.time() - x[3]) < 10.0]
  117. # filter out incompatible devices
  118. self.results = [x for x in self.results if (x[0] != None) and (("S&B" in x[0]) or ("STORZ&BICKEL" == x[0]))]
  119. if self.current != None:
  120. if self.current >= len(self.results):
  121. self.current = len(self.results) - 1
  122. self.draw_list()
  123. return -1