Fork me on GitHub

ELK + Kafka日志收集平台部署

最近生产环境中准备使用ELK,于是自己动手开始搭建环境测试。

ELK架构拓扑


  • 使用一台Nginx代理访问kibana的请求
  • 两台es组成es集群,并且在两台es上面都安装kibana
  • 中间三台服务器就是kafka(zookeeper)集群
  • 最后面的就是一大堆的生产服务器,上面使用的是logstash

角色信息

主机名 IP地址 操作系统 用途
nginx-upstream 10.16.50.22 Centos 6.5 x64 反向代理至Kibana
es-kibana-1 10.16.50.16 Centos 6.5 x64 es集群+Kibana
es-kibana-2 10.16.50.17 Centos 6.5 x64 es集群+Kibana
kafka-zookeeper-1 10.16.50.19 Centos 6.5 x64 kafka+zookeeper集群
kafka-zookeeper-2 10.16.50.20 Centos 6.5 x64 kafka+zookeeper集群
kafka-zookeeper-3 10.16.60.21 Centos 6.5 x64 kafka+zookeeper集群
nginx-client 10.16.50.18 Centos 6.5 x64 模拟线上生产环境,产生日志

软件选用

  • elasticsearch-1.7.3.tar.gz
  • kibana-4.1.2-linux-x64.tar.gz
  • logstash-2.4.0.noarch.rpm

以上软件均可从官网下载,https://www.elastic.co/downloads

  • nginx-1.8.1.tar.gz
  • jdk-8u101-linux-x64.rpm
  • kafka_2.11-0.8.2.1.tgz

部署步骤

  1. ES集群安装配置
  2. Logstash客户端配置(直接写入数据到ES集群,写入系统messages日志)
  3. Kafka(zookeeper)集群配置;(Logstash写入数据到Kafka消息系统)
  4. Kibana部署
  5. Nginx负载均衡Kibana请求

ES集群安装配置


所有服务器时间同步:
1
ntpdate 133.100.11.8

安装java-1.8.0以及依赖包


1
2
3
4
5
6
7
8
9
10
yum -y install jdk-8u101-linux-x64.rpm git
vim /etc/profile.d/java.sh #配置java环境变量
export JAVA_HOME=/usr/java/jdk1.8.0_101/
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
. /etc/profile.d/java.sh #重载配置文件
java -version #检测jdk是否安装成功
java version "1.8.0_101"
Java(TM) SE Runtime Environment (build 1.8.0_101-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.101-b13, mixed mode)

安装elasticsearch软件包


1
2
3
wget https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.3.tar.g
tar -xf elasticsearch-1.7.3.tar.gz -C /usr/local
ln -sv /usr/local/elasticsearch-1.7.3 /usr/local/elasticsearch

修改ES配置文件


es_kibana-1 配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
vim /usr/local/elasticsearch/config/elasticsearch.yml
cluster.name: es-cluster #组播的名称地址
node.name: "node1" #节点名称,不能和其他节点重复
node.master: true #节点能否被选举为master
node.data: true #节点是否存储数据
index.number_of_shards: 5 #索引分片的个数
index.number_of_replicas: 1 #分片的副本个数
path.data: /usr/local/elasticsearch/config/ #配置文件的路径
path.data: /work/es/data #数据目录路径
path.work: /work/es/work #工作目录路径
path.logs: /work/es/logs #日志文件路径
path.plugins: /work/es/plugins #插件路径
bootstrap.mlockall: true #内存不向swap交换
http.port: 9200 #启用http
mkdir -pv /work/es/{logs,data,plugins,work} #创建所需要的工作目录
git clone https://github.com/elastic/elasticsearch-servicewrapper.git
mv elasticsearch-servicewrapper/service /usr/local/elasticsearch/bin/
/usr/local/elasticsearch/bin/service/elasticsearch install
#这时就会在/etc/init.d/目录下安装上es的管理脚本啦
#修改其配置:
vim /etc/init.d/elasticsearch
ES_HOME=/usr/local/elasticsearch

es_kibana-2 的配置只需要把node.name和IP地址改一下即可

检查其服务是否正常


1
2
3
4
ss -tanl
# 确定9200和9300端口启动
# 9200:集群之间事务通信
# 9300:集群选举等

