目录
1. 数据的获取
# 导入一些需要的库
# 由于Python是由社区推动的开源并且免费的开发语言,不受商业公司控制,因此,Python的改进往往比较激进,
# 不兼容的情况时有发生。Python为了确保你能顺利过渡到新版本,特别提供了__future__模块,
# 让你在旧的版本中试验新版本的一些特性。
# 如果你在main.py中写import string,那么在Python 2.4或之前, Python会先查找当前目录下
# 有没有string.py, 若找到了,则引入该模块,然后你在main.py中可以直接用string了。
# 如果你是真的想用同目录下的string.py那就好,但是如果你是想用系统自带的标准string.py呢?
# 那其实没有什么好的简洁的方式可以忽略掉同目录的string.py而引入系统自带的标准string.py。
# 这时候你就需要from __future__ import absolute_import了。这样,你就可以用
# import string来引入系统的标准string.py, 而用from pkg import string来引入
# 当前目录下的string.py了
from __future__ import absolute_import
# 如果你想在Python 2.7的代码中直接使用Python 3.x的精准除法,可以通过__future__模块的division实现
from __future__ import division
from __future__ import print_function
import math
import os
from six.moves import urllib
from six.moves import xrange
# 为了使用Skip-Gram方法训练语言模型,需要下载对应语言的语料库。在网站http://mattmahoney.net/dc/
# 上提供了大量英语语料库下载,为了方便学习,使用一个比较小的语料库http://mattmahoney.net/dc/text8.zip
# 作为示例训练模型,程序会自动下载这个文件
def maybe_download(url, filename, expected_bytes):
"""
:param filename: 如果filename不存在,在上面的地址下载它
:param expected_bytes: 如果filename存在,跳过下载
最终会检查文字的字节数是否和expected_bytes相同
"""
if not os.path.exists(filename):
filename, _ = urllib.request.urlretrieve(url + filename, filename)
statinfo = os.stat(filename)
if statinfo.st_size == expected_bytes:
print('Found and verified', filename)
else:
print(statinfo.st_size)
raise Exception(
'Failed to verify ' + filename + '. Can you get to it with a browser?'
)
return filename
if __name__ == '__main__':
url = 'http://mattmahoney.net/dc/'
filename = maybe_download(url, 'text8.zip', 31344016)
# 如果读者运行这段程序后,发现没有办法正常下载文件,可以尝试使用URL手动下载,并将下载好的文件放在当前目录下
2. 数据加载
# 下载、验证完成后,使用下面的程序将语料库中的数据读出来
import zipfile
import tensorflow as tf
from word_2_vector.download_data import maybe_download
# 将语料库解压,并转换成一个word的list
def read_data(filename):
"""
将下载好的zip文件解压并读取为word的list
"""
with zipfile.ZipFile(filename) as f:
print(f.namelist()[0])
data = tf.compat.as_str(f.read(f.namelist()[0])).split()
return data
if __name__ == '__main__':
url = 'http://mattmahoney.net/dc/'
filename = maybe_download(url, 'text8.zip', 31344016)
vocabulary = read_data(filename=filename)
print('Data size', len(vocabulary))
# 输出前100个词
# 词语本来是在连续的句子中的,现在已经被去掉了标点
print(vocabulary[0:100])
3. 制作数据集
# 制作词表
import collections
from word_2_vector.download_data import maybe_download
from word_2_vector.load_data import read_data
# 下载并取出语料库后,来制作一个单词表,它可以将单词映射为一个数字,这个数字是该单词的ID,即建立索引
# 一般来说,因为在语料库中有些词只出现有限的几次,如果单词表中包含了语料库中的所有词,会过于庞大。所以,
# 单词表一般只包含最常用的那些词。对于剩下的不常用的词,会将它替换为一个罕见词标记'UNK',所有罕见的词都会
# 被映射为同一个单词ID
# 制作一个词表,将单词映射为一个的ID
# 词表的大小为5万,即只考虑最常出现的5万个词
# 将不常见的词变成一个UNK标识符,映射到统一的ID
def build_dataset(words, n_words):
"""
将原始的单词表示变成index索引表示
"""
count = [['UNK', -1]]
count.extend(collections.Counter(words).most_common(n_words-1))
dictionary = dict()
for word, _ in count:
dictionary[word] = len(dictionary)
data = list()
unk_count = 0
for word in words:
if word in dictionary:
index = dictionary[word]
else:
index = 0 # 如果没有的词就和UNK一样是索引0
unk_count += 1
data.append(index)
count[0][1] = unk_count
reversed_dictionary = dict(zip(dictionary.values(), dictionary.keys()))
return data, count, dictionary, reversed_dictionary
if __name__ == '__main__':
url = 'http://mattmahoney.net/dc/'
filename = maybe_download(url, 'text8.zip', 31344016)
vocabulary = read_data(filename=filename)
vocabulary_size = 50000
data, count, dictionary, reverse_dictionary = build_dataset(vocabulary, vocabulary_size)
del vocabulary # 删除以节省内存
# 输出最常见的5个单词
print('Most common words (+UNK)', count[:5])
# 输出转换后的数据库data,和原来的单词,前10个
print('Sample data', data[:10], [reverse_dictionary[i] for i in data[:10]])
# 在这里的程序中,单词表中只包含了最常用的50000个单词。请注意,在这个实现中,名词的单复数形式,如boy和boys,
# 动词的不同时态,如make和made都被算作是不同的单词。原来的训练数据vocabulary是一个单词的列表,在经过转换后,
# 它变成了一个单词ID的列表,即程序中的变量data,它的形式是[5234, 3081, 12, 6, 195, 2, 3134, 46, ...]
4. 制作训练集
import random
import numpy as np
import collections
from word_2_vector.download_data import maybe_download
from word_2_vector.load_data import read_data
from word_2_vector.make_dict import build_dataset
# 得到的变量data包含了训练集中所有的数据,现在把它转换成训练时使用的batch数据
# 一个batch可以看作是一些"单词对"的集合,如woman->man,woman->fell,箭头左边表示"出现的单词",
# 右边表示该单词所在的"上下文"中的单词,这是所说的Skip-Gram方法
data_index = 0
def generate_batch(batch_size, num_skips, skip_window, data):
"""
每运行一次这个函数,会产生一个batch的数据以及对应的标签labels
:param batch_size: 一个批次中单词对的个数
:param num_skips: 在生成单词对时,会在语料库中先取一个长度为skip_window * 2 + 1连续单词列表
这个单词列表放在上面程序中的变量buffer。buffer中最中间的那个单词是skip-gram
方法中"出现的单词",其余的skip_window * 2个单词是它的"上下文"。
会在skip_window*2个单词中随机选取num_skips个单词,放入标签labels
:param skip_window:
:param data:
:return: 返回两个值batch和labels,前者表示skip-gram方法中"出现的单词",后者表示"上下文"中的单词
它们的形状分别为(batch_size,)和 (batch_size, 1)
"""
# data_index相当于一个指针,初始为0
# 每次生成一个batch,data_index会相应地往后推
global data_index
assert batch_size % num_skips == 0
assert num_skips <= 2 * skip_window
batch = np.ndarray(shape=(batch_size), dtype=np.int32)
labels = np.ndarray(shape=(batch_size, 1), dtype=np.int32)
span = 2 * skip_window + 1 # [ skip_window target skip_window ]
buffer = collections.deque(maxlen=span)
# data_index是当前数据开始的位置
# 产生batch后往后推1位(产生batch)
for _ in range(span):
buffer.append(data[data_index])
data_index = (data_index + 1) % len(data)
for i in range(batch_size // num_skips):
# 利用buffer生成batch
# buffer是一个长度为2*skip_window + 1长度的word list
# 一个buffer生成num_skips个数的样本
target = skip_window # target label at the center of the buffer
# targets_to_avoid保证样本不重复
targets_to_avoid = [skip_window]
for j in range(num_skips):
while target in targets_to_avoid:
target = random.randint(0, span - 1)
targets_to_avoid.append(target)
batch[i * num_skips + j] = buffer[skip_window]
labels[i * num_skips + j, 0] = buffer[target]
buffer.append(data[data_index])
# 每利用buffer生成num_skips个样本,data_index向后推进一位
data_index = (data_index + 1) % len(data)
data_index = (data_index + len(data) - span) % len(data)
return batch, labels
if __name__ == '__main__':
url = 'http://mattmahoney.net/dc/'
filename = maybe_download(url, 'text8.zip', 31344016)
vocabulary = read_data(filename=filename)
vocabulary_size = 50000
data, count, dictionary, reverse_dictionary = build_dataset(vocabulary, vocabulary_size)
del vocabulary # 删除以节省内存
# 输出最常见的5个单词
print('Most common words (+UNK)', count[:5])
# 输出转换后的数据库data,和原来的单词,前10个
print('Sample data', data[:10], [reverse_dictionary[i] for i in data[:10]])
# 默认情况下skip_window=1, num_skips=2
# 此时是从连续的3 (3 = skip_window * 2 + 1)个词中生成2(num_skips)个样本
# 如连续的三个词['used', 'against', 'early']
# 生成两个样本:against -> used, against -> early
batch, labels = generate_batch(batch_size=8, num_skips=2, skip_window=1, data=data)
for i in range(8):
print(batch[i], reverse_dictionary[batch[i]], '->', labels[i, 0], reverse_dictionary[labels[i, 0]])
5. 模型定义
import tensorflow as tf
import math
# 此处的模型实际可以抽象为:用一个单词预测另一个单词,在输出时,不使用softmax损失,而使用NCE损失,
# 即再选取一些"噪声词",作为负样本进行两类分类
# 建立模型
def generate_graph(vocabulary_size, valid_examples):
batch_size = 128
embedding_size = 128 # 词嵌入空间是128维的。即word2vec中的vec是一个128维的向量
# 构造损失时选取的噪声词的数量
num_sampled = 64
graph = tf.Graph()
with graph.as_default():
# 输入的batch
train_inputs = tf.placeholder(tf.int32, shape=[batch_size])
train_labels = tf.placeholder(tf.int32, shape=[batch_size, 1])
# 用于验证的词
valid_dataset = tf.constant(valid_examples, dtype=tf.int32)
# 下面采用的某些函数还没有GPU实现,所以只在CPU上定义模型
with tf.device('/cpu:0'):
# 定义一个embeddings变量,这个变量的形状是(vocabulary_size,embedding_size)
# 相当于每一行存储了一个单词的嵌入向量embedding,例如,单词id为0的嵌入是
# embeddings[0,:],单词id为1的嵌入是embeddings[1:],依次类推
embeddings = tf.Variable(
tf.random_uniform([vocabulary_size, embedding_size], -1.0, -1.0)
)
# 利用embedding_lookup可以轻松得到一个batch内的所有的词嵌入
embed = tf.nn.embedding_lookup(embeddings, train_inputs)
# 创建两个变量用于NCE Loss(即选取噪声词的二分类损失)
nce_weights = tf.Variable(
tf.truncated_normal([vocabulary_size, embedding_size],
stddev=1.0 / math.sqrt(embedding_size))
)
nce_bias = tf.Variable(tf.zeros([vocabulary_size]))
# tf.nn.nce_loss会自动选取噪声词,并且形成损失
# 随机选取num_sampled个噪声词
loss = tf.reduce_mean(
tf.nn.nce_loss(weights=nce_weights, biases=nce_bias,
labels=train_labels, inputs=embed,
num_sampled=num_sampled, num_classes=vocabulary_size)
)
# 得到loss后,可以构造优化器了
optimizer = tf.train.GradientDescentOptimizer(1.0).minimize(loss)
# 对embedding层做一次归一化
# 由于直接得到的embeddings矩阵可能在各个维度上有不同的大小,为了使计算的相似度更合理,
# 先对其做一次归一化,用归一化后的normalized_embeddings计算验证词和其他单词的相似度。
norm = tf.sqrt(tf.reduce_sum(tf.square(embeddings), 1, keep_dims=True))
normalized_embeddings = embeddings / norm
# 找出和验证词的embedding并计算它们和所有单词的相似度(用于验证)
# 在训练模型时,还希望对模型进行验证。此处采取的方法是选出一些"验证单词",
# 计算在嵌入空间中与其最相近的词。
valid_embeddings = tf.nn.embedding_lookup(normalized_embeddings, valid_dataset)
similarity = tf.matmul(valid_embeddings, normalized_embeddings, transpose_b=True)
# 变量初始化步骤
init = tf.global_variables_initializer()
return graph, init, train_inputs, train_labels, loss, optimizer, normalized_embeddings, similarity
6. 训练模型
import tensorflow as tf
import numpy as np
from sklearn.manifold import TSNE
from word_2_vector.make_trainset import generate_batch
from word_2_vector.define_model import generate_graph
from word_2_vector.download_data import maybe_download
from word_2_vector.load_data import read_data
from word_2_vector.make_dict import build_dataset
from word_2_vector.visualization import plot_with_labels
def train(graph, init, train_inputs, train_labels, loss, optimizer, normalized_embeddings, similarity
, reverse_dictionary, data):
num_steps = 100001
with tf.Session(graph=graph) as session:
# 初始化变量
init.run()
print('Initialized')
average_loss = 0
for step in range(num_steps):
batch_inputs, batch_labels = generate_batch(
batch_size=batch_size, num_skips=num_skips, skip_window=skip_window, data=data
)
feed_dict = {train_inputs: batch_inputs, train_labels: batch_labels}
# 优化一下
_, loss_val = session.run([optimizer, loss], feed_dict=feed_dict)
average_loss += loss_val
if step % 2000 == 0:
if step > 0:
average_loss /= 2000
# 2000个batch的平均损失
print('Average loss at step ', step, ': ', average_loss)
average_loss = 0
# 每1万步,进行一次验证
if step % 10000 == 0:
# sim是验证词与所有词之间的相似度
sim = similarity.eval()
# 一共有valid_size个验证词
for i in range(valid_size):
valid_word = reverse_dictionary[valid_examples[i]]
top_k = 8 # 输出最相邻的8个词语
nearest = (-sim[i, :]).argsort()[1: top_k+1]
log_str = 'Nearest to %s:' % valid_word
for k in range(top_k):
close_word = reverse_dictionary[nearest[k]]
log_str = '%s %s,' % (log_str, close_word)
print(log_str)
# final_embeddings是最后得到的embedding向量
# 它的形状是[vocabulary_size, embedding_size]
# 每一行代表着对应单词id的词嵌入表示
final_embeddings = normalized_embeddings.eval()
# 最终,得到的词嵌入向量为final_embeddings,它是归一化后的词嵌入向量,
# 形状为(vocabulary_size, embedding_size),final_embeddings[0, :]
# 是id为0的单词对应的词嵌入表示,final_embeddings[1, :]是id为1的单词
# 对应的词嵌入表示,以此类推
# 因为embedding大小为128维,没有办法直接可视化
# 所以用t-SNE方法进行降维
tsne = TSNE(perplexity=30, n_components=2, init='pca', n_iter=5000)
# 只画出500个词的位置
plot_only = 500
low_dim_embs = tsne.fit_transform(final_embeddings[:plot_only, :])
labels = [reverse_dictionary[i] for i in range(plot_only)]
plot_with_labels(low_dim_embs, labels)
if __name__ == '__main__':
url = 'http://mattmahoney.net/dc/'
filename = maybe_download(url, 'text8.zip', 31344016)
vocabulary = read_data(filename=filename)
vocabulary_size = 50000
data, count, dictionary, reverse_dictionary = build_dataset(vocabulary, vocabulary_size)
del vocabulary # 删除以节省内存
# 输出最常见的5个单词
print('Most common words (+UNK)', count[:5])
# 输出转换后的数据库data,和原来的单词,前10个
print('Sample data', data[:10], [reverse_dictionary[i] for i in data[:10]])
# 在训练过程中,会对模型进行验证
# 验证的方法是找出和某个词最近的词
# 只对前valid_window的词进行验证,因为这些词最常出现
valid_size = 16 # 每次验证16个词
valid_window = 100 # 这16个词是从前100个最常见的词中选出来的
valid_examples = np.random.choice(valid_window, valid_size, replace=False)
batch_size = 128
skip_window = 1 # skip_window参数和之前保持一致
num_skips = 2 # num_skips参数和之前保持一致
graph, init, train_inputs, train_labels, loss, optimizer, normalized_embeddings, similarity = \
generate_graph(vocabulary_size, valid_examples)
train(graph, init, train_inputs, train_labels, loss, optimizer, normalized_embeddings
, similarity, reverse_dictionary, data)
7. 可视化
import matplotlib.pyplot as plt
# 由于之前设定的embedding_size=128,即每个词都被表示为一个128维的向量,虽然没有方法把128维的空间
# 直接画出来,但是下面的程序使用了t-SNE方法把128维空间映射到了2维,并画出最常使用的500个词的位置。
# 画出的图片保存为tsne.png文件
def plot_with_labels(low_dim_embs, labels, filename='tsne.png'):
assert low_dim_embs.shape[0] >= len(labels), 'More labels than embeddings'
plt.figure(figsize=(20, 20)) # in inches
for i, label in enumerate(labels):
x, y = low_dim_embs[i, :]
plt.scatter(x, y)
plt.annotate(label,
xy=(x, y),
xytext=(5, 2),
textcoords='offset points',
ha='right',
va='bottom')
plt.savefig(filename)
# 因为循环太多,每个循环里面画了一张图,所以可以在每个循环内把plt关闭
plt.close()