首页 > 编程语言 >基于降噪自编码器的时间序列降噪方法-以心电信号为例(Python)

基于降噪自编码器的时间序列降噪方法-以心电信号为例(Python)

时间:2025-01-03 18:30:17浏览次数:3  
标签:loss 为例 Python 05 降噪 186 np ax data

# Import needed modules
import matplotlib
import matplotlib.pyplot as plt


import numpy as np
import pandas as pd
import tensorflow as tf
from scipy.fft import fft, fftfreq
from scipy.signal import butter, lfilter, freqz, bode, filtfilt


from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from tensorflow.keras import layers, losses
from tensorflow.keras.models import Model
# Download the dataset
data_path = 'data.csv'
df = pd.read_csv(data_path, header=None, sep=';')
raw_data = df.values
# The second element contains the signal key
labels = np.array(raw_data[:, 0])
print(f"Shape of the signal keys: {labels.shape}")


# The second element contains the labels
labels = np.array(raw_data[:, 1])
print(f"Shape of the labels: {labels.shape}")


# The other data points are the ECG data
data = np.array(raw_data[:, 2:], dtype=np.float64)
print(f"Shape of the dataset: {data.shape}")


# Count the number of abnormal (0) and normal (1) ECG samples
[ABNORMAL, NORMAL, OTHER], n = np.unique(labels, return_counts=True)
print(f"Number of abnormal rythm samples: {n[0]}")
print(f"Number of normal rythm samples: {n[1]}")
print(f"Number of other rythm samples: {n[2]}")
Shape of the signal keys: (8229,)
Shape of the labels: (8229,)
Shape of the dataset: (8229, 3000)
Number of abnormal rythm samples: 737
Number of normal rythm samples: 5040
Number of other rythm samples: 2452
# Define sampling frequency and interval
f_s = 300
T_s = 1 / f_s


# Define the time vector
t = np.arange(0, data.shape[1]) * T_s


# Visualize some normal and abnormal rythms
normal_data = data[labels == NORMAL]
abnormal_data = data[labels == ABNORMAL]
other_data = data[labels == OTHER]


fig, ax = plt.subplots(3, 1,sharex=True, sharey=True, figsize=(12,12))


ax[0].plot(t, normal_data[0,:], color='#0033cc')
ax[0].set_ylabel('Voltage (mV)')
ax[0].grid(True, which='both', axis='both')
ax[0].set_title("Normal rythm")


ax[1].plot(t, abnormal_data[0,:], color='#0033cc')
ax[1].set_ylabel('Voltage (mV)')
ax[1].grid(True, which='both', axis='both')
ax[1].set_title("Abnormal rythm")


ax[2].plot(t, other_data[20,:], color='#0033cc')
ax[2].set_ylabel('Voltage (mV)')
ax[2].grid(True, which='both', axis='both')
ax[2].set_title("Other rythm")


plt.xlabel('Time (s)')
plt.savefig('examples.pdf', dpi=120, format='pdf', bbox_inches='tight')

# Split the data into training and test sets
train_data, test_data, train_labels, test_labels = train_test_split(
    data, labels, test_size=0.1, random_state=21,
)
print(test_labels.shape)
# Test indecies 0 (Normal), 2 (AF), and -1 (Other)
(823,)
# Normalize the data between 0 and 1
min_val = tf.reduce_min(train_data)
max_val = tf.reduce_max(train_data)


train_data = (train_data - min_val) / (max_val - min_val)
test_data = (test_data - min_val) / (max_val - min_val)


train_data = tf.cast(train_data, tf.float32)
test_data = tf.cast(test_data, tf.float32)


train_data = train_data[..., tf.newaxis]
test_data = test_data[..., tf.newaxis]
fig, ax = plt.subplots(2, 1, sharex=True, sharey=True, figsize=(12,12))


ax[0].plot(t, train_data[0,:], color='#0033cc')
ax[0].set_ylabel('Amplitude')
ax[0].grid(True, which='both', axis='both')


ax[1].plot(t, train_data[1,:], color='#0033cc')
ax[1].set_ylabel('Amplitude')
ax[1].grid(True, which='both', axis='both')


