Kafka安装以及验证

发布时间:2022-06-08 发布网站:脚本宝典
脚本宝典收集整理的这篇文章主要介绍了Kafka安装以及验证脚本宝典觉得挺不错的,现在分享给大家,也给大家做个参考。

目录
  • 1 Kafka安装
    • 1.1 下载安装
    • 1.2 配置启动zookeePEr
    • 1.3 配置kafka
      • 1.3.1 修改配置文件
      • 1.3.2 配置环境变量
      • 1.3.3 配置服务启动脚本
      • 1.3.4 启动kafka服务
    • 1.4 kafka使用简单入门
      • 1.4.1 创建主题topics
      • 1.4.2 发送一些消息
      • 1.4.3 启动消费者
    • 1.5 设置多代理kafka群集
      • 1.5.1 准备配置文件
      • 1.5.2 开启集群另2个kafka服务
      • 1.5.3 在集群中进行操作
  • 注释:第一行给出了所有分区的摘要,每个附加行提供有关一个分区的信息。由于我们只有一个分区用于此主题,因此只有一行。
  • “leader”是负责给定分区的所有读取和写入的节点。每个节点将成为随机选择的分区部分的领导者。
  • “replicas”是复制此分区日志的节点列表,无论它们是否为领导者,或者即使它们当前处于活动状态。
  • “isr”是“同步”复制品的集合。这是副本列表的子集,该列表当前处于活跃状态并且已经被领导者捕获。
  • 请注意,Leader: 2,在我的示例中,节点2 是该主题的唯一分区的Leader。
      • 1.5.4 测试集群的容错性
    • 1.6 使用Kafka Connect导入/导出数据
  • 注:Kafka附带的这些示例配置文件使用您之前启动的默认本地群集配置并创建两个连接器:第一个是连接器,它从输入文件读取行并生成每个Kafka主题,第二个是宿连接器从Kafka主题读取消息并将每个消息生成为输出文件中的一行。

1 Kafka安装

1.1 下载安装

到官网http://kafka.apache.org/downloads.htML下载想要的版本

注:由于Kafka控制台脚本对于基于UnixWindows的平台是不同的,因此在Windows平台上使用binwindows 而不是bin/ 将脚本扩展名更改为.bat。

[root@along ~]# wget http://mirrors.shu.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz
[root@along ~]# tar -C /data/ -xvf kafka_2.11-2.1.0.tgz
[root@along ~]# cd /data/kafka_2.11-2.1.0/

1.2 配置启动zookeeper

kafka正常运行,必须配置zookeeper,否则无论是kafka集群还是客户端的生存者和消费者都无法正常的工作的;所以需要配置启动zookeeper服务。 点击了解zookeeper安装步骤

1.3 配置kafka

1.3.1 修改配置文件

[root@along kafka_2.11-2.1.0]# grep "^[^#]" config/server.PRoperties
broker.id=0  
listeners=PLaiNTEXT://localhost:9092  
num.network.threads=3  
num.io.threads=8  
socket.send.buffer.bytes=102400  
socket.receive.buffer.bytes=102400  
socket.request.max.bytes=104857600  
LOG.dirs=/tmp/kafka-logs
num.partITions=1  
num.recovery.threads.per.data.dir=1  
offsets.topic.replication.factor=1  
transaction.state.log.replication.factor=1  
transaction.state.log.min.isr=1  
log.retention.hours=168  
log.segment.bytes=1073741824  
log.retention.check.interval.ms=300000  
zookeeper.connect=localhost:2181  
zookeeper.connection.timeout.ms=6000  
group.initial.rebalance.delay.ms=0

注:可根据自己需求修改配置文件

broker.id:#唯一标识ID listeners=PLAINTEXT://localhost:9092:#kafka服务监听地址和端口 log.dirs:#日志存储目录 zookeeper.connect:#指定zookeeper服务

1.3.2 配置环境变量

[root@along ~]# vim /etc/profile.d/kafka.sh  
export KAFKA_HOME="/data/kafka_2.11-2.1.0"  
export PATH="${KAFKA_HOME}/bin:$PATH"  
[root@along ~]# source /etc/profile.d/kafka.sh

1.3.3 配置服务启动脚本

[root@along ~]# vim /etc/init.d/kafka
#!/bin/sh
#
# chkconfig: 345 99 01
# description: Kafka
#
# File : Kafka
#
# Description: Starts and stops the Kafka server
#
   
source /etc/rc.d/init.d/functions  
   
KAFKA_HOME=/data/kafka_2.11-2.1.0
KAFKA_USER=root
export LOG_DIR=/tmp/kafka-logs
   
[ -e /etc/Sysconfig/kafka ] && . /etc/sysconfig/kafka
   
