当前位置: 首页 > news >正文

一个完整的日志收集方案:Elasticsearch + Logstash + Kibana+Filebeat (三)

现在我们主要完成AI-RAG服务的扩展,利用ES的向量检索能力完成历史聊天记录的存储和向量检索,让ai聊天有记忆。

主要做法是在首次聊天完成后将对话内容写出日志到D:\dev\dev2025\EC0601\logs\chat-his.log

写出日志同时嵌入向量

向量可以从ollama的端点:/api/embeddings获取,不同的模型获取到的维度不一样,我的是4096,这个维度很重要,向量检索必须要使用相同的维度去相互匹配,所以要记录这个值,后续创建es模板需要用到。

然后在第二次以及后续的对话中,就可以从es中基于相似度获取聊天记录从而让聊天模型有记忆

这里解释一下:Elasticsearch 的 k-NN

Elasticsearch 的 k-NN(k-Nearest Neighbors,k 最近邻)查询是一种用于执行**向量相似度搜索**的功能。在您的场景中,它被用来根据查询的嵌入向量找到最相似的历史聊天记录。### 什么是 Elasticsearch k-NN 查询?Elasticsearch 的 k-NN 查询允许您在 `dense_vector` 字段上执行相似度搜索。它会找到在向量空间中与给定查询向量距离最近的 `k` 个文档。这对于实现语义搜索、推荐系统、重复检测等场景非常有用。### k-NN 查询的目的在您的聊天历史记录场景中,k-NN 查询的主要目的是:*   **语义搜索**:通过比较查询的文本嵌入向量与存储在 Elasticsearch 中的聊天记录的 `prompt_embedding` 或 `response_embedding` 向量,找到语义上最相关的历史对话,即使它们不包含完全相同的关键词。
*   **上下文检索**:为 AI 模型的响应提供相关的历史上下文,从而生成更准确和连贯的回复(RAG - Retrieval Augmented Generation)。### Elasticsearch k-NN 查询的关键组件一个基本的 k-NN 查询通常包含以下几个部分:1.  **`knn` 对象**:这是 k-NN 查询的主体。
2.  **`field`**:*   指定要执行相似度搜索的 `dense_vector` 字段。在您的案例中,这通常是 `prompt_embedding` 或 `response_embedding`。
3.  **`query_vector`**:*   这是您要用来查找相似文档的查询向量。它是一个浮点数数组,其维度必须与目标 `dense_vector` 字段的 `dims`(维度)相匹配。在您的场景中,这就是 `getEmbedding(queryPrompt)` 生成的向量。
4.  **`k`**:*   指定您希望返回的最相似结果的数量。例如,`k: 5` 会返回 5 个最相似的文档。
5.  **`num_candidates`**:*   这是一个性能参数。它指定了 Elasticsearch 在内部需要检查的候选文档的数量,以找到 `k` 个最近邻。通常,`num_candidates` 应该大于 `k`(例如,`k` 的几倍),因为它会影响搜索的准确性和性能。更大的 `num_candidates` 可能会提高准确性,但会增加计算开销。### 示例 (Kibana Dev Tools)假设您的 `chat-history-*` 索引中存储了聊天记录的 `response_embedding` 字段,并且它们的维度是 `4096`。如果您想查询与某个特定向量最相似的聊天记录,可以在 Kibana Dev Tools 中这样构建查询:```
GET chat-history-*/_search
{"knn": {"field": "response_embedding","query_vector": [0.1, 0.2, -0.3, ..., 0.05, 0.99, -0.7 // 这里需要是实际的4096维查询向量],"k": 5,"num_candidates": 100}
}
```**注意事项:***   `query_vector` 必须是一个完整的向量,其长度要与 Elasticsearch 映射中 `dense_vector` 字段的 `dims` 参数(在您的情况下是 `4096`)严格一致。
*   `num_candidates` 是一个调优参数。根据您的数据量和对准确性/性能的需求进行调整。您的 Spring Boot 应用程序中的 `AIController.java` 代码正是构建了这样一个 k-NN 查询体,并通过 `restTemplate.postForEntity` 发送给 Elasticsearch。现在的关键是确保应用程序生成的 `query_vector` (由 `getEmbedding(queryPrompt)` 返回)的维度是正确的 `4096`。

这是调试效果:可以看到检索是成功了,本轮对话问题:"你知道我是谁吗"
它已经拿到了上一轮对话的内容:“你好我是伊丽萨白 ”。

当然1.7b它智商不在线,没能把我想要的回答组织起来,这通常需要调整提示语:"基于以下聊天历史记录" 让它更好理解。

接下来就是通过配置实现log写入es

  • es:elasticsearch需要手动创建处理管道和模板用用于存储chat-his.log
PUT _ingest/pipeline/chat-history
{"description": "处理聊天历史数据的管道","processors": [{"date": {"field": "@timestamp","target_field": "timestamp","formats": ["ISO8601"]}},{"remove": {"field": "@timestamp"}},{"set": {"field": "metadata.processed","value": true}},{"set": {"field": "metadata.processed_at","value": "{{_ingest.timestamp}}"}}]
}
PUT _index_template/chat_history_template
{"index_patterns": ["chat-history-*"],"template": {"mappings": {"properties": {"timestamp": {"type": "date","format": "strict_date_optional_time_nanos"},"model": {"type": "text","fields": {"keyword": {"type": "keyword","ignore_above": 256}}},"prompt": {"type": "text","fields": {"keyword": {"type": "keyword","ignore_above": 256}}},"response": {"type": "text","fields": {"keyword": {"type": "keyword","ignore_above": 256}}},"prompt_embedding": {"type": "dense_vector","dims": 4096 //注意这个4096,ollama对应模型的向量维度},"response_embedding": {"type": "dense_vector","dims": 4096  //注意这个4096,ollama对应模型的向量维度}}}}
}
  • logstash:在原来的基础上添加了if [log_type] == "chat-history" 的输入和输出,这个解析过程一定程度决定于filebeat的输出格式
# Sample Logstash configuration for creating a complete
# Beats -> Logstash -> Elasticsearch pipeline with log parsinginput {beats {port => 5044}
}# Filter to parse logs using grok and date plugins
filter {if [log_type] == "springboot-app" {grok {match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} \[%{DATA:thread}\] %{LOGLEVEL:level} %{DATA:logger} - %{GREEDYDATA:message}" }}}if [log_type] == "chat-history" {# Step 1: 使用 grok 去除三引号(""") 包裹grok {match => {"[event][original]" => "^\"\"\"%{GREEDYDATA:json_content}\"\"\""}tag_on_failure => ["_grok_chat_history_failure"]}# Step 2: 如果提取成功,则尝试解析 json_contentif [json_content] and [json_content] != "" {json {source => "json_content"target => "parsed_chat_history"remove_field => ["json_content"]}}else {# 如果没有匹配到三引号,尝试直接解析 event.original 为 JSONjson {source => "[event][original]"target => "parsed_chat_history"remove_field => ["[event][original]"]}}# Step 3: 提升 chat_log_data 中的字段到顶层if [parsed_chat_history][chat_log_data] {mutate {rename => {"[parsed_chat_history][chat_log_data][timestamp]" => "timestamp""[parsed_chat_history][chat_log_data][model]" => "model""[parsed_chat_history][chat_log_data][prompt]" => "prompt""[parsed_chat_history][chat_log_data][response]" => "response""[parsed_chat_history][chat_log_data][prompt_embedding]" => "prompt_embedding""[parsed_chat_history][chat_log_data][response_embedding]" => "response_embedding"}}# 删除中间字段mutate {remove_field => ["parsed_chat_history"]}}}}output {if [log_type] == "springboot-app" {elasticsearch {hosts => ["https://localhost:9200"]index => "springboot-logs-%{+YYYY.MM.dd}"ssl_enabled => truessl_verification_mode => "full"ssl_certificate_authorities => ["D:/dev/dev2025/EC0601/elasticsearch-9.0.1/config/certs/http_ca.crt"]api_key => "V6VUSpcBUPesLBBNVAlH:O7l1zeyOwQFfy9w5Af_JTA"}}if [log_type] == "chat-history" {elasticsearch {hosts => ["https://localhost:9200"]index => "chat-history-%{+YYYY.MM.dd}" //匹配es模板自动创建按天分片的索引pipeline => "chat-history" //es管道,对接收的字段做进一步处理,比如日期格式转换等ssl_enabled => truessl_verification_mode => "full"ssl_certificate_authorities => ["D:/dev/dev2025/EC0601/elasticsearch-9.0.1/config/certs/http_ca.crt"]api_key => "V6VUSpcBUPesLBBNVAlH:O7l1zeyOwQFfy9w5Af_JTA"}}stdout {codec => rubydebug}
}

具体做法在解析前可以要看这里(event字段内容的原始格式)样式进行解析

  • filebeat:这里加入log_type: chat-history的日志收集
filebeat.inputs:- type: filestreamid: springboot-logspaths:- D:/dev/dev2025/EC0601/logs/springboot-ai-rag-demo.logenabled: truefields:log_type: springboot-appapp_name: springboot-ai-rag-demofields_under_root: truemultiline:pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}'negate: truematch: after- type: filestreamid: chat-historypaths:- D:/dev/dev2025/EC0601/logs/chat-his.logenabled: truefields:log_type: chat-historyapp_name: springboot-ai-rag-demofields_under_root: trueoutput.logstash:hosts: ["localhost:5044"]enabled: true# 启用 HTTP 状态接口
http.enabled: true
http.port: 5066
logging.level: debug
logging.selectors: ["*"]
  • JDK:最后是JDK,应为es需要安全证书(https://localhost:9200/),所以java程序在访问es时,需要把es证书导入证书到jdk目录  :/lib/security/cacerts
     
  • 提示输入密码时,默认密码通常是 changeit。
  • 当它问“是否信任此证书? [否]:”时,输入 y 并按回车。
keytool -import -trustcacerts -alias elasticsearch_ca -file "D:/dev/dev2025/EC0601/elasticsearch-9.0.1/config/certs/http_ca.crt" -keystore "C:/Program Files/Java/jdk-17/lib/security/cacerts"
  • springboot项目:需要修改之前log配置,现在通过logback配置将控制台日志和聊天日志分析写入到指定目录
<?xml version="1.0" encoding="UTF-8"?>
<configuration><property name="LOG_PATH" value="D:/dev/dev2025/EC0601/logs"/><!-- 控制台输出 --><appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"><encoder><pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern></encoder></appender><!-- 主应用日志文件 --><appender name="SPRINGBOOT_APP_LOG" class="ch.qos.logback.core.rolling.RollingFileAppender"><file>${LOG_PATH}/springboot-ai-rag-demo.log</file><rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"><fileNamePattern>${LOG_PATH}/springboot-ai-rag-demo.%d{yyyy-MM-dd}.log</fileNamePattern><maxHistory>30</maxHistory></rollingPolicy><encoder><pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern></encoder></appender><!-- 聊天历史记录文件 --><appender name="CHAT_HISTORY" class="ch.qos.logback.core.rolling.RollingFileAppender"><file>${LOG_PATH}/chat-his.log</file><rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"><fileNamePattern>${LOG_PATH}/chat-his.%d{yyyy-MM-dd}.log</fileNamePattern><maxHistory>30</maxHistory></rollingPolicy><encoder><pattern>%msg%n</pattern></encoder></appender><!-- 聊天历史记录日志 --><logger name="com.example.demo.logging.ChatHistoryLogger" level="INFO" additivity="false"><appender-ref ref="CHAT_HISTORY"/></logger><root level="INFO"><appender-ref ref="CONSOLE"/><appender-ref ref="SPRINGBOOT_APP_LOG"/></root>
</configuration> 

最后贴一下AIController,和ChatHistoryLogger

package com.example.demo.controller;import com.example.demo.logging.ChatHistoryLogger;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.*;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.client.RestTemplate;import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;@RestController
@RequestMapping("/api/ai")
public class AIController {@Value("${ollama.base-url:http://localhost:11434}")private String ollamaBaseUrl;@Value("${spring.elasticsearch.rest.uris:https://localhost:9200}")private String elasticsearchBaseUrl;private final RestTemplate restTemplate = new RestTemplate();private final ObjectMapper objectMapper = new ObjectMapper();@Autowiredprivate ChatHistoryLogger chatHistoryLogger;@PostMapping("/chat")public String chat(@RequestBody Map<String, String> request) {String prompt = request.get("prompt");String model = request.getOrDefault("model", "qwen3:1.7b");// 1. 从 Elasticsearch 获取历史记录 (使用向量检索)List<String> chatHistory = getChatHistoryFromElasticsearch(prompt);String historyContext = chatHistory.isEmpty() ? "" : "基于以下聊天历史记录:\n" + String.join("\n", chatHistory) + "\n";// 2. 构建发送给 Ollama 的完整 promptString fullPrompt = historyContext + prompt;// 调用 Ollama APIMap<String, Object> body = new HashMap<>();body.put("model", model);body.put("prompt", fullPrompt);HttpHeaders headers = new HttpHeaders();headers.setContentType(MediaType.APPLICATION_JSON);HttpEntity<Map<String, Object>> entity = new HttpEntity<>(body, headers);StringBuilder aiResponseBuilder = new StringBuilder();try {restTemplate.execute(ollamaBaseUrl + "/api/generate",HttpMethod.POST,req -> {new ObjectMapper().writeValue(req.getBody(), body);req.getHeaders().setContentType(MediaType.APPLICATION_JSON);},resp -> {try (BufferedReader reader = new BufferedReader(new InputStreamReader(resp.getBody()))) {String line;while ((line = reader.readLine()) != null) {if (!line.trim().isEmpty()) {Map<String, Object> json = objectMapper.readValue(line, Map.class);Object part = json.get("response");if (part != null) {aiResponseBuilder.append(part.toString());}}}}return null;});} catch (Exception e) {aiResponseBuilder.append("与Ollama交互失败: ").append(e.getMessage());// Optionally log the full stack trace for debugging// logger.error("Error interacting with Ollama", e);}String aiResponse = aiResponseBuilder.length() > 0 ? aiResponseBuilder.toString() : "无响应";// 记录聊天历史(包含向量嵌入)chatHistoryLogger.logChat(model, prompt, aiResponse);return aiResponse;}private List<String> getChatHistoryFromElasticsearch(String queryPrompt) {List<String> history = new ArrayList<>();try {// 1. 生成查询向量List<Float> queryEmbedding = chatHistoryLogger.getEmbedding(queryPrompt);if (queryEmbedding.isEmpty()) {System.err.println("无法为查询生成嵌入,跳过向量检索。");return history;}String searchUrl = elasticsearchBaseUrl + "/chat-history-*/_search";// 2. 构建 Elasticsearch k-NN 查询体Map<String, Object> esQueryBody = new HashMap<>();Map<String, Object> knnMap = new HashMap<>();knnMap.put("field", "response_embedding"); // 用于向量嵌入的字段knnMap.put("query_vector", queryEmbedding);knnMap.put("k", 5); // 返回最相似的 5 个结果knnMap.put("num_candidates", 10); // 搜索候选数量,通常 k 的几倍esQueryBody.put("knn", knnMap);HttpHeaders headers = new HttpHeaders();headers.setContentType(MediaType.APPLICATION_JSON);headers.setBasicAuth("elastic", "elastic"); // 请替换为您的实际用户名和密码HttpEntity<Map<String, Object>> entity = new HttpEntity<>(esQueryBody, headers);ResponseEntity<String> response = restTemplate.postForEntity(searchUrl, entity, String.class);if (response.getBody() != null) {Map<String, Object> responseMap = objectMapper.readValue(response.getBody(), Map.class);List<Map<String, Object>> hits = (List<Map<String, Object>>)((Map<String, Object>) responseMap.get("hits")).get("hits");for (Map<String, Object> hit : hits) {Map<String, Object> source = (Map<String, Object>) hit.get("_source");String historicalPrompt = (String) source.get("prompt");String historicalResponse = (String) source.get("response");String role = (String) source.get("role");String content = (String) source.get("content");// Use prompt and response directly from the sourceif (historicalPrompt != null && historicalResponse != null) {history.add("用户: " + historicalPrompt + "\nAI: " + historicalResponse);} else if (role != null && content != null) {history.add(role + ": " + content);}}}} catch (Exception e) {System.err.println("从Elasticsearch获取历史记录失败: " + e.getMessage());}return history;}
} 
package com.example.demo.logging;import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.*;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;@Component
public class ChatHistoryLogger {private static final Logger logger = LoggerFactory.getLogger(ChatHistoryLogger.class);private final ObjectMapper objectMapper = new ObjectMapper();private final RestTemplate restTemplate = new RestTemplate();@Value("${ollama.base-url:http://localhost:11434}")private String ollamaBaseUrl;public void logChat(String model, String prompt, String response) {try {// 获取向量嵌入List<Float> promptEmbedding = getEmbedding(prompt);List<Float> responseEmbedding = getEmbedding(response);// 构建内部日志条目Map<String, Object> innerLogEntry = new HashMap<>();innerLogEntry.put("timestamp", LocalDateTime.now().toString());innerLogEntry.put("model", model);innerLogEntry.put("prompt", prompt);innerLogEntry.put("response", response);innerLogEntry.put("prompt_embedding", promptEmbedding);innerLogEntry.put("response_embedding", responseEmbedding);// 将内部日志条目包装在新的顶级字段中Map<String, Object> logEntry = new HashMap<>();logEntry.put("chat_log_data", innerLogEntry);String jsonLog = objectMapper.writeValueAsString(logEntry);logger.info(jsonLog);} catch (Exception e) {logger.error("Failed to log chat history", e);}}/**使用 Ollama 的 embeddings API 生成向量 */public List<Float> getEmbedding(String text) {try {Map<String, Object> body = new HashMap<>();body.put("model", "qwen:7b");body.put("prompt", text);HttpHeaders headers = new HttpHeaders();headers.setContentType(MediaType.APPLICATION_JSON);HttpEntity<Map<String, Object>> entity = new HttpEntity<>(body, headers);ResponseEntity<Map> response = restTemplate.postForEntity(ollamaBaseUrl + "/api/embeddings",entity,Map.class);if (response.getBody() != null && response.getBody().containsKey("embedding")) {@SuppressWarnings("unchecked")List<Float> embedding = (List<Float>) response.getBody().get("embedding");return embedding;}} catch (Exception e) {logger.error("Failed to get embedding", e);}return new ArrayList<>();}
} 

下面时工作空间结构,由于文件太大,这里就不贴源码了,后面会上传到csdn,从csdn下载,谢谢大家的支持。

相关文章:

  • <6>-MySQL表的增删查改
  • PostgreSQL 安装与配置全指南(适用于 Windows、macOS 与主流 Linux 发行版)
  • 【Elasticsearch】Elasticsearch 近实时高速查询原理
  • Android高性能音频与图形开发:OpenSL ES与OpenGL ES最佳实践
  • 如何判断对象是否存活
  • DevSecOps实践:CI/CD流水线集成SAST工具的完整指南
  • 从零开始搭建现代化 Monorepo 开发模板:TypeScript + Rollup + Jest + 持续集成完整指南
  • python/java环境配置
  • 张彬彬《龙骨焚箱》开机 奇幻冒险题材引期待
  • 期末考试复习总结-《从简单的页面开始(下)》
  • 亚马逊运营:物流成本优化——如何在开发阶段做好物流成本优化
  • 【多智能体】受木偶戏启发实现多智能体协作编排
  • 论文笔记:LANGUAGE MODELS REPRESENT SPACE AND TIME
  • 初阶数据结构习题【16】(5二叉树)——101. 对称二叉树
  • IDEA中配置HTML和Thymeleaf热部署的步骤
  • Springboot度假村住宿服务平台95i1e(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。
  • 从“分散开发”到“智能协同” —— Gitee 如何赋能河南农担构建金融级研发体系?
  • 【docker n8n】本地台式机A部署后,其他电脑B、C如何访问n8n?
  • 记录win10/win11安装docker desktop全过程
  • 漫画Android:APK是怎样安装的?
  • 时时彩怎么建设网站/百度软件开放平台
  • 资阳地网站seo/百度浏览器官网入口
  • 城乡住房建设部网站造价师网/线上营销平台有哪些
  • 代理国外网站/安卓手机优化软件哪个好
  • 旅游主题网站模板/品牌营销推广方案怎么做
  • 做网站除了买域名还有什么/seo整站优化报价