Browse Source

add simplepyble implementation to master branch

Thomas Buck 3 months ago
parent
commit
b484c1ae53
5 changed files with 269 additions and 0 deletions
  1. 1
    0
      test_bleak/.gitignore
  2. 1
    0
      test_simplepyble/.gitignore
  3. 112
    0
      test_simplepyble/flow.py
  4. 114
    0
      test_simplepyble/poll.py
  5. 41
    0
      test_simplepyble/scan.py

+ 1
- 0
test_bleak/.gitignore View File

@@ -0,0 +1 @@
1
+venv

+ 1
- 0
test_simplepyble/.gitignore View File

@@ -0,0 +1 @@
1
+venv

+ 112
- 0
test_simplepyble/flow.py View File

@@ -0,0 +1,112 @@
1
+#!/usr/bin/env python
2
+
3
+import sys
4
+import time
5
+import os
6
+
7
+from poll import (
8
+    ble_conn,
9
+    get_current_temp,
10
+    get_target_temp, set_target_temp,
11
+    get_unit_is_fahrenheit,
12
+    get_state, set_state
13
+)
14
+
15
+terminal_width = os.get_terminal_size().columns - 15
16
+
17
+def print_bar(value, start, end, unit):
18
+    width = terminal_width
19
+    s = "\r"
20
+    s += "#" * int((value - start) / (end - start) * width)
21
+    s += "-" * (width - int((value - start) / (end - start) * width))
22
+    s += " {}{}".format(value, unit)
23
+    print(s, end="", flush=True)
24
+
25
+def sleep(t):
26
+    w = terminal_width
27
+    if t < w:
28
+        w = int(t)
29
+    print_bar(0, 0, w, "s")
30
+    for i in range(0, w):
31
+        time.sleep(t / w)
32
+        print_bar(i + 1, 0, w, "s")
33
+    print()
34
+
35
+def wait_for_temp(client, temp):
36
+    print("Setting temperature {}".format(temp))
37
+    set_target_temp(client, temp)
38
+
39
+    print("Waiting for temperature to rise...")
40
+    start = get_current_temp(client)
41
+    curr = start
42
+    print_bar(curr, start, temp, " degC")
43
+    while curr < temp:
44
+        time.sleep(1.0)
45
+        curr = get_current_temp(client)
46
+        print_bar(curr, start, temp, " degC")
47
+    print()
48
+
49
+    print("Reached temperature {}".format(temp))
50
+
51
+def flow_step(client, temp, t_wait, t_pump):
52
+    wait_for_temp(client, temp)
53
+
54
+    print("Waiting {}s for heat to settle...".format(t_wait))
55
+    sleep(t_wait)
56
+
57
+    print("Pumping for {}s".format(t_pump))
58
+    set_state(client, (True, True)) # turn on pump
59
+    sleep(t_pump)
60
+    set_state(client, (True, False)) # turn off pump
61
+
62
+def flow(client):
63
+    print("Turning on heater")
64
+    set_state(client, (True, False))
65
+
66
+    flow_step(client, 185.0, 10.0, 5.0)
67
+    flow_step(client, 195.0, 5.0, 20.0)
68
+    flow_step(client, 205.0, 5.0, 20.0)
69
+
70
+    print("Notification by pumping three times...")
71
+    for i in range(0, 3):
72
+        time.sleep(1.0)
73
+        set_state(client, (True, True)) # turn on pump
74
+        time.sleep(1.0)
75
+        set_state(client, (True, False)) # turn off pump
76
+
77
+    print("Turning heater off")
78
+    set_state(client, (False, False)) # turn off heater and pump
79
+
80
+    print("Resetting temperature")
81
+    set_target_temp(client, 190.0)
82
+
83
+if __name__ == "__main__":
84
+    def main(address, adapter):
85
+        client = ble_conn(address, adapter)
86
+
87
+        try:
88
+            if get_unit_is_fahrenheit(client):
89
+                raise RuntimeError("Imperial American scum is currently not supported :P")
90
+
91
+            print("Starting Workflow")
92
+            flow(client)
93
+        except:
94
+            print("\nTurning heater and pump off")
95
+            set_state(client, (False, False)) # turn off heater and pump
96
+
97
+            print("Disconnecting")
98
+            client.disconnect()
99
+
100
+            raise
101
+
102
+        print("Disconnecting")
103
+        client.disconnect()
104
+
105
+    adapter = None
106
+    mac = None
107
+    if len(sys.argv) > 1:
108
+        adapter = int(sys.argv[1])
109
+    if len(sys.argv) > 2:
110
+        mac = sys.argv[2]
111
+
112
+    main(mac, adapter)

+ 114
- 0
test_simplepyble/poll.py View File