plt.suptitle("Normalized signals")
plt.xlabel('Time (s)')

# Add some noise to the training and test sets
noisy_train_data = np.zeros(shape=train_data.shape)
noisy_test_data = np.zeros(shape=test_data.shape)


def generate_baseline_wander_noise(ecg_signal, t):
  A = np.random.uniform(0, 0.15) * np.abs((np.max(ecg_signal) - np.min(ecg_signal)))  # Amplitude
  w = 2*np.pi*np.random.uniform(0.15, 0.3)  # Angular frequency
  phi = np.random.uniform(-np.pi, np.pi) # Phase
  return A*np.sin(w*t + phi).reshape(ecg_signal.shape[0], 1)


def generate_power_line_interference(ecg_signal, t):
  A = np.random.uniform(0, 0.5) * np.abs((np.max(ecg_signal) - np.min(ecg_signal)))  # Amplitude
  w = 2*np.pi*np.random.uniform(49.8, 50.2)  # Angular frequency
  phi = np.random.uniform(-np.pi, np.pi) # Phase
  return A*np.sin(w*t + phi).reshape(ecg_signal.shape[0], 1)


def generate_muscle_artefacts(ecg_signal, t):
  A = np.random.uniform(0, 0.1) * np.abs((np.max(ecg_signal) - np.min(ecg_signal)))  # Amplitude
  w = 2*np.pi*np.random.uniform(0, 10000)  # Angular frequency
  phi = np.random.uniform(-np.pi, np.pi) # Phase
  return A*np.sin(w*t + phi).reshape(ecg_signal.shape[0], 1)


for i in range(train_data.shape[0]):
  baseline_wander_noise = generate_baseline_wander_noise(train_data[i], t)
  power_line_interference = generate_power_line_interference(train_data[i], t)
  muscle_artefacts = generate_muscle_artefacts(train_data[i], t)
  noisy_train_data[i] = train_data[i] + baseline_wander_noise + power_line_interference + muscle_artefacts


for i in range(test_data.shape[0]):
  baseline_wander_noise = generate_baseline_wander_noise(test_data[i], t)
  power_line_interference = generate_power_line_interference(test_data[i], t)
  muscle_artefacts = generate_muscle_artefacts(test_data[i], t)
  noisy_test_data[i] = test_data[i] + baseline_wander_noise + power_line_interference + muscle_artefacts


