MNIST手写识别系列——kNN算法(多进程和单进程)
核心思想
KNN算法的核心思想其实很简单,就是计算两张图片之间的距离。
当我们有一个已经打上标签的数据集,然后我们输入一张图片,此时图片会和数据集中所有图片进行求“距离”运算,选出其中最近的k张图,然后根据这k张图中的标签,选择标签出现次数最多的作为我们的输出。
距离公式有以下两种:
曼哈顿距离:
高斯距离:
对于超参K的选择:
将测试数据分的等分成多个折,每个折使用不同的K值,最后看哪个折的分类效果最好,K就取这个值。
如果数据太少不满足,还可以交叉验证。首先使用1-4折作为训练,5作为验证。然后变换训练和验证的组合,不断测试。
分类MNIST代码:
由于kNN需要计算输入和所有数据集的距离,并且每次计算都是完全独立的,所以我尝试使用多进程去求解。
这里给出两个版本的代码,一份是单进程的,一份是多进程的。
单进程
from __future__ import print_function
import keras
from keras.datasets import mnist
import numpy as np
import operator
import time
def loadData():
(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train = x_train.astype('float32')
x_test = x_test.astype('float32')
x_train /= 255 # 全部化为0,1.不用颜色深浅
x_test /= 255
x_train = np.ceil(x_train) # 向上取整
x_test = np.ceil(x_test)
x_train = x_train.astype('int32')
x_test = x_test.astype('int32')
# 将 60000*28*28 的三维数组化为 60000*784
x_train = x_train.reshape(x_train.shape[0], x_train.shape[1]*x_train.shape[2])
x_test = x_test.reshape(x_test.shape[0], x_test.shape[1]*x_test.shape[2])
# print(x_train.shape)
# print(x_test.shape)
y_train = y_train.reshape(1, y_train.shape[0])
y_test = y_test.reshape(1, y_test.shape[0])
y_train = y_train.astype('int32')
y_test = y_test.astype('int32')
print("load data finish")
#print("x_train.shape: ", x_train.shape)
#print("y_train.shape: ", y_train.shape)
return (x_train, y_train), (x_test, y_test)
def classify(inX, dataSet, labels, k=5):
dataSetSize = dataSet.shape[0]
diffMat = np.tile(inX, (dataSetSize,1))- dataSet
sqDiffMat = diffMat**2
sqDistances = sqDiffMat.sum(axis=1)
distances = sqDistances**0.5
sortedDistIndicies = distances.argsort()
classCount = {}
for i in range(k):
voteIlabel = labels[0][sortedDistIndicies[i]]
classCount[voteIlabel] = classCount.get(voteIlabel, 0)+1
sortedClassCount = sorted(classCount.items(), key=operator.itemgetter(1), reverse=True)
return sortedClassCount[0][0]
def run(x_train, y_train, x_test, y_test):
right = 0;
for i in range(x_test.shape[0]):
if(classify(x_test[i], x_train, y_train, 5)==y_test[0][i]):
print('***', i, ' is right!***\n')
right = right + 1
else:
print(i, ' is wrong!\n')
print(right)
return right
if __name__ == '__main__':
(x_train, y_train), (x_test, y_test) = loadData()
right = run(x_train, y_train, x_test, y_test)
print(right/x_test.shape[0])
多进程
from __future__ import print_function
import keras
from keras.datasets import mnist
import numpy as np
import operator
import time
from multiprocessing.dummy import Pool
from multiprocessing import Lock
def loadData():
(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train = x_train.astype('float32')
x_test = x_test.astype('float32')
x_train /= 255 # 全部化为0,1.不用颜色深浅
x_test /= 255
x_train = np.ceil(x_train) # 向上取整
x_test = np.ceil(x_test)
x_train = x_train.astype('int32')
x_test = x_test.astype('int32')
# 将 60000*28*28 的三维数组化为 60000*784
x_train = x_train.reshape(x_train.shape[0], x_train.shape[1]*x_train.shape[2])
x_test = x_test.reshape(x_test.shape[0], x_test.shape[1]*x_test.shape[2])
# print(x_train.shape)
# print(x_test.shape)
y_train = y_train.reshape(1, y_train.shape[0])
y_test = y_test.reshape(1, y_test.shape[0])
y_train = y_train.astype('int32')
y_test = y_test.astype('int32')
print("load data finish")
#print("x_train.shape: ", x_train.shape)
#print("y_train.shape: ", y_train.shape)
return (x_train, y_train), (x_test, y_test)
# 全局变量
right = 0
lock = Lock()
(x_train, y_train), (x_test, y_test) = loadData()
def classify(test, dataSet=x_train, labels=y_train, result=y_test, k=5):
inX = test[0]
index = test[1]
dataSetSize = dataSet.shape[0]
diffMat = np.tile(inX, (dataSetSize,1))- dataSet
sqDiffMat = diffMat**2
sqDistances = sqDiffMat.sum(axis=1)
distances = sqDistances**0.5
sortedDistIndicies = distances.argsort()
classCount = {}
for i in range(k):
voteIlabel = labels[0][sortedDistIndicies[i]]
classCount[voteIlabel] = classCount.get(voteIlabel, 0)+1
sortedClassCount = sorted(classCount.items(), key=operator.itemgetter(1), reverse=True)
if(sortedClassCount[0][0]==result[0][index]):
print('***', index, ' is right!***\n')
global right,lock
with lock:
right = right + 1
else:
print(index, ' is wrong!\n')
if __name__ == '__main__':
task = zip(x_test, range(0, x_test.shape[0]))
pool = Pool(3)
pool.map(classify, list(task))
pool.close()
pool.join()
#print(right/x_test.shape[0])
其实多进程的结果好像不够好,这里只是给大家参考。本来还尝试GPU编程,后来发现自己电脑是AMD的,不支持。还希望大家能指出不足之处。
代码可以查看github
KNN缺点:
但其实KNN从未被正式运用,因为效率很低。上图第一张图是原始图片,第二张图相比第一张图左移了一点,就会计算出不一样的距离。第三张图做了一点遮挡,第四张图变暗,都会使得距离很不一样。
其实之所以用在MNIST上并且效果还不错我觉得有以下几个原因:
- 数据集简单,只有10个分类
- 数据集都是灰度图片,并且结构简单。
- 恰好同一类都在结构上非常相似。(相同数字写出来大抵相同嘛)
换在其他数据集上怕是很难有好的效果。