Flink 高版本弃用了 split/select,原因是性能不佳且 bug 较多。官方推荐使用 side output(旁路输出)作为替代方案。官方链接:https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/side_output/
示例
package stream.transformation
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
/**
 * Demonstrates routing the elements of one stream into "even" and "odd"
 * side outputs, as a replacement for the older `split` / `select` API.
 */
object SplitSelectDemo {

  /**
   * Builds a small bounded stream of integers, tags every element as even or
   * odd through side outputs, prints the resulting streams and runs the job.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val numDs: DataStream[Int] = senv.fromCollection(List(1, 2, 3, 4, 5, 6))

    // Legacy approach, kept for reference only: use split to divide the
    // stream into odd and even numbers.
    // val splitStream: SplitStream[Int] = numDs.split(
    //   i => {
    //     val res: Int = i % 2
    //     if (res == 0) { // even
    //       List("even")
    //     } else { // odd
    //       List("odd")
    //     }
    //     // these are only names; select is needed to actually pick a stream
    //   }
    // )
    // // fetch the sub-streams from the split stream
    // val evenDs: DataStream[Int] = splitStream.select("even")
    // val oddDs: DataStream[Int] = splitStream.select("odd")
    // val allDs: DataStream[Int] = splitStream.select("even","odd")
    // evenDs.print()
    // oddDs.print()
    // allDs.print()

    // split was deprecated (poor performance and numerous bugs); the
    // replacement is a process function that emits to side outputs.

    // Step 1: define the tags that identify each side output.
    val evenTag: OutputTag[String] = OutputTag[String]("even")
    val oddTag: OutputTag[String] = OutputTag[String]("odd")

    // Step 2: override processElement to route every element to a tag.
    val resDs: DataStream[Int] = numDs.process(
      new ProcessFunction[Int, Int] {
        /**
         * Routes one input element to the even or odd side output.
         *
         * @param i         the input element
         * @param context   the process-function context, used here to emit to side outputs
         * @param collector the collector of the main output; never used here
         */
        override def processElement(
            i: Int,
            context: ProcessFunction[Int, Int]#Context,
            collector: Collector[Int]): Unit = {
          if (i % 2 == 0) {
            context.output(evenTag, i.toString)
          } else {
            context.output(oddTag, i.toString)
          }
        }
      }
    )

    // The main output is empty: processElement never calls collector.collect.
    resDs.print()

    // All elements: both side outputs carry the same type, so they can be
    // merged directly with union instead of connect + an identity co-map.
    val evenDs: DataStream[String] = resDs.getSideOutput(evenTag)
    val oddDs: DataStream[String] = resDs.getSideOutput(oddTag)
    evenDs.union(oddDs).print()

    // Print a single tagged side output:
    // resDs.getSideOutput(evenTag).print()

    senv.execute()
  }
}
微信扫描下方的二维码阅读本文

