Source code for max30001

import sys_max30001
import interrupt
import ucollections


class MAX30001:
    """
    The MAX30001 class provides a stream interface to the MAX30001
    ECG and BIO-Z sensor.

    .. code-block:: python

        import max30001
        m = max30001.MAX30001()
        m.read()
        m.close()

    Instances can also be used as context managers; leaving the ``with``
    block calls :py:meth:`close`.
    """

    def __init__(
        self,
        usb=False,
        bias=True,
        sample_rate=128,
        callback=None,
        sample_buffer_len=256,
    ):
        """
        Initializes the MAX30001 (if it is not already running).

        :param usb: True if all ECG leads should use the USB-C connector
        :param bias: True if the internal bias of the ECG should be used.
            Mandatory if the card10 is not attached to the body and the
            USB-C connector is used.
        :param sample_rate: Selected sample rate in Hz. Supported values:
            128 and 256.
        :param callback: If not None: A callback which is called with the
            data when ever new data is available
        :param sample_buffer_len: Length of the internal buffer (in samples)
        :raises ValueError: If the sensor could not be enabled.
        """
        self.sample_rate = sample_rate
        self.callback = callback
        self.sample_buffer_len = sample_buffer_len
        self.interrupt_id = interrupt.MAX30001_ECG
        self.usb = usb
        self.bias = bias
        self._callback = callback

        # Define the flag before talking to the driver so that read(),
        # close() and _interrupt() never hit a missing attribute, even if
        # enabling the sensor fails.
        self.active = False
        self.enable_sensor()

    @staticmethod
    def _status_error(action, status):
        """
        Build the ValueError reported for a negative driver status code.

        :param action: Human readable name of the failed operation
        :param status: The (negative) value returned by the driver
        :returns: A ``ValueError`` with the message ``"<action> returned <status>"``
        """
        # The message has to be interpolated here: passing the format string
        # and the value as separate exception arguments would leave the
        # ``%i`` placeholder unformatted.
        return ValueError("%s returned %i" % (action, status))

    def enable_sensor(self):
        """
        Enables the sensor. Automatically called by __init__.

        :raises ValueError: If the driver returns a negative stream id.
        """
        # Keep the callback disabled while the handler is being swapped.
        interrupt.disable_callback(self.interrupt_id)
        interrupt.set_callback(self.interrupt_id, self._interrupt)

        self.stream_id = sys_max30001.enable_sensor(
            self.usb, self.bias, self.sample_rate, self.sample_buffer_len
        )

        if self.stream_id < 0:
            # Do not leave a handler registered for a sensor that never
            # started.
            self.active = False
            interrupt.set_callback(self.interrupt_id, None)
            raise self._status_error("Enable sensor", self.stream_id)

        self.active = True

        # The interrupt is only needed when somebody wants to be notified.
        if self._callback:
            interrupt.enable_callback(self.interrupt_id)

    def __enter__(self):
        return self

    def __exit__(self, _et, _ev, _t):
        self.close()

    def close(self):
        """
        Close the currently open connection to the sensor.

        Does nothing if the sensor is not active.

        :raises ValueError: If the driver reports an error while disabling
            the sensor. The interrupt callback is unregistered regardless.
        """
        if not self.active:
            return

        self.active = False
        try:
            ret = sys_max30001.disable_sensor()
            if ret < 0:
                raise self._status_error("Disable sensor", ret)
        finally:
            # Always detach the handler, even when disabling failed.
            interrupt.disable_callback(self.interrupt_id)
            interrupt.set_callback(self.interrupt_id, None)

    def read(self):
        """
        Read as many samples (signed integer) as currently available.

        :returns: The result of reading the sensor stream, or an empty list
            if the sensor is not active.
        """
        if not self.active:
            return []
        return sys_max30001.read_sensor(self.stream_id)

    def _interrupt(self, _):
        """
        Interrupt handler: fetch the pending samples and hand them to the
        user callback (if one was given). The interrupt argument is ignored.
        """
        if not self.active:
            return

        data = self.read()
        if self._callback:
            self._callback(data)