Step By Step
1. Create a public-network instance in the Kafka console
2. Create a Spring Boot project that integrates Alibaba Cloud Kafka
3. Send and receive test
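1.1 Create an instance in the Kafka console
1.2 Obtain the authentication parameters
2. Create a Spring Boot project that integrates Alibaba Cloud Kafka
2.1 Create a Spring Boot (2.5.2) project
2.2 Dependencies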
<properties>
    <java.version>1.8</java.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
2.3 Sender.class
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class Sender {

    @Autowired
    private KafkaTemplate<String, String> template;

    public void send(String msg) {
        // sendDefault(key, value) publishes to spring.kafka.template.default-topic;
        // "my_msg" is the record key, not the topic name.
        this.template.sendDefault("my_msg", msg);
        System.out.println("send message:" + msg);
    }
}
2.4 Receiver.class
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class Receiver {

    // Set this to the topic to listen on; it must be the same topic the
    // Sender publishes to (the template's default-topic) for messages to arrive.
    @KafkaListener(topics = { "taro_topic" })
    public void receiveMessage(ConsumerRecord<String, String> record) {
        System.out.println("Receive Message");
        System.out.println("【*** Message: ***】key = " + record.key() + "、value = " + record.value());
    }
}
2.5 KafkaController.class
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaController {

    @Autowired
    private Sender sender;

    // Test endpoint for sending a message; note that it only accepts POST.
    @PostMapping("/send/{msg}")
    public String send(@PathVariable("msg") String msg) {
        sender.send(msg);
        return msg;
    }
}
2.6 application.yml
spring:
  kafka:
    template:
      default-topic: topic
    bootstrap-servers: <SSL endpoint>
    jaas:
      enabled: true
      loginModule: org.apache.kafka.common.security.plain.PlainLoginModule
      options:
        username: <username>
        password: <password>
    consumer:
      ssl:
        truststoreLocation: file:/kafka.client.truststore.jks
      properties:
        sasl.mechanism: PLAIN
        security.protocol: SASL_SSL
        # left empty to disable server hostname verification
        ssl.endpoint.identification.algorithm:
      group-id: group
      max-poll-records: 2
    producer:
      ssl:
        truststoreLocation: file:/kafka.client.truststore.jks
      retries: 3
      acks: 1
      compression-type: lz4
      buffer-memory: 33554432
      batch-size: 51200
      properties:
        send.buffer.bytes: 262144
        sasl.mechanism: PLAIN
        security.protocol: SASL_SSL
        # left empty to disable server hostname verification
        ssl.endpoint.identification.algorithm:

Download kafka.client.truststore.jks from its download address and place the certificate directly in the root of the C: drive, which is where file:/kafka.client.truststore.jks points.
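For readers who want to see what these settings amount to on the plain Kafka client, the sketch below builds the equivalent client properties using only java.util.Properties. The property keys are standard Kafka client configuration names; the class name KafkaSaslSslProps, its method names, and all argument values are hypothetical and only for illustration. Unlike the Spring file:/ resource above, ssl.truststore.location on the raw client takes a plain filesystem path.

```java
import java.util.Properties;

// Sketch only: mirrors the security-related application.yml settings as raw
// Kafka client properties. Class and method names are hypothetical.
class KafkaSaslSslProps {

    // Builds the JAAS line that PlainLoginModule expects for SASL/PLAIN.
    static String jaasConfig(String username, String password) {
        return "org.apache.kafka.common.security.plain.PlainLoginModule required"
                + " username=\"" + username + "\""
                + " password=\"" + password + "\";";
    }

    // sslEndpoint, username and password come from the instance details in the console.
    static Properties build(String sslEndpoint, String username,
                            String password, String truststorePath) {
        Properties props = new Properties();
        props.put("bootstrap.servers", sslEndpoint);
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", jaasConfig(username, password));
        props.put("ssl.truststore.location", truststorePath);
        // An empty value disables server hostname verification, as in the YAML.
        props.put("ssl.endpoint.identification.algorithm", "");
        return props;
    }
}
```

The resulting Properties object could be passed to a KafkaProducer or KafkaConsumer constructor together with the serializer settings, which Spring Boot otherwise fills in by default.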
2.7 Project structure
3. Send and receive test
3.1 Start the project and send a POST request with Postman
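If Postman is not at hand, the same request can be sent from plain Java. The following is a minimal sketch that assumes the application runs locally on Spring Boot's default port 8080; the class name SendRequestSketch and its methods are hypothetical. It percent-encodes the message so that it is safe as the {msg} path segment and then issues an empty-bodied POST to /send/{msg}.

```java
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;

// Sketch only: a Java 8 compatible stand-in for the Postman request.
class SendRequestSketch {

    // Builds <baseUrl>/send/<msg> with the message percent-encoded.
    static String buildUrl(String baseUrl, String msg) {
        try {
            // URLEncoder produces form encoding, so map '+' back to %20 for a path segment.
            String encoded = URLEncoder.encode(msg, "UTF-8").replace("+", "%20");
            return baseUrl + "/send/" + encoded;
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }

    // Sends the POST with an empty body and returns the HTTP status code.
    static int post(String baseUrl, String msg) throws IOException {
        HttpURLConnection conn =
                (HttpURLConnection) new URL(buildUrl(baseUrl, msg)).openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.setFixedLengthStreamingMode(0); // explicit Content-Length: 0
        try {
            conn.getOutputStream().close();
            return conn.getResponseCode();
        } finally {
            conn.disconnect();
        }
    }
}
```

With the application running, calling SendRequestSketch.post("http://localhost:8080", "hello") should trigger the Sender, and the Receiver output should then appear in the project log.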
3.2 Project logs
3.3 View message monitoring in the console
More references
This article is reposted from the web. Original link: https://developer.aliyun.com/article/784990