利用pyspark.ml训练lightgbm模型的流程

在spark上训练模型的优势:

(1)机器学习算法一般都有很多个步骤迭代计算的过程,机器学习的计算需要在多次迭代后获得足够小的误差或者足够收敛才会停止,迭代时如果使用一般的Hadoop分布式计算框架,每次计算都要读 / 写磁盘以及任务的启动等工作,这回导致非常大的 I/O 和 CPU 消耗。而 Spark 基于内存的计算模型天生就擅长迭代计算,多个步骤计算直接在内存中完成.

(2)从通信的角度讲,如果使用 Hadoop分布式计算框架, 工作和任务之间由于是通过 heartbeat 的方式来进行的通信和传递数据,会导致非常慢的执行速度,正常来说会减缓机器学习的速度.spark通讯效率极高,可以解决这个问题.

 

目前发现的spark训练模型的劣势:

(1)配置繁琐,使用之前要先配置spark集群,scala编译器,java编译器

(2)要按照spark指定的数据格式进行训练,一般机器学习可以直接读取数据例如csv,指定特征列后进行特征处理训练,spark的训练格式要把数据里面类型,标签提前整理好

sparl.ml的功能示意图

MLlib目前支持4种常见的机器学习问题: 分类、回归、聚类和协同过滤,MLlib在Spark整个生态系统中的位置如图下图所示。这里还展示除了内置的模型之外的lightgbm的模型训练

利用pyspark.ml训练lightgbm模型的流程

在测试过程中,原生自带的spark.ml模型包可以正常训练

 

 

实现环境

Liunx平台 GLIBC_版本 2.23以上

我使用的是unbantu18.04 内置GLIBC_2.27

 

 

安装语言

spark的原生语言是scala,首先需要安装scala的编译器 建议2.11版本

依赖包

pip install pyspark

PySpark 是 Spark 为 Python 开发者提供的 API

 

安装扩展包

mmlspark

MMLSpark是一个工具生态系统,旨在将Apache Spark的分布式计算框架扩展到 几个新的方向。MMLSpark为Spark生态系统添加了许多深度学习和数据科学工具,Microsoft Cognitive Toolkit(CNTK),LightGBMOpenCV的无缝集成。这些工具可为各种数据源提供功能强大且可高度扩展的预测和分析模型。

MMLSpark需要Scala 2.11,Spark 2.3+以及Python 2.7或Python 3.5+

 

项目地址:https://github.com/Azure/mmlspark

 

这里有一个pip包需要下载安装

https://mmlspark.azureedge.net/pip/mmlspark-0.15.dev2+1.g11ad24d-py2.py3-none-any.whl

 

MMLspaek在spark群上的安装

spark-shell --packages Azure:mmlspark:0.15
pyspark --packages Azure:mmlspark:0.15
spark-submit --packages Azure:mmlspark:0.15 MyApp.jar

 

MMLspark在python上的使用

import pyspark
spark = pyspark.sql.SparkSession.builder.appName("MyApp") \
            .config("spark.jars.packages", "Azure:mmlspark:0.15") \
            .getOrCreate()
            
# 只有先执行上面的语句 才能正确导包
import mmlspark

导入lightgbm模型

from mmlspark import LightGBMRegressor
lgb = LightGBMRegressor(alpha=0.3,
                  learningRate=0.3,
                  numIterations=100,
                  numLeaves=31)

简单的训练案例

导入数据,指定格式

