import sys_max86150
import uerrno
import interrupt
import ucollections
# One decoded sensor sample. The field order matches the positional order in
# which MAX86150._convert passes the raw sample's elements 0, 1 and 2.
Max86150Data = ucollections.namedtuple(
    "Max86150Data",
    ["red", "infrared", "ecg"],
)
class MAX86150:
    """
    The MAX86150 class provides a stream interface to the MAX86150 PPG and ECG.

    **Example**:

    .. code-block:: python

        import max86150
        m = max86150.MAX86150()

        data = m.read()
        for sample in data:
            print(
                "Red: {} Infrared: {} ECG: {}".format(
                    sample.red, sample.infrared, sample.ecg
                )
            )

        m.close()

    The object can also be used as a context manager; the sensor is closed
    when the ``with`` block is left.

    .. versionadded:: 1.16
    """

    def __init__(
        self, callback=None, sample_buffer_len=128, sample_rate=200, prox_callback=None
    ):
        """
        Initializes the MAX86150 (if it is not already running).

        :param callback: If not None: A callback which is called with the data
            (a list of ``Max86150Data`` samples) whenever new data is available
        :param sample_buffer_len: Sample buffer length, passed unchanged to
            the underlying ``sys_max86150.enable_sensor`` call
        :param sample_rate: Sample rate, passed unchanged to the underlying
            ``sys_max86150.enable_sensor`` call (presumably in Hz -- confirm
            against the firmware documentation)
        :param prox_callback: If not None: A callback which is called without
            arguments whenever the ``MAX86150_PROX`` interrupt fires
        """
        self.active = False
        # Placeholder until enable_sensor() stores the real stream id.
        self.stream_id = -uerrno.ENODEV
        self.interrupt_id = interrupt.MAX86150
        self._callback = callback
        self._prox_callback = prox_callback
        self.sample_rate = sample_rate
        self.sample_buffer_len = sample_buffer_len
        self.enable_sensor()

    def enable_sensor(self):
        """
        Enables the sensor.

        Automatically called when instantiating the sensor object.
        """
        # Keep both interrupts disabled while the handlers are (re)installed.
        interrupt.disable_callback(self.interrupt_id)
        interrupt.disable_callback(interrupt.MAX86150_PROX)

        interrupt.set_callback(self.interrupt_id, self._interrupt)
        interrupt.set_callback(interrupt.MAX86150_PROX, self._prox_interrupt)

        self.stream_id = sys_max86150.enable_sensor(
            self.sample_buffer_len, self.sample_rate
        )
        self.active = True

        # Only enable the interrupts a user callback was supplied for.
        if self._callback:
            interrupt.enable_callback(self.interrupt_id)
        if self._prox_callback:
            interrupt.enable_callback(interrupt.MAX86150_PROX)

    def __enter__(self):
        return self

    def __exit__(self, _et, _ev, _t):
        self.close()

    def close(self):
        """
        Close the currently open connection to the sensor.

        Does nothing if the sensor is not active.
        """
        if self.active:
            self.active = False
            sys_max86150.disable_sensor()

            # Tear down BOTH handlers installed by enable_sensor(): the sample
            # interrupt and the proximity interrupt.
            interrupt.disable_callback(self.interrupt_id)
            interrupt.disable_callback(interrupt.MAX86150_PROX)
            interrupt.set_callback(self.interrupt_id, None)
            interrupt.set_callback(interrupt.MAX86150_PROX, None)

    def read(self):
        """
        Read as many samples as currently available.

        :returns: A list of ``Max86150Data`` tuples, one per sample
        :raises AssertionError: If the sensor is not active
        """
        assert self.active, "Sensor is inactive"
        return [
            self._convert(sample)
            for sample in sys_max86150.read_sensor(self.stream_id)
        ]

    def _convert(self, sample):
        # Raw samples are indexable; elements 0, 1, 2 map to red, infrared, ecg.
        return Max86150Data(sample[0], sample[1], sample[2])

    def _interrupt(self, _):
        # Handler for the sample interrupt. Samples are read whenever the
        # sensor is active, even if no user callback is set.
        if self.active:
            data = self.read()
            if self._callback:
                self._callback(data)

    def _prox_interrupt(self, _):
        # Handler for the proximity interrupt; forwards to the user callback.
        if self.active:
            if self._prox_callback:
                self._prox_callback()