S&B Volcano vaporizer remote control with Pi Pico W
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

state_notify.py 2.6KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. #!/usr/bin/env python3
  2. # ----------------------------------------------------------------------------
  3. # Copyright (c) 2023 Thomas Buck (thomas@xythobuz.de)
  4. #
  5. # This program is free software: you can redistribute it and/or modify
  6. # it under the terms of the GNU General Public License as published by
  7. # the Free Software Foundation, either version 3 of the License, or
  8. # (at your option) any later version.
  9. #
  10. # This program is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. #
  15. # See <http://www.gnu.org/licenses/>.
  16. # ----------------------------------------------------------------------------
  17. import time
  18. import uasyncio as asyncio
  19. from poll import set_state
  20. from state_wait_temp import draw_graph
  21. class StateNotify:
  22. def __init__(self, lcd):
  23. self.lcd = lcd
  24. self.lock = asyncio.Lock()
  25. def enter(self, val = None):
  26. self.value = val
  27. self.done = False
  28. self.step = 0
  29. self.max = 0
  30. self.notifier = asyncio.create_task(self.notify())
  31. def exit(self):
  32. self.notifier.cancel()
  33. if self.lock.locked():
  34. self.lock.release()
  35. return (self.value[0], self.value[1], self.value[2])
  36. async def notify(self):
  37. device, workflow, index = self.value
  38. count, duration = workflow["notify"]
  39. async with self.lock:
  40. self.max = count * 6
  41. for i in range(0, count):
  42. await asyncio.sleep_ms(int(duration * 500))
  43. async with self.lock:
  44. self.step += 1
  45. await asyncio.sleep_ms(int(duration * 500))
  46. async with self.lock:
  47. self.step += 1
  48. await set_state(device, (None, True))
  49. async with self.lock:
  50. self.step += 1
  51. await asyncio.sleep_ms(int(duration * 500))
  52. async with self.lock:
  53. self.step += 1
  54. await asyncio.sleep_ms(int(duration * 500))
  55. async with self.lock:
  56. self.step += 1
  57. await set_state(device, (None, False))
  58. async with self.lock:
  59. self.step += 1
  60. async with self.lock:
  61. self.done = True
  62. async def draw(self):
  63. self.lcd.text("Running Workflow - Notify", 0, 10, self.lcd.red)
  64. keys = self.lcd.buttons()
  65. if keys.once("y"):
  66. return 4
  67. async with self.lock:
  68. draw_graph(self.lcd, 0, self.step, self.max)
  69. if self.done:
  70. return 4
  71. return -1