Kafka partition count
For example, take a testTopic with 24 partitions on a 3-broker Kafka cluster. When consuming it, whether single-threaded or multi-threaded, Kafka only guarantees ordering within each partition: across partitions the read order is effectively arbitrary, although the messages tend to arrive within a short time of one another.
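For reference, a topic of this shape can be created programmatically with the 0.8-era AdminUtils. The sketch below is only an illustration: the ZooKeeper address is a placeholder, and the class name CreateTestTopic is made up here.
import java.util.Properties;
import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import org.I0Itec.zkclient.ZkClient;

public class CreateTestTopic {
    public static void main(String[] args) {
        // AdminUtils needs a ZkClient configured with Kafka's string serializer
        // to read and write topic metadata in ZooKeeper.
        ZkClient zkClient = new ZkClient("xxxx:2181", 30000, 30000, ZKStringSerializer$.MODULE$);
        // 24 partitions, replication factor 3, matching the cluster described above.
        AdminUtils.createTopic(zkClient, "testTopic", 24, 3, new Properties());
        zkClient.close();
    }
}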
Single-threaded consumption
I. A minimal single-threaded consumer example
This uses the high-level Kafka consumer API.
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

Properties props = new Properties();
props.put("zookeeper.connect", "xxxx:2181");
props.put("zookeeper.connectiontimeout.ms", "1000000");
props.put("group.id", "test_group");
props.put("zookeeper.session.timeout.ms", "40000");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

// key: topic name, value: number of streams (threads) to create for it
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("test", new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
        consumerConnector.createMessageStreams(topicCountMap);
// one stream was requested, so take the first (and only) one
KafkaStream<byte[], byte[]> stream = consumerMap.get("test").get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
    try {
        String msg = new String(it.next().message(), "utf-8").trim();
        System.out.println("receive:" + msg);
    } catch (UnsupportedEncodingException e) {
        e.printStackTrace();
    }
}
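One thing the minimal loop omits is shutdown: the connector holds a ZooKeeper session, and closing it explicitly commits the latest offsets and lets the group rebalance right away instead of waiting for the session timeout. A minimal sketch, assuming the consumerConnector variable from above is final and in scope:
// Close the connector on JVM exit so offsets are committed and the
// partitions are released to other consumers in the group promptly.
Runtime.getRuntime().addShutdownHook(new Thread() {
    @Override
    public void run() {
        consumerConnector.shutdown();
    }
});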
II. A wrapped single-threaded consumer example
1. Configuration file: consumer.properties
auto.offset.reset
With the kafka + zookeeper setup, each time messages are consumed the consumer commits the current groupId's offset to ZooKeeper (under the path /consumers/<group.id>/offsets/<topic>/<partition>); when the consumer restarts, it resumes from that offset. The consumer configuration file (or the ConsumerConfig parameters) includes "autooffset.reset" (renamed auto.offset.reset in Kafka 0.8), with two legal values, "largest" and "smallest", defaulting to "largest". This parameter controls where a consumer in this groupId starts when ZooKeeper holds no offset for it (for example, a brand-new groupId, or ZK data that has been wiped): largest means start from the largest offset (i.e. the newest messages), smallest means start from the smallest offset, i.e. consume the topic from the beginning.
zookeeper.connect=xxxx:2181,xxx2:2181,xxx3:2181/kafka
zookeeper.session.timeout.ms=30000
zookeeper.sync.time.ms=2000
auto.commit.interval.ms=1000
auto.offset.reset=largest
# note: serializer.class is a producer-side setting; the consumer code below passes StringDecoder explicitly
serializer.class=kafka.serializer.StringEncoder
2. KafkaConsumerHighUtils
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaConsumerHighUtils {

    private Logger logger = LoggerFactory.getLogger(KafkaConsumerHighUtils.class.getName());
    private ConsumerConnector consumer;
    private Properties consumerProp;
    private Map<String, List<KafkaStream<String, String>>> consumerMap = null;

    public KafkaConsumerHighUtils() {}

    public KafkaConsumerHighUtils(String groupid) throws IOException {
        logger.info("------ initializing ------");
        // load the base settings from consumer.properties on the classpath,
        // then override group.id per instance
        consumerProp = new Properties();
        InputStream consumerStream = KafkaConsumerHighUtils.class.getClassLoader()
                .getResourceAsStream("consumer.properties");
        consumerProp.load(consumerStream);
        consumerProp.put("group.id", groupid);
        ConsumerConfig config = new ConsumerConfig(consumerProp);
        consumer = Consumer.createJavaConsumerConnector(config);
        logger.info("Consumer initialized successfully!");
    }

    /**
     * @Description fetch the Kafka data streams for the given topics
     * @param topicMap topic name -> number of streams to create
     * @return Map<String, List<KafkaStream<String, String>>> streams keyed by topic
     * @throws IOException
     */
    public Map<String, List<KafkaStream<String, String>>> pullKafka(Map<String, Integer> topicMap) throws IOException {
        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
        try {
            consumerMap = consumer.createMessageStreams(topicMap, keyDecoder, valueDecoder);
            logger.info("pullKafka topic/thread info: " + topicMap.keySet().toString());
            logger.info("successfully received streams for " + consumerMap.size() + " topic(s)!");
            return consumerMap;
        } catch (Exception e) {
            if (consumer != null) {
                consumer.shutdown();
            }
            e.printStackTrace();
            return null;
        }
    }
}
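Note that the utility class opens a connector but never exposes a way to close it. A small hypothetical addition to KafkaConsumerHighUtils (not in the original class) would let callers release the ZooKeeper session cleanly:
// Hypothetical companion method: shut the connector down, committing
// the final offsets and releasing the ZooKeeper session.
public void shutdown() {
    if (consumer != null) {
        consumer.shutdown();
    }
}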
3. getKafkaStream
private static String TOPIC = "testTopic";

// fetch a Kafka data stream for TOPIC
public static KafkaStream<String, String> getKafkaStream() {
    KafkaStream<String, String> kafkaStream = null;
    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    try {
        // SZ_KAFKA_GROUP is the consumer group id, defined elsewhere
        KafkaConsumerHighUtils kafkaUtils = new KafkaConsumerHighUtils(SZ_KAFKA_GROUP);
        // request one stream for the topic
        topicMap.put(TOPIC, 1);
        // fetch the corresponding Kafka streams
        Map<String, List<KafkaStream<String, String>>> topics = kafkaUtils.pullKafka(topicMap);
        // bail out if the streams could not be created
        if (topics == null) {
            logger.error("topic map is empty, please check that the topic info is correct!");
            return null;
        }
        // return the stream for our topic
        List<KafkaStream<String, String>> consumerList = topics.get(TOPIC);
        if (consumerList != null) {
            kafkaStream = consumerList.get(0);
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
    return kafkaStream;
}
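Neither snippet above actually reads any messages. A minimal consumption loop over the returned stream could look like the following sketch (assuming getKafkaStream() returned a non-null stream):
// Drain the stream returned by getKafkaStream();
// it.hasNext() blocks until the next message arrives.
KafkaStream<String, String> stream = getKafkaStream();
if (stream != null) {
    ConsumerIterator<String, String> it = stream.iterator();
    while (it.hasNext()) {
        MessageAndMetadata<String, String> mam = it.next();
        System.out.println("partition[" + mam.partition() + "], offset[" + mam.offset()
                + "], message: " + mam.message());
    }
}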
Multi-threaded consumption
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.consumer.ConsumerConfig;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Multi-threaded Kafka consumer example
 * Created by gongenbo on 2017/7/4.
 */
public class KafkaConsumer implements Runnable {

    private ConsumerConfig consumerConfig;
    private static String topic = "testTopic";
    Properties props;
    // one stream/thread per partition: testTopic has 24 partitions
    final int a_numThreads = 24;
    private ConsumerConnector consumer;
    private Map<String, List<KafkaStream<String, String>>> consumerMap = null;

    public KafkaConsumer() {
        props = new Properties();
        props.put("zookeeper.connect", "xxx1:2181,xxx2:2181,xxx3:2181/kafka");
        props.put("group.id", "blog");
        props.put("zookeeper.session.timeout.ms", "40000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        // no offset in ZK yet: start from the beginning of the topic
        props.put("auto.offset.reset", "smallest");
        consumerConfig = new ConsumerConfig(props);
        consumer = Consumer.createJavaConsumerConnector(consumerConfig);
    }

    @Override
    public void run() {
        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads));
        consumerMap = consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
        List<KafkaStream<String, String>> streams = consumerMap.get(topic);
        // hand each stream to its own worker thread
        ExecutorService executor = Executors.newFixedThreadPool(a_numThreads);
        for (final KafkaStream<String, String> stream : streams) {
            executor.submit(new KafkaConsumerThread(stream));
        }
    }

    public static void main(String[] args) {
        System.out.println(topic);
        Thread t = new Thread(new KafkaConsumer());
        t.start();
    }
}
class KafkaConsumerThread implements Runnable {

    private KafkaStream<String, String> stream;

    public KafkaConsumerThread(KafkaStream<String, String> stream) {
        this.stream = stream;
    }

    @Override
    public void run() {
        ConsumerIterator<String, String> it = stream.iterator();
        while (it.hasNext()) {
            MessageAndMetadata<String, String> mam = it.next();
            // message() is already a String thanks to the StringDecoder
            System.out.println(Thread.currentThread().getName() + ": partition[" + mam.partition() + "],"
                    + "offset[" + mam.offset() + "], " + mam.message());
        }
    }
}
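The example starts 24 worker threads, one per partition of testTopic; any streams beyond the partition count would simply sit idle, since a partition is consumed by at most one stream per group. It also never stops the threads. A hedged sketch of a graceful shutdown, assuming consumer and executor are promoted to fields of KafkaConsumer:
// Hypothetical shutdown method: closing the connector first unblocks the
// stream iterators, so the worker threads can drain and exit.
public void shutdown() {
    if (consumer != null) {
        consumer.shutdown();
    }
    if (executor != null) {
        executor.shutdown();
        try {
            // give in-flight messages a moment to finish processing
            if (!executor.awaitTermination(5, java.util.concurrent.TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}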