noisy_train_data = tf.clip_by_value(noisy_train_data, clip_value_min=0., clip_value_max=1.)
noisy_test_data = tf.clip_by_value(noisy_test_data, clip_value_min=0., clip_value_max=1.)
# Define a function to compute Fast-Fourier Transform
def FFT(signal):
  signal = np.array(signal).reshape(-1)
  N = signal.shape[0]
  y = signal - np.mean(signal)
  yf = fft(y)
  xf = fftfreq(N, T_s)[:N//2]
  return xf, 2.0/N * np.abs(yf[0:N//2])
# Define a function to perform filtering
def do_filtering(signal, plot_response=False):
  signal = np.array(signal).reshape(-1)
  u = np.mean(signal)
  y = signal - u
  b1, a1 = butter(3, Wn=45, fs=f_s, btype='lowpass', analog=False)
  b2, a2 = butter(3, Wn=[45, 55], fs=f_s, btype='bandstop', analog=False)
  if plot_response:
    w1, mag1, phase1 = bode((b1, a1))
    w1, h1 = freqz(b1, a1, fs=f_s, worN=8000)


    w2, mag2, phase2 = bode((b2, a2))
    w2, h2 = freqz(b2, a2, fs=f_s, worN=8000)


    plt.plot(w1, np.abs(h1) * np.abs(h2), 'b')


    plt.xlim(0, 0.5*f_s)
    plt.title("Frequency Response")
    plt.xlabel('Frequency (Hz)')
    plt.grid()
    plt.show()


  y1 = filtfilt(b1, a1, y)
  y2 = filtfilt(b2, a2, y1)
  return y2 + u
xf, yf = FFT(noisy_train_data[0,:])
plt.plot(xf, yf)

fig, ax = plt.subplots(2, 2, figsize=(16,12))


ax[0][0].plot(t, noisy_test_data[0,:], color='#0033cc')
ax[0][0].set_ylabel('Amplitude')
ax[0][0].grid(True, which='both', axis='both')
ax[0][0].set_xlabel("Time (s)")


ax[0][1].plot(*FFT(noisy_train_data[0,:]), color='#0033cc')
ax[0][1].set_ylabel('Amplitude (au)')
ax[0][1].grid(True, which='both', axis='both')
ax[0][1].set_xlabel("Frequency (Hz)")


ax[1][0].plot(t, noisy_test_data[1,:], color='#0033cc')
ax[1][0].set_ylabel('Amplitude')
ax[1][0].grid(True, which='both', axis='both')
ax[1][0].set_xlabel("Time (s)")


ax[1][1].plot(*FFT(noisy_train_data[1,:]), color='#0033cc')
ax[1][1].set_ylabel('Amplitude (au)')
ax[1][1].grid(True, which='both', axis='both')
ax[1][1].set_xlabel("Frequency (Hz)")


plt.suptitle("Noisy signals and their Fourier transforms")

y = do_filtering(noisy_test_data[0,:], plot_response=True)


plt.plot(t, y)
plt.show()


plt.plot(*FFT(y))
plt.show()

# Define the denoising autoencoder
class Denoiser(Model):
  def __init__(self):
    super(Denoiser, self).__init__()
    self.encoder = tf.keras.Sequential([
      layers.Input(shape=(3000, 1)),
      layers.Conv1D(16, 3, activation='relu', padding='same', strides=2),
      layers.Conv1D(8, 3, activation='relu', padding='same', strides=2),
      layers.Conv1D(4, 3, activation='relu', padding='same', strides=2)])


    self.decoder = tf.keras.Sequential([
      layers.Conv1DTranspose(4, kernel_size=3, strides=2, activation='relu', padding='same'),
      layers.Conv1DTranspose(8, kernel_size=3, strides=2, activation='relu', padding='same'),
      layers.Conv1DTranspose(16, kernel_size=3, strides=2, activation='relu', padding='same'),
      layers.Conv1D(1, 3, activation='sigmoid', padding='same')])


  def call(self, x):
    encoded = self.encoder(x)
    decoded = self.decoder(encoded)
    return decoded
autoencoder = Denoiser()


autoencoder.compile(optimizer='adam', loss=losses.MeanSquaredError())


history = autoencoder.fit(noisy_train_data, train_data, 
          epochs=30, 
          validation_split=0.2,
          shuffle=True,)
Epoch 1/30
186/186 [==============================] - 20s 100ms/step - loss: 3.2762e-04 - val_loss: 3.0635e-04
Epoch 2/30
186/186 [==============================] - 18s 99ms/step - loss: 2.8828e-04 - val_loss: 2.0353e-04
Epoch 3/30
186/186 [==============================] - 20s 110ms/step - loss: 1.3508e-04 - val_loss: 1.1697e-04
Epoch 4/30
186/186 [==============================] - 18s 99ms/step - loss: 1.0006e-04 - val_loss: 9.4711e-05
Epoch 5/30
186/186 [==============================] - 18s 99ms/step - loss: 8.1577e-05 - val_loss: 9.1667e-05
Epoch 6/30
186/186 [==============================] - 22s 117ms/step - loss: 7.5245e-05 - val_loss: 8.3877e-05
Epoch 7/30
186/186 [==============================] - 19s 102ms/step - loss: 7.2229e-05 - val_loss: 8.2462e-05
Epoch 8/30
186/186 [==============================] - 20s 107ms/step - loss: 6.8987e-05 - val_loss: 7.8797e-05
Epoch 9/30
186/186 [==============================] - 18s 99ms/step - loss: 6.8779e-05 - val_loss: 7.6079e-05
Epoch 10/30
186/186 [==============================] - 18s 99ms/step - loss: 6.7444e-05 - val_loss: 7.5826e-05
Epoch 11/30
186/186 [==============================] - 21s 111ms/step - loss: 6.7014e-05 - val_loss: 7.6798e-05
Epoch 12/30
186/186 [==============================] - 21s 111ms/step - loss: 6.6452e-05 - val_loss: 7.5280e-05
Epoch 13/30
186/186 [==============================] - 19s 100ms/step - loss: 6.6554e-05 - val_loss: 7.4448e-05
Epoch 14/30
186/186 [==============================] - 19s 100ms/step - loss: 6.5830e-05 - val_loss: 7.4646e-05
Epoch 15/30
186/186 [==============================] - 19s 100ms/step - loss: 6.5416e-05 - val_loss: 7.3206e-05
Epoch 16/30
186/186 [==============================] - 21s 112ms/step - loss: 6.5805e-05 - val_loss: 7.3651e-05
Epoch 17/30
186/186 [==============================] - 19s 100ms/step - loss: 6.5166e-05 - val_loss: 7.5686e-05
Epoch 18/30
186/186 [==============================] - 19s 100ms/step - loss: 6.5179e-05 - val_loss: 7.3824e-05
Epoch 19/30
186/186 [==============================] - 18s 99ms/step - loss: 6.5522e-05 - val_loss: 8.9496e-05
Epoch 20/30
186/186 [==============================] - 20s 110ms/step - loss: 6.7115e-05 - val_loss: 7.3509e-05
Epoch 21/30
186/186 [==============================] - 18s 99ms/step - loss: 6.4488e-05 - val_loss: 7.3919e-05
Epoch 22/30
186/186 [==============================] - 18s 99ms/step - loss: 6.4671e-05 - val_loss: 7.2101e-05
Epoch 23/30
186/186 [==============================] - 19s 100ms/step - loss: 6.4819e-05 - val_loss: 7.1936e-05
Epoch 24/30
186/186 [==============================] - 19s 100ms/step - loss: 6.4541e-05 - val_loss: 7.2177e-05
Epoch 25/30
186/186 [==============================] - 20s 110ms/step - loss: 6.4493e-05 - val_loss: 7.2277e-05
Epoch 26/30
186/186 [==============================] - 18s 99ms/step - loss: 6.4300e-05 - val_loss: 7.2186e-05
Epoch 27/30
186/186 [==============================] - 18s 99ms/step - loss: 6.3860e-05 - val_loss: 7.4237e-05
Epoch 28/30
186/186 [==============================] - 18s 99ms/step - loss: 6.4343e-05 - val_loss: 7.2179e-05
Epoch 29/30
186/186 [==============================] - 23s 122ms/step - loss: 6.4499e-05 - val_loss: 7.1089e-05
Epoch 30/30
186/186 [==============================] - 19s 100ms/step - loss: 6.3786e-05 - val_loss: 7.2893e-05
autoencoder.encoder.summary()
autoencoder.decoder.summary()
Model: "sequential_8"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
=================================================================
 conv1d_16 (Conv1D)          (None, 1500, 16)          64        
                                                                 
 conv1d_17 (Conv1D)          (None, 750, 8)            392       
                                                                 
 conv1d_18 (Conv1D)          (None, 375, 4)            100       
                                                                 
=================================================================
Total params: 556
Trainable params: 556
Non-trainable params: 0
_________________________________________________________________
Model: "sequential_9"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
=================================================================
 conv1d_transpose_12 (Conv1D  (None, 750, 4)           52        
 Transpose)                                                      
                                                                 
 conv1d_transpose_13 (Conv1D  (None, 1500, 8)          104       
 Transpose)                                                      
                                                                 
 conv1d_transpose_14 (Conv1D  (None, 3000, 16)         400       
 Transpose)                                                      
                                                                 
 conv1d_19 (Conv1D)          (None, 3000, 1)           49        
                                                                 
=================================================================
Total params: 605
Trainable params: 605
Non-trainable params: 0
_________________________________________________________________
encoded_ecgs = autoencoder.encoder(noisy_test_data).numpy()
decoded_ecgs = autoencoder.decoder(encoded_ecgs).numpy()
n = 3
ind = [0, 2, -1]
fig, ax = plt.subplots(3, n, sharex=True, sharey=True, figsize=(26, 10))
for i in range(n):
    ix = ind[i]
    # display original + noise
    ax[0][i].set_title("Original signal")
    ax[0][i].plot(t, tf.squeeze(test_data[ix]), color='#0033cc')
    ax[0][i].grid(True)
    ax[0][i].set_ylabel('Amplitude')
    ax[0][i].set_xlabel('Time (s)')


    # display original + noise
    ax[1][i].set_title("Original signal corrupted with noise")
    ax[1][i].plot(t, tf.squeeze(noisy_test_data[ix]), color='#0033cc')
    ax[1][i].grid(True)
    ax[1][i].set_ylabel('Amplitude')
    ax[1][i].set_xlabel('Time (s)')


    # display reconstruction
    ax[2][i].set_title("Reconstructed signal")
    ax[2][i].plot(t, tf.squeeze(decoded_ecgs[ix]), color='#0033cc')
    ax[2][i].grid(True)
    ax[2][i].set_ylabel('Amplitude')
    ax[2][i].set_xlabel('Time (s)')


#plt.suptitle("Example results of using the designed denoising autoencoder on noisy ECG recordings")
plt.show()
fig.savefig('results.pdf', dpi=120, format='pdf', bbox_inches='tight')

y = do_filtering(noisy_test_data[0,:])


fig, ax = plt.subplots(2, 1, sharex=True, sharey=True, figsize=(12, 8))


print(mean_squared_error(tf.squeeze(test_data[0]), y))
print(mean_squared_error(tf.squeeze(test_data[0]), tf.squeeze(decoded_ecgs[0])))


ax[0].plot(t, tf.squeeze(test_data[0]), 'b')
ax[0].plot(t, tf.squeeze(decoded_ecgs[0]), 'r')
ax[0].fill_between(t, tf.squeeze(decoded_ecgs[0]),  tf.squeeze(test_data[0]), color='lightcoral')
ax[0].legend(labels=["Actual ECG signal", "Autoencoder reconstruction", "Error"], loc='upper left')
ax[0].grid()
ax[0].set_xlim([0, 5])


ax[1].plot(t, tf.squeeze(test_data[0]), 'b')
ax[1].plot(t, y, 'r')
ax[1].fill_between(t, y,  tf.squeeze(test_data[0]), color='lightcoral')
ax[1].legend(labels=["Actual ECG signal", "Low-pass + band-stop filtered", "Error"], loc='upper left')
ax[1].grid()
ax[1].set_xlim([0, 5])


plt.ylabel("Amplitude")
plt.xlabel("Time (s)")
#plt.xlim([1, 5])
plt.show()
fig.savefig('results2.pdf', dpi=120, format='pdf', bbox_inches='tight')

n = test_data.shape[0]


RMSE_Autoencoder = []
RMSE_Lowpass = []


for i in range(2):
  decoded = tf.squeeze(decoded_ecgs[i])
  filtered = do_filtering(noisy_test_data[0,:])


  RMSE_Autoencoder.append(np.sqrt(np.mean((test_data[i]-decoded)**2)))
  RMSE_Lowpass.append(np.sqrt(np.mean((test_data[i]-filtered)**2)))


print(np.mean(RMSE_Autoencoder))
print(np.mean(RMSE_Lowpass))
0.010725546
0.012756948
fig = plt.figure(figsize=(10, 6))
fig.tight_layout()


x = [i + 1 for i in range(30)]
plt.plot(x, history.history["loss"], label="Training loss", color='red', marker='x')
plt.plot(x, history.history["val_loss"], label="Validation loss", color='blue', marker='x')
plt.xticks(x)
#plt.ylim([0, max(*history.history["loss"], *history.history["val_loss"])])
plt.grid()
plt.xlabel("Epoch")
plt.ylabel("Mean-Squared-Error")
plt.legend()
plt.savefig('results_loss.pdf', dpi=120, format='pdf', bbox_inches='tight')

学术咨询:

担任《Mechanical System and Signal Processing》《中国电机工程学报》等期刊审稿专家,擅长领域:信号滤波/降噪,机器学习/深度学习,时间序列预分析/预测,设备故障诊断/缺陷检测/异常检测。

分割线分割线分割线分割线分割线分割线分割线分割线分割线分割线

基于蚁狮优化变分模态分解的非平稳信号降噪(MATLAB)

基于海洋捕食者优化算法的变分模态分解非平稳信号降噪(MATLAB)

完整代码通过学术咨询获得:

标签:loss,为例,Python,05,降噪,186,np,ax,data
From: https://blog.csdn.net/2301_78829506/article/details/144819455

相关文章

  • Python语法——增加代码可读性
    类型注释增加代码可读性fromtypingimportList,Dict,Set,Union,Optionaldefadd_enter(b:str)->str:returnb+'\n'defparse_data(data):total=0fork,vsindata.items():ifk[1]:forvinvs:......
  • python毕设 网上商城购物系统程序+论文
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容一、选题背景关于网上商城购物系统的研究,现有研究主要集中在系统的整体架构、用户体验优化等方面,多以大型电商平台为研究对象。专门针对使用Python......
  • 淘宝店铺商品数据洞察:利用Python爬虫获取item_search_shop接口
    引言在电子商务的世界里,商品详情页是连接商家与消费者的重要桥梁。它不仅展示了商品的详细信息,还直接影响着消费者的购买决策。淘宝作为全球知名的电商平台,提供了丰富的API接口,使得开发者能够获取商品的详细信息。本文将探讨如何利用JAVA爬虫技术,获取淘宝的item_get_pro接口,以......
  • WxPython跨平台开发框架之模块字段权限的管理
    在我的很多Winform开发项目中,统一采用了权限管理模块来进行各种权限的控制,包括常规的功能权限(工具栏、按钮、菜单权限),另外还可以进行字段级别的字段权限控制,字段权限是我们在一些对权限要求比较严格的系统里面涉及到的,可以对部分用户隐藏一些敏感的信息,或者禁止不够权限的用户编辑......
  • 淘宝店铺商品数据洞察:利用Python爬虫获取item_search_shop接口
    引言在电商领域,数据的力量不容小觑。对于淘宝店铺而言,掌握店铺内所有商品的数据,对于优化库存、提升销售策略、增强用户体验等方面都至关重要。本文将探讨如何利用Python爬虫技术,获取淘宝的item_search_shop接口,以获得店铺的所有商品信息,包括商品ID、名称、价格、库存量等关键数据......
  • 新年到了!使用Python创建一个简易的接金元宝游戏
    引言在本教程中,我们将一起学习如何使用Python编程语言和Pygame库来创建一个简单的休闲游戏——“接金元宝”。准备工作 首先,确保你的计算机上已经安装了Python(推荐3.6以上版本)和Pygame库。如果还没有安装Pygame,可以通过pip命令轻松安装:pipinstallpygame没有安装的可......
  • python中的优先队列
    在Python中,优先队列(PriorityQueue)是一个可以随时获取队列中最大(或最小)元素的数据结构。Python的标准库heapq提供了一个实现最小堆的优先队列,默认情况下是最小堆,但可以通过一些技巧来实现最大堆。优先队列在算法中常用于求解最短路径、合并有序链表、求解k个最小/最大的元......
  • [oeasy]python056_python中下划线是什么意思_underscore_理解_声明与赋值_改名字
    python中下划线是什么意思_underscore_理解_声明与赋值_改名字回忆上次内容上次了解到已有的函数名、类名、模块名不适合覆盖了赋新值会失去原有功能比如max   添加图片注释,不超过140字(可选) 如果我就想让max当......
  • 从零开始:Python 新增的注解功能(Type Hints)
    适用读者:对Python有一定基础,想了解Python注解(TypeHints)以及它在代码可读性、调试与维护方面的作用的朋友们。一、什么是Python注解(TypeHints)?简单来说,**Python注解(TypeHints)**就是在变量或函数上标记“希望(或建议)它是某种类型”,从而帮助我们和其他开发者更好地理......
  • 西圣AVA2Pro:革新半入耳式降噪耳机体验,百元小金刚,碾压同级竞品
    作为国内性价比的标杆,西圣xisem耳机品牌一直以“平价享轻奢”的理念深受数码圈、音频发烧友以及运动人群的热爱,始终坚持以消费者需求为导向,西圣凭借其对技术创新的不懈追求,成功将平价高端音频科技带入大众视野。今天,西圣再次突破极限,正式发布全新力作——西圣AVA2Pro真无线蓝......