MNIST手写识别系列——kNN算法(多进程和单进程)

核心思想

KNN算法的核心思想其实很简单,就是计算两张图片之间的距离。

当我们有一个已经打上标签的数据集,然后我们输入一张图片,此时图片会和数据集中所有图片进行求“距离”运算,选出其中最近的k张图,然后根据这k张图中的标签,选择标签出现次数最多的作为我们的输出。

距离公式有以下两种:

曼哈顿距离:
MNIST手写识别系列——kNN算法(多进程和单进程)
高斯距离:
MNIST手写识别系列——kNN算法(多进程和单进程)

对于超参K的选择:

MNIST手写识别系列——kNN算法(多进程和单进程)
将测试数据分的等分成多个折,每个折使用不同的K值,最后看哪个折的分类效果最好,K就取这个值。

如果数据太少不满足,还可以交叉验证。首先使用1-4折作为训练,5作为验证。然后变换训练和验证的组合,不断测试。

分类MNIST代码:

由于kNN需要计算输入和所有数据集的距离,并且每次计算都是完全独立的,所以我尝试使用多进程去求解。

这里给出两个版本的代码,一份是单进程的,一份是多进程的。

单进程

from __future__ import print_function
import keras
from keras.datasets import mnist
import numpy as np
import operator
import time

def loadData():
    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    x_train = x_train.astype('float32')
    x_test = x_test.astype('float32')
    x_train /= 255 # 全部化为0,1.不用颜色深浅
    x_test /= 255
    x_train = np.ceil(x_train) # 向上取整
    x_test = np.ceil(x_test)
    x_train = x_train.astype('int32')
    x_test = x_test.astype('int32')
    
    # 将 60000*28*28 的三维数组化为 60000*784
    x_train = x_train.reshape(x_train.shape[0], x_train.shape[1]*x_train.shape[2])
    x_test = x_test.reshape(x_test.shape[0], x_test.shape[1]*x_test.shape[2])
#     print(x_train.shape)
#     print(x_test.shape)
    y_train = y_train.reshape(1, y_train.shape[0])
    y_test = y_test.reshape(1, y_test.shape[0])
    y_train = y_train.astype('int32')
    y_test = y_test.astype('int32')
    print("load data finish")
    #print("x_train.shape: ", x_train.shape)
    #print("y_train.shape: ", y_train.shape)
    return (x_train, y_train), (x_test, y_test)


def classify(inX, dataSet, labels, k=5):
    dataSetSize = dataSet.shape[0]
    diffMat = np.tile(inX, (dataSetSize,1))- dataSet
    sqDiffMat = diffMat**2
    sqDistances = sqDiffMat.sum(axis=1)
    distances = sqDistances**0.5
    sortedDistIndicies = distances.argsort()
    classCount = {}
    for i in range(k):
        voteIlabel = labels[0][sortedDistIndicies[i]]
        classCount[voteIlabel] = classCount.get(voteIlabel, 0)+1
    sortedClassCount = sorted(classCount.items(), key=operator.itemgetter(1), reverse=True)
    return sortedClassCount[0][0]


def run(x_train, y_train, x_test, y_test):
    right = 0;
    for i in range(x_test.shape[0]):
        if(classify(x_test[i], x_train, y_train, 5)==y_test[0][i]):
            print('***', i, ' is right!***\n')
            right = right + 1
        else:
            print(i, ' is wrong!\n')
    print(right)
    return right

if __name__ == '__main__':
    (x_train, y_train), (x_test, y_test) = loadData()
    right = run(x_train, y_train, x_test, y_test)
    print(right/x_test.shape[0])

多进程

from __future__ import print_function
import keras
from keras.datasets import mnist
import numpy as np
import operator
import time
from multiprocessing.dummy import Pool
from multiprocessing import Lock

def loadData():
    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    x_train = x_train.astype('float32')
    x_test = x_test.astype('float32')
    x_train /= 255 # 全部化为0,1.不用颜色深浅
    x_test /= 255
    x_train = np.ceil(x_train) # 向上取整
    x_test = np.ceil(x_test)
    x_train = x_train.astype('int32')
    x_test = x_test.astype('int32')
    
    # 将 60000*28*28 的三维数组化为 60000*784
    x_train = x_train.reshape(x_train.shape[0], x_train.shape[1]*x_train.shape[2])
    x_test = x_test.reshape(x_test.shape[0], x_test.shape[1]*x_test.shape[2])
#     print(x_train.shape)
#     print(x_test.shape)
    y_train = y_train.reshape(1, y_train.shape[0])
    y_test = y_test.reshape(1, y_test.shape[0])
    y_train = y_train.astype('int32')
    y_test = y_test.astype('int32')
    print("load data finish")
    #print("x_train.shape: ", x_train.shape)
    #print("y_train.shape: ", y_train.shape)
    return (x_train, y_train), (x_test, y_test)

# 全局变量
right = 0
lock = Lock()
(x_train, y_train), (x_test, y_test) = loadData()

def classify(test, dataSet=x_train, labels=y_train, result=y_test, k=5):
    inX = test[0]
    index = test[1]
    dataSetSize = dataSet.shape[0]
    diffMat = np.tile(inX, (dataSetSize,1))- dataSet
    sqDiffMat = diffMat**2
    sqDistances = sqDiffMat.sum(axis=1)
    distances = sqDistances**0.5
    sortedDistIndicies = distances.argsort()
    classCount = {}
    for i in range(k):
        voteIlabel = labels[0][sortedDistIndicies[i]]
        classCount[voteIlabel] = classCount.get(voteIlabel, 0)+1
    sortedClassCount = sorted(classCount.items(), key=operator.itemgetter(1), reverse=True)
    if(sortedClassCount[0][0]==result[0][index]):
        print('***', index, ' is right!***\n')
        global right,lock
        with lock:
            right = right + 1
    else:
        print(index, ' is wrong!\n')  

if __name__ == '__main__':
    task = zip(x_test, range(0, x_test.shape[0]))
    pool = Pool(3)
    pool.map(classify, list(task))
    pool.close()
    pool.join()
    #print(right/x_test.shape[0])

其实多进程的结果好像不够好,这里只是给大家参考。本来还尝试GPU编程,后来发现自己电脑是AMD的,不支持。还希望大家能指出不足之处。

代码可以查看github

KNN缺点:

MNIST手写识别系列——kNN算法(多进程和单进程)
但其实KNN从未被正式运用,因为效率很低。上图第一张图是原始图片,第二张图相比第一张图左移了一点,就会计算出不一样的距离。第三张图做了一点遮挡,第四张图变暗,都会使得距离很不一样。

其实之所以用在MNIST上并且效果还不错我觉得有以下几个原因:

  1. 数据集简单,只有10个分类
  2. 数据集都是灰度图片,并且结构简单。
  3. 恰好同一类都在结构上非常相似。(相同数字写出来大抵相同嘛)

换在其他数据集上怕是很难有好的效果。