kafka 3.0 安装以及使用(全场景)

2025/03/27 源自  中间件

Apache Kafka 版本对比:2.0、3.0、4.0 的变化

Kafka 2.0

在 2.0 版本中,Kafka 引入了对线程协议的更改。
为了确保升级过程的平稳过渡,官方建议在滚动升级时更新所有代理的 server.properties 文件,并添加相应的属性配置。

此外,2.0 版本还带来了其他重大更改,用户在升级前需要仔细查阅相关文档以了解详细信息。
参考来源

Kafka 3.0

Kafka 3.0 版本带来了多项新特性和改进,主要包括:

  • 性能提升
    在百万分区的情况下,Kafka 3.0 的启动和停止时间相比 2.0 版本有了显著减少,提升了至少 10 倍。
    参考来源

  • 默认配置变更

    • 生产者默认启用了更强的交付保证,即 acks=allenable.idempotence=true
    • 消费者的会话超时时间默认值从 10 秒增加到 45 秒,以更好地适应暂时的网络故障。
      参考来源
  • KRaft 模式引入
    3.0 版本引入了 KRaft 模式,这是 Kafka 内置的共识机制,旨在取代对 ZooKeeper 的依赖。
    尽管 KRaft 在 3.0 版本中尚未被推荐用于生产环境,但它为未来的架构演进奠定了基础。
    参考来源

  • 弃用旧版本支持
    3.0 版本中弃用了对 Java 8 和 Scala 2.12 的支持,计划在 4.0 版本中彻底移除,以便开发者有时间进行调整。
    参考来源

Kafka 4.0

Kafka 4.0 版本是一个重要的里程碑,标志着完全摆脱了对 Apache ZooKeeper® 的依赖。
通过默认运行在 KRaft 模式下,Kafka 简化了部署和管理,消除了维护独立 ZooKeeper 集群的复杂性。

Kafka 在 2.0、3.0 和 4.0 版本中不断演进,带来了 性能优化配置改进架构简化 等多方面的提升,
为用户提供了更高效、易用的分布式流处理平台。

1、安装 kafka

1.1、安装zookeeper

docker run -d --restart=always --log-driver json-file --log-opt max-size=100m --log-opt max-file=3  --memory=1024M --name zookeeper \
-p 2181:2181 \
-v /etc/localtime:/etc/localtime \
wurstmeister/zookeeper

1.2、安装kafka

docker run -d  --log-driver json-file --log-opt max-size=100m --log-opt max-file=4 \
--name kafka 
-p 9092:9092 \ 
-e KAFKA_BROKER_ID=0 \
-e KAFKA_ZOOKEEPER_CONNECT=192.168.1.200:2181/kafka \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.200:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
-v /etc/localtime:/etc/localtime \
wurstmeister/kafka

KAFKA_LISTENERS:负责绑定网卡
KAFKA_ADVERTISED_LISTENERS:负责发布外网地址,地址会发布到zookeeper中

2. 项目初始化

2.1 引入 Maven 依赖

本项目基于 Spring Boot 2.6.5 进行开发,首先需要在 pom.xml 文件中添加 Kafka 相关依赖:

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2.2 配置 Kafka

spring:
  kafka:
    bootstrap-servers: 192.168.1.200:9092  # 集群地址逗号分隔节点
    producer:
      retries: 3  # 失败时的重试次数
      acks: 1  # 消息确认机制(0、1、all)
      batch-size: 16384  # 批量发送的大小
      buffer-memory: 33554432  # 缓存大小
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      compression-type: gzip  # 消息压缩格式
      properties:
        linger.ms: 100  # 发送延迟
        max.block.ms: 60000  # 最大阻塞时间
        partitioner.linger.ms: 1000
        partitioner.max.block.ms: 6000
    consumer:
      group-id: testGroup  # 消费者组
      enable-auto-commit: true  # 是否自动提交 offset
      auto-commit-interval: 100  # 自动提交的时间间隔
      auto-offset-reset: latest  # offset 策略(earliest, latest, none)
      max-poll-records: 100  # 每次拉取的最大消息数
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        max.poll.interval.ms: 600000  # 最大拉取间隔
        session.timeout.ms: 120000  # session 超时时间
        request.timeout.ms: 18000  # 请求超时时间
    listener:
      missing-topics-fatal: false  # Topic 不存在时是否抛出异常

3. Kafka 基础使用

3.1 生产者示例

@RestController
public class KafkaSimpleProducer {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    
    @GetMapping("simple/{msg}")
    public void sendSimpleMessage(@PathVariable("msg") String msg) {
        kafkaTemplate.send("simple_topic", LocalDateTime.now() + "  " + msg);
    }
}

3.2 消费者示例

@Slf4j
@Component
public class KafkaConsumer {
    @KafkaListener(topics = {"simple_topic"})
    public void onNormalMessage(ConsumerRecord<String, Object> record) {
        log.info("接收到消息: topic:{}  partition:{}   offset:{}  key:{}  msg:{}  timestamp:{}",
                 record.topic(), record.partition(), record.offset(), record.key(), record.value(), record.timestamp());
    }
}

