自研半监督学习算法在短信分类中的实践

1. 背景

本文仅探讨自然语言分类算法。

1)有监督机器学习

所谓有监督,是指给定一组人工标注好的样本(打好分类标签的文本),通过机器学习算法训练模型(比如朴素贝叶斯分类器),然后用训练好的模型对未知的文本进行分类。

有监督机器学习的准确率受人工标注样本的数量和质量影响很大。在待分类实体总量较少(比如百万以下),且边界较为清晰(不同分类的实体间相似特征较少)时,可以达到很好的效果(90%+准确率)。

缺点也很明显:需要大量的高质量人工标注样本。在处理海量(比如亿级)且边界模糊的数据时基本无法满足需求。

 

2)无监督机器学习

所谓无监督,是指给定一组随机的样本(没有分类标签的文本),通过机器学习算法训练模型(比如KMeans算法),然后用训练好的模型对未知的文本进行聚类。

由于缺乏人工标注样本,机器无法准确判断文本的具体分类,但是通过文本相似度分析,可以将随机样本大致的聚类为K个组,同一组内文本相似度较高,不同组间文本相似度较低。

其最大的优势是不依赖于人工干预,理论上可以处理无限大的样本集合。

 

3)半监督机器学习

所谓半监督,是将有监督和无监督两种算法结合,在有限的人工标注样本的帮助下,结合无监督机器学习所获得的海量样本聚类信息,将人工标注的样本标签通过迭代方式“感染”处于同一聚类组中的无标注样本,从而获得一批数量巨大且标注好的样本。最后,用巨量的带标注样本训练有监督机器学习模型,得到最终的分类器。

虽然比起纯粹的有监督机器学习,半监督方式准确率要低一些(无监督带来的噪音),但在处理海量数据时,这种“老师”(人工标注样本)带“学生”(无监督聚类样本)的思想值得尝试。

 

4)短信分类

短信是一种典型的短文本、海量的自然语言信息。在动辄上亿的短信文本中,存在模板化的企业营销信息,也存在大量随意编辑的书面语、口语、符号信息。

传统的关键词匹配方式受限于建设关键词库的高成本,捉襟见肘。

这是否正是机器学习、人工智能大展身手之地?

2. 算法逻辑

自研半监督学习算法在短信分类中的实践

 

3. 关键技术点

3.1 随机采样

从HIVE仓库上亿短文本中随机采样。利用HIVE SQL的随机算子rand()来实现。代码示例:

...
            con = DriverManager.getConnection(hiveUrl, hiveUser, hivePwd); // hive-jdbc连接
            outputFileWriter = new BufferedWriter(
                    new OutputStreamWriter(new FileOutputStream(new File(fileOutputPath))), 2048 * 1024);  // 样本输出到文件
            // 执行查询
            Statement stmt = con.createStatement();
            String sql = "SELECT content FROM "
                    + "  (SELECT content, rand() rate FROM my_big_table ) as temp1 "
                    + "  WHERE temp1.rate <= " + sampleRate;  // sampleRate 为采样概率,比如从1亿样本中采样100万,此处等于0.01
            ResultSet res = stmt.executeQuery(sql);
            long id = 1;
            while (res.next()) {
                String msgText = res.getString(1);   // jdbc结果集列下标从1开始
                if (null == msgText || StringUtils.isBlank(msgText)) {
                    System.out.println("unexpected msgText in row: " + res.getRow());
                    continue;
                }
                id++;
                output.append(msgText).append("\n");
                if (id % 10000 == 0) {
                    // 批量输出
                    System.out.println("finished lines : " + id);
                    outputFileWriter.write(output.toString());
                    output = new StringBuilder();
                }
            }
            outputFileWriter.write(output.toString());
...

 

3.2 文本向量化

主要分以下几步。

1)分词

本文采用较为流行的汉语处理包HanLP实现分词。

先引入Maven依赖

        <dependency>
            <groupId>com.hankcs</groupId>
            <artifactId>hanlp</artifactId>
            <version>portable-1.7.3</version>
        </dependency>

随时随地的分词

