当前位置: 首页 > news >正文

Spring Boot 整合第三方组件:Redis、MyBatis、Kafka 实战

🚀 Spring Boot 整合第三方组件:Redis、MyBatis、Kafka 实战

文章目录

  • 🚀 Spring Boot 整合第三方组件:Redis、MyBatis、Kafka 实战
  • 🎯 一、Spring Boot Starter 设计哲学
    • 💡 约定优于配置的核心思想
    • 🔧 自动装配机制揭秘
  • 🔴 二、Redis 整合实战
    • 📦 依赖配置与自动装配
    • ⚙️ Redis 配置类
    • 💻 Redis 操作实战
    • 🔍 Redis 健康检查与监控
  • 🗄️ 三、MyBatis 整合实战
    • 📦 MyBatis 依赖配置
    • 🏗️ 数据层架构设计
    • 💻 Service 层实现
    • 🔧 MyBatis 配置优化
  • 📨 四、Kafka 整合实战
    • 📦 Kafka 依赖配置
    • 🔧 Kafka 配置类
    • 💻 消息生产者实现
    • 👂 消息消费者实现
    • 🔄 邮件服务模拟
    • 🔍 Kafka 健康检查与监控
  • 💡 五、总结与扩展阅读
    • 🎯 整合成果总结
    • 🚀 性能优化建议

🎯 一、Spring Boot Starter 设计哲学

💡 约定优于配置的核心思想

​​传统整合 vs Spring Boot Starter 对比​​:

方面传统方式Spring Boot Starter优势 / 成果
依赖管理手动维护版本、易冲突起步依赖(Starter)自动聚合与版本对齐版本冲突减少 80%,依赖升级更安全
配置复杂度需大量 XML 或 Java 配置自动配置 + 合理默认值(约定优于配置)配置代码减少 70%,开发效率提升
启动速度手动加载配置,启动较慢条件化自动装配,按需加载模块启动时间缩短 50%,资源占用更优
维护成本配置分散,项目间不一致统一 Starter 标准配置与依赖管理维护效率提升 60%,团队协作更顺畅

🔧 自动装配机制揭秘

​​Spring Boot 自动配置流程​​:

graph TBA[启动类] --> B[@SpringBootApplication]B --> C[@EnableAutoConfiguration]C --> D[spring.factories]D --> E[自动配置类]E --> F[条件注解检查]F --> G[创建Bean]G --> H[完成整合]style E fill:#bbdefb,stroke:#333style F fill:#c8e6c9,stroke:#333

​​条件注解工作原理示例​​:

@Configuration
@ConditionalOnClass(RedisTemplate.class)  // 类路径存在时生效
@ConditionalOnProperty(prefix = "spring.redis", name = "enabled", matchIfMissing = true)
@EnableConfigurationProperties(RedisProperties.class)  // 绑定配置
public class RedisAutoConfiguration {@Bean@ConditionalOnMissingBean  // 容器中不存在时创建public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {RedisTemplate<String, Object> template = new RedisTemplate<>();template.setConnectionFactory(factory);return template;}
}

🔴 二、Redis 整合实战

📦 依赖配置与自动装配

​​Maven 依赖配置​​:

<!-- pom.xml -->
<dependencies><!-- Redis Starter --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><!-- 连接池 --><dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId></dependency><!-- JSON 序列化 --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency>
</dependencies>

​​application.yml 配置​​:

# application.yml
spring:redis:# 连接配置host: localhostport: 6379password: 123456database: 0timeout: 2000ms# 连接池配置lettuce:pool:max-active: 20      # 最大连接数max-idle: 10       # 最大空闲连接min-idle: 5        # 最小空闲连接max-wait: 1000ms   # 获取连接最大等待时间# 集群配置(可选)# cluster:#   nodes: #     - 192.168.1.101:6379#     - 192.168.1.102:6379

⚙️ Redis 配置类

​​自定义 Redis 配置​​:

