在前两天的博客中,我们用Deeplearning4j做了Mnist数据集的分类。算是第一个深度学习的应用。像Mnist数据集这样图片尺寸不大,而且是黑白的开源图片集在本地完成训练是可以的,毕竟我们用了Lenet这样相对简单的网络结构,而且本地的机器配置也有8G左右的内存。但实际生产中,图片的数量要多得多,尺寸也大得多,用的网络也会是AlexNet、GoogLenet这样更多层数的网络,所以往往我们需要用集群来解决计算资源的问题。由于Deeplearning4j本身基于Spark实现了神经网络的分布式训练,所以我们就以此作为我们的解决方案。
我们还是以Mnist数据集为例来做Deeplearning4j的第一个Spark版本的应用。首先需要在上一篇博客的基础上,在pom里面加入新的依赖:
-
<dependency>
-
<groupId>org.nd4j</groupId>
-
<artifactId>nd4j-kryo_${scala.binary.version}</artifactId>
-
<version>${nd4j.version}</version>
-
</dependency>
这个是为了将Nd4j的序列化形式从Java默认的形式转到kryo的格式,以此提高序列化的效率。如果在代码中不为该类注册kryo的序列化格式,那么训练的时候会抛异常。接着代码分为2个部分,一个部分是将Mnist数据集在本地以JavaRDD<DataSet>的形式存到磁盘并最终推到HDFS上作为Spark job的输入数据源。另一个部分则是模型的训练和保存。第一部分的逻辑大致如下:本地建立Spark任务-->获取所有Mnist图片的路径-->读取图片并提取特征,打上标注,以DataSet的形式作为一张图片的wrapper-->将所有图片构成的JavaRDD<DataSet>存储下来。
这里原始的Mnist数据集是以图片形式存在,不再是二进制格式的数据。这个例子这样处理,也是方便日后用同样的方式读取一般的图片。Mnist的图片如下:
-
SparkConf conf = new SparkConf()
-
.setMaster("local[*]") //local mode
-
.set("spark.kryo.registrator", "org.nd4j.Nd4jRegistrator")
-
.setAppName("Mnist Java Spark (Java)");
-
JavaSparkContext jsc = new JavaSparkContext(conf);
-
-
final List<String> lstLabelNames = Arrays.asList("零","一","二","三","四","五","六","七","八","九"); //Chinese Label
-
final ImageLoader imageLoader = new ImageLoader(28, 28, 1); //Load Image
-
final DataNormalization scaler = new ImagePreProcessingScaler(0, 1); //Normalize
-
-
String srcPath = args[0];
-
FileSystem hdfs = FileSystem.get(URI.create(srcPath),jsc.hadoopConfiguration()); //hdfs read local file system
-
FileStatus[] fileList = hdfs.listStatus(new Path(srcPath));
-
List<String> lstFilePath = new ArrayList<>();
-
for( FileStatus fileStatus : fileList){
-
lstFilePath.add(srcPath + "/" + fileStatus.getPath().getName());
-
}
-
JavaRDD<String> javaRDDImagePath = jsc.parallelize(lstFilePath);
-
JavaRDD<DataSet> javaRDDImageTrain = javaRDDImagePath.map(new Function<String, DataSet>() {
-
-
@Override
-
public DataSet call(String imagePath) throws Exception {
-
FileSystem fs = FileSystem.get(new Configuration());
-
DataInputStream in = fs.open(new Path(imagePath));
-
INDArray features = imageLoader.asRowVector(in); //features tensor
-
String[] tokens = imagePath.split("\\/");
-
String label = tokens[tokens.length-1].split("\\.")[0];
-
int intLabel = Integer.parseInt(label);
-
INDArray labels = Nd4j.zeros(10); //labels tensor
-
labels.putScalar(0, intLabel, 1.0);
-
DataSet trainData = new DataSet(features, labels); //DataSet, wrapper of features and labels
-
trainData.setLabelNames(lstLabelNames);
-
scaler.preProcess(trainData); //normalize
-
fs.close();
-
return trainData;
-
}
-
});
-
javaRDDImageTrain.saveAsObjectFile("mnistNorm.dat"); //save training data
这里有几点需要解释。1.用hdfs.filesystem来获取文件。用Java原生态的File来操作也是完全可以的。只不过,这样读取文件的方式,同时适用于读取本地和HDFS上的文件。2.ImageLoader类。这个类是用来读取图片文件的。类似的还有一个类,叫NativeImageLoader。不同的在于,NativeImageLoader是调用了OpenCV的相关方法来对图片做处理的,效率更高,因此推荐使用NativeImageLoader
保存的RDD的形式如下图:
然后,讲下模型训练任务的逻辑。读取HDFS上的以DataSet形式存储的Mnist文件-->定义参数中心服务-->定义神经网络结构(Lenet)--> 训练网络-->保存训练好的模型。首先看前两步的操作:
-
SparkConf conf = new SparkConf()
-
.set("spark.kryo.registrator", "org.nd4j.Nd4jRegistrator") //register kryo for nd4j
-
.setAppName("Mnist Java Spark (Java)");
-
final String imageFilePath = args[0];
-
final int numEpochs = Integer.parseInt(args[1]);
-
final String modelPath = args[2];
-
final int numBatch = Integer.parseInt(args[3]);
-
//
-
JavaSparkContext jsc = new JavaSparkContext(conf);
-
//
-
JavaRDD<DataSet> javaRDDImageTrain = jsc.objectFile(imageFilePath); //load image data from hdfs
-
ParameterAveragingTrainingMaster trainMaster = new ParameterAveragingTrainingMaster.Builder(numBatch) //weight average service
-
.workerPrefetchNumBatches(0)
-
.saveUpdater(true)
-
.averagingFrequency(5)
-
.batchSizePerWorker(numBatch)
这里我们获取传入的一些参数,如文件的hdfs路径,最后保存model的路径,mini-batch的大小(一般32,62,128这样的值为好,可以自行尝试),总的训练的轮次epoch。这里需要解释的是ParameterAveragingTrainingMaster这个类。这个类的作用是用于将spark worker节点上各自计算的权重收回到driver节点上进行加权平均,并将最新的权重广播到worker节点上。也即为:将各个工作节点的参数的均值作为全局参数值。这种分布式机器学习中,数据并行化的一种操作。
下面是定义神经网络结构和训练网络:
-
int nChannels = 1;
-
int outputNum = 10;
-
int iterations = 1;
-
int seed = 123;
-
MultiLayerConfiguration.Builder builder = new NeuralNetConfiguration.Builder() //define lenent
-
.seed(seed)
-
.iterations(iterations)
-
.regularization(true).l2(0.0005)
-
.learningRate(0.1)
-
.learningRateScoreBasedDecayRate(0.5)
-
.optimizationAlgo(OptimizationAlgorithm.STOCHASTIC_GRADIENT_DESCENT)
-
.updater(Updater.ADAM)
-
.list()
-
.layer(0, new ConvolutionLayer.Builder(5, 5)
-
.nIn(nChannels)
-
.stride(1, 1)
-
.nOut(20)
-
.weightInit(WeightInit.XAVIER)
-
.activation("relu")
-
.build())
-
.layer(1, new SubsamplingLayer.Builder(SubsamplingLayer.PoolingType.MAX)
-
.kernelSize(2, 2)
-
.build())
-
.layer(2, new ConvolutionLayer.Builder(5, 5)
-
.nIn(20)
-
.nOut(50)
-
.stride(2,2)
-
.weightInit(WeightInit.XAVIER)
-
.activation("relu")
-
.build())
-
.layer(3, new SubsamplingLayer.Builder(SubsamplingLayer.PoolingType.MAX)
-
.kernelSize(2, 2)
-
.build())
-
.layer(4, new DenseLayer.Builder().activation("relu")
-
.weightInit(WeightInit.XAVIER)
-
.nOut(500).build())
-
.layer(5, new OutputLayer.Builder(LossFunctions.LossFunction.NEGATIVELOGLIKELIHOOD)
-
.nOut(outputNum)
-
.weightInit(WeightInit.XAVIER)
-
.activation("softmax")
-
.build())
-
.backprop(true).pretrain(false);
-
new ConvolutionLayerSetup(builder,28,28,1);
-
-
MultiLayerConfiguration netconf = builder.build();
-
MultiLayerNetwork net = new MultiLayerNetwork(netconf);
-
net.setListeners(new ScoreIterationListener(1));
-
net.init();
-
SparkDl4jMultiLayer sparkNetwork = new SparkDl4jMultiLayer(jsc, net, trainMaster);
-
//train the network on Spark
-
for( int i = 0; i < numEpochs; ++i ){
-
sparkNetwork.fit(javaRDDImageTrain);
-
System.out.println("----- Epoch " + i + " complete -----");
-
Evaluation evalActual = sparkNetwork.evaluate(javaRDDImageTrain);
-
System.out.println(evalActual.stats());
-
}
这部分没有什么特别的地方,和单机的形式差不太多。值得说明的就是,我们在每一轮次的训练后,直接预测全部的训练数据来做评估,并没有做交叉验证。当然,做交叉验证也是完全可以的。最后一部分是保存模型到hdfs上:
-
//save model
-
FileSystem hdfs = FileSystem.get(jsc.hadoopConfiguration());
-
Path hdfsPath = new Path(modelPath);
-
FSDataOutputStream outputStream = hdfs.create(hdfsPath);
-
MultiLayerNetwork trainedNet = sparkNetwork.getNetwork();
-
ModelSerializer.writeModel(trainedNet, outputStream, true);
到此coding的部分就结束了,我们构建了在Spark进行分布式深度神经网络的训练并保存了模型。Spark的提交命令如下:
spark-submit --master yarn-cluster --executor-memory 5g --num-executors 16 --driver-memory 8g --conf "spark.executor.extraJavaOptions=-Dorg.bytedeco.javacpp.maxbytes=2921225472" --conf spark.yarn.executor.memoryOverhead=5000
需要说明的是--conf后面的内容,因为Nd4j在计算的时候,实际需要两部分的内存:on-heap memory和off-heap memory。前者就是jvm为开辟对象所需内存,后者是C++的内存。Nd4j为了效率,在底层是通过JavaCPP调用C++进行计算的。如果不显示地申请C++的内存,那默认会从on-heap中分出10%给off-heap,但这样可能会不够。所以我们显示地申请off-heap内存。
下面这张图是正常的Spark UI显示的Deeplearning4j的训练过程:
然后,我们看下训练的结果:
在150轮的训练过后,模型的准确率达到了95.48%。误判的情况也列在上面了。到此,在Spark上进行Mnist数据集的训练和评估就完成了。总结一下就是,先将数据以RDD的形式保存到HDFS上,然后建模读取RDD并训练模型。其实,将图片存在HDFS上也是一种方案,但是HDFS的一个block可能需要占用32M,64M这样的空间。因此图片这样的小文件,是很占用集群的存储空间的。并且,当图片数量很多的时候,我们会为了读取图片频繁地和HDFS建立和释放网络链接,这样同样消耗HDFS的资源。因此我们选择先在本地存储RDD的形式来处理。其实分布式的机器学习有很多策略,比如数据的并行化和模型的并行化,这里只是一笔掠过,待自己研究清楚了再写点东西。最后就是模型的调参。这里面我们也没有提,其实是极其重要的。因为目前,还没有非常权威的,或者定义的调参方案,因为训练过程每个人是不同的,所以只能结合自己的训练情况来调。一般当loss不下降的时候,调小学习率,batch-size也试着调小来看看效果,分布式的学习率较单机的要大些,这些原则去调。