docker-compose 安装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
version: '3'
services:
zookeeper:
image: bitnami/zookeeper:3.8.0
user: root
container_name: zookeeper
ports:
- 2181:2181
environment:
# 时区
- TZ=Asia/Shanghai
# 允许匿名登录
- ALLOW_ANONYMOUS_LOGIN=yes
# zk在集群中的序号(1~255
- ZOO_SERVER_ID=1
# 端口
- ZOO_PORT_NUMBER=2181
volumes:
- ./zk:/bitnami/zookeeper
kafka:
image: bitnami/kafka:3.2.1
user: root
container_name: kafka
ports:
- 9092:9092
environment:
- TZ=Asia/Shanghai
# broker id>=0
- KAFKA_BROKER_ID=0
# kk配置zk连接
- KAFKA_CFG_ZOOKEEPER_CONNECT=192.168.56.100:2181
# 允许使用PLAINTEXT协议
- ALLOW_PLAINTEXT_LISTENER=yes
# kk配置监听器
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
# kk配置发布到zk的监听器
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.56.100:9092
volumes:
- ./kk/data:/bitnami/kafka/data
depends_on:
- zookeeper
kafka-manager:
image: sheepkiller/kafka-manager:stable
container_name: kafka-manager
ports:
- 9000:9000
environment:
# zk地址
- ZK_HOSTS=192.168.56.100:2181
# 应用秘钥
- APPLICATION_SECRET=km_secret
# km用户名
- KAFKA_MANAGER_USERNAME=kkm_user
# km密码
- KAFKA_MANAGER_PASSWORD=kkm_pwd
depends_on:
- kafka

使用

kafka Manager

在这里插入图片描述
填写信息
在这里插入图片描述
成功
在这里插入图片描述
go to cluster view

新建主题

在这里插入图片描述
在这里插入图片描述

查看kafka状态

查看topic情况

1
./bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test

在这里插入图片描述

查看消息内容

1
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

在这里插入图片描述

查看消费情况

1
./bin/kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092 --group zhTestGroup

在这里插入图片描述

SpringBoot整合

配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
在这里插入代码片
server:
port: 8080

spring:
application:
name: kafka
kafka:
bootstrap-servers: 192.168.56.100:9092 # kafka集群信息
producer: # 生产者配置
retries: 3 # 设置大于0的值,则客户端会将发送失败的记录重新发送 # 重试次数
batch-size: 16384 #16K
buffer-memory: 33554432 #32M
# 应答级别
# acks=0 把消息发送到kafka就认为发送成功
# acks=1 把消息发送到kafka leader分区,并且写入磁盘就认为发送成功
# acks=all 把消息发送到kafka leader分区,并且leader分区的副本follower对消息进行了同步就任务发送成功
acks: 1
# 指定消息key和消息体的编解码方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# # 批量处理的最大大小 单位 byte
# batch-size: 4096
# # 发送延时,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
# buffer-memory: 33554432
# # 客户端ID
# client-id: hello-kafka
# # 消息压缩:none、lz4、gzip、snappy,默认为 none。
# compression-type: gzip


consumer:
group-id: zhTestGroup # 消费者组
enable-auto-commit: false # 关闭自动提交
# 当kafka中没有初始offset或offset超出范围时将自动重置offset
# earliest:重置为分区中最小的offset
# latest:重置为分区中最新的offset(消费分区中新产生的数据)
# none:只要有一个分区不存在已提交的offset,就抛出异常
auto-offset-reset: earliest # 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# # 自动提交的频率 单位 ms
# auto-commit-interval: 1000
# # 批量消费最大数量
# max-poll-records: 100