查看健康信息


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
curl -XGET 'http://10.16.50.16:9200/_cluster/health?pretty'
{
"cluster_name" : "es-cluster",
"status" : "green",
"timed_out" : false,
"number_of_nodes" : 2,
"number_of_data_nodes" : 2,
"active_primary_shards" : 0,
"active_shards" : 0,
"relocating_shards" : 0,
"initializing_shards" : 0,
"unassigned_shards" : 0,
"delayed_unassigned_shards" : 0,
"number_of_pending_tasks" : 0,
"number_of_in_flight_fetch" : 0
}

查看节点数


1
2
3
4
5
curl -XGET 'http://10.16.50.16:9200/_cat/nodes?v'
host ip heap.percent ram.percent load node.role master name
es-kibana-1 10.16.50.16 10 17 0.30 d m node1
es-kibana-2 10.16.50.17 10 17 0.37 d * node2
# 由此可见,10.16.50.16已经成为主节点

安装head插件


1
/usr/local/elasticsearch/bin/plugin -i mobz/elasticsearch-head

验证head插件


  • 好了,es集群终于部署完成…

Logstash客户端安装配置


在nginx_client上安装logstash

安装依赖环境


1
2
yum -y install jdk-8u101-linux-x64.rpm
# 配置环境变量同上

安装logstash


1
yum -y install logstash-2.4.0.noarch.rpm

编写一个logstash配置文件


1
2
3
4
5
6
7
8
9
vim /etc/logstash/conf.d/logstash_server.conf
input {
stdin {} # 数据的输入是标准输入
}
output {
elasticsearch { # 数据的输出是es集群
hosts => ["10.16.50.16:9200","10.16.50.17:9200"]
}
}

检查配置文件是否有语法错误


1
2
/opt/logstash/bin/logstash -f /etc/logstash/conf.d/logstash_server.conf --configtest
Configuration OK

启动logstash,并写入数据


1
2
3
/opt/logstash/bin/logstash -f /etc/logstash/conf.d/logstash_server.conf
test ELK
chen-hao.com.cn


上图已经看到logstash已经可以正常工作了

下面演示一下如何收集nginx日志信息


这里为了省事,就直接yum安装nginx,安装完毕后并启动nginx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
vim /etc/logstash/conf.d/logstash_nginx.conf
input {
file {
path => ["/var/log/nginx/access.log"] # 这是日志文件的绝对路径
type => "nginx_log" # 日志类型
start_position => "beginning" # 这个表示从messages的第一行读取
}
}
output {
elasticsearch {
hosts => ["10.16.20.19:9200","10.16.20.20:9200"]
index => "system-messages-%{+YYYY-MM}" # 这里将按照这个索引格式来创建索引
}
}

检测配置无误后,启动logstash,访问nginx网页,让我们再看head这个插件的web页面

  • nginx日志我们已经成功的收集,并且已经写入到es集群中,那上面的演示是logstash直接将日志写入到es集群中的,这种场合我觉得如果量不是很大的话直接像上面已将将输出output定义到es集群即可,如果量大的话需要加上消息队列来缓解es集群的压力。前面已经提到了我这边之前使用的是单台redis作为消息队列,但是redis不能作为list类型的集群,也就是redis单点的问题没法解决,所以这里我选用了kafka ;

Kafka集群安装配置


搭建kafka集群时,需要提前安装zookeeper集群(同样需要依赖jdk),当然kafka已经自带zookeeper程序只需要解压并且安装配置就行了

解压软件包(3个节点都需要配置)


1
2
3
4
wget http://mirror.rise.ph/apache/kafka/0.8.2.1/kafka_2.11-0.8.2.1.tgz
tar -zxf kafka_2.11-0.8.2.1.tgz -C /usr/local/
cd /usr/local/
ln -sv kafka_2.11-0.8.2.1 kafka

配置hosts

1
2
3
4
vim /etc/hosts
10.16.50.19 kafka-zookeeper-1
10.16.50.20 kafka-zookeeper-2
10.16.50.21 kafka-zookeeper-3

配置zookeeper集群,修改配置文件


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
vim /usr/local/kafka/config/zookeeper.propertie
dataDir=/work/zookeeper
clientPort=2181
maxClientCnxns=0
tickTime=2000
initLimit=20
syncLimit=10
server.2=10.16.50.19:2888:3888
server.3=10.16.50.20:2888:3888
server.4=10.16.50.21:2888:3888
# 说明:
# tickTime: 这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。
# 2888端口:表示的是这个服务器与集群中的 Leader 服务器交换信息的端口;
# 3888端口:表示的是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通信的端口。

