使用Spark的Twitter流式传输
问题描述:
我试图使用Spark Scala代码流式传输twitter数据。我能够获取数据并创建数据框并查看它。但是,当尝试提取status.getPlace.getCountry()时,我得到显示java.lang.NullPointerException。使用Spark的Twitter流式传输
星火版本:2.0.0, 斯卡拉版本:
2.11.8试图用if条件,检查值等,但不成功。
代码:
val spark = SparkSession.builder().appName("Twitter Spark Example").getOrCreate()
val ssc = new StreamingContext(spark.sparkContext,Seconds(5))
val filters:Seq[String] = Seq("hadoop")
val cb = new ConfigurationBuilder()
.setOAuthConsumerKey("******")
.setOAuthConsumerSecret("******")
.setOAuthAccessToken("********")
.setOAuthAccessTokenSecret("******").build()
val twitter_auth = new TwitterFactory(cb)
val a = new OAuthAuthorization(cb)
val atwitter:Option[twitter4j.auth.Authorization] = Some(twitter_auth.getInstance(a).getAuthorization())
val tweetsdstream = TwitterUtils.createStream(ssc, atwitter, filters, StorageLevel.MEMORY_AND_DISK_SER_2)
val data = tweetsdstream.map {status =>
val places = status.getPlace
val id = status.getUser.getId
val date = status.getUser.getCreatedAt.toString()
val user = status.getUser.getName()
val place = places.getCountry()
(id,date,user,place)
}
data.foreachRDD{rdd =>
import spark.implicits._
rdd.toDF("id","date","user","place").show()
}
ssc.start()
ssc.awaitTermination()
是否有来自Twitter的访问位置信息有任何限制? 任何建议都会有帮助。
感谢
答
您可以使用Option
处理null
S:
val data = tweetsdstream.map {
status =>
val place = Option(status.getPlace).map(_.getCountry).orNull
val id = status.getUser.getId
val user = status.getUser.getName
val date = status.getUser.getCreatedAt.toString
(id, date, user, place)
}
这样一来,你就能够想象的所有微博,无论他们是否有一个国家或没有(和它在国家未定义的情况下将为空)。
Option
对于处理可能丢失的数据非常有用,可以将其用于其他可能的空字段。
+0
,您的解决方案适用于我。非常感谢。 –
实际上大部分时间'getPlace'和'getCountry'都包含null值,您可以尝试使用geoLocation而不是 –