SpringBoot集成Kafka

1.概念

image-20240812121753344

2.新建项目

  • 新建空项目

image-20240813162111387

  • 然后新建模块Spingboot

image-20240813162212183

  • 添加依赖

image-20240813162252711

  • 如果发现不能构建maven,可手动添加

image-20240813162502476

3.编辑配置文件

  • application.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
spring:
application:
# 应用名称
name: spring-boot-01-kafka-base

# kafka连接地址 ip+port
kafka:
bootstrap-servers: 192.168.100.129:9092

# 配置生产者(共24个)
# producer:

# 配置消费者(共24个)
# consumer:

4.发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.gaomu.producer;

import jakarta.annotation.Resource;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class EventProducer {
//加入了spring-kafka的依赖,+ yml配置文件已经自动配置好了kafka,自动装配好了kafkaTemplate这个Bean
@Resource
private KafkaTemplate kafkaTemplate;

public void sendEvent() {
kafkaTemplate.send("hello-topic", "hello kafka");
}
}

  • 测试类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.gaomu;

import com.gaomu.producer.EventProducer;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class SpringBoot01KafkaBaseApplicationTests {

@Resource
private EventProducer eventProducer;

@Test
void test01() {
eventProducer.sendEvent();
}

}

image-20240813165817903

5.消费消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package com.gaomu.consumer;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class EventConsumer {
//采用监听的方式来接收时间(消息、数据)
@KafkaListener(topics = {"hello-topic"}, groupId = "hello-group")
public void onEvent(String event){
System.out.println("读到的事件 " + event);
}
}

  • 如果需要读取最早的消息需要则需要配置
1
2
consumer:
auto-offset-reset: earliest
  • 且如果该消费组之前已经消费过,则该配置无法生效,该配置,因此需要手动重置偏移量。或者使用新得消费组来消费,这时配置得earliest则可以生效。