SpringBoot 集成Kafka分析

无情 阅读:292 2021-03-31 12:46:31 评论:0

Kafka项目整体结构图:

父项目:pom.xml 依赖

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<!-- Aggregator/parent POM for the Kafka demo: declares the shared
	     dependencies and lists the producer and consumer modules. -->
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.zzg</groupId>
	<artifactId>kafka_learn</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>pom</packaging>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.1.2.RELEASE</version>
	</parent>

	<!-- Dependency versions are defined centrally here -->
	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<java.version>1.8</java.version>
		<commons-lang.version>2.6</commons-lang.version>
		<commons-codec.version>1.10</commons-codec.version>
		<commons-lang3.version>3.9</commons-lang3.version>
		<commons-net.version>3.6</commons-net.version>
		<commons-io.version>2.6</commons-io.version>
		<!-- 3.2.2 is the drop-in release that disables the unsafe
		     deserialization of functor classes present in 3.2.1 -->
		<commons-collections.version>3.2.2</commons-collections.version>
		<commons-text.version>1.8</commons-text.version>
		<!-- NOTE(review): 1.3.1 is an old release; check for security
		     advisories and upgrade if file upload is actually used -->
		<common-fileupload.version>1.3.1</common-fileupload.version>
	</properties>


	<dependencies>
		<!-- commons-lang: general-purpose utilities -->
		<dependency>
			<groupId>commons-lang</groupId>
			<artifactId>commons-lang</artifactId>
			<version>${commons-lang.version}</version>
		</dependency>
		<!-- commons-lang3: general-purpose utilities -->
		<dependency>
			<groupId>org.apache.commons</groupId>
			<artifactId>commons-lang3</artifactId>
			<version>${commons-lang3.version}</version>
		</dependency>

		<!-- commons-codec: encoding/digest utilities -->
		<dependency>
			<groupId>commons-codec</groupId>
			<artifactId>commons-codec</artifactId>
			<version>${commons-codec.version}</version>
		</dependency>
		<!-- commons-net: network utilities -->
		<dependency>
			<groupId>commons-net</groupId>
			<artifactId>commons-net</artifactId>
			<version>${commons-net.version}</version>
		</dependency>
		<!-- commons-io: I/O utilities -->
		<dependency>
			<groupId>commons-io</groupId>
			<artifactId>commons-io</artifactId>
			<version>${commons-io.version}</version>
		</dependency>
		<!-- commons-collections: collection utilities -->
		<dependency>
			<groupId>commons-collections</groupId>
			<artifactId>commons-collections</artifactId>
			<version>${commons-collections.version}</version>
		</dependency>
		<!-- commons-fileupload: file upload utilities -->
		<dependency>
			<groupId>commons-fileupload</groupId>
			<artifactId>commons-fileupload</artifactId>
			<version>${common-fileupload.version}</version>
		</dependency>
		<!-- commons-text: text utilities -->
		<dependency>
			<groupId>org.apache.commons</groupId>
			<artifactId>commons-text</artifactId>
			<version>${commons-text.version}</version>
		</dependency>
		<!-- lombok (no explicit version declared here) -->
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
		</dependency>
	</dependencies>


	<modules>
		<module>kafka-cluster-producer</module>
		<module>kafka-cluster-consumer</module>
	</modules>
</project>

kafka 生产者 pom.xml 依赖

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
	<!-- POM of the Kafka producer module; inherits from com.zzg:kafka_learn --> 
	<modelVersion>4.0.0</modelVersion> 
	<parent> 
		<groupId>com.zzg</groupId> 
		<artifactId>kafka_learn</artifactId> 
		<version>0.0.1-SNAPSHOT</version> 
	</parent> 
	<artifactId>kafka-cluster-producer</artifactId> 
 
	<dependencies> 
		<!-- Spring Boot core starter --> 
		<dependency> 
			<groupId>org.springframework.boot</groupId> 
			<artifactId>spring-boot-starter</artifactId> 
		</dependency> 
		<!-- Spring Boot test starter (test scope only) --> 
		<dependency> 
			<groupId>org.springframework.boot</groupId> 
			<artifactId>spring-boot-starter-test</artifactId> 
			<scope>test</scope> 
		</dependency> 
		<!-- Spring Boot web starter --> 
		<dependency> 
			<groupId>org.springframework.boot</groupId> 
			<artifactId>spring-boot-starter-web</artifactId> 
		</dependency> 
		<!-- Spring Boot validation starter --> 
		<dependency> 
			<groupId>org.springframework.boot</groupId> 
			<artifactId>spring-boot-starter-validation</artifactId> 
		</dependency> 
		<!-- Kafka integration; the version is pinned explicitly here.
		     NOTE(review): presumably overrides the version managed by the
		     Spring Boot parent - confirm the two are compatible --> 
		<dependency> 
			<groupId>org.springframework.kafka</groupId> 
			<artifactId>spring-kafka</artifactId> 
			<version>2.2.9.RELEASE</version> 
		</dependency> 
	</dependencies> 
