自研半监督学习算法在短信分类中的实践
1. 背景
本文仅探讨自然语言分类算法。
1)有监督机器学习
所谓有监督,是指给定一组人工标注好的样本(打好分类标签的文本),通过机器学习算法训练模型(比如朴素贝叶斯分类器),然后用训练好的模型对未知的文本进行分类。
有监督机器学习的准确率受人工标注样本的数量和质量影响很大。在待分类实体总量较少(比如百万以下),且边界较为清晰(不同分类的实体间相似特征较少)时,可以达到很好的效果(90%+准确率)。
缺点也很明显:需要大量的高质量人工标注样本。在处理海量(比如亿级)且边界模糊的数据时基本无法满足需求。
2)无监督机器学习
所谓无监督,是指给定一组随机的样本(没有分类标签的文本),通过机器学习算法训练模型(比如KMeans算法),然后用训练好的模型对未知的文本进行聚类。
由于缺乏人工标注样本,机器无法准确判断文本的具体分类,但是通过文本相似度分析,可以将随机样本大致的聚类为K个组,同一组内文本相似度较高,不同组间文本相似度较低。
其最大的优势是不依赖于人工干预,理论上可以处理无限大的样本集合。
3)半监督机器学习
所谓半监督,是将有监督和无监督两种算法结合,在有限的人工标注样本的帮助下,结合无监督机器学习所获得的海量样本聚类信息,将人工标注的样本标签通过迭代方式“感染”处于同一聚类组中的无标注样本,从而获得一批数量巨大且标注好的样本。最后,用巨量的带标注样本训练有监督机器学习模型,得到最终的分类器。
虽然比起纯粹的有监督机器学习,半监督方式准确率要低一些(无监督带来的噪音),但在处理海量数据时,这种“老师”(人工标注样本)带“学生”(无监督聚类样本)的思想值得尝试。
4)短信分类
短信是一种典型的短文本、海量的自然语言信息。在动辄上亿的短信文本中,存在模板化的企业营销信息,也存在大量随意编辑的书面语、口语、符号信息。
传统的关键词匹配方式受限于建设关键词库的高成本,捉襟见肘。
这是否正是机器学习、人工智能大展身手之地?
2. 算法逻辑
3. 关键技术点
3.1 随机采样
从HIVE仓库上亿短文本中随机采样。利用HIVE SQL的随机算子rand()来实现。代码示例:
... con = DriverManager.getConnection(hiveUrl, hiveUser, hivePwd); // hive-jdbc连接 outputFileWriter = new BufferedWriter( new OutputStreamWriter(new FileOutputStream(new File(fileOutputPath))), 2048 * 1024); // 样本输出到文件 // 执行查询 Statement stmt = con.createStatement(); String sql = "SELECT content FROM " + " (SELECT content, rand() rate FROM my_big_table ) as temp1 " + " WHERE temp1.rate <= " + sampleRate; // sampleRate 为采样概率,比如从1亿样本中采样100万,此处等于0.01 ResultSet res = stmt.executeQuery(sql); long id = 1; while (res.next()) { String msgText = res.getString(1); // jdbc结果集列下标从1开始 if (null == msgText || StringUtils.isBlank(msgText)) { System.out.println("unexpected msgText in row: " + res.getRow()); continue; } id++; output.append(msgText).append("\n"); if (id % 10000 == 0) { // 批量输出 System.out.println("finished lines : " + id); outputFileWriter.write(output.toString()); output = new StringBuilder(); } } outputFileWriter.write(output.toString()); ... |
3.2 文本向量化
主要分以下几步。
1)分词
本文采用较为流行的汉语处理包HanLP实现分词。
先引入Maven依赖
<dependency> <groupId>com.hankcs</groupId> <artifactId>hanlp</artifactId> <version>portable-1.7.3</version> </dependency> |
随时随地的分词
val segment = HanLP.newSegment() // 分词工具 val terms = segment.seg(originalText) // 对原始文本执行分词 for( k <- 0 until terms.size() ){ val term = terms.get(k) // 每个词语对应一个Term对象 val termStr = term.word; // 词语 val firstChar = term.nature.firstChar(); // 词性 ... } |
2)去除停用词、无意义词
所谓停用词,是指"帮忙","麻烦","谢谢"之类常见的,又无法表达句子实际语义的词,容易干扰分类结果,将这些词从分词结果中剔除即可。
另外,语气词、助词等无意义的词语,可通过 HanLP 分词后的词性剔除。例如:
... var termStr = ""; val firstChar = term.nature.firstChar(); // 获取 HanLP 分词后的词性 if (firstChar == 'n' // 名词系列 || firstChar == 'v' // 动词系列 || firstChar == 'a' // 形容词系列 || firstChar == 'r' // 代词系列 ) { termStr = term.word; // 有意义的词 } else if (firstChar == 'm') { // 数词 termStr = "NUM" + term.word.length(); // 数词加工。TODO : 加入手机号识别等算法,提高准确率 } if (!StringUtils.isBlank(termStr)) { // 无意义词直接过滤掉 words.append(termStr) // 有意义词拼成词语List } ... |
3)训练word2vec模型
Spark2.x版本使用 org.apache.spark.ml.feature.Word2Vec 进行word2vec模型训练。
import sparkSession.implicits._ // 开启spark隐式调用,例如xxx.toDF() val df = sparkSession.createDataFrame(tuples).toDF("text", "originalLine") // tuples类型为Seq[(Array[String], String)],其中第一个元素为文本分词后的有意义词语列表,第二个元素为原始文本;tuples包含所有的未标注样本 val vectorSize = 100 // 此数值影响文本间的区分度和计算量,根据经验值微调 val word2Vec = new Word2Vec().setVectorSize(vectorSize).setInputCol("text").setOutputCol("lineVector") // 输入词语列表,输出Double向量 val word2vecModel = word2Vec.fit(df) // 得到word2vec模型 |
4)使用word2vec模型将文本转化为向量
var result = word2vecModel.transform(df).select("originalLine", "lineVector") // 输入DataFrame包含"text", "originalLine"两列,输出DataFrame包含"originalLine", "lineVector"两列 |
输入输出的列名、列数可按需要调整,只要确保与训练word2vec模型时指定的输入输出列名对应即可。
5)根据不同的机器学习算法,调整向量值域
本文的实践中发现,word2vecModel 输出的向量中每个Double的范围处于(-1,1)区间,而Spark的NaiveBayes则要求输入的向量值不小于0,因此需要进行调整:
result = result.map{ item => val text = classOf[String].cast(item.get(0)) // Row对象元素下标从0开始,对应"originalLine"列 val vector = classOf[Vector].cast(item.get(1)) // 对应"lineVector"列 // 调整KMeans专用向量值域 val doubleArray1 = vector.toArray for( i <- 0 until doubleArray1.size ){ doubleArray1(i) = (doubleArray1(i) + 1) / word2vecKMeansReduce // word2vecKMeansReduce的经验值为2.0,即向量值域从(-1,1)调整为(0,1) } // 调整NaiveBayes专用向量值域 val doubleArray2 = vector.toArray for( i <- 0 until doubleArray2.size ){ doubleArray2(i) = (doubleArray2(i) + 1) / word2vecNaiveBayesReduce // word2vecNaiveBayesReduce的经验值为0.5,即向量值域从(-1,1)调整为(0,4) } (text, Vectors.dense(doubleArray1), Vectors.dense(doubleArray2)) }.toDF("originalLine", "lineVectorKMeans", "lineVectorNaiveBayes") // 最终("originalLine", "lineVector")变为("originalLine", "lineVectorKMeans", "lineVectorNaiveBayes") |
3.3 KMeans聚类
1)对所有样本进行聚类
val kmeans = new KMeans().setK(kmeansGroupCount).setSeed(1L).setFeaturesCol("lineVectorKMeans").setPredictionCol("kmeans") // kmeansGroupCount 为分组数量,根据实际的样本分布情况确定,文本中取150 // "lineVectorKMeans"为上文中word2vec向量化结果 // 输出列名定义为"kmeans" val kmeansModel = kmeans.fit(unlabelledDataFrame) // unlabelledDataFrame为包含("originalLine", "lineVectorKMeans", "lineVectorNaiveBayes")三列的无标注样本集合 // 转换后丢弃 lineVectorKMeans 列 val unlabelledDataFrameNew = kmModel.transform(unlabelledDataFrame).select("originalLine", "lineVectorNaiveBayes", "kmeans") // 对标注文本聚类,标注文本集合包含("originalLine", "businessType", "lineVectorKMeans", "lineVectorNaiveBayes")四列,其中"businessType"为人工标注的分类标签 // 转换后丢弃 lineVectorKMeans 列 val labelledDataFrameNew = kmModel.transform(labelledDataFrame).select("originalLine", "lineVectorNaiveBayes", "businessType", "kmeans") |
2)如果还没有人工标注样本,输出对无标注样本的聚类结果,用于辅助人工标注
// 分文件输出 val strBuilderArray = new Array[StringBuilder](kmeansGroupCount) for( i <- 0 until strBuilderArray.size ){ strBuilderArray(i) = new StringBuilder } for( x <- unlabelledDataFrame.select("kmeans", "originalLine").collect() ){ val logger1 = Logger(LoggerFactory.getLogger(getClass.getName)) val kmeans = classOf[java.lang.Integer].cast(x.get(0)) val text = x.get(1) if( kmeans >= strBuilderArray.size ){ logger1.error("unexpected kmeans " + kmeans) } val builder = strBuilderArray(kmeans) builder.append(text).append("\n") } for( i <- 0 until strBuilderArray.size ){ val builder = strBuilderArray(i) val fileName = outputPath + "/group" + i val bw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(new File(fileName)))) bw.write(builder.toString()) bw.close() } |
执行上述代码后,outputPath 文件夹下将会出现一组groupxxx文件,从每个文件中挑选任意多个样本进行人工标注,可以达到比完全随机标注好很多的效果。
有了标注样本,同样的进行向量化、KMeans聚类操作之后,即可用于训练NaiveBayes模型。
3.4 NaiveBayes模型训练
1)标注样本转换为LabeledPoint对象
val rows = labelledDataFrame.select("originalLine", "lineVectorNaiveBayes", "businessType").collect() ... for(item <- rows) { val text = classOf[String].cast(item.get(0)) val vector = classOf[Vector].cast(item.get(1)) val businessType = classOf[java.lang.Integer].cast(item.get(2)) ... val labeledPoint = new LabeledPoint(businessType.toDouble, vector) // 第一参数为人工标注的标签值,第二参数为word2vec向量 labeledPointList.append(labeledPoint) // 所有样本拼接成list } |
2)训练模型
import sparkSession.implicits._ val nbModel = new NaiveBayes().fit(labelPoints.toDF()) |
3.5 使用NaiveBayes模型预测无标注样本
val udfCode : (Vector => java.lang.Integer) = (vec : Vector) => { nbModel.predict(vec).round.toInt + 1 // 此版本Spark库预测值返回的是期望值的下标,从0开始;而标签值要求从1开始。请根据实际版本差异调整 } val addColumn = udf(udfCode) // 使用udf函数 // 输入为word2vec向量列"lineVectorNaiveBayes",输出为预测的标签值列"businessType" val newDataFrame = unlabelledDataFrame.withColumn("businessType", addColumn(unlabelledDataFrame("lineVectorNaiveBayes"))) |
3.6 从预测结果中获取可信标注样本
基本思路为:
存在某个无标注样本X,预测的标签为NB(X),KMeans聚类编号为KM(X);
如果存在某个人工标注样本Y,且标签NB(Y) = NB(X),且KMeans聚类编号KM(Y) = KM(X),则认为对X的预测可以信任。
或者简单来说,基于假设:NaiveBayes预测为同一类的样本,其KMeans聚类结果也应为同一类。
// 获取可信标注列表 val trustedDatas = newDataFrame.select("originalLine", "lineVectorNaiveBayes", "businessType", "kmeans").filter{ item => val logger1 = Logger(LoggerFactory.getLogger("newDataFrame.filter")) val businessType = classOf[java.lang.Integer].cast(item.get(2)) val kmeans = classOf[java.lang.Integer].cast(item.get(3)) val entry = kmeansMap.get(kmeans) // kmeansMap 为预先处理好的已标注样本统计结果。key:kmeans分组编号,value:Map<NaiveBayes标签值,样本数量> if( null == entry ){ logger1.error("unexpected kmeans group id : " + kmeans) } val count = entry.get(businessType) if( null != count && count >= minKMeansNaiveBayesMatchedCount ){ // 此处minKMeansNaiveBayesMatchedCount取1,即只要找到一个Y,就判定X可信 // 可信标注 true } else{ // 不可信标注 false } } ... // 合并可信样本、人工标注样本,作为新的标注样本集合 labelledDataFrame = labelledDataFrame.union(trustedDatas) |
当有新的可信样本产生,扩充了标注样本集合时,可进行下一轮NaiveBayes模型训练。由于增加了可信样本,新一轮的NaiveBayes模型将产生更优异的表现,就像青出于蓝的学生。
实践中,两轮训练后,可信样本不再产生,迭代结束。
3.7 模型验证
基本流程:
1)执行Hive SQL获取待分类文本全集
val dataFrame = sparkSession.sql("SELECT id, text as originalLine FROM my_big_table") |
2)执行文本向量化
先用udf函数将文本分词、过滤无用词,输出为新的列"text";
然后用word2vecModel 将"text"列转化为向量"lineVectorNaiveBayes"列;
对"lineVectorNaiveBayes"列的值域进行调整,确保与训练NaiveBayes模型的样本值域相同;
3)预测分类
用udf函数对"lineVector"列进行NaiveBayes分类预测,得到"business_type"列
4)写入HIVE
分类结果创建临时表,然后用INSERT语句写入预先定义的Hive表
resultDataFrame.select("id", "business_type").createOrReplaceTempView("temptable1") ... sparkSession.sql("INSERT INTO my_type_table SELECT id, business_type FROM temptable1") |
5)随机采样,人工评分
参考上文《随机采样》,从Hive表中随机选取若干分类结果,人工评分。
4. 注意事项
1)停用词表非常有效
实验证明,常见的客套话("谢谢"、"麻烦"等)容易影响分类效果,建议加入停用词表
2)样本数量必须平滑
这里的"平滑"是指针对不同分类,训练样本数量接近。
实验证明,样本数量越多的分类,算法预测结果倾斜越明显。
相同的样本反复训练,可以解决样本数量不足的问题。
5. 分类验证结果
由于时间关系,这里只进行了两组模型参数的对比。
待分类文本数量为2亿多。
模型参数分组 | 有标注样本数量 | 无标注样本数量 |
模型训练+分类耗时 (分钟) |
抽样数量 | 分类正确数量 | 分类错误数量 | 准确率 |
---|---|---|---|---|---|---|---|
A | 301 | 56189 | 59 | 1121 | 807 | 314 | 72.0% |
B | 301 | 1077531 | 87 | 1052 | 913 | 139 | 86.8% |
从结果不难看出,随着无标注样本数量的增加,模型的准确率得到很大提升。这可能归功于本文算法背后的假设:NaiveBayes预测为同一类的样本,其KMeans聚类结果也应为同一类。
6. 后续优化方向
分析抽样结果,可以发现大部分错误的样本都不在人工标注集合中,且有很多样本套用任何分类都比较勉强。
要进一步提高准确率,必须深入分析所有样本内在的业务逻辑,制定覆盖范围更广泛、边界更清晰的分类标准,而这是算法本身无法解决的问题。