成人在线亚洲_国产日韩视频一区二区三区_久久久国产精品_99国内精品久久久久久久

您的位置:首頁技術文章
文章詳情頁

Java kafka如何實現自定義分區類和攔截器

瀏覽:87日期:2022-08-31 13:14:07

生產者發送到對應的分區有以下幾種方式:

(1)指定了patition,則直接使用;(可以查閱對應的java api, 有多種參數)

(2)未指定patition但指定key,通過對key的value進行hash出一個patition;

(3)patition和key都未指定,使用輪詢選出一個patition。

但是kafka提供了,自定義分區算法的功能,由業務手動實現分布:

1、實現一個自定義分區類,CustomPartitioner實現Partitioner

import org.apache.kafka.clients.producer.Partitioner;import org.apache.kafka.common.Cluster;import java.util.Map;public class CustomPartitioner implements Partitioner { /** * * @param topic 當前的發送的topic * @param key 當前的key值 * @param keyBytes 當前的key的字節數組 * @param value 當前的value值 * @param valueBytes 當前的value的字節數組 * @param cluster * @return */ @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { //這邊根據返回值就是分區號, 這邊就是固定發送到三號分區 return 3; } @Override public void close() { } @Override public void configure(Map<String, ?> configs) { }}

2、producer配置文件指定,具體的分區類

// 具體的分區類props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, 'kafka.CustomPartitioner');

技巧:可以使用ProducerConfig中提供的配置ProducerConfig

kafka producer攔截器

攔截器(interceptor)是在Kafka 0.10版本被引入的。

interceptor使得用戶在消息發送前以及producer回調邏輯前有機會對消息做一些定制化需求,比如修改消息等。

許用戶指定多個interceptor按序作用于同一條消息從而形成一個攔截鏈(interceptor chain)。

所使用的類為:

org.apache.kafka.clients.producer.ProducerInterceptor

我們可以編碼測試下:

1、定義消息攔截器,實現消息處理(可以是加時間戳等等,unid等等。)

import org.apache.kafka.clients.producer.ProducerInterceptor;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Map;import java.util.UUID;public class MessageInterceptor implements ProducerInterceptor<String, String> { @Override public void configure(Map<String, ?> configs) { System.out.println('這是MessageInterceptor的configure方法'); } /** * 這個是消息發送之前進行處理 * * @param record * @return */ @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { // 創建一個新的record,把uuid入消息體的最前部 System.out.println('為消息添加uuid'); return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),UUID.randomUUID().toString().replace('-', '') + ',' + record.value()); } /** * 這個是生產者回調函數調用之前處理 * @param metadata * @param exception */ @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { System.out.println('MessageInterceptor攔截器的onAcknowledgement方法'); } @Override public void close() { System.out.println('MessageInterceptor close 方法'); }}

2、定義計數攔截器

import java.util.Map;import org.apache.kafka.clients.producer.ProducerInterceptor;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;public class CounterInterceptor implements ProducerInterceptor<String, String>{ private int errorCounter = 0; private int successCounter = 0; @Override public void configure(Map<String, ?> configs) { System.out.println('這是CounterInterceptor的configure方法'); } @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { System.out.println('CounterInterceptor計數過濾器不對消息做任何操作'); return record; } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { // 統計成功和失敗的次數 System.out.println('CounterInterceptor過濾器執行統計失敗和成功數量'); if (exception == null) { successCounter++; } else { errorCounter++; } } @Override public void close() { // 保存結果 System.out.println('Successful sent: ' + successCounter); System.out.println('Failed sent: ' + errorCounter); }}

3、producer客戶端:

import org.apache.kafka.clients.producer.*;import java.util.ArrayList;import java.util.List;import java.util.Properties;public class Producer1 { public static void main(String[] args) throws Exception { Properties props = new Properties(); // Kafka服務端的主機名和端口號 props.put('bootstrap.servers', 'localhost:9092'); // 等待所有副本節點的應答 props.put('acks', 'all'); // 消息發送最大嘗試次數 props.put('retries', 0); // 一批消息處理大小 props.put('batch.size', 16384); // 請求延時,可能生產數據太快了 props.put('linger.ms', 1); // 發送緩存區內存大小,數據是先放到生產者的緩沖區 props.put('buffer.memory', 33554432); // key序列化 props.put('key.serializer', 'org.apache.kafka.common.serialization.StringSerializer'); // value序列化 props.put('value.serializer', 'org.apache.kafka.common.serialization.StringSerializer'); // 具體的分區類 props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, 'kafka.CustomPartitioner'); //定義攔截器 List<String> interceptors = new ArrayList<>(); interceptors.add('kafka.MessageInterceptor'); interceptors.add('kafka.CounterInterceptor'); props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors); Producer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 1; i++) { producer.send(new ProducerRecord<String, String>('test_0515', i + '', 'xxx-' + i), new Callback() {public void onCompletion(RecordMetadata recordMetadata, Exception e) { System.out.println('這是producer回調函數');} }); } /*System.out.println('現在執行關閉producer'); producer.close();*/ producer.close(); }}

總結,我們可以知道攔截器鏈各個方法的執行順序,假如有A、B攔截器,在一個攔截器鏈中:

(1)執行A的configure方法,執行B的configure方法

(2)執行A的onSend方法,B的onSend方法

(3)生產者發送完畢后,執行A的onAcknowledgement方法,B的onAcknowledgement方法。

(4)執行producer自身的callback回調函數。

(5)執行A的close方法,B的close方法。

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持好吧啦網。

標簽: Java
相關文章:
成人在线亚洲_国产日韩视频一区二区三区_久久久国产精品_99国内精品久久久久久久
日韩视频一区| 欧美系列日韩一区| 亚洲欧美日韩视频二区| 中文字幕精品三区| 成人av高清在线| 欧美日韩国产高清一区二区| 亚洲国产精品嫩草影院| 亚洲国产欧美国产综合一区| 亚洲国产精品精华液2区45| 成人av综合在线| 在线播放日韩导航| 久久精品国产**网站演员| 国产精品一区二区三区观看| 国产精品电影一区二区三区| 欧美xxx在线观看| 日韩一区二区高清| 精久久久久久久久久久| 欧美日韩视频在线一区二区 | 婷婷综合五月天| 国产精品老牛| 亚洲一区视频在线观看视频| 在线一区亚洲| 亚洲影院免费观看| 中文精品视频一区二区在线观看| 亚洲欧洲日韩在线| 亚洲电影自拍| 亚洲久草在线视频| 日韩视频一区| 亚洲女与黑人做爰| 亚洲国产精品一区| 亚洲免费在线看| 亚洲作爱视频| 亚洲国产精品一区二区尤物区| 国产精品一区二区三区免费观看| 亚洲中国最大av网站| 国产偷久久久精品专区| 亚洲一本大道在线| 久久人人超碰| 加勒比av一区二区| 日韩一区二区在线看片| 粉嫩在线一区二区三区视频| wwww国产精品欧美| 欧美精品1区| 亚洲久草在线视频| 欧美亚洲自偷自偷| 日本v片在线高清不卡在线观看| 欧美在线小视频| 国产真实乱子伦精品视频| 欧美一区二区三区在| 成人亚洲一区二区一| 国产丝袜欧美中文另类| 精品不卡一区| 亚洲图片欧美一区| 欧美伊人久久久久久久久影院 | 亚洲午夜久久久久中文字幕久| 久久婷婷麻豆| 激情综合色综合久久| 日韩无一区二区| 欧美激情第六页| 亚洲精品国产第一综合99久久| 老司机午夜免费精品视频| 狠狠色狠狠色综合| 久久综合九色综合97_久久久| 欧美日产一区二区三区在线观看| 亚洲美女在线一区| 色诱亚洲精品久久久久久| 激情综合一区二区三区| 日韩一区二区在线看| 欧美三级黄美女| 亚洲成人一区二区在线观看| 欧美人成免费网站| 色综合色综合色综合色综合色综合 | 日韩**一区毛片| 欧美一级日韩不卡播放免费| 99久久久久久| 亚洲视频在线一区观看| 老司机久久99久久精品播放免费| 国产成人鲁色资源国产91色综| 中文在线资源观看网站视频免费不卡| 宅男噜噜噜66一区二区 | 日韩av一二三| 欧美大片一区二区| 亚洲精品极品| 美女精品一区二区| 久久综合色8888| 亚洲一区二区三区精品视频| 美女性感视频久久| 久久蜜桃av一区精品变态类天堂| 亚洲国产婷婷| 精品一区二区三区的国产在线播放| 久久日韩粉嫩一区二区三区| 国产九九精品| 国产黄色成人av| 亚洲日本韩国一区| 欧美日韩高清在线播放| 欧美日韩ab| 日本成人在线一区| 国产女主播一区| 色999日韩国产欧美一区二区| 成人精品gif动图一区| 亚洲精品日产精品乱码不卡| 8v天堂国产在线一区二区| 激情欧美一区| 精品一二三四在线| 国产精品成人免费| 欧美日韩国产另类一区| 亚洲无线一线二线三线区别av| 日韩av在线发布| 国产精品美女久久久久av爽李琼| 在线免费观看不卡av| 国产综合精品| 国产美女av一区二区三区| 亚洲欧美国产毛片在线| 日韩色在线观看| 国产精品手机在线| 99麻豆久久久国产精品免费| 天堂久久久久va久久久久| 国产亚洲一本大道中文在线| 久久久久网站| 午夜欧美精品| 精品中文字幕一区二区| 中文字幕一区在线观看| 欧美一级在线免费| 裸体一区二区| 国产精品第十页| 国产精品一区久久久久| 亚洲一区二区三区四区五区黄| 精品少妇一区二区三区在线播放| 久久久久欧美| 亚洲午夜精品一区二区| 国产夫妻精品视频| 亚洲18女电影在线观看| 国产网站一区二区| 欧美区在线观看| 亚洲一区黄色| 欧美日韩精品免费观看视频完整| 国产另类ts人妖一区二区| 亚洲bt欧美bt精品| 国产精品久久久久aaaa樱花| 91精品国产综合久久久久久久| 国产精品乱码| 欧美黄色aaaa| 国产999精品久久久久久绿帽| 视频一区二区不卡| 亚洲丝袜精品丝袜在线| 国产亚洲一区二区三区在线观看 | 国产精品理论片| 欧美mv日韩mv| 欧美酷刑日本凌虐凌虐| 亚洲欧美99| 在线成人h网| 91老师片黄在线观看| 国产精品99久久不卡二区| 日本va欧美va欧美va精品| 亚洲图片一区二区| 成人欧美一区二区三区| 日韩欧美色综合| 欧美在线观看一二区| 麻豆久久精品| 亚洲精品日韩久久| 99热精品一区二区| 国产成人午夜99999| 九九九久久久精品| 日韩国产一二三区| 亚洲一级电影视频| **欧美大码日韩| 国产精品污www在线观看| 精品91自产拍在线观看一区| 91精品国产综合久久福利| 欧美日韩中文字幕一区二区| 欧洲一区在线电影| 色婷婷久久99综合精品jk白丝| 国产精品视频免费观看| 亚洲免费久久| 一本色道久久综合亚洲精品高清 | 捆绑变态av一区二区三区| 日精品一区二区| 亚洲成a人片在线不卡一二三区| 亚洲精品成a人| 亚洲精品网站在线观看| 亚洲天天做日日做天天谢日日欢| 国产精品青草综合久久久久99| 国产色婷婷亚洲99精品小说| 久久久99久久精品欧美| 久久人人97超碰com| 国产午夜亚洲精品羞羞网站| 2020国产成人综合网| 精品日韩一区二区| 欧美成人一区二区三区| 精品日韩成人av| 久久久久国产一区二区三区四区| 精品国产一区a| 久久综合九色综合97婷婷女人| 精品久久久久久久久久久院品网 | 91在线国产观看| 欧美91视频| 国内精品美女在线观看| 亚洲私人影院| 99精品久久久| 亚洲精选91|