Scala和apache flink的新手为什么我的map函数能够正确运行REPL但在Flink中失败
我正在尝试(失败)在Apache Flink中运行一个简单的hello世界类型程序。该代码从Apache Kafka收到一条消息后添加了一个“。”在每个字母后面并将新字符串打印到stdout。该代码正确地从Kafka获取消息,但是添加了“。”的映射函数。失败。我试过在REPL提示符下的函数,并且scala代码在那里正确地工作。 Scala代码:Scala和apache flink的新手为什么我的map函数能够正确运行REPL但在Flink中失败
scala> input = "hello"
input: String = hello
scala> val output = input.flatMap(value => value + ".")
output: String = h.e.l.l.o
弗林克程序: flink code 切断线读取
val messageStream = env.addSource(new FlinkKafkaConsumer09("CL", new SimpleStringSchema, properties))
我无法弄清楚我要去哪里错了,我已经试过Apache文档无济于事。你可以给我的任何帮助都会受到好评。
所有我会建议学习有关函数式编程和操作,如地图的一些基础知识首先/ flatMap /缩小等
所有这些提到的功能应用到集合。在您的斯卡拉例如,作为@pedrofuria指出,在应用flatMap
功能String
这是char
小号
在弗林克例如messageStream
可以抽象为一个字符串的收藏,所以要执行到您所描述的操作,你应该做某事喜欢:
val stream = messageStream.map(str => str.mkString("."))
我用mkString
代替flatMap
从你的榜样,因为前者而不是
h.e.l.l.o(为你写)
它产生
h.e.l.l.o.
但是,再次真正从函数式编程的基础知识入手。
感谢您的帮助,我想出了问题,有点。 尽管flatMap函数在scala提示符下工作,但它并不适用于Flink,因为Flink要求FlatMap通过覆盖传递一个新的FlatMapFunction。仍然不确定它为什么在flink的scala提示符下工作,但是代码现在编译并按预期运行。
因为scala提示符中的flatMap与flink程序中的flatMap不一样。
您的scala提示符中的flatMap只是scala中的一个函数。
的弗林克flatMap可以在下面的输入喜欢的应用:
val input = benv.fromElements(
"To be, or not to be,--that is the question:--",
"Whether 'tis nobler in the mind to suffer",
"The slings and arrows of outrageous fortune",
"Or to take arms against a sea of troubles,")
val counts = input
.flatMap { _.toLowerCase.split("\\W+") }
.map { (_, 1) }.groupBy(0).sum(1)
参见:scala-shell
如果你告诉我们什么是错误这将有助于。你有没有尝试在cmd行编译?已知Scala IDE发布虚假错误。 – pedrofurla
此外,它似乎而不是flatMap你可能想使用messageStream中的地图。 flatMap在'input'示例中起作用,因为'value'是一个char和'+“。”'将它变成s String。 – pedrofurla
错误消息缺少参数类型。我也认为它可能是IDE,但当我尝试编译maven中的代码时,我得到了同样的错误,这阻止了我部署代码。这两个环境中的平面地图应该不一样吗?每次查看字符串一个字符? –