Flume

I. What is Flume

Flume is a distributed, reliable, and highly available system for collecting, aggregating, and transporting large volumes of log data; data moves through it in units called Events. A Source captures an event, converts it into a specific format, and pushes it into a Channel. The Channel can be thought of as a buffer: it holds the event until a Sink has processed it. The Sink then persists the log or forwards the event to another agent's Source.

II. Flume's three main components

source

The Source is the data-collection end: it captures data, applies any required formatting, wraps the data in an event, and pushes the event into a Channel. Flume ships with many built-in Sources.

channel

The component that connects Source and Sink; it can be thought of as a buffer. The most commonly used channels are the File Channel and the Memory Channel.
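The memory channel is fast but loses buffered events if the agent dies; the file channel trades speed for durability by writing events to disk. A minimal file channel sketch (the directory paths here are illustrative):

```properties
a1.channels.c1.type = file
# where the channel keeps its checkpoint and data files (example paths)
a1.channels.c1.checkpointDir = /tmp/flume/checkpoint
a1.channels.c1.dataDirs = /tmp/flume/data
```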

sink

Takes data out of the Channel and writes it to a file system, or passes it on as the source of another agent.

III. Working with Flume

1. Monitor a given port, collect its data, and print it to the console

Use a netcat source to monitor the chosen port.

First check whether the port to be listened on is already in use:

sudo netstat -tunlp | grep 44444

Options: -t TCP sockets, -u UDP sockets, -n numeric addresses (no name resolution), -l listening sockets only, -p show the owning process.

Use a memory channel.

Use a logger sink.

Create a configuration file: vi netcat.flm

# Name the components on this agent
# a1 is the agent name; r1, k1, c1 name the source, sink, and channel
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
# bind to this host and listen on the given port
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory

# Bind the source and sink to the channel
# the source writes to c1; the sink reads from c1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Run the following command to start the agent:

flume-ng agent -c flume/conf/ -f netcat.flm -n a1 -Dflume.root.logger=INFO,console

Options: -c the Flume configuration directory, -f the agent configuration file to run, -n the agent name, which must match the name used in the configuration file.

Test with telnet; once connected, type a message and press Enter:

telnet localhost 44444

2. Collect data in real time and print it to the console

Use an exec source.

Configuration file: vi exec.flm

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
# the file to monitor
a1.sources.r1.command = tail -F /home/briup/log/test.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Run the agent:

flume-ng agent -c flume/conf/ -f exec.flm -n a1 -Dflume.root.logger=INFO,console
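With the agent running, appending lines to the monitored file is enough to see events on the console. A quick way to generate test lines (using a stand-in path under /tmp; substitute /home/briup/log/test.log from the config above):

```shell
# Create the log file if needed and append a line for tail -F to pick up.
LOG=/tmp/flume-test/test.log
mkdir -p "$(dirname "$LOG")"
echo "test line $(date +%s)" >> "$LOG"
tail -n 1 "$LOG"
```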

3. Spool: watch a configured directory for new files

Source: spooldir

First create a directory and make it the one to be watched:

mkdir spool

Configuration file: vi spool.flm

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/master/spool
a1.sources.r1.fileHeader = true

a1.sinks.k1.type = logger

a1.channels.c1.type = memory

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start the agent:

flume-ng agent -c flume/conf/ -f spool.flm -n a1 -Dflume.root.logger=INFO,console
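Once the agent is running, any new file that appears in the directory is ingested and renamed with a .COMPLETED suffix. Because the spooldir source expects files to be complete and immutable when they appear, write the file elsewhere first and then move it in (paths here are illustrative stand-ins for /home/master/spool):

```shell
# Write the file outside the watched directory, then move it in atomically.
SPOOL=/tmp/spool
mkdir -p "$SPOOL"
echo "spooled event" > /tmp/incoming.txt
mv /tmp/incoming.txt "$SPOOL/incoming.txt"
ls "$SPOOL"
```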

4. Syslogtcp: listen on a TCP port as the data source

Source: syslogtcp

Configuration file: vi syslogtcp.flm

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start the agent:

flume-ng agent -c flume/conf/ -f syslogtcp.flm -n a1 -Dflume.root.logger=INFO,console

Test:

echo "hello world" | nc localhost 5140
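A bare string like the one above is delivered, but the syslogtcp source logs a warning because it is not a well-formed syslog message. Building an RFC 3164-style line (priority, timestamp, host, tag) avoids that; pipe the result to `nc localhost 5140` while the agent is running:

```shell
# Compose an RFC 3164-style syslog line; <13> is facility "user" (1*8)
# plus severity "notice" (5). Host and tag names are illustrative.
MSG="<13>$(date '+%b %e %H:%M:%S') myhost myapp: hello world"
echo "$MSG"
```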

5. Read a local file into HDFS

Source: exec; sink: hdfs

Configuration file: vi flume_hdfs.conf

# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/hive.log
a2.sources.r2.shell = /bin/bash -c
# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop102:9000/flume/%Y%m%d/%H
# prefix for uploaded files
a2.sinks.k2.hdfs.filePrefix = logs-
# whether to roll directories based on time
a2.sinks.k2.hdfs.round = true
# how many time units before a new directory is created
a2.sinks.k2.hdfs.roundValue = 1
# the time unit used for rounding
a2.sinks.k2.hdfs.roundUnit = hour
# whether to use the local timestamp
a2.sinks.k2.hdfs.useLocalTimeStamp = true
# how many events to accumulate before flushing to HDFS
a2.sinks.k2.hdfs.batchSize = 1000
# file type; compression is supported
a2.sinks.k2.hdfs.fileType = DataStream
# how often (seconds) to roll a new file
a2.sinks.k2.hdfs.rollInterval = 600
# roll size (bytes) for each file
a2.sinks.k2.hdfs.rollSize = 134217700
# 0 disables rolling by event count
a2.sinks.k2.hdfs.rollCount = 0
# minimum block replication
a2.sinks.k2.hdfs.minBlockReplicas = 1
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

Run it:

flume-ng agent -c flume/conf/ -f flume_hdfs.conf -n a2
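The rollSize of 134217700 above is chosen to sit just under the default 128 MiB HDFS block size, so a rolled file never spills a few bytes into a second block. A quick check of the arithmetic:

```shell
# 128 MiB in bytes, and the headroom the rollSize setting leaves below it.
echo $((128 * 1024 * 1024))              # 134217728
echo $((128 * 1024 * 1024 - 134217700))  # 28
```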

IV. How an agent executes

  1. The source receives an event.
  2. Before entering a channel, the event is intercepted by any configured interceptors.
  3. After interception, the event reaches the channel selector, which decides which channel(s) it is written to.
  4. The sink processor selects one sink to take the data from the channel and pass it on to the next stage.

Channel selectors:

1. Replicating Channel Selector: sends every event from the source to all configured channels.
2. Multiplexing Channel Selector: routes each event to specific channels based on configuration.
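As a sketch of the multiplexing case, events can be routed by the value of an event header; the header name and channel mapping below are illustrative:

```properties
a1.sources.r1.selector.type = multiplexing
# route on the value of the "type" event header
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.error = c1
a1.sources.r1.selector.mapping.info = c2
# events with any other value go here
a1.sources.r1.selector.default = c2
```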

V. Flume's reliability

Flume relies on transactions to make event delivery reliable. A sink removes an event from its channel only once the event has been stored in the next channel, delivered to the next agent, or persisted to the file system. This guarantees reliable delivery whether the event flows through a single agent or across several.

Transaction mechanism: Flume uses two independent transactions, one from source to channel (put) and one from channel to sink (take).

For example, the spooling directory source creates one event per line of a file; only after every event in the transaction has been delivered to the channel and the transaction has committed does the Source mark the file as complete.
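In configuration terms, each put or take batch is bounded by the channel's transactionCapacity, so a sink's batch size should not exceed it. A sketch of that relationship (values are illustrative):

```properties
# total events the channel can hold
a1.channels.c1.capacity = 1000
# max events per put or take transaction
a1.channels.c1.transactionCapacity = 100
# e.g. for an hdfs sink, the batch must fit in one take transaction
a1.sinks.k1.hdfs.batchSize = 100
```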

VI. Flume high availability

1. Failover

Use a backup agent: when the primary agent goes down, traffic switches to the backup. This requires a front agent (or a Java program reading the original data) whose sink group (sink processor) sends output to the different downstream agents.

Configuration

First configure the front agent: vi pre.flm

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# the key to failover configuration: a sink group is required
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
# the processor type is failover
a1.sinkgroups.g1.processor.type = failover
# priority: higher numbers mean higher priority; each sink must have a distinct priority
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
# penalty timeout, here 10 seconds; tune faster or slower to suit your situation
a1.sinkgroups.g1.processor.maxpenalty = 10000

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1 c2
a1.sources.r1.selector.type = replicating

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 5555

a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname = localhost
a1.sinks.k2.port = 6666

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

Next, configure the primary and backup agents: vi agent.flm and vi back_agent.flm

# agent.flm
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

# back_agent.flm
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5555
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

You can now start these configurations: start agent.flm and back_agent.flm first, then the front agent.

Send data to the front agent and the primary agent receives it; stop the primary agent with Ctrl-C and back_agent starts receiving the data instead.

flume-ng agent -c flume/conf/ -f pre.flm -n a1 -Dflume.root.logger=INFO,console

flume-ng agent -c flume/conf/ -f agent.flm -n a1 -Dflume.root.logger=INFO,console

flume-ng agent -c flume/conf/ -f back_agent.flm -n a1 -Dflume.root.logger=INFO,console

2. Load balancing

A front agent uses a sink processor to send data to several downstream agents in round-robin fashion.

First configure the front agent: vi pre.flm

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

# the key to load balancing: a sink group is required
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2

a1.sinkgroups.g1.processor.type = load_balance
# whether to back off from failed sinks
a1.sinkgroups.g1.processor.backoff = true
# round-robin selection
a1.sinkgroups.g1.processor.selector = round_robin

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1

# Describe the sinks
# both sinks drain the same channel; the processor picks one per batch
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 5555

a1.sinks.k2.type = avro
a1.sinks.k2.channel = c1
a1.sinks.k2.hostname = localhost
a1.sinks.k2.port = 6666

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

Next, configure agent a and agent b: vi a.flm and vi b.flm

#a.flm
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5555

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1


#b.flm
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1