listener:
# 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
# RECORD
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
# BATCH
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
# TIME
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
# COUNT
# TIME | COUNT 有一个条件满足时提交
# COUNT_TIME
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
# MANUAL
# 手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种
# MANUAL_IMMEDIATE
ack-mode: manual_immediate

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package com.jhj.kafka.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import javax.annotation.Resource;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* kafka 生产服务
*
* @author Leo
* @create 2020/12/31 16:06
**/
@Slf4j
@Service
public class KafkaProducerService {

@Resource
private KafkaTemplate<String, String> kafkaTemplate;

@Resource
private KafkaTemplate<String, String> kafkaTemplateWithTransaction;

/**
* 发送消息(同步)
* @param topic 主题
* @param key 键
* @param message 值
*/
public void sendMessageSync(String topic, String key, String message) throws InterruptedException, ExecutionException, TimeoutException {

//可以指定最长等待时间,也可以不指定
kafkaTemplate.send(topic, message).get(10, TimeUnit.SECONDS);
log.info("sendMessageSync => topic: {}, key: {}, message: {}", topic, key, message);
//指定key,kafka根据key进行hash,决定存入哪个partition
// kafkaTemplate.send(topic, key, message).get(10, TimeUnit.SECONDS);
//存入指定partition
// kafkaTemplate.send(topic, 0, key, message).get(10, TimeUnit.SECONDS);
}

/**
* 发送消息并获取结果
* @param topic
* @param message
* @throws ExecutionException
* @throws InterruptedException
*/
public void sendMessageGetResult(String topic, String key, String message) throws ExecutionException, InterruptedException {

SendResult<String, String> result = kafkaTemplate.send(topic, message).get();
log.info("sendMessageSync => topic: {}, key: {}, message: {}", topic, key, message);
log.info("The partition the message was sent to: " + result.getRecordMetadata().partition());
}

/**
* 发送消息(异步)
* @param topic 主题
* @param message 消息内容
*/
public void sendMessageAsync(String topic, String message) {

ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
//添加回调
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

@Override
public void onFailure(Throwable throwable) {

log.error("sendMessageAsync failure! topic : {}, message: {}", topic, message);
}

@Override
public void onSuccess(SendResult<String, String> stringStringSendResult) {

log.info("sendMessageAsync success! topic: {}, message: {}", topic, message);
}
});
}

/**
* 可以将消息组装成 Message 对象和 ProducerRecord 对象发送
* @param topic
* @param key
* @param message
* @throws InterruptedException
* @throws ExecutionException
* @throws TimeoutException
*/
public void testMessageBuilder(String topic, String key, String message) throws InterruptedException, ExecutionException, TimeoutException {

// 组装消息
Message msg = MessageBuilder.withPayload(message)
.setHeader(KafkaHeaders.MESSAGE_KEY, key)
.setHeader(KafkaHeaders.TOPIC, topic)
.setHeader(KafkaHeaders.PREFIX,"kafka_")
.build();
//同步发送
kafkaTemplate.send(msg).get();
// 组装消息
// ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, message);
// kafkaTemplate.send(producerRecord).get(10, TimeUnit.SECONDS);
}

/**
* 以事务方式发送消息
* @param topic
* @param key
* @param message
*/
public void sendMessageInTransaction(String topic, String key, String message) {

kafkaTemplateWithTransaction.executeInTransaction(new KafkaOperations.OperationsCallback<String, String, Object>() {

@Override
public Object doInOperations(KafkaOperations<String, String> kafkaOperations) {

kafkaOperations.send(topic, key, message);
//出现异常将会中断事务,消息不会发送出去
throw new RuntimeException("12");
}
});
}

}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package com.jhj.kafka.service;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

import java.util.List;

/**
* kafka 消费服务
*
* @author Leo
* @create 2020/12/31 16:06
**/
@Slf4j
@Service
public class KafkaConsumerService {


/**
* 消费单条消息,topics 可以监听多个topic,如:topics = {"topic1", "topic2"}
* @param message 消息
*/
@KafkaListener(id = "consumerSingle", topics = "kafka-test-topic")
public void consumerSingle(String message,Acknowledgment ack) {

log.info("consumerSingle ====> message: {}", message);
ack.acknowledge();
}


/* @KafkaListener(id = "consumerBatch", topicPartitions = {
@TopicPartition(topic = "hello-batch1", partitions = "0"),
@TopicPartition(topic = "hello-batch2", partitionOffsets = @PartitionOffset(partition = "2", initialOffset = "4"))
})*/
/**
* 批量消费消息
* @param messages
*/
@KafkaListener(id = "consumerBatch", topics = "test-batch")
public void consumerBatch(List<ConsumerRecord<String, String>> messages) {

log.info("consumerBatch =====> messageSize: {}", messages.size());
log.info(messages.toString());
}

/**
* 指定消费异常处理器
* @param message
*/
@KafkaListener(id = "consumerException", topics = "kafka-test-topic", errorHandler = "consumerAwareListenerErrorHandler")
public void consumerException(String message) {

throw new RuntimeException("consumer exception");
}

/**
* 验证ConsumerInterceptor
* @param message
*/
@KafkaListener(id = "interceptor", topics = "consumer-interceptor")
public void consumerInterceptor(String message) {

log.info("consumerInterceptor ====> message: {}", message);
}



//kafka的监听器,topic为"zhTest",消费者组为"zhTestGroup"
@KafkaListener(topics = "test", groupId = "zhTestGroup")
public void listenZhugeGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {

String value = record.value();
System.out.println(value);
System.out.println(record);
//手动提交offset
ack.acknowledge();
}
}

测试项目

https://gitee.com/jhj159/kafka.githttps://gitee.com/jhj159/kafka.git

作者声明

1
如有问题,欢迎指正!