除了 Source、channel、Sink外,Flume Agent 还允许用户设置其他组件更灵活地控制数据流,包括 Interceptor,Channel Selector 和 Sink Processor。
Interceptor
Flume 中的拦截器(Interceptor),当 Source 读取 Event 发送到 Sink 的 Event 时候,在 Event header 中加入一些有用的信息,或者对 Event 的内容进行过滤,完成初步的数据清洗。
用户可配置多个 Interceptor,形成一个 Interceptor 链。
1 |
a1.sources.r1.interceptors=i1 i2 |
这在实际业务场景中非常有用,Flume-ng 1.7 中目前提供了以下拦截器:
-
Timestamp Interceptor:该 Interceptor 在每个 Event 头部插入时间戳,其中key是timestamp,value为当前时刻。
-
Host Interceptor:该 Interceptor 在每个 Event 头部插入当前 Agent 所在机器的host或ip,其中key是host(也可自定义)。
1 |
vi host_agent.properties |
bin/flume-ng agent -c conf/ -f conf/host_agent.properties -n a1 -Dflume.root.logger=INFO,console
-
Static Interceptor:静态拦截器,用于在events header中加入一组静态的key和value。
1
2
3
4a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = static_key
a1.sources.r1.interceptors.i1.value = static_value -
UUID Interceptor:该 Interceptor 在每个 Event 头部插入一个128位的全局唯一标示,例如 b5755073-77a9-43c1-8fad-b7a586fc1b97
1 |
#type的参数不能写成uuid,得写具体,否则找不到类 |
-
Regex Filtering Interceptor:该 Interceptor 可根据正则表达式过滤或者保留符合要求的 Event
1
2
3
4
5
6
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_filter
a1.sources.r1.interceptors.i1.regex = ^bai1234.*
#该配置表示过滤掉不是以bai1234开头的events。如果excludeEvents设为true,则表示过滤掉以bai1234开头的events。
a1.sources.r1.interceptors.i1.excludeEvents = false -
Regex Extractor Interceptor:该 Interceptor 可根据正则表达式取出对应的值,并插入到头部
1
2
3
4
5
6a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_extractor
a1.sources.r1.interceptors.i1.regex = cookieid is (.*?) and ip is (.*)
a1.sources.r1.interceptors.i1.serializers = s1 s2
a1.sources.r1.interceptors.i1.serializers.s1.name = cookieid
a1.sources.r1.interceptors.i1.serializers.s2.name = ip
该配置从原始events中抽取出cookieid和ip,加入到events header中。
Channel Selector
Channel Selector 允许 Flume Source 选择一个或多个目标 Channel,并将当前 Event 写入这些 Channel。
Flume 提供了两种 Channel Selector 实现:
- Replicating Channel Selector:将每个 Event 指定多个 Channel,通过该 Selector,Flume 可将相同数据导入到多套系统中,一遍进行不同地处理。这是Flume 默认采用的 Channel Selector。
demo:
1 |
a1.sources = r1 |
- Multiplexing Channel Selector:根据 Event 头部的属性值,将 Event写入对应的 Channel
1 |
a1.sources = r1 |
Sink Processor
Flume 允许将多个 Sink 组装在一起形成一个逻辑实体,成为 Sink Group。而 Sink Processor 则在 Sink Group 基础上提供负载均衡以及容错功能。当一个 Sink 挂掉了,可由另一个 Sink 接替。
demo:
1 |
a1.sinkgroups = g1 |
Flume 提供了多种 Sink Processor 实现:
- Default Sink Processor:默认的 Sink Processor,仅仅接受一个 Sink,实现了最简单的 source – channel – sink,每个组件只有一个
- Failover Sink Processor:故障转移接收器,Sink Group 中每个 Sink 均被赋予一个优先级,Event 优先由高优先级的 Sink 发送,如果高优先级的 Sink 挂了,则次高优先级的 Sink 接替
1 |
a1.sinkgroups = g1 |
- Load balancing Sink Processor:负载均衡接收处理器,Channel 中的 Event 通过某种负载均衡机制,交给 Sink Group 中的所有 Sink 发送,
- 目前 Flume支持两种负载均衡机制,分别是:round_robin(轮训),random(随机)。
demo:
1 |
a1.sinkgroups = g1 |