Flume拦截器interceptor开发

Flume除了提供强大的Source,Sink以及Channel之外,还提供了Interceptor机制,在Source端获取数据后,可以立即对数据进行处理。

除了Flume提供的系统Interceptor拦截器之外,用户还可以通过自定义的方法利用代码实现更为复杂且灵活的拦截器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class CustomizedInterceptor implements Interceptor {

@Override
public void initialize() {

}

@Override
public Event intercept(Event event) {
return event;
//return null;
}

@Override
public List<Event> intercept(List<Event> events) {
List<Event> out = Lists.newArrayList();
for (Event event : events) {
Event outEvent = intercept(event);
if (outEvent != null) {
out.add(outEvent);
}
}
return out;
}

@Override
public void close() {
//no-op
}

public static class CustomizedBuilder implements Interceptor.Builder {
@Override
public Interceptor build() {
return new CustomizedInterceptor();
}

@Override
public void configure(Context context) {
//no parameters
}
}
}

通过继承Interceptor,实现自定义的intercept函数,对Flume的每一条Event消息进行处理。另外,在initialize和close函数中,还可以在flume agent启动和关闭的过程中,进行其它需要的操作。

这里有几点要注意:

  • intercept函数如果返回null,则该条Event将被丢弃不再送往channel

  • intercept(List events)函数对events集合进行处理,返回结果的大小只可能等于或小于原有events中对象的数量,即source往channel送的event数量,只可以小于或等于source获取的数量,不可以大于原始数据量。

  • 自定义的interceptor打包为jar文件后,需要放置在flume的lib目录下,并在conf/flume.conf文件中进行配置:

    1
    2
    agent.sources.so.interceptors = i1
    agent.sources.so.interceptors.i1.type = packagename.CustimziedInterceptor$CustomizedBuilder

    通过实践证明,flume的拦截器运行还是相当稳定的。可以用来对数据进行清洗处理等,相比较后续对kafka消息的消费处理,拦截器对于数据传输的性能影响不是很大。

作者

ovasty

发布于

2021-02-01

更新于

2021-02-02

许可协议

评论

You forgot to set the shortname for Disqus. Please set it in _config.yml.