A look at the split operation of Flink DataStream

This article takes a look at the split operation of Flink's DataStream.

Example

SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>();
        if (value % 2 == 0) {
            output.add("even");
        }
        else {
            output.add("odd");
        }
        return output;
    }
});
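  • This example splits the dataStream into two streams, one with the outputName even and the other with the outputName odd. More precisely, the OutputSelector tags each element with "even" or "odd", and the two streams are then obtained with split.select("even") and split.select("odd").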
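The following plain-Java simulation shows the routing idea without a Flink dependency. It is a sketch under stated assumptions. The class `SplitSimulation` and its `split` helper are hypothetical, and the sketch works on an in-memory `List` rather than a `DataStream`. It also builds every output eagerly, whereas Flink only records the split in the job graph.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical plain-Java simulation of split/select; this is NOT Flink code.
class SplitSimulation {

    // Routes every element to each output name the selector returns for it.
    static <T> Map<String, List<T>> split(List<T> input, Function<T, Iterable<String>> selector) {
        Map<String, List<T>> outputs = new LinkedHashMap<>();
        for (T value : input) {
            for (String name : selector.apply(value)) {
                outputs.computeIfAbsent(name, k -> new ArrayList<>()).add(value);
            }
        }
        return outputs;
    }

    public static void main(String[] args) {
        List<Integer> someData = List.of(1, 2, 3, 4, 5, 6);
        // Same even/odd rule as the OutputSelector in the example above.
        Map<String, List<Integer>> outputs = split(someData,
                value -> Collections.singletonList(value % 2 == 0 ? "even" : "odd"));
        System.out.println(outputs.get("even")); // [2, 4, 6]
        System.out.println(outputs.get("odd"));  // [1, 3, 5]
    }
}
```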

DataStream.split

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

@Public
public class DataStream<T> {

    //......

    public SplitStream<T> split(OutputSelector<T> outputSelector) {
        return new SplitStream<>(this, clean(outputSelector));
    }

    //......
}
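  • DataStream's split method takes an OutputSelector and passes it through clean(). It then creates and returns a SplitStream that wraps the current stream together with that selector.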
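The key point is that `split` does no processing when it is called. It only records a new node that points back to the input. The toy model below illustrates that laziness. `Transformation`, `ToyStream`, `ToySplitStream` and `LazySplitDemo` are hypothetical stand-ins, not Flink classes. The closure cleaning performed by `clean(...)` is omitted.

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical, heavily simplified model of a lazy stream API (not Flink's classes).
class Transformation {
    final String name;
    final Transformation input; // null for a source
    Transformation(String name, Transformation input) {
        this.name = name;
        this.input = input;
    }
}

class ToyStream<T> {
    final Transformation transformation;
    ToyStream(Transformation transformation) {
        this.transformation = transformation;
    }

    // Like DataStream.split: only wraps the current node and the selector; no element is processed.
    ToySplitStream<T> split(Function<T, Iterable<String>> selector) {
        return new ToySplitStream<>(this, selector);
    }
}

class ToySplitStream<T> extends ToyStream<T> {
    final Function<T, Iterable<String>> selector;
    ToySplitStream(ToyStream<T> input, Function<T, Iterable<String>> selector) {
        super(new Transformation("split", input.transformation)); // new node pointing at the input
        this.selector = selector;
    }
}

class LazySplitDemo {
    public static void main(String[] args) {
        int[] calls = {0}; // counts selector invocations
        ToyStream<Integer> source = new ToyStream<>(new Transformation("source", null));
        ToySplitStream<Integer> split = source.split(v -> {
            calls[0]++;
            return List.of(v % 2 == 0 ? "even" : "odd");
        });
        System.out.println(split.transformation.name + " <- " + split.transformation.input.name); // split <- source
        System.out.println("selector calls: " + calls[0]); // selector calls: 0
    }
}
```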

OutputSelector

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/collector/selector/OutputSelector.java

@PublicEvolving
public interface OutputSelector<OUT> extends Serializable {

    Iterable<String> select(OUT value);

}
  • OutputSelector defines a select method that assigns output names (outputNames) to each element.
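Because `select` returns an `Iterable<String>`, a selector can return more than one name per element. One element can therefore end up in several selected streams. To compile without Flink, the sketch below redeclares an interface with the same shape as `OutputSelector`; that redeclaration is an assumption for the example only. `ParityAndSignSelector` is a hypothetical selector. It tags every element with its parity and also tags positive values with `positive`.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Local stand-in with the same shape as Flink's OutputSelector (redeclared to stay dependency-free).
interface OutputSelector<OUT> extends Serializable {
    Iterable<String> select(OUT value);
}

// Hypothetical selector returning one or two output names per element.
class ParityAndSignSelector implements OutputSelector<Integer> {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> names = new ArrayList<>();
        names.add(value % 2 == 0 ? "even" : "odd"); // every element gets a parity name
        if (value > 0) {
            names.add("positive"); // positive values get a second name
        }
        return names;
    }

    public static void main(String[] args) {
        OutputSelector<Integer> selector = new ParityAndSignSelector();
        System.out.println(selector.select(4));  // [even, positive]
        System.out.println(selector.select(-3)); // [odd]
    }
}
```

Since the interface has a single abstract method, the same selector could also be written as a lambda.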

SplitStream

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/SplitStream.java

@PublicEvolving
public class SplitStream<OUT> extends DataStream<OUT> {

    protected SplitStream(DataStream<OUT> dataStream, OutputSelector<OUT> outputSelector) {
        super(dataStream.getExecutionEnvironment(), new SplitTransformation<OUT>(dataStream.getTransformation(), outputSelector));
    }

    public DataStream<OUT> select(String... outputNames) {
        return selectOutput(outputNames);
    }

    private DataStream<OUT> selectOutput(String[] outputNames) {
        for (String outName : outputNames) {
            if (outName == null) {
                throw new RuntimeException("Selected names must not be null");
            }
        }

        SelectTransformation<OUT> selectTransform = new SelectTransformation<OUT>(this.getTransformation(), Lists.newArrayList(outputNames));
        return new DataStream<OUT>(this.getExecutionEnvironment(), selectTransform);
    }

}
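  • SplitStream extends DataStream. Its constructor wraps the input transformation and the OutputSelector in a SplitTransformation. Its select method picks the split streams by outputNames: it rejects null names, creates a SelectTransformation and returns it wrapped in a new DataStream.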
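The behaviour implied by `select` can be modelled in a few lines. `SplitSelectModel` is hypothetical and evaluates eagerly over a list. It copies the null check from `selectOutput`. As a simplifying assumption of this sketch, it emits an element at most once per `select` call, even when several of the requested names match that element.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical, dependency-free model of SplitStream.select; not Flink's implementation.
class SplitSelectModel<T> {
    private final List<T> elements;
    private final Function<T, Iterable<String>> selector;

    SplitSelectModel(List<T> elements, Function<T, Iterable<String>> selector) {
        this.elements = elements;
        this.selector = selector;
    }

    // Same null check as SplitStream.selectOutput, then keep every element
    // tagged with at least one of the requested names.
    List<T> select(String... outputNames) {
        for (String outName : outputNames) {
            if (outName == null) {
                throw new RuntimeException("Selected names must not be null");
            }
        }
        List<String> wanted = Arrays.asList(outputNames);
        List<T> result = new ArrayList<>();
        for (T value : elements) {
            for (String name : selector.apply(value)) {
                if (wanted.contains(name)) {
                    result.add(value);
                    break; // emit once even if several names match
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        SplitSelectModel<Integer> split = new SplitSelectModel<>(List.of(1, 2, 3, 4, 5),
                v -> List.of(v % 2 == 0 ? "even" : "odd"));
        System.out.println(split.select("even"));        // [2, 4]
        System.out.println(split.select("even", "odd")); // [1, 2, 3, 4, 5]
    }
}
```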

StreamGraphGenerator

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java

@Internal
public class StreamGraphGenerator {

    //......

    private Collection<Integer> transform(StreamTransformation<?> transform) {

        if (alreadyTransformed.containsKey(transform)) {
            return alreadyTransformed.get(transform);
        }

        LOG.debug("Transforming " + transform);

        if (transform.getMaxParallelism() <= 0) {

            // if the max parallelism hasn't been set, then first use the job wide max parallelism
            // from theExecutionConfig.
            int globalMaxParallelismFromConfig = env.getConfig().getMaxParallelism();
            if (globalMaxParallelismFromConfig > 0) {
                transform.setMaxParallelism(globalMaxParallelismFromConfig);
            }
        }

        // call at least once to trigger exceptions about MissingTypeInfo
        transform.getOutputType();

        Collection<Integer> transformedIds;
        if (transform instanceof OneInputTransformation<?, ?>) {
            transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
        } else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
            transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
        } else if (transform instanceof SourceTransformation<?>) {
            transformedIds = transformSource((SourceTransformation<?>) transform);
        } else if (transform instanceof SinkTransformation<?>) {
            transformedIds = transformSink((SinkTransformation<?>) transform);
        } else if (transform instanceof UnionTransformation<?>) {
            transformedIds = transformUnion((UnionTransformation<?>) transform);
        } else if (transform instanceof SplitTransformation<?>) {
            transformedIds = transformSplit((SplitTransformation<?>) transform);
        } else if (transform instanceof SelectTransformation<?>) {
            transformedIds = transformSelect((SelectTransformation<?>) transform);
        } else if (transform instanceof FeedbackTransformation<?>) {
            transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
        } else if (transform instanceof CoFeedbackTransformation<?>) {
            transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
        } else if (transform instanceof PartitionTransformation<?>) {
            transformedIds = transformPartition((PartitionTransformation<?>) transform);
        } else if (transform instanceof SideOutputTransformation<?>) {
            transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
        } else {
            throw new IllegalStateException("Unknown transformation: " + transform);
        }

        // need this check because the iterate transformation adds itself before
        // transforming the feedback edges
        if (!alreadyTransformed.containsKey(transform)) {
            alreadyTransformed.put(transform, transformedIds);
        }

        if (transform.getBufferTimeout() >= 0) {
            streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
        }
        if (transform.getUid() != null) {
            streamGraph.setTransformationUID(transform.getId(), transform.getUid());
        }
        if (transform.getUserProvidedNodeHash() != null) {
            streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
        }

        if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
            streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
        }

        return transformedIds;
    }

    private <T> Collection<Integer> transformSelect(SelectTransformation<T> select) {
        StreamTransformation<T> input = select.getInput();
        Collection<Integer> resultIds = transform(input);

        // the recursive transform might have already transformed this
        if (alreadyTransformed.containsKey(select)) {
            return alreadyTransformed.get(select);
        }

        List<Integer> virtualResultIds = new ArrayList<>();

        for (int inputId : resultIds) {
            int virtualId = StreamTransformation.getNewNodeId();
            streamGraph.addVirtualSelectNode(inputId, virtualId, select.getSelectedNames());
            virtualResultIds.add(virtualId);
        }
        return virtualResultIds;
    }

    private <T> Collection<Integer> transformSplit(SplitTransformation<T> split) {

        StreamTransformation<T> input = split.getInput();
        Collection<Integer> resultIds = transform(input);

        // the recursive transform call might have transformed this already
        if (alreadyTransformed.containsKey(split)) {
            return alreadyTransformed.get(split);
        }

        for (int inputId : resultIds) {
            streamGraph.addOutputSelector(inputId, split.getOutputSelector());
        }

        return resultIds;
    }

    //......
}
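  • StreamGraphGenerator's transform method dispatches on the transformation type. It handles SelectTransformation and SplitTransformation with dedicated methods.
  • transformSelect first transforms the input. For each resulting id it then allocates a new virtual id and calls addVirtualSelectNode with select.getSelectedNames().
  • transformSplit first transforms the input and then calls addOutputSelector with split.getOutputSelector() on each resulting id. It adds no node of its own and simply returns the input's ids.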
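A toy generator can reproduce the difference between the two methods: split attaches a selector to existing nodes, while select adds virtual nodes. Everything in it is a hypothetical simplification, including `ToyGraphGenerator`, `Node`, the string kinds and virtual ids starting at 1000. Only the overall shape follows the Flink code quoted above: memoized recursion plus bookkeeping that mimics `addOutputSelector` and `addVirtualSelectNode`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy version of the split/select handling in a graph generator.
class ToyGraphGenerator {
    static final class Node {
        final int id;
        final String kind;        // "source", "split" or "select" in this toy
        final Node input;         // null for a source
        final List<String> names; // selected names, only used by "select"
        Node(int id, String kind, Node input, List<String> names) {
            this.id = id; this.kind = kind; this.input = input; this.names = names;
        }
    }

    private final Map<Node, Collection<Integer>> alreadyTransformed = new HashMap<>();
    final Map<Integer, Integer> outputSelectorCount = new HashMap<>(); // node id -> selectors attached
    final Map<Integer, Integer> virtualToInput = new HashMap<>();      // virtual id -> input node id
    final Map<Integer, List<String>> virtualNames = new HashMap<>();   // virtual id -> selected names
    private int nextVirtualId = 1000;                                  // hypothetical id range

    // Memoized recursive translation, dispatching on the node kind.
    Collection<Integer> transform(Node t) {
        if (alreadyTransformed.containsKey(t)) {
            return alreadyTransformed.get(t);
        }
        Collection<Integer> ids;
        switch (t.kind) {
            case "source": ids = List.of(t.id); break;
            case "split":  ids = transformSplit(t); break;
            case "select": ids = transformSelect(t); break;
            default: throw new IllegalStateException("Unknown transformation: " + t.kind);
        }
        alreadyTransformed.putIfAbsent(t, ids);
        return ids;
    }

    // Split: attach the selector to each input node and reuse the input ids.
    private Collection<Integer> transformSplit(Node split) {
        Collection<Integer> resultIds = transform(split.input);
        for (int inputId : resultIds) {
            outputSelectorCount.merge(inputId, 1, Integer::sum);
        }
        return resultIds;
    }

    // Select: create one virtual node per input id, remembering the selected names.
    private Collection<Integer> transformSelect(Node select) {
        Collection<Integer> resultIds = transform(select.input);
        List<Integer> virtualIds = new ArrayList<>();
        for (int inputId : resultIds) {
            int virtualId = nextVirtualId++;
            virtualToInput.put(virtualId, inputId);
            virtualNames.put(virtualId, select.names);
            virtualIds.add(virtualId);
        }
        return virtualIds;
    }

    public static void main(String[] args) {
        Node source = new Node(1, "source", null, List.of());
        Node split = new Node(2, "split", source, List.of());
        Node even = new Node(3, "select", split, List.of("even"));
        Node odd = new Node(4, "select", split, List.of("odd"));
        ToyGraphGenerator gen = new ToyGraphGenerator();
        System.out.println(gen.transform(even));        // [1000]
        System.out.println(gen.transform(odd));         // [1001]
        System.out.println(gen.outputSelectorCount);    // {1=1}
    }
}
```

Because `transform` is memoized, both selects reuse the already-transformed split. As a result, the selector is attached to node 1 only once.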

Summary

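  • DataStream's split method takes an OutputSelector, then creates and returns a SplitStream.
  • OutputSelector defines a select method that assigns output names (outputNames) to each element.
  • SplitStream extends DataStream and defines a select method that picks the split streams by outputNames.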
