// Spark ML: multiclass classification with LogisticRegression
// (使用LogisticRegression处理多分类问题)
import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types._ import org.apache.spark.sql.Row import org.apache.spark.ml.feature.VectorAssembler import org.apache.spark.ml.feature.{StringIndexer,VectorIndexer,IndexToString} import org.apache.spark.ml.classification.{LogisticRegression,LogisticRegressionModel} import org.apache.spark.ml.{Pipeline,PipelineModel} import org.apache.spark.ml.tuning.{ParamGridBuilder,CrossValidator} import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
object classificationModel {

  /**
   * End-to-end Spark ML example: load the iris data set from HDFS, build a
   * label/feature-indexing + logistic-regression pipeline, tune it with a
   * parameter grid via 3-fold cross-validation, and report train/test accuracy.
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("spark").getOrCreate()
    val sc = spark.sparkContext

    // Load the data as an RDD of Rows and build a DataFrame with an explicit schema.
    val rowRDD = sc.textFile("hdfs://localhost:9000/dataset/iris.txt")
      .map(s => s.split(","))
      .map(s => Row(s(0).toDouble, s(1).toDouble, s(2).toDouble, s(3).toDouble, s(4).toDouble))
    val schema = StructType(List(
      StructField("v1", DoubleType, nullable = true),
      StructField("v2", DoubleType, nullable = true),
      StructField("v3", DoubleType, nullable = true),
      StructField("v4", DoubleType, nullable = true),
      StructField("labels", DoubleType, nullable = true)
    ))
    val df = spark.createDataFrame(rowRDD, schema)

    // Assemble the four raw columns into a single feature-vector column.
    val vectorAssembler = new VectorAssembler()
      .setInputCols(Array("v1", "v2", "v3", "v4"))
      .setOutputCol("features")
    val data = vectorAssembler.transform(df).select("features", "labels")

    // Split into train/test and cache both: each is traversed several times
    // (counting, cross-validation folds, final evaluation).
    val Array(train, test) = data.randomSplit(Array(0.8, 0.2), seed = 1000)
    train.cache()
    test.cache()
    println("Train size = " + train.count() + " Test size = " + test.count())

    // StringIndexer: map label values to indices (encoded by frequency).
    val stringIndexer = new StringIndexer()
      .setInputCol("labels")
      .setOutputCol("indexedLabels")
      .fit(data)

    // VectorIndexer: distinguish categorical from continuous features
    // (<= 10 distinct values => treated as categorical) and index accordingly.
    val vectorIndexer = new VectorIndexer()
      .setInputCol("features")
      .setOutputCol("indexedFeatures")
      .setMaxCategories(10)
      .fit(data)

    val logisticRegression = new LogisticRegression()
      .setFeaturesCol("indexedFeatures")
      .setLabelCol("indexedLabels")
      .setMaxIter(20)

    // IndexToString: map predicted indices back to the original label values.
    val indexToString = new IndexToString()
      .setLabels(stringIndexer.labels)
      .setInputCol("prediction")
      .setOutputCol("forecastLabels")

    // Pipeline: index labels, index features, fit the classifier, decode predictions.
    val pipeline = new Pipeline()
      .setStages(Array(stringIndexer, vectorIndexer, logisticRegression, indexToString))

    // Parameter grid for model selection (2 x 3 = 6 candidate models).
    val paramGrid = new ParamGridBuilder()
      .addGrid(logisticRegression.elasticNetParam, Array(0.2, 0.8))
      .addGrid(logisticRegression.regParam, Array(0.1, 0.5, 0.8))
      .build()

    // 3-fold cross-validation over the training set.
    // FIX: metricName is set explicitly to "accuracy" — the evaluator's default
    // metric is "f1", which would contradict the "accuracy" wording printed below.
    val crossValidator = new CrossValidator()
      .setEstimator(pipeline)
      .setEstimatorParamMaps(paramGrid)
      .setEvaluator(new MulticlassClassificationEvaluator()
        .setLabelCol("indexedLabels")
        .setPredictionCol("prediction")
        .setMetricName("accuracy"))
      .setNumFolds(3)
    val cvModel = crossValidator.fit(train)

    /* Columns of the prediction output:
     *  - features:     raw feature vector      indexedFeatures: after VectorIndexer
     *  - labels:       raw label               indexedLabels:   after StringIndexer
     *  - probability:  per-class probability   rawPrediction:   raw (softmax) scores
     *  - prediction:   predicted index         forecastLabels:  predicted original label
     */
    val cvTrainResults = cvModel.transform(train)
    val cvTestResults = cvModel.transform(test)
    cvTestResults.show()

    // FIX: same metric alignment as above — evaluate accuracy, not the default f1.
    val evaluator = new MulticlassClassificationEvaluator()
      .setLabelCol("indexedLabels")
      .setPredictionCol("prediction")
      .setMetricName("accuracy")
    val trainAcc = evaluator.evaluate(cvTrainResults)
    val testAcc = evaluator.evaluate(cvTestResults)
    println("The accuracy of train set: " + trainAcc + " The accuracy of test set: " + testAcc)

    // Inspect the best model found by cross-validation
    // (here: elasticNetParam = 0.2, regParam = 0.1).
    val bestModel = cvModel.bestModel.asInstanceOf[PipelineModel]
    // Renamed from the misleading "DtModel" — this is a LogisticRegressionModel,
    // not a decision tree. Stage 2 of the pipeline is the classifier.
    val lrModel = bestModel.stages(2).asInstanceOf[LogisticRegressionModel]
    println("Model params: " + lrModel.explainParams())

    // FIX: release cached data and stop the session — the original leaked both.
    train.unpersist()
    test.unpersist()
    spark.stop()
  }
}