Flume 学习之路(三)Flume的Source类型

官方文档介绍:http://flume.apache.org/FlumeUserGuide.html#flume-sources

Avro Source

内置 Avro Server,可接受 Avro 客户端发送的数据

官方文档描述:
Flume 学习之路(三)Flume的Source类型

Flume 学习之路(三)Flume的Source类型

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
vim avro_source.properties

a1.sources = s1
a1.sinks = k1
a1.channels = c1

// 配置source
a1.sources.s1.channels = c1
a1.sources.s1.type = avro
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 6666

// 配置channels
a1.channels.c1.type = memory

// 配置sinks
a1.sinks.k1.channel = c1
a1.sinks.k1.type = logger

// 为sources和sinks绑定channels
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

启动 Flume NG:

1
bin/flume-ng agent -c conf/ -f conf/avro_source.properties -n a1 -Dflume.root.logger=INFO,console

开始输入测试数据:

1
2
3
4
5
vim  666.txt 

123
123
123

客户端输入:

1
bin/flume-ng avro-client -c conf/ -H bigdata -p 6666 -F 666.txt

Thrift Source

内置 Thrift Server,可接受 Thrift 客户端发送的数据。ThriftSource 与Avro Source 基本一致。只要把source的类型改成thrift即可

Exec Source

执行指定的shell,并从该命令标准输出中获取数据,但考虑到该flume agent不运行或者指令出错时,将无法收集到数据,故生产环境很少采用 。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
vim exec_source.properties

// 配置文件
a1.sources= s1
a1.sinks= k1
a1.channels= c1

// 配置sources
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /tmp/test.log
a1.sources.s1.channels = c1

// 配置sinks
a1.sinks.k1.type= logger
a1.sinks.k1.channel= c1

// 配置channel
a1.channels.c1.type= memory

启动 Flume NG:

1
bin/flume-ng agent --conf conf/ -f conf/exec_source.properties --name a1 -Dflume.root.logger=DEBUG,console

生成测试数据:

1
2
3
cd /tmp/test.log

echo "hello world" >> test.log

Spooling Directory Source

监听一个文件夹下新产生的文件,并读取内容,发至 channel。使用该 Source 需要注意两点:第一个是拷贝到 spool 目录下的文件不可以再打开编辑,第二个是 spool 目录下不可包含相应的子目录。这个主要用途作为对日志的准实时监控。由于该Source可靠性和稳定性较好,被不少公司采用。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
vim spooling_dir_source.properties  

a1.sources = s1
a1.sinks = k1
a1.channels = c1

// Describe/configure the source
a1.sources.s1.type =spooldir
a1.sources.s1.spoolDir =/tmp/logs
a1.sources.s1.fileHeader= true
a1.sources.s1.channels =c1

// Describe the sink
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1

// Use a channel which buffers events inmemory
a1.channels.c1.type = memory

启动 Flume NG:

1
bin/flume-ng agent --conf conf/ -f conf/spooling_dir_source.properties --name a1 -Dflume.root.logger=DEBUG,console

生成测试数据

1
mv test.log logs

KafKa Source

内置了Kafka Consumer,可从 KaFka Broker 中读取某个 topic的数据,写入Channel。

Taildir Source

可以实时监控一个目录下文件的变化,并实时读取新增数据,记录断点,保证重启 Agent 后数据不丢失或被重复传输。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
vim tail_dir_source.properties

a1.sources = r1
a1.channels = c1
a1.sinks = s1

a1.sources.r1.type = TAILDIR
// positionFile 检查点文件路径
a1.sources.r1.positionFile = /tmp/flume/taildir_position.json
// 指定filegroups,可以有多个,以空格分隔;(TailSource可以同时监控tail多个目录中的文件)
a1.sources.r1.filegroups = f1
// 指定监控的文件目录f1,匹配的文件信息
a1.sources.r1.filegroups.f1 = /tmp/baihe-flume/log_.*.log
a1.sources.r1.fileHeader = true
a1.sources.ri.maxBatchCount = 1000

// sink1 配置
a1.sinks.s1.type = file_roll
a1.sinks.s1.sink.directory = /tmp/flumefiles
a1.sinks.s1.sink.rollInterval = 0


// fileChannel 配置
a1.channels.c1.type = file
// -->检测点文件所存储的目录
a1.channels.c1.checkpointDir = /tmp/flume/checkpoint/baihe/
// -->数据存储所在的目录设置
a1.channels.c1.dataDirs = /tmp/flume/data/baihe/
// -->隧道的最大容量
a1.channels.c1.capacity = 10000
// -->事务容量的最大值设置
a1.channels.c1.transactionCapacity = 200

a1.sources.r1.channels = c1
a1.sinks.s1.channel = c1

启动 Flume NG:

1
bin/flume-ng agent --conf conf/ -f conf/tail_dir_source.properties --name a1 -Dflume.root.logger=DEBUG,console

生成测试数据:

采集目录下生成正则对应的文件信息,写入数据

1
2
3
4
5
6
7
cd /tmp/baihe-flume

echo "message1" >> log_20190915.log
echo "message12" >> log_20190915.log

// 查看sink文件
tail -f /tmp/flumefiles/1568530522032-1

TailSource使用了RandomAccessFile来根据positionFile中保存的文件位置来读取文件的,在agent重启之后,亦会先从positionFile中找到上次读取的文件位置,保证内容不会重复发送

Syslog

Syslog 分为 Tcp Source和 UDP Source两种,分别接受tcp和udp协议发过来的数据,写入Channel

HTTP source

可接受HTTP协议发来的数据,宾写入Channel

如何选择Flume Source?

「点点赞赏,手留余香」

    还没有人赞赏,快来当第一个赞赏的人吧!
0 条回复 A 作者 M 管理员
    所有的伟大,都源于一个勇敢的开始!
欢迎您,新朋友,感谢参与互动!欢迎您 {{author}},您在本站有{{commentsCount}}条评论