Comparison of rpi-rf

Conversion for Raspberry Pi Pico

Python | MicroPython
1"""1"""
2Sending and receiving 433/315Mhz signals with low-cost GPIO RF Modules on a Raspberry Pi.2Sending and receiving 433/315Mhz signals with low-cost GPIO RF Modules on a Raspberry Pi.
3This is adjusted code for raspberry pi pico, original is from: https://github.com/milaq/rpi-rf
3"""4"""
45
5import logging6import config
6import time7import time
8from machine import Pin
7from collections import namedtuple9from collections import namedtuple
810
9from RPi import GPIO
10
11MAX_CHANGES = 6711MAX_CHANGES = 67
1212
13_LOGGER = logging.getLogger(__name__)
14
15Protocol = namedtuple('Protocol',13Protocol = namedtuple('Protocol',
16 ['pulselength',14 ['pulselength',
17 'sync_high', 'sync_low',15 'sync_high', 'sync_low',
18 'zero_high', 'zero_low',16 'zero_high', 'zero_low',
19 'one_high', 'one_low'])17 'one_high', 'one_low'])
20PROTOCOLS = (None,18PROTOCOLS = (None,
21 Protocol(350, 1, 31, 1, 3, 3, 1),19 Protocol(350, 1, 31, 1, 3, 3, 1),
22 Protocol(650, 1, 10, 1, 2, 2, 1),20 Protocol(650, 1, 10, 1, 2, 2, 1),
23 Protocol(100, 30, 71, 4, 11, 9, 6),21 Protocol(100, 30, 71, 4, 11, 9, 6),
24 Protocol(380, 1, 6, 1, 3, 3, 1),22 Protocol(380, 1, 6, 1, 3, 3, 1),
25 Protocol(500, 6, 14, 1, 2, 2, 1),23 Protocol(500, 6, 14, 1, 2, 2, 1),
26 Protocol(200, 1, 10, 1, 5, 1, 1))24 Protocol(200, 1, 10, 1, 5, 1, 1))
2725
2826
29class RFDevice:27class RFDevice:
30 """Representation of a GPIO RF device."""28 """Representation of a GPIO RF device."""
3129
32 # pylint: disable=too-many-instance-attributes,too-many-arguments30 # pylint: disable=too-many-instance-attributes,too-many-arguments
33 def __init__(self, gpio,31 def __init__(self, gpio = None,
34 tx_proto=1, tx_pulselength=None, tx_repeat=10, tx_length=24, rx_tolerance=80):32 tx_proto=1, tx_pulselength=None, tx_repeat=10, tx_length=24, rx_tolerance=80):
35 """Initialize the RF device."""33 """Initialize the RF device."""
36 self.gpio = gpio34 self.gpio = gpio
37 self.tx_enabled = False35 self.tx_enabled = False
38 self.tx_proto = tx_proto36 self.tx_proto = tx_proto
39 if tx_pulselength:37 if tx_pulselength:
40 self.tx_pulselength = tx_pulselength38 self.tx_pulselength = tx_pulselength
41 else:39 else:
42 self.tx_pulselength = PROTOCOLS[tx_proto].pulselength40 self.tx_pulselength = PROTOCOLS[tx_proto].pulselength
43 self.tx_repeat = tx_repeat41 self.tx_repeat = tx_repeat
44 self.tx_length = tx_length42 self.tx_length = tx_length
45 self.rx_enabled = False43 self.rx_enabled = False
46 self.rx_tolerance = rx_tolerance44 self.rx_tolerance = rx_tolerance
47 # internal values45 # internal values
48 self._rx_timings = [0] * (MAX_CHANGES + 1)46 self._rx_timings = [0] * (MAX_CHANGES + 1)
49 self._rx_last_timestamp = 047 self._rx_last_timestamp = 0
50 self._rx_change_count = 048 self._rx_change_count = 0
51 self._rx_repeat_count = 049 self._rx_repeat_count = 0
52 # successful RX values50 # successful RX values
53 self.rx_code = None51 self.rx_code = None
54 self.rx_code_timestamp = None52 self.rx_code_timestamp = None
55 self.rx_proto = None53 self.rx_proto = None
56 self.rx_bitlength = None54 self.rx_bitlength = None
57 self.rx_pulselength = None55 self.rx_pulselength = None
5856
59 GPIO.setmode(GPIO.BCM)
60 _LOGGER.debug("Using GPIO " + str(gpio))
61
62 def cleanup(self):57 def cleanup(self):
63 """Disable TX and RX and clean up GPIO."""58 """Disable TX and RX and clean up GPIO."""
64 if self.tx_enabled:59 if self.tx_enabled:
65 self.disable_tx()60 self.disable_tx()
66 if self.rx_enabled:61 if self.rx_enabled:
67 self.disable_rx()62 self.disable_rx()
68 _LOGGER.debug("Cleanup")63 print("Cleanup")
69 GPIO.cleanup()
7064
71 def enable_tx(self):65 def enable_tx(self):
72 """Enable TX, set up GPIO."""66 """Enable TX, set up GPIO."""
73 if self.rx_enabled:67 if self.rx_enabled:
74 _LOGGER.error("RX is enabled, not enabling TX")68 print("RX is enabled, not enabling TX")
75 return False69 return False
76 if not self.tx_enabled:70 if not self.tx_enabled:
77 self.tx_enabled = True71 self.tx_enabled = True
78 GPIO.setup(self.gpio, GPIO.OUT)72 self.gpio = Pin(config.TX_PIN, Pin.OUT)
79 _LOGGER.debug("TX enabled")73 print("TX enabled")
80 return True74 return True
8175
82 def disable_tx(self):76 def disable_tx(self):
83 """Disable TX, reset GPIO."""77 """Disable TX, reset GPIO."""
84 if self.tx_enabled:78 if self.tx_enabled:
85 # set up GPIO pin as input for safety79 # set up GPIO pin as input for safety
86 GPIO.setup(self.gpio, GPIO.IN)80 self.gpio = Pin(config.TX_PIN, Pin.IN, Pin.PULL_DOWN)
87 self.tx_enabled = False81 self.tx_enabled = False
88 _LOGGER.debug("TX disabled")82 print("TX disabled")
89 return True83 return True
9084
91 def tx_code(self, code, tx_proto=None, tx_pulselength=None, tx_length=None):85 def tx_code(self, code, tx_proto=None, tx_pulselength=None, tx_length=None):
92 """86 """
93 Send a decimal code.87 Send a decimal code.
9488
95 Optionally set protocol, pulselength and code length.89 Optionally set protocol, pulselength and code length.
96 When none given reset to default protocol, default pulselength and set code length to 24 bits.90 When none given reset to default protocol, default pulselength and set code length to 24 bits.
97 """91 """
92 self.us_sleep = 0
93 self.start = time.ticks_us()
94
98 if tx_proto:95 if tx_proto:
99 self.tx_proto = tx_proto96 self.tx_proto = tx_proto
100 else:97 else:
101 self.tx_proto = 198 self.tx_proto = 1
102 if tx_pulselength:99 if tx_pulselength:
103 self.tx_pulselength = tx_pulselength100 self.tx_pulselength = tx_pulselength
104 elif not self.tx_pulselength:101 elif not self.tx_pulselength:
105 self.tx_pulselength = PROTOCOLS[self.tx_proto].pulselength102 self.tx_pulselength = PROTOCOLS[self.tx_proto].pulselength
106 if tx_length:103 if tx_length:
107 self.tx_length = tx_length104 self.tx_length = tx_length
108 elif self.tx_proto == 6:105 elif self.tx_proto == 6:
109 self.tx_length = 32106 self.tx_length = 32
110 elif (code > 16777216):107 elif (code > 16777216):
111 self.tx_length = 32108 self.tx_length = 32
112 else:109 else:
113 self.tx_length = 24110 self.tx_length = 24
114 rawcode = format(code, '#0{}b'.format(self.tx_length + 2))[2:]111 rawcode = "{{0:{}b}}".format(self.tx_length + 2).format(code)[2:]
115 if self.tx_proto == 6:112 if self.tx_proto == 6:
116 nexacode = ""113 nexacode = ""
117 for b in rawcode:114 for b in rawcode:
118 if b == '0':115 if b == '0':
119 nexacode = nexacode + "01"116 nexacode = nexacode + "01"
120 if b == '1':117 if b == '1':
121 nexacode = nexacode + "10"118 nexacode = nexacode + "10"
122 rawcode = nexacode119 rawcode = nexacode
123 self.tx_length = 64120 self.tx_length = 64
124 _LOGGER.debug("TX code: " + str(code))121 print("TX code: ", str(code), " binary: ", rawcode)
125 return self.tx_bin(rawcode)122 status = self.tx_bin(rawcode)
126123
124 # We must not transmit too often, we also need to make sure we don't corrupt our current message, so wait a bit more between messages if some are chained by caller
125 # 500ms seems excessive, should test and reduce when time allows.
126 self.us_sleep = self.us_sleep + 500000
127
128 # finish up any pending sleep for 0 bytes - so next transmit code doesn't mess our currently sent one.
129 while time.ticks_us() - self.start < self.us_sleep:
130 time.sleep_us(1)
131
132 return status
133
134
127 def tx_bin(self, rawcode):135 def tx_bin(self, rawcode):
128 """Send a binary code."""136 """Send a binary code."""
129 _LOGGER.debug("TX bin: " + str(rawcode))
130 for _ in range(0, self.tx_repeat):137 for _ in range(0, self.tx_repeat):
131 if self.tx_proto == 6:138 if self.tx_proto == 6:
132 if not self.tx_sync():139 if not self.tx_sync():
133 return False140 return False
134 for byte in range(0, self.tx_length):141 for byte in range(0, self.tx_length):
135 if rawcode[byte] == '0':142 if rawcode[byte] == '0':
136 if not self.tx_l0():143 if not self.tx_l0():
137 return False144 return False
138 else:145 else:
139 if not self.tx_l1():146 if not self.tx_l1():
140 return False147 return False
141 if not self.tx_sync():148 if not self.tx_sync():
142 return False149 return False
143150
144 return True151 return True
145152
146 def tx_l0(self):153 def tx_l0(self):
147 """Send a '0' bit."""154 """Send a '0' bit."""
148 if not 0 < self.tx_proto < len(PROTOCOLS):155 if not 0 < self.tx_proto < len(PROTOCOLS):
149 _LOGGER.error("Unknown TX protocol")156 print("Unknown TX protocol")
150 return False157 return False
151 return self.tx_waveform(PROTOCOLS[self.tx_proto].zero_high,158 return self.tx_waveform(PROTOCOLS[self.tx_proto].zero_high,
152 PROTOCOLS[self.tx_proto].zero_low)159 PROTOCOLS[self.tx_proto].zero_low)
153160
154 def tx_l1(self):161 def tx_l1(self):
155 """Send a '1' bit."""162 """Send a '1' bit."""
156 if not 0 < self.tx_proto < len(PROTOCOLS):163 if not 0 < self.tx_proto < len(PROTOCOLS):
157 _LOGGER.error("Unknown TX protocol")164 print("Unknown TX protocol")
158 return False165 return False
159 return self.tx_waveform(PROTOCOLS[self.tx_proto].one_high,166 return self.tx_waveform(PROTOCOLS[self.tx_proto].one_high,
160 PROTOCOLS[self.tx_proto].one_low)167 PROTOCOLS[self.tx_proto].one_low)
161168
162 def tx_sync(self):169 def tx_sync(self):
163 """Send a sync."""170 """Send a sync."""
164 if not 0 < self.tx_proto < len(PROTOCOLS):171 if not 0 < self.tx_proto < len(PROTOCOLS):
165 _LOGGER.error("Unknown TX protocol")172 print("Unknown TX protocol")
166 return False173 return False
167 return self.tx_waveform(PROTOCOLS[self.tx_proto].sync_high,174 return self.tx_waveform(PROTOCOLS[self.tx_proto].sync_high,
168 PROTOCOLS[self.tx_proto].sync_low)175 PROTOCOLS[self.tx_proto].sync_low)
169176
170 def tx_waveform(self, highpulses, lowpulses):177 def tx_waveform(self, highpulses, lowpulses):
171 """Send basic waveform."""178 """Send basic waveform."""
172 if not self.tx_enabled:179 if not self.tx_enabled:
173 _LOGGER.error("TX is not enabled, not sending data")180 print("TX is not enabled, not sending data")
174 return False181 return False
175 GPIO.output(self.gpio, GPIO.HIGH)182
176 self._sleep((highpulses * self.tx_pulselength) / 1000000)183 while time.ticks_us() - self.start < self.us_sleep:
177 GPIO.output(self.gpio, GPIO.LOW)184 time.sleep_us(1)
178 self._sleep((lowpulses * self.tx_pulselength) / 1000000)185
186 self.gpio.value(1)
187 self.start = time.ticks_us()
188
189 self.us_sleep = int(highpulses * self.tx_pulselength) # - int(int(highpulses * self.tx_pulselength)/100)
190 while time.ticks_us() - self.start < self.us_sleep:
191 time.sleep_us(1)
192
193 self.gpio.value(0)
194 self.start = time.ticks_us()
195 self.us_sleep = int(lowpulses * self.tx_pulselength) # - int(int(lowpulses * self.tx_pulselength)/100)
196
179 return True197 return True
180198
181 def enable_rx(self):199 def enable_rx(self):
182 """Enable RX, set up GPIO and add event detection."""200 """Enable RX, set up GPIO and add event detection."""
183 if self.tx_enabled:201 if self.tx_enabled:
184 _LOGGER.error("TX is enabled, not enabling RX")202 print("TX is enabled, not enabling RX")
185 return False203 return False
186 if not self.rx_enabled:204 if not self.rx_enabled:
187 self.rx_enabled = True205 self.rx_enabled = True
188 GPIO.setup(self.gpio, GPIO.IN)206 self.gpio = Pin(config.RX_PIN, Pin.IN, Pin.PULL_DOWN)
189 GPIO.add_event_detect(self.gpio, GPIO.BOTH)207 self.gpio.irq(handler=self.rx_callback, trigger=Pin.IRQ_FALLING | Pin.IRQ_RISING)
190 GPIO.add_event_callback(self.gpio, self.rx_callback)208 print("RX enabled, pin: " + str(config.RX_PIN))
191 _LOGGER.debug("RX enabled")
192 return True209 return True
193210
194 def disable_rx(self):211 def disable_rx(self):
195 """Disable RX, remove GPIO event detection."""212 """Disable RX, remove GPIO event detection."""
196 if self.rx_enabled:213 if self.rx_enabled:
197 GPIO.remove_event_detect(self.gpio)214 self.gpio.irq(None)
198 self.rx_enabled = False215 self.rx_enabled = False
199 _LOGGER.debug("RX disabled")216 print("RX disabled")
200 return True217 return True
201218
202 # pylint: disable=unused-argument219 # pylint: disable=unused-argument
203 def rx_callback(self, gpio):220 def rx_callback(self, gpio):
204 """RX callback for GPIO event detection. Handle basic signal detection."""221 """RX callback for GPIO event detection. Handle basic signal detection."""
205 timestamp = int(time.perf_counter() * 1000000)222 timestamp = time.ticks_us()
206 duration = timestamp - self._rx_last_timestamp223 duration = timestamp - self._rx_last_timestamp
207
208 if duration > 5000:224 if duration > 5000:
209 if abs(duration - self._rx_timings[0]) < 200:225 if abs(duration - self._rx_timings[0]) < 200:
210 self._rx_repeat_count += 1226 self._rx_repeat_count += 1
211 self._rx_change_count -= 1227 self._rx_change_count -= 1
212 if self._rx_repeat_count == 2:228 if self._rx_repeat_count == 2:
213 for pnum in range(1, len(PROTOCOLS)):229 for pnum in range(1, len(PROTOCOLS)):
214 if self._rx_waveform(pnum, self._rx_change_count, timestamp):230 if self._rx_waveform(pnum, self._rx_change_count, timestamp):
215 _LOGGER.debug("RX code " + str(self.rx_code))
216 break231 break
217 self._rx_repeat_count = 0232 self._rx_repeat_count = 0
218 self._rx_change_count = 0233 self._rx_change_count = 0
219234
220 if self._rx_change_count >= MAX_CHANGES:235 if self._rx_change_count >= MAX_CHANGES:
221 self._rx_change_count = 0236 self._rx_change_count = 0
222 self._rx_repeat_count = 0237 self._rx_repeat_count = 0
223 self._rx_timings[self._rx_change_count] = duration238 self._rx_timings[self._rx_change_count] = duration
224 self._rx_change_count += 1239 self._rx_change_count += 1
225 self._rx_last_timestamp = timestamp240 self._rx_last_timestamp = timestamp
226241
227 def _rx_waveform(self, pnum, change_count, timestamp):242 def _rx_waveform(self, pnum, change_count, timestamp):
228 """Detect waveform and format code."""243 """Detect waveform and format code."""
229 code = 0244 code = 0
230 delay = int(self._rx_timings[0] / PROTOCOLS[pnum].sync_low)245 delay = int(self._rx_timings[0] / PROTOCOLS[pnum].sync_low)
231 delay_tolerance = delay * self.rx_tolerance / 100246 delay_tolerance = delay * self.rx_tolerance / 100
232247
233 for i in range(1, change_count, 2):248 for i in range(1, change_count, 2):
234 if (abs(self._rx_timings[i] - delay * PROTOCOLS[pnum].zero_high) < delay_tolerance and249 if (abs(self._rx_timings[i] - delay * PROTOCOLS[pnum].zero_high) < delay_tolerance and
235 abs(self._rx_timings[i+1] - delay * PROTOCOLS[pnum].zero_low) < delay_tolerance):250 abs(self._rx_timings[i+1] - delay * PROTOCOLS[pnum].zero_low) < delay_tolerance):
236 code <<= 1251 code <<= 1
237 elif (abs(self._rx_timings[i] - delay * PROTOCOLS[pnum].one_high) < delay_tolerance and252 elif (abs(self._rx_timings[i] - delay * PROTOCOLS[pnum].one_high) < delay_tolerance and
238 abs(self._rx_timings[i+1] - delay * PROTOCOLS[pnum].one_low) < delay_tolerance):253 abs(self._rx_timings[i+1] - delay * PROTOCOLS[pnum].one_low) < delay_tolerance):
239 code <<= 1254 code <<= 1
240 code |= 1255 code |= 1
241 else:256 else:
242 return False257 return False
243258
244 if self._rx_change_count > 6 and code != 0:259 if self._rx_change_count > 6 and code != 0:
245 self.rx_code = code260 self.rx_code = code
246 self.rx_code_timestamp = timestamp261 self.rx_code_timestamp = timestamp
247 self.rx_bitlength = int(change_count / 2)262 self.rx_bitlength = int(change_count / 2)
248 self.rx_pulselength = delay263 self.rx_pulselength = delay
249 self.rx_proto = pnum264 self.rx_proto = pnum
250 return True265 return True
251266
252 return False267 return False
253
254 def _sleep(self, delay):
255 _delay = delay / 100
256 end = time.time() + delay - _delay
257 while time.time() < end:
258 time.sleep(_delay)
Fork this page...