@@ -0,0 +1,114 @@
1
+#!/usr/bin/env python
2
+
3
+import simplepyble
4
+from scan import ble_scan
5
+import time
6
+
7
+serviceUuidVolcano3 = "10100000-5354-4f52-5a26-4249434b454c"
8
+serviceUuidVolcano4 = "10110000-5354-4f52-5a26-4249434b454c"
9
+
10
+def ble_conn(address, adapter):
11
+    dev = ble_scan(address, adapter)
12
+
13
+    if dev != None:
14
+        address = dev.address()
15
+        print("Connecting to '{}'...".format(address))
16
+        dev.connect()
17
+
18
+    return dev
19
+
20
+def get_current_temp(device):
21
+    val = device.read(serviceUuidVolcano4, "10110001-5354-4f52-5a26-4249434b454c")
22
+    num = int.from_bytes(val, byteorder="little")
23
+    return num / 10.0
24
+
25
+def get_target_temp(device):
26
+    val = device.read(serviceUuidVolcano4, "10110003-5354-4f52-5a26-4249434b454c")
27
+    num = int.from_bytes(val, byteorder="little")
28
+    return num / 10.0
29
+
30
+def set_target_temp(device, temp):
31
+    val = int(temp * 10.0)
32
+    d = val.to_bytes(4, byteorder="little")
33
+    device.write_request(serviceUuidVolcano4, "10110003-5354-4f52-5a26-4249434b454c", d)
34
+
35
+def get_unit_is_fahrenheit(device):
36
+    val = device.read(serviceUuidVolcano3, "1010000d-5354-4f52-5a26-4249434b454c")
37
+    num = int.from_bytes(val, byteorder="little")
38
+    return (num & 0x200) != 0
39
+
40
+def get_state(device):
41
+    val = device.read(serviceUuidVolcano3, "1010000c-5354-4f52-5a26-4249434b454c")
42
+    num = int.from_bytes(val, byteorder="little")
43
+    heater = (num & 0x0020) != 0
44
+    pump = (num & 0x2000) != 0
45
+    return (heater, pump)
46
+
47
+def set_state(device, state):
48
+    heater, pump = state
49
+    if heater:
50
+        device.write_request(serviceUuidVolcano4, "1011000f-5354-4f52-5a26-4249434b454c", 0)
51
+    else:
52
+        device.write_request(serviceUuidVolcano4, "10110010-5354-4f52-5a26-4249434b454c", 0)
53
+    if pump:
54
+        device.write_request(serviceUuidVolcano4, "10110013-5354-4f52-5a26-4249434b454c", 0)
55
+    else:
56
+        device.write_request(serviceUuidVolcano4, "10110014-5354-4f52-5a26-4249434b454c", 0)
57
+
58
+if __name__ == "__main__":
59
+    def test_poll(device):
60
+        temp = get_current_temp(device)
61
+        print("Current Temperature: {}".format(temp))
62
+
63
+        target = get_target_temp(device)
64
+        print("Target Temperature: {}".format(target))
65
+
66
+        fahrenheit = get_unit_is_fahrenheit(device)
67
+        if fahrenheit:
68
+            print("Unit is Fahrenheit")
69
+        else:
70
+            print("Unit is Celsius")
71
+
72
+        heater, pump = get_state(device)
73
+        if heater:
74
+            print("Heater is On")
75
+        else:
76
+            print("Heater is Off")
77
+        if pump:
78
+            print("Pump is On")
79
+        else:
80
+            print("Pump is Off")
81
+
82
+    def test(address, adapter):
83
+        device = ble_conn(address, adapter)
84
+        if device == None:
85
+            return
86
+
87
+        try:
88
+            print("Writing...")
89
+            set_target_temp(device, 190.0)
90
+
91
+            print("Reading...")
92
+            for i in range(0, 5):
93
+                test_poll(device)
94
+                print()
95
+                time.sleep(2.0)
96
+        except:
97
+            print("Disconnecting")
98
+            client.disconnect()
99
+
100
+            raise
101
+
102
+        print("Disconnecting")
103
+        client.disconnect()
104
+
105
+    import sys
106
+
107
+    adapter = None
108
+    mac = None
109
+    if len(sys.argv) > 1:
110
+        adapter = int(sys.argv[1])
111
+    if len(sys.argv) > 2:
112
+        mac = sys.argv[2]
113
+
114
+    test(mac, adapter)

+ 41
- 0
test_simplepyble/scan.py View File

@@ -0,0 +1,41 @@
1
+import simplepyble
2
+
3
+def ble_scan(addr = None, adapterIndex = 0, timeout = 1):
4
+    adapters = simplepyble.Adapter.get_adapters()
5
+
6
+    if len(adapters) == 0:
7
+        print("No adapters found")
8
+        return None
9
+
10
+    adapter = adapters[adapterIndex]
11
+    print("Selected adapter: {} [{}]".format(adapter.identifier(), adapter.address()))
12
+
13
+    # TODO abort scan when found?
14
+    print("Scanning for '{}' for {}s...".format(addr, timeout))
15
+    adapter.scan_for(timeout * 1000)
16
+
17
+    peripherals = adapter.scan_get_results()
18
+    for peripheral in peripherals:
19
+        if addr != None:
20
+            if addr == peripheral.address():
21
+                return peripheral
22
+        else:
23
+            if peripheral.identifier() == "S&B VOLCANO H":
24
+                return peripheral
25
+
26
+    print("No device found")
27
+    return None
28
+
29
+if __name__ == "__main__":
30
+    import sys
31
+
32
+    adapter = None
33
+    mac = None
34
+    if len(sys.argv) > 1:
35
+        adapter = int(sys.argv[1])
36
+    if len(sys.argv) > 2:
37
+        mac = sys.argv[2]
38
+
39
+    dev = ble_scan(mac, adapter)
40
+    if dev != None:
41
+        print("{} {}".format(dev.identifier(), dev.address()))

Loading…
Cancel
Save