当前位置: 首页 > news >正文

SpringCloud 项目阶段十:kafka实现双端信息同步以及ElasticSearch容器搭建示例

自媒体文章上下架流程图

说明:

  1. 自媒体端创建一个接口,传递文章的id和对应enable
  2. 根据状态修改news对象中的enable数据,当enable属性发生变化时,
  3. 调用kafka服务(会将new中的articleId值传递给kafka),它会同时将news修改的内容同步给article中。

定义自媒体端文章上下架接口

接口信息

响应结果

kafka配置

nacos添加

spring:kafka:bootstrap-servers: varin.cn:9999producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer

配置公用的kafka主题

package com.hei.common.constance;public class WmNewsMessageConstants {public static final String WM_NEWS_UP_OR_DOWN_TOPIC="wm.news.up.or.down.topic";
}

配置接收前端dto


package com.heima.model.wemedia.dtos;import io.swagger.models.auth.In;
import lombok.Data;/*** 用于判断自媒体文章上架下架*/
@Data
public class WmNewsEnableDto {private Integer id;private Short enable;
}

定义Controller接口

@PostMapping("down_or_up")
public ResponseResult downOrUp(@RequestBody WmNewsEnableDto dto){return wmNewsService.down_or_up(dto);
}

定义Service并实现Impl

package com.heima.wemedia.service;import com.baomidou.mybatisplus.extension.service.IService;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.wemedia.dtos.WmNewsDto;
import com.heima.model.wemedia.dtos.WmNewsEnableDto;
import com.heima.model.wemedia.dtos.WmNewsPageReqDto;
import com.heima.model.wemedia.pojos.WmNews;import java.lang.reflect.InvocationTargetException;public interface WmNewsService extends IService<WmNews> {ResponseResult down_or_up(WmNewsEnableDto dto);}

Impl

  @Autowiredprivate WmNewsMapper wmNewsMapper;@Autowiredprivate KafkaTemplate<String,String> kafkaTemplate;@Overridepublic ResponseResult down_or_up(WmNewsEnableDto dto) {// 判断参数if (dto.getId()==null) {return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_REQUIRE,"文章id不可缺少");}// 查询文章WmNews wmNews = wmNewsMapper.selectById(dto.getId());if (wmNews==null) {return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST,"文章不存在");}if (!wmNews.getStatus().equals(WmNews.Status.PUBLISHED.getCode())) {return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST,"当前文章不是发布状态");}// 判断上架下架的范围if (wmNews.getEnable()!=null && wmNews.getEnable()>-1 &&wmNews.getEnable()<2) {wmNews.setEnable(dto.getEnable());updateById(wmNews);// 卡夫卡生产者发送消息if (wmNews.getArticleId()!=null) {Map<String, Object> map = new HashMap<>();map.put("articleId", wmNews.getArticleId());map.put("enable", dto.getEnable());kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC,JSON.toJSONString(map));}}return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}

app端定义Kafka监停上下状态

配置

nacos设置

spring:kafka:bootstrap-servers: varin.cn:9999consumer:group-id: ${spring.application.name}key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

实现监听事件

package com.heima.article.listener;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.hei.common.constance.WmNewsMessageConstants;
import com.heima.article.service.ArticleConfigService;
import com.heima.model.wemedia.pojos.WmNews;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;import java.util.Map;@Component
public class ArticleDownOrRpListener {@Autowiredprivate ArticleConfigService articleConfigService;@KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)public void articleDownOrRp(String  message){if (StringUtils.isNotBlank(message)) {Map jsonObject = JSON.parseObject(message, Map.class);articleConfigService.updatetoMap(jsonObject);}}
}

定义service并实现Impl

package com.heima.article.service;import com.baomidou.mybatisplus.extension.service.IService;
import com.heima.model.article.pojos.ApArticleConfig;import java.util.Map;public interface ArticleConfigService extends IService<ApArticleConfig> {void updatetoMap(Map map);
}

