S&B Volcano vaporizer remote control with Pi Pico W
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

state_scan.py 4.3KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. #!/usr/bin/env python
  2. import uasyncio as asyncio
  3. from scan import ble_scan
  4. import time
  5. class StateScan:
  6. def __init__(self, lcd):
  7. self.lcd = lcd
  8. self.lock = asyncio.Lock()
  9. def enter(self, val = None):
  10. n = None
  11. self.results = []
  12. self.current = None
  13. self.menuOff = 0
  14. self.scanner = asyncio.create_task(self.scan())
  15. def exit(self):
  16. self.scanner.cancel()
  17. if self.lock.locked():
  18. self.lock.release()
  19. return self.results[self.current][4]
  20. async def scan(self):
  21. while True:
  22. new = await ble_scan(None, None, 0.25)
  23. for n in new:
  24. name = n.name()
  25. mac = n.device.addr_hex()
  26. rssi = n.rssi
  27. value = [name, mac, rssi, time.time(), n]
  28. async with self.lock:
  29. found = False
  30. for i in range(0, len(self.results)):
  31. if self.results[i][1] == mac:
  32. found = True
  33. self.results[i][0] = name
  34. self.results[i][2] = rssi
  35. self.results[i][3] = time.time()
  36. self.results[i][4] = n
  37. break
  38. if found == False:
  39. self.results.append(value)
  40. def draw_list(self):
  41. if len(self.results) <= 0:
  42. self.lcd.textC("No devices found yet", int(self.lcd.width / 2), int(self.lcd.height / 2), self.lcd.white)
  43. return
  44. for i, d in enumerate(self.results):
  45. if i < self.menuOff:
  46. continue
  47. off = (i - self.menuOff) * 25 + 30
  48. if off >= (self.lcd.height - 10):
  49. break
  50. selection = " "
  51. if self.current == i:
  52. selection = "->"
  53. name, mac, rssi, timeout, device = self.results[i]
  54. age = int(time.time() - timeout)
  55. s1 = "{}: {}".format(i + 1, name)
  56. s2 = "{} [{}] {} {}".format(selection, mac, rssi, age)
  57. c1 = self.lcd.white
  58. if name == "S&B VOLCANO H":
  59. c1 = self.lcd.green
  60. if self.current == None:
  61. self.current = i
  62. elif name == "STORZ&BICKEL":
  63. c1 = self.lcd.yellow
  64. elif self.current == i:
  65. c1 = self.lcd.red
  66. c2 = self.lcd.white
  67. if self.current == i:
  68. c2 = self.lcd.red
  69. self.lcd.hline(0, off, self.lcd.width, self.lcd.blue)
  70. self.lcd.text(s1, 0, off + 2, c1)
  71. self.lcd.text(s2, 0, off + 12, c2)
  72. async def draw(self):
  73. self.lcd.text("Scanning for Bluetooth devices", 0, 10, self.lcd.red)
  74. keys = self.lcd.buttons()
  75. async with self.lock:
  76. if keys.once("enter") or keys.once("a"):
  77. if self.current < len(self.results):
  78. return 2
  79. elif keys.once("up"):
  80. if self.current == None:
  81. self.current = len(self.results) - 1
  82. else:
  83. self.current -= 1
  84. elif keys.once("down"):
  85. if self.current == None:
  86. self.current = 0
  87. else:
  88. self.current += 1
  89. if self.current != None:
  90. while self.current < 0:
  91. self.current += len(self.results)
  92. while self.current >= len(self.results):
  93. self.current -= len(self.results)
  94. while self.current < self.menuOff:
  95. self.menuOff -= 1
  96. while self.current >= (self.menuOff + int((self.lcd.height - 30 - 10) / 25)):
  97. self.menuOff += 1
  98. # remove entries after timeout
  99. self.results = [x for x in self.results if (time.time() - x[3]) < 10.0]
  100. # filter out incompatible devices
  101. self.results = [x for x in self.results if (x[0] != None) and (("S&B" in x[0]) or ("STORZ&BICKEL" == x[0]))]
  102. if self.current != None:
  103. if self.current >= len(self.results):
  104. self.current = len(self.results) - 1
  105. self.draw_list()
  106. return -1