import pyspark.sql.types as typ
# 指定标签类型
labels = [
('INFANT_ALIVE_AT_REPORT', typ.IntegerType()),
('BIRTH_PLACE', typ.StringType()),
('MOTHER_AGE_YEARS', typ.IntegerType()),
('FATHER_COMBINED_AGE', typ.IntegerType()),
('CIG_BEFORE', typ.IntegerType()),
('CIG_1_TRI', typ.IntegerType()),
('CIG_2_TRI', typ.IntegerType()),
('CIG_3_TRI', typ.IntegerType()),
('MOTHER_HEIGHT_IN', typ.IntegerType()),
('MOTHER_PRE_WEIGHT', typ.IntegerType()),
('MOTHER_DELIVERY_WEIGHT', typ.IntegerType()),
('MOTHER_WEIGHT_GAIN', typ.IntegerType()),
('DIABETES_PRE', typ.IntegerType()),
('DIABETES_GEST', typ.IntegerType()),
('HYP_TENS_PRE', typ.IntegerType()),
('HYP_TENS_GEST', typ.IntegerType()),
('PREV_BIRTH_PRETERM', typ.IntegerType())
]
# 指定dataframe的格式
schema = typ.StructType([
typ.StructField(e[0], e[1], False) for e in labels
])
births = spark.read.csv('births_transformed.csv.gz',
header=True,
schema=schema)
​

数据处理,one-hot编码,这里是spark训练模型的特别部分,需要将特征整理在一起

import pyspark.ml.feature as ft
# 将列转化为数值类型
births = births \
.withColumn('BIRTH_PLACE_INT', births['BIRTH_PLACE'] \
.cast(typ.IntegerType()))
# 使用one-hot编码
encoder = ft.OneHotEncoder(
inputCol='BIRTH_PLACE_INT',
outputCol='BIRTH_PLACE_VEC')
# 使用VectorAssembler创建单个列,将所有特性整理在一起。
featuresCreator = ft.VectorAssembler(
inputCols=[col[0] for col in labels[2:]] + \
[encoder.getOutputCol()],
outputCol='features'
)

导入原生的spark.ML的LogisticRegression分类包

import pyspark.ml.classification as cl
logistic = cl.LogisticRegression(
maxIter=10,
regParam=0.01,
 # 这里要制定标签列
labelCol='INFANT_ALIVE_AT_REPORT')

导入管道

from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[
encoder,
featuresCreator,
logistic
])

训练模型

births_train, births_test = births \
.randomSplit([0.7, 0.3], seed=666)
​
model = pipeline.fit(births_train)
test_model = model.transform(births_test)

到模型评价数据

import pyspark.ml.evaluation as ev
evaluator = ev.BinaryClassificationEvaluator(
rawPredictionCol='probability',
labelCol='INFANT_ALIVE_AT_REPORT')
print(evaluator.evaluate(test_model,
{evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(test_model,
{evaluator.metricName: 'areaUnderPR'}))

利用pyspark.ml训练lightgbm模型的流程

使用lightgbm进行模型训练

from mmlspark import LightGBMRegressor
lgb = LightGBMRegressor(alpha=0.3,
                learningRate=0.3,
                numIterations=100,
                numLeaves=31,
                # 注意 在spark里面训练模型要制定目标值的列
                labelCol='INFANT_ALIVE_AT_REPORT')

使用管道训练模型

from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[
                # one-hot编码
                encoder,
                # 特征整理
                featuresCreator,
                # 模型名称
                    lgb])
​
# 模型训练
model = pipeline.fit(births_train)
test_model = model.transform(births_test)



# 查看得分

import pyspark.ml.evaluation as ev
evaluator = ev.BinaryClassificationEvaluator(
rawPredictionCol='probability',
labelCol='INFANT_ALIVE_AT_REPORT')
print(evaluator.evaluate(test_model,
{evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(test_model,
{evaluator.metricName: 'areaUnderPR'}))

得到结果

利用pyspark.ml训练lightgbm模型的流程

 

 

python训练模型保存到PMML格式

可以直接调用jar包 把模型保存成pmml格式

cd  target
java -jar converter-executable-1.2-SNAPSHOT.jar  --lgbm-input 
# 制定保存模型的txt文件位置
/Users/shuubiasahi/Documents/python/credit-tfgan/xml/lightgbm.txt --pmml-output 
# 制定输出的pmml文件位置
/Users/shuubiasahi/Documents/python/credit-tfgan/xml/lightgbm.pmml