import org.apache.log4j.{Level, Logger}
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import breeze.linalg._
object LR4 {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
val sparkSession = SparkSession.builder().master("local[4]").appName("LR").getOrCreate()
val data = sparkSession.sparkContext.textFile("G:\\mldata\\airfoil_self_noise.txt").map(line => line.split('|'))
val splitData = data.randomSplit(Array(0.7,0.3))
val trainData = splitData(0) .map(arr => Row(arr(5).toDouble,arr(0).toDouble,arr(1).toDouble,arr(2).toDouble,arr(3).toDouble,arr(4).toDouble))
val testDataLabel = splitData(1).map(arr => arr(5).toDouble).collect()
val testData = splitData(1).map(arr => arr.dropRight(1).map(_.toDouble)).collect()
val schema = StructType(List(StructField("label",DoubleType,true),StructField("Frequency",DoubleType,true)
,StructField("Angle",DoubleType,true),StructField("Chord",DoubleType,true)
,StructField("velocity",DoubleType,true),StructField("thickness",DoubleType,true)))
val featuresArr = Array("Frequency","Angle","Chord","velocity","thickness")
val traindf = sparkSession.createDataFrame(trainData,schema)
val vectorAssemb = new VectorAssembler().setInputCols(featuresArr).setOutputCol("features")
val trainDF = vectorAssemb.transform(traindf).select("label","features")
val theaMatrix:DenseMatrix[Double] = LR(trainDF)
var testMatrix = DenseMatrix.zeros[Double](testData.length,featuresArr.length+1)
testMatrix(::,0) := 1.0
for(i<-0 until testData.length){
for(j<-0 until featuresArr.length){
testMatrix(i,j+1) = testData(i)(j)
}
}
val resultMatrix = testMatrix * theaMatrix
val k = (resultMatrix.toArray zip testDataLabel).map{case(predict,value)=>math.pow((predict-value),2)}
var sum = 0.0
for(i<-k) sum += i
val rmse = math.sqrt(sum / testData.length)
println(“rmse = “ + rmse)
}
def LR(df:DataFrame)={
val label = df.rdd.map(row => row.getDouble(row.fieldIndex("label"))).collect()
val trainData = df.select("features").rdd.map(row => row.toString())
.map(str => str.replace('[',' '))
.map(str => str.replace(']',' '))
.map(str => str.trim()).map(str => str.split(','))
.map(arr => arr.map(str => str.toDouble))
.collect()
val dimensions = trainData(0).length
val matrixRows = trainData.length
val matrixColumns = dimensions + 1
var matrix = DenseMatrix.zeros[Double](matrixRows,matrixColumns)
matrix(::,0) := 1.0
for(i<-0 until matrixRows){
for(j<-0 until dimensions){
matrix(i,j+1) = trainData(i)(j)
}
}
var labelMatrix = DenseMatrix.zeros[Double](matrixRows,1)
for(i<-0 until matrixRows) labelMatrix(i,0) = label(i)
val thea = (inv(((matrix.t) * matrix))) * (matrix.t) * labelMatrix
thea
}
}
-------------------------result--------------------------------------------------------------------------------------------------------------------
rmse = 5.003040917178264
