Word2Vec原理与Tensorflow实现
引言:
Word2Vec也成为Word Embeddings,一般称为“词向量”或者“词嵌入”。笼统地说Word2Vec就是要把语言中字词转为向量形式的表达 接下来就是要来谈谈为什么要把字词转为向量的形式
- 使用Word2Vec的原因:
I want a glass of orange ____ 中期望的单词为juice时,此时去回答
I want a glass of apple ____ 中期望的单词时,就会遇到困难。这是因为仅仅使用one-hot编码无法体现apple和orange的相似性(任何两个one-hot编码的内积都为0)
此外,将字词存储为稀疏向量的话,我们通常需要更多的数据来训练,因为稀疏数据训练的效率比较低,计算也比较麻烦
因此,我们可以使用特征表征(Featurized representation)的方法对每个单词进行编码。也就是使用一个特征向量表征单词,特征向量的每个元素都是对该单词某一特征的量化描述,量化范围可以是[-1,1]之间。特征表征的例子如下图所示:
为了结果更加直观,我们也可以通过T-SNE算法将高维空间中的数据,降维到2D平面图中,来可视化结果:
-
Word2Vec简介:
Word2Vec是一种计算非常高效的,可以从原始语料中学习字词向量空间的预测模型。
词向量的学习其实是学习一个词嵌入矩阵,比如我们设置每一个单词的维度为128D,共有5000个单词,那么词嵌入矩阵就是大小的矩阵,用来表示这个单词,对每一个单词,有一个one-hot编码(注意,这个one-hot编码仅仅只是寻找其对应的词向量),用来表示,其维度为。
因此,我们只需要训练好,那么其对应的词向量就是:但是这种做法效率并不高,通常的做法就是直接选取中第行即可
他主要分为两种模式(本文实现的主要的Skip-Gram模式的Word2Vec):
1. CBOW:例如从原始语句I want a glass of orange __ 推测目标单词为juice
2. Skip-Gram:与CBOW正好相反,他是从目标字词推出原始语句
Skip-Gram其训练样本的通常是要构造一个语境与目标词汇的映射关系,其方法就是首先选择一个单词作为context,然后用一个宽度自定义的滑动窗,在context周围选取一个单词作为target。
以I want a glass of orange juice为例,如果我们选取context为orange,滑动窗大小为1,那么target word就可以是juice。通过上面这种方法就可以获取许多个context->target word的监督训练样本。对于负样本的选取,通常采用的的一种方法叫做 负采样(Negative Sampling)。 在这种情况下,我们只需要随机选取k个词汇,而非词汇表中全部词汇,因此训练速度非常快。当然,仔细的朋友会注意到负样本中出现了滑动窗口范围内的target:of,但是在一个大样本下采样到窗口范围内的单词其实也是一个小概率事件,对结果并不会产生比较大的影响。
然而,对于Word2Vec中的特征学习,可以不需要一个完整的概率模型。CBOW和Skip-Gram模型在输出端使用的是一个二分类器(即Logistic Regression),来区分目标词和词库中其他的 个词。此时最大化的目标函数如下:
其中为二元逻辑回归的概率,具体为在数据集D中,输入的、上下文为的情况下出现的概率,公式后半部分为个负采样的样本。从这个损失函数也可以看出,该目标优化目标是预测真实的目标词汇为高概率,同时预测其他负样本为低概率
-
Word2Vec实现:
Note: 如果方便,建议移步我的github,用jupyter notebook阅读代码更加清晰易懂,中间关键程序都会打印相应结果帮助理解
import collections
import math
import os
import random
import zipfile
import numpy as np
import urllib
import tensorflow as tf
import matplotlib.pyplot as plt
from sklearn.manifold import TSNE
# 下载文本数据的地址
url = 'http://mattmahoney.net/dc/'
# 下载数据的压缩文件并核对文件尺寸,如果已经下载了文件则跳过
def maybe_download(filename, expected_bytes):
# 如果不存在该文件
if not os.path.exists(filename):
# filename: 保存的文件名
filename, _ = urllib.request.urlretrieve(url + filename, filename)
# stat 系统调用时用来返回相关文件的系统状态信息
statinfo = os.stat(filename)
# 如果下载的文件尺寸与期望的尺寸相同
if statinfo.st_size == expected_bytes:
print('Found and verified', filename)
else: # 如果尺寸不对
print(statinfo.st_size)
# 抛出异常
raise Exception('Failed to verify ' + filename + '. Can you get to it with a browser?')
return filename
filename = maybe_download('text8.zip', 31344016)
# 解压下载的压缩文件
def read_data(filename):
with zipfile.ZipFile(filename) as f:
# 将数据转为单词的列表
data = tf.compat.as_str(f.read(f.namelist()[0])).split()
# print(f.namelist())
# print(f.namelist()[0])
return data
words = read_data(filename)
print('Data Size', len(words))
# 创建vocabulary词汇表
vocabulary_size = 50000
'''
dictionary: 存储了top 5000的单词(字典),key为单词,value为编号
words: 存储全部的单词
data: 存储top 5000单词的编号,不在top 5000中的单词存储编号为0(列表)
reverse_dictionary: 将dictionary键值对换后的字典
'''
def build_dataset(words):
count = [['UNK', -1]]
# Counter: 统计单词列表中单词的频数
# most_common: 选取top 50000频数的单词作为vocabulary
count.extend(collections.Counter(words).most_common(vocabulary_size - 1)) # 添加到列表count中
# python中dict的查询复杂度为O(1)
dictionary = dict()
for word, _ in count:
# 按频度存储每个词的编号
dictionary[word] = len(dictionary)
data = list()
unk_count = 0
# 遍历单词列表,对每一个单词若出现在dictionary中,则将其转为编号,若不是则转为编号0
for word in words:
# 单词频度在top 50000
if word in dictionary:
index = dictionary[word]
else: # top 50000以外的单词,我们认定其为Unknow,将其标号为0,并统计其数量
index = 0
unk_count += 1
data.append(index)
count[0][1] = unk_count
# 词汇表的反转形式
reverse_dictionary = dict(zip(dictionary.values(), dictionary.keys()))
return data, count, dictionary, reverse_dictionary
data, count, dictionary, reverse_dictionary = build_dataset(words)
# 删除原始单词列表,以节约内存
del words
print('Most common words (+Unk)', count[:5])
print('Sample data', data[:10], [reverse_dictionary[i] for i in data[:10]])
data_index = 0
# 生成训练用的batch数据
def generate_batch(batch_size, num_skips, skip_window):
'''
batch_size: batch的大小
num_skips: 对每个单词可生成的样本 <= skip_window * 2, 且必须能被batch_size整除
skip_window: 指单词最远可以联系的距离
'''
# 定义为全局变量,因为我们要确保data_index可在函数内部进行修改
global data_index
# 断言函数,保证我们的要求满足
assert batch_size % num_skips == 0
assert num_skips <= 2 * skip_window
# 将batch和labels初始化为数组
batch = np.ndarray(shape = (batch_size), dtype = np.int32)
labels = np.ndarray(shape = (batch_size, 1), dtype = np.int32)
# span: 为某个单词创建相关样本时所需要的单词数量(1是单词其本身)
span = 2 * skip_window + 1
# 创建一个最大容量为span的双端队列
buffer = collections.deque(maxlen = span)
# 从序号data_index开始,把span个单词顺序读入buffer作为初始值
for _ in range(span):
buffer.append(data[data_index]) # buffer中存储的是单词的编号
data_index = (data_index + 1) % len(data)
# 每次循环对一个目标单词生成样本
for i in range(batch_size // num_skips):
target = skip_window # 表示buffer中第skip_window个变量为目标单词
targets_to_avoid = [skip_window] # 生成样本是要避免出现的单词
for j in range(num_skips):
while rnd in targets_to_avoid:
target = random.randint(0, span-1) # 产生随机数,直到随机数不在target_to_avoid中
targets_to_avoid.append(target)
# batch和labels存储的都是单词的编号
batch[i * num_skips + j] = buffer[skip_window] # 对应的目标,即context对应的单词
labels[i * num_skips + j, 0] = buffer[target] # 自己的标签,即target对应的单词
buffer.append(data[data_index]) # buffer中第一个单词会因为append后一个单词而出队
data_index = (data_index + 1) % len(data)
return batch, labels
batch, labels = generate_batch(batch_size = 8, num_skips = 2, skip_window = 1)
for i in range(8):
print(batch[i], reverse_dictionary[batch[i]], '->', labels[i, 0],
reverse_dictionary[labels[i, 0]])
# ['anarchism', 'originated', 'as', 'a', 'term', 'of',...]
batch_size = 128
embedding_size = 128 # 单词转为稠密向量的维度
skip_window = 1 # 前后最远可以联系的距离
num_skip = 2 # 对每个目标单词提取的样本数
'''
生成验证数据valid_examples,这里随机抽取一些频数最高的单词
看向量空间上跟它们最近的单词是否相关性比较高
'''
valid_size = 16 # 抽取的验证单词数
valid_window = 100 # 从验证单词中只取频数最高的前100个单词
valid_examples = np.random.choice(valid_window, valid_size, replace = True) # replace=True表示抽样后放回
num_sample = 64 # 训练时用来做负样本的噪声单词数量
# 定义Skip-Gram Word2Vec模型的网络结构
graph = tf.Graph()
with graph.as_default():
train_inputs = tf.placeholder(tf.int32, shape = [batch_size])
train_labels = tf.placeholder(tf.int32, shape = [batch_size, 1])
valid_dataset = tf.constant(valid_examples, dtype = tf.int32)
# 随机生成词向量50000x128
embeddings = tf.Variable(tf.random_uniform([vocabulary_size, embedding_size], -1.0, 1.0))
# 查找输入train_inputs对应的向量embed
embed = tf.nn.embedding_lookup(embeddings, train_inputs)
nce_weights = tf.Variable(tf.truncated_normal([vocabulary_size, embedding_size],
stddev = 1.0 / math.sqrt(embedding_size)))
nce_biases = tf.Variable(tf.zeros([vocabulary_size]))
# 用NCE Loss作为训练的优化目标
loss = tf.reduce_mean(tf.nn.nce_loss(weights = nce_weights,
biases = nce_biases,
labels = train_labels,
inputs = embed,
num_sampled = num_sample,
num_classes = vocabulary_size))
# 优化方法
optimizer = tf.train.AdamOptimizer(0.001).minimize(loss)
# 计算embeddings的模
norm = tf.sqrt(tf.reduce_sum(tf.square(embeddings), 1, keep_dims = True))
# 相当于向量 / 模,做单位化
normalized_embedding = embeddings / norm
valid_embeddings = tf.nn.embedding_lookup(normalized_embedding, valid_dataset)
# 计算余弦相似度(因为除了个norm,所以下面其实就是"向量内积/模模")
similarity = tf.matmul(valid_embeddings, normalized_embedding, transpose_b = True)
init = tf.global_variables_initializer()
num_steps = 100001
with tf.Session() as sess:
sess.run(init)
average_loss = 0
for step in range(num_steps):
batch_inputs, batch_labels = generate_batch(batch_size, num_skip, skip_window)
feed_dict = {train_inputs: batch_inputs, train_labels: batch_labels}
# 进行训练
_, loss_val = sess.run([optimizer, loss], feed_dict = feed_dict)
average_loss += loss_val
# 每2000轮计算一次损失结果
if step % 2000 == 0:
if step >= 0:
average_loss /= 2000
print("Average loss at step ", step, ": ", average_loss)
average_loss = 0
# 每10000次验证一次验证单词与全部单词的相似度,并将每个验证单词最相似的8个单词展示出来
if step % 10000 == 0:
sim = similarity.eval()
for i in range(valid_size):
valid_word = reverse_dictionary[valid_examples[i]]
top_k = 8
# 找出相似度最高的top_k个单词
nearest = (-sim[i, :]).argsort()[1: top_k+1]
log_str = "Nearest to %s :" % valid_word
for k in range(top_k):
close_word = reverse_dictionary[nearest[k]]
log_str = "%s %s, " % (log_str, close_word)
print(log_str)
final_embeddings = normalized_embedding.eval()
# 定义可视化Word2Vec效果的函数
def plot_with_labels(low_dim_embs, labels, filename='tsne.png'):
assert low_dim_embs.shape[0] >= len(labels), "More labels than embeddings"
plt.figure(figsize=(10, 10)) #in inches
for i, label in enumerate(labels):
x, y = low_dim_embs[i,:]
plt.scatter(x, y) # 绘制散点图
# 对点进行说明
plt.annotate(label, # 注释的文本内容
xy=(x, y), # xy表示注释的坐标点
xytext=(5, 2), # 注释文本相对于点的坐标
textcoords='offset points',
ha='right',
va='bottom')
plt.savefig(filename)
# 使用T-SNE算法将128维的嵌入向量降到2维
tsne = TSNE(perplexity = 30, n_components = 2, init = 'pca', n_iter = 5000, random_state = 1)
# 绘制点的个数
plot_only = 100
low_dim_embs = tsne.fit_transform(final_embeddings[: plot_only, :])
labels = [reverse_dictionary[i] for i in range(plot_only)]
plot_with_labels(low_dim_embs, labels)
[1] 网易云课堂:Andrew Ng, 序列模型
[2] Tensorflo实战.黄文坚,唐源
[3] http://www.jeyzhang.com/tensorflow-learning-notes-3.html
如果觉得我有地方讲的不好的或者有错误的欢迎给我留言,如果对您有帮助,帮我点个赞哦~,感谢大家阅读