Conversion for Raspberry Pi Pico
Python | MicroPython | ||
---|---|---|---|
1 | """ | 1 | """ |
2 | Sending and receiving 433/315Mhz signals with low-cost GPIO RF Modules on a Raspberry Pi. | 2 | Sending and receiving 433/315Mhz signals with low-cost GPIO RF Modules on a Raspberry Pi. |
3 | This is adjusted code for raspberry pi pico, original is from: https://github.com/milaq/rpi-rf | ||
3 | """ | 4 | """ |
4 | 5 | ||
5 | import logging | 6 | import config |
6 | import time | 7 | import time |
8 | from machine import Pin | ||
7 | from collections import namedtuple | 9 | from collections import namedtuple |
8 | 10 | ||
9 | from RPi import GPIO | ||
10 | |||
11 | MAX_CHANGES = 67 | 11 | MAX_CHANGES = 67 |
12 | 12 | ||
13 | _LOGGER = logging.getLogger(__name__) | ||
14 | |||
15 | Protocol = namedtuple('Protocol', | 13 | Protocol = namedtuple('Protocol', |
16 | ['pulselength', | 14 | ['pulselength', |
17 | 'sync_high', 'sync_low', | 15 | 'sync_high', 'sync_low', |
18 | 'zero_high', 'zero_low', | 16 | 'zero_high', 'zero_low', |
19 | 'one_high', 'one_low']) | 17 | 'one_high', 'one_low']) |
20 | PROTOCOLS = (None, | 18 | PROTOCOLS = (None, |
21 | Protocol(350, 1, 31, 1, 3, 3, 1), | 19 | Protocol(350, 1, 31, 1, 3, 3, 1), |
22 | Protocol(650, 1, 10, 1, 2, 2, 1), | 20 | Protocol(650, 1, 10, 1, 2, 2, 1), |
23 | Protocol(100, 30, 71, 4, 11, 9, 6), | 21 | Protocol(100, 30, 71, 4, 11, 9, 6), |
24 | Protocol(380, 1, 6, 1, 3, 3, 1), | 22 | Protocol(380, 1, 6, 1, 3, 3, 1), |
25 | Protocol(500, 6, 14, 1, 2, 2, 1), | 23 | Protocol(500, 6, 14, 1, 2, 2, 1), |
26 | Protocol(200, 1, 10, 1, 5, 1, 1)) | 24 | Protocol(200, 1, 10, 1, 5, 1, 1)) |
27 | 25 | ||
28 | 26 | ||
29 | class RFDevice: | 27 | class RFDevice: |
30 | """Representation of a GPIO RF device.""" | 28 | """Representation of a GPIO RF device.""" |
31 | 29 | ||
32 | # pylint: disable=too-many-instance-attributes,too-many-arguments | 30 | # pylint: disable=too-many-instance-attributes,too-many-arguments |
33 | def __init__(self, gpio, | 31 | def __init__(self, gpio = None, |
34 | tx_proto=1, tx_pulselength=None, tx_repeat=10, tx_length=24, rx_tolerance=80): | 32 | tx_proto=1, tx_pulselength=None, tx_repeat=10, tx_length=24, rx_tolerance=80): |
35 | """Initialize the RF device.""" | 33 | """Initialize the RF device.""" |
36 | self.gpio = gpio | 34 | self.gpio = gpio |
37 | self.tx_enabled = False | 35 | self.tx_enabled = False |
38 | self.tx_proto = tx_proto | 36 | self.tx_proto = tx_proto |
39 | if tx_pulselength: | 37 | if tx_pulselength: |
40 | self.tx_pulselength = tx_pulselength | 38 | self.tx_pulselength = tx_pulselength |
41 | else: | 39 | else: |
42 | self.tx_pulselength = PROTOCOLS[tx_proto].pulselength | 40 | self.tx_pulselength = PROTOCOLS[tx_proto].pulselength |
43 | self.tx_repeat = tx_repeat | 41 | self.tx_repeat = tx_repeat |
44 | self.tx_length = tx_length | 42 | self.tx_length = tx_length |
45 | self.rx_enabled = False | 43 | self.rx_enabled = False |
46 | self.rx_tolerance = rx_tolerance | 44 | self.rx_tolerance = rx_tolerance |
47 | # internal values | 45 | # internal values |
48 | self._rx_timings = [0] * (MAX_CHANGES + 1) | 46 | self._rx_timings = [0] * (MAX_CHANGES + 1) |
49 | self._rx_last_timestamp = 0 | 47 | self._rx_last_timestamp = 0 |
50 | self._rx_change_count = 0 | 48 | self._rx_change_count = 0 |
51 | self._rx_repeat_count = 0 | 49 | self._rx_repeat_count = 0 |
52 | # successful RX values | 50 | # successful RX values |
53 | self.rx_code = None | 51 | self.rx_code = None |
54 | self.rx_code_timestamp = None | 52 | self.rx_code_timestamp = None |
55 | self.rx_proto = None | 53 | self.rx_proto = None |
56 | self.rx_bitlength = None | 54 | self.rx_bitlength = None |
57 | self.rx_pulselength = None | 55 | self.rx_pulselength = None |
58 | 56 | ||
59 | GPIO.setmode(GPIO.BCM) | ||
60 | _LOGGER.debug("Using GPIO " + str(gpio)) | ||
61 | |||
62 | def cleanup(self): | 57 | def cleanup(self): |
63 | """Disable TX and RX and clean up GPIO.""" | 58 | """Disable TX and RX and clean up GPIO.""" |
64 | if self.tx_enabled: | 59 | if self.tx_enabled: |
65 | self.disable_tx() | 60 | self.disable_tx() |
66 | if self.rx_enabled: | 61 | if self.rx_enabled: |
67 | self.disable_rx() | 62 | self.disable_rx() |
68 | _LOGGER.debug("Cleanup") | 63 | print("Cleanup") |
69 | GPIO.cleanup() | ||
70 | 64 | ||
71 | def enable_tx(self): | 65 | def enable_tx(self): |
72 | """Enable TX, set up GPIO.""" | 66 | """Enable TX, set up GPIO.""" |
73 | if self.rx_enabled: | 67 | if self.rx_enabled: |
74 | _LOGGER.error("RX is enabled, not enabling TX") | 68 | print("RX is enabled, not enabling TX") |
75 | return False | 69 | return False |
76 | if not self.tx_enabled: | 70 | if not self.tx_enabled: |
77 | self.tx_enabled = True | 71 | self.tx_enabled = True |
78 | GPIO.setup(self.gpio, GPIO.OUT) | 72 | self.gpio = Pin(config.TX_PIN, Pin.OUT) |
79 | _LOGGER.debug("TX enabled") | 73 | print("TX enabled") |
80 | return True | 74 | return True |
81 | 75 | ||
82 | def disable_tx(self): | 76 | def disable_tx(self): |
83 | """Disable TX, reset GPIO.""" | 77 | """Disable TX, reset GPIO.""" |
84 | if self.tx_enabled: | 78 | if self.tx_enabled: |
85 | # set up GPIO pin as input for safety | 79 | # set up GPIO pin as input for safety |
86 | GPIO.setup(self.gpio, GPIO.IN) | 80 | self.gpio = Pin(config.TX_PIN, Pin.IN, Pin.PULL_DOWN) |
87 | self.tx_enabled = False | 81 | self.tx_enabled = False |
88 | _LOGGER.debug("TX disabled") | 82 | print("TX disabled") |
89 | return True | 83 | return True |
90 | 84 | ||
91 | def tx_code(self, code, tx_proto=None, tx_pulselength=None, tx_length=None): | 85 | def tx_code(self, code, tx_proto=None, tx_pulselength=None, tx_length=None): |
92 | """ | 86 | """ |
93 | Send a decimal code. | 87 | Send a decimal code. |
94 | 88 | ||
95 | Optionally set protocol, pulselength and code length. | 89 | Optionally set protocol, pulselength and code length. |
96 | When none given reset to default protocol, default pulselength and set code length to 24 bits. | 90 | When none given reset to default protocol, default pulselength and set code length to 24 bits. |
97 | """ | 91 | """ |
92 | self.us_sleep = 0 | ||
93 | self.start = time.ticks_us() | ||
94 | |||
98 | if tx_proto: | 95 | if tx_proto: |
99 | self.tx_proto = tx_proto | 96 | self.tx_proto = tx_proto |
100 | else: | 97 | else: |
101 | self.tx_proto = 1 | 98 | self.tx_proto = 1 |
102 | if tx_pulselength: | 99 | if tx_pulselength: |
103 | self.tx_pulselength = tx_pulselength | 100 | self.tx_pulselength = tx_pulselength |
104 | elif not self.tx_pulselength: | 101 | elif not self.tx_pulselength: |
105 | self.tx_pulselength = PROTOCOLS[self.tx_proto].pulselength | 102 | self.tx_pulselength = PROTOCOLS[self.tx_proto].pulselength |
106 | if tx_length: | 103 | if tx_length: |
107 | self.tx_length = tx_length | 104 | self.tx_length = tx_length |
108 | elif self.tx_proto == 6: | 105 | elif self.tx_proto == 6: |
109 | self.tx_length = 32 | 106 | self.tx_length = 32 |
110 | elif (code > 16777216): | 107 | elif (code > 16777216): |
111 | self.tx_length = 32 | 108 | self.tx_length = 32 |
112 | else: | 109 | else: |
113 | self.tx_length = 24 | 110 | self.tx_length = 24 |
114 | rawcode = format(code, '#0{}b'.format(self.tx_length + 2))[2:] | 111 | rawcode = "{{0:{}b}}".format(self.tx_length + 2).format(code)[2:] |
115 | if self.tx_proto == 6: | 112 | if self.tx_proto == 6: |
116 | nexacode = "" | 113 | nexacode = "" |
117 | for b in rawcode: | 114 | for b in rawcode: |
118 | if b == '0': | 115 | if b == '0': |
119 | nexacode = nexacode + "01" | 116 | nexacode = nexacode + "01" |
120 | if b == '1': | 117 | if b == '1': |
121 | nexacode = nexacode + "10" | 118 | nexacode = nexacode + "10" |
122 | rawcode = nexacode | 119 | rawcode = nexacode |
123 | self.tx_length = 64 | 120 | self.tx_length = 64 |
124 | _LOGGER.debug("TX code: " + str(code)) | 121 | print("TX code: ", str(code), " binary: ", rawcode) |
125 | return self.tx_bin(rawcode) | 122 | status = self.tx_bin(rawcode) |
126 | 123 | ||
124 | # We must not transmit too often, we also need to make sure we don't corrupt our current message, so wait a bit more between messages if some are chained by caller | ||
125 | # 500ms seems excessive, should test and reduce when time allows. | ||
126 | self.us_sleep = self.us_sleep + 500000 | ||
127 | |||
128 | # finish up any pending sleep for 0 bytes - so next transmit code doesn't mess our currently sent one. | ||
129 | while time.ticks_us() - self.start < self.us_sleep: | ||
130 | time.sleep_us(1) | ||
131 | |||
132 | return status | ||
133 | |||
134 | |||
127 | def tx_bin(self, rawcode): | 135 | def tx_bin(self, rawcode): |
128 | """Send a binary code.""" | 136 | """Send a binary code.""" |
129 | _LOGGER.debug("TX bin: " + str(rawcode)) | ||
130 | for _ in range(0, self.tx_repeat): | 137 | for _ in range(0, self.tx_repeat): |
131 | if self.tx_proto == 6: | 138 | if self.tx_proto == 6: |
132 | if not self.tx_sync(): | 139 | if not self.tx_sync(): |
133 | return False | 140 | return False |
134 | for byte in range(0, self.tx_length): | 141 | for byte in range(0, self.tx_length): |
135 | if rawcode[byte] == '0': | 142 | if rawcode[byte] == '0': |
136 | if not self.tx_l0(): | 143 | if not self.tx_l0(): |
137 | return False | 144 | return False |
138 | else: | 145 | else: |
139 | if not self.tx_l1(): | 146 | if not self.tx_l1(): |
140 | return False | 147 | return False |
141 | if not self.tx_sync(): | 148 | if not self.tx_sync(): |
142 | return False | 149 | return False |
143 | 150 | ||
144 | return True | 151 | return True |
145 | 152 | ||
146 | def tx_l0(self): | 153 | def tx_l0(self): |
147 | """Send a '0' bit.""" | 154 | """Send a '0' bit.""" |
148 | if not 0 < self.tx_proto < len(PROTOCOLS): | 155 | if not 0 < self.tx_proto < len(PROTOCOLS): |
149 | _LOGGER.error("Unknown TX protocol") | 156 | print("Unknown TX protocol") |
150 | return False | 157 | return False |
151 | return self.tx_waveform(PROTOCOLS[self.tx_proto].zero_high, | 158 | return self.tx_waveform(PROTOCOLS[self.tx_proto].zero_high, |
152 | PROTOCOLS[self.tx_proto].zero_low) | 159 | PROTOCOLS[self.tx_proto].zero_low) |
153 | 160 | ||
154 | def tx_l1(self): | 161 | def tx_l1(self): |
155 | """Send a '1' bit.""" | 162 | """Send a '1' bit.""" |
156 | if not 0 < self.tx_proto < len(PROTOCOLS): | 163 | if not 0 < self.tx_proto < len(PROTOCOLS): |
157 | _LOGGER.error("Unknown TX protocol") | 164 | print("Unknown TX protocol") |
158 | return False | 165 | return False |
159 | return self.tx_waveform(PROTOCOLS[self.tx_proto].one_high, | 166 | return self.tx_waveform(PROTOCOLS[self.tx_proto].one_high, |
160 | PROTOCOLS[self.tx_proto].one_low) | 167 | PROTOCOLS[self.tx_proto].one_low) |
161 | 168 | ||
162 | def tx_sync(self): | 169 | def tx_sync(self): |
163 | """Send a sync.""" | 170 | """Send a sync.""" |
164 | if not 0 < self.tx_proto < len(PROTOCOLS): | 171 | if not 0 < self.tx_proto < len(PROTOCOLS): |
165 | _LOGGER.error("Unknown TX protocol") | 172 | print("Unknown TX protocol") |
166 | return False | 173 | return False |
167 | return self.tx_waveform(PROTOCOLS[self.tx_proto].sync_high, | 174 | return self.tx_waveform(PROTOCOLS[self.tx_proto].sync_high, |
168 | PROTOCOLS[self.tx_proto].sync_low) | 175 | PROTOCOLS[self.tx_proto].sync_low) |
169 | 176 | ||
170 | def tx_waveform(self, highpulses, lowpulses): | 177 | def tx_waveform(self, highpulses, lowpulses): |
171 | """Send basic waveform.""" | 178 | """Send basic waveform.""" |
172 | if not self.tx_enabled: | 179 | if not self.tx_enabled: |
173 | _LOGGER.error("TX is not enabled, not sending data") | 180 | print("TX is not enabled, not sending data") |
174 | return False | 181 | return False |
175 | GPIO.output(self.gpio, GPIO.HIGH) | 182 | |
176 | self._sleep((highpulses * self.tx_pulselength) / 1000000) | 183 | while time.ticks_us() - self.start < self.us_sleep: |
177 | GPIO.output(self.gpio, GPIO.LOW) | 184 | time.sleep_us(1) |
178 | self._sleep((lowpulses * self.tx_pulselength) / 1000000) | 185 | |
186 | self.gpio.value(1) | ||
187 | self.start = time.ticks_us() | ||
188 | |||
189 | self.us_sleep = int(highpulses * self.tx_pulselength) # - int(int(highpulses * self.tx_pulselength)/100) | ||
190 | while time.ticks_us() - self.start < self.us_sleep: | ||
191 | time.sleep_us(1) | ||
192 | |||
193 | self.gpio.value(0) | ||
194 | self.start = time.ticks_us() | ||
195 | self.us_sleep = int(lowpulses * self.tx_pulselength) # - int(int(lowpulses * self.tx_pulselength)/100) | ||
196 | |||
179 | return True | 197 | return True |
180 | 198 | ||
181 | def enable_rx(self): | 199 | def enable_rx(self): |
182 | """Enable RX, set up GPIO and add event detection.""" | 200 | """Enable RX, set up GPIO and add event detection.""" |
183 | if self.tx_enabled: | 201 | if self.tx_enabled: |
184 | _LOGGER.error("TX is enabled, not enabling RX") | 202 | print("TX is enabled, not enabling RX") |
185 | return False | 203 | return False |
186 | if not self.rx_enabled: | 204 | if not self.rx_enabled: |
187 | self.rx_enabled = True | 205 | self.rx_enabled = True |
188 | GPIO.setup(self.gpio, GPIO.IN) | 206 | self.gpio = Pin(config.RX_PIN, Pin.IN, Pin.PULL_DOWN) |
189 | GPIO.add_event_detect(self.gpio, GPIO.BOTH) | 207 | self.gpio.irq(handler=self.rx_callback, trigger=Pin.IRQ_FALLING | Pin.IRQ_RISING) |
190 | GPIO.add_event_callback(self.gpio, self.rx_callback) | 208 | print("RX enabled, pin: " + str(config.RX_PIN)) |
191 | _LOGGER.debug("RX enabled") | ||
192 | return True | 209 | return True |
193 | 210 | ||
194 | def disable_rx(self): | 211 | def disable_rx(self): |
195 | """Disable RX, remove GPIO event detection.""" | 212 | """Disable RX, remove GPIO event detection.""" |
196 | if self.rx_enabled: | 213 | if self.rx_enabled: |
197 | GPIO.remove_event_detect(self.gpio) | 214 | self.gpio.irq(None) |
198 | self.rx_enabled = False | 215 | self.rx_enabled = False |
199 | _LOGGER.debug("RX disabled") | 216 | print("RX disabled") |
200 | return True | 217 | return True |
201 | 218 | ||
202 | # pylint: disable=unused-argument | 219 | # pylint: disable=unused-argument |
203 | def rx_callback(self, gpio): | 220 | def rx_callback(self, gpio): |
204 | """RX callback for GPIO event detection. Handle basic signal detection.""" | 221 | """RX callback for GPIO event detection. Handle basic signal detection.""" |
205 | timestamp = int(time.perf_counter() * 1000000) | 222 | timestamp = time.ticks_us() |
206 | duration = timestamp - self._rx_last_timestamp | 223 | duration = timestamp - self._rx_last_timestamp |
207 | |||
208 | if duration > 5000: | 224 | if duration > 5000: |
209 | if abs(duration - self._rx_timings[0]) < 200: | 225 | if abs(duration - self._rx_timings[0]) < 200: |
210 | self._rx_repeat_count += 1 | 226 | self._rx_repeat_count += 1 |
211 | self._rx_change_count -= 1 | 227 | self._rx_change_count -= 1 |
212 | if self._rx_repeat_count == 2: | 228 | if self._rx_repeat_count == 2: |
213 | for pnum in range(1, len(PROTOCOLS)): | 229 | for pnum in range(1, len(PROTOCOLS)): |
214 | if self._rx_waveform(pnum, self._rx_change_count, timestamp): | 230 | if self._rx_waveform(pnum, self._rx_change_count, timestamp): |
215 | _LOGGER.debug("RX code " + str(self.rx_code)) | ||
216 | break | 231 | break |
217 | self._rx_repeat_count = 0 | 232 | self._rx_repeat_count = 0 |
218 | self._rx_change_count = 0 | 233 | self._rx_change_count = 0 |
219 | 234 | ||
220 | if self._rx_change_count >= MAX_CHANGES: | 235 | if self._rx_change_count >= MAX_CHANGES: |
221 | self._rx_change_count = 0 | 236 | self._rx_change_count = 0 |
222 | self._rx_repeat_count = 0 | 237 | self._rx_repeat_count = 0 |
223 | self._rx_timings[self._rx_change_count] = duration | 238 | self._rx_timings[self._rx_change_count] = duration |
224 | self._rx_change_count += 1 | 239 | self._rx_change_count += 1 |
225 | self._rx_last_timestamp = timestamp | 240 | self._rx_last_timestamp = timestamp |
226 | 241 | ||
227 | def _rx_waveform(self, pnum, change_count, timestamp): | 242 | def _rx_waveform(self, pnum, change_count, timestamp): |
228 | """Detect waveform and format code.""" | 243 | """Detect waveform and format code.""" |
229 | code = 0 | 244 | code = 0 |
230 | delay = int(self._rx_timings[0] / PROTOCOLS[pnum].sync_low) | 245 | delay = int(self._rx_timings[0] / PROTOCOLS[pnum].sync_low) |
231 | delay_tolerance = delay * self.rx_tolerance / 100 | 246 | delay_tolerance = delay * self.rx_tolerance / 100 |
232 | 247 | ||
233 | for i in range(1, change_count, 2): | 248 | for i in range(1, change_count, 2): |
234 | if (abs(self._rx_timings[i] - delay * PROTOCOLS[pnum].zero_high) < delay_tolerance and | 249 | if (abs(self._rx_timings[i] - delay * PROTOCOLS[pnum].zero_high) < delay_tolerance and |
235 | abs(self._rx_timings[i+1] - delay * PROTOCOLS[pnum].zero_low) < delay_tolerance): | 250 | abs(self._rx_timings[i+1] - delay * PROTOCOLS[pnum].zero_low) < delay_tolerance): |
236 | code <<= 1 | 251 | code <<= 1 |
237 | elif (abs(self._rx_timings[i] - delay * PROTOCOLS[pnum].one_high) < delay_tolerance and | 252 | elif (abs(self._rx_timings[i] - delay * PROTOCOLS[pnum].one_high) < delay_tolerance and |
238 | abs(self._rx_timings[i+1] - delay * PROTOCOLS[pnum].one_low) < delay_tolerance): | 253 | abs(self._rx_timings[i+1] - delay * PROTOCOLS[pnum].one_low) < delay_tolerance): |
239 | code <<= 1 | 254 | code <<= 1 |
240 | code |= 1 | 255 | code |= 1 |
241 | else: | 256 | else: |
242 | return False | 257 | return False |
243 | 258 | ||
244 | if self._rx_change_count > 6 and code != 0: | 259 | if self._rx_change_count > 6 and code != 0: |
245 | self.rx_code = code | 260 | self.rx_code = code |
246 | self.rx_code_timestamp = timestamp | 261 | self.rx_code_timestamp = timestamp |
247 | self.rx_bitlength = int(change_count / 2) | 262 | self.rx_bitlength = int(change_count / 2) |
248 | self.rx_pulselength = delay | 263 | self.rx_pulselength = delay |
249 | self.rx_proto = pnum | 264 | self.rx_proto = pnum |
250 | return True | 265 | return True |
251 | 266 | ||
252 | return False | 267 | return False |
253 | |||
254 | def _sleep(self, delay): | ||
255 | _delay = delay / 100 | ||
256 | end = time.time() + delay - _delay | ||
257 | while time.time() < end: | ||
258 | time.sleep(_delay) |