第一章 flink简介与wordcount

发布时间:2022-07-04 发布网站:脚本宝典
脚本宝典收集整理的这篇文章主要介绍了第一章 flink简介与wordcount脚本宝典觉得挺不错的,现在分享给大家,也给大家做个参考。

一、简介

flink是德国和欧洲的一些大学联合发起的项目,14年进入了apache基金会,不到一年的时间,flink就成为了Apache的顶级项目,flink的理念是“Apache Flink是为分布式、高性能、随时可用以及准确的流处理应用打造的开流处理框架,用于对无界和有界数据流进行有状态计算。”。

flink有几个重要的特点:

  1. 事件驱动型

    应用是有状态的,从事件流提取数据,并根据到来的事件触发计算、状态更新或者其他动作。以kafka为代表的消息队列几乎都是事件驱动型应用。

与之不同的是SparkStreaming微批次,它将一批批数据看作连续的流,进行处理。

  1. 流与批的世界观

    批处理的特点是有界、持久、大量,非常适合全套记录才能完成的计算,一般用于离线计算。流处理的特点是无界、实时,无需针对整个数据集操作,而是对通过系统传输的每个数据项执行操作,一般用于实时统计。在 spark 的世界观中,一切都是由批次组成的,离线数据是一个大批次,而实时数据是由一个一个无限的小批次组成的。 而在 flink 的世界观中,一切都是由流组成的,离线数据是有界限的流,实时数据是一个没有界限的流,这就是所谓的有界流和无界流。

  2. 分层API

第一章 flink简介与wordcount

 

最底层的抽象仅仅提供了有状态流,它将通过过程函数被嵌入到DataStream API中。实际上用户只需要使用核心API编程计科,比如DataStream API(有界或者无界流数据)以及DataSet API(有界数据集)。Table API是以表为中心的声明式编程,其中表有可能会动态变化(在表达流数据时)。

目前Flink作为批处理还不是主流,不如Spark成熟,所以DataSet使用的并不是很多。所以主要是学习DataStream API 的使用。Flink Table API和Flink SQL也并不完善,很多大厂都在自己定制。

Flink的几大模块主要如下:

  • Flink Table&Flink SQL

  • Flink Gelly图计算

  • Flink CEP复杂事件处理

二、Wordcount

Wordcount在大数据中有点像Hello World,当我们输出Hello World的时候,就说名成功了,同样在大数据项目中如果成功的统计出了文本或者socket流中的单词数量,也相当于成功运行了第一个项目。flink是一个流批一体的计算引擎,所以wordcount分为两种,从文本或者其它存储中读取的批处理和从socket读取的流处理wordcount。

项目使用Maven进行项目管理,首先编写pom文件,添加下面两项依赖。

<dePEndencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
</dependencies>

添加依赖后发现在第二个依赖中有2.12的后缀,flink使用Java编写,但是他依赖的底层使用的是akka进行通信,akka使用Scala进行编写,这里的2.12是指Scala的版本。

Spark原先使用akka进行通信,后来从Spark1.3.1版本开始,为了解决大块数据(如shuffle)的传输问题,Spark引入了Netty通信框架,到了1.6.0版本,Netty完全取代了Akka,承担Spark内部所有的RPC通信以及数据流传输。

批处理WordCount

package flink;
​
import org.apache.flink.api.COMmon.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.Executionenvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
​
/**
 * @classname WordCount
 * @Date 2021/10/4 17:20
 * @Created by gIThub.com/myxiaoxin
 */
public class WordCount {
    public static void main(String[] args) throws Exception {
        //创建执行环境,每一个ExecutionEnvironment都是一个flink任务
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //从文件
        String inputPath = "README.md";
        DataSet<String> inputDataset = env.readTextFile(inputPath);
​
        DataSet<Tuple2<String,Integer>> wordCountDataSet = inputDataset
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @override
                    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                        String[] words = s.split(" ");
                        for(String word : words){
                            collector.collect(new Tuple2<String,Integer>(word,1));
                        }
                    }
                })
                .groupBy(0)
                .sum(1);
        wordCountDataSet.PRint();
    }
}

流处理WordCount

package flink;
​
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
​
/**
 * @Classname StreamWordCount
 * @Date 2021/10/4 19:46
 * @Created by github.com/myxiaoxin
 */
public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
​
        ParameterTool parameterTool = ParameterTool.FromArgs(args);
        String host = parameterTool.get("host");
        int port = parameterTool.getInt("port");
​
        DataStream<String> inputDStream = env.socketTextStream(host,port);
        DataStream<Tuple2<String,Integer>> wordCountDataStream = inputDStream
                .flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
                    @Override
                    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                        String[] arr = s.split(" ");
                        for (int i = 0; i < arr.length; i++) {
                            collector.collect(new Tuple2<String,Integer>(arr[i],1));
                        }
                    }
                })
                .keyBy(0)
                .sum(1);
        wordCountDataStream.print().setParallelism(1);
        env.execute();
    }
}

测试——在Linux中通过netcat命令进行发送测试。

nc -lk 7777

脚本宝典总结

以上是脚本宝典为你收集整理的第一章 flink简介与wordcount全部内容,希望文章能够帮你解决第一章 flink简介与wordcount所遇到的问题。

如果觉得脚本宝典网站内容还不错,欢迎将脚本宝典推荐好友。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。