Elasticsearch 从入门到实战
🚀 Elasticsearch 从入门到实战
一、什么是 Elasticsearch?
Elasticsearch(简称 ES)是一个基于 Lucene 的开源、分布式、高扩展性的 全文搜索引擎,同时也被广泛用作 实时数据分析引擎 和 日志存储系统(常与 Logstash、Kibana 组成 ELK 技术栈)。
🔍 主要特性:
特性 | 说明 |
---|---|
🔎 全文搜索 | 支持模糊匹配、拼音、同义词、高亮等 |
⚡ 实时性 | 数据写入后 1 秒内可被搜索到 |
🌐 分布式 | 支持横向扩展,自动负载均衡 |
📦 JSON 接口 | 所有操作通过 RESTful API 完成 |
📊 聚合分析 | 可进行统计、分组、时间序列分析 |
🔁 高可用 | 数据自动复制(replica),节点宕机不丢数据 |
📌 典型应用场景:
- 商品搜索(淘宝、京东)
- 日志分析(ELK)
- 用户行为分析
- 实时监控系统
- 文档检索系统
二、Elasticsearch 安装(CentOS & Debian)
环境要求
- Java 8 或 Java 11(必须)
- 至少 2GB 内存
- 关闭交换分区(swap)以提升性能
1. 在 CentOS 7/8 上安装
步骤 1:安装 Java
sudo yum install -y java-11-openjdk-devel
步骤 2:下载并安装 Elasticsearch
# 下载 RPM 包(以 8.12.0 为例)
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-8.12.0-x86_64.rpm# 安装
sudo rpm -ivh elasticsearch-8.12.0-x86_64.rpm
步骤 3:配置
编辑配置文件:
sudo vi /etc/elasticsearch/elasticsearch.yml
修改以下内容:
# 节点名称
node.name: node-1# 网络绑定(允许外部访问)
network.host: 0.0.0.0# 集群名称(默认为 elasticsearch)
cluster.name: my-es-cluster# 发现机制(单节点时设置)
discovery.type: single-node
⚠️ 生产环境需配置
discovery.seed_hosts
和cluster.initial_master_nodes
步骤 4:启动服务
sudo systemctl daemon-reload
sudo systemctl enable elasticsearch
sudo systemctl start elasticsearch
步骤 5:验证
curl http://localhost:9200
输出:
{"name" : "node-1","cluster_name" : "my-es-cluster","version" : {"number" : "8.12.0","build_flavor" : "default"},"tagline" : "You Know, for Search"
}
✅ 安装成功!
2. 在 Debian/Ubuntu 上安装
步骤 1:安装 Java
sudo apt update
sudo apt install -y openjdk-11-jdk
步骤 2:导入 GPG 密钥并添加源
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -
echo "deb https://artifacts.elastic.co/packages/8.x/apt stable main" | sudo tee -a /etc/apt/sources.list.d/elastic-8.x.list
步骤 3:安装
sudo apt update
sudo apt install -y elasticsearch
步骤 4:配置 & 启动(同 CentOS)
sudo systemctl enable elasticsearch
sudo systemctl start elasticsearch
三、核心原理:倒排索引(Inverted Index)
1. 什么是倒排索引?
传统数据库使用 正排索引(按 ID 查内容):
ID | 内容 |
---|---|
1 | “苹果手机很好用” |
2 | “华为手机也不错” |
而 Elasticsearch 使用 倒排索引:按词查文档
将文本分词后建立索引:
词语 | 出现的文档 ID |
---|---|
苹果 | [1] |
手机 | [1, 2] |
很好用 | [1] |
华为 | [2] |
不错 | [2] |
2. 搜索 “手机” 时:
- 查倒排表:
手机 → [1, 2]
- 返回文档 1 和 2
✅ 优势:查询速度快,适合海量数据全文检索。
3. 分词器(Analyzer)
ES 使用分词器将文本拆分为“词”(Token),常见分词器:
standard
:标准分词(按空格、标点)ik
:中文分词(需安装插件)
# 安装 ik 分词插件
./bin/elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v8.12.0/elasticsearch-analysis-ik-8.12.0.zip
四、分片(Shard)机制
1. 什么是分片?
Elasticsearch 将一个 索引(Index) 拆分成多个 分片(Shard),每个分片是一个独立的 Lucene 实例。
索引: products
├── 分片 0 → 存储部分数据
├── 分片 1 → 存储部分数据
└── 分片 2 → 存储部分数据
2. 为什么需要分片?
- ✅ 水平扩展:数据量大时,可分布到多个节点
- ✅ 并行查询:多个分片可并行处理搜索请求,提升性能
- ✅ 容错:分片可复制(replica)
3. 主分片 vs 副本分片
类型 | 数量 | 作用 |
---|---|---|
主分片(Primary Shard) | 固定(创建索引时指定) | 存储真实数据 |
副本分片(Replica Shard) | 可动态调整 | 数据备份、提升读性能 |
PUT /products
{"settings": {"number_of_shards": 3,"number_of_replicas": 1}
}
表示:
- 3 个主分片
- 每个主分片有 1 个副本 → 共 6 个分片
4. 分片了,怎么知道查哪个?
Elasticsearch 的 协调节点(Coordinating Node) 会自动完成路由:
写入流程:
客户端 → 协调节点 → 计算 _id 的哈希 % 分片数 → 决定写入哪个主分片
公式:
shard = hash(_id) % number_of_primary_shards
读取流程:
- 查询请求到达协调节点
- 协调节点将请求广播到该索引的所有主分片和副本分片
- 各分片并行执行查询
- 协调节点合并结果并返回
✅ 所以你不需要关心“数据在哪个分片”——ES 自动处理!
五、Elasticsearch 字段类型详解(Mapping)
当你创建索引时,需要定义 Mapping,即字段的结构和类型。常见字段类型如下:
1. text
vs keyword
:最重要的区别!
类型 | 是否分词 | 用途 | 示例 |
---|---|---|---|
text | ✅ 是 | 全文搜索,支持模糊匹配 | 商品描述、文章内容 |
keyword | ❌ 否 | 精确匹配、聚合、排序 | 邮箱、状态、标签 |
示例:
PUT /products
{"settings": {"number_of_shards": 3,"number_of_replicas": 1},"mappings": {"properties": {"title": { "type": "text" },"status": { "type": "keyword" },"price": { "type": "float" },"tags": { "type": "keyword" },"created_at": { "type": "date" }}}
}
插入数据:
POST /products/_doc/1
{"title": "华为手机 高清拍照","status": "on_sale","price": 3999.99,"tags": ["手机", "5G"],"created_at": "2025-04-05T10:00:00Z"
}
搜索示例:
-
text
字段搜索(分词):GET /products/_search {"query": {"match": {"title": "华为"}} }
-
keyword
字段精确匹配:GET /products/_search {"query": {"term": {"status": "on_sale"}} }
✅ 总结:
text
→ 用于“搜索内容”keyword
→ 用于“筛选、排序、聚合”
六、安装 IK 中文分词器
Elasticsearch 默认的 standard
分词器对中文支持很差(会按字切分)。推荐使用 IK 分词器。
1. 下载并安装
# 进入 ES 安装目录
cd /usr/share/elasticsearch# 安装 IK 插件(版本需与 ES 一致)
sudo bin/elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v8.12.0/elasticsearch-analysis-ik-8.12.0.zip
2. 重启 ES
sudo systemctl restart elasticsearch
3. 测试分词效果
GET /_analyze
{"analyzer": "ik_max_word","text": "华为手机很好用"
}
输出:
{"tokens": [{ "token": "华为", "start_offset": 0, "end_offset": 2 },{ "token": "手机", "start_offset": 2, "end_offset": 4 },{ "token": "很好用", "start_offset": 4, "end_offset": 7 }]
}
4. 在 Mapping 中使用 IK
PUT /news
{"mappings": {"properties": {"content": {"type": "text","analyzer": "ik_max_word","search_analyzer": "ik_smart"}}}
}
ik_max_word
:最细粒度切分ik_smart
:智能切分(较粗)
七、Canal 详解:MySQL Binlog 实时同步工具
1. 什么是 Canal?
Canal 是阿里巴巴开源的 MySQL 数据库增量日志解析中间件,伪装成 MySQL 的从库(Slave),读取主库的 Binlog,解析成结构化数据,再发送到 MQ 或 Kafka。
类似于 MySQL 的“CDC”(Change Data Capture)
2. 为什么用 Canal?
- ✅ 实时性强(毫秒级延迟)
- ✅ 不侵入业务代码
- ✅ 支持多种数据源和目的地
- ✅ 高可用、可扩展
3. Canal 配置文件详解(instance.properties
)
# MySQL 主库地址
canal.instance.master.address=127.0.0.1:3306# 数据库用户名
canal.instance.dbUsername=root# 数据库密码
canal.instance.dbPassword=123456# 监听的数据库和表
canal.instance.filter.regex=mydb\\.user,mydb\\.order# RabbitMQ 配置
canal.mq.topic=es_user_topic
canal.mq.servers=127.0.0.1:5672
canal.mq.exchange=canal_exchange
配置项 | 含义 |
---|---|
master.address | MySQL 主库 IP 和端口 |
dbUsername/password | 连接 MySQL 的账号 |
filter.regex | 正则表达式,指定监听哪些表 |
mq.topic | 发送到 MQ 的主题名 |
mq.servers | MQ 服务器地址 |
4. Binlog 数据格式(Canal 输出示例)
Canal 将 Binlog 解析为 JSON 格式,示例如下:
{"data": [{"id": "1","name": "张三","age": "25","email": "zhangsan@example.com"}],"database": "mydb","table": "user","type": "INSERT","ts": 1743811200000
}
字段 | 含义 |
---|---|
data | 变更的数据(数组,支持批量) |
database | 数据库名 |
table | 表名 |
type | 操作类型:INSERT , UPDATE , DELETE |
ts | 时间戳 |
Spring Boot 消费此消息,即可同步到 ES。
八、不依赖 Canal:定时任务同步(全量 + 增量)
如果你不想引入 Canal,也可以通过 定时任务查询数据库 实现同步。
场景:每天凌晨同步一次用户数据
技术栈:Spring Boot + Scheduled + MyBatis
1. 数据库表结构
CREATE TABLE user (id BIGINT PRIMARY KEY,name VARCHAR(50),age INT,email VARCHAR(100),updated_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
2. Spring Boot 同步逻辑
@Component
@Slf4j
public class UserSyncTask {@Autowiredprivate UserMapper userMapper;@Autowiredprivate ElasticsearchClient esClient;// 每天凌晨 2:00 执行@Scheduled(cron = "0 0 2 * * ?")public void syncUsers() throws IOException {log.info("开始同步用户数据到 ES");// 查询所有 updated_time > 最后同步时间的数据(增量)LocalDateTime lastSyncTime = getLastSyncTime(); // 可存入 Redis 或 DBList<User> users = userMapper.findByUpdatedTimeAfter(lastSyncTime);for (User user : users) {IndexRequest<User> request = IndexRequest.of(i -> i.index("users").id(user.getId().toString()).document(user));esClient.index(request);}log.info("同步完成,共 {} 条数据", users.size());}private LocalDateTime getLastSyncTime() {// 实际项目中可从 Redis 或数据库读取上次同步时间return LocalDateTime.now().minusDays(1);}
}
3. 优缺点对比
方案 | 优点 | 缺点 | 适用场景 |
---|---|---|---|
Canal + MQ | 实时性强(毫秒级) 不侵入业务 | 架构复杂 运维成本高 | 高实时性要求(如订单、日志) |
定时任务 | 简单易实现 无需额外中间件 | 延迟高(分钟/小时级) | 数据量小、实时性要求低 |
九、Java 原生客户端实现:增删查改 + 高亮
1. 添加依赖(Maven)
<dependency><groupId>co.elastic.clients</groupId><artifactId>elasticsearch-java</artifactId><version>8.12.0</version>
</dependency>
<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.15.2</version>
</dependency>
<dependency><groupId>jakarta.json</groupId><artifactId>jakarta.json-api</artifactId><version>2.1.2</version>
</dependency>
<dependency><groupId>org.glassfish</groupId><artifactId>jakarta.json</artifactId><version>2.0.1</version>
</dependency>
2. 创建实体类
public class User {private String id;private String name;private Integer age;private String email;private String city;private List<String> interests;// Getters and Setterspublic String getId() { return id; }public void setId(String id) { this.id = id; }public String getName() { return name; }public void setName(String name) { this.name = name; }public Integer getAge() { return age; }public void setAge(Integer age) { this.age = age; }public String getEmail() { return email; }public void setEmail(String email) { this.email = email; }public String getCity() { return city; }public void setCity(String city) { this.city = city; }public List<String> getInterests() { return interests; }public void setInterests(List<String> interests) { this.interests = interests; }
}
3. Java 原生客户端 CRUD + 高亮
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.search.Hit;
import co.elastic.clients.elasticsearch.core.search.Highlight;
import co.elastic.clients.elasticsearch.core.search.HighlightField;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;import java.io.IOException;
import java.util.*;public class ElasticsearchNativeDemo {private static final String INDEX_NAME = "users";private static ElasticsearchClient client;static {RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200)).build();ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());client = new ElasticsearchClient(transport);}// 1. 创建索引(带 mapping)public static void createIndex() throws IOException {client.indices().create(c -> c.index(INDEX_NAME).settings(s -> s.numberOfShards(3).numberOfReplicas(1)).mappings(m -> m.properties("name", p -> p.text(t -> t)).properties("age", p -> p.integer(i -> i)).properties("email", p -> p.keyword(k -> k)).properties("city", p -> p.text(t -> t.analyzer("ik_max_word")))));System.out.println("索引创建成功");}// 2. 插入文档(Create)public static void createDocument() throws IOException {User user = new User();user.setId("1");user.setName("张三");user.setAge(25);user.setEmail("zhangsan@example.com");user.setCity("北京朝阳区");user.setInterests(Arrays.asList("读书", "电影"));IndexResponse response = client.index(i -> i.index(INDEX_NAME).id(user.getId()).document(user));System.out.println("插入成功,ID: " + response.id());}// 3. 查询文档(Read)public static void getDocument() throws IOException {GetResponse<User> response = client.get(g -> g.index(INDEX_NAME).id("1"), User.class);if (response.found()) {User user = response.source();System.out.println("查询结果: " + user.getName() + ", " + user.getCity());} else {System.out.println("文档不存在");}}// 4. 更新文档(Update)public static void updateDocument() throws IOException {Map<String, Object> jsonMap = new HashMap<>();jsonMap.put("age", 26);UpdateResponse<User> response = client.update(u -> u.index(INDEX_NAME).id("1").doc(jsonMap), User.class);System.out.println("更新成功,版本: " + response.version());}// 5. 删除文档(Delete)public static void deleteDocument() throws IOException {DeleteResponse response = client.delete(d -> d.index(INDEX_NAME).id("1"));System.out.println("删除成功,版本: " + response.version());}// 6. 搜索 + 高亮public static void searchWithHighlight() throws IOException {SearchResponse<User> response = client.search(s -> s.index(INDEX_NAME).query(q -> q.match(t -> t.field("city").query("北京"))).highlight(h -> h.fields("city", f -> f.preTags("<em style='color:red'>").postTags("</em>"))).source(true), User.class);System.out.println("共 " + response.hits().total().value() + " 条结果");for (Hit<User> hit : response.hits().hits()) {User user = hit.source();System.out.println("用户: " + user.getName() + ", 城市: " + user.getCity());// 输出高亮内容Highlight highlight = hit.highlight();if (highlight != null && highlight.size() > 0) {List<HighlightField> cityHighlights = highlight.get("city");if (cityHighlights != null && !cityHighlights.isEmpty()) {System.out.println("高亮城市: " + cityHighlights.get(0).fragments().get(0));}}}}public static void main(String[] args) throws IOException {createIndex();createDocument();getDocument();updateDocument();searchWithHighlight();// deleteDocument(); // 可选}
}
十、Spring Boot 整合实现:增删查改 + 高亮
1. 添加依赖(pom.xml)
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency>
2. 配置文件(application.yml)
spring:elasticsearch:uris: localhost:9200username: elasticpassword: changeme
3. 实体类(使用注解)
@Document(indexName = "users")
public class User {@Idprivate String id;@Field(type = FieldType.Text)private String name;@Field(type = FieldType.Integer)private Integer age;@Field(type = FieldType.Keyword)private String email;@Field(type = FieldType.Text, analyzer = "ik_max_word")private String city;@Field(type = FieldType.Keyword)private List<String> interests;// Getters and Setters 省略
}
4. Repository 接口
public interface UserRepository extends ElasticsearchRepository<User, String> {List<User> findByName(String name);List<User> findByCityContaining(String city);
}
5. Service 层(含高亮)
@Service
public class UserService {@Autowiredprivate UserRepository userRepository;public void saveUser(User user) {userRepository.save(user);}public Optional<User> getUserById(String id) {return userRepository.findById(id);}public void updateUser(User user) {userRepository.save(user);}public void deleteUser(String id) {userRepository.deleteById(id);}public SearchHits<User> searchWithHighlight(String keyword) {Query query = new NativeSearchQueryBuilder().withQuery(QueryBuilders.matchQuery("city", keyword)).withHighlightFields(new HighlightBuilder.Field("city").preTags("<em style='color:red'>").postTags("</em>")).build();SearchHits<User> hits = userRepository.search(query, User.class);for (SearchHit<User> hit : hits) {System.out.println("用户: " + hit.getContent().getName());if (hit.getHighlightFields() != null) {List<String> highlights = hit.getHighlightFields().get("city");if (highlights != null && !highlights.isEmpty()) {System.out.println("高亮: " + highlights.get(0));}}}return hits;}
}
6. Controller 示例
@RestController
@RequestMapping("/users")
public class UserController {@Autowiredprivate UserService userService;@PostMappingpublic String save(@RequestBody User user) {userService.saveUser(user);return "保存成功";}@GetMapping("/{id}")public User get(@PathVariable String id) {return userService.getUserById(id).orElse(null);}@GetMapping("/search")public SearchHits<User> search(@RequestParam String q) {return userService.searchWithHighlight(q);}
}
十一、总结
主题 | 要点 |
---|---|
Elasticsearch 是什么 | 分布式全文搜索引擎,支持实时搜索与分析 |
IK 分词器 | 解决中文分词问题,ik_max_word / ik_smart |
字段类型 | text (分词搜索) vs keyword (精确匹配) |
Canal | 解析 MySQL Binlog,实现实时数据同步 |
Canal 配置 | master.address , filter.regex , mq.topic |
Binlog 格式 | JSON 结构:data , table , type , ts |
同步方案 | 实时:Canal + MQ;简单:定时任务 + 增量查询 |
Java 原生 CRUD | 使用 ElasticsearchClient 实现完整操作 |
Spring Boot CRUD | 使用 @Document , ElasticsearchRepository |
高亮实现 | 原生使用 highlight() ,Spring 使用 HighlightBuilder |
📚 推荐方案选择
业务需求 | 推荐方案 |
---|---|
实时商品搜索 | ✅ Canal + RabbitMQ |
日志分析 | ✅ Filebeat → Logstash → ES |
小型系统,数据量 < 10万 | ✅ 定时任务同步 |
需要中文搜索 | ✅ 安装 IK 分词器 |
Java 项目 | ✅ Spring Data Elasticsearch |
高性能 | ✅ 原生 Java API Client |
📚 参考资料
- Elasticsearch 官方文档
- Canal GitHub
- IK 分词器 GitHub
- 《Elasticsearch 权威指南》
- Spring Data Elasticsearch 文档
觉得有收获?点赞、收藏、分享三连支持一下吧!
欢迎在评论区提问或分享你的使用经验 👇