利用pyspark.ml训练lightgbm模型的流程
在spark上训练模型的优势:
(1)机器学习算法一般都有很多个步骤迭代计算的过程,机器学习的计算需要在多次迭代后获得足够小的误差或者足够收敛才会停止,迭代时如果使用一般的Hadoop分布式计算框架,每次计算都要读 / 写磁盘以及任务的启动等工作,这回导致非常大的 I/O 和 CPU 消耗。而 Spark 基于内存的计算模型天生就擅长迭代计算,多个步骤计算直接在内存中完成.
(2)从通信的角度讲,如果使用 Hadoop分布式计算框架, 工作和任务之间由于是通过 heartbeat 的方式来进行的通信和传递数据,会导致非常慢的执行速度,正常来说会减缓机器学习的速度.spark通讯效率极高,可以解决这个问题.
目前发现的spark训练模型的劣势:
(1)配置繁琐,使用之前要先配置spark集群,scala编译器,java编译器
(2)要按照spark指定的数据格式进行训练,一般机器学习可以直接读取数据例如csv,指定特征列后进行特征处理训练,spark的训练格式要把数据里面类型,标签提前整理好
sparl.ml的功能示意图
MLlib目前支持4种常见的机器学习问题: 分类、回归、聚类和协同过滤,MLlib在Spark整个生态系统中的位置如图下图所示。这里还展示除了内置的模型之外的lightgbm的模型训练
在测试过程中,原生自带的spark.ml模型包可以正常训练
实现环境
Liunx平台 GLIBC_版本 2.23以上
我使用的是unbantu18.04 内置GLIBC_2.27
安装语言
spark的原生语言是scala,首先需要安装scala的编译器 建议2.11版本
依赖包
pip install pyspark
PySpark 是 Spark 为 Python 开发者提供的 API
安装扩展包
mmlspark
MMLSpark是一个工具生态系统,旨在将Apache Spark的分布式计算框架扩展到 几个新的方向。MMLSpark为Spark生态系统添加了许多深度学习和数据科学工具,Microsoft Cognitive Toolkit(CNTK),LightGBM和 OpenCV的无缝集成。这些工具可为各种数据源提供功能强大且可高度扩展的预测和分析模型。
MMLSpark需要Scala 2.11,Spark 2.3+以及Python 2.7或Python 3.5+
项目地址:https://github.com/Azure/mmlspark
这里有一个pip包需要下载安装
https://mmlspark.azureedge.net/pip/mmlspark-0.15.dev2+1.g11ad24d-py2.py3-none-any.whl
MMLspaek在spark群上的安装
spark-shell --packages Azure:mmlspark:0.15
pyspark --packages Azure:mmlspark:0.15
spark-submit --packages Azure:mmlspark:0.15 MyApp.jar
MMLspark在python上的使用
import pyspark
spark = pyspark.sql.SparkSession.builder.appName("MyApp") \
.config("spark.jars.packages", "Azure:mmlspark:0.15") \
.getOrCreate()
# 只有先执行上面的语句 才能正确导包
import mmlspark
导入lightgbm模型
from mmlspark import LightGBMRegressor
lgb = LightGBMRegressor(alpha=0.3,
learningRate=0.3,
numIterations=100,
numLeaves=31)
简单的训练案例
导入数据,指定格式
import pyspark.sql.types as typ
# 指定标签类型
labels = [
('INFANT_ALIVE_AT_REPORT', typ.IntegerType()),
('BIRTH_PLACE', typ.StringType()),
('MOTHER_AGE_YEARS', typ.IntegerType()),
('FATHER_COMBINED_AGE', typ.IntegerType()),
('CIG_BEFORE', typ.IntegerType()),
('CIG_1_TRI', typ.IntegerType()),
('CIG_2_TRI', typ.IntegerType()),
('CIG_3_TRI', typ.IntegerType()),
('MOTHER_HEIGHT_IN', typ.IntegerType()),
('MOTHER_PRE_WEIGHT', typ.IntegerType()),
('MOTHER_DELIVERY_WEIGHT', typ.IntegerType()),
('MOTHER_WEIGHT_GAIN', typ.IntegerType()),
('DIABETES_PRE', typ.IntegerType()),
('DIABETES_GEST', typ.IntegerType()),
('HYP_TENS_PRE', typ.IntegerType()),
('HYP_TENS_GEST', typ.IntegerType()),
('PREV_BIRTH_PRETERM', typ.IntegerType())
]
# 指定dataframe的格式
schema = typ.StructType([
typ.StructField(e[0], e[1], False) for e in labels
])
births = spark.read.csv('births_transformed.csv.gz',
header=True,
schema=schema)
数据处理,one-hot编码,这里是spark训练模型的特别部分,需要将特征整理在一起
import pyspark.ml.feature as ft
# 将列转化为数值类型
births = births \
.withColumn('BIRTH_PLACE_INT', births['BIRTH_PLACE'] \
.cast(typ.IntegerType()))
# 使用one-hot编码
encoder = ft.OneHotEncoder(
inputCol='BIRTH_PLACE_INT',
outputCol='BIRTH_PLACE_VEC')
# 使用VectorAssembler创建单个列,将所有特性整理在一起。
featuresCreator = ft.VectorAssembler(
inputCols=[col[0] for col in labels[2:]] + \
[encoder.getOutputCol()],
outputCol='features'
)
导入原生的spark.ML的LogisticRegression分类包
import pyspark.ml.classification as cl
logistic = cl.LogisticRegression(
maxIter=10,
regParam=0.01,
# 这里要制定标签列
labelCol='INFANT_ALIVE_AT_REPORT')
导入管道
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[
encoder,
featuresCreator,
logistic
])
训练模型
births_train, births_test = births \
.randomSplit([0.7, 0.3], seed=666)
model = pipeline.fit(births_train)
test_model = model.transform(births_test)
得到模型评价数据
import pyspark.ml.evaluation as ev
evaluator = ev.BinaryClassificationEvaluator(
rawPredictionCol='probability',
labelCol='INFANT_ALIVE_AT_REPORT')
print(evaluator.evaluate(test_model,
{evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(test_model,
{evaluator.metricName: 'areaUnderPR'}))
使用lightgbm进行模型训练
from mmlspark import LightGBMRegressor
lgb = LightGBMRegressor(alpha=0.3,
learningRate=0.3,
numIterations=100,
numLeaves=31,
# 注意 在spark里面训练模型要制定目标值的列
labelCol='INFANT_ALIVE_AT_REPORT')
使用管道训练模型
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[
# one-hot编码
encoder,
# 特征整理
featuresCreator,
# 模型名称
lgb])
# 模型训练
model = pipeline.fit(births_train)
test_model = model.transform(births_test)
# 查看得分
import pyspark.ml.evaluation as ev
evaluator = ev.BinaryClassificationEvaluator(
rawPredictionCol='probability',
labelCol='INFANT_ALIVE_AT_REPORT')
print(evaluator.evaluate(test_model,
{evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(test_model,
{evaluator.metricName: 'areaUnderPR'}))
得到结果
python训练模型保存到PMML格式
可以直接调用jar包 把模型保存成pmml格式
cd target
java -jar converter-executable-1.2-SNAPSHOT.jar --lgbm-input
# 制定保存模型的txt文件位置
/Users/shuubiasahi/Documents/python/credit-tfgan/xml/lightgbm.txt --pmml-output
# 制定输出的pmml文件位置
/Users/shuubiasahi/Documents/python/credit-tfgan/xml/lightgbm.pmml