Python Django实现基于MAX30102光电容积脉搏心率检测详解

一、课程设计要求

1.1 基本要求
  • 研究基于光电容积脉搏波提取心率的基本原理;
  • 连接单片机和传感器(模拟),用通信模块发给服务端;
  • 服务端能接收数据,并经过分析计算,得出心率,能显示脉搏波图形;
  • 心率异常时可以发出警告;
  • 能显示历史数据和当前数据;
  • 能在 windows桌面程序或 WEB界面 、安卓APP上显示;
  • 1.2 发挥部分
  • 数据的去噪和预处理;
  • 算法优化,比较多个算法,采用最优或改进某算法;
  • 界面设计美观,可模拟实时监测场景;
  • 可以在移动端实时查看心率;
  • 采集端通过HTTP传输数据给服务端;
  • 支持多用户数据;
  • 1.3 整体架构

    img

    1.3.1 数据采集部分

    由于没有实际的硬件设备,数据采集通过一个数据采集客户端来模拟,数据采集客户端主要完成以下工作:

  • 读取脉搏波数据文件,数据文件包含不同用户在静坐、跑步、上楼运动后三种状态的数据;这些数据是通过MAX30102传感器采集到的;

  • 调用HTTP接口将采样的数据文件发送到服务端,上报的采样数据需要符合以下要求:

  • 脉搏波数据采样频率为50Hz
  • 采样数据包括红外和红光数据;
  • 数据格式如下:

    采样时间                      ired    red      采样用户
    2024-03-11 19:40:51.263292  85831   71924     user1
    2024-03-11 19:40:51.283700  84490   75423     user1
    2024-03-11 19:40:51.304108  90760   78584     user1
    ......
    1.3.2 服务端

    (1) 服务端需要提供数据采集服务,接口定义:

    POST /scada/heart/sample
    [{
    	"time":"采样时间",
        "infrared":"红外",
        "red": "红光",
        "user_name": "user1"
    }]

    客户端采集程序,1次上报50个采样点,即1秒上报一次。

    (2) 服务端需要根据采集到的红外、红光计算心率值,并保存到数据库;

    (3) 服务端在计算得到心率后需要向前端推送用户的实时数据,心率异常时需要推送告警信息,同时将告警数据入库;可以通过webstock实现;

    (4) 服务端需要提供心率历史数据查询接口,接口定义;

    GET /sacda/heart/rate?user_name=xxxx&&start_time=xxxx&&end_time=xxx
    1.3.3 数据表设计

    服务端需要提供将采集的数据存储到数据库,创建了一个名为 sensor_data 的数据表;

    CREATE TABLE sensor_data (
        id INT AUTO_INCREMENT PRIMARY KEY,
        user_name VARCHAR(255) NOT NULL,
        sampe_time DATETIME NOT NULL,
        infrared DECIMAL(10, 2) NOT NULL,
        red DECIMAL(10, 2) NOT NULL,
        create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    );

    具体说明如下:

  • id:作为主键,使用AUTO_INCREMENT自增,确保每条记录有唯一的标识;
  • username:存储用户名的字段,采用VARCHAR类型,长度为255
  • sampling_time:存储采样时间的字段,采用DATETIME类型;
  • infraredred:分别存储红外和红光数据的字段,采用DECIMAL类型,保留两位小数;
  • create_time:存储创建时间的字段,使用TIMESTAMP类型,并设置默认值为当前时间。
  • 服务端需要存储分析后的心率值,创建一个名为heart_rate_data的数据表;

    CREATE TABLE heart_rate_data (
        id INT PRIMARY KEY AUTO_INCREMENT,
        user_name VARCHAR(255),
        sampe_time DATETIME NOT NULL,
        heart_rate_value FLOAT,
        create_time TIMESTAMP
    );

    其中:

  • id:主键字段,用于唯一标识每条记录。可以使用自增的整数类型(INT)来实现自动生成和唯一性;
  • user_name:用户名称字段,用于记录心率数据对应的用户名称;
  • sampling_time:存储采样时间的字段,采用DATETIME类型;
  • heart_rate_value:心率值字段,用于存储测量到的心率数值;
  • create_time:创建时间字段,用于记录心率数据的创建时间;
  • 服务端需要存储心率异常的信息,创建一个名为heart_rate_abnormal_data 的数据表;

    CREATE TABLE heart_rate_abnormal_data (
        id INT PRIMARY KEY AUTO_INCREMENT,
        user_name VARCHAR(255),
        heart_rate_id INT,
        abnormal_info TEXT,
        create_time TIMESTAMP
    );

    其中:

  • id:主键字段,用于唯一标识每条心率异常记录;
  • heart_rate_id:心率ID字段,用于关联到心率数据表中的相应心率数据;
  • abnormal_info:异常信息字段,用于记录心率异常的具体信息;
  • create_time:创建时间字段,用于记录心率异常数据的创建时间;
  • 1.3.4 页面原型

    二、MAX30102采集数据处理

    MAX30102传感器是一款集成脉搏血氧仪和心率监测器模块。MAX30102包括内部LED、光电探测器、光学元件以及低噪声电子元件,具有环境光反射特征。

    MAX30102采用单个1.8V电源和用于内部LED的独立3.3V电源。通信接口采用标准的I2C兼容接口。该模块可以通过软件进行关闭,达到零待机电流,使电源始终保持通电状态。

    2.1 PPG信号介绍

    当光照射到皮肤组织上时,血液的流动会导致对光的吸收率发生变化,因为血液对光的吸收性质不同于其他组织(如骨骼、肌肉)。这种现象被称为脉动光吸收,是通过测量皮肤表面的反射光信号来间接监测血液流动情况的一种方法。

    在脉动光吸收信号中,通常包含两部分信号:直流信号(DC)和交流信号(AC);

  • 直流信号代表了来自组织、骨骼、肌肉、静脉血等的对光的基本吸收,这部分信号相对稳定;
  • 交流信号则反映了动脉血流动引起的光吸收变化,即脉动信号,这部分信号随着心脏搏动而产生周期性变化;
  • 通过分析交流信号和直流信号,我们可以计算出一些生理参数,如心率和血氧饱和度。心率通常通过分析交流信号中的脉动频率来估计,而血氧饱和度则可以通过不同波长光的吸收比率来计算。

    如下图所示,直流信号DC反映的是组织、骨骼、肌肉、静脉血等等,交流信号AC则是反映了动脉血流动情况。根据ACDC信号能够计算出心率、血氧。

    2.2 硬件介绍
    2.2.1 总体架构

    MAX30102结构包括两个光电二极管(RED:红光IR:红外)、接收器、ADC通道、数字滤波器、数据寄存器和I2C通信模块。

    2.2.1 寄存器

    寄存器包括三大部分:状态寄存器、设置寄存器、温度寄存器。另外还有版本号和设备ID的寄存器,如下图;

    2.3 I2C通信

    有关I2C通信协议内容这里不做过多的介绍,有兴趣可以参考《通信协议-I2C》。

    2.3.1 设备地址

    MAX30102的从设备ID由七位固定位B7–B1组成(设置为0b1010111)。最高有效的从设备ID位(B7)首先传输,然后是剩余的位。下表显示了该设备可能的从设备ID

    B7 B6 B5 B4 B3 B2 B1 B0 写地址 读地址
    1 0 1 0 1 1 1 \(R/\overline{W}\) 0xAE 0XAF
    2.3.2 开始、停止、应答信号

    2.3.3 写数据

    流程如下

  • 主设备发送START起始信号;
  • 主设备首先向MAX30102发送从设备ID字节,等待ACK确认信号;
  • MAX30102发送ACK确认信号;
  • 主设备向MAX30102发送字节地址(要写入的I2C寄存器),等待ACK确认信号;
  • MAX30102发送ACK确认信号;
  • 主设备向MAX30102发送一个字节数据;等待ACK确认信号;
  • MAX30102发送ACK确认信号;
  • 主设备发起STOP终止信号;
  • 2.3.4 读数据

    流程如下

  • 主设备发送START起始信号;

  • 主设备首先向MAX30102发送从设备ID字节,等待ACK确认信号;

  • MAX30102发送ACK确认信号;

  • 主设备向MAX30102发送字节地址(要读取的I2C寄存器),等待ACK确认信号;

  • MAX30102发送ACK确认信号;

  • 主设备向MAX30102发送一个重复起始(Sr)条件,等待ACK确认信号;

  • MAX30102发送ACK确认信号;

  • 主设备接收来自MAX30102的数据,接收到数据后会向MAX30102发送一个无效响应(NACK);

  • 主设备发起STOP终止信号;

  • 其中:MAX30102开始发送数据,从第一个操作中选择的寄存器开始。读取指针会自动递增,因此设备会继续按顺序从其他寄存器发送数据,直到接收到停止(P)条件。唯一的例外是FIFO_DATA寄存器,在读取额外字节时读取指针不再递增。要在FIFO_DATA之后读取下一个寄存器,需要进行I2C写命令以更改读取指针的位置。

    下图展示了读取一个字节和多个字节数据的过程;

    2.4 红外和红光数据分析

    这里我们有一些采集到的红外和红光数据,比如采样者1静坐的采样数据,第一列是红外数据,第二列是红光数据;

    ired      red
    87580   73454
    85876   75277
    92812   78354
    92749   78335
    92458   78231
    92055   78090
    91681   77960
    .......

    接下来我们将对这些数据进行分析预处理,计算得到心率。

    我们新建一个项目文件夹名字为:基于光电容积脉搏的心率检测,在该文件夹下创建一个用于数据分析的文件sample_data_analysis.py

    2.4.1 原始信号波形图

    这里我们需要读取txt文件,加载数据;

    def __init__(self, file_name, sampling_rate):
    	"""
    	加载MAX30102采样数据
    	:param file_name 文件名
    	:param sampling_rate 采样率
    	:return:
    	"""
    	# 存放红外数据
    	self.ired = []
    	# 存放红光数据
    	self.red = []
    	self.__sampling_rate = sampling_rate
    	with open(file_name, 'r') as file:
    		first_line_skipped = False
    		for line in file:
    			if not first_line_skipped:
    				first_line_skipped = True
    				continue
    			columns = line.strip().split()
    			self.ired.append(int(columns[0]))
    			self.red.append(int(columns[1]))
    
    	# 计算时间序列
    	self.__t = [i / self.__sampling_rate for i in range(len(self.ired))]

    这里将采集到的两路原始数据绘制成折线图;

    2.4.2 信号预处理

    信号预处理一般包括去除直流分量,平滑信号等;

  • 去除直流分量:计算信号的平均值,然后从原始信号中减去平均值,得到去除直流成分的信号;
  • 平滑信号:对去除直流成分的信号进行4点移动平均;
  • 实现代码如下:

    def preprocessing(sequence):
        """
        预处理
        1. 去除直流分量:计算信号的平均值,然后从原始信号中减去平均值,得到去除直流成分的信号
        2. 平滑信号:对去除直流成分的信号进行4点移动平均
    
        :param sequence:
        :return:
        """
        # 去除直流分量
        mean = np.mean(sequence)
        data = sequence - mean
    
        # 4点移动平均
        MA4_SIZE = 4
        N = len(data)
        for k in range(N - MA4_SIZE):
            # 计算4个点的和,计算移动平均值并替换原始值
            data[k] = np.sum(data[k:k + MA4_SIZE]) // MA4_SIZE
    
        return data

    预处理之后的效果如下:

    2.4.2 频谱分析

    对于离散采样信号进行频谱分析一般采用离散傅立叶变换(Discrete Fourier Transform, DFT)来实现。

    有关连续非周期信号的傅里叶变换可以参考文章《第一节、信号的频域分析》。

    对满足绝对可积条件\(\int_{-\infty}^{\infty}|x(t)|dt<\infty\)的连续非周期信号\(|x(t)\) ,使用傅里叶变换得到频谱\(X(f)\),频谱图的横轴为频率,纵轴为幅值,然后通过分析频谱图的波峰以及相应的频率就可以确定目标的工作状态。同理,对频域信号进行傅里叶逆变换可以得到时域信号。傅里叶变换是由法国数学家傅里叶首次提出的,具体指的是任何一个连续非周期信号都能够分解成若干个幅值、频率以及初相位各不相同的正弦信号。

    傅里叶变换公式为:

    \[X(f)=\int_{-\infty}^{\infty}x(t)e^{-j2πft}dt \]

    傅里叶逆变换公式为:

    \[x(t)=\int_{-\infty}^{\infty}X(f)e^{j2πft}df \]

    在实际应用中,计算机处理的是离散信号,然而上述中的时域信号、频域信号都是连续信号,为此,需要采用离散傅里叶变换建立有限长时间序列和离散频谱之间的关系。对长度为N的离散时间序列\(x(n)(n=0,1,2,…,N-1)\)的傅里叶变换公式为:

    \[X(k)=\sum_{n=0}^{n=N-1}x(n)e^{-j\frac{2\pi}{N}kn} \]

    式中,\(\frac{2\pi k}{N}\)表示离散频率,\(k=0,1,2,…,N-1\)为离散频率点的序号,\(X(k)\)对于频率是周期性的,且周期为\(N\)。

    离散傅里叶逆变换公式为:

    \[x(n)=\frac{1}{N}\sum_{k=0}^{k=N-1}X(k)e^{j\frac{2\pi}{N}kn} \]

    式中,\(n=0,1,2,…,N-1\)为离散采样点的序号,\(x(n)\)对于时间是周期性的,且周期为\(N\)。离散傅里叶对本质上都是离散周期的,通常上只取一个周期内的主值。

    要对一个信号进行频谱分析,首先需要知道几个基本条件;

  • 采样频率fs;根据奈奎斯特采样定理可知,采样频率应当大于等于被测信号里最高频率的2倍,才能保证不失真,但是实际情况下,我们可能并不知道最高频率是多少,所以这个就是根据一定的经验或者搜索得到的,比如本次所使用到的心跳信号,心跳信号的主要频率集中在0.5Hz40Hz范围内,这里我们的采样频率是50Hz,相对来说采集的数据可能不是那么准确;
  • 信号长度N(信号的点数);这个一般很容易获得,因为我们经过采样得到的信号都是离散信号,如果是一维的,只需要使用len函数就可以直接获得信号的点数。
  • 这里我们采用python实现FFT绘制频谱图,具体可以参考《用python 实现FFT,绘制频谱图》;

    def frequency_spectrum(self, sequence):
    	"""
    	对离散采样信号进行频谱分析通常可以通过离散傅立叶变换(Discrete Fourier Transform, DFT)来实现
    	:param sequence: 原始信号
    	:return: 预处理之后的信号
    	"""
    	# 采样频率
    	fs = self.__sampling_rate
    	t = self.__t
    
    	# 去除直流分量
    	data = preprocessing(sequence)
    
    	# 采样点数
    	N = len(data)
    	fft_data = fft(data)
    
    	# 在python的计算方式中,fft结果的直接取模和真实信号的幅值不一样。
    	# 对于非直流量的频率,直接取模幅值会扩大N/2倍, 所以需要除了N乘以2。
    	# 对于直流量的频率(0Hz),直接取模幅值会扩大N倍,所以需要除了N。
    	fft_amp0 = np.array(np.abs(fft_data) / N * 2)  # 用于计算双边谱
    	fft_amp0[0] = 0.5 * fft_amp0[0]
    	N_2 = int(N / 2)
    	# 单边谱
    	fft_amp1 = fft_amp0[0:N_2]
    	# 将信号的零频移动到中间
    	fft_amp0_shift = fftshift(fft_amp0)
    
    	# 计算频谱的频率轴
    	list0 = np.array(range(0, N))
    	list1 = np.array(range(0, int(N / 2)))
    	list0_shift = np.array(range(0, N))
    	# 双边谱的频率轴
    	freq0 = fs * list0 / N
    	# 单边谱的频率轴
    	freq1 = fs * list1 / N
    	# 零频移动后的频率轴
    	freq0_shift = fs * list0_shift / N - fs / 2
    
    	# 绘制结果
    	plt.figure(figsize=(18, 9))
    
    	# 原信号
    	plt.subplot(221)
    	plt.plot(t, data)
    	plt.title('Original signal')
    	plt.xlim(0, t[-1])
    	plt.xlabel('Time(s)')
    	plt.ylabel('Amplitude')
    
    	# 双边谱
    	plt.subplot(222)
    	plt.plot(freq0, fft_amp0)
    	plt.title('spectrum two-sided')
    	plt.xlim(0, freq0[-1])
    	plt.xlabel('frequency  (Hz)')
    	plt.ylabel('Amplitude')
    
    	# 单边谱
    	plt.subplot(223)
    	plt.plot(freq1, fft_amp1)
    	plt.title('spectrum single-sided')
    	plt.xlim(0, freq1[-1])
    	plt.xlabel('frequency  (Hz)')
    	plt.ylabel('Amplitude')
    
    	# 移动零频后的双边谱
    	plt.subplot(224)
    	plt.plot(freq0_shift, fft_amp0_shift)
    	plt.title('spectrum two-sided shifted')
    	plt.xlim(0, freq0_shift[-1])
    	plt.xlabel('frequency  (Hz)')
    	plt.ylabel('Amplitude')
    
    	plt.show()

    这里我们对红外数据进行分析,分别绘制了原始信号(这里是预处理之后的),双边谱,单边谱和移动零频后的谱;

    心电图是记录心脏电活动的图形表示,其中包含多种频率成分。一般来说,心电图在频率上可以分为以下几个主要部分:

  • 基线漂移或直流分量:低频成分,通常小于0.5Hz,用于表示心电信号的基准水平;
  • 心跳信号:心跳信号的主要频率集中在0.5 Hz40Hz范围内,其中包括以下几个频率成分:
  • P波:通常在0.5Hz3Hz之间;
  • QRS波群:主要在1Hz20Hz之间;
  • T波:通常在1Hz8Hz之间;
  • 高频噪声和干扰:高频噪声和干扰通常出现在10 Hz以上的频率范围内。
  • 通过对频谱分析可以看到红外信号成分的频率主要集中在0.5-5Hz之间(也就是心率30次/min180次/min)。

    2.4.3 滤波处理

    考虑到心跳频率主要在0.5Hz3Hz之间,因此这里使用低通滤波器将高频部分过滤掉,低通滤波器的原理具体可以参考:《一阶数字低通滤波”原理推导》。

    一阶低通滤波器的公式为:

    \[y(k)=(1-a)y(k-1)+ax(k) \]

    其中:\(x(k)\)为当前输入,\(y(k-1)\)为上一次的输出,\(y(k)\)为当前计算的输出,\(a\)为滤波系数,取值范围为\(0~1\),\(a\)取值越小,当前输入权重就越小,输出波形越平滑,但响应灵敏度降低。

    假设截止频率设置为3Hz,那么一阶低通滤波器的参数\(a\)如何计算呢,根据文章《一阶数字低通滤波”原理推导》我们了解到\(a=\frac{T}{T+τ}\),其中\(T\)为采样周期,这里为0.02s,而截至频率\(f_H=\frac{1}{2\pi τ}\),截止频率定义为幅频响应曲线衰减-3db,即为原来 $\frac{1}{\sqrt 2} $。

    因此我们可以根据公式$$f_H=\frac{1}{2\pi τ}$$反推出\(τ=\frac{1}{6 \pi}\),\(a=\frac{T}{T+τ}=\frac{0.02}{0.02+\frac{1}{6\pi}}=0.2737789034256051\)。

    由于一阶滤波效果不是很好,这里使用了五阶低通滤波器对红外数据进行滤波处理,代码如下:

    def butter_lowpass_filter(self, sequence, cutoff, order=5):
    	"""
    	低通滤波器
    	:param order:
    	:param sequence:
    	:param cutoff: 截止频率
    	:order: 滤波器阶数
    	:return:
    	"""
    	# 采样频率
    	fs = self.__sampling_rate
    	t = self.__t
    
    	# 低通滤波, 这里假设采样频率为50Hz,要滤除3hz以上频率成分,,则wn=2*3/50
    	b, a = butter(order, 2.0 * cutoff / fs, btype="low", analog=False)
    
    	# 计算数字滤波器的频率响应,并返回频率和幅度响应
    	w, h = freqz(b, a, worN=8000)
    
    	# 创建一个新的图形
    	plt.figure(figsize=(12, 9))
    	plt.legend(loc='lower right')
    
    	plt.subplot(2, 1, 1)
    	plt.title("Lowpass Filter Frequency Response")
    	plt.plot(0.5 * fs * w / np.pi, np.abs(h), "b")
    	plt.plot(cutoff, 0.5 * np.sqrt(2), "ko")
    	plt.axvline(cutoff, color="k")
    	plt.xlim(0, 0.5 * fs)
    	plt.xlabel("Frequency [Hz]")
    
    	# 数据预处理
    	data = preprocessing(sequence)
    
    	# Filtering and plotting
    	filtered_data = lfilter(b, a, data)
    
    	plt.subplot(2, 1, 2)
    	plt.title("Original and filter signal")
    	plt.plot(t, data, "b-", label="Original signal")
    	plt.plot(t, filtered_data, "g-", label="Filtered data")
    	plt.xlim(0, t[-1])
    	plt.xlabel("Time(s)")
    	plt.ylabel('Amplitude')
    
    	plt.show()
    	return filtered_data

    滤波后的下过如下图所示,蓝色的为原始信号(这里是预处理之后的),绿色的为滤波后的信号;

    这里我们对滤波后的数据进行频谱分析:

    可以看到,保留了3Hz以下的频率成分,滤除了3Hz以上的频率成分。

    2.5 计算心率

    计算心率的方法如下:

  • 在预处理后的信号中,使用峰值检测算法来寻找脉搏波的峰值点。常见的峰值检测算法包括阈值法、差分法、移动平均法等。
    这些算法可以帮助找到脉搏波的上升沿或下降沿的峰值点;
  • 计算时间间隔:根据峰值点的位置,计算相邻两个峰值点之间的时间间隔(即心跳周期);
  • 计算心率:将心跳周期转换为心率值。心率可以用每分钟心跳次数(BPM)来表示,计算公式为:心率 = 60 / 心跳周期;
  • 实现代码如下:

    def real_heart_rate(self, sequence):
    	"""
    	以下是一种常见的计算心率方法,称为峰值检测法(peak detection)
    	1. 在预处理后的信号中,使用峰值检测算法来寻找脉搏波的峰值点。常见的峰值检测算法包括阈值法、差分法、移动平均法等。
    	   这些算法可以帮助找到脉搏波的上升沿或下降沿的峰值点。
    	2. 计算时间间隔:根据峰值点的位置,计算相邻两个峰值点之间的时间间隔(即心跳周期)。
    	3. 计算心率:将心跳周期转换为心率值。心率可以用每分钟心跳次数(BPM)来表示,计算公式为:心率 = 60 / 心跳周期。
    	:param sequence: 经过预处理之后的近红外/红外数据
    	:return: 心率列表
    	"""
    	# 采样频率
    	fs = self.__sampling_rate
    	t = self.__t
    
    	# 峰值检测
    	peaks_indices, _ = find_peaks(sequence)
    	# 计算相邻心跳之间的时间间隔
    	time_between_beats = np.diff(peaks_indices) / fs
    	# 计算心率(以每分钟为单位)
    	heart_rates = 60 / time_between_beats
    
    	# 创建一个新的图形
    	plt.figure(figsize=(12, 9))
    	plt.legend(loc='lower right')
    
    	# 绘制原始信号和峰值
    	plt.subplot(2, 1, 1)
    	plt.title('Peak')
    	plt.plot(t, sequence, label='Original signal')
    	plt.plot(peaks_indices / fs, sequence[peaks_indices], 'bo', label='Peak')
    	plt.xlabel('Time (s)')
    	plt.ylabel('Amplitude')
    	plt.xlim(0, t[-1])
    
    	# 绘制心跳
    	plt.subplot(2, 1, 2)
    	plt.title('Heartrate')
    	plt.plot(peaks_indices[1:] / fs, heart_rates)
    	plt.xlabel('Time (s)')
    	plt.ylabel('Heartrate')
    	plt.xlim(0, t[-1])
    
    	# 显示图形
    	plt.show()

    绘制的心率图如下:

    2.6 程序入口

    程序入口代码:

    if __name__ == '__main__':
        # 加载原始数据
        file_name = "./要求/数据文件/脉搏波数据文件/采样者1/静坐采样数据.txt"
        # 采样频率
        sampling_rate = 50
        analyze = Max30102DataAnalysis(file_name, sampling_rate)
        # 绘制原始数据
        analyze.plt_raw_data()
        # 对红外数据进行频谱分析
        analyze.frequency_spectrum(analyze.ired)
        # 对红外数据进行预处理,低通滤波
        filtered_ired = analyze.butter_lowpass_filter(analyze.ired, 3, 5)
        # 对滤波后的红外数据进行频谱分析
        analyze.frequency_spectrum(filtered_ired)
        # 计算心率
        analyze.real_heart_rate(filtered_ired)

    三、数据采集工具

    这里我们需要实现一个模拟数据采集客户端,数据采集客户端主要完成以下工作:

  • 读取脉搏波数据文件,数据文件包含不同用户在静坐、跑步、上楼运动后三种状态的数据;这些数据是通过MAX30102传感器采集到的;
  • 调用HTTP接口将采样的数据文件发送到服务端,上报周期为1s
  • 这里我们创建一个client.py文件,用来实现数据采集客户端。

    3.1 数据模拟

    这里我们有采样着1、采样着2、采样着3,3个用户在静坐、跑步、上楼运动后三种状态的数据,如下图所示;

    这里我们以当前时间为基准,随机读取一个采样者的采样数据,并为每个采样点数据生成一个采样时间;实现代码如下:

    def __init__(self, path, sampling_rate, user_names, request_url,period = 5):
    	"""
    	构造函数
    	:param path: 数据文件路径
    	:param sampling_rate: 采样频率
    	:param user_names: 采样者列表
    	:param request_url: 请求url
    	:param period: 上报周期
    	"""
    	# 保存一个用户从当前时刻起的采样数据
    	self.ired = []
    	self.red = []
    	self.list = []
    	# 采样者列表
    	self.user_names = user_names
    	# 采样数据文件所在路径
    	self.__path = path
    	self.__sample_rate = sampling_rate
    	self.__request_url = request_url
        # 数据上报周期,默认5s上报一次
        self.__period = period
    
    def __get_user_sample__(self, user_name):
    	"""
    	每次生成用户的采样数据
    	:param user_name 用户名
    	:return:
    	"""
    	# 获取指定用户数据路径
    	user_path = self.__path + '/' + user_name
    	# 遍历文件夹并获取文件夹下的所有文件
    	files = get_files_in_folder(user_path)
    	length = len(files)
    	# 随便选择一个文件数据
    	num = random.randint(0, length - 1)
    	# 存放红外数据
    	self.ired = []
    	# 存放红光数据
    	self.red = []
    	# 清空
    	self.list = []
    	print('读取采样数据文件', files[num])
    
    	# 文件读取
    	with open(files[num], 'r') as file:
    		first_line_skipped = False
    		for line in file:
    			if not first_line_skipped:
    				first_line_skipped = True
    				continue
    			columns = line.strip().split()
    			self.ired.append(int(columns[0]))
    			self.red.append(int(columns[1]))
    
    	# 获取当前时间
    	start_time = time.time()
    	# 清空
    	self.list = []
    	# 生成每个采样点的采样时间
    	for index in range(len(self.ired)):
    		# 计算采样时间
    		sample_time = start_time + index * (1.0 / sampling_rate)
    		# 格式化时间
    		sample_time_formatted = datetime.fromtimestamp(sample_time).strftime('%Y-%m-%d %H:%M:%S.%f')
    		row = {
    			"time": sample_time_formatted,
    			"infrared": self.ired[index],
    			"red": self.red[index],
    			'user_name': user_name
    		}
    		self.list.append(row)
    3.2 数据上报

    接着将读取到的数据进行上报;

    def reporting_data(self):
    	"""
    	将采集数据上报到服务端
    	:return:
    	"""
    	# 上报周期内采样点
    	sample_length = self.__period * self.__sample_rate
    	# 长度不够,随机生成用户采样数据
    	if len(self.list) < sample_length:
    		self.__get_user_sample__(random.choice(user_names))
    
    	# 每次截取sample_length长度
    	data = self.list[: sample_length]
    	self.list = self.list[sample_length:]
    	print('当前时间{}:上报数据{}'.format(datetime.now(), data))
    	http_request_post(self.__request_url, data)
    3.3 程序测试

    程序入口代码如下:

    if __name__ == '__main__':
        # 服务器数据采样接口
        url = 'http://127.0.0.1:8000/scada/heart/sample/'
        # 采样频率
        sampling_rate = 50
        # 上报周期
        period = 5
        # 采样者列表
        user_names = ['采样者1', '采样者2', '采样者3']
        client = Max30102Client('./课程设计/数据文件/脉搏波数据文件', sampling_rate, user_names, url, period)
        # 上报采样数据
        client.reporting_data()
        # 每隔10秒上报数据
        schedule.every(10).seconds.do(client.reporting_data)
    
        while True:
            # 运行待执行的任务队列
            schedule.run_pending()
            # 暂停1ms,等待下一个任务执行
            time.sleep(1)

    四、后端服务

    这里我们采用Django+sqlite技术栈来实现;首先安装依赖:

    pip install Django -i https://pypi.tuna.tsinghua.edu.cn/simple
    pip install djangorestframework -i https://pypi.tuna.tsinghua.edu.cn/simple
    4.1 创建后端服务

    在你所需要创建的项目的目录下运行:

    G:\python\毕业设计\基于光电容积脉搏的心率检测> django-admin.exe startproject web_server

    在该目录下就会创建出一个新的文件夹,而这个文件夹就是我们的项目,现在我们就完成了项目的构建。

    4.1.1 创建应用

    Django中,如果要使用数据库,比如sqllite3,一般是创建应用,然后在应用中的models.py里构建数据表。

    接下来将展示如何创建应用,并构建简单的数据表为前端提供接口。

    在终端输入:

    G:\python\毕业设计\基于光电容积脉搏的心率检测> cd web_server
    G:\python\毕业设计\基于光电容积脉搏的心率检测/web_server> python manage.py startapp scada

    此时目录结构如下:

    4.1.2 修改配置文件

    然后修改内层web_serversetting.py文件,在INSTALLED_APPS列表中添加刚刚创建的新的应用scada

    INSTALLED_APPS = [
        'django.contrib.admin',
        'django.contrib.auth',
        'django.contrib.contenttypes',
        'django.contrib.sessions',
        'django.contrib.messages',
        'django.contrib.staticfiles',
        'scada'
    ]

    而对于数据库的配置也是在该文件中:

    DATABASES = {
        'default': {
            'ENGINE': 'django.db.backends.sqlite3',
            'NAME': BASE_DIR / 'db.sqlite3',
        }
    }

    我们这里直接就使用了默认的数据库sqlite3,当然也可以使用其他的数据库。

    同时修改时区配置:

    USE_TZ = False
    4.1.3 创建实体

    在后端最离不开的就是数据了,而在Django中存储数据是通过数据表来进行的,接下来将介绍如何创建数据model

    scada文件夹下,models.py文件就是专门来写数据表的文件;

    from django.db import models
    
    
    class SensorData(models.Model):
        """
        传感器采样原始数据
        """
        id = models.AutoField(primary_key=True, verbose_name='主键')
        user_name = models.CharField(max_length=255, verbose_name='用户名')
        sample_time = models.DateTimeField(verbose_name='采样时间')
        infrared = models.DecimalField(max_digits=10, decimal_places=2,verbose_name='红外')
        red = models.DecimalField(max_digits=10, decimal_places=2, verbose_name='红光')
        create_time = models.DateTimeField(auto_now_add=True, verbose_name='创建时间')
    
        class Meta:
            db_table = 'sensor_data'
    
        def __str__(self):
            return '采集时间:{}  采集用户:{}  红外:{} 红光 '.format(self.sample_time, self.user_name, self.infrared, self.red)
    
    
    class HeartRateData(models.Model):
        """
        心跳数据
        """
        id = models.AutoField(primary_key=True, verbose_name='主键')
        user_name = models.CharField(max_length=255, verbose_name='用户名')
        sample_time = models.DateTimeField(verbose_name='采样时间')
        heart_rate_value = models.FloatField(verbose_name='心率')
        create_time = models.DateTimeField(auto_now_add=True, verbose_name='创建时间')
    
        class Meta:
            db_table = 'heart_rate_data'
    
        def __str__(self):
            return '采集时间:{}  采集用户:{}  心率:{} '.format(self.sample_time, self.user_name, self.heart_rate_value)
    
    
    class HeartRateAbnormalData(models.Model):
        """
        心率异常数据
        """
        id = models.AutoField(primary_key=True, verbose_name='主键')
        user_name = models.CharField(max_length=255, verbose_name='用户名')
        heart_rate_id = models.IntegerField(verbose_name='心跳数据ID')
        abnormal_info = models.TextField(verbose_name='心率异常信息')
        create_time = models.DateTimeField(auto_now_add=True, verbose_name='创建时间')
    
        class Meta:
            db_table = 'heart_rate_abnormal_data'
    
        def __str__(self):
            return '心率数据ID:{}  采集用户:{}  异常信息:{} '.format(self.heart_rate_id, self.user_name, self.abnormal_info)
    4.1.4 数据采集接口

    服务端需要提供数据采集接口,接口定义:

    POST /scada/heart/sample
    [{
    	"time":"采样时间",
        "infrared":"红外",
        "red": "红光",
        "user_name": "user1"
    }]

    scada文件夹下views.py文件追加;

    @api_view(['POST'])
    def save_sensor_sample_data(request):
        """
        传感器数据采样接口
        :param request:
        :return:
        """
        data = request.data
        # 清理历史数据
        clean_sensor_data()
        if isinstance(data, list):
            sensor_datas = []
            for item in data:
                time = item.get('time')
                infrared = item.get('infrared')
                red = item.get('red')
                user_name = item.get('user_name')
    
                # 创建 SensorData 实例并保存到数据库
                sensor_data = SensorData(
                    sample_time=time,
                    infrared=infrared,
                    red=red,
                    user_name=user_name
                )
                sensor_datas.append(sensor_data)
            # 保存入库
            SensorData.objects.bulk_create(sensor_datas)
            
            response_data = {
                'code': '00000',
                'msg': 'Heart rate samples saved successfully'
            }
        else:
            response_data = {
                'code': '10000',
                'msg': 'Invalid data format'
            }
        return JsonResponse(response_data, status=200, json_dumps_params={'ensure_ascii': False})
    4.1.5 接口路径定义

    首先在sacda目录下新建文件urls.py,并在该目录下写以下代码;

    from django.urls import path
    from .views import save_sensor_sample_data
    
    urlpatterns = [
        path('heart/sample/', save_sensor_sample_data),
    ]

    然后在内层web_server文件夹中urls.py做如下修改;

    from django.contrib import admin
    from django.urls import path
    from django.urls import include
    
    urlpatterns = [
        path('admin/', admin.site.urls),
        path('scada/', include('scada.urls')),
    ]

    好了,现在我们接口路径http://127.0.0.1:8000/scada/heart/sample/就配置完成了,接下来进行验证了。

    注意这里的127.0.0.1:8000,指的是你自己主机的本地地址,所以只有当你将Django在本地跑起来了才能进行访问。

    4.1.6 数据库初始化

    运行服务器前首先进行数据的迁移和构建,因为之前我们写的models.py只是写了代码却没有实现;

    我们在终端输入:

    G:\python\毕业设计\基于光电容积脉搏的心率检测\web_server> python manage.py makemigrations scada
    Migrations for 'scada':
      scada\migrations\0001_initial.py
        - Create model HeartRateAbnormalData
        - Create model HeartRateData
        - Create model SensorData
        
    G:\python\毕业设计\基于光电容积脉搏的心率检测\web_server> python manage.py migrate
    Operations to perform:
      Apply all migrations: admin, auth, contenttypes, scada, sessions
    Running migrations:
      Applying contenttypes.0001_initial... OK
      Applying auth.0001_initial... OK
      Applying admin.0001_initial... OK
      Applying admin.0002_logentry_remove_auto_add... OK
      Applying admin.0003_logentry_add_action_flag_choices... OK
      Applying contenttypes.0002_remove_content_type_name... OK
      Applying auth.0002_alter_permission_name_max_length... OK
      Applying auth.0003_alter_user_email_max_length... OK
      Applying auth.0004_alter_user_username_opts... OK
      Applying auth.0005_alter_user_last_login_null... OK
      Applying auth.0006_require_contenttypes_0002... OK
      Applying auth.0007_alter_validators_add_error_messages... OK
      Applying auth.0008_alter_user_username_max_length... OK
      Applying auth.0009_alter_user_last_name_max_length... OK
      Applying auth.0010_alter_group_name_max_length... OK
      Applying auth.0011_update_proxy_permissions... OK
      Applying auth.0012_alter_user_first_name_max_length... OK
      Applying scada.0001_initial... OK
      Applying sessions.0001_initial... OK

    执行完成后会在项目web_server目录下创建db.sqlite3文件并创建相应的数据表;我们可以使用navicat打开sqlite3数据库;

    4.1.7 运行程序测试

    然后在终端输入:

    G:\python\毕业设计\基于光电容积脉搏的心率检测\web_server> python manage.py runserver 0.0.0.0:8000
    Watching for file changes with StatReloader
    Performing system checks...
    
    System check identified no issues (0 silenced).
    March 17, 2024 - 14:39:31
    Django version 4.2.10, using settings 'web_server.settings'
    Starting development server at http://127.0.0.1:8000/
    Quit the server with CTRL-BREAK.

    接着我们运行我们之前编写的数据采集客户端程序client.py,可以看到数据表sensor_data陆续插入了大量数据;

    4.2 数据查询

    前面我们已经介绍了一个数据采集接口的编写,接下来同样的接口编写思路,我们按照课设要求,完善功能。

    4.2.1 清理历史数据

    当开启了数据采集客户端,每10s上报一次数据,即500个采样点,一分钟将会上报3000个采样点,1小时的数据流将会达到18w

    为了防止sensor_data数据表数据过大,我们将数据只保留3个小时,超过3个小时的将会清理。在scada文件夹下views.py文件追加;

    def clean_sensor_data():
        """
        清理历史数据
        :return:
        """
        cursor = connection.cursor()
    
        # 计算当前时间的3小时之前的时间
        three_hours_ago = datetime.now() - timedelta(hours=3)
        three_hours_ago_str = three_hours_ago.strftime('%Y-%m-%d %H:%M:%S')
    
        # 清理 sensor_data 数据表中3小时之前的数据
        cursor.execute("DELETE FROM sensor_data WHERE create_time < %s", [three_hours_ago_str])

    在数据采集客户端每次上报数据的时候,调用该接口清理历史数据。

    4.2.2 查询心率历史数据

    scada文件夹下views.py文件追加如下代码,如果不指定查询起始、结束时间,默认返回1分钟内的心率数据;

    @api_view(['GET'])
    def heart_rate_data_view(request):
        user_name = request.GET.get('user_name')
        start_time_str = request.GET.get('start_time')
        end_time_str = request.GET.get('end_time')
    
        # 判断用户是否提供了用户名
        if not user_name:
            return JsonResponse({'code': '10000', 'msg': 'user_name不能为空'}, status=200,
                                json_dumps_params={'ensure_ascii': False})
    
        # 获取起始时间和结束时间
        if start_time_str:
            start_time = datetime.fromisoformat(start_time_str)
        else:
            start_time = datetime.now() - timedelta(minutes=1)
    
        if end_time_str:
            end_time = datetime.fromisoformat(end_time_str)
        else:
            end_time = datetime.now()
    
        # 查询heart_rate_data数据表的历史记录
        heart_rate_data = HeartRateData.objects.filter(user_name=user_name, create_time__gte=start_time,
                                                       create_time__lte=end_time)
    
        data = {
            'user_name': user_name,
            'start_time': start_time.isoformat(),
            'end_time': end_time.isoformat(),
            'heart_rate_data': list(heart_rate_data.values())  # 将查询结果转换为列表
        }
    
        response_data = {
            'code': '00000',
            'msg': 'success',
            'data': data
        }
    
        return JsonResponse(response_data, status=200, json_dumps_params={'ensure_ascii': False})

    首先在sacda目录下urls.py,中追加:

    urlpatterns = [
        path('heart/sample/', save_sensor_sample_data),
        path('heart/rate/', save_sensor_sample_data),
    ]

    在开启数据采集客户端的情景下,我们在浏览器输入http://127.0.0.1:8000/scada/heart/rate/?user_name=%E9%87%87%E6%A0%B7%E8%80%852,返回结果如下:

    {
    	"code": "00000",
    	"msg": "success",
    	"data": {
    		"user_name": "采样者1",
    		"start_time": "2024-03-17T18:00:36.682051",
    		"end_time": "2024-03-17T18:01:36.682051",
    		"heart_rate_data": [{
    			"id": 1209,
    			"user_name": "采样者1",
    			"sample_time": "2024-03-17T18:00:41.013",
    			"heart_rate_value": 130.43478260869566,
    			"create_time": "2024-03-17T18:00:45.909"
    		}, {
    			"id": 1210,
    			"user_name": "采样者1",
    			"sample_time": "2024-03-17T18:00:41.473",
    			"heart_rate_value": 125.0,
    			"create_time": "2024-03-17T18:00:45.909"
    		}, {
    			"id": 1211,
    			"user_name": "采样者1",
    			"sample_time": "2024-03-17T18:00:41.953",
    			"heart_rate_value": 120.0,
    			"create_time": "2024-03-17T18:00:45.909"
    		},
            ......,
            ]
    	}
    }

    这里create_time是数据入库时间,sample_time是红外数据采样的时间点,中间的时间差就是服务处理的耗时时间。

    4.3 推送实时数据

    服务端在计算得到心率后需要向前端推送用户的实时数据,心率异常时需要推送告警信息,同时将告警数据入库,实时推送通过WebSocket实现。

    4.3.1 安装依赖

    要在Django中使用WebSocket,我们需要借助一个名为Django Channels的第三方库。

    安装Django Channels非常简单,只需要通过pip安装即可:

    pip install channels daphne -i https://pypi.tuna.tsinghua.edu.cn/simple
    4.3.2 配置settings.py

    配置Django Channels需要进行一些额外的设置。先在Django的设置文件settings.py中添加Channels相关的配置信息:

    INSTALLED_APPS = [
        'daphne', # 注册daphne组件,在channels4.0开始,注册组件使用daphne,一定放在开头
        # 其他应用...
        'acada'    
    ]
    
    # Django项目默认的WSGI配置,可以注释掉,也可以放着不管,因为之后我们不会使用WSGI作为网关,而是使用下面的ASGI配置。
    WSGI_APPLICATION = 'web_server.wsgi.application'
    #channels使用需要添加ASGI_APPLICATION
    ASGI_APPLICATION = 'web_server.asgi.application'
    
    # 使用channel_layers需要配置通道
    CHANNEL_LAYERS = {
        "default": {
            # 1、使用内存作为通道(开发使用)
            "BACKEND": "channels.layers.InMemoryChannelLayer",
            # 2、使用redis(上线使用)
            # 'BACKEND': 'channels.layers.RedisChannelLayer',
            # 'CONFIG': {
            #     'hosts': [('localhost', 6379)],
            # },
        }
    }

    WSGIDjango默认启动模式,WSGI只支持同步处理请求,即Http请求。

    channels运行于ASGI协议上,ASGI的全名是Asynchronous Server Gateway Interface。它是区别于Django使用的WSGI协议 的一种异步服务网关接口协议,正是因为它才实现了Websocket

    ASGI_APPLICATION指定主路由的位置为web_server下的asgi.py文件中的application

    4.3.3 配置asgi.py

    默认的asgi虽然支持了异步服务,但是仍然不支持Websocket。因此我们要修改asgi.py的内容如下所示,修改asgi.py文件;

    import os
    
    from channels.routing import ProtocolTypeRouter, URLRouter
    from django.core.asgi import get_asgi_application
    from . import routings
    
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'web_server.settings')
    # application = get_asgi_application()  # 注释掉原来的application
    
    application = ProtocolTypeRouter({
        # http走Django默认的asgi
        "http": get_asgi_application(),
        # websocket走channels
        'websocket': URLRouter(routings.websocket_urlpatterns)
    })

    其中:

  • ProtocolTypeRouterASIG支持多种不同的协议,在这里可以指定特定协议的路由信息,我们只使用了Websocket协议,这里只配置Websocket即可;
  • URLRouter:指定路由文件的路径,也可以直接将路由信息写在这里;
  • 经过这样的配置之后,我们之后运行项目的时候不在使用WSGI启动,而是使用ASGI启动项目。

    4.3.4 配置routings.py

    接下来,在settings.py同级目录创建一个名为routings.py的文件,就是我们刚才在asgi.py中导入的那个文件。

    然后在routings.py文件中写上如下的内容:

    from django.urls import path
    from scada.consumers import HeartRateConsumer
    
    websocket_urlpatterns = [
        path('scada/heart', HeartRateConsumer.as_asgi()),
    ]

    routings.py路由文件存放Websocket请求相关的路由信息,跟Djangourl.py功能类似,语法也一样,这里配置的意思就是访问scada/heart/都交给HeartRateConsumer处理。

    4.3.5 consumers

    scada目录下创建一个包含WebSocket路由和处理程序的consumers.py模块;

    heart_rate_message函数中加入数据处理的逻辑,通过计算得到心率值,并保存入库,同时将计算的心率实时推动给前端;

    import json
    from datetime import datetime, timedelta
    
    import numpy as np
    from asgiref.sync import async_to_sync, sync_to_async
    from channels.generic.websocket import AsyncWebsocketConsumer
    from channels.layers import get_channel_layer
    from rest_framework import serializers
    from scipy.signal import butter, lfilter
    from scipy.signal import find_peaks
    
    from .models import HeartRateData
    
    
    class HeartRateDataModelSerializer(serializers.ModelSerializer):
        class Meta:
            model = HeartRateData
            fields = ['id', 'user_name', 'sample_time', 'heart_rate_value', 'create_time']
    
    
    @sync_to_async
    def save_heart_rate(heart_rates):
        HeartRateData.objects.bulk_create(heart_rates)
    
    
    def preprocessing(sequence, order=5, cutoff=3):
        """
        预处理
        1. 去除直流分量:计算信号的平均值,然后从原始信号中减去平均值,得到去除直流成分的信号
        2. 平滑信号:对去除直流成分的信号进行4点移动平均
        3. 低通滤波器
    
        :param sequence: 原始信号
        :param order: 滤波器阶数
        :param cutoff: 截止频率
        :return:
        """
        # 采样频率
        fs = 50
    
        # 去除直流分量
        mean = np.mean(sequence)
        data = sequence - mean
    
        # 4点移动平均
        MA4_SIZE = 4
        N = len(data)
        for k in range(N - MA4_SIZE):
            # 计算4个点的和,计算移动平均值并替换原始值
            data[k] = np.sum(data[k:k + MA4_SIZE]) // MA4_SIZE
    
        # 低通滤波, 这里假设采样频率为50Hz,要滤除3hz以上频率成分,,则wn=2*3/50
        b, a = butter(order, 2.0 * cutoff / fs, btype="low", analog=False)
    
        # Filtering and plotting
        filtered_data = lfilter(b, a, data)
    
        return filtered_data
    
    
    def real_heart_rate(sample_time, user_name, sequence):
        """
        以下是一种常见的计算心率方法,称为峰值检测法(peak detection)
        1. 在预处理后的信号中,使用峰值检测算法来寻找脉搏波的峰值点。常见的峰值检测算法包括阈值法、差分法、移动平均法等。
           这些算法可以帮助找到脉搏波的上升沿或下降沿的峰值点。
        2. 计算时间间隔:根据峰值点的位置,计算相邻两个峰值点之间的时间间隔(即心跳周期)。
        3. 计算心率:将心跳周期转换为心率值。心率可以用每分钟心跳次数(BPM)来表示,计算公式为:心率 = 60 / 心跳周期。
        :param sequence: 经过预处理之后的近红外/红外数据
        :param sample_time: 采样起始时间
        :param user_name: 采用用户名
        :return: 心率列表
        """
        # 采样频率
        fs = 50
        # 每个采样点时间间隔 单位毫秒
        interval = 1000 / fs
    
        # 峰值检测
        peaks_indices, _ = find_peaks(sequence)
        # 计算相邻心跳之间的时间间隔
        time_between_beats = np.diff(peaks_indices) / fs
        # 计算心率(以每分钟为单位)
        heart_rates = 60 / time_between_beats
    
        # 创建一个 timedelta 对象表示要增加的毫秒数
        milliseconds_list = peaks_indices * interval
    
        # 创建一个空列表用于存储新的时间
        date_time_obj = datetime.strptime(sample_time, '%Y-%m-%d %H:%M:%S.%f')
        sample_times = []
    
        # 遍历时间列表
        for index in range(len(peaks_indices)):
            # 将时间字符串解析为 datetime 对象
            # 创建一个 timedelta 对象表示要增加的毫秒数
            milliseconds_delta = timedelta(milliseconds=milliseconds_list[index])
    
            # 将毫秒数加到时间上得到新的时间
            new_time = date_time_obj + milliseconds_delta
    
            # 格式化时间
            sample_time_formatted = new_time.strftime('%Y-%m-%d %H:%M:%S.%f')
    
            # 将新的时间添加到列表中
            sample_times.append(sample_time_formatted)
    
        return [HeartRateData(
            sample_time=sample_time,
            heart_rate_value=heart_rate,
            user_name=user_name) for
            sample_time, heart_rate in
            zip(sample_times, heart_rates)]
    
    
    class HeartRateConsumer(AsyncWebsocketConsumer):
        def __init__(self, *args, **kwargs):
            super().__init__(args, kwargs)
            # 群组名称
            self.room_group_name = None
            # 房间名称
            self.room_name = None
    
        async def connect(self):
            # 这里我们设置了一个固定的群组名称,所有的消息都会发送到这个群组里
            self.room_group_name = 'ired_room'
    
            # 将连接添加到群组
            await self.channel_layer.group_add(
                self.room_group_name,
                self.channel_name
            )
    
            await self.accept()
    
        async def disconnect(self, close_code):
            # 在断开连接时,将连接从群组中移除
            await self.channel_layer.group_discard(
                self.room_group_name,
                self.channel_name
            )
    
        async def receive(self, text_data):
            """
            Receive message from WebSocket
            :param text_data:
            :return:
            """
            print('Websocket receive message', text_data)
            await self.send(text_data=json.dumps({
                'message': text_data
            },ensure_ascii=False))
    
        async def heart_rate_message(self, event):
            """
            对红外数据处理,并计算心率,实时推送
            :param event: 事件
            :return:
            """
            text_data = event['message']
            text_data_json = json.loads(text_data)
            user_name = text_data_json['user_name']
            sample_time = text_data_json['sample_time']
            ired_datas = text_data_json['ired_datas']
    
            # 对红外数据进行预处理
            filtered_ired = preprocessing(ired_datas)
            # 计算心率
            heart_rates = real_heart_rate(sample_time, user_name, filtered_ired)
    
            # 心率数据入库
            await save_heart_rate(heart_rates)
    
            # 将心率数据转换为JSON格式
            data_json = HeartRateDataModelSerializer(heart_rates, many=True).data
    
            # Send message to WebSocket,发送心率数据给客户端
            await self.send(text_data=json.dumps(data_json, ensure_ascii=False))
    
    
    def send_message_to_group(room_group_name, message):
        """
        发送消息到指定的群组
        :param room_group_name: 群组名称
        :param message: 要发送的消息
        """
        channel_layer = get_channel_layer()
    
        # 因为layer中所有的方法都是异步的,要从同步环境发送事件,需要使用async_to_sync来包装,否则消息无法发送
        async_to_sync(channel_layer.group_send)(
            room_group_name,
            {
                'type': 'heart_rate_message',
                'message': message
            }
        )
    4.3.6 运行项目

    运行如下命令:

    G:\python\毕业设计\基于光电容积脉搏的心率检测\web_server> python manage.py runserver 0.0.0.0:8000 
    Watching for file changes with StatReloader
    Performing system checks...
    
    System check identified no issues (0 silenced).
    March 17, 2024 - 22:05:52
    Django version 4.2.10, using settings 'web_server.settings'
    Starting ASGI/Daphne version 4.1.0 development server at http://127.0.0.1:8000/   # 发生变化

    现在,打开浏览器或者Postman等工具进行测试即可,这里使用浏览器的控制台发生Websocket请求进行测试;

    # 创建websocket连接
    ws = new WebSocket('ws://127.0.0.1:8000/scada/heart')
    
    # 监听信息
    ws.onmessage = event => {console.log(event.data)}
    
    # 发送消息
    ws.send('123')
    
    # 关闭连接
    ws.close()

    首先是握手(HANDSHAKING),握手成功就连接上了(CONNECT)。当前端发送关闭之后,服务器收到以后,执行断开连接(DISCONNECT)操作;

    WebSocket HANDSHAKING /scada/heart [127.0.0.1:62801]
    WebSocket CONNECT /scada/heart [127.0.0.1:62801]
    WebSocket DISCONNECT /scada/heart [127.0.0.1:62801]
    4.3.7 测试

    此外我们可以通过如下网址进行测试:EasySwoole-WebSocket在线测试工具。

    运行Web服务,打开测试网址输入如下内容:

    点击开始连接,并发送Hello Word到服务端;

    在服务端我们将接受到的数据重新推送给了客户端,因此同样可以接收到Hello Word

    接着我们运行数据采集客户端程序,服务端在接收到采集的数据,经过处理会将心跳数据推送给客户端,如下所示;

    云服务器部署地址:

  • ws://122.51.17.164:8000/scada/heart
  • http://122.51.17.164:8000/scada/heart/rate/?user_name=采样者1&start_time=2024-03-18 20:22:00
  • 五、前端Web

    5.1 前端打包

    前端这里使用vue开发的,vue程序打包后作为后台的静态文件,vue源码位于webapp目录下,将 vue.config.js 修改为:

    module.exports = {
      assetsDir: 'static',// 静态资源打包输出目录 (js, css, img, fonts),相应的url路径也会改变
    };

    然后运行 npm run build,生成的 dist 目录如下;

    📁 dist/
    ├── 📁 static/
    │   ├── 📁 css/
    │   └── 📁 js/
    ├── favicon.ico
    └── index.html

    web_server新建文件夹frontend(用来存放前端代码),将vue打包的dist目录的文件放到里面;

    5.2 修改setting主要配置文件

    修改web_server/web_server目录下的settings.py文件;

    TEMPLATES = [
        {
            'BACKEND': 'django.template.backends.django.DjangoTemplates',
            'DIRS': [os.path.join(BASE_DIR, 'frontend')]
            ,
            'APP_DIRS': True,
            'OPTIONS': {
                'context_processors': [
                    'django.template.context_processors.debug',
                    'django.template.context_processors.request',
                    'django.contrib.auth.context_processors.auth',
                    'django.contrib.messages.context_processors.messages',
                ],
            },
        },
    ]
    
    STATIC_URL = '/static/'
    STATIC_ROOT = os.path.join(BASE_DIR, 'static')
    STATICFILES_DIRS = [
        os.path.join(BASE_DIR, "frontend/static"),
    ]
    5.3 配置路由

    修改web_server/web_server目录下的urls.py文件;

    from django.views.generic import TemplateView
    urlpatterns = [
    	.......
        path('', TemplateView.as_view(template_name="index.html"))
    ]
    5.4 运行项目

    运行以下命令收集静态文件:

    G:\python\毕业设计\基于光电容积脉搏的心率检测\web_server> python manage.py collectstatic

    这将会把 frontend/static 目录下的静态文件复制到 STATIC_ROOT 目录中。

    运行如下命令:

    G:\python\毕业设计\基于光电容积脉搏的心率检测\web_server> python manage.py runserver 0.0.0.0:8000

    运行数据采集客户端,同时浏览器打开页面输入地址127.0.0.1:8000,稍微等待几分钟,可以看到数据已经陆续采样过来并正常显示;

    [1] MAX30102脉搏血氧仪和心率传感器

    [2] 一阶数字低通滤波”原理推导

    [3] 第一节、信号的频域分析

    [4] MAX30102心率血氧模块_代码详细注释

    [5] Django4+webSocket

    作者:Graceful_scenery

    物联沃分享整理
    物联沃-IOTWORD物联网 » Python Django实现基于MAX30102光电容积脉搏心率检测详解

    发表回复