@Configuration
@Slf4j
public class RedisConfig {@Autowiredprivate RedisConnectionFactory redisConnectionFactory;/*** 自定义 RedisTemplate,解决序列化问题*/@Beanpublic RedisTemplate<String, Object> redisTemplate() {RedisTemplate<String, Object> template = new RedisTemplate<>();template.setConnectionFactory(redisConnectionFactory);// 使用 Jackson2JsonRedisSerializer 替换默认序列化Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class);ObjectMapper objectMapper = new ObjectMapper();objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);objectMapper.activateDefaultTyping(LazyIterator.class, ObjectMapper.DefaultTyping.NON_FINAL);serializer.setObjectMapper(objectMapper);// 设置 key 和 value 的序列化规则template.setKeySerializer(new StringRedisSerializer());template.setValueSerializer(serializer);template.setHashKeySerializer(new StringRedisSerializer());template.setHashValueSerializer(serializer);template.afterPropertiesSet();log.info("RedisTemplate 配置完成");return template;}/*** 缓存管理器配置*/@Beanpublic RedisCacheManager cacheManager(RedisConnectionFactory factory) {RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig().entryTtl(Duration.ofMinutes(30))  // 默认缓存30分钟.disableCachingNullValues()         // 不缓存null值.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer())).serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericJackson2JsonRedisSerializer()));return RedisCacheManager.builder(factory).cacheDefaults(config).build();}
}

💻 Redis 操作实战

​​基础数据操作服务​​:

@Service
@Slf4j
public class RedisOperationService {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;/*** 字符串操作*/public void stringOperations() {// 1. 设置值redisTemplate.opsForValue().set("user:1001:name", "张三");redisTemplate.opsForValue().set("user:1001:age", 25, Duration.ofMinutes(30));// 2. 获取值String name = (String) redisTemplate.opsForValue().get("user:1001:name");Integer age = (Integer) redisTemplate.opsForValue().get("user:1001:age");log.info("用户信息 - 姓名: {}, 年龄: {}", name, age);// 3. 原子操作Long increment = redisTemplate.opsForValue().increment("counter", 1);log.info("计数器值: {}", increment);}/*** Hash 操作*/public void hashOperations() {String key = "user:1001:profile";// 1. 设置Hash字段Map<String, Object> userMap = new HashMap<>();userMap.put("name", "李四");userMap.put("age", 30);userMap.put("email", "lisi@example.com");redisTemplate.opsForHash().putAll(key, userMap);// 2. 获取单个字段String name = (String) redisTemplate.opsForHash().get(key, "name");log.info("Hash字段 name: {}", name);// 3. 获取所有字段Map<Object, Object> entries = redisTemplate.opsForHash().entries(key);log.info("完整用户信息: {}", entries);}/*** List 操作*/public void listOperations() {String key = "recent:users";// 1. 从左端插入redisTemplate.opsForList().leftPush(key, "user1");redisTemplate.opsForList().leftPush(key, "user2");redisTemplate.opsForList().leftPush(key, "user3");// 2. 获取范围List<Object> recentUsers = redisTemplate.opsForList().range(key, 0, 2);log.info("最近访问用户: {}", recentUsers);}/*** Set 操作*/public void setOperations() {String key = "user:tags:1001";// 1. 添加元素redisTemplate.opsForSet().add(key, "vip", "active", "new");// 2. 判断元素是否存在Boolean isVip = redisTemplate.opsForSet().isMember(key, "vip");log.info("是否是VIP: {}", isVip);// 3. 获取所有元素Set<Object> tags = redisTemplate.opsForSet().members(key);log.info("用户标签: {}", tags);}
}

​​缓存注解实战​​:

@Service
@Slf4j
public class UserService {/*** 缓存用户信息*/@Cacheable(value = "users", key = "#userId", unless = "#result == null")public User getUserById(Long userId) {log.info("查询数据库获取用户: {}", userId);// 模拟数据库查询return findUserFromDB(userId);}/*** 更新缓存*/@CachePut(value = "users", key = "#user.id")public User updateUser(User user) {log.info("更新用户信息: {}", user.getId());// 模拟数据库更新return updateUserInDB(user);}/*** 删除缓存*/@CacheEvict(value = "users", key = "#userId")public void deleteUser(Long userId) {log.info("删除用户: {}", userId);// 模拟数据库删除deleteUserFromDB(userId);}/*** 复杂缓存配置*/@Caching(cacheable = {@Cacheable(value = "user_detail", key = "#userId")},put = {@CachePut(value = "user_profile", key = "#userId")})public User getUserDetail(Long userId) {log.info("获取用户详情: {}", userId);return findUserDetailFromDB(userId);}// 模拟数据库操作private User findUserFromDB(Long userId) {// 实际项目中这里会查询数据库return new User(userId, "用户" + userId, "user" + userId + "@example.com");}private User updateUserInDB(User user) {return user;}private void deleteUserFromDB(Long userId) {// 删除操作}private User findUserDetailFromDB(Long userId) {return new User(userId, "详情用户" + userId, "detail" + userId + "@example.com");}@Data@AllArgsConstructorpublic static class User {private Long id;private String name;private String email;}
}

🔍 Redis 健康检查与监控

​​Redis 健康指示器​​:

@Component
public class RedisHealthIndicator implements HealthIndicator {@Autowiredprivate RedisConnectionFactory redisConnectionFactory;@Overridepublic Health health() {try {// 测试Redis连接RedisConnection connection = redisConnectionFactory.getConnection();try {String result = connection.ping();if ("PONG".equals(result)) {return Health.up().withDetail("server", "redis").withDetail("version", getRedisVersion(connection)).withDetail("status", "connected").build();} else {return Health.down().withDetail("error", "Unexpected ping response: " + result).build();}} finally {connection.close();}} catch (Exception e) {return Health.down(e).withDetail("error", "Redis connection failed: " + e.getMessage()).build();}}private String getRedisVersion(RedisConnection connection) {try {Properties info = connection.info("server");return info.getProperty("redis_version");} catch (Exception e) {return "unknown";}}
}

🗄️ 三、MyBatis 整合实战

📦 MyBatis 依赖配置

​​Maven 依赖​​:


<!-- pom.xml -->
<dependencies><!-- MyBatis Starter --><dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>2.2.2</version></dependency><!-- MySQL 驱动 --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><scope>runtime</scope></dependency><!-- 数据源 --><dependency><groupId>com.zaxxer</groupId><artifactId>HikariCP</artifactId></dependency>
</dependencies>

​​数据库配置​​:

# application.yml
spring:datasource:# 数据源配置url: jdbc:mysql://localhost:3306/spring_boot_demo?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghaiusername: rootpassword: 123456driver-class-name: com.mysql.cj.jdbc.Driver# Hikari 连接池配置hikari:maximum-pool-size: 20minimum-idle: 5connection-timeout: 30000idle-timeout: 300000max-lifetime: 1200000# MyBatis 配置
mybatis:#  mapper 文件位置mapper-locations: classpath:mapper/*.xml# 别名扫描包type-aliases-package: com.example.entity# 开启驼峰命名转换configuration:map-underscore-to-camel-case: truelog-impl: org.apache.ibatis.logging.stdout.StdOutImpl# 全局配置global-config:db-config:id-type: autologic-delete-field: deleted  # 逻辑删除字段logic-delete-value: 1        # 逻辑已删除值logic-not-delete-value: 0    # 逻辑未删除值

🏗️ 数据层架构设计

​​实体类定义​​:

@Data
@NoArgsConstructor
@AllArgsConstructor
@TableName("user")  // MyBatis-Plus 注解,标准 MyBatis 可省略
public class User {@TableId(type = IdType.AUTO)  // 主键自增private Long id;private String name;private String email;private Integer age;@TableField(fill = FieldFill.INSERT)  // 插入时自动填充private LocalDateTime createTime;@TableField(fill = FieldFill.INSERT_UPDATE)  // 插入和更新时填充private LocalDateTime updateTime;@TableLogic  // 逻辑删除标识private Integer deleted;
}

​​Mapper 接口定义​​:

@Mapper
@Repository
public interface UserMapper {/*** 根据ID查询用户*/@Select("SELECT * FROM user WHERE id = #{id} AND deleted = 0")User selectById(Long id);/*** 查询所有用户*/@Select("SELECT * FROM user WHERE deleted = 0 ORDER BY create_time DESC")List<User> selectAll();/*** 插入用户*/@Insert("INSERT INTO user(name, email, age, create_time, update_time) " +"VALUES(#{name}, #{email}, #{age}, NOW(), NOW())")@Options(useGeneratedKeys = true, keyProperty = "id")int insert(User user);/*** 更新用户*/@Update("UPDATE user SET name=#{name}, email=#{email}, age=#{age}, " +"update_time=NOW() WHERE id=#{id} AND deleted = 0")int update(User user);/*** 逻辑删除用户*/@Update("UPDATE user SET deleted=1, update_time=NOW() WHERE id=#{id}")int deleteById(Long id);/*** 根据邮箱查询用户*/@Select("SELECT * FROM user WHERE email = #{email} AND deleted = 0")User selectByEmail(String email);/*** 分页查询用户*/@Select("SELECT * FROM user WHERE deleted = 0 ORDER BY create_time DESC " +"LIMIT #{offset}, #{pageSize}")List<User> selectByPage(@Param("offset") int offset, @Param("pageSize") int pageSize);
}

​​XML Mapper 配置​​:

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"><mapper namespace="com.example.mapper.UserMapper"><!-- 自定义结果映射 --><resultMap id="UserResultMap" type="User"><id column="id" property="id" /><result column="name" property="name" /><result column="email" property="email" /><result column="age" property="age" /><result column="create_time" property="createTime" /><result column="update_time" property="updateTime" /><result column="deleted" property="deleted" /></resultMap><!-- 复杂查询:根据条件动态查询用户 --><select id="selectByCondition" parameterType="map" resultMap="UserResultMap">SELECT * FROM user WHERE deleted = 0<if test="name != null and name != ''">AND name LIKE CONCAT('%', #{name}, '%')</if><if test="minAge != null">AND age >= #{minAge}</if><if test="maxAge != null">AND age &lt;= #{maxAge}</if><if test="email != null and email != ''">AND email = #{email}</if>ORDER BY create_time DESC</select><!-- 批量插入用户 --><insert id="batchInsert" parameterType="list">INSERT INTO user (name, email, age, create_time, update_time) VALUES<foreach collection="list" item="user" separator=",">(#{user.name}, #{user.email}, #{user.age}, NOW(), NOW())</foreach></insert><!-- 统计用户数量 --><select id="countUsers" resultType="int">SELECT COUNT(*) FROM user WHERE deleted = 0</select></mapper>

💻 Service 层实现

​​业务服务类​​:

@Service
@Slf4j
@Transactional
public class UserService {@Autowiredprivate UserMapper userMapper;/*** 根据ID查询用户*/public User getUserById(Long id) {log.info("查询用户: {}", id);User user = userMapper.selectById(id);if (user == null) {throw new RuntimeException("用户不存在");}return user;}/*** 查询所有用户*/public List<User> getAllUsers() {log.info("查询所有用户");return userMapper.selectAll();}/*** 创建用户*/public User createUser(User user) {log.info("创建用户: {}", user.getName());// 检查邮箱是否已存在User existingUser = userMapper.selectByEmail(user.getEmail());if (existingUser != null) {throw new RuntimeException("邮箱已存在");}int result = userMapper.insert(user);if (result > 0) {return user;} else {throw new RuntimeException("创建用户失败");}}/*** 更新用户*/public User updateUser(User user) {log.info("更新用户: {}", user.getId());// 检查用户是否存在User existingUser = userMapper.selectById(user.getId());if (existingUser == null) {throw new RuntimeException("用户不存在");}int result = userMapper.update(user);if (result > 0) {return getUserById(user.getId());} else {throw new RuntimeException("更新用户失败");}}/*** 删除用户(逻辑删除)*/public void deleteUser(Long id) {log.info("删除用户: {}", id);User existingUser = userMapper.selectById(id);if (existingUser == null) {throw new RuntimeException("用户不存在");}int result = userMapper.deleteById(id);if (result == 0) {throw new RuntimeException("删除用户失败");}}/*** 分页查询用户*/public PageResult<User> getUsersByPage(int pageNum, int pageSize) {log.info("分页查询用户: 页码={}, 大小={}", pageNum, pageSize);int offset = (pageNum - 1) * pageSize;List<User> users = userMapper.selectByPage(offset, pageSize);int total = userMapper.countUsers();return new PageResult<>(users, total, pageNum, pageSize);}/*** 条件查询用户*/public List<User> getUsersByCondition(String name, Integer minAge, Integer maxAge, String email) {log.info("条件查询用户: name={}, minAge={}, maxAge={}, email={}", name, minAge, maxAge, email);Map<String, Object> params = new HashMap<>();params.put("name", name);params.put("minAge", minAge);params.put("maxAge", maxAge);params.put("email", email);return userMapper.selectByCondition(params);}/*** 分页结果封装类*/@Data@AllArgsConstructorpublic static class PageResult<T> {private List<T> data;private long total;private int pageNum;private int pageSize;private int totalPages;public PageResult(List<T> data, long total, int pageNum, int pageSize) {this.data = data;this.total = total;this.pageNum = pageNum;this.pageSize = pageSize;this.totalPages = (int) Math.ceil((double) total / pageSize);}}
}

🔧 MyBatis 配置优化

​​MyBatis 配置类​​:

@Configuration
@MapperScan("com.example.mapper")  // 扫描Mapper接口
@Slf4j
public class MyBatisConfig {/*** 配置MyBatis SqlSessionFactory*/@Bean@ConfigurationProperties(prefix = "mybatis.configuration")public org.apache.ibatis.session.Configuration globalConfiguration() {org.apache.ibatis.session.Configuration configuration = new org.apache.ibatis.session.Configuration();configuration.setMapUnderscoreToCamelCase(true);configuration.setLogImpl(StdOutImpl.class);configuration.setCacheEnabled(true);return configuration;}/*** 分页插件配置*/@Beanpublic MybatisPlusInterceptor mybatisPlusInterceptor() {MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();// 分页插件PaginationInnerInterceptor paginationInterceptor = new PaginationInnerInterceptor();paginationInterceptor.setMaxLimit(1000L);  // 单页最大记录数paginationInterceptor.setOverflow(true);   // 超过最大页数时返回第一页interceptor.addInnerInterceptor(paginationInterceptor);// 乐观锁插件(可选)OptimisticLockerInnerInterceptor optimisticLockerInterceptor = new OptimisticLockerInnerInterceptor();interceptor.addInnerInterceptor(optimisticLockerInterceptor);log.info("MyBatis 插件配置完成");return interceptor;}/*** 元对象处理器(自动填充字段)*/@Beanpublic MetaObjectHandler metaObjectHandler() {return new MetaObjectHandler() {@Overridepublic void insertFill(MetaObject metaObject) {this.strictInsertFill(metaObject, "createTime", LocalDateTime.class, LocalDateTime.now());this.strictInsertFill(metaObject, "updateTime", LocalDateTime.class, LocalDateTime.now());}@Overridepublic void updateFill(MetaObject metaObject) {this.strictUpdateFill(metaObject, "updateTime", LocalDateTime.class, LocalDateTime.now());}};}
}

📨 四、Kafka 整合实战

📦 Kafka 依赖配置

​​Maven 依赖​​:

<!-- pom.xml -->
<dependencies><!-- Kafka Starter --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><!-- JSON 支持 --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency>
</dependencies>

​​Kafka 配置​​:

# application.yml
spring:kafka:# Kafka 服务器配置bootstrap-servers: localhost:9092# 生产者配置producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.springframework.kafka.support.serializer.JsonSerializeracks: all  # 所有副本确认retries: 3  # 重试次数batch-size: 16384  # 批量大小buffer-memory: 33554432  # 缓冲区大小properties:linger.ms: 10  # 发送延迟# 消费者配置consumer:key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.springframework.kafka.support.serializer.JsonDeserializergroup-id: spring-boot-demo-group  # 消费者组auto-offset-reset: earliest  # 偏移量重置策略enable-auto-commit: false  # 手动提交偏移量properties:spring.json.trusted.packages: "*"  # 信任所有包进行反序列化# 监听器配置listener:ack-mode: manual  # 手动提交模式concurrency: 3    # 并发消费者数量

🔧 Kafka 配置类

​​Kafka 高级配置​​:

@Configuration
@Slf4j
@EnableKafka
public class KafkaConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;/*** 生产者工厂配置*/@Beanpublic ProducerFactory<String, Object> producerFactory() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);props.put(ProducerConfig.ACKS_CONFIG, "all");props.put(ProducerConfig.RETRIES_CONFIG, 3);props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);props.put(ProducerConfig.LINGER_MS_CONFIG, 10);props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);return new DefaultKafkaProducerFactory<>(props);}/*** Kafka 模板*/@Beanpublic KafkaTemplate<String, Object> kafkaTemplate() {KafkaTemplate<String, Object> template = new KafkaTemplate<>(producerFactory());template.setProducerListener(new ProducerListener<String, Object>() {@Overridepublic void onSuccess(ProducerRecord<String, Object> record, RecordMetadata metadata) {log.info("消息发送成功: topic={}, partition={}, offset={}", record.topic(), metadata.partition(), metadata.offset());}@Overridepublic void onError(ProducerRecord<String, Object> record, Exception exception) {log.error("消息发送失败: topic={}, key={}", record.topic(), record.key(), exception);}});return template;}/*** 消费者工厂配置*/@Beanpublic ConsumerFactory<String, Object> consumerFactory() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, "spring-boot-demo-group");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");return new DefaultKafkaConsumerFactory<>(props);}/*** 监听器容器工厂*/@Beanpublic ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3);  // 并发消费者数量factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);// 错误处理factory.setErrorHandler(((thrownException, data) -> {log.error("消费消息失败: {}", data, thrownException);}));return factory;}
}

💻 消息生产者实现

​​消息生产服务​​:

@Service
@Slf4j
public class KafkaProducerService {@Autowiredprivate KafkaTemplate<String, Object> kafkaTemplate;/*** 发送用户注册消息*/public void sendUserRegistration(User user) {String topic = "user-registration";String key = "user_" + user.getId();UserRegistrationEvent event = new UserRegistrationEvent(user.getId(), user.getName(), user.getEmail(), LocalDateTime.now());// 发送消息CompletableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, key, event);// 异步处理发送结果future.whenComplete((result, exception) -> {if (exception != null) {log.error("发送用户注册消息失败: userId={}", user.getId(), exception);} else {log.info("用户注册消息发送成功: userId={}, offset={}", user.getId(), result.getRecordMetadata().offset());}});}/*** 发送订单创建消息*/public void sendOrderCreation(Order order) {String topic = "order-creation";String key = "order_" + order.getId();// 确保消息顺序:使用订单ID作为key确保同一订单的消息发送到同一分区kafkaTemplate.send(topic, key, order).addCallback(result -> log.info("订单创建消息发送成功: orderId={}", order.getId()),exception -> log.error("订单创建消息发送失败: orderId={}", order.getId(), exception));}/*** 发送带事务的消息*/@Transactionalpublic void sendTransactionalMessage(String topic, String key, Object message) {kafkaTemplate.executeInTransaction(operations -> {operations.send(topic, key, message);// 这里可以添加数据库操作,确保消息和数据库操作在同一个事务中return null;});}/*** 用户注册事件*/@Data@AllArgsConstructor@NoArgsConstructorpublic static class UserRegistrationEvent {private Long userId;private String username;private String email;private LocalDateTime registerTime;}/*** 订单实体*/@Data@AllArgsConstructor@NoArgsConstructorpublic static class Order {private Long id;private Long userId;private BigDecimal amount;private LocalDateTime createTime;}
}

👂 消息消费者实现

​​消息消费服务​​:

@Service
@Slf4j
public class KafkaConsumerService {@Autowiredprivate EmailService emailService;@Autowiredprivate UserService userService;/*** 消费用户注册消息*/@KafkaListener(topics = "user-registration", groupId = "user-service")public void consumeUserRegistration(@Payload KafkaProducerService.UserRegistrationEvent event,@Header(KafkaHeaders.RECEIVED_KEY) String key,@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,Acknowledgment ack) {log.info("收到用户注册消息: key={}, partition={}, event={}", key, partition, event);try {// 1. 发送欢迎邮件emailService.sendWelcomeEmail(event.getEmail(), event.getUsername());// 2. 初始化用户积分userService.initUserPoints(event.getUserId());// 3. 记录注册日志log.info("用户注册处理完成: userId={}", event.getUserId());// 4. 手动提交偏移量ack.acknowledge();} catch (Exception e) {log.error("处理用户注册消息失败: userId={}", event.getUserId(), e);// 根据业务需求决定是否重试}}/*** 消费订单创建消息*/@KafkaListener(topics = "order-creation", groupId = "order-service")public void consumeOrderCreation(@Payload KafkaProducerService.Order order,@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp,Acknowledgment ack) {log.info("收到订单创建消息: orderId={}, amount={}, time={}", order.getId(), order.getAmount(), Instant.ofEpochMilli(timestamp).atZone(ZoneId.systemDefault()).toLocalDateTime());try {// 1. 库存检查checkInventory(order);// 2. 风控检查riskControlCheck(order);// 3. 发送订单确认sendOrderConfirmation(order);log.info("订单处理完成: orderId={}", order.getId());ack.acknowledge();} catch (Exception e) {log.error("处理订单消息失败: orderId={}", order.getId(), e);// 根据异常类型决定处理策略if (e instanceof InventoryException) {// 库存不足,需要人工处理handleInventoryShortage(order, e);} else {// 其他异常,可以重试throw e;}}}/*** 批量消费消息*/@KafkaListener(topics = "batch-processing", groupId = "batch-service")public void consumeBatchMessages(List<ConsumerRecord<String, Object>> records,Acknowledgment ack) {log.info("收到批量消息,数量: {}", records.size());try {for (ConsumerRecord<String, Object> record : records) {processSingleRecord(record);}// 批量提交偏移量ack.acknowledge();} catch (Exception e) {log.error("批量处理消息失败", e);// 可以根据业务需求实现重试逻辑}}// 模拟业务方法private void checkInventory(KafkaProducerService.Order order) {// 库存检查逻辑log.info("检查库存: orderId={}", order.getId());}private void riskControlCheck(KafkaProducerService.Order order) {// 风控检查逻辑log.info("风控检查: orderId={}", order.getId());}private void sendOrderConfirmation(KafkaProducerService.Order order) {// 发送确认逻辑log.info("发送订单确认: orderId={}", order.getId());}private void handleInventoryShortage(KafkaProducerService.Order order, Exception e) {// 处理库存不足log.warn("处理库存不足: orderId={}", order.getId());}private void processSingleRecord(ConsumerRecord<String, Object> record) {// 处理单条记录log.info("处理记录: key={}, value={}", record.key(), record.value());}
}

🔄 邮件服务模拟

​​邮件服务实现​​:

@Service
@Slf4j
public class EmailService {/*** 发送欢迎邮件*/public void sendWelcomeEmail(String email, String username) {log.info("发送欢迎邮件: 收件人={}, 用户名={}", email, username);// 模拟邮件发送try {Thread.sleep(100); // 模拟网络延迟log.info("欢迎邮件发送成功: {}", email);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new RuntimeException("邮件发送中断", e);}}/*** 发送订单确认邮件*/public void sendOrderConfirmation(String email, String orderInfo) {log.info("发送订单确认邮件: 收件人={}, 订单信息={}", email, orderInfo);// 模拟邮件发送try {Thread.sleep(150);log.info("订单确认邮件发送成功: {}", email);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new RuntimeException("邮件发送中断", e);}}
}

🔍 Kafka 健康检查与监控

​​Kafka 健康指示器​​:

@Component
public class KafkaHealthIndicator implements HealthIndicator {@Autowiredprivate KafkaTemplate<String, Object> kafkaTemplate;@Overridepublic Health health() {try {// 测试Kafka连接Future<RecordMetadata> future = kafkaTemplate.send("health-check", "test-key", "test-value");// 设置超时时间try {RecordMetadata metadata = future.get(5, TimeUnit.SECONDS);return Health.up().withDetail("cluster", "kafka").withDetail("topic", metadata.topic()).withDetail("partition", metadata.partition()).withDetail("status", "connected").build();} catch (TimeoutException e) {return Health.down().withDetail("error", "Kafka connection timeout").build();}} catch (Exception e) {return Health.down(e).withDetail("error", "Kafka health check failed: " + e.getMessage()).build();}}
}

💡 五、总结与扩展阅读

🎯 整合成果总结

​​三大组件整合对比​​:

组件核心 Starter关键配置主要用途性能优化点
Redisspring-boot-starter-data-redis连接池、序列化方式(JDK/JSON)、超时配置缓存、分布式锁、会话存储✅ 使用 Lettuce + 连接池复用
✅ 采用 JSON 序列化(GenericJackson2JsonRedisSerializer)
✅ 利用 Pipeline/批量操作 提升吞吐
MyBatismybatis-spring-boot-starter数据源、Mapper 扫描、SqlSession 管理数据持久化、ORM 映射✅ 启用 二级缓存、合理设置 缓存刷新策略
✅ 使用 分页插件(PageHelper / MyBatis Plus) 减少全表扫描
✅ 优化 SQL 与索引结构
Kafkaspring-kafkaProducer/Consumer 参数、Topic 配置异步消息处理、事件驱动架构✅ 启用 批量发送 (batch.size, linger.ms)
✅ 消费端开启 多线程并发消费
✅ 合理设置 acks、retries、幂等性 提高可靠性

🚀 性能优化建议

​​Redis 优化策略​​:

# 生产环境 Redis 优化配置
spring:redis:lettuce:pool:max-active: 50max-idle: 20min-idle: 10timeout: 1000ms

​MyBatis 优化建议​​:

// 启用二级缓存
@CacheNamespace
public interface UserMapper {// Mapper 接口
}// 使用批量操作
@Insert("<script>INSERT INTO user (...) VALUES " +"<foreach collection='list' item='item' separator=','>(...)</foreach></script>")
void batchInsert(List<User> users);

​​Kafka 性能调优​​:

spring:kafka:producer:batch-size: 32768      # 增大批量大小linger-ms: 20          # 适当增加延迟compression-type: snappy # 启用压缩consumer:fetch-max-wait: 500    # 最大等待时间fetch-min-size: 1024   # 最小获取大小
http://www.dtcms.com/a/548993.html

相关文章:

