Kafka配置详解
背景:
很多集成kafka的方式,都是通过配置文件的方式直接配置了,灵活性较差,若遇到需要动态调整分组ID的时候,就会遇到瓶颈
踩坑:
1、kafka配置外网使用的时候,bootstrap-servers需要的是域名地址,直接暴露外网ip,kafka启动都是不成功的,本地可以使用127.0.0.1进行通信
使用服务器外网的时候,最好 本地配置hosts转发到kafka服务器上
如:bootstrap-servers: 048f7a2e7d03:9092
2、auto-offset-reset: 配置很重要,根据实际情况来配置,不然容器导致重复消费的问题
3、key 和value的序列化问题,手动进行配置的时候,生产者配置:"org.apache.kafka.common.serialization.StringSerializer" 直接指定某个序列化方式,不需要指定class
springboot集成kafka配置方式使用:
kafka:
bootstrap-servers: 048f7a2e7d03:9092
producer:
# 发生错误后,消息重发的次数。
retries: 0
#当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
batch-size: 16384
# 设置生产者内存缓冲区的大
buffer-memory: 33554432
# 键的序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 值的序列化方式
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
acks: 1
properties:
max.block.ms: 2000
consumer:
group-id: testGroup
# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
auto-commit-interval: 1S
# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
# latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
auto-offset-reset: latest
# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
enable-auto-commit: false
# 键的反序列化方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 值的反序列化方式
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
# 在侦听器容器中运行的线程数。
concurrency: 5
#listner负责ack,每调用一次,就立即commit
ack-mode: manual_immediate
missing-topics-fatal: false
注意listener的配置:
ack-mode:一般配置为MANUAL 或者MANUAL_IMMEDIATE 标识手动进行offset的提交
auto-offset-reset:很重要
latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
解释:latest就是表示有新的分组进来,从当前的offset进行消息消费,之前的offset不管
earliest:表示有新的分组进来,offset从0开始进行消费
根据业务需求来, 配置的不好,可能导致重复消费问题
springboot集成kafka配置方式
1、配置kafka生产者
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
* kafka的消费者配置
*
*/
@Slf4j
@Component
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String BROKERS;
@Value("${spring.kafka.producer.retries}")
private Integer RETRIES;
@Value("${spring.kafka.producer.batch-size}")
private Integer BATCH_SIZE;
@Value("${spring.kafka.producer.buffer-memory}")
private Long BUFFER_MEMORY;
@Value("${spring.kafka.producer.acks}")
private String ACK;
@Value("${spring.kafka.producer.properties.max.block.ms}")
private Integer BLOCK_MS;
/**
* 生产者配置
*
* @return 配置
*/
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> properties = new HashMap<String, Object>();
//kafka server地址
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS);
properties.put(ProducerConfig.ACKS_CONFIG,ACK);
//消息发送失败重试次数
properties.put(ProducerConfig.RETRIES_CONFIG, RETRIES);
//去缓冲区中一次拉16k的数据,发送到broker
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, BATCH_SIZE);
//设置缓存区大小 32m
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, BUFFER_MEMORY);
//key序列化器选择,直接指定序列化包class,不能直接写 StringSerializer.class
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
//value序列化器选择
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
//设置sasl认证
properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG,BLOCK_MS);
return properties;
}
/**
* Producer Template 配置
*/
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
Map<String, Object> stringObjectMap = producerConfigs();
DefaultKafkaProducerFactory<String, String> objectObjectDefaultKafkaProducerFactory = new DefaultKafkaProducerFactory<>(stringObjectMap);
return new KafkaTemplate<>(objectObjectDefaultKafkaProducerFactory);
}
}
注意上面标红的部分:配置key和valued的序列化方式的时候,直接指定配置的class
2、配置kafka消费者
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* kafka的消费者配置
*
* @author G008186
*/
@Slf4j
@Component
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String BROKERS;
@Value("${spring.kafka.consumer.enable-auto-commit}")
private Boolean ENABLE_AUTO_COMMIT;
@Value("${spring.kafka.consumer.auto-commit-interval}")
private String AUTO_COMMIT_INTERVAL_MS;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String AUTO_OFFSET_RESET;
@Value("${spring.kafka.listener.concurrency}")
private Integer CONCURRENCY;
@Value("${spring.kafka.listener.missing-topics-fatal}")
private Boolean TOPICS_FATAL;
private String CURRENT_INSTANCE_GROUP_ID;
/**构建kafka监听工厂*/
@Bean("kafkaListenerContainerFactory")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
//这里配置监听器的模式,记得需要如下,先获取properties,然后才可以设置
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.setConcurrency(CONCURRENCY);
factory.setMissingTopicsFatal(TOPICS_FATAL);
factory.setConsumerFactory(consumerFactory());
return factory;
}
/**初始化消费工厂配置 其中会动态指定消费分组*/
private ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> properties = new HashMap<String, Object>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS);
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ENABLE_AUTO_COMMIT);
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
/**多实例部署每个实例设置不同groupId 实现发布订阅*/
//CURRENT_INSTANCE_GROUP_ID = KafkaConstant.SSE_GROUP.concat(String.valueOf(System.identityHashCode(sendSyncTaskFactory)));
log.info("当前实例WsMsgConsumer group_id:{}",CURRENT_INSTANCE_GROUP_ID);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, CURRENT_INSTANCE_GROUP_ID);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET);
return new DefaultKafkaConsumerFactory<String, String>(properties);
}
}
注意红色部分,配置ack-mode的方式