Scala和apache flink的新手为什么我的map函数能够正确运行REPL但在Flink中失败

Scala和apache flink的新手为什么我的map函数能够正确运行REPL但在Flink中失败

问题描述:

我正在尝试(失败)在Apache Flink中运行一个简单的hello世界类型程序。该代码从Apache Kafka收到一条消息后添加了一个“。”在每个字母后面并将新字符串打印到stdout。该代码正确地从Kafka获取消息,但是添加了“。”的映射函数。失败。我试过在REPL提示符下的函数,并且scala代码在那里正确地工作。 Scala代码:Scala和apache flink的新手为什么我的map函数能够正确运行REPL但在Flink中失败

scala> input = "hello" 
input: String = hello 
scala> val output = input.flatMap(value => value + ".") 
output: String = h.e.l.l.o 

弗林克程序: flink code 切断线读取

val messageStream = env.addSource(new FlinkKafkaConsumer09("CL", new SimpleStringSchema, properties)) 

我无法弄清楚我要去哪里错了,我已经试过Apache文档无济于事。你可以给我的任何帮助都会受到好评。

+0

如果你告诉我们什么是错误这将有助于。你有没有尝试在cmd行编译?已知Scala IDE发布虚假错误。 – pedrofurla

+0

此外,它似乎而不是flatMap你可能想使用messageStream中的地图。 flatMap在'input'示例中起作用,因为'value'是一个char和'+“。”'将它变成s String。 – pedrofurla

+0

错误消息缺少参数类型。我也认为它可能是IDE,但当我尝试编译maven中的代码时,我得到了同样的错误,这阻止了我部署代码。这两个环境中的平面地图应该不一样吗?每次查看字符串一个字符? –

所有我会建议学习有关函数式编程和操作,如地图的一些基础知识首先/ flatMap /缩小等

所有这些提到的功能应用到集合。在您的斯卡拉例如,作为@pedrofuria指出,在应用flatMap功能String这是char小号

在弗林克例如messageStream可以抽象为一个字符串的收藏,所以要执行到您所描述的操作,你应该做某事喜欢:

val stream = messageStream.map(str => str.mkString(".")) 

我用mkString代替flatMap从你的榜样,因为前者而不是

h.e.l.l.o(为你写)

它产生

h.e.l.l.o.

但是,再次真正从函数式编程的基础知识入手。

感谢您的帮助,我想出了问题,有点。 尽管flatMap函数在scala提示符下工作,但它并不适用于Flink,因为Flink要求FlatMap通过覆盖传递一个新的FlatMapFunction。仍然不确定它为什么在flink的scala提示符下工作,但是代码现在编译并按预期运行。

因为scala提示符中的flatMap与flink程序中的flatMap不一样。

您的scala提示符中的flatMap只是scala中的一个函数。

的弗林克flatMap可以在下面的输入喜欢的应用:

val input = benv.fromElements(
      "To be, or not to be,--that is the question:--", 
      "Whether 'tis nobler in the mind to suffer", 
      "The slings and arrows of outrageous fortune", 
      "Or to take arms against a sea of troubles,") 
val counts = input 
      .flatMap { _.toLowerCase.split("\\W+") } 
      .map { (_, 1) }.groupBy(0).sum(1) 

参见:scala-shell