初识kafka,环境部署与(Springboot+SpringCloud+Eurka)应用(Linux)

发布时间:2022-06-28 发布网站:脚本宝典
脚本宝典收集整理的这篇文章主要介绍了初识kafka,环境部署与(Springboot+SpringCloud+Eurka)应用(Linux)脚本宝典觉得挺不错的,现在分享给大家,也给大家做个参考。

Kafka介绍和部署应用

kafka介绍        

kafka是一个高吞吐量的分布式订阅消息系统,可以处理消费者在网站中的所有动作流数据,像hadoop一样的日志数据和离线分析系统,但是又要求实时处理的限制,kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

kafka的特性

  • 通过O(1)的磁盘数据提供消息的持久化,这种结构对于即使数据量为TB级的消息存储也能够保持长时间的稳定性能。

  • 高吞吐量F1a;即使是非常普通的硬件Kafka也能支持每秒数百万计的消息。

  • 支持通过Kafka服务器和消费机集群来区分消息。

  • 这次Hadoop并行数据加载。

流媒体平台三个关键功能

  • 发布和订阅记录流,类似于消息队列或企业消息传递系统

  • 以容错的持久方式存储记录流

  • 记录发生时处理流

kafka的应用场景

  1. 构建可在系统或应用程序之间可靠获取的实时流数据管道
  2. 构建转换响应数据的实时流应用程序

例如:

  • 日志收集:可以用kafka收集各种服务的LOG,通过kafka以统一接口服务的方式开放给各种consumer(消费者)

  • 消息系统:解耦生产者和消费者、缓存消息等

  • 用户活动跟踪:kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动消息可以被各个服务器发布到kafka的topic中,然后消费者通过订阅这些topic来做实时的监控分析,亦可保存到数据库

  • 运营指标:kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告

  • 流式处理:比如spark streaming和storm。

kafka的应用原理

1、Kafka作为一个集群运行在一个或者多个可跨多个数据中心的服务器上

2、Kafka集群以称为** topics主题**的类别存储记录流

3、每条记录都包含一个键,一个值和一个时间戳

kafka的核心

  1. PRoducer API(生产者API)允许应用程序发布记录流至一个或多个kafka的topics(主题)。

  2. Consumer API(消费者API)允许应用程序订阅一个或多个topics(主题),并处理所产生的对他们记录的数据流。

  3. Streams API(流API)允许应用程序充当流处理器,从一个或多个topics(主题)消耗的输入流,并产生一个输出流至一个或多个输出的topics(主题),有效的变换所述的输入流,以输出流。

  4. Connector API(连接器API)允许构建和运行kafka topics(主题)连接到现有的应用程序或数据系统中重用生产者或消费者。例如,关系型数据库的连接器可能捕获对表的每个更改。

在Kafka中,客户端和服务器之间的通信是通过简单,高性能,语言无关的TCP协议完成的,Kafka提供Java客户端,但是客户端提供多语言版本。

Kafka的消费模式

kafka的消费模式主要有两种,一种是一对一的消费,也就是点对点的通信,即一个发送一个接收。第二种为一对多消费,即一个消息发送到消息队列,消费者根据消息队列的订阅拉取消息消费。

  • 一对一消费:消息生产者发布消息到队列中,通知消费者从队列中拉取消息进行消费。`消息被消费之后则删除`,Queue支持多个消费者,但对于一条消息而言,只有一个消费者可以消费,即一条消息只能被一个消费者消费。

  • 一对多消费:这种模式称为发布/订阅模式,即利用Topic存储消息,消息生产者将消息发布到Topic中,同时有多个消费者订阅此Topic,消费者可以从中消费消息,注意发布到Topic中的消息会被多个消费者消费,`消费者消费数据之后,数据不会被清除`,Kafka会默认保存一段时间,然后再删除。

初识kafka,环境部署与(Springboot+SpringCloud+Eurka)应用(Linux)

Kafka基础结构

Kafka像其他MQ一样,也有自己的基础架构,主要存在生产者Producer、Kafka集群的broker、消费者Consumer,注册消息ZookeePEr等。

