//读取header中默认key的值,KEY_HEADER="key" eventKey = headers.get(KEY_HEADER); Integer partitionId = null; try { ProducerRecord<String, byte[]> record; if (staticPartitionId != null) { partitionId = staticPartitionId; } //Allow a specified header to override a static ID //如果指定了partitionIdHeader,则从Header中获取对应的value值作为partitionId if (partitionHeader != null) { String headerVal = event.getHeaders().get(partitionHeader); if (headerVal != null) { partitionId = Integer.parseInt(headerVal); } } //如果设置了partitionId,则ProducerRecord带上partitionId参数,否则只有topic和eventKey if (partitionId != null) { record = new ProducerRecord<String, byte[]>(eventTopic, partitionId, eventKey, serializeEvent(event, useAvroEventFormat)); } else { record = new ProducerRecord<String, byte[]>(eventTopic, eventKey, serializeEvent(event, useAvroEventFormat)); } kafkaFutures.add(producer.send(record, new SinkCallback(startTime))); } catch (NumberFormatException ex) { thrownew EventDeliveryException("Non integer partition id specified", ex); } catch (Exception ex) { // N.B. The producer.send() method throws all sorts of RuntimeExceptions // Catching Exception here to wrap them neatly in an EventDeliveryException // which is what our consumers will expect thrownew EventDeliveryException("Could not send event", ex); } ovasty.com
/**
 * Creates a record to be sent to a specified topic and partition.
 *
 * <p>Delegates to the six-argument constructor of this class, passing {@code null} for its
 * third and sixth arguments.
 * NOTE(review): those two arguments are presumably the timestamp and the headers; confirm
 * against the full constructor, which is not shown here.
 *
 * @param topic The topic the record will be appended to
 * @param partition The partition to which the record should be sent
 * @param key The key that will be included in the record
 * @param value The record contents
 */
public ProducerRecord(String topic, Integer partition, K key, V value) {
    this(topic, partition, null, key, value, null);
}
/** * Compute the partition for the given record. * * @param topic The topic name * @param key The key to partition on (or null if no key) * @param keyBytes The serialized key to partition on( or null if no key) * @param value The value to partition on or null * @param valueBytes The serialized value to partition on or null * @param cluster The current cluster metadata */ publicintpartition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
/** * This is called when partitioner is closed. */ publicvoidclose();
}
publicintpartition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster){ List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); //如果header中没有设置key if (keyBytes == null) { int nextValue = nextValue(topic); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { //设置了key,则为key指定的value作为keyBytes,进行hash操作 // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } } ovasty.com