1. Principle Overview
The variational autoencoder (VAE) is an improved version of the autoencoder. An autoencoder is an unsupervised learning model, but it cannot generate new content; by extending the latent space and constraining it to follow a normal distribution, the variational autoencoder changes this completely.
An autoencoder encodes an input X into a low-dimensional vector z, then reconstructs the input from that vector. By comparing the error between X and the reconstruction X̃, and training a neural network so that this error gradually shrinks, unsupervised learning is achieved. The figure below shows the architecture of an autoencoder.
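To make this concrete, a minimal fully connected autoencoder can be sketched in Keras as follows (the 32-unit bottleneck and layer sizes are illustrative assumptions, not from the original post):

# Minimal autoencoder: 784 -> 32 -> 784, trained to reproduce its own input
import tensorflow as tf

autoencoder = tf.keras.Sequential([
    tf.keras.layers.InputLayer(input_shape=(784,)),
    tf.keras.layers.Dense(32, activation='relu'),      # encoder: X -> z
    tf.keras.layers.Dense(784, activation='sigmoid'),  # decoder: z -> X̃
])
autoencoder.compile(optimizer='adam', loss='binary_crossentropy')
# autoencoder.fit(X, X, ...): the training target is the input itself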
The key point of the variational autoencoder is the addition of a normal-distribution constraint on the latent space Z, so determining that normal distribution becomes the main goal. We know that a normal distribution is fully determined by two parameters: the mean μ and the standard deviation σ. So how do we determine μ and σ? Conventional estimation methods are cumbersome and do not work well; fitting them with a neural network is simple and efficient. The figure below shows the VAE architecture.
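Concretely, the encoder network outputs both μ and log σ² for each input, and a latent sample is drawn with the reparameterization trick z = μ + σ·ε, where ε ~ N(0, I). This keeps the sampling step differentiable, so the whole model can be trained end-to-end with backpropagation; it is exactly what the reparameterize method implements in the code below.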
2. Data Description
The MNIST dataset is a classic dataset in machine learning. It contains 70,000 samples in total: 60,000 training samples and 10,000 test samples.
3. Code in Practice
Step 1: Import the required libraries
import os
import time
import struct
import numpy as np
import glob
import matplotlib.pyplot as plt
import PIL
import imageio
import tensorflow as tf
from IPython import display
print(tf.__version__)
Step 2: Load the data and preprocess it
def dense_to_one_hot(labels_dense, num_classes=10):
    """Convert class labels from scalars to one-hot vectors."""
    num_labels = labels_dense.shape[0]
    index_offset = np.arange(num_labels) * num_classes
    labels_one_hot = np.zeros((num_labels, num_classes))
    labels_one_hot.flat[index_offset + labels_dense.ravel()] = 1
    return labels_one_hot
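As a quick sanity check (a hypothetical snippet, not part of the original pipeline), two scalar labels become two one-hot rows:

print(dense_to_one_hot(np.array([3, 1])))
# [[0. 0. 0. 1. 0. 0. 0. 0. 0. 0.]
#  [0. 1. 0. 0. 0. 0. 0. 0. 0. 0.]]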
def load_mnist(path, kind='train'):
    """Load the MNIST images and labels from the given path."""
    labels_path = os.path.join(path, '%s-labels-idx1-ubyte' % kind)
    images_path = os.path.join(path, '%s-images-idx3-ubyte' % kind)
    with open(labels_path, 'rb') as lbpath:
        magic, n = struct.unpack('>II', lbpath.read(8))
        labels = np.fromfile(lbpath, dtype=np.uint8)
        labels = dense_to_one_hot(labels)
    with open(images_path, 'rb') as imgpath:
        magic, num, rows, cols = struct.unpack('>IIII', imgpath.read(16))
        images = np.fromfile(imgpath, dtype=np.uint8).reshape(len(labels), 784)
    return images, labels
X_train, y_train = load_mnist('../data/MNIST/raw/', kind='train')
print('Rows: %d, columns: %d' % (X_train.shape[0], X_train.shape[1]))
print('Rows: %d, columns: %d' % ( y_train.shape[0], y_train.shape[1]))
X_test, y_test = load_mnist('../data/MNIST/raw/', kind='t10k')
print('Rows: %d, columns: %d' % (X_test.shape[0], X_test.shape[1]))
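If the raw IDX files are not available at '../data/MNIST/raw/', the same arrays can be obtained with the built-in Keras loader; a minimal alternative sketch (the *_alt names are hypothetical; note this downloads the data on first use and returns integer labels rather than one-hot vectors):

(x_tr, y_tr_dense), (x_te, y_te_dense) = tf.keras.datasets.mnist.load_data()
X_train_alt = x_tr.reshape(-1, 784)   # same (60000, 784) layout as above
X_test_alt = x_te.reshape(-1, 784)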
# Build the datasets
train_images = X_train.reshape(X_train.shape[0], 28, 28, 1).astype('float32')
test_images = X_test.reshape(X_test.shape[0], 28, 28, 1).astype('float32')
# Scale to the range [0, 1]
train_images /= 255.0
test_images /= 255.0
# Binarize
train_images[train_images >= 0.5] = 1.0
train_images[train_images < 0.5] = 0.0
test_images[test_images >= 0.5] = 1.0
test_images[test_images < 0.5] = 0.0
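The binarization step matters because compute_loss below scores reconstructions with a Bernoulli likelihood (sigmoid cross-entropy): with every pixel forced to exactly 0 or 1, each pixel is a valid Bernoulli target.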
# Hyperparameters
TRAIN_BUF=60000
BATCH_SIZE = 100
TEST_BUF = 10000
train_dataset = tf.data.Dataset.from_tensor_slices(train_images).shuffle(TRAIN_BUF).batch(BATCH_SIZE)
test_dataset = tf.data.Dataset.from_tensor_slices(test_images).shuffle(TEST_BUF).batch(BATCH_SIZE)
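Since the shuffle buffers equal the full dataset sizes (60,000 and 10,000), each epoch draws from a fully shuffled stream before it is split into mini-batches of 100.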
Step 3: Build the model network
class VAE(tf.keras.Model):
    def __init__(self, latent_dim):
        super(VAE, self).__init__()
        self.latent_dim = latent_dim
        # Encoder: maps an image to the mean and log-variance of q(z|x)
        self.inference_net = tf.keras.Sequential(
            [
                tf.keras.layers.InputLayer(input_shape=(28, 28, 1)),
                tf.keras.layers.Conv2D(
                    filters=32, kernel_size=3, strides=(2, 2), activation='relu'),
                tf.keras.layers.Conv2D(
                    filters=64, kernel_size=3, strides=(2, 2), activation='relu'),
                tf.keras.layers.Flatten(),
                # latent_dim means + latent_dim log-variances
                tf.keras.layers.Dense(latent_dim + latent_dim)
            ])
        # Decoder: maps a latent vector back to image logits
        self.generative_net = tf.keras.Sequential(
            [
                tf.keras.layers.InputLayer(input_shape=(latent_dim,)),
                tf.keras.layers.Dense(units=7*7*32, activation='relu'),
                tf.keras.layers.Reshape(target_shape=(7, 7, 32)),
                tf.keras.layers.Conv2DTranspose(
                    filters=64, kernel_size=3, strides=(2, 2),
                    padding='SAME', activation='relu'
                ),
                tf.keras.layers.Conv2DTranspose(
                    filters=32, kernel_size=3, strides=(2, 2),
                    padding='SAME', activation='relu'
                ),
                # No activation function: the output is raw logits
                tf.keras.layers.Conv2DTranspose(
                    filters=1, kernel_size=3, strides=(1, 1),
                    padding='SAME'
                ),
            ])

    @tf.function
    def sample(self, eps=None):
        if eps is None:
            eps = tf.random.normal(shape=(100, self.latent_dim))
        return self.decode(eps, apply_sigmoid=True)

    def encode(self, x):
        mean, logvar = tf.split(self.inference_net(x),
                                num_or_size_splits=2, axis=1)
        return mean, logvar

    def reparameterize(self, mean, logvar):
        # Reparameterization trick: z = mean + sigma * eps
        eps = tf.random.normal(shape=mean.shape)
        return eps * tf.exp(logvar * 0.5) + mean

    def decode(self, z, apply_sigmoid=False):
        logits = self.generative_net(z)
        if apply_sigmoid:
            probs = tf.sigmoid(logits)
            return probs
        return logits
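Before training, a quick shape check (a hypothetical throwaway snippet, assuming the class above) confirms that the encoder's output splits into latent_dim means and latent_dim log-variances:

probe = VAE(latent_dim=50)            # hypothetical instance just for testing
dummy = tf.zeros((1, 28, 28, 1))      # a single blank 28x28 image
m, lv = probe.encode(dummy)
print(m.shape, lv.shape)              # (1, 50) (1, 50)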
Step 4: Training
optimizer = tf.keras.optimizers.Adam(1e-4)
def log_normal_pdf(sample, mean, logvar, raxis=1):
    log2pi = tf.math.log(2.0 * np.pi)
    return tf.reduce_sum(
        -0.5 * ((sample - mean)**2.0 * tf.exp(-logvar) + logvar + log2pi),
        axis=raxis
    )
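This helper evaluates the log-density of a diagonal Gaussian at sample, summed over the latent axis. With logvar = log σ², each term is log N(x; μ, σ²) = -½[(x - μ)²/σ² + log σ² + log 2π], which is exactly the expression inside tf.reduce_sum.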
@tf.function
def compute_loss(model, x):
    mean, logvar = model.encode(x)
    z = model.reparameterize(mean, logvar)
    x_logit = model.decode(z)
    # Reconstruction term: log p(x|z) under a Bernoulli decoder
    cross_ent = tf.nn.sigmoid_cross_entropy_with_logits(logits=x_logit, labels=x)
    logpx_z = -tf.reduce_sum(cross_ent, axis=[1, 2, 3])
    # Prior p(z) = N(0, I) and approximate posterior q(z|x)
    logpz = log_normal_pdf(z, 0.0, 0.0)
    logqz_x = log_normal_pdf(z, mean, logvar)
    return -tf.reduce_mean(logpx_z + logpz - logqz_x)
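The returned value is the negative of a single-sample Monte Carlo estimate of the evidence lower bound, ELBO ≈ log p(x|z) + log p(z) - log q(z|x). Minimizing this loss therefore maximizes the ELBO, and the test-set ELBO reported below is simply this loss with the sign flipped.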
@tf.function
def compute_apply_gradients(model, x, optimizer):
    with tf.GradientTape() as tape:
        loss = compute_loss(model, x)
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
epochs = 100
latent_dim = 50
num_examples_to_generate = 16
# Keep the random vector constant across epochs so the improvement
# in the generated samples is visible.
random_vector_for_generation = tf.random.normal(
    shape=[num_examples_to_generate, latent_dim])
model = VAE(latent_dim)
# Generate images from the given input
def generate_and_save_images(model, epoch, test_input):
    predictions = model.sample(test_input)
    fig = plt.figure(figsize=(4, 4))
    for i in range(predictions.shape[0]):
        plt.subplot(4, 4, i + 1)
        plt.imshow(predictions[i, :, :, 0], cmap='gray')
        plt.axis('off')
    # tight_layout minimizes the overlap between subplots
    plt.savefig('image_at_epoch_{:04d}.png'.format(epoch))
    plt.show()
for epoch in range(1, epochs + 1):
    start_time = time.time()
    for train_x in train_dataset:
        compute_apply_gradients(model, train_x, optimizer)
    end_time = time.time()
    if epoch % 20 == 0:
        loss = tf.keras.metrics.Mean()
        for test_x in test_dataset:
            loss(compute_loss(model, test_x))
        elbo = -loss.result()
        display.clear_output(wait=False)
        print('Epoch: {}, Test set ELBO: {}, '
              'time elapse for current epoch {}'.format(epoch, elbo,
                                                        end_time - start_time))
        generate_and_save_images(model, epoch, random_vector_for_generation)
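After training, brand-new digits can be generated by decoding fresh Gaussian noise through the sample method (a minimal sketch using only names defined above):

# Decode 16 latent vectors drawn from the N(0, I) prior into digit images
new_eps = tf.random.normal(shape=[16, latent_dim])
new_images = model.sample(new_eps)    # tensor of shape (16, 28, 28, 1)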
Result: Epoch: 100, Test set ELBO: -78.37091064453125, time elapse for current epoch 1.5785579681396484
Visualized results (grid of generated digit images):