zookeeper(Linux)部署与应用

  1. Producer:消息生产者,向kafka发送消息的角色。

  2. Consumer:消息消费者,即从Kafka中拉取消息消费的客户端。

  3. Consumer Group:消费者组,消费者组则是一组中存在多个消费者,消费者消费Broker中当前Topic的不同分区中的消息,消费者组之间互不影响,所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。某一个分区中的消息只能够一个消费者组中的一个消费者所消费。

  4. Broker:代理,经纪人,一台kafka服务器就是一个Broker,一个集群由多个Broker组成,一个Broker可以容纳多个Topic。

  5. Topic:主题,可以理解成一个队列,生产者和消费者都是面向一个Topic。

  6. PartITion:分区,为了实现拓展性,一个非常大的Topic可以分布到多个Broker上,一个Topic可以分为多个Partition,每个Partition是一个有序的队列(分区内部有序,但不能保证全局有序)

  7. Replica:副本Replication,为保证集群中某个节点发生故障,节点上的Partition数据不丢失,Kafka可以正常工作,Kafka提供了副本机制,一个Topic的每个分区有若干个副本,一个Leader和多个Follwer。

  8. Leader:每个分区多个副本的主角色,生产者发送数据对象,以及消费者消费数据的对象都是Leader。

  9. Follower:每个分区多个副本的从角色,实时的从Leader中同步数据,保持和Leader数据的同步,Leader发生故障的时候,某个Follwer会成为新的Leader。

上述一个Topic会产生多个分区Partition,分区中区分为Leader和Follwer,消息一般发送到Lerder,Follwer通过数据的同步与Leader保持同步,消费的话也是再Leader中发生消费,如果多个消费者,则分别消费Leader和各个Follwer中的消息,当Leader发生故障的时候,某个Follwer会成为主节点,此时会对齐消息的偏移量。

消息队列的特性

       耦合的状态表示当你实现某个功能的时候,是直接接入当前接口,而利用消息队列,可以将相应的消息发送到消息队列,这样的话,如果接口出了问题,也不会影响到当前的功能。

传统调用方式:A -调用-> B  

中间件:A-发送->kafka->订阅->B

        异步处理,代替了之前的同步处理,异步处理不需要让流程走完就返回结果,可以将消息发送到消息队列中,然后返回结果,剩下的让其他业务处理接口从消息队列中拉取消费处理即可。

        流量削峰,高流量的时候,使用消息队列作为中间件可以将流量的高峰保存在消息队列中,从而止了系统的高请求,减轻服务器的请求处理压力

kafka的部署应用(单机环境安装,基于Linux)

kafka需要依赖java环境运行,kafka的安装包可以在这里下载:

kafka和zookeeper的安装包(Liunx)

版本是:kafka_2.13-3.0.0

将包下载到相关的目录,这里新建了一个kafka的文件夹,然后解压到指定目录中;

cd /kafka
tar -zxvf kafka_2.13-3.0.0.tgz` 重命名为:`;mv kafka_2.13-3.0.0 kafka_2.13

配置日志:进入到kafka/kafka_2.13中,创建日志目录logs;

cd /kafka/kafka_2.13
mkdir logs

修改kafka配置文件;

