C-MAPSS is a dataset for remaining useful life (RUL) prediction of aero-engines. Released by NASA (the National Aeronautics and Space Administration), it contains four sub-datasets (FD001–FD004) of turbofan engine sensor data, together with the corresponding fault modes and remaining-useful-life information, as summarized in Table 1.
Dataset | FD001 | FD002 | FD003 | FD004 |
---|---|---|---|---|
Engine units for training | 100 | 260 | 100 | 249 |
Engine units for testing | 100 | 259 | 100 | 248 |
Operating conditions | 1 | 6 | 1 | 6 |
Fault modes | 1 | 1 | 2 | 2 |
Training samples (default) | 17731 | 48819 | 21820 | 57522 |
Testing samples | 100 | 259 | 100 | 248 |
The four sub-datasets FD001–FD004 share exactly the same set of parameters. The raw files are plain-text (.txt), and the meaning of each column is listed in Table 2. Besides the engine and cycle identifiers, each row records the engine's operational settings and 21 sensor measurements (temperatures, pressures, shaft speeds, etc.), which can be used to train and test fault-diagnosis and remaining-useful-life prediction models. The dataset is widely used in machine learning and data mining research and provides valuable data support for aero-engine health management.
Column(s) | 1 | 2 | 3~5 | 6~26 |
---|---|---|---|---|
Parameter | Engine unit id | Current cycle number of the unit | Operating settings | Sensor measurements |
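As a quick illustration of this column layout, the raw files can be read with pandas by assigning names to the 26 columns. This is a minimal sketch; the `./data/` path and the column names are my own choices (matching the preprocessing code below), not part of the official dataset:

```python
import pandas as pd

# Column names following Table 2: unit id, current cycle, 3 operating settings, 21 sensors
index_names = ['unit_nr', 'time_cycles']
setting_names = ['setting_1', 'setting_2', 'setting_3']
sensor_names = ['s_{}'.format(i) for i in range(1, 22)]
col_names = index_names + setting_names + sensor_names

# The raw txt files are whitespace-separated and have no header row
train = pd.read_csv('./data/train_FD001.txt', sep=r'\s+', header=None, names=col_names)
print(train.shape)                 # (rows, 26)
print(train['unit_nr'].nunique())  # 100 engine units for FD001, matching Table 1
```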
Data preprocessing code (in Python)

The code is taken from the authors' public implementation accompanying "Variational encoding approach for interpretable assessment of remaining useful life estimation", with some modifications by me. Its correctness is not guaranteed, so please use it with care.

GitHub: https://github.com/NahuelCostaCortez/RemainingUseful-Life-Estimation-Variational
```python
import numpy as np
import pandas as pd
from sklearn.model_selection import GroupShuffleSplit
from sklearn.preprocessing import StandardScaler


def add_remaining_useful_life(df):
    # Get the total number of cycles for each unit
    grouped_by_unit = df.groupby(by="unit_nr")
    max_cycle = grouped_by_unit["time_cycles"].max()

    # Merge the max cycle back into the original frame
    result_frame = df.merge(max_cycle.to_frame(name='max_cycle'), left_on='unit_nr', right_index=True)

    # Calculate remaining useful life for each row
    remaining_useful_life = result_frame["max_cycle"] - result_frame["time_cycles"]
    result_frame["RUL"] = remaining_useful_life

    # drop max_cycle as it's no longer needed
    result_frame = result_frame.drop("max_cycle", axis=1)
    return result_frame
def add_operating_condition(df):
    df_op_cond = df.copy()

    df_op_cond['setting_1'] = abs(df_op_cond['setting_1'].round())
    df_op_cond['setting_2'] = abs(df_op_cond['setting_2'].round(decimals=2))

    # converting settings to string and concatenating makes the operating condition a categorical variable
    df_op_cond['op_cond'] = df_op_cond['setting_1'].astype(str) + '_' + \
                            df_op_cond['setting_2'].astype(str) + '_' + \
                            df_op_cond['setting_3'].astype(str)
    return df_op_cond
def condition_scaler(df_train, df_test, sensor_names):
    # apply operating-condition-specific scaling
    scaler = StandardScaler()
    for condition in df_train['op_cond'].unique():
        scaler.fit(df_train.loc[df_train['op_cond'] == condition, sensor_names])
        df_train.loc[df_train['op_cond'] == condition, sensor_names] = scaler.transform(
            df_train.loc[df_train['op_cond'] == condition, sensor_names])
        df_test.loc[df_test['op_cond'] == condition, sensor_names] = scaler.transform(
            df_test.loc[df_test['op_cond'] == condition, sensor_names])
    return df_train, df_test
def exponential_smoothing(df, sensors, n_samples, alpha=0.4):
    df = df.copy()
    # first, take the exponentially weighted mean per unit
    df[sensors] = df.groupby('unit_nr')[sensors].apply(
        lambda x: x.ewm(alpha=alpha).mean()).reset_index(level=0, drop=True)

    # second, drop the first n_samples of each unit_nr to reduce filter delay
    def create_mask(data, samples):
        result = np.ones_like(data)
        result[0:samples] = 0
        return result

    mask = df.groupby('unit_nr')['unit_nr'].transform(create_mask, samples=n_samples).astype(bool)
    df = df[mask]
    return df
def gen_train_data(df, sequence_length, columns):
    data = df[columns].values
    num_elements = data.shape[0]

    # -1 and +1 because of Python indexing
    for start, stop in zip(range(0, num_elements - (sequence_length - 1)), range(sequence_length, num_elements + 1)):
        yield data[start:stop, :]


def gen_data_wrapper(df, sequence_length, columns, unit_nrs=np.array([])):
    if unit_nrs.size <= 0:
        unit_nrs = df['unit_nr'].unique()

    data_gen = (list(gen_train_data(df[df['unit_nr'] == unit_nr], sequence_length, columns))
                for unit_nr in unit_nrs)
    data_array = np.concatenate(list(data_gen)).astype(np.float32)
    return data_array
def gen_labels(df, sequence_length, label):
    data_matrix = df[label].values
    num_elements = data_matrix.shape[0]

    # -1 because the target is the RUL of the last row in the sequence, not the next row
    return data_matrix[sequence_length - 1:num_elements, :]


def gen_label_wrapper(df, sequence_length, label, unit_nrs=np.array([])):
    if unit_nrs.size <= 0:
        unit_nrs = df['unit_nr'].unique()

    label_gen = [gen_labels(df[df['unit_nr'] == unit_nr], sequence_length, label)
                 for unit_nr in unit_nrs]
    label_array = np.concatenate(label_gen).astype(np.float32)
    return label_array
def gen_test_data(df, sequence_length, columns, mask_value):
    if df.shape[0] < sequence_length:
        data_matrix = np.full(shape=(sequence_length, len(columns)), fill_value=mask_value)  # pad
        idx = data_matrix.shape[0] - df.shape[0]
        data_matrix[idx:, :] = df[columns].values  # fill with available data
    else:
        data_matrix = df[columns].values

    # yield only the last possible sequence
    stop = data_matrix.shape[0]
    start = stop - sequence_length
    yield data_matrix[start:stop, :]
def get_data(dataset, sensors, sequence_length, alpha, threshold):
    # files
    dir_path = './data/'
    train_file = 'train_' + dataset + '.txt'
    test_file = 'test_' + dataset + '.txt'

    # columns
    index_names = ['unit_nr', 'time_cycles']
    setting_names = ['setting_1', 'setting_2', 'setting_3']
    sensor_names = ['s_{}'.format(i + 1) for i in range(0, 21)]
    col_names = index_names + setting_names + sensor_names

    # data readout
    train = pd.read_csv((dir_path + train_file), sep=r'\s+', header=None, names=col_names)
    test = pd.read_csv((dir_path + test_file), sep=r'\s+', header=None, names=col_names)
    y_test = pd.read_csv((dir_path + 'RUL_' + dataset + '.txt'), sep=r'\s+', header=None,
                         names=['RemainingUsefulLife'])

    # create RUL values according to the piece-wise target function
    train = add_remaining_useful_life(train)
    train['RUL'] = train['RUL'].clip(upper=threshold)
    y_test['RemainingUsefulLife'] = y_test['RemainingUsefulLife'].clip(upper=threshold)

    # remove unused sensors
    drop_sensors = [element for element in sensor_names if element not in sensors]

    # scale with respect to the operating condition
    X_train_pre = add_operating_condition(train.drop(drop_sensors, axis=1))
    X_test_pre = add_operating_condition(test.drop(drop_sensors, axis=1))
    X_train_pre, X_test_pre = condition_scaler(X_train_pre, X_test_pre, sensors)

    # exponential smoothing
    X_train_pre = exponential_smoothing(X_train_pre, sensors, 0, alpha)
    X_test_pre = exponential_smoothing(X_test_pre, sensors, 0, alpha)

    # train-val split grouped by engine unit, so all rows of a unit stay on the same side
    gss = GroupShuffleSplit(n_splits=1, train_size=0.80, random_state=42)
    # gss.split yields a single (train, val) pair of index arrays; train_unit and val_unit
    # are therefore sets of units, not single values
    for train_unit, val_unit in gss.split(X_train_pre['unit_nr'].unique(), groups=X_train_pre['unit_nr'].unique()):
        # gss returns positional indices, so map them back to the actual unit numbers
        train_unit = X_train_pre['unit_nr'].unique()[train_unit]
        val_unit = X_train_pre['unit_nr'].unique()[val_unit]

        x_train = gen_data_wrapper(X_train_pre, sequence_length, sensors, train_unit)
        y_train = gen_label_wrapper(X_train_pre, sequence_length, ['RUL'], train_unit)

        x_val = gen_data_wrapper(X_train_pre, sequence_length, sensors, val_unit)
        y_val = gen_label_wrapper(X_train_pre, sequence_length, ['RUL'], val_unit)

    # create sequences for test: one (padded) sequence per engine unit
    test_gen = (list(gen_test_data(X_test_pre[X_test_pre['unit_nr'] == unit_nr], sequence_length, sensors, -99.))
                for unit_nr in X_test_pre['unit_nr'].unique())
    x_test = np.concatenate(list(test_gen)).astype(np.float32)

    return x_train, y_train, x_val, y_val, x_test, y_test['RemainingUsefulLife']
```
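Below is a minimal usage sketch of get_data. The sensor subset, sequence length, smoothing factor, and RUL clipping threshold are illustrative values I chose (a configuration commonly used for FD001), not ones prescribed by the original code:

```python
# Illustrative configuration: 14 non-constant sensors, window length 30, alpha 0.1, RUL clipped at 125
sensors = ['s_2', 's_3', 's_4', 's_7', 's_8', 's_9', 's_11', 's_12',
           's_13', 's_14', 's_15', 's_17', 's_20', 's_21']
x_train, y_train, x_val, y_val, x_test, y_test = get_data('FD001', sensors,
                                                          sequence_length=30, alpha=0.1, threshold=125)

print(x_train.shape, y_train.shape)  # (num_windows, 30, 14) (num_windows, 1)
print(x_test.shape, y_test.shape)    # (100, 30, 14) (100,)
```

Each test engine contributes exactly one window (padded with -99. when its history is shorter than the window length), so x_test holds one sequence per test unit.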