|
|
@@ -355,7 +355,7 @@ index文件存储大量的索引信息,log文件存储大量的数据,索引
|
|
|
kafka根据(Consumer Group+Topic+Partition)来确定唯一一个offset。
|
|
|
---
|
|
|
Kafka 0.9版本之前,consumer默认将offset保存在Zookeeper中,从0.9版本开始,consumer默认将offset保存在Kafka一个内置
|
|
|
- 的topic中,该topic为__consumer_offsets,默认有50个分区1个副本,分散在各个broker中。
|
|
|
+ 的topic中,该topic为__consumer_offsets,默认有50个分区1个副本,分散在各个broker中。无论哪种方式,都只在消费者启动的时候访问一次。
|
|
|
```
|
|
|
```
|
|
|
1)修改配置文件consumer.properties,让普通消费者可以消费系统的Topic
|
|
|
@@ -426,45 +426,57 @@ Consumer事务
|
|
|
public class Procuder {
|
|
|
// 带回调函数的API
|
|
|
public static void main(String[] args) {
|
|
|
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
|
|
+ // 1.读取kafka生产者的配置信息 具体配置参数可参考ProducerConfig,CommonClientConfigs
|
|
|
Properties props = new Properties();
|
|
|
- //kafka集群,broker-list
|
|
|
- props.put("bootstrap.servers", "hexo.lvzhiqiang.top:9093");
|
|
|
- //ack应答机制
|
|
|
- props.put("acks", "all");
|
|
|
- //重试次数
|
|
|
- props.put("retries", 1);
|
|
|
- //批次大小 只有数据积累到batch.size之后,sender才会发送数据
|
|
|
- props.put("batch.size", 16384);
|
|
|
- //等待时间 如果数据迟迟未达到batch.size,sender等待linger.time之后就会发送数据
|
|
|
- props.put("linger.ms", 1);
|
|
|
- //缓冲区大小
|
|
|
- props.put("buffer.memory", 33554432);
|
|
|
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
|
|
|
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
|
|
|
+ props.load(ClassLoader.getSystemResourceAsStream("newProducer.properties"));
|
|
|
+ // 1.1自定义分区拦截器,可选
|
|
|
+ props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "top.lvzhiqiang.testnewapi.CustomPartitioner");
|
|
|
+ // 2.创建producer对象
|
|
|
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
|
|
|
- SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
|
|
- for (int i = 0; i < 100; i++) {
|
|
|
- int finalI = i;
|
|
|
- producer.send(new ProducerRecord<>("test", Integer.toString(i), Integer.toString(i)), new Callback() {
|
|
|
- //回调函数,该方法会在Producer收到ack时调用,为异步调用
|
|
|
- //消息发送失败会自动重试,不需要我们在回调函数中手动重试
|
|
|
+ // 3.发送数据
|
|
|
+ for (int i = 0; i < 10; i++) {
|
|
|
+ String value = "testnewapi" + i;
|
|
|
+ // 每条数据都要封装成一个ProducerRecord对象
|
|
|
+ producer.send(new ProducerRecord<>("test", value), new Callback() {
|
|
|
+ // 回调函数,该方法会在Producer收到ack时调用,为异步调用。
|
|
|
+ // 该方法有两个参数,分别是RecordMetadata和Exception,如果Exception为null,说明消息发送成功,反之说明消息发送失败。
|
|
|
+ // 注:消息发送失败会自动重试,不需要我们在回调函数中手动重试。
|
|
|
@Override
|
|
|
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
|
|
|
if (null == e) {
|
|
|
- log.info("{}->success->{}->{}", finalI, sdf.format(new Date()), recordMetadata.offset());
|
|
|
+ log.info("分区:{},偏移量:{},值:{}", recordMetadata.partition(), recordMetadata.offset(), value);
|
|
|
} else {
|
|
|
- log.warn("{}->fail->{}", finalI, sdf.format(new Date()), e);
|
|
|
+ log.error("{}", sdf.format(new Date()), e);
|
|
|
}
|
|
|
}
|
|
|
});
|
|
|
}
|
|
|
+ // 4.关闭资源 会做一些资源的回收,防止没达到send的要求时数据发送不出去
|
|
|
producer.close();
|
|
|
}
|
|
|
}
|
|
|
```
|
|
|
+ ```properties
|
|
|
+ # 指定连接的kafka集群,broker-list
|
|
|
+ bootstrap.servers=hexo.lvzhiqiang.top:9093,hexo.lvzhiqiang.top:9094
|
|
|
+ # ack应答级别
|
|
|
+ acks=-1
|
|
|
+ # 重试次数
|
|
|
+ retries=3
|
|
|
+ # 批次大小,默认16k,只有数据积累到batch.size之后,sender才会发送数据
|
|
|
+ batch.size=16384
|
|
|
+ # 等待时间,默认1毫秒,如果数据迟迟未达到batch.size,sender等待linger.time之后就会发送数据
|
|
|
+ linger.ms=1
|
|
|
+ # RecordAccumulator缓冲区大小,默认32M
|
|
|
+ buffer.memory=33554432
|
|
|
+ # key,value的序列化类
|
|
|
+ key.serializer=org.apache.kafka.common.serialization.StringSerializer
|
|
|
+ value.serializer=org.apache.kafka.common.serialization.StringSerializer
|
|
|
+ ```
|
|
|
- 同步发送API
|
|
|
```
|
|
|
- 同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回ack。
|
|
|
+ 同步发送的意思就是,Sender线程在工作发送消息时,同时阻塞main线程,直至Sender线程返回ack来通知main线程继续执行。
|
|
|
由于send方法返回的是一个Future对象,根据Futrue对象的特点,我们只需再调用Future对象的get方法即可实现同步发送的效果。
|
|
|
```
|
|
|
|
|
|
@@ -480,8 +492,8 @@ Consumer事务
|
|
|
两者的相同点是,都会将本次poll的一批数据最高的偏移量提交;不同点是,commitSync阻塞当前线程,一直到提交成功,并且会自动失
|
|
|
败重试(由不可控因素导致,也会出现提交失败);而commitAsync则没有失败重试机制,故有可能提交失败。
|
|
|
---
|
|
|
- 虽然同步提交offset更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会收到很大的影响。因此更多的情况下,
|
|
|
- 会选用异步提交offset的方式。
|
|
|
+ 虽然同步提交offset有失败重试机制,更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会收到很大的影响。
|
|
|
+ 因此更多的情况下,会选用异步提交offset的方式。
|
|
|
---
|
|
|
无论是同步提交还是异步提交offset,都有可能会造成数据的漏消费或者重复消费。先提交offset后消费,有可能造成数据的漏消费;
|
|
|
而先消费后提交offset,有可能会造成数据的重复消费。
|
|
|
@@ -490,97 +502,70 @@ Consumer事务
|
|
|
```java
|
|
|
@Slf4j
|
|
|
public class Consumer {
|
|
|
- public static void main(String[] args) {
|
|
|
+ public static void main(String[] args) throws IOException {
|
|
|
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
|
|
+ // 1.读取kafka消费者的配置信息 具体配置参数可参考ConsumerConfig,CommonClientConfigs
|
|
|
Properties props = new Properties();
|
|
|
- //kafka集群,broker-list
|
|
|
- props.put("bootstrap.servers", "144.34.207.84:9093");
|
|
|
- //消费者组,只要group.id相同,就属于同一个消费者组
|
|
|
- props.put("group.id", "test");
|
|
|
- //是否开启自动提交offset功能
|
|
|
- props.put("enable.auto.commit", "true");
|
|
|
- //自动提交offset的时间间隔
|
|
|
- props.put("auto.commit.interval.ms", "100000");
|
|
|
- //key,value反序列化
|
|
|
- props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
|
|
|
- props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
|
|
|
+ props.load(ClassLoader.getSystemResourceAsStream("newConsumer.properties"));
|
|
|
+ // 1.1重置消费者的offset,可选earliest(最早的)和latest(最新的,默认) 换组(没有初始偏移量)或者offset过期(数据被删除)时该属性会生效
|
|
|
+ // earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
|
|
|
+ // latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
|
|
|
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
|
|
+ // 2.创建consumer对象
|
|
|
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
|
|
|
- SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
|
|
-
|
|
|
- // 订阅主题
|
|
|
- consumer.subscribe(Arrays.asList("test"));
|
|
|
+ // 3.订阅主题
|
|
|
+ consumer.subscribe(Collections.singletonList("test"));
|
|
|
while (true) {
|
|
|
- // 拉取数据
|
|
|
+ // 4.拉取数据
|
|
|
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L));
|
|
|
- // 消费数据
|
|
|
+ // 4.1消费数据
|
|
|
for (ConsumerRecord<String, String> record : records) {
|
|
|
- log.info("{},offset={},key={},value={}", sdf.format(new Date()), record.offset(), record.key(), record.value());
|
|
|
+ log.info("分区:{},偏移量:{},值:{}", record.partition(), record.offset(), record.value());
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
```
|
|
|
+ ```properties
|
|
|
+ # 指定连接的kafka集群,broker-list
|
|
|
+ bootstrap.servers=144.34.207.84:9093,144.34.207.84:9094
|
|
|
+ # 消费者组,只要group.id相同,就属于同一个消费者组
|
|
|
+ group.id=test
|
|
|
+ # 是否开启自动提交offset功能,默认true
|
|
|
+ enable.auto.commit=true
|
|
|
+ # 自动提交offset的时间间隔,默认1S
|
|
|
+ auto.commit.interval.ms=1000
|
|
|
+ # key,value反序列化类
|
|
|
+ key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
|
|
|
+ value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
|
|
|
+ ```
|
|
|
- 手动提交offset
|
|
|
```java
|
|
|
@Slf4j
|
|
|
- // 同步
|
|
|
- public class Consumer {
|
|
|
- public static void main(String[] args) {
|
|
|
- Properties props = new Properties();
|
|
|
- //kafka集群,broker-list
|
|
|
- props.put("bootstrap.servers", "144.34.207.84:9093");
|
|
|
- //消费者组,只要group.id相同,就属于同一个消费者组
|
|
|
- props.put("group.id", "test");
|
|
|
- //关闭自动提交offset
|
|
|
- props.put("enable.auto.commit", "false");
|
|
|
- //key,value反序列化
|
|
|
- props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
|
|
|
- props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
|
|
|
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
|
|
|
+ public class ManuallySubmitConsumer {
|
|
|
+ public static void main(String[] args) throws IOException {
|
|
|
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
|
|
-
|
|
|
- // 订阅主题
|
|
|
- consumer.subscribe(Arrays.asList("test"));
|
|
|
- while (true) {
|
|
|
- // 拉取数据
|
|
|
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L));
|
|
|
- // 消费数据
|
|
|
- for (ConsumerRecord<String, String> record : records) {
|
|
|
- log.info("{},offset={},key={},value={}", sdf.format(new Date()), record.offset(), record.key(), record.value());
|
|
|
- }
|
|
|
- //同步提交,当前线程会阻塞直到offset提交成功
|
|
|
- consumer.commitSync();
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- ```
|
|
|
- ```java
|
|
|
- @Slf4j
|
|
|
- // 异步
|
|
|
- public class Consumer {
|
|
|
- public static void main(String[] args) {
|
|
|
+ // 1.读取kafka消费者的配置信息 具体配置参数可参考ConsumerConfig,CommonClientConfigs
|
|
|
Properties props = new Properties();
|
|
|
- //kafka集群,broker-list
|
|
|
- props.put("bootstrap.servers", "144.34.207.84:9093");
|
|
|
- //消费者组,只要group.id相同,就属于同一个消费者组
|
|
|
- props.put("group.id", "test");
|
|
|
- //关闭自动提交offset
|
|
|
+ props.load(ClassLoader.getSystemResourceAsStream("newConsumer.properties"));
|
|
|
+ // 1.1重置消费者的offset,可选earliest(最早的)和latest(最新的,默认)
|
|
|
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
|
|
+ // 1.2关闭自动提交offset
|
|
|
props.put("enable.auto.commit", "false");
|
|
|
- //key,value反序列化
|
|
|
- props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
|
|
|
- props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
|
|
|
+ // 2.创建consumer对象
|
|
|
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
|
|
|
- SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
|
|
-
|
|
|
- // 订阅主题
|
|
|
+ // 3.订阅主题
|
|
|
consumer.subscribe(Arrays.asList("test"));
|
|
|
while (true) {
|
|
|
- // 拉取数据
|
|
|
+ // 4.拉取数据
|
|
|
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L));
|
|
|
- // 消费数据
|
|
|
+ // 4.1消费数据
|
|
|
for (ConsumerRecord<String, String> record : records) {
|
|
|
- log.info("{},offset={},key={},value={}", sdf.format(new Date()), record.offset(), record.key(), record.value());
|
|
|
+ log.info("分区:{},偏移量:{},值:{}", record.partition(), record.offset(), record.value());
|
|
|
}
|
|
|
- //异步提交
|
|
|
+ // 4.2.a同步提交,当前线程会阻塞直到offset提交成功(重试)才会拉取新的数据
|
|
|
+ consumer.commitSync();
|
|
|
+ // 4.2.b异步提交
|
|
|
consumer.commitAsync(new OffsetCommitCallback() {
|
|
|
@Override
|
|
|
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
|