lvzhiqiang 5 年之前
父節點
當前提交
f10fef3748
共有 2 個文件被更改,包括 207 次插入22 次删除
  1. 207 22
      source/_posts/af-mq-kafka.md
  2. 二進制
      source/_posts/af-mq-kafka/af-mq-kafka-010.png

+ 207 - 22
source/_posts/af-mq-kafka.md

@@ -452,7 +452,7 @@ Consumer事务
                     }
                 });
             }
-            // 4.关闭资源 会做一些资源的回收,防止没达到send的要求时数据发送不出去
+            // 4.关闭资源 会做一些资源的回收(包括拦截器,分区器等资源的收尾处理),同时防止没达到send的要求时数据发送不出去
             producer.close();
         }
     }
@@ -591,32 +591,27 @@ Consumer事务
     ```
     ```java
     @Slf4j
-    public class CustomConsumer {
+    public class CustomOffsetConsumer {
         private static Map<TopicPartition, Long> currentOffset = new HashMap<>();
     
-        public static void main(String[] args) {
-            //创建配置信息
+        public static void main(String[] args) throws IOException {
+            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+            // 1.读取kafka消费者的配置信息 具体配置参数可参考ConsumerConfig,CommonClientConfigs
             Properties props = new Properties();
-            //kafka集群,broker-list
-            props.put("bootstrap.servers", "144.34.207.84:9093");
-            //消费者组,只要group.id相同,就属于同一个消费者组
-            props.put("group.id", "test");
-            //关闭自动提交offset
+            props.load(ClassLoader.getSystemResourceAsStream("newConsumer.properties"));
+            // 1.1重置消费者的offset,可选earliest(最早的)和latest(最新的,默认)
+            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+            // 1.2关闭自动提交offset
             props.put("enable.auto.commit", "false");
-            //Key和Value的反序列化类
-            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-            //创建一个消费者
+            // 2.创建consumer对象
             KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
-            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-            //消费者订阅主题
+            // 3.订阅主题
             consumer.subscribe(Arrays.asList("test"), new ConsumerRebalanceListener() {
                 //该方法会在Rebalance之前调用
                 @Override
                 public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                     commitOffset(currentOffset);
                 }
-    
                 //该方法会在Rebalance之后调用
                 @Override
                 public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
@@ -628,15 +623,21 @@ Consumer事务
                 }
             });
             while (true) {
-                //消费者拉取数据
+                // 4.拉取数据
                 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L));
-                //消费数据
+                // 4.1消费数据
                 for (ConsumerRecord<String, String> record : records) {
-                    log.info("{},offset={},key={},value={}", sdf.format(new Date()), record.offset(), record.key(), record.value());
-                    currentOffset.put(new TopicPartition(record.topic(), record.partition()), record.offset());
+                    log.info("分区:{},偏移量:{},值:{}", record.partition(), record.offset(), record.value());
                 }
-                //异步提交
-                commitOffset(currentOffset);
+                // 4.2.b异步提交
+                consumer.commitAsync(new OffsetCommitCallback() {
+                    @Override
+                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
+                        if (exception != null) {
+                            log.error("Commit failed for {}", offsets, exception);
+                        }
+                    }
+                });
             }
         }
     
@@ -653,8 +654,192 @@ Consumer事务
   
 ### `自定义Interceptor`
 
+- 拦截器原理
+    ```
+    Producer拦截器(interceptor)是在 Kafka0.10版本被引入的,主要用于实现clients端的定制化控制逻辑。
+    对于producer而言,interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。
+    同时,producer允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。
+    Intercetpor的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:
+    ---
+    (1)configure(configs):
+    获取配置信息和初始化数据时调用。
+    (2)onSend(ProducerRecord):
+    该方法封装进KafkaProducer.send方法中,即它运行在用户主线程中。Producer确保在消息被序列化以及计算分区前调用该方法。 
+    用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算。
+    (3)onAcknowledgement(RecordMetadata, Exception):
+    该方法会在消息从RecordAccumulator成功发送到Kafka Broker之后,或者在发送过程中失败时调用。并且通常都是在producer回调逻辑触发
+    之前。onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢producer的消息发送效率。
+    (4)close:
+    关闭interceptor,主要用于执行一些资源清理工作。
+    ```
+    ```
+    如前所述,interceptor可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。
+    另外倘若指定了多个interceptor,则producer将按照指定顺序调用它们。
+    并仅仅是捕获每个interceptor可能抛出的异常记录到错误日志中而非在向上传递。
+    ```
+- 案例
+    ```java
+    @Slf4j
+    public class CustomInterceptor implements ProducerInterceptor<String, String> {
+        private int errorCounter = 0;
+        private int successCounter = 0;
+    
+        @Override
+        public ProducerRecord onSend(ProducerRecord producerRecord) {
+            // 创建一个新的record,把时间戳写入消息体的最前部
+            return new ProducerRecord(producerRecord.topic(),
+                    producerRecord.partition(),
+                    producerRecord.timestamp(),
+                    producerRecord.key(),
+                    System.currentTimeMillis() + "," + producerRecord.value().toString());
+        }
+    
+        @Override
+        public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
+            // 统计成功和失败的次数
+            if (e == null) {
+                successCounter++;
+            } else {
+                errorCounter++;
+            }
+        }
+    
+        @Override
+        public void close() {
+            // 保存结果
+            log.error("Successful sent: {}", successCounter);
+            log.error("Failed sent: {}", errorCounter);
+        }
+    
+        @Override
+        public void configure(Map<String, ?> map) {
+        }
+    }
+    ```
+    ```java
+    @Slf4j
+    public class InterceptorProducer {
+        // 带回调函数的API
+        public static void main(String[] args) throws IOException {
+            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+            // 1.读取kafka生产者的配置信息 具体配置参数可参考ProducerConfig,CommonClientConfigs
+            Properties props = new Properties();
+            props.load(ClassLoader.getSystemResourceAsStream("newProducer.properties"));
+            // 1.1自定义分区器,可选
+            props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "top.lvzhiqiang.testnewapi.CustomPartitioner");
+            // 1.2自定义拦截器链,可选
+            List<String> interceptors = new ArrayList<>();
+            interceptors.add("top.lvzhiqiang.testnewapi.CustomInterceptor");
+            props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
+            // 2.创建producer对象
+            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
+            // 3.发送数据
+            for (int i = 0; i < 10; i++) {
+                String value = "testnewapi" + i;
+                // 每条数据都要封装成一个ProducerRecord对象
+                producer.send(new ProducerRecord<>("test", value), new Callback() {
+                    // 回调函数,该方法会在Producer收到ack时调用,为异步调用。
+                    // 该方法有两个参数,分别是RecordMetadata和Exception,如果Exception为null,说明消息发送成功,反之说明消息发送失败。
+                    // 注:消息发送失败会自动重试,不需要我们在回调函数中手动重试。
+                    @Override
+                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
+                        if (null == e) {
+                            log.info("分区:{},偏移量:{},值:{}", recordMetadata.partition(), recordMetadata.offset(), value);
+                        } else {
+                            log.error("{}", sdf.format(new Date()), e);
+                        }
+                    }
+                });
+            }
+            // 4.关闭资源 一定要关闭producer,这样才会调用interceptor的close方法
+            producer.close();
+        }
+    }
+    ```  
+
 ## kafka监控
 