创建zookeeper所需要目录及文件


1
2
3
4
mkdir /work/zookeeper
# 在/work/zookeeper目录下创建myid文件,里面的内容为数字,用于标识主机,如果这个文件没有的话,zookeeper是没法启动的
echo 2 > /work/zookeeper/myid
# 另外两个节点分别替换为3 、 4 即可

kafka配置


1
2
3
4
5
6
7
8
vim /usr/local/kafka/config/server.properties
broker.id=2 # 唯一,填数字,本文中分别为2/3/4
port=9092
host.name=10.16.50.19
log.dirs=/work/kafka-logs # 该目录可以不用提前创建,在启动时自己会创建
zookeeper.connect=10.16.50.19:2181,10.16.50.20:2181,10.16.50.21:2181
num.partitions=16 # 需要配置较大 分片影响读写速度
log.retention.hours=168 # 时间按需求保留过期时间 避免磁盘满

修改完毕配置之后我们就可以启动了,这里先要启动zookeeper集群,才能启动kafka


启动顺序分别为 kafka1 –> kafka2 –>kafka3
1
2
/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties & # 启动命令
/usr/local/kafka/bin/zookeeper-server-stop.sh # 停止命令

zookeeper服务检查


1
2
3
4
5
6
7
8
9
10
11
12
13
[root@kafka-zookeeper-1 zookeeper]# netstat -nlpt | grep -E "2181|2888|3888"
tcp 0 0 ::ffff:10.16.20.21:3888 :::* LISTEN 12223/java
tcp 0 0 :::2181 :::* LISTEN 12223/java
[root@kafka-zookeeper-2 work]# netstat -nlpt | grep -E "2181|2888|3888"
tcp 0 0 ::ffff:10.16.20.22:3888 :::* LISTEN 10370/java
tcp 0 0 :::2181 :::* LISTEN 10370/java
tcp 0 0 ::ffff:10.16.20.22:2888 :::* LISTEN 10370/java
[root@kafka-zookeeper-3 work]# netstat -nlpt | grep -E "2181|2888|3888"
tcp 0 0 ::ffff:10.16.20.24:3888 :::* LISTEN 10329/java
tcp 0 0 :::2181 :::* LISTEN 10329/java
# 可以看出,如果哪台是Leader,那么它就拥有2888这个端口

ok. 这时候zookeeper集群已经启动起来了,下面启动kafka,也是依次按照顺序启动

启动kafka


1
2
nohup /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &
cat nohup.log 确认没有报错信息

此时三台上面的zookeeper及kafka都已经启动完毕,来检测以下吧

建立一个主题


1
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic test_ch

查看有哪些主题已经创建


1
2
/usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 10.16.50.20:2181
test_ch

查看test_ch这个主题的详情


1
2
3
4
5
6
7
8
9
/usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 10.16.50.20:2181 --topic test_ch
Topic:test_ch PartitionCount:1 ReplicationFactor:3 Configs:
trueTopic: test_ch Partition: 0 Leader: 4 Replicas: 4,3,2 Isr: 4,3,2
# 主题名称:test_ch
# Partition:只有一个,从0开始
# leader :id为4的broker
# Replicas 副本存在于broker id为2,3,4的上面
# Isr:活跃状态的broker

我们在zk2上,开一个终端,发送消息至kafka(zk2模拟生产者)


1
2
3
[root@kafka-zookeeper-2 ~]# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 10.16.50.19:9092 --sync --topic test_ch
123456
test

我们在zk3上,开一个终端,显示消息的消费(zk3模拟消费者)


