# Source code for max86150

import sys_max86150
import uerrno
import interrupt
import ucollections

# Record type for a single sensor sample. Instances are built positionally
# from the first three entries of each raw sample (see MAX86150._convert).
Max86150Data = ucollections.namedtuple(
    "Max86150Data",
    ["red", "infrared", "ecg"],
)


class MAX86150:
    """
    The MAX86150 class provides a stream interface to the MAX86150 PPG and ECG.

    **Example**:

    .. code-block:: python

        import max86150
        m = max86150.MAX86150()

        data = m.read()
        for sample in data:
            print(
                "Red: {} Infrared: {} ECG: {}".format(
                    sample.red, sample.infrared, sample.ecg
                )
            )

        m.close()

    The object can also be used as a context manager; leaving the ``with``
    block calls :py:meth:`close`.

    .. versionadded:: 1.16
    """

    def __init__(
        self, callback=None, sample_buffer_len=128, sample_rate=200, prox_callback=None
    ):
        """
        Initializes the MAX86150 (if it is not already running).

        :param callback: If not None: A callback which is called with the
            data (a list of ``Max86150Data``) whenever new data is available
        :param sample_buffer_len: Forwarded to
            ``sys_max86150.enable_sensor`` as its first argument
        :param sample_rate: Forwarded to ``sys_max86150.enable_sensor`` as
            its second argument
        :param prox_callback: If not None: A callback which is called
            without arguments when the proximity interrupt fires
        """
        self.active = False
        # Placeholder until enable_sensor() stores the real stream id.
        self.stream_id = -uerrno.ENODEV
        self.interrupt_id = interrupt.MAX86150
        self._callback = callback
        self._prox_callback = prox_callback
        self.sample_rate = sample_rate
        self.sample_buffer_len = sample_buffer_len
        self.enable_sensor()

    def enable_sensor(self):
        """
        Enables the sensor.

        Automatically called when instantiating the sensor object.
        """
        # Keep both interrupts disabled while the handlers are swapped.
        interrupt.disable_callback(self.interrupt_id)
        interrupt.disable_callback(interrupt.MAX86150_PROX)

        interrupt.set_callback(self.interrupt_id, self._interrupt)
        interrupt.set_callback(interrupt.MAX86150_PROX, self._prox_interrupt)

        self.stream_id = sys_max86150.enable_sensor(
            self.sample_buffer_len, self.sample_rate
        )
        self.active = True

        # Only enable the interrupts the user actually supplied a callback for.
        if self._callback:
            interrupt.enable_callback(self.interrupt_id)

        if self._prox_callback:
            interrupt.enable_callback(interrupt.MAX86150_PROX)

    def __enter__(self):
        return self

    def __exit__(self, _et, _ev, _t):
        self.close()

    def close(self):
        """
        Close the currently open connection to the sensor.

        Does nothing if the sensor is not active, so it is safe to call
        more than once.
        """
        if not self.active:
            return

        self.active = False
        sys_max86150.disable_sensor()

        # Undo both registrations made in enable_sensor(): the data interrupt
        # and the proximity interrupt.
        interrupt.disable_callback(self.interrupt_id)
        interrupt.disable_callback(interrupt.MAX86150_PROX)
        interrupt.set_callback(self.interrupt_id, None)
        interrupt.set_callback(interrupt.MAX86150_PROX, None)

    def read(self):
        """
        Read as many samples as currently available.

        :returns: A list of ``Max86150Data`` tuples, one per sample returned
            by ``sys_max86150.read_sensor``
        :raises AssertionError: If the sensor is not active
        """
        # Kept as an assert so existing callers still see AssertionError.
        assert self.active, "Sensor is inactive"

        return [
            self._convert(sample)
            for sample in sys_max86150.read_sensor(self.stream_id)
        ]

    def _convert(self, sample):
        # The first three entries of a raw sample map to red, infrared, ecg.
        return Max86150Data(sample[0], sample[1], sample[2])

    def _interrupt(self, _):
        # Data interrupt handler: read the pending samples and hand them to
        # the user callback (if one is set).
        if self.active:
            data = self.read()
            if self._callback:
                self._callback(data)

    def _prox_interrupt(self, _):
        # Proximity interrupt handler: notify the user callback (if any).
        if self.active and self._prox_callback:
            self._prox_callback()