如何使用Kafka API入门Spark流和MapR流

这篇文章将帮助您开始使用Apache Spark Streaming通过MapR Streams和Kafka API消费和发布消息。 Spark Streaming是核心Spark API的扩展,可实现连续数据流处理。 MapR Streams是一个分布式消息传递系统,用于大规模流式传输事件数据。 MapR Streams使生产者和消费者可以通过Apache Kafka 0.9 API实时交换事件。 MapR Streams通过Kafka直接方法与Spark Streaming集成。 这篇文章是一个简单的示例方法,如果您是Spark Streaming和Kafka API的新手,则可能需要先阅读以下内容:

示例数据集来自2014年意大利电信大数据挑战赛 ,它由米兰和特伦托市的意大利电信蜂窝网络生成的汇总移动网络数据组成。 该数据基于2013年2个月内移动网络上发生的移动事件来衡量用户与移动电话网络互动的位置和水平。挑战中的项目使用此数据来提供洞察力,识别和预测移动电话大都市圈的基于位置的活动趋势和人口模式。

如何使用Kafka API入门Spark流和MapR流

  1. Square ID :城市网格中位置的ID
  2. 时间间隔时间间隔的开始
  3. 国家/地区代码 :电话国家/地区代码
  4. 短信输入活动 :在Square ID中收到短信
  5. 短信发送活动 :在Square ID中发送的短信
  6. 来电活动 :在Square ID中收到的来电
  7. 呼出活动 :在Square ID中发出的呼叫
  8. 互联网流量活动 :Square id内的互联网流量

数据记录为TSV格式,示例行如下所示:

如何使用Kafka API入门Spark流和MapR流

首先,导入将MapR Streams与Spark Streaming和Spark SQL集成所需的软件包。

如何使用Kafka API入门Spark流和MapR流

为了让Spark Streaming从MapR Streams中读取消息,您需要从org.apache.spark.streaming.kafka.v09导入。 为了让Spark Streaming将消息写入MapR Streams,您需要从org.apache.spark.streaming.kafka.producer._导入类;

Scala CallDataRecord案例类定义了与TSV记录相对应的架构。 parseCallDataRecord函数将制表符分隔的值解析为CallDataRecord案例类。

如何使用Kafka API入门Spark流和MapR流

这些是Spark Streaming Consumer Producer代码的基本步骤:

  1. 配置Kafka Consumer Producer属性。
  2. 初始化Spark StreamingContext对象。 使用此上下文,创建一个从主题读取消息的DStream。
  3. 应用转换(创建新的DStreams)。
  4. 将消息从转换的DStream写入主题。
  5. 开始接收数据并进行处理。 等待处理停止。

我们将通过示例应用程序代码完成所有这些步骤。

第一步是设置KafkaConsumer和KafkaProducer配置属性,稍后将使用它们来创建DStream,以接收/发送主题消息。 您需要设置以下参数:

  • 键和值反序列化器:用于反序列化消息。
  • 自动偏移量重置:从最早或最新的消息开始读取。
  • 引导服务器:由于代理地址实际上未由MapR流使用,因此可以将其设置为虚拟host:port。

有关配置参数的更多信息, 请参见MapR Streams文档。

如何使用Kafka API入门Spark流和MapR流

2)初始化Spark StreamingContext对象。

我们将KafkaUtils createDirectStream方法与StreamingContext对象,Kafka配置参数以及主题列表结合使用,以根据MapR Streams主题创建输入流。 这将创建一个DStream来表示传入的数据流,其中每个消息都是一个键值对。 我们使用DStream映射转换来创建带有消息值的DStream。

如何使用Kafka API入门Spark流和MapR流

如何使用Kafka API入门Spark流和MapR流

接下来,我们使用DStream foreachRDD方法将处理应用于此DStream中的每个RDD。 我们使用DStream上的map操作将消息值解析为CallDataRecord对象,然后将RDD转换为DataFrame,这允许您对流数据使用DataFrames和SQL操作。

如何使用Kafka API入门Spark流和MapR流

这是cdrDF.show的示例输出:

如何使用Kafka API入门Spark流和MapR流

CallDataRecord RDD对象由squareId分组和计数。 然后,此sendToKafka方法用于发送带有squareId的消息并计数到主题。

如何使用Kafka API入门Spark流和MapR流

如何使用Kafka API入门Spark流和MapR流

squareId和count的示例输出如下所示:

如何使用Kafka API入门Spark流和MapR流

要开始接收数据,我们必须在StreamingContext上显式调用start(),然后调用awaitTermination以等待流计算完成。

如何使用Kafka API入门Spark流和MapR流

本教程将在MapR v5.2 沙盒上运行,该沙盒包括MapR流和Spark 1.6.1。 您可以从此处下载代码,数据和说明以运行此示例:代码: https//github.com/caroljmcdonald/mapr-streams-spark

在此博客文章中,您学习了如何将Spark Streaming与MapR Streams集成以使用Kafka API来消费和产生消息。

翻译自: https://www.javacodegeeks.com/2016/09/get-started-spark-streaming-mapr-streams-using-kafka-api.html