脚本宝典收集整理的这篇文章主要介绍了Kafka连接器建立数据管道,脚本宝典觉得挺不错的,现在分享给大家,也给大家做个参考。
最近,有同学留言咨询Kafka连接器的相关内容,今天笔者给大家分享一下Kafka连接器建立数据管道的相关内容。
Kafka连接器是一种用于Kafka系统和其他系统之间进行功能扩展、数据传输的工具。通过Kafka连接器能够简单、快速的将大量数据集移入到Kafka系统,或者从Kafka系统中移出,例如Kafka连接器可以低延时的将数据库或者应用服务器中的指标数据收集到Kafka系统主题中。另外,Kafka连接器可以通过作业导出的方式,将Kafka系统主题传输到二次存储和查询系统中,或者传输到批处理系统中进行离线分析。
Kafka连接器通常用来构建数据管道,一般来说有两种使用场景。1. 开始和结束的端点第一种,将Kafka系统作为数据管道的开始和结束的端点。例如,将Kafka系统主题中的数据移出到HBase数据库,或者把oracle数据库中的数据移入到Kafka系统。
2. 数据传输的中间介质第二种,把Kafka系统作为一个中间传输介质。例如,为了把海量日志数据存储到ElasticSeArch中,可以先把这些日志数据传输到Kafka系统中,然后再从Kafka系统中将这些数据移出到ElasticSearch进行存储。
ElasticSearch是一个基于Lucene(Lucene是一款高性能、可扩展的信息检索工具库)实现的存储介质。它提供了一个分布式多用户能力的全文搜索引擎,基RESTful(一种软件架构风格、设计风格,但是并非标准,只是提供了一组设计原则和约束条件)接口实现。
Kafka连接器的存在,给数据管道带来很重要的价值。例如,Kafka连接器可以作为数据管道各个数据阶段的缓冲区,有效的将消费者实例和生产者实例进行解耦。Kafka系统解除耦合的能力、系统的安全性、数据处理的效率等方面均表现不俗,因而使用Kafka连接器来构建数据管道是一个最佳的选举。
Kafka连接器包含一些重要的特性,并且给数据管道提供了一个成熟稳定的框架。同时,Kafka连接器还提供了一些简单易用的工具库,大大降低的开发人员的研发成本。1. 特性Kafka连接器具体包含的特性如下。
2. 优势在Kafka连接器中有两个核心的概念,它们分别是Source和Sink。其中Source负责将数据导入到Kafka系统,而Sink则负责将数据从Kafka系统中进行导出。Source和Sink在实现数据导入和导出的过程被称之连接器,即Source连接器和Sink连接器。这两种连接器提供了对业务层面数据读取和写入的抽象接口,简化了生命周期的管理工作。在处理数据时,Source连接器和Sink连接器会初始化各自的任务,并将数据结构进行标准化的封装。在实际应用场景中,不同的业务中的数据格式是不一样的,因此,Kafka连接器通过注册数据结构,来解决数据格式验证和兼容性问题。当数据源发生变化时,Kafka连接器会生成新的数据结构,通过不同的处理策略来完成对数据格式的兼容。
在Kafka连接器中存在几个核心的概念,它们分别是连接器实例(Connectors)、任务数(Tasks)、事件线程数(Workers)、转换器(Converters)。1. 连机器实例在Kafka连接器中,连接器实例决定了消息数据的流向,即消息数据从何处复制,以及将复制的消息数据写入到何处。一个连接器实例负责Kafka系统与其他系统之间的逻辑处理,连接器实例通常以JAR包的形式存在,通过实现Kafka系统应用接口来完成。2. 任务数在分布式模式下,每一个连接器实例可以将一个作业切分成多个任务(Task),然后再将任务分发到各个事件线程(Worker)中去执行。任务不会保存当前的状态信息,通常由特定的Kafka主题来保存,例如指定具体属性offset.storage.topic和status.storage.topic的值来保存。在分布式模式中,会存在任务均衡的概念。当一个连接器实例首次提交到Kafka集群,所有的事件线程都会做一个任务均衡的操作,来保证每一个事件线程都运行差不多数量的任务,避免所有任务集中到某一个事件线程。3. 事件线程在Kafka系统中,连接器实例和任务数都是逻辑层面的,需要有具体的线程来执行。在Kafka连接器中,事件线程就是用来执行具体的任务,事件线程包含两种,分别是单机模式和分布式模式。4. 转换器转换器会将字节数据转换成Kafka连接器内部的格式,同时,也能将Kafka连接器内部存储的数据格式转换成字节数据。
连接器作为Kafka的一部分,随着Kafka系统一起发布,所以无需独立安装。在大数据应用场景下,建议在每台物理机上安装一个Kafka。根据实际需求,可以在一部分物理机上启动Kafka实例(即代理节点broker),在另一部分物理机上启动连接器。
在Kafka系统中,Kafka连接器最终是以一个常驻进程的形式运行在后台服务中,它提供了一个用来管理连接器实例的REST API。默认情况下,服务端口地址是8083。
提示:
RePResentational State transfer,简称REST,即表现层状态转移。REST是所有Web应用程序都应用遵守的一种规范。符合REST设计规范的应用接口,即REST API。
在Kafka连接器中,REST API支持获取、写入、创建等接口,具体内容如下图所示:
在Kafka系统中,Kafka连接器目前支持两种运行模式,它们分别是单机模式和分布式模式。
在单机模式下,所有的事件线程都在一个单进程中运行。单机模式使用起来更加简单,特别是在开发和定位分析问题的时候,使用单机模式会比较适合。(1)编辑单机模式配置文件。在单机模式下,主题的偏移量是存储在/tmp/connect.offsets目录下,在$KAFKA_HOME/config目录下有一个connect-standalone.proPErties文件,通过设置offset.storage.file.filename属性值来改变存储路径。每次Kafka连接器启动时,通过加载$KAFKA_HOME/config/connect-file-source.properties配置文件中的name属性来获取主题的偏移量,然后执行后续的读写操作。
# 设置连接器名称 name=local-file-source # 指定连接器类 connector.class=FileStreamSource # 设置最大任务数 tasks.max=1 # 指定读取的文件 file=/tmp/test.txt # 指定主题名 topic=connect_test
(2)在即将读取的文件中,添加数据,具体操作命令如下。# 新建一个test.txt文件并添加数据[hadoop@dn1 ~]$ vi /tmp/test.txt
# 添加内容如下 kafka hadoop kafka-connect # 保存并退出
在使用Kafka文件连接器时,连接器实例会监听配置的数据文件,如果文件中有数据更新,例如:追加新的消息数据。连接器实例会及时处理新增的消息数据。(3)启动Kafka连接器单机模式的命令与启动Kafka代理节点类似,具体操作命令如下。
# 启动一个单机模式的连接器 [hadoop@dn1 bin]$ ./connect-standalone.sh ../config/connect-standalone.properties ../config/connect-file-source.properties
(4)使用Kafka系统命令查看导入到主题(connect_test)中的数据,具体操作命令如下。
# 使用Kafka命令查看 [hadoop@dn1 bin]$ ./kafka-console-consumer.sh --zookeeper dn1:2181 --topic connect_test --From-beginning
在分布式模式中,Kafka连接器会自动均衡每个事件线程所处理的任务数。允许用户动态的增加或者减少,在执行任务、修改配置、以及提交偏移量时能够得到容错保障。在分布式模式中,Kafka连接器会在主题中存储偏移量、配置、以及任务状态。建议手动创建存储偏移量的主题,可以按需设置主题分区数和副本数。需要注意的是,除了配置一些通用的属性之外,还需要配置以下几个重要的属性。
在分布式模式中,Kafka连接器配置文件不能使用命令行,需要使用REST API来执行创建、修改和销毁Kafka连接器操作。(1)编辑分布式模式配置文件(connect-distributed.properties)
# 设置Kafka集群地址 bootstrap.servers=dn1:9092,dn2:9092,dn3:9092 # 设置连接器唯一组名称 group.id=connect-cluster # 指定键值对JSON转换器类 key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter # 启用键值对转换器 key.converter.schemas.enable=true value.converter.schemas.enable=true # 设置内部键值对转换器, 例如偏移量、配置等 internal.key.converter=org.apache.kafka.connect.json.JsonConverter internal.value.converter=org.apache.kafka.connect.json.JsonConverter internal.key.converter.schemas.enable=false internal.value.converter.schemas.enable=false # 设置偏移量存储主题 offset.storage.topic=connect_offsets # 设置配置存储主题 config.storage.topic=connect_configs # 设置任务状态存储主题 status.storage.topic=connect_status # 设置偏移量持久化时间间隔 offset.flush.interval.ms=10000
(2)创建偏移量、配置、以及任务状态主题,具体操作命令如下。
# 创建配置主题 kafka-topics.sh --create --zookeeper dn1:2181 --replication-factor 3 --partITions 1 --topic connect_configs # 创建偏移量主题 kafka-topics.sh --create --zookeeper dn1:2181 --replication-factor 3 --partitions 6 --topic connect_offsets # 创建任务状态主题 kafka-topics.sh --create --zookeeper dn1:2181 --replication-factor 3 --partitions 6 --topic connect_status
(3)启动分布式模式连接器,具体操作命令如下。
# 启动分布式模式连接器 [hadoop@dn1 bin]$ ./connect-distributed.sh ../config/connect-distributed.properties
(4)执行REST API命令查看当前Kafka连接器的版本号,具体操作命令如下。
# 查看连接器版本号 [hadoop@dn1 ~]$ curl http://dn1:8083/
(5)查看当前已安装的连接器插件,通过浏览器访问http://dn1:8083/connector-plugins地址来查看
(6)创建一个新的连接器实例,具体操作命令如下。
# 创建一个新的连接器实例 [hadoop@dn1 ~]$ curl 'http://dn1:8083/connectors' -X POST -i –H "Content-Type:application/json" -d '{"name":"distributed-console-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector", "tasks.max":"1","topic":"distributed_connect_test", "file":"/tmp/distributed_test.txt"}}'
然后在浏览器访问http://dn1:8083/connectors地址查看当前成功创建的连接器实例名称,如下图所示:
(7)查看使用分布式模式导入到主题(distributed_connect_test)中的数据,具体操作命令如下。
# 在文件/tmp/distributed_test.txt中添加消息数据 [hadoop@dn1 ~]$ vi /tmp/distributed_test.txt # 添加如下内容(这条注释不要写入到distributed_test.txt文件中) distributed_kafka kafka_connection kafka hadoop # 然后保存并退出(这条注释不要写入到distributed_test.txt文件中) # 使用Kafka系统命令,查看主题distributed_connect_test中的数据 [hadoop@dn1 ~]$ kafka-console-consumer.sh --zookeeper dn1:2181 –topic distributed_connect_test --from-beginning
Kafka 连接器可以从DB存储或应用程序服务器收集数据到Topic,使数据可用于低延迟的流处理。导出作业可以将数据从Topic传输到二次存储和查询系统,或者传递到批处理系统以便进行离线分析。
这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!
另外,博主出书了《Kafka并不难学》和《Hadoop大数据挖掘从入门到进阶实战》,喜欢的朋友或同学, 可以在公告栏那里点击购买链接购买博主的书进行学习,在此感谢大家的支持。关注下面公众号,根据提示,可免费获取书籍的教学视频。
以上是脚本宝典为你收集整理的Kafka连接器建立数据管道全部内容,希望文章能够帮你解决Kafka连接器建立数据管道所遇到的问题。
本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。