Exploring Kafka Partition Counts and Single- vs. Multi-Threaded Consumption

2017/07/16 · kafka · about 6,499 characters, ~19 min read

The number of Kafka partitions

Suppose, for example, that testTopic has 24 partitions spread across a 3-broker Kafka cluster. Whether you consume it single-threaded or multi-threaded, Kafka guarantees ordering only within each partition; across partitions, messages are read in effectively arbitrary order, although their arrival times will not differ by much.
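The per-partition guarantee is easy to verify: within one partition, offsets strictly increase, while messages from different partitions interleave arbitrarily. The following is a minimal sketch (not from the original post; it assumes a stream obtained from createMessageStreams, as in the examples below) that tracks the last offset seen per partition and asserts it only ever moves forward:

import java.util.HashMap;
import java.util.Map;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;

// Checks that offsets grow monotonically within each partition, even
// though messages from different partitions may interleave arbitrarily.
public class PartitionOrderCheck {
    public static void check(KafkaStream<byte[], byte[]> stream) {
        Map<Integer, Long> lastOffset = new HashMap<Integer, Long>();
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> mam = it.next();
            Long prev = lastOffset.get(mam.partition());
            if (prev != null && mam.offset() <= prev) {
                throw new IllegalStateException(
                        "ordering violated in partition " + mam.partition());
            }
            lastOffset.put(mam.partition(), mam.offset());
        }
    }
}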

Single-threaded consumption

I. A minimal single-threaded consumer

This uses the high-level Kafka consumer API.


import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

Properties props = new Properties();
props.put("zookeeper.connect", "xxxx:2181");
props.put("zookeeper.connection.timeout.ms", "1000000");
props.put("group.id", "test_group");
props.put("zookeeper.session.timeout.ms", "40000");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");

ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

// key: topic name, value: number of streams (consumer threads) for that topic
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("test", 1);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
        consumerConnector.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = consumerMap.get("test").get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
// hasNext() blocks until the next message arrives, so this loop runs forever
while (it.hasNext()) {
    try {
        String msg = new String(it.next().message(), "utf-8").trim();
        System.out.println("receive:" + msg);
    } catch (UnsupportedEncodingException e) {
        e.printStackTrace();
    }
}
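Note that it.hasNext() blocks waiting for the next message rather than returning false, so the loop runs until the process is stopped. Offsets are committed automatically every auto.commit.interval.ms (1 second here), which means a message can be marked as consumed before your processing of it has finished.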

II. A wrapped single-threaded consumer

1. Configuration file: consumer.properties

auto.offset.reset

With Kafka + ZooKeeper, as messages are consumed the consumer commits the offset it has reached for the current groupId to ZK; when the consumer restarts, it resumes from that offset. The consumer-side configuration file (or the ConsumerConfig parameters) includes a setting called "autooffset.reset" (renamed "auto.offset.reset" in Kafka 0.8) with two legal values, "largest" and "smallest", defaulting to "largest". It determines where a consumer in this groupId should start when no offset exists in ZK (for example, a brand-new groupId, or ZK data that has been wiped): "largest" means start from the largest offset, i.e. only the newest messages; "smallest" means start from the smallest offset, i.e. consume the whole topic from the beginning.


zookeeper.connect=xxxx:2181,xxx2:2181,xxx3:2181/kafka
zookeeper.session.timeout.ms=30000
zookeeper.sync.time.ms=2000
auto.commit.interval.ms=1000
auto.offset.reset=largest
# note: serializer.class is a producer-side property; this consumer passes StringDecoder to createMessageStreams instead

2. KafkaConsumerHighUtils

import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

public class KafkaConsumerHighUtils {
    private Logger logger = LoggerFactory.getLogger(KafkaConsumerHighUtils.class.getName());

    private ConsumerConnector consumer;
    private Properties consumerProp;
    private Map<String, List<KafkaStream<String, String>>> consumerMap = null;

    public KafkaConsumerHighUtils() {}

    public KafkaConsumerHighUtils(String groupid) throws IOException {
        logger.info("------ initializing ------");
        // load consumer.properties from the classpath
        consumerProp = new Properties();
        InputStream consumerStream = KafkaConsumerHighUtils.class.getClassLoader().getResourceAsStream("consumer.properties");
        consumerProp.load(consumerStream);
        consumerProp.put("group.id", groupid);

        ConsumerConfig config = new ConsumerConfig(consumerProp);
        consumer = Consumer.createJavaConsumerConnector(config);
        logger.info("Consumer initialized successfully!");
    }

    /**
     * @Description fetch the Kafka data streams for the given topics
     * @param topicMap topic name -> number of streams to open
     * @return Map<String, List<KafkaStream<String, String>>> the streams per topic
     * @throws IOException
     */
    public Map<String, List<KafkaStream<String, String>>> pullKafka(Map<String, Integer> topicMap) throws IOException {

        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

        try {
            consumerMap = consumer.createMessageStreams(topicMap, keyDecoder, valueDecoder);

            logger.info("pullKafka topic/stream info: " + topicMap.keySet().toString());
            logger.info("successfully received streams for " + consumerMap.size() + " topic(s)!");

            return consumerMap;
        } catch (Exception e) {
            // on failure, release the connector so partitions are not held
            if (consumer != null) {
                consumer.shutdown();
            }
            logger.error("failed to create message streams", e);
            return null;
        }
    }
}

3. getKafkaStream

private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerHighUtils.class);
private static final String SZ_KAFKA_GROUP = "test_group"; // consumer group id (placeholder)
private static String TOPIC = "testTopic";

// fetch a Kafka stream for TOPIC
public static KafkaStream<String, String> getKafkaStream() {

    KafkaStream<String, String> kafkaStream = null;
    Map<String, Integer> topicMap = new HashMap<String, Integer>();

    try {
        KafkaConsumerHighUtils kafkaUtils = new KafkaConsumerHighUtils(SZ_KAFKA_GROUP);

        // specify the topic and how many streams to open for it
        topicMap.put(TOPIC, 1);
        // fetch the corresponding Kafka streams
        Map<String, List<KafkaStream<String, String>>> topics = kafkaUtils.pullKafka(topicMap);

        // bail out if the streams could not be created
        if (topics == null) {
            logger.error("topic streams are empty; please check the topic configuration!");
            return null;
        }

        // return the first stream for this topic
        List<KafkaStream<String, String>> consumerList = topics.get(TOPIC);
        if (consumerList != null) {
            kafkaStream = consumerList.get(0);
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
    return kafkaStream;
}
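With the stream in hand, a minimal consumption loop looks like the sketch below (an illustration, not part of the original utility class; it assumes the getKafkaStream() helper above):

// hasNext() blocks until the next message is available; offsets are
// auto-committed per auto.commit.interval.ms in consumer.properties.
KafkaStream<String, String> stream = getKafkaStream();
if (stream != null) {
    ConsumerIterator<String, String> it = stream.iterator();
    while (it.hasNext()) {
        MessageAndMetadata<String, String> mam = it.next();
        System.out.println("partition[" + mam.partition() + "] "
                + "offset[" + mam.offset() + "]: " + mam.message());
    }
}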

Multi-threaded consumption
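With the high-level API, requesting N streams for a topic in createMessageStreams and handing each stream to its own thread gives you N consumer threads. Since testTopic has 24 partitions, asking for 24 streams lets each thread own exactly one partition; requesting more streams than partitions would leave the surplus threads permanently idle.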


import kafka.consumer.Consumer;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.consumer.ConsumerConfig;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Kafka multi-threaded consumption example
 * Created by gongenbo on 2017/7/4.
 */
public class KafkaConsumer implements Runnable {
    private ConsumerConfig consumerConfig;
    private static String topic="testTopic";
    Properties props;
    final int a_numThreads = 24; // one stream per partition (testTopic has 24 partitions)
    private ConsumerConnector consumer;
    private Map<String, List<KafkaStream<String,String>>> consumerMap = null;
    public KafkaConsumer() {
        props = new Properties();
        props.put("zookeeper.connect", "xxx1:2181,xxx2:2181,xxx3:2181/kafka");
        props.put("group.id", "blog");
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");
        consumerConfig = new ConsumerConfig(props);
        consumer = Consumer.createJavaConsumerConnector(consumerConfig);
    }
    @Override
    public void run() {
        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, a_numThreads);
        consumerMap = consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder);
        List<KafkaStream<String,String>> streams = consumerMap.get(topic);
        ExecutorService executor = Executors.newFixedThreadPool(a_numThreads);
        for (final KafkaStream<String, String> stream : streams) {
            executor.submit(new KafkaConsumerThread(stream));
        }
    }
    public static void main(String[] args) {
        System.out.println(topic);
        Thread t = new Thread(new KafkaConsumer());
        t.start();
    }
}
class KafkaConsumerThread implements Runnable {

    private KafkaStream<String,String> stream;

    public KafkaConsumerThread(KafkaStream<String,String> stream) {
        this.stream = stream;
    }
    @Override
    public void run() {
        ConsumerIterator<String,String> it = stream.iterator();
        while (it.hasNext()) {
            MessageAndMetadata<String,String> mam = it.next();
            System.out.println(Thread.currentThread().getName() + ": partition[" + mam.partition() + "], "
                    + "offset[" + mam.offset() + "], " + mam.message());
        }
    }
}
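One thing the example omits is a clean shutdown: main() returns while the consumer connector and the pool threads keep running. A minimal sketch, assuming consumer and the executor are kept as fields of KafkaConsumer:

// Graceful shutdown (sketch): stop the fetchers first so the blocking
// stream iterators terminate, then stop the worker threads.
public void shutdown() {
    if (consumer != null) {
        consumer.shutdown(); // releases partition ownership; commits offsets if auto-commit is on
    }
    if (executor != null) {
        executor.shutdown(); // lets in-flight messages finish processing
    }
}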