1
2
3
[root@kafka-zookeeper-3 ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper 10.16.50.19:2181 --topic test_ch --from-beginning
test
12344567

如果能够像上面一样能够接收到生产者发过来的消息,那说明基于kafka的zookeeper集群就成功啦

下面我们将nginx-client上面的logstash的输出改到kafka上面,将数据写入到kafka中


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
vim /etc/logstash/conf.d/logstash_kafka.conf
input {
file {
type => "system-message"
path => "/var/log/messages"
start_position => "beginning"
}
}
output {
stdout { codec => rubydebug }
kafka {
bootstrap_servers => "10.16.50.19:9092,10.16.50.19:9092,10.16.50.19:9092"
topic_id => "system-messages"
compression_type => "snappy"
}
}

检测配置并运行


1
2
/opt/logstash/bin/logstash -f /etc/logstash/conf.d/logstash_kafka.conf --configtest --verbose
/opt/logstash/bin/logstash -f /etc/logstash/conf.d/logstash_kafka.conf

验证数据是否写入到kafka,这里我们检查是否生成了一个叫system-messages的主题


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 10.16.50.19:2181
summer
system-messages
test_ch
/usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 10.16.50.19:2181 --topic system-messages
Topic:system-messages PartitionCount:8 ReplicationFactor:1 Configs:
trueTopic: system-messages Partition: 0 Leader: 3 Replicas: 3 Isr: 3
trueTopic: system-messages Partition: 1 Leader: 4 Replicas: 4 Isr: 4
trueTopic: system-messages Partition: 2 Leader: 2 Replicas: 2 Isr: 2
trueTopic: system-messages Partition: 3 Leader: 3 Replicas: 3 Isr: 3
trueTopic: system-messages Partition: 4 Leader: 4 Replicas: 4 Isr: 4
trueTopic: system-messages Partition: 5 Leader: 2 Replicas: 2 Isr: 2
trueTopic: system-messages Partition: 6 Leader: 3 Replicas: 3 Isr: 3
trueTopic: system-messages Partition: 7 Leader: 4 Replicas: 4 Isr: 4
trueTopic: system-messages Partition: 8 Leader: 2 Replicas: 2 Isr: 2
trueTopic: system-messages Partition: 9 Leader: 3 Replicas: 3 Isr: 3
trueTopic: system-messages Partition: 10 Leader: 4 Replicas: 4 Isr: 4
trueTopic: system-messages Partition: 11 Leader: 2 Replicas: 2 Isr: 2
trueTopic: system-messages Partition: 12 Leader: 3 Replicas: 3 Isr: 3
trueTopic: system-messages Partition: 13 Leader: 4 Replicas: 4 Isr: 4
trueTopic: system-messages Partition: 14 Leader: 2 Replicas: 2 Isr: 2
trueTopic: system-messages Partition: 15 Leader: 3 Replicas: 3 Isr: 3

  • 可以看出,这个主题生成了16个分区,每个分区都有对应自己的Leader,但是我想要有10个分区,3个副本如何办?还是跟我们上面一样命令行来创建主题就行,当然对于logstash输出的我们也可以提前先定义主题,然后启动logstash 直接往定义好的主题写数据就行啦,命令如下:
1
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 10.16.20.21:2181 --replication-factor 3 --partitions 10 --topic TOPIC_NAME
  • 我们将logstash收集到的数据写入到了kafka中了,在实验过程中我使用while脚本测试了如果不断的往kafka写数据的同时停掉两个节点,数据写入没有任何问题。

那如何将数据从kafka中读取然后给我们的es集群呢?那下面我们在kafka集群上安装Logstash,安装步骤不再赘述;三台上面的logstash 的配置如下,作用是将kafka集群的数据读取然后转交给es集群,这里为了测试我让他新建一个索引文件,注意这里的输入日志还是messages,主题名称还是“system-messages”

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
vim /etc/logstash/conf.d/logstash_es.conf
input {
kafka {
zk_connect => "10.16.50.19:2181,10.16.50.20:2181,10.16.50.21:2181"
topic_id => "system-messages"
codec => plain
reset_beginning => false
consumer_threads => 5
decorate_events => true
}
}
output {
elasticsearch {
hosts => ["10.16.50.16:9200","10.16.50.17:9200"]
index => "test-system-messages-%{+YYYY-MM}"
}
}

在三台kafka上面启动Logstash


1
/opt/logstash/bin/logstash -f /etc/logstash/conf.d/logstash_es.conf

在nginx-client上写入测试内容,即nginx-client上面利用message这个文件来测试

1
2
[root@nginx-client work]# echo 'chen-hao.com.cn' >> /var/log/messages
[root@nginx-client work]# echo 'ELK部署集群' >> /var/log/messages

客户端写入到kafka集群的同时也将其输入到终端,这里写入了2条内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
{
"message" => "chen-hao.com.cn",
"@version" => "1",
"@timestamp" => "2016-11-16T06:18:32.565Z",
"path" => "/var/log/messages",
"host" => "0.0.0.0",
"type" => "system-message"
}
{
"message" => "ELK部署集群",
"@version" => "1",
"@timestamp" => "2016-11-16T06:18:46.584Z",
"path" => "/var/log/messages",
"host" => "0.0.0.0",
"type" => "system-message"
}

接下来可以看看我们的es管理界面


OK,我们的E+L部署完毕

Kibana部署


Kibana的作用,就是一个展示工具,报表内容非常的丰富,下面我们在两台es上面搭建两套kibana

安装kibana软件包


1
2
3
4
5
6
7
8
9
10
11
wget https://download.elastic.co/kibana/kibana/kibana-4.1.2-linux-x64.tar.gz
tar -zxf kibana-4.1.2-linux-x64.tar.gz -C /usr/local/
cd /usr/local/
ln -sv kibana-4.1.2-linux-x64 kibana
<h3>修改配置文件</h3>
```bash
vim /usr/local/kibana/config/kibana.yml
server.port: 5601 #默认端口可以修改的
server.host: "0.0.0.0" #kibana监听的ip
elasticsearch.url: "http://localhost:9200" #由于es在本地主机上面,所以这个选项打开注释即可

提供kibana服务管理脚本


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
cat /etc/init.d/kibana
#!/bin/bash
#chkconfig: 2345 55 24
#description: kibana service manager
KIBBIN='/usr/local/kibana/bin/kibana'
LOCK='/usr/local/kibana/locks'
START() {
trueif [ -f $LOCK ];then
truetrueecho -e "kibana is already \033[32mrunning\033[0m, do nothing."
trueelse
truetrueecho -e "Start kibana service.\033[32mdone\033[m"
truetruecd /usr/local/kibana/bin
nohup ./kibana & >/dev/null
touch $LOCK
truefi
}
STOP() {
trueif [ ! -f $LOCK ];then
truetrueecho -e "kibana is already stop, do nothing."
trueelse
truetrueecho -e "Stop kibana serivce \033[32mdone\033[m"
truetruerm -rf $LOCK
truetrueps -ef | grep kibana | grep -v "grep" | awk '{print $2}' | xargs kill -s 9 >/dev/null
truefi
}
STATUS() {
Port=$(netstat -tunl | grep ":5602")
trueif [ "$Port" != "" ] && [ -f $LOCK ];then
truetrueecho -e "kibana is: \033[32mrunning\033[0m..."
trueelse
truetrueecho -e "kibana is: \033[31mstopped\033[0m..."
truefi
}
case "$1" in
start)
trueSTART
true;;
stop)
trueSTOP
true;;
status)
trueSTATUS
true;;
restart)
trueSTOP
sleep 2
START
true;;
*)
trueecho "Usage: /etc/init.d/kibana (|start|stop|status|restart)"
true;;
esac

启动kibana服务


1
2
chkconfig --add kibana
service kibana start

访问kibana页面



能成功的访问5601端口,那我把es1这台的配置放到es2上面去然后启动,效果跟访问es1一样

Nginx负载均衡kibana的请求


在nginx-upstream上安装nginx
1
yum -y install nginx

编写配置文件nginx.conf


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
vim /etc/nginx/nginx.conf
upstream es {
server 10.16.50.17:5601 max_fails=3 fail_timeout=30s;
server 10.16.50.16:5601 max_fails=3 fail_timeout=30s;
}
server {
listen 80;
server_name localhost;
location / {
proxy_pass http://es/;
index index.html index.htm;
#auth
auth_basic "ELK Private";
auth_basic_user_file /etc/nginx/.htpasswd;
}
}

创建认证


1
2
3
4
5
htpasswd -cm /etc/nginx/.htpasswd elk
New password:
Re-type new password:
Adding password for user elk
nginx -t && nginx -s restart #检测并启动nginx

入认证用户及密码就可访问http://10.16.50.22/

  • 基本上ELK集群算是安装完毕,那么在下次将介绍线上各种日志的方法和方式


    本文标题   :   ELK + Kafka日志收集平台部署

    文章作者   :   火柴

    发布时间   :   2016年10月14日 - 09时10分

    本文链接   :   http://www.chen-hao.com.cn/ELK-Kafka日志收集平台部署.html

    本文字数   :   本文一共有3,818字

    许可协议   :   Attribution-NonCommercial 4.0

    © 转载请保留以上信息,谢谢合作。