val segment = HanLP.newSegment()  // 分词工具
val terms = segment.seg(originalText) // 对原始文本执行分词
for( k <- 0 until terms.size() ){
  val term = terms.get(k)  // 每个词语对应一个Term对象
  val termStr = term.word; // 词语
  val firstChar = term.nature.firstChar();  // 词性
  ...
}

 

2)去除停用词、无意义词

所谓停用词,是指"帮忙","麻烦","谢谢"之类常见的,又无法表达句子实际语义的词,容易干扰分类结果,将这些词从分词结果中剔除即可。

另外,语气词、助词等无意义的词语,可通过 HanLP 分词后的词性剔除。例如:

...
          var termStr = "";
          val firstChar = term.nature.firstChar();  // 获取 HanLP 分词后的词性
          if (firstChar == 'n' // 名词系列
                  || firstChar == 'v' // 动词系列
                  || firstChar == 'a' // 形容词系列
                  || firstChar == 'r' // 代词系列
          ) {
              termStr = term.word;   // 有意义的词
          } else if (firstChar == 'm') { // 数词
              termStr = "NUM" + term.word.length();   // 数词加工。TODO : 加入手机号识别等算法,提高准确率
          }
          if (!StringUtils.isBlank(termStr)) {  // 无意义词直接过滤掉
              words.append(termStr)  // 有意义词拼成词语List
          }
...


3)训练word2vec模型

Spark2.x版本使用 org.apache.spark.ml.feature.Word2Vec 进行word2vec模型训练。

      import sparkSession.implicits._   // 开启spark隐式调用,例如xxx.toDF()
      val df = sparkSession.createDataFrame(tuples).toDF("text", "originalLine")  // tuples类型为Seq[(Array[String], String)],其中第一个元素为文本分词后的有意义词语列表,第二个元素为原始文本;tuples包含所有的未标注样本
      val vectorSize = 100  // 此数值影响文本间的区分度和计算量,根据经验值微调
      val word2Vec = new Word2Vec().setVectorSize(vectorSize).setInputCol("text").setOutputCol("lineVector")   // 输入词语列表,输出Double向量
      val word2vecModel = word2Vec.fit(df)   // 得到word2vec模型


4)使用word2vec模型将文本转化为向量

var result = word2vecModel.transform(df).select("originalLine", "lineVector")  // 输入DataFrame包含"text", "originalLine"两列,输出DataFrame包含"originalLine", "lineVector"两列

输入输出的列名、列数可按需要调整,只要确保与训练word2vec模型时指定的输入输出列名对应即可。

5)根据不同的机器学习算法,调整向量值域

本文的实践中发现,word2vecModel 输出的向量中每个Double的范围处于(-1,1)区间,而Spark的NaiveBayes则要求输入的向量值不小于0,因此需要进行调整:

      result = result.map{ item =>
        val text = classOf[String].cast(item.get(0))     // Row对象元素下标从0开始,对应"originalLine"列
        val vector = classOf[Vector].cast(item.get(1))   // 对应"lineVector"列
        // 调整KMeans专用向量值域
        val doubleArray1 = vector.toArray
        for( i <- 0 until doubleArray1.size ){
          doubleArray1(i) = (doubleArray1(i) + 1) / word2vecKMeansReduce     // word2vecKMeansReduce的经验值为2.0,即向量值域从(-1,1)调整为(0,1)
        }
        // 调整NaiveBayes专用向量值域
        val doubleArray2 = vector.toArray
        for( i <- 0 until doubleArray2.size ){
          doubleArray2(i) = (doubleArray2(i) + 1) / word2vecNaiveBayesReduce     // word2vecNaiveBayesReduce的经验值为0.5,即向量值域从(-1,1)调整为(0,4)
        }
        (text, Vectors.dense(doubleArray1), Vectors.dense(doubleArray2))
      }.toDF("originalLine", "lineVectorKMeans", "lineVectorNaiveBayes")  // 最终("originalLine", "lineVector")变为("originalLine", "lineVectorKMeans", "lineVectorNaiveBayes")

 

3.3 KMeans聚类

1)对所有样本进行聚类