3.3 多消费者(指定 partition)每个消费者监听不同的分区

消费者 1(监听分区 0)

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class KafkaConsumer1 {
    
    @KafkaListener(topics = "custom_topic", groupId = "consumer-group-1", properties = {"spring.kafka.consumer.auto-offset-reset=earliest"}, topicPartitions = @TopicPartition(topic = "custom_topic", partitions = {"0"}))
    public void listenPartition0(ConsumerRecord<String, String> record, Acknowledgment ack) {
        log.info("Consumer 1 - Partition 0 - Message: {}", record.value());
        ack.acknowledge();
    }
}

消费者 2(监听分区 1)

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class KafkaConsumer2 {
    
    @KafkaListener(topics = "custom_topic", groupId = "consumer-group-2", properties = {"spring.kafka.consumer.auto-offset-reset=earliest"}, topicPartitions = @TopicPartition(topic = "custom_topic", partitions = {"1"}))
    public void listenPartition1(ConsumerRecord<String, String> record, Acknowledgment ack) {
        log.info("Consumer 2 - Partition 1 - Message: {}", record.value());
        ack.acknowledge();
    }
}

生产者使用 key 来确保消息被正确路由到指定分区。
消费者分别监听特定分区,实现定向消费。
适用于场景:负载均衡、高吞吐量、精准消息分发。

4. 生产者高级用法

4.1 带回调的生产者

Kafka 允许在消息发送后添加回调,以便处理成功或失败情况。

方式 1:使用 addCallback

@GetMapping("addCallback1/{msg}")
public void sendCallback1Message(@PathVariable("msg") String msg) {
    kafkaTemplate.send("simple_topic", LocalDateTime.now() + "  " + msg)
        .addCallback(new SuccessCallback<SendResult<String, Object>>() {
            @Override
            public void onSuccess(SendResult<String, Object> result) {
                log.info("消息发送成功: topic:{}  partition:{}  offset:{}",
                         result.getRecordMetadata().topic(),
                         result.getRecordMetadata().partition(),
                         result.getRecordMetadata().offset());
            }
        }, new FailureCallback() {
            @Override
            public void onFailure(Throwable ex) {
                log.info("消息发送失败: {}", ex.getMessage());
            }
        });
}

方式 2:使用 ListenableFutureCallback

@GetMapping("addCallback2/{msg}")
public void sendCallback2Message(@PathVariable("msg") String msg) {
    kafkaTemplate.send("simple_topic", LocalDateTime.now() + "  " + msg)
        .addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable ex) {
                log.info("消息发送失败: {}", ex.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> result) {
                log.info("消息发送成功: topic:{}  partition:{}  offset:{}",
                         result.getRecordMetadata().topic(),
                         result.getRecordMetadata().partition(),
                         result.getRecordMetadata().offset());
            }
        });
}

4.2 使用 ProducerListener 监听消息状态

Kafka 提供 ProducerListener 接口,可以用来监控消息发送情况。例如,我们可以在 KafkaTemplate 中添加监听器,记录成功和失败的消息:

@Slf4j
@Configuration
public class KafkaConfig {
    
    @Autowired
    private ProducerFactory<String, Object> producerFactory;
    
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory);
        kafkaTemplate.setProducerListener(new ProducerListener<String, Object>() {
            @Override
            public void onSuccess(ProducerRecord<String, Object> producerRecord, RecordMetadata recordMetadata) {
                log.info("ProducerListener 发送成功: topic:{}  partition:{}  offset:{}  msg:{}",
                         recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), producerRecord.value());
            }

            @Override
            public void onError(ProducerRecord<String, Object> producerRecord, @Nullable RecordMetadata recordMetadata, Exception e) {
                log.info("ProducerListener 发送失败: msg:{} errMsg:{}", producerRecord.value(), e.getMessage());
            }
        });
        return kafkaTemplate;
    }
}

4.3 自定义分区策略

Kafka 默认采用轮询方式分配分区,也支持自定义分区策略。当发送消息时:
1、若明确指定了分区号,则直接进入对应分区。
2、若未指定分区号,但提供了 key,Kafka 通过 key 的哈希值计算分区,保证相同 key 进入同一分区。
3、若 key 也未指定,Kafka 采用轮询方式将消息分配到不同分区。

可以自定义分区器来实现特定的分区策略,例如按某个业务字段进行分区:

public class CustomPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        return key.hashCode() % numPartitions;
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}

在 application.yml 中配置自定义分区器:

spring:
  kafka:
    producer:
      properties:
        partitioner.class: com.example.kafka.CustomPartitioner

结束

以上介绍了 Kafka 在 Spring Boot 项目中的基本使用,包括生产者、消费者、回调机制以及自定义分区策略。结合回调、监听器和分区器,可以进一步优化 Kafka 消息的处理方式,提高系统的可靠性和可扩展性。