Apache Kafka 版本对比:2.0、3.0、4.0 的变化
Kafka 2.0
在 2.0 版本中,Kafka 引入了对线程协议的更改。
为了确保升级过程的平稳过渡,官方建议在滚动升级时更新所有代理的 server.properties
文件,并添加相应的属性配置。
此外,2.0 版本还带来了其他重大更改,用户在升级前需要仔细查阅相关文档以了解详细信息。
参考来源
Kafka 3.0
Kafka 3.0 版本带来了多项新特性和改进,主要包括:
-
性能提升:
在百万分区的情况下,Kafka 3.0 的启动和停止时间相比 2.0 版本有了显著减少,提升了至少 10 倍。
参考来源 -
默认配置变更:
- 生产者默认启用了更强的交付保证,即
acks=all
和enable.idempotence=true
。 - 消费者的会话超时时间默认值从 10 秒增加到 45 秒,以更好地适应暂时的网络故障。
参考来源
- 生产者默认启用了更强的交付保证,即
-
KRaft 模式引入:
3.0 版本引入了 KRaft 模式,这是 Kafka 内置的共识机制,旨在取代对 ZooKeeper 的依赖。
尽管 KRaft 在 3.0 版本中尚未被推荐用于生产环境,但它为未来的架构演进奠定了基础。
参考来源 -
弃用旧版本支持:
3.0 版本中弃用了对 Java 8 和 Scala 2.12 的支持,计划在 4.0 版本中彻底移除,以便开发者有时间进行调整。
参考来源
Kafka 4.0
Kafka 4.0 版本是一个重要的里程碑,标志着完全摆脱了对 Apache ZooKeeper® 的依赖。
通过默认运行在 KRaft 模式下,Kafka 简化了部署和管理,消除了维护独立 ZooKeeper 集群的复杂性。
Kafka 在 2.0、3.0 和 4.0 版本中不断演进,带来了 性能优化、配置改进 和 架构简化 等多方面的提升,
为用户提供了更高效、易用的分布式流处理平台。
1、安装 kafka
1.1、安装zookeeper
docker run -d --restart=always --log-driver json-file --log-opt max-size=100m --log-opt max-file=3 --memory=1024M --name zookeeper \
-p 2181:2181 \
-v /etc/localtime:/etc/localtime \
wurstmeister/zookeeper
1.2、安装kafka
docker run -d --log-driver json-file --log-opt max-size=100m --log-opt max-file=4 \
--name kafka
-p 9092:9092 \
-e KAFKA_BROKER_ID=0 \
-e KAFKA_ZOOKEEPER_CONNECT=192.168.1.200:2181/kafka \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.200:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
-v /etc/localtime:/etc/localtime \
wurstmeister/kafka
KAFKA_LISTENERS:负责绑定网卡
KAFKA_ADVERTISED_LISTENERS:负责发布外网地址,地址会发布到zookeeper中
2. 项目初始化
2.1 引入 Maven 依赖
本项目基于 Spring Boot 2.6.5 进行开发,首先需要在 pom.xml
文件中添加 Kafka 相关依赖:
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2.2 配置 Kafka
spring:
kafka:
bootstrap-servers: 192.168.1.200:9092 # 集群地址逗号分隔节点
producer:
retries: 3 # 失败时的重试次数
acks: 1 # 消息确认机制(0、1、all)
batch-size: 16384 # 批量发送的大小
buffer-memory: 33554432 # 缓存大小
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
compression-type: gzip # 消息压缩格式
properties:
linger.ms: 100 # 发送延迟
max.block.ms: 60000 # 最大阻塞时间
partitioner.linger.ms: 1000
partitioner.max.block.ms: 6000
consumer:
group-id: testGroup # 消费者组
enable-auto-commit: true # 是否自动提交 offset
auto-commit-interval: 100 # 自动提交的时间间隔
auto-offset-reset: latest # offset 策略(earliest, latest, none)
max-poll-records: 100 # 每次拉取的最大消息数
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
max.poll.interval.ms: 600000 # 最大拉取间隔
session.timeout.ms: 120000 # session 超时时间
request.timeout.ms: 18000 # 请求超时时间
listener:
missing-topics-fatal: false # Topic 不存在时是否抛出异常
3. Kafka 基础使用
3.1 生产者示例
@RestController
public class KafkaSimpleProducer {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
@GetMapping("simple/{msg}")
public void sendSimpleMessage(@PathVariable("msg") String msg) {
kafkaTemplate.send("simple_topic", LocalDateTime.now() + " " + msg);
}
}
3.2 消费者示例
@Slf4j
@Component
public class KafkaConsumer {
@KafkaListener(topics = {"simple_topic"})
public void onNormalMessage(ConsumerRecord<String, Object> record) {
log.info("接收到消息: topic:{} partition:{} offset:{} key:{} msg:{} timestamp:{}",
record.topic(), record.partition(), record.offset(), record.key(), record.value(), record.timestamp());
}
}
3.3 多消费者(指定 partition)每个消费者监听不同的分区
消费者 1(监听分区 0)
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class KafkaConsumer1 {
@KafkaListener(topics = "custom_topic", groupId = "consumer-group-1", properties = {"spring.kafka.consumer.auto-offset-reset=earliest"}, topicPartitions = @TopicPartition(topic = "custom_topic", partitions = {"0"}))
public void listenPartition0(ConsumerRecord<String, String> record, Acknowledgment ack) {
log.info("Consumer 1 - Partition 0 - Message: {}", record.value());
ack.acknowledge();
}
}
消费者 2(监听分区 1)
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class KafkaConsumer2 {
@KafkaListener(topics = "custom_topic", groupId = "consumer-group-2", properties = {"spring.kafka.consumer.auto-offset-reset=earliest"}, topicPartitions = @TopicPartition(topic = "custom_topic", partitions = {"1"}))
public void listenPartition1(ConsumerRecord<String, String> record, Acknowledgment ack) {
log.info("Consumer 2 - Partition 1 - Message: {}", record.value());
ack.acknowledge();
}
}
生产者使用 key 来确保消息被正确路由到指定分区。
消费者分别监听特定分区,实现定向消费。
适用于场景:负载均衡、高吞吐量、精准消息分发。
4. 生产者高级用法
4.1 带回调的生产者
Kafka 允许在消息发送后添加回调,以便处理成功或失败情况。
方式 1:使用 addCallback
@GetMapping("addCallback1/{msg}")
public void sendCallback1Message(@PathVariable("msg") String msg) {
kafkaTemplate.send("simple_topic", LocalDateTime.now() + " " + msg)
.addCallback(new SuccessCallback<SendResult<String, Object>>() {
@Override
public void onSuccess(SendResult<String, Object> result) {
log.info("消息发送成功: topic:{} partition:{} offset:{}",
result.getRecordMetadata().topic(),
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset());
}
}, new FailureCallback() {
@Override
public void onFailure(Throwable ex) {
log.info("消息发送失败: {}", ex.getMessage());
}
});
}
方式 2:使用 ListenableFutureCallback
@GetMapping("addCallback2/{msg}")
public void sendCallback2Message(@PathVariable("msg") String msg) {
kafkaTemplate.send("simple_topic", LocalDateTime.now() + " " + msg)
.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable ex) {
log.info("消息发送失败: {}", ex.getMessage());
}
@Override
public void onSuccess(SendResult<String, Object> result) {
log.info("消息发送成功: topic:{} partition:{} offset:{}",
result.getRecordMetadata().topic(),
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset());
}
});
}
4.2 使用 ProducerListener 监听消息状态
Kafka 提供 ProducerListener 接口,可以用来监控消息发送情况。例如,我们可以在 KafkaTemplate 中添加监听器,记录成功和失败的消息:
@Slf4j
@Configuration
public class KafkaConfig {
@Autowired
private ProducerFactory<String, Object> producerFactory;
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory);
kafkaTemplate.setProducerListener(new ProducerListener<String, Object>() {
@Override
public void onSuccess(ProducerRecord<String, Object> producerRecord, RecordMetadata recordMetadata) {
log.info("ProducerListener 发送成功: topic:{} partition:{} offset:{} msg:{}",
recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), producerRecord.value());
}
@Override
public void onError(ProducerRecord<String, Object> producerRecord, @Nullable RecordMetadata recordMetadata, Exception e) {
log.info("ProducerListener 发送失败: msg:{} errMsg:{}", producerRecord.value(), e.getMessage());
}
});
return kafkaTemplate;
}
}
4.3 自定义分区策略
Kafka 默认采用轮询方式分配分区,也支持自定义分区策略。当发送消息时:
1、若明确指定了分区号,则直接进入对应分区。
2、若未指定分区号,但提供了 key,Kafka 通过 key 的哈希值计算分区,保证相同 key 进入同一分区。
3、若 key 也未指定,Kafka 采用轮询方式将消息分配到不同分区。
可以自定义分区器来实现特定的分区策略,例如按某个业务字段进行分区:
public class CustomPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
return key.hashCode() % numPartitions;
}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> configs) {}
}
在 application.yml 中配置自定义分区器:
spring:
kafka:
producer:
properties:
partitioner.class: com.example.kafka.CustomPartitioner
结束
以上介绍了 Kafka 在 Spring Boot 项目中的基本使用,包括生产者、消费者、回调机制以及自定义分区策略。结合回调、监听器和分区器,可以进一步优化 Kafka 消息的处理方式,提高系统的可靠性和可扩展性。