val kmeans = new KMeans().setK(kmeansGroupCount).setSeed(1L).setFeaturesCol("lineVectorKMeans").setPredictionCol("kmeans")
// kmeansGroupCount 为分组数量,根据实际的样本分布情况确定,文本中取150
// "lineVectorKMeans"为上文中word2vec向量化结果
// 输出列名定义为"kmeans"
val kmeansModel = kmeans.fit(unlabelledDataFrame)  // unlabelledDataFrame为包含("originalLine", "lineVectorKMeans", "lineVectorNaiveBayes")三列的无标注样本集合
// 转换后丢弃 lineVectorKMeans 列
val unlabelledDataFrameNew = kmModel.transform(unlabelledDataFrame).select("originalLine", "lineVectorNaiveBayes", "kmeans")
// 对标注文本聚类,标注文本集合包含("originalLine", "businessType", "lineVectorKMeans", "lineVectorNaiveBayes")四列,其中"businessType"为人工标注的分类标签
// 转换后丢弃 lineVectorKMeans 列
val labelledDataFrameNew = kmModel.transform(labelledDataFrame).select("originalLine", "lineVectorNaiveBayes", "businessType", "kmeans")

2)如果还没有人工标注样本,输出对无标注样本的聚类结果,用于辅助人工标注

      // 分文件输出
      val strBuilderArray = new Array[StringBuilder](kmeansGroupCount)
      for( i <- 0 until strBuilderArray.size ){
        strBuilderArray(i) = new StringBuilder
      }
      for( x <- unlabelledDataFrame.select("kmeans", "originalLine").collect() ){
        val logger1 = Logger(LoggerFactory.getLogger(getClass.getName))
        val kmeans = classOf[java.lang.Integer].cast(x.get(0))
        val text = x.get(1)
        if( kmeans >= strBuilderArray.size ){
          logger1.error("unexpected kmeans " + kmeans)
        }
        val builder = strBuilderArray(kmeans)
        builder.append(text).append("\n")
      }
      for( i <- 0 until strBuilderArray.size ){
        val builder = strBuilderArray(i)
        val fileName = outputPath + "/group" + i
        val bw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(new File(fileName))))
        bw.write(builder.toString())
        bw.close()
      }

执行上述代码后,outputPath 文件夹下将会出现一组groupxxx文件,从每个文件中挑选任意多个样本进行人工标注,可以达到比完全随机标注好很多的效果。

有了标注样本,同样的进行向量化、KMeans聚类操作之后,即可用于训练NaiveBayes模型。

3.4 NaiveBayes模型训练

1)标注样本转换为LabeledPoint对象

    val rows = labelledDataFrame.select("originalLine", "lineVectorNaiveBayes", "businessType").collect()
...
    for(item <- rows) {
      val text = classOf[String].cast(item.get(0))
      val vector = classOf[Vector].cast(item.get(1))
      val businessType = classOf[java.lang.Integer].cast(item.get(2))
...
      val labeledPoint = new LabeledPoint(businessType.toDouble, vector)    // 第一参数为人工标注的标签值,第二参数为word2vec向量
      labeledPointList.append(labeledPoint)  // 所有样本拼接成list
    }

2)训练模型

    import sparkSession.implicits._
    val nbModel = new NaiveBayes().fit(labelPoints.toDF())

3.5 使用NaiveBayes模型预测无标注样本

    val udfCode : (Vector => java.lang.Integer) = (vec : Vector) => {
      nbModel.predict(vec).round.toInt + 1 // 此版本Spark库预测值返回的是期望值的下标,从0开始;而标签值要求从1开始。请根据实际版本差异调整
    }
    val addColumn = udf(udfCode)   // 使用udf函数
    // 输入为word2vec向量列"lineVectorNaiveBayes",输出为预测的标签值列"businessType"
    val newDataFrame = unlabelledDataFrame.withColumn("businessType", addColumn(unlabelledDataFrame("lineVectorNaiveBayes")))

3.6 从预测结果中获取可信标注样本

基本思路为:

存在某个无标注样本X,预测的标签为NB(X),KMeans聚类编号为KM(X);