# See how we were called.
case "$1" in  
   
  start)
    echo -n "Starting Kafka:"  
    /sbin/runuser -s /bin/sh $KAFKA_USER -c "nohup $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties > $LOG_DIR/server.out 2> $LOG_DIR/server.err &"  
    echo " done."  
    exit 0
    ;;
   
  stop)
    echo -n "Stopping Kafka: "  
    /sbin/runuser -s /bin/sh $KAFKA_USER  -c "ps -ef | grep kafka.Kafka | grep -v grep | awk '{print $2}' | xargs kill -9"  
    echo " done."  
    exit 0
    ;;
  hardstop)
    echo -n "Stopping (hard) Kafka: "  
    /sbin/runuser -s /bin/sh $KAFKA_USER  -c "ps -ef | grep kafka.Kafka | grep -v grep | awk '{print $2}' | xargs kill -9"  
    echo " done."  
    exit 0
    ;;
   
  status)
    c_pid=`ps -ef | grep kafka.Kafka | grep -v grep | awk '{print $2}'`
    if [ "$c_pid" = "" ] ; then  
      echo "Stopped"  
      exit 3
    else  
      echo "Running $c_pid"  
      exit 0
    fi  
    ;;
   
  restart)
    stop
    start
    ;;
   
  *)
    echo "usage: kafka {start|stop|hardstop|status|restart}"  
    exit 1
    ;;
   
esac

1.3.4 启动kafka服务

后台启动zookeeper服务

[root@along ~]# nohup zookeeper-server-start.sh /data/kafka_2.11-2.1.0/config/zookeeper.properties &

启动kafka服务

[root@along ~]# service kafka start  
Starting kafka (via systemctl): [ OK ]  
[root@along ~]# service kafka status  
Running 86018  
[root@along ~]# ss -nutl  
Netid State      recv-Q Send-Q     Local Address:Port                    Peer Address:Port                                
tcp   LISTEN     0      50                    :::9092                              :::*
tcp   LISTEN     0      50                    :::2181                              :::*

1.4 kafka使用简单入门

1.4.1 创建主题topics

创建一个名为along的主题,它只包含一个分区,只有一个副本:

[root@along ~]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic along
Created topic "along".

如果我们运行list topic命令,我们现在可以看到该主题:

[root@along ~]# kafka-topics.sh --list --zookeeper localhost:2181  
along

1.4.2 发送一些消息

Kafka附带一个命令行客户端,它将从文件或标准输入中获取输入,并将其作为消息发送到Kafka集群。默认情况下,每行将作为单独的消息发送。

运行生产者,然后在控制台中键入一些消息以发送到服务器

[root@along ~]# kafka-console-producer.sh --broker-list localhost:9092 --topic along
>This is a message
>This is another message

1.4.3 启动消费者

Kafka还有一个命令行使用者,它会将消息转储到标准输出。

