Warning: Undefined variable $post_id in /data/www/wwwroot/blog.ymypay.cn/wp-content/plugins/wp-baidu-record/wp-baidu-record.php on line 56

大数据技术 / 学习日志 · 2022年9月29日 0

scala Flink split弃用替代


Warning: Undefined variable $post_id in /data/www/wwwroot/blog.ymypay.cn/wp-content/plugins/wp-baidu-record/wp-baidu-record.php on line 56

flink高版本弃用了split,原因是性能缺失和bug太多。flink官方采用了替代方案。官方链接:https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/side_output/

示例

package stream.transformation

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object SplitSelectDemo {
  def main(args: Array[String]): Unit = {
    val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val numDs: DataStream[Int] = senv.fromCollection(List(1,2,3,4,5,6))
//    //使用split把数据流分成奇数和偶数
//    val splitStream: SplitStream[Int] = numDs.split(
//      i => {
//        val res: Int = i % 2
//        if (res == 0) { //偶数
//          List("even")
//        } else { //奇数
//          List("odd")
//        }
//        //只是名称,需要select去选择获取
//      }
//    )
//    //从数据流中获取
//    val evenDs: DataStream[Int] = splitStream.select("even")
//    val oddDs: DataStream[Int] = splitStream.select("odd")
//    val allDs: DataStream[Int] = splitStream.select("even","odd")
//    evenDs.print()
//    oddDs.print()
//    allDs.print()
    //split被启用,性能缺失以及众多BUG原因。新方法为process output

    //先定义标签
    val evenTag: OutputTag[String] = OutputTag[String]("even")
    val oddTag:OutputTag[String] = OutputTag[String]("odd")
    val resDs: DataStream[Int] = numDs.process(
      //再通过此类重写processElement函数进行流数据处理,并按标签输出
      new ProcessFunction[Int, Int] {
        /**
          *
          * @param i 原数据
          * @param context 窗口上下文
          * @param collector
          */
        override def processElement(i: Int, context: ProcessFunction[Int, Int]#Context, collector: Collector[Int]): Unit = {
          if (i % 2 == 0) {
            context.output(evenTag, i.toString)
          } else {
            context.output(oddTag, i.toString)
          }
        }
      }
    )
    //直接输出是空的
    resDs.print()
    //全部
    val evenDs: DataStream[String] = resDs.getSideOutput(evenTag)
    val oddDs: DataStream[String] = resDs.getSideOutput(oddTag)
    evenDs.connect(oddDs).map(l=>l,r=>r).print()
    //标签输出
//    resDs.getSideOutput(evenTag).print()


    senv.execute()
  }
}


微信扫描下方的二维码阅读本文