  • 可视化图解算法66:两个数组的交集
  • 7 种方法:如何将视频从电脑传输到安卓手机
  • Qt GridLayout布局详解:从基础到高级技巧
  • BTreeMap 的 B-Tree 之心:性能与安全的 Rust 式演绎
  • 中国查公司的网站长沙 网站设计 公司
  • R 因子:深度解析其在统计学中的重要作用
  • Laravel 结合影刀 RPA 实现企业微信自动询单报价流程
  • Rust 入门之Rust 运算符全面解析:从基础到实战
  • Rust:借用 切片
  • 【Blender工具】
  • Spring Al学习6:嵌入模型 API
  • 坪山区住房和建设局网站wordpress能放视频
  • 网站承建商有哪些注册了一个域名怎么做网站
  • 我公司是帮企业做网站的_现在要帮客户们的网站备案微信公众营销平台开发
  • MPC模型预测控制:原理、设计与MATLAB实现
  • JavaEE初阶,网络编程篇
  • 基于中值滤波和高斯平滑的三维点云数据滤波matlab仿真
  • Java设计模式应用--装饰器模式
  • 【MATLAB例程】基于梯度检测自适应的互补滤波 vs 标准互补滤波,附MATLAB代码下载链接,可直接运行,方便学习和修改成自己想要的程序
  • 在检验铸铁平台精度使用三研法检验有哪些好处
  • 用Blender制作室内效果图宜居之地
  • blender4.5 使用外部IDE(pycharm)编辑脚本(bpy)实践指南
  • 计算机的一点基础知识
  • 广州网站建设 乐云seo国外优秀论文网站
  • CSS 图像拼合技术
  • 【C++】模板进阶 | 继承
  • 排名优化网站建设长沙网站建设优化
  • 厦门网站优化服务pyhton做网站
  • 论文阅读笔记——数据增强
  • 如何裁剪YOLOv8m的大目标检测头并验证其结构