进入到目录
/kafka/kafka_2.13/config
修改server.properties文件
编辑相应的参数vim server.properties 
broker.id=0
port=9092
host.name=127.0.0.1 # 服务器ip地址,修改为自己的服务器ip
log.dirs=/kafka/kafka_2.13/logs # 日志存放路径,上面创建的目录  
zookeeper.connect=localhost:2181 #zookeeper地址和端口,单机配置部署,localhost:2181
然后保存退出
kafka需要基于Zookeeper服务使用,因此需要安装zookeeper环境先`;(详见zookeeper部署与应用)
注释去掉,`listeners=PLaiNTEXT://:9092
注释去掉,把`advertised.listeners`值改为`PLAINTEXT://host_ip:9092(改成服务器ip)

启动kafka

Kafka支持内置的Zookeeper和引用外部的Zookeeper,这里使用远程服务器的Zookeeper。

先启动zookeeper

进入bin目录,启动:

./kafka-server-start.sh /config/server.properties &

应用整合及使用(基于SpringBoot、SpringCloud、Eruka)

Springboot中使用kafka

pom文件依赖

        <!--kafka依赖-->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

配置yML信息

spring:
    # kafka配置信息
    kafka:
        bootstrap-servers: 192.168.115.79:9092 # 指定kafka服务地址 集群用逗号隔开
        producer:
        retries: 1 # 发生错误后,消息重发次数。
        #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
        batch-Size: 16384
        buffer-memory: 33554432 # 设置生产者内存缓冲区的大小。
        key-serializer: org.apache.kafka.COMmon.serialization.StringSerializer # 键的序列化方式
        value-serializer: org.apache.kafka.common.serialization.StringSerializer # 值的序列化方式
        # =0:生产者在成功写入消息之前不会等待任何来自服务器的响应。
        # =1:只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
        # =all:只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
        acks: 1
        consumer:
        # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
        auto-commit-interval: 1s
        # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
        # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
        # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
        auto-offset-reset: earliest
        # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
        enable-auto-commit: false
        # 键的反序列化方式
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        # 值的反序列化方式
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        listener:
        # 在侦听器容器中运行的线程数。
        concurrency: 5
        #listner负责ack,每调用一次,就立即commit
        ack-mode: manual_immediate
        missing-topics-fatal: false

封装生产者_KafkaProducer

/**
    * kafka生产者_配置
    */
    @component
    @Slf4j
    public class KafkaProducer {

        /**
        * kafka使用模板
        */
        @Autowired
        private KafkaTemplate<String,Object> kafkaTemplate;

        /**
        * 自定义topic
        */
        public static final String TOPIC_test1 = "topic_test1";

        public static final String TOPIC_test2 = "topic_test2";

        /**
        * 自定义组
        */
        public static final String TOPIC_GROUP1 = "topic_group1";

        public static final String TOPIC_GROUP2 = "topic_group2";

        /**
        * 生产消息_发送
        */
        public void send(Object obj) {
            String obj2String = JSONObject.toJSONString(obj);

            log.info("准备发送消息为:{}",obj2String);

            // 发送消息
            ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC_TEST1,obj);

            // 监听消息加入队列结果返回
            future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
                /**
                * 发送失败处理
                * @param throwable
                */
                @override
                public void onFailure(Throwable throwable) {
                    log.info(TOPIC_TEST1 + " - 生产者 发送消息失败:" + throwable.getMessage());
                }

                /**
                * 发送成功处理
                * @param stringObjectSendResult
                */
                @Override
                public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                    // 发送成功处理
                    log.info(TOPIC_TEST1 + " - 生产者 发送消息成功:" + stringObjectSendResult.toString());
                }
            });
        }
    }

封装消费者_KafkaConsumer

/**
    * kafka消费者_配置_topic
    * 实现topic监听
    */
    @Component
    @Slf4j
    public class KafkaConsumer {
        @KafKalistener(topics = KafkaProducer.TOPIC_TEST1,groupId = KafkaProducer.TOPIC_GROUP1)
        public void topic_test1(ConsumerRecord<?,?> record, Acknowledgment ack,
                                @Header(KafkaHeaders.RECeiVED_TOPIC) String topic) {
            Optional message = Optional.ofNullable(record.value());
            if (message.isPresent()) {
                Object msg = message.get();
                log.info("topic_test1 消费了:Topic:" + topic + ",Message" + msg);
                ack.acknowlEdge();
            }
        }

        @KafkaListener(topics = KafkaProducer.TOPIC_TEST2,groupId = KafkaProducer.TOPIC_GROUP2)
        public void topic_test2(ConsumerRecord<?,?> record, Acknowledgment ack,
                                @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
            Optional message = Optional.ofNullable(record.value());
            if (message.isPresent()) {
                Object msg = message.get();
                log.info("topic_test2 消费了:Topic:" + topic + ",Message" + msg);
                ack.acknowledge();
            }
        }
    }

测试使用_KafkaController

/**
    * kafka接口测试
    */
    @RestController
    @Slf4j
    @RequestMapping("kafka")
    @Api(value = "测试kafka接口",tags = "测试kafka接口实现")
    public class KafkaController {

        /**
        * 注入生产者
        */
        @Autowired
        private KafkaProducer kafkaProducer;

        @GetMapping("send")
        @Transactional(rollbackFor = Exception.class)
        public void send() {
            kafkaProducer.send("这是 kafka 的测试 topic 数据");
        }
    }

脚本宝典总结

以上是脚本宝典为你收集整理的初识kafka,环境部署与(Springboot+SpringCloud+Eurka)应用(Linux)全部内容,希望文章能够帮你解决初识kafka,环境部署与(Springboot+SpringCloud+Eurka)应用(Linux)所遇到的问题。

如果觉得脚本宝典网站内容还不错,欢迎将脚本宝典推荐好友。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。