+1. 修改kafka启动命令
+    ```
+    修改kafka-server-start.sh命令中
+        if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
+            export KAFKA_HEAP_OPTS="-Xmx512m -Xms512m"
+        fi
+    为
+        if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
+            #export KAFKA_HEAP_OPTS="-Xmx512m -Xms512m"
+            export KAFKA_HEAP_OPTS="-server -Xms512m -Xmx512m -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
+            export JMX_PORT="9999"  
+        fi
+    ```
+2. 下载[kafka-eagle](http://download.kafka-eagle.org)程序包
+    ```
+    wget -P /opt/setups -O kafka-eagle-1.4.7.tar.gz https://github.com/smartloli/kafka-eagle-bin/archive/v1.4.7.tar.gz
+    ```
+3. 解压到指定目录
+    ```
+    # mkdir -pv /usr/program
+    # tar -zxvf kafka-eagle-1.4.7.tar.gz -C /usr/program/
+    # cd /usr/program/kafka-eagle-bin-1.4.7
+    # tar -zxvf kafka-eagle-web-1.4.7-bin.tar.gz -C ../
+    # rm -rf ../kafka-eagle-bin-1.4.7
+    ```
+4. 配置环境变量
+    ```
+    [root@144 ~]# vim /etc/profile.d/my.sh
+    # kafka-eagle
+    KE_HOME=/usr/program/kafka-eagle-web-1.4.7
+    PATH=$KE_HOME/bin:$PATH
+    export KE_HOME
+    export PATH
+    [root@144 ~]# source /etc/profile.d/my.sh
+   ```
+5. 给启动文件执行权限
+    ```
+    # chmod +x ${KE_HOME}/bin/ke.sh
+    ```
+6. 修改配置文件
+    ```
+    [root@144 ~]# vim ${KE_HOME}/conf/system-config.properties
+    ######################################
+    # multi zookeeper&kafka cluster list
+    ######################################
+    kafka.eagle.zk.cluster.alias=cluster1
+    cluster1.zk.list=127.0.0.1:2181
+    ######################################
+    # kafka offset storage
+    ######################################
+    cluster1.kafka.eagle.offset.storage=kafka
+    ######################################
+    # enable kafka metrics
+    ######################################
+    kafka.eagle.metrics.charts=true
+    kafka.eagle.sql.fix.error=false
+    ######################################
+    # kafka jdbc driver address
+    ######################################
+    kafka.eagle.driver=com.mysql.jdbc.Driver
+    kafka.eagle.url=jdbc:mysql://127.0.0.1:3306/kafka-ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
+    kafka.eagle.username=root
+    kafka.eagle.password=000000
+    ```
+7. 启动(需要先启动ZK以及KAFKA)
+    ```
+    [root@144 bin]# ./ke.sh start
+    Version 1.4.7 -- Copyright 2016-2020
+    *******************************************************************
+    * Kafka Eagle Service has started success.
+    * Welcome, Now you can visit 'http://ip:8048/ke'
+    * Account:admin ,Password:123456
+    *******************************************************************
+    * <Usage> ke.sh [start|status|stop|restart|stats] </Usage>
+    * <Usage> https://www.kafka-eagle.org/ </Usage>
+    *******************************************************************
+    ```
+8. 登录页面查看监控数据
+    ![抱歉,图片休息了](af-mq-kafka/af-mq-kafka-010.png)
+
+
 ## flume对接kafka
 
 ## 参考链接

二進制
source/_posts/af-mq-kafka/af-mq-kafka-010.png