如果存在某个人工标注样本Y,且标签NB(Y) = NB(X),且KMeans聚类编号KM(Y) = KM(X),则认为对X的预测可以信任。

或者简单来说,基于假设:NaiveBayes预测为同一类的样本,其KMeans聚类结果也应为同一类。

      // 获取可信标注列表
      val trustedDatas = newDataFrame.select("originalLine", "lineVectorNaiveBayes", "businessType", "kmeans").filter{ item =>
        val logger1 = Logger(LoggerFactory.getLogger("newDataFrame.filter"))
        val businessType = classOf[java.lang.Integer].cast(item.get(2))
        val kmeans = classOf[java.lang.Integer].cast(item.get(3))
        val entry = kmeansMap.get(kmeans)   // kmeansMap 为预先处理好的已标注样本统计结果。key:kmeans分组编号,value:Map<NaiveBayes标签值,样本数量>
        if( null == entry ){
          logger1.error("unexpected kmeans group id : " + kmeans)
        }
        val count = entry.get(businessType)
        if( null != count && count >= minKMeansNaiveBayesMatchedCount ){  // 此处minKMeansNaiveBayesMatchedCount取1,即只要找到一个Y,就判定X可信
          // 可信标注
          true
        }
        else{
          // 不可信标注
          false
        }
      }
...
      // 合并可信样本、人工标注样本,作为新的标注样本集合
      labelledDataFrame = labelledDataFrame.union(trustedDatas)

当有新的可信样本产生,扩充了标注样本集合时,可进行下一轮NaiveBayes模型训练。由于增加了可信样本,新一轮的NaiveBayes模型将产生更优异的表现,就像青出于蓝的学生。

实践中,两轮训练后,可信样本不再产生,迭代结束。

3.7 模型验证

基本流程:

1)执行Hive SQL获取待分类文本全集

val dataFrame = sparkSession.sql("SELECT id, text as originalLine FROM my_big_table")

2)执行文本向量化

先用udf函数将文本分词、过滤无用词,输出为新的列"text";

然后用word2vecModel 将"text"列转化为向量"lineVectorNaiveBayes"列;

对"lineVectorNaiveBayes"列的值域进行调整,确保与训练NaiveBayes模型的样本值域相同;

3)预测分类

用udf函数对"lineVector"列进行NaiveBayes分类预测,得到"business_type"列

4)写入HIVE

分类结果创建临时表,然后用INSERT语句写入预先定义的Hive表

resultDataFrame.select("id", "business_type").createOrReplaceTempView("temptable1")
...
sparkSession.sql("INSERT INTO my_type_table SELECT id, business_type FROM temptable1")

5)随机采样,人工评分

参考上文《随机采样》,从Hive表中随机选取若干分类结果,人工评分。

4. 注意事项

1)停用词表非常有效

实验证明,常见的客套话("谢谢"、"麻烦"等)容易影响分类效果,建议加入停用词表

2)样本数量必须平滑

这里的"平滑"是指针对不同分类,训练样本数量接近。

实验证明,样本数量越多的分类,算法预测结果倾斜越明显。

相同的样本反复训练,可以解决样本数量不足的问题。

5. 分类验证结果

由于时间关系,这里只进行了两组模型参数的对比。

待分类文本数量为2亿多。

模型参数分组 有标注样本数量 无标注样本数量

模型训练+分类耗时

(分钟)

抽样数量 分类正确数量 分类错误数量 准确率
A 301 56189 59 1121 807 314 72.0%
B 301 1077531 87 1052 913 139 86.8%

从结果不难看出,随着无标注样本数量的增加,模型的准确率得到很大提升。这可能归功于本文算法背后的假设:NaiveBayes预测为同一类的样本,其KMeans聚类结果也应为同一类。

6. 后续优化方向

分析抽样结果,可以发现大部分错误的样本都不在人工标注集合中,且有很多样本套用任何分类都比较勉强。

要进一步提高准确率,必须深入分析所有样本内在的业务逻辑,制定覆盖范围更广泛、边界更清晰的分类标准,而这是算法本身无法解决的问题。