Exploring Flume's KafkaSink Configuration

In big-data environments, Flume is commonly used to move data around. It provides a rich set of Source and Sink components, making it easy to configure pipelines across all kinds of heterogeneous systems. KafkaSink is one of the most widely used sink components: it lets Flume deliver the data it collects to a Kafka message queue.

Among KafkaSink's configuration parameters are several related to Kafka topics and partitions. When facing multiple topics or multiple partitions, they let us use Flume's sink mechanism more flexibly and route data to the right queue.

Specifying the Topic

  • kafka.topic

  • allowTopicOverride

  • topicHeader

The kafka.topic parameter names the Kafka topic that messages will be sent to. By default, once this parameter is set, every Event arriving from the channel is sent to that topic.

However, when an Event's headers contain a topic key, that Event's destination is decided by the value of that key. In other words, if every Event carries a target topic in its headers, the kafka.topic setting is ignored, and each Event goes to the topic named in its own header.

This override behavior is the combined effect of the allowTopicOverride and topicHeader parameters. With allowTopicOverride=true and topicHeader=topic, the value of the header key topic is taken as the target topic.
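A minimal sink configuration putting these three parameters together might look like the following sketch (the agent name a1, sink name k1, and channel name c1 are placeholders, not from the original post):

```properties
# Hypothetical Flume agent config; a1/k1/c1 are placeholder names
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
# Fallback topic used when no header overrides it
a1.sinks.k1.kafka.topic = default-topic
# Let a header named "topic" redirect individual Events
a1.sinks.k1.allowTopicOverride = true
a1.sinks.k1.topicHeader = topic
a1.sinks.k1.channel = c1
```

With this config, an Event whose headers contain topic=orders is sent to the orders topic, while an Event without that header falls back to default-topic.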

Partitions

  • defaultPartitionId
  • partitionIdHeader

Normally a Kafka topic is divided into several partitions to allow parallel processing. The defaultPartitionId parameter specifies a partition ID: all events are delivered to that partition of the target topic. If partitionIdHeader is set, however, its value is used as a header key, and the corresponding header value is taken as the target partition; if that partition ID is invalid, an EventDeliveryException is thrown.
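Under the same placeholder names as above, these two partition parameters could be sketched like this (illustrative, not a tested config):

```properties
# Pin every Event to partition 0 unless a header overrides it
a1.sinks.k1.defaultPartitionId = 0
# If an Event carries this header, its value wins over defaultPartitionId
a1.sinks.k1.partitionIdHeader = partitionId
```

An Event with header partitionId=3 is written to partition 3; one without the header goes to partition 0; a non-integer value such as partitionId=abc results in an EventDeliveryException.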

If you instead want data spread across different partitions, set a header named "key" on each Event: Kafka hashes that key's value and uses the hash to scatter messages across partitions.
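One way to populate that "key" header from Flume itself is a source-side interceptor. As an illustrative example (agent and source names are placeholders), the built-in host interceptor can stamp each Event with the agent's hostname under the "key" header:

```properties
# Hypothetical source r1 on agent a1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host
# Write the hostname into the "key" header that KafkaSink reads
a1.sources.r1.interceptors.i1.hostHeader = key
```

With this, all Events from one agent host carry the same key and therefore hash to the same partition, while Events from different hosts are spread across partitions.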

Source Code Analysis

Flume KafkaSink source (1.8.0)

byte[] eventBody = event.getBody();
Map<String, String> headers = event.getHeaders();

// If allowTopicOverride is set...
if (allowTopicOverride) {
  // ...use the value of the header key named by topicHeader as the topic
  eventTopic = headers.get(topicHeader);
  if (eventTopic == null) {
    eventTopic = BucketPath.escapeString(topic, event.getHeaders());
  }
} else {
  // Otherwise use kafka.topic directly
  eventTopic = topic;
}

// Read the default key from the headers; KEY_HEADER = "key"
eventKey = headers.get(KEY_HEADER);
Integer partitionId = null;
try {
  ProducerRecord<String, byte[]> record;
  if (staticPartitionId != null) {
    partitionId = staticPartitionId;
  }
  // Allow a specified header to override a static ID
  // If partitionIdHeader is set, take the matching header value as the partitionId
  if (partitionHeader != null) {
    String headerVal = event.getHeaders().get(partitionHeader);
    if (headerVal != null) {
      partitionId = Integer.parseInt(headerVal);
    }
  }
  // If a partitionId was determined, include it in the ProducerRecord;
  // otherwise the record carries only the topic and eventKey
  if (partitionId != null) {
    record = new ProducerRecord<String, byte[]>(eventTopic, partitionId, eventKey,
        serializeEvent(event, useAvroEventFormat));
  } else {
    record = new ProducerRecord<String, byte[]>(eventTopic, eventKey,
        serializeEvent(event, useAvroEventFormat));
  }

  kafkaFutures.add(producer.send(record, new SinkCallback(startTime)));
} catch (NumberFormatException ex) {
  throw new EventDeliveryException("Non integer partition id specified", ex);
} catch (Exception ex) {
  // N.B. The producer.send() method throws all sorts of RuntimeExceptions
  // Catching Exception here to wrap them neatly in an EventDeliveryException
  // which is what our consumers will expect
  throw new EventDeliveryException("Could not send event", ex);
}

Drilling into the kafka-clients source and following the ProducerRecord call chain eventually leads to the partition function:

/**
 * Creates a record to be sent to a specified topic and partition
 *
 * @param topic The topic the record will be appended to
 * @param partition The partition to which the record should be sent
 * @param key The key that will be included in the record
 * @param value The record contents
 */
public ProducerRecord(String topic, Integer partition, K key, V value) {
  this(topic, partition, null, key, value, null);
}

public interface Partitioner extends Configurable, Closeable {

  /**
   * Compute the partition for the given record.
   *
   * @param topic The topic name
   * @param key The key to partition on (or null if no key)
   * @param keyBytes The serialized key to partition on (or null if no key)
   * @param value The value to partition on or null
   * @param valueBytes The serialized value to partition on or null
   * @param cluster The current cluster metadata
   */
  public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

  /**
   * This is called when partitioner is closed.
   */
  public void close();

}

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
  List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
  int numPartitions = partitions.size();
  // If no key was set in the headers, round-robin over available partitions
  if (keyBytes == null) {
    int nextValue = nextValue(topic);
    List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
    if (availablePartitions.size() > 0) {
      int part = Utils.toPositive(nextValue) % availablePartitions.size();
      return availablePartitions.get(part).partition();
    } else {
      // no partitions are available, give a non-available partition
      return Utils.toPositive(nextValue) % numPartitions;
    }
  } else {
    // A key was set: its serialized value (keyBytes) is hashed
    // hash the keyBytes to choose a partition
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
  }
}

As the code above shows, when key=xxx is set in an Event's headers, Kafka applies the murmur2 hash to xxx and takes the result modulo the number of partitions to choose where the Event is sent. This also guarantees that all Events carrying the same key=xxx are delivered to the same Kafka partition.
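That key-to-partition mapping can be reproduced outside Kafka. The sketch below hand-copies the murmur2 and toPositive logic from kafka-clients (treat it as illustrative rather than authoritative; the class and method names are mine) to show that the same key always lands on the same partition:

```java
import java.nio.charset.StandardCharsets;

public class KeyPartitionDemo {

    // Hand-copied version of kafka-clients' Utils.murmur2; illustrative only
    static int murmur2(final byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            final int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Handle the trailing 1-3 bytes (intentional switch fall-through)
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Same trick as Utils.toPositive: mask off the sign bit
    static int toPositive(int n) {
        return n & 0x7fffffff;
    }

    // Mirrors the keyed branch of the partitioner: hash, then modulo partition count
    static int partitionFor(String key, int numPartitions) {
        return toPositive(murmur2(key.getBytes(StandardCharsets.UTF_8))) % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 6;
        for (String key : new String[] {"order-1", "order-2", "order-1"}) {
            System.out.println(key + " -> partition " + partitionFor(key, partitions));
        }
    }
}
```

Running it prints the same partition number for both "order-1" lines, which is exactly why Events sharing a key stay together.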

Author: ovasty

Published: 2021-01-28

Updated: 2021-02-01
