如何使用结构化流从Kafka读取JSON格式的记录?

问题描述:

我想使用基于DataFrame/Dataset API的Spark-Streaming使用structured streaming approach从Kafka加载数据流。如何使用结构化流从Kafka读取JSON格式的记录?

我使用:

  • 火花2.10
  • 卡夫卡0.10
  • 火花-SQL卡夫卡-0-10

火花卡夫卡数据源定义了底层模式:

|key|value|topic|partition|offset|timestamp|timestampType| 

我的数据以json格式并且它们存储在的值列中。我正在寻找一种方法,如何从值列中提取底层模式并将接收到的数据帧更新为存储在中的列?我尝试下面的方法,但它不工作:

val columns = Array("column1", "column2") // column names 
val rawKafkaDF = sparkSession.sqlContext.readStream 
    .format("kafka") 
    .option("kafka.bootstrap.servers","localhost:9092") 
    .option("subscribe",topic) 
    .load() 
    val columnsToSelect = columns.map(x => new Column("value." + x)) 
    val kafkaDF = rawKafkaDF.select(columnsToSelect:_*) 

    // some analytics using stream dataframe kafkaDF 

    val query = kafkaDF.writeStream.format("console").start() 
    query.awaitTermination() 

在这里我得到异常org.apache.spark.sql.AnalysisException: Can't extract value from value#337;,因为在创建流的时候,里面的值是不知道......

你有什么建议?

从Spark的角度来看value只是一个字节序列。它没有关于序列化格式或内容的知识。为了能够提取字段,你必须先解析它。

如果数据序列化为JSON字符串,则有两个选项。使用get_json_object由路径提取物领域

import org.apache.spark.sql.types._ 
import org.apache.spark.sql.functions.from_json 

val schema: StructType = StructType(Seq(
    StructField("column1", ???), 
    StructField("column2", ???) 
)) 

rawKafkaDF.select(from_json($"value".cast(StringType), schema)) 

castStringType:您可以castvalueStringType和使用from_json,并提供一个架构

import org.apache.spark.sql.functions.get_json_object 

val columns: Seq[String] = ??? 

val exprs = columns.map(c => get_json_object($"value", s"$$.$c")) 

rawKafkaDF.select(exprs: _*) 

cast后来到所需的类型。