[root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic along --From-beginning
This is a message
This is another message

所有命令行工具都有其他选项; 运行不带参数的命令将显示更详细地记录它们的使用信息。

1.5 设置多代理kafka群集

到目前为止,我们一直在与一个broker运行,但这并不好玩。对于Kafka,单个代理只是一个大小为1的集群,因此除了启动一些代理实例之外没有太多变化。但是为了感受它,让我们将我们的集群扩展到三个节点(仍然在我们的本地机器上)。

1.5.1 准备配置文件

[root@along kafka_2.11-2.1.0]# cd /data/kafka_2.11-2.1.0/
[root@along kafka_2.11-2.1.0]# cp config/server.properties config/server-1.properties
[root@along kafka_2.11-2.1.0]# cp config/server.properties config/server-2.properties
[root@along kafka_2.11-2.1.0]# vim config/server-1.properties
    broker.id=1  
    listeners=PLAINTEXT://:9093
    log.dirs=/tmp/kafka-logs-1  
[root@along kafka_2.11-2.1.0]# vim config/server-2.properties
    broker.id=2  
    listeners=PLAINTEXT://:9094
    log.dirs=/tmp/kafka-logs-2

注:该broker.id 属性是群集中每个节点的唯一且永久的名称。我们必须覆盖端口和日志目录,因为我们在同一台机器上运行这些,并且我们希望让所有代理尝试在同一端口上注册或覆盖彼此的数据。

1.5.2 开启集群另2个kafka服务

[root@along ~]# nohup kafka-server-start.sh /data/kafka_2.11-2.1.0/config/server-1.properties &  
[root@along ~]# nohup kafka-server-start.sh /data/kafka_2.11-2.1.0/config/server-2.properties &  
[root@along ~]# ss -nutl  
Netid State      Recv-Q Send-Q     Local Address:Port                    Peer Address:Port                            
tcp   LISTEN     0      50      ::ffff:127.0.0.1:9092                              :::*
tcp   LISTEN     0      50      ::ffff:127.0.0.1:9093                              :::*
tcp   LISTEN     0      50      ::ffff:127.0.0.1:9094                              :::*

1.5.3 在集群中进行操作

现在创建一个复制因子为3的新主题my-replicated-topic

[root@along ~]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
Created topic "my-replicated-topic".

在一个集群中,运行describe topics命令查看哪个broker正在做什么

[root@along ~]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
    Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1

注释:第一行给出了所有分区的摘要,每个附加行提供有关一个分区的信息。由于我们只有一个分区用于此主题,因此只有一行。

“leader”是负责给定分区的所有读取和写入的节点。每个节点将成为随机选择的分区部分的领导者。

“replicas”是复制此分区日志的节点列表,无论它们是否为领导者,或者即使它们当前处于活动状态。

“isr”是“同步”复制品的集合。这是副本列表的子集,该列表当前处于活跃状态并且已经被领导者捕获。

请注意,Leader: 2,在我的示例中,节点2 是该主题的唯一分区的Leader。

可以在我们创建的原始主题上运行相同的命令,以查看它的位置

[root@along ~]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic along  
Topic:along PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: along    Partition: 0 Leader: 0 Replicas: 0 Isr: 0

向我们的新主题发布一些消息:

[root@along ~]# kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
>;my test message 1
>my test message 2

现在让我们使用这些消息:

[root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
my test message 1
my test message 2

1.5.4 测试集群的容错性

现在让我们测试一下容错性。Broker 2 充当leader 所以让我们杀了它:

[root@along ~]# ps aux | grep server-2.properties |awk '{print $2}'
106737  
[root@along ~]# kill -9 106737
[root@along ~]# ss -nutl
tcp LISTEN 0      50      ::ffff:127.0.0.1:9092                              :::*                         
tcp LISTEN 0      50      ::ffff:127.0.0.1:9093                              :::*

leader 已切换到其中一个从属节点,节点2不再位于同步副本集中:

[root@along ~]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic  
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: my-replicated-topic  Partition: 0 Leader: 0 Replicas: 2,0,1 Isr: 0,1

即使最初接受写入的leader 已经失败,这些消息仍可供消费:

[root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
my test message 1
my test message 2

1.6 使用Kafka Connect导入/导出数据

从控制台写入数据并将其写回控制台是一个方便的起点,但有时候可能希望使用其他来源的数据或将数据从Kafka导出到其他系统。对于许多系统,您可以使用Kafka Connect导入或导出数据,而不是编写自定义集成代码。

Kafka ConnectKafka附带的工具,用于向Kafka导入和导出数据。它是一个可扩展的工具,运行连接器,实现与外部系统交互的自定义逻辑。我们将了解如何使用简单的连接器运行Kafka Connect,这些连接器将数据从文件导入Kafka主题并将数据从Kafka主题导出到文件。

首先创建一些种子数据进行测试:

[root@along ~]# echo -e "foonbar" > test.txt
或者在Windows上:
> echo foo> test.txt
> echo bar>> test.txt

接下来,启动两个以独立模式运行的连接器,这意味着它们在单个本地专用进程中运行。提供三个配置文件作为参数。

第一个始终是Kafka Connect流程的配置,包含常见配置,例如要连接的Kafka代理和数据的序列化格式。

其余配置文件均指定要创建的连接器。这些文件包括唯一的连接器名称,要实例化的连接器类以及连接器所需的任何其他配置。

[root@along ~]# connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
[2019-01-16 16:16:31,884] INFO Kafka Connect standalone worker initializing ... (org.apache.kafka.connect.cli.ConnectStandalone:67)
[2019-01-16 16:16:31,903] INFO WorkerInfo values:  
... ...

注:Kafka附带的这些示例配置文件使用您之前启动的默认本地群集配置并创建两个连接器:第一个是源连接器,它从输入文件读取行并生成每个Kafka主题,第二个是宿连接器从Kafka主题读取消息并将每个消息生成为输出文件中的一行。

验是否导入成功(另起终端) 在启动过程中,您将看到许多日志消息,包括一些指示正在实例化连接器的日志消息。

  • 一旦Kafka Connect进程启动,源连接器应该开始从test.txt主题读取行并将其生成到主题connect-test,并且接收器连接器应该开始从主题读取消息connect-test 并将它们写入文件test.sink.txt。我们可以通过检查输出文件的内容来验证数据是否已通过整个管道传递:
[root@along ~]# cat test.sink.txt  
foo  
bar

请注意,数据存储在Kafka主题中connect-test,因此我们还可以运行控制台使用者来查看主题中的数据(或使用自定义使用者代码来处理它):

[root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}

继续追加数据,验证

[root@along ~]# echo Another line>> test.txt
[root@along ~]# cat test.sink.txt
foo
bar
Another line
[root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
{"schema":{"type":"string","optional":false},"payload":"Another line"

脚本宝典总结

以上是脚本宝典为你收集整理的Kafka安装以及验证全部内容,希望文章能够帮你解决Kafka安装以及验证所遇到的问题。

如果觉得脚本宝典网站内容还不错,欢迎将脚本宝典推荐好友。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。