logstash收集日志并写入kafka再到es集群

发布时间:2022-07-03 发布网站:脚本宝典
脚本宝典收集整理的这篇文章主要介绍了logstash收集日志并写入kafka再到es集群脚本宝典觉得挺不错的,现在分享给大家,也给大家做个参考。

条件: 有kafka环境

图形架构:

logstash收集日志并写入kafka再到es集群

环境准备

172.31.2.101 es1 + kibana
172.31.2.102 es2
172.31.2.103 es3
172.31.2.104 LOGstash1
172.31.2.105 logstash2
172.31.2.41 zookeePEr + kafka
172.31.2.42 zookeeper + kafka
172.31.2.43 zookeeper + kafka
172.31.2.107 web1

先启动zookeeper

[root@mq1 ~]# /usr/local/zookeeper/bin/zkServer.sh restart
[root@mq2 ~]# /usr/local/zookeeper/bin/zkServer.sh restart
[root@mq3 ~]# /usr/local/zookeeper/bin/zkServer.sh restart

启动kafka

[root@mq1 ~]# /apps/kafka/bin/kafka-server-start.sh -daemon /apps/kafka/config/server.PRoperties

[root@mq2 ~]# /apps/kafka/bin/kafka-server-start.sh -daemon /apps/kafka/config/server.properties

[root@mq3 ~]# /apps/kafka/bin/kafka-server-start.sh -daemon /apps/kafka/config/server.properties

查看端口

[root@mq1 ~]# ss -tanl | grep 9092

LISTEN  0        50          [::ffff:172.31.2.41]:9092                 *:*

web服务器改配置写入到kafka

[root@es-web1 ~]# cat /etc/logstash/conf.d/kafka-nginx-es.conf

input {
  file {
    path => "/VAR/log/nginx/access.log"
    start_posITion => "beginning"
    stat_interval => 3
    type => "nginx-accesslog"
    codec => "json"
 }

 file {
  path => "/apps/nginx/logs/error.log"
  start_position => "beginning"
  stat_interval => 3
  type => "nginx-errorlog"
  }
}

output {
  if [type] == "nginx-accesslog" {
    kafka {
      bootstrap_servers => "172.31.2.41:9092"
      topic_id => "long-linux21-accesslog"
      codec => "json"
  }}

  if [type] == "nginx-errorlog" {
    kafka {
      bootstrap_servers => "172.31.2.41:9092"
      topic_id => "long-linux21-errorlog"
      #codec => "json"
  }}
}

重启

root@long:~# Systemctl restart logstash

在logstash服务器配置写入elasticseArch

[root@logstash1 ~]# cat /etc/logstash/conf.d/kafka-to-es.conf

input {
  kafka {
    bootstrap_servers => "172.31.2.41:9092,172.31.2.42:9092,172.31.2.43:9092"
    topics => "long-linux21-accesslog"
    codec => "json"
 }

  kafka {
    bootstrap_servers => "172.31.2.41:9092,172.31.2.42:9092,172.31.2.43:9092"
    topics => "long-linux21-errorlog"
    codec => "json"
  }
}

output {
  if [type] == "nginx-accesslog" {
    elasticsearch {
      hosts => ["172.31.2.101:9200"]
      index => "n19-long-kafka-nginx-accesslog-%{+yyYY.MM.dd}"
  }}

  if [type] == "nginx-errorlog" {
    elasticsearch {
      hosts => ["172.31.2.101:9200"]
      index => "n17-long-kafka-nginx-errorlog-%{+YYYY.MM.dd}"
  }}
}

测试

[root@logstash1 ~]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/kafka-to-es.conf -t

重启

root@long:~# systemctl restart logstash

kafak工具

logstash收集日志并写入kafka再到es集群

logstash收集日志并写入kafka再到es集群

写入kibana

logstash收集日志并写入kafka再到es集群

脚本宝典总结

以上是脚本宝典为你收集整理的logstash收集日志并写入kafka再到es集群全部内容,希望文章能够帮你解决logstash收集日志并写入kafka再到es集群所遇到的问题。

如果觉得脚本宝典网站内容还不错,欢迎将脚本宝典推荐好友。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。