Impl

package com.heima.article.service.impl;import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.heima.article.mapper.ApArticleConfigMapper;
import com.heima.article.service.ArticleConfigService;
import com.heima.model.article.pojos.ApArticleConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;import java.util.Map;@Service
@Transactional
@Slf4j
public class ArticleConfigServiceImpl  extends ServiceImpl<ApArticleConfigMapper, ApArticleConfig> implements ArticleConfigService {@Overridepublic void updatetoMap(Map map) {Object o = map.get("enable");Boolean enable = o.equals(1)?false:true;// 更新update(Wrappers.<ApArticleConfig>lambdaUpdate().  eq(ApArticleConfig::getArticleId,map.get("articleId")).set(ApArticleConfig::getIsDown,enable));}
}

app端文章搜索需求分析

  • 用户输入关键可搜索文章列表
  • 关键词高亮显示
  • 文章列表展示与home展示一样,当用户点击某一篇文章,可查看文章详情
  • 为了加快检索的效率,在查询的时候不会直接从数据库中查询文章,需要在**elasticsearch**中进行高速检索。

ElasticSearch介绍

Elasticsearch 是一个分布式、高扩展、高实时的搜索与数据分析引擎。它能很方便的使大量数据具有搜索、分析和探索的能力。充分利用Elasticsearch的水平伸缩性,能使数据在生产环境变得更有价值。Elasticsearch 的实现原理主要分为以下几个步骤,首先用户将数据提交到Elasticsearch 数据库中,再通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据,当用户搜索数据时候,再根据权重将结果排名,打分,再将返回结果呈现给用户。

docker搭建ElasticSearch环境

拉取镜像

拉取镜像并指定版本

docker pull elasticsearch:7.4.0

运行容器

docker run -id --name elasticsearch -d --restart=always -p 9200:9200 -p 9300:9300 -v /usr/share/elasticsearch/plugins:/usr/share/elasticsearch/plugins -e "discovery.type=single-node" elasticsearch:7.4.0

以下是该 docker run 命令各参数的详细解释,以表格形式呈现:

参数说明
docker runDocker 的基础命令,用于创建并启动一个新的容器
-i以交互式模式运行容器,保持标准输入(STDIN)打开,即使没有连接
-d以守护进程(后台)模式运行容器,容器启动后会返回容器 ID,不在前台阻塞终端
--name elasticsearch为容器指定一个自定义名称(elasticsearch),方便后续通过名称操作容器(如启动、停止、删除等)
--restart=always设置容器的重启策略为 always:无论容器因何种原因退出(包括正常退出、异常崩溃等),Docker 都会自动重启该容器
-p 9200:9200端口映射:将主机的 9200 端口映射到容器内的 9200 端口。Elasticsearch 的 HTTP 服务默认使用 9200 端口,外部可通过主机的 9200 端口访问容器内的 Elasticsearch 服务
-p 9300:9300另一组端口映射:将主机的 9300 端口映射到容器内的 9300 端口。9300 是 Elasticsearch 节点间通信的默认端口(用于集群内节点发现和数据同步)
-v /usr/share/elasticsearch/plugins:/usr/share/elasticsearch/plugins数据卷挂载:将主机的 /usr/share/elasticsearch/plugins 目录挂载到容器内的同名目录。作用是持久化 Elasticsearch 的插件(容器重启或重建后,插件不会丢失)
-e "discovery.type=single-node"设置环境变量:向容器内传递 discovery.type=single-node 参数,指定 Elasticsearch 以单节点模式运行(无需配置集群,适合开发或测试环境)
elasticsearch:7.4.0指定启动容器所使用的镜像:elasticsearch 是镜像名称,7.4.0 是镜像的版本标签

配置中文分词器 ik

因为在创建elasticsearch容器的时候,映射了目录,所以可以在宿主机上进行配置ik中文分词器

在去选择ik分词器的时候,需要与elasticsearch的版本好对应上

elasticsearch-analysis-ik-7.4.0.zip上传到服务器上,放到对应目录(plugins)解压

#切换目录
cd /usr/share/elasticsearch/plugins
#新建目录
mkdir analysis-ik
cd analysis-ik
#root根目录中拷贝文件
mv elasticsearch-analysis-ik-7.4.0.zip /usr/share/elasticsearch/plugins/analysis-ik
#解压文件
cd /usr/share/elasticsearch/plugins/analysis-ik
unzip elasticsearch-analysis-ik-7.4.0.zip

重启docker容器服务

sudo docker restart elasticsearch

使用postman测试

Post请求

URL:81.69.12.244:9200/_analyze

# body
{"analyzer":"ik_max_word","text":"今天也是加油的一天呀"
}

响应结果

{"tokens": [{"token": "今天","start_offset": 0,"end_offset": 2,"type": "CN_WORD","position": 0},{"token": "也是","start_offset": 2,"end_offset": 4,"type": "CN_WORD","position": 1},{"token": "加油","start_offset": 4,"end_offset": 6,"type": "CN_WORD","position": 2},{"token": "的","start_offset": 6,"end_offset": 7,"type": "CN_CHAR","position": 3},{"token": "一天","start_offset": 7,"end_offset": 9,"type": "CN_WORD","position": 4},{"token": "一","start_offset": 7,"end_offset": 8,"type": "TYPE_CNUM","position": 5},{"token": "天呀","start_offset": 8,"end_offset": 10,"type": "CN_WORD","position": 6},{"token": "天","start_offset": 8,"end_offset": 9,"type": "COUNT","position": 7},{"token": "呀","start_offset": 9,"end_offset": 10,"type": "CN_CHAR","position": 8}]
}

加油呀~

http://www.dtcms.com/a/410085.html

相关文章:

  • 解析前端框架 Axios 的设计理念与源码:从零手写一个支持 HTTP/3 的“类 Axios”最小核
  • 共享ip服务器做网站小型创意电子产品设计
  • [Dify] 知识库架构介绍与使用场景概述
  • NFS 服务器iSCSI 服务器
  • 如何确保CMS系统能够快速响应用户请求?全面性能优化指南
  • 【202509新版】Hexo + GitHub Pages 免费部署个人博客|保姆级教程 第三部
  • 同时使用ReactUse 、 ahooks与性能优化
  • 跨境电商怎么做一件代发宁波关键词排名优化平台
  • FreeFusion:基于交叉重构学习的红外与可见光图像融合
  • GraphRAG对自然语言处理中深层语义分析的革命性影响与未来启示
  • 数据分析-60-工业时序数据分析之开关频次
  • C++入门基础知识157—【用一篇博文简单了解数据结构之红黑树】
  • 做网站课程报告阜阳网站建设哪家好
  • 吃透 Java 中的 break 与 continue
  • 【Android之路】kotlin和Jatpack compose
  • 渗透测试入门:从网络抓包到Web安全基础
  • 阿里云CDN加速流量消耗大原因:动态加速
  • 云栖2025 | 阿里云自研大数据平台 ODPS 重磅升级:全面支持AI计算和服务
  • FreeRTOS内存分配与STM32内存布局详解
  • 外贸建站的公司wordpress如何汉化主题
  • phpcms网站系统 技术方案 系统框架图网站系统开发团队简介
  • vue3+ts项目实现陕西省3d地图
  • leetcode_146 LRU缓存
  • Python常用自动化测试框架—Pytest详解
  • 郑州英文网站建设软件开发平台搭建
  • 在 C# .NETCore 中使用 MongoDB(第 3 部分):跳过、排序、限制和投影
  • 建设网站入什么科目最大的商标交易平台
  • esp32墨水屏学习3
  • DOM(二):事件监听、事件类型、事件对象、环境对象、回调函数、Tab栏切换
  • net6.0 WebApi 中使用 Entity Framework Core + Sqlite