kafka学习笔记

kafka的安装

  1. 解压
1
tar -zxvf kafka_2.11-0.11.0.0.tgz -C /opt/module/
  1. 修改解压后的文件名,方便配置
1
mv kafka_2.11-0.11.0.0/ kafka
  1. 在/opt/module/kafka目录下创建logs文件夹、
1
mkdir logs
  1. 修改配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
 vi server.properties

#broker的全局唯一编号,不能重复
broker.id=0
#删除topic功能使能
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的现成数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka运行日志存放的路径
log.dirs=/opt/module/kafka/logs
#topic在当前broker上的分区个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#配置连接Zookeeper集群地址
zookeeper.connect=hadoop01:2181,hadoop02:2181,hadoop03:2181
  1. hadoop02,hadoop03进行如上配置,并且修改broker.id=1,broker.id=2
  2. 启动集群
1
2
# 三台机器都需要启动
bin/kafka-server-start.sh config/server.properties &

kafka架构

Kafka是一个分布式消息队列,Kafka对消息的保存是基于Topic进行归类的,发送消息的称为Producer,接受消息的称为Consumer。Kafka的每个实例被称为broker。kafka集群依赖zookeeper集群来保存一些元数据.

  • Producer:消息生产者,就是向kafka broker发消息的客户端
  • Consumer:消息消费者,向kafka broker取消息的客户端
  • Topic:存储消息的队列
  • Consumer Group:一个消费者组,一个topic可以被多个消费者组中的消费者消费,topic中的每个分区只能被同一个消费者组中的一个消费者消费,但是可以被不同组的消费者消费
  • Broker:每一台kafka机器就是一个Broker
  • Partition:一个topic可以分为多个partition,每个partition拥有一部分的数据
  • Offset:数据的偏移量

命令操作

  • 查看当前服务器中的所有topic
1
kafka-topics.sh --zookeeper localhost:2181 --list
  • 创建topic
1
kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic first
  • 删除topic
1
kafka-topics.sh --zookeeper localhost:2181 --delete --topic first
  • 发送消息
1
kafka-console-producer.sh --broker-list localhost:9092 --topic first
  • 消费消息
1
kafka-console-consumer.sh --zookeeper localhost:2181 --topic first
  • 查看topic状态
1
kafka-topics.sh --zookeeper localhost:2181 --describe --topic first

Producer写入流程

producer的写入流程需要根据设置的ack机制来进行数据的写入

ack=0,producer将消息发送到出去,不管发送的数据有没有成功写入到partition leader所在的磁盘,producer都不再关心这条数据。

ack=1,producer将消息发送出去,只要partition leader接受到消息并且写入到磁盘,就认为成功,不管其它follower有没有同步这条消息

ack=all,producer将消息发送出去,partition leader成功写入消息后要求ISR列表中的follower也将数据成功同步到自己的磁盘。

ISR列表(跟leader始终保持同步的列表)

存储策略

无论是否消费,kafka都会保留所有消息,有两种策略可以删除旧数据

1
2
1)基于时间:log.retention.hours=168
2)基于大小:log.retention.bytes=1073741824

javaapi

consumer

不需要自行管理offset,分区,副本,系统通过zookeeper自行管理,消费者会根据上一次记录在zookeeper中的offset区接着获取数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class MyConsumer {

public static void main(String[] args) {

Properties props = new Properties();
// 定义 kakfa 服务的地址,不需要将所有 broker 指定上
props.put("bootstrap.servers", "hadoop03:9092");
// 制定 consumer group
props.put("group.id", "test");
// 是否自动确认 offset
props.put("enable.auto.commit", "true");
// 自动确认 offset 的时间间隔
props.put("auto.commit.interval.ms", "1000");
// key 的序列化类
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
// value 的序列化类
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");

Consumer<String,String> consumer = new KafkaConsumer<String, String>(props);s
consumer.subscribe(Arrays.asList("first"));
while (true){
ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100).getSeconds());

for (ConsumerRecord<String, String> record : poll) {
System.out.println(record.topic()+"--"+record.partition()+"--"+record.offset()+"--"+record.value());
}
}
}
}

producer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class MyProducer {

public static void main(String[] args) {

Properties props = new Properties();
// Kafka 服务端的主机名和端口号
props.put("bootstrap.servers", "hadoop03:9092");
// 等待所有副本节点的应答
props.put("acks", "all");
// 消息发送最大尝试次数
props.put("retries", 0);
// 一批消息处理大小
props.put("batch.size", 16384);
// 请求延时
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
// key 序列化
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
// value 序列化
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("partitioner.class",MyPartitioner.class);
Producer<String,String> producer = new KafkaProducer<String, String>(props);

// for (int i=0;i<10;i++){
// producer.send(new ProducerRecord<String, String>("second",String.valueOf(i)));
// }
for (int i=0;i<10;i++){

producer.send(new ProducerRecord<String, String>("first", String.valueOf(i)), new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {

if(e==null){
System.out.println(recordMetadata.topic()+"=="+recordMetadata.partition()+"=="+recordMetadata.offset());
}
}
});
}
producer.close();
}
}

拦截器

实现在每条记录前加上时间戳并且统计成功与失败的次数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// 时间戳
public class TimeIntercepter implements ProducerInterceptor<String,String>{

@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
return new ProducerRecord(producerRecord.topic(),producerRecord.key(),System.currentTimeMillis()+" "+producerRecord.value());
}

@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}

// 计算成功与失败的次数
public class CountInterceptor implements ProducerInterceptor {
private int successCount = 0;
private int failCount = 0;
@Override
public ProducerRecord onSend(ProducerRecord producerRecord) {
return new ProducerRecord(producerRecord.topic(),producerRecord.key(),System.currentTimeMillis()+" "+producerRecord.value());
}
@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
if (e == null){
successCount++;
}else{
failCount++;
}
}
@Override
public void close() {
System.out.println("成功次数: "+successCount+" 失败次数:"+failCount);
}
@Override
public void configure(Map<String, ?> map) {
}
}
// 在producer端进行配置
ArrayList<String> list = new ArrayList<>();
list.add("interceptor.TimeIntercepter");
list.add("interceptor.CountInterceptor");
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,list);

Kafka与Flume的比较

  1. kafka和flume都是日志系统。kafka是一种消息队列,自带存储,提供push和pull存取数据功能。flume以管道流的方式实现数据的采集,分为agent(数据采集器)[source channel sink]。

  2. flume的数据采集部分做的很好,可以定制很多数据源,而Kafka更方便用于下游消费者众多的情况。

    flume与kafka的集成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /opt/module/datas/flume.log
a1.sources.r1.shell = /bin/bash -c

# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers =hadoop1:9092,hadoop2:9092,hadoo3:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
文章目录
  1. 1. kafka的安装
  2. 2. kafka架构
  3. 3. 命令操作
  4. 4. Producer写入流程
    1. 4.1. 存储策略
  5. 5. javaapi
    1. 5.1. consumer
    2. 5.2. producer
  6. 6. 拦截器
  7. 7. Kafka与Flume的比较
|
载入天数...载入时分秒...