</project>

kafka 生产者之程序入口:

package com.zzg; 
 
import org.springframework.boot.SpringApplication; 
import org.springframework.boot.autoconfigure.SpringBootApplication; 
import org.springframework.context.ConfigurableApplicationContext; 
 
import com.fasterxml.jackson.core.JsonProcessingException; 
import com.zzg.component.KafkaProducer; 
 
/**
 * Entry point of the Kafka producer demo.
 *
 * <p>Starts the Spring application context and then publishes a small,
 * fixed number of sample messages through the {@link KafkaProducer} bean,
 * pausing after each send.
 */
@SpringBootApplication
public class ProducerApplication {

	/** Number of sample messages published after start-up. */
	private static final int MESSAGE_COUNT = 3;

	/** Pause after each send, in milliseconds. */
	private static final long SEND_INTERVAL_MILLIS = 3_000L;

	/**
	 * Boots the application and sends the sample messages.
	 *
	 * @param args command-line arguments, forwarded to Spring Boot
	 */
	public static void main(String[] args) {
		ConfigurableApplicationContext context = SpringApplication.run(ProducerApplication.class, args);

		// Simulate message sending
		KafkaProducer sender = context.getBean(KafkaProducer.class);
		for (int i = 0; i < MESSAGE_COUNT; i++) {
			try {
				sender.send();
			} catch (JsonProcessingException e) {
				// A serialization failure of one message must not stop the demo loop.
				e.printStackTrace();
			}

			try {
				Thread.sleep(SEND_INTERVAL_MILLIS);
			} catch (InterruptedException e) {
				// Preserve the interrupt status for whoever interrupted us and
				// stop sending instead of silently carrying on.
				Thread.currentThread().interrupt();
				break;
			}
		}
	}

}

kafka 生产者之核心代码:

package com.zzg.component; 
 
import java.util.Date; 
import java.util.UUID; 
 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.kafka.core.KafkaTemplate; 
import org.springframework.stereotype.Component; 
 
import com.fasterxml.jackson.core.JsonProcessingException; 
import com.fasterxml.jackson.databind.ObjectMapper; 
import com.zzg.message.Message; 
 
/**
 * Spring component that publishes JSON-encoded {@link Message} objects to Kafka.
 */
@Component
public class KafkaProducer {

	/** Topic every message is published to. */
	private static final String TOPIC = "rourou";

	/**
	 * Shared JSON serializer. It is created once and reused, because building
	 * a new {@code ObjectMapper} for every message is needlessly expensive.
	 */
	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

	@Autowired
	private KafkaTemplate<String, String> kafkaTemplate;

	/**
	 * Builds a new message (id = current time in milliseconds, body = random
	 * UUID, send time = now), serializes it to JSON and sends it to the topic.
	 *
	 * @throws JsonProcessingException if the message cannot be serialized to JSON
	 */
	public void send() throws JsonProcessingException {
		Message message = new Message();
		message.setId(System.currentTimeMillis());
		message.setMsg(UUID.randomUUID().toString());
		message.setSendTime(new Date());

		String content = OBJECT_MAPPER.writeValueAsString(message);
		kafkaTemplate.send(TOPIC, content);
	}

}

kafka 生产者之消息实体对象:

package com.zzg.message; 
 
import java.io.Serializable; 
import java.util.Date; 
 
import lombok.Data; 
 
/**
 * Message entity with an identifier, a text body and a send time.
 *
 * <p>No accessors are written by hand; the class is annotated with Lombok's
 * {@code @Data}. The "serial" warning is suppressed because no
 * {@code serialVersionUID} is declared.
 */
@SuppressWarnings("serial") 
@Data 
public class Message implements Serializable { 
	 private Long id;    // message identifier
     private String msg; // message body
     private Date sendTime;  // time the message was sent
} 

kafka 生产者配置文件(application.properties)

# Port of the embedded web server
server.port=8096
# Kafka configuration
# Brokers are listed as plain host:port pairs; the Kafka protocol is not HTTP,
# so no "http://" scheme belongs in front of the addresses.
spring.kafka.bootstrap-servers=192.168.1.73:9097,192.168.1.73:9098,192.168.1.73:9099
# Number of retries for a failed send (0 = do not retry)
spring.kafka.producer.retries=0
# Batch size in bytes
spring.kafka.producer.batch-size=16384
# Producer buffer memory in bytes
spring.kafka.producer.buffer-memory=33554432
# Keys and values are sent as plain strings
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

kafka 消费者 pom.xml 依赖

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
	<!-- POM of the Kafka consumer module; inherits from com.zzg:kafka_learn --> 
	<modelVersion>4.0.0</modelVersion> 
	<parent> 
		<groupId>com.zzg</groupId> 
		<artifactId>kafka_learn</artifactId> 
		<version>0.0.1-SNAPSHOT</version> 
	</parent> 
	<artifactId>kafka-cluster-consumer</artifactId> 
 
	<dependencies> 
		<!-- Spring Boot core starter --> 
		<dependency> 
			<groupId>org.springframework.boot</groupId> 
			<artifactId>spring-boot-starter</artifactId> 
		</dependency> 
		<!-- Spring Boot test starter (test scope only) --> 
		<dependency> 
			<groupId>org.springframework.boot</groupId> 
			<artifactId>spring-boot-starter-test</artifactId> 
			<scope>test</scope> 
		</dependency> 
		<!-- Spring Boot web starter --> 
		<dependency> 
			<groupId>org.springframework.boot</groupId> 
			<artifactId>spring-boot-starter-web</artifactId> 
		</dependency> 
		<!-- Spring Boot validation starter --> 
		<dependency> 
			<groupId>org.springframework.boot</groupId> 
			<artifactId>spring-boot-starter-validation</artifactId> 
		</dependency> 
		<!-- Kafka integration; the version is pinned explicitly here.
		     NOTE(review): presumably overrides the version managed by the
		     Spring Boot parent - confirm the two are compatible --> 
		<dependency> 
			<groupId>org.springframework.kafka</groupId> 
			<artifactId>spring-kafka</artifactId> 
			<version>2.2.9.RELEASE</version> 
		</dependency> 
	</dependencies> 
</project>

kafka 消费者之程序入口

package com.zzg; 
 
import org.springframework.boot.SpringApplication; 
import org.springframework.boot.autoconfigure.SpringBootApplication; 
 
/**
 * Entry point of the Kafka consumer demo: starts the Spring application
 * context and prints a banner once start-up has returned.
 */
@SpringBootApplication
public class ConsumerApplication {

	/** Line printed to standard output after the application has started. */
	private static final String STARTED_BANNER =
			"============= SpringBoot kafka Project Start Success =============";

	/**
	 * Boots the application.
	 *
	 * @param args command-line arguments, forwarded to Spring Boot
	 */
	public static void main(String[] args) {
		SpringApplication.run(ConsumerApplication.class, args);
		System.out.println(STARTED_BANNER);
	}

}

kafka 消费者之核心代码:

package com.zzg.component; 
 
import java.util.Optional; 
 
import org.apache.kafka.clients.consumer.ConsumerRecord; 
import org.springframework.kafka.annotation.KafkaListener; 
import org.springframework.stereotype.Component; 
 
/**
 * Kafka listener component that prints every record received on the
 * {@code rourou} topic to standard output.
 */
@Component
public class KafkaConsumer {

	/**
	 * Handles one record from the {@code rourou} topic. Records whose value is
	 * {@code null} are ignored; for all others the record and its value are
	 * printed.
	 *
	 * @param record the record delivered by the listener container
	 */
	@KafkaListener(topics = {"rourou"})
	public void listen(ConsumerRecord<?, ?> record) {
		// A plain null check replaces the Optional wrapper, which was only
		// used as an isPresent()/get() pair on a local value.
		Object message = record.value();
		if (message == null) {
			return;
		}

		System.out.println("11111111111111record =" + record);
		System.out.println("22222222222222222message =" + message);
	}

}

kafka 消费者配置文件(application.properties)

# Port of the embedded web server
server.port=8097
# Kafka configuration
# Brokers are listed as plain host:port pairs; the Kafka protocol is not HTTP,
# so no "http://" scheme belongs in front of the addresses.
spring.kafka.bootstrap-servers=192.168.1.73:9097,192.168.1.73:9098,192.168.1.73:9099
# Consumer group this application joins
spring.kafka.consumer.group-id=myGroup
# A consumer is configured with *deserializers*; the "key-serializer" /
# "value-serializer" keys do not exist for spring.kafka.consumer.
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

Spring Boot2.x + Kafka 集成效果展示:

声明

1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。

发表评论
搜索
KIKK导航

KIKK导航

排行榜
关注我们

一个IT知识分享的公众号