[Shiyanlou] Kafka Quick Start Tutorial
· 5 minute read
Kafka: Introduction and Practice
Basic Concepts
topic
: the feed or category of messages that Kafka handles; producers publish to topics and consumers subscribe to them
partition
: a physical grouping within a topic; each message in a partition is assigned an ordered id called the offset
message
: the basic unit of communication; each producer can publish messages to a topic
broker
: a caching proxy; the one or more servers in a Kafka cluster are collectively called brokers
leader
: the node responsible for handling reads and writes for a given partition
replicas
: the nodes holding copies of a partition
isr
: in-sync replicas, the subset of replicas currently caught up with the leader and able to serve
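To make the partition/offset idea concrete, here is a minimal sketch of how keyed messages map to partitions. This is illustrative only, not Kafka's actual partitioner class: the class and method names (`PartitionSketch`, `choosePartition`) are hypothetical, but the idea, hash the key modulo the partition count so messages with the same key land in the same partition, matches Kafka's default keyed partitioning.

```java
// Illustrative sketch of keyed partitioning: hash(key) % numPartitions.
// All messages with the same key land in the same partition, so their
// relative order is preserved within that partition.
public class PartitionSketch {

    static int choosePartition(String key, int numPartitions) {
        // Math.abs guards against negative hashCode values
        // (a real implementation must also handle Integer.MIN_VALUE).
        return Math.abs(key.hashCode()) % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 3;
        for (String key : new String[] {"user-1", "user-2", "user-1"}) {
            System.out.println(key + " -> partition "
                    + choosePartition(key, partitions));
        }
    }
}
```

Note that the two messages keyed `user-1` always resolve to the same partition, which is what gives Kafka per-key ordering.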
Installation and Deployment
Start Hadoop:
$ hadoop/sbin/start-all.sh
Start ZooKeeper:
$ bin/zookeeper-server-start.sh config/zookeeper.properties &
Start Kafka:
$ bin/kafka-server-start.sh config/server.properties &> /dev/null &
Create a topic:
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
List topics:
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
Describe a topic:
$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
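For the single-broker `test` topic created above, `--describe` prints output along these lines (illustrative; exact spacing varies by version). With only one broker, broker 0 is at once the leader, the only replica, and the only in-sync replica, which ties directly back to the leader/replicas/isr concepts above:

```
Topic:test  PartitionCount:1  ReplicationFactor:1  Configs:
    Topic: test  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
```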
Start a console producer and a console consumer:
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
Multi-broker Configuration
Prepare a config file for each additional broker:
$ cp config/server.properties config/server-1.properties
$ cp config/server.properties config/server-2.properties
In each new file, set a distinct broker.id, port, and log.dir.
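For example, server-1.properties could change just these three lines (the exact port and log path are up to you; the values below follow the Kafka 0.8 quickstart convention):

```properties
broker.id=1
port=9093
log.dir=/tmp/kafka-logs-1
```

server-2.properties would similarly use broker.id=2, port=9094, and log.dir=/tmp/kafka-logs-2. The broker.id must be unique across the cluster; port and log.dir only need to differ because all three brokers here share one machine.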
Using Kafka from Java
Add the jar files under Kafka's libs directory to the build path.
Producer code:
package syl.kafka;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class ProducerDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // ZooKeeper address (used by the old 0.8 producer API).
        props.put("zk.connect", "localhost:2181");
        // Broker list used to bootstrap topic metadata.
        props.put("metadata.broker.list", "localhost:9092");
        // Serialize message payloads as strings.
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        for (int i = 1; i <= 100000; i++) {
            producer.send(new KeyedMessage<String, String>("test", "message:" + i));
        }
        producer.close();
    }
}
Consumer code:
package syl.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class ConsumerDemo {
    private static final String topic = "test";
    private static final Integer threads = 1;

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        // Consumers sharing a group.id divide the topic's partitions among themselves.
        props.put("group.id", "group1");
        // Start from the earliest available offset when no offset is committed yet.
        props.put("auto.offset.reset", "smallest");

        ConsumerConfig config = new ConsumerConfig(props);
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(config);

        // Request `threads` streams for the topic.
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, threads);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        // One consuming thread per stream; each prints the messages it receives.
        for (final KafkaStream<byte[], byte[]> kafkaStream : streams) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    for (MessageAndMetadata<byte[], byte[]> mm : kafkaStream) {
                        String msg = new String(mm.message());
                        System.out.println(msg);
                    }
                }
            }).start();
        }
    }
}