【MongoDB的RLE压缩数据存储】
MongoDB的RLE压缩数据存储
1. RLE(Run-Length Encoding)算法详解
1.1 RLE基本概念
RLE是一种简单的无损数据压缩算法,特别适用于处理连续重复数据。其核心思想是将连续的重复数据值序列替换为一个值和重复次数。
1.2 RLE编码原理
原始数据: [1,1,1,1,1,2,2,3,3,3,3]
RLE编码: [(1,5), (2,2), (3,4)]
1.3 RLE优势与适用场景
优势: 算法简单、压缩效率高(针对重复数据)、解压快速
适用场景: 图像处理、日志数据、传感器数据等包含大量连续重复值的场景
局限: 当连续重复值较少时,由于每个条目需同时存储值和重复次数,编码结果反而会比原始数据更大,例如 [1,2,3] 编码后为 [(1,1),(2,1),(3,1)]。
2. MongoDB与Spring Data MongoDB配置
2.1 添加依赖
xml
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
2.2 配置文件
yaml
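spring:
  data:
    mongodb:
      uri: mongodb://localhost:27017/record_db
      database: record_db
      # required for @CompoundIndex to be created automatically (Spring Data MongoDB 3.0+)
      auto-index-creation: true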
3. 完整项目实现
3.1 数据库文档设计
3.1.1 RLE编码文档结构
json
{"_id": ObjectId("..."),"type": "q","version": "123","createTime": ISODate("2024-01-01T00:00:00Z"),"compressed": true,"rleValues": [{"value": 1.0, "count": 100},{"value": 2.0, "count": 50}],"originalSize": 150,"lastUpdateTime": ISODate("2024-01-01T01:00:00Z")
}
3.1.2 未压缩文档结构(小数据量)
json
{"_id": ObjectId("..."),"type": "q", "version": "124","createTime": ISODate("2024-01-01T00:00:00Z"),"compressed": false,"values": [1.23, 4.21, 5.90],"lastUpdateTime": ISODate("2024-01-01T01:00:00Z")
}
3.2 实体类设计
3.2.1 RLE条目类
java
package com.example.entity;import org.springframework.data.annotation.PersistenceCreator;
import org.springframework.data.mongodb.core.mapping.Field;

/**
 * One run of a run-length encoded series: {@code value} repeated {@code count} times in a row.
 * Persisted as an embedded {@code {value, count}} sub-document.
 */
public class RLEEntry {

    @Field("value")
    private float value;

    @Field("count")
    private int count;

    /**
     * Creates a run.
     *
     * @param value the repeated value
     * @param count number of consecutive occurrences of {@code value}
     */
    public RLEEntry(float value, int count) {
        this.value = value;
        this.count = count;
    }

    /** No-arg constructor marked as the persistence creator for Spring Data mapping. */
    @PersistenceCreator
    public RLEEntry() {
    }

    // Getters and setters

    public float getValue() {
        return value;
    }

    public void setValue(float value) {
        this.value = value;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }
}
3.2.2 主文档实体类
java
package com.example.entity;import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.index.CompoundIndex;
import org.springframework.data.mongodb.core.mapping.Document;
import org.springframework.data.mongodb.core.mapping.Field;import java.util.Date;
import java.util.List;

/**
 * MongoDB document holding one float series identified by {@code (type, version)}.
 *
 * <p>The series is stored either as plain {@code values} ({@code compressed == false}) or as RLE
 * runs in {@code rleValues} together with {@code originalSize}, the decoded length
 * ({@code compressed == true}). Callers are expected to keep only one representation populated.
 *
 * <p>NOTE: since Spring Data MongoDB 3.0 the {@code @CompoundIndex} below is only created
 * automatically when {@code spring.data.mongodb.auto-index-creation=true}. Otherwise the unique
 * (type, version) index must be created manually, and duplicates become possible.
 */
@Document(collection = "records")
@CompoundIndex(name = "type_version_idx", def = "{'type': 1, 'version': 1}", unique = true)
public class RecordDocument {

    @Id
    private String id;

    private String type;

    private String version;

    @Field("createTime")
    private Date createTime;

    @Field("lastUpdateTime")
    private Date lastUpdateTime;

    /** Whether the series lives in {@code rleValues} (true) or in {@code values} (false). */
    @Field("compressed")
    private boolean compressed;

    /** Raw values, used while the record is uncompressed. */
    @Field("values")
    private List<Float> values;

    /** RLE runs, used while the record is compressed. */
    @Field("rleValues")
    private List<RLEEntry> rleValues;

    /** Number of values the RLE runs decode to; typically null for uncompressed records. */
    @Field("originalSize")
    private Integer originalSize;

    // Constructors

    /** Creates an empty document whose creation and update timestamps are both "now". */
    public RecordDocument() {
        Date now = new Date();
        this.createTime = now;
        // Separate instance because Date is mutable; same instant for both timestamps.
        this.lastUpdateTime = new Date(now.getTime());
    }

    /**
     * Creates an uncompressed document.
     *
     * @param type record type
     * @param version record version
     * @param values raw values (stored by reference, not copied)
     */
    public RecordDocument(String type, String version, List<Float> values) {
        this();
        this.type = type;
        this.version = version;
        this.values = values;
        this.compressed = false;
    }

    // Getters and setters

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public String getVersion() {
        return version;
    }

    public void setVersion(String version) {
        this.version = version;
    }

    public Date getCreateTime() {
        return createTime;
    }

    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }

    public Date getLastUpdateTime() {
        return lastUpdateTime;
    }

    public void setLastUpdateTime(Date lastUpdateTime) {
        this.lastUpdateTime = lastUpdateTime;
    }

    public boolean isCompressed() {
        return compressed;
    }

    public void setCompressed(boolean compressed) {
        this.compressed = compressed;
    }

    public List<Float> getValues() {
        return values;
    }

    public void setValues(List<Float> values) {
        this.values = values;
    }

    public List<RLEEntry> getRleValues() {
        return rleValues;
    }

    public void setRleValues(List<RLEEntry> rleValues) {
        this.rleValues = rleValues;
    }

    public Integer getOriginalSize() {
        return originalSize;
    }

    public void setOriginalSize(Integer originalSize) {
        this.originalSize = originalSize;
    }
}
3.3 RLE工具类
java
package com.example.util;import com.example.entity.RLEEntry;
import java.util.ArrayList;
import java.util.List;public class RLEUtil {// 压缩阈值:当连续重复值超过此阈值时使用RLE压缩private static final int COMPRESSION_THRESHOLD = 10;/*** RLE编码压缩*/public static List<RLEEntry> encode(List<Float> values) {if (values == null || values.isEmpty()) {return new ArrayList<>();}List<RLEEntry> encoded = new ArrayList<>();float currentValue = values.get(0);int count = 1;for (int i = 1; i < values.size(); i++) {if (values.get(i).equals(currentValue)) {count++;} else {encoded.add(new RLEEntry(currentValue, count));currentValue = values.get(i);count = 1;}}encoded.add(new RLEEntry(currentValue, count));return encoded;}/*** RLE解码解压*/public static List<Float> decode(List<RLEEntry> rleEntries) {List<Float> decoded = new ArrayList<>();if (rleEntries == null) return decoded;for (RLEEntry entry : rleEntries) {for (int i = 0; i < entry.getCount(); i++) {decoded.add(entry.getValue());}}return decoded;}/*** 判断是否应该使用压缩(基于数据特征)*/public static boolean shouldCompress(List<Float> values) {if (values == null || values.size() < COMPRESSION_THRESHOLD) {return false;}// 检查连续重复模式int maxRunLength = 1;int currentRunLength = 1;for (int i = 1; i < values.size(); i++) {if (values.get(i).equals(values.get(i - 1))) {currentRunLength++;maxRunLength = Math.max(maxRunLength, currentRunLength);} else {currentRunLength = 1;}}return maxRunLength >= COMPRESSION_THRESHOLD;}/*** 计算压缩比*/public static double calculateCompressionRatio(List<Float> original, List<RLEEntry> compressed) {if (original == null || compressed == null) return 0.0;int originalSize = original.size();int compressedSize = compressed.size() * 2; // 每个RLE条目包含值和计数return originalSize == 0 ? 0.0 : (double) compressedSize / originalSize;}
}
3.4 Repository接口
java
package com.example.repository;import com.example.entity.RecordDocument;
import org.springframework.data.mongodb.repository.MongoRepository;
import org.springframework.data.mongodb.repository.Query;
import org.springframework.data.mongodb.repository.Update;import java.util.Date;
import java.util.List;
import java.util.Optional;public interface RecordRepository extends MongoRepository<RecordDocument, String> {Optional<RecordDocument> findByTypeAndVersion(String type, String version);List<RecordDocument> findByType(String type);List<RecordDocument> findByCompressed(boolean compressed);@Query("{'type': ?0, 'version': ?1}")@Update("{$set: {'lastUpdateTime': ?2}}")void updateLastUpdateTime(String type, String version, Date lastUpdateTime);@Query("{'type': ?0, 'version': ?1, 'compressed': false}")@Update("{$push: {'values': {$each: ?2}}, $set: {'lastUpdateTime': ?3}}")void appendValues(String type, String version, List<Float> newValues, Date lastUpdateTime);@Query("{'type': ?0, 'version': ?1, 'compressed': true}")@Update("{$set: {'rleValues': ?2, 'lastUpdateTime': ?3, 'originalSize': ?4}}")void updateRleValues(String type, String version, List<RLEEntry> rleValues, Date lastUpdateTime, int originalSize);@Query("{'createTime': {$gte: ?0, $lte: ?1}}")List<RecordDocument> findByCreateTimeBetween(Date start, Date end);
}
3.5 服务实现类
java
package com.example.service;
import com.example.entity.RLEEntry;
import com.example.entity.RecordDocument;
import com.example.repository.RecordRepository;
import com.example.util.RLEUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Optional;
/**
 * Stores float series keyed by {@code (type, version)}. Each series is held either as a plain
 * {@code values} array or as an RLE-encoded {@code rleValues} array, and the service switches
 * between the two transparently.
 *
 * <p>NOTE(review): updates are read-modify-write without locking. Concurrent writers to the same
 * {@code (type, version)} may overwrite or drop each other's changes.
 */
@Service
public class RecordService {

    @Autowired
    private RecordRepository recordRepository;

    /** Compression is only considered once a series holds more than this many values. */
    private static final int COMPRESSION_SIZE_THRESHOLD = 1000;

    /**
     * Inserts a new record, or appends to the existing one, handling compression automatically.
     *
     * @param type record type
     * @param version record version; together with {@code type} identifies the record
     * @param newValues values to store or append
     * @return the stored record. For appends it is re-read from the database and may be
     *     {@code null} if the record disappeared in the meantime.
     * @throws NullPointerException if {@code newValues} is null
     */
    public RecordDocument saveOrUpdate(String type, String version, List<Float> newValues) {
        java.util.Objects.requireNonNull(newValues, "newValues");
        Optional<RecordDocument> existingRecord = recordRepository.findByTypeAndVersion(type, version);
        if (existingRecord.isPresent()) {
            return appendToExistingRecord(existingRecord.get(), newValues);
        }
        return createNewRecord(type, version, newValues);
    }

    /** Builds and saves a new record, RLE-encoding it up front when the data qualifies. */
    private RecordDocument createNewRecord(String type, String version, List<Float> values) {
        RecordDocument record = new RecordDocument(type, version, values);
        if (shouldCompress(values)) {
            applyCompression(record, values);
        }
        return recordRepository.save(record);
    }

    /**
     * Appends {@code newValues} to an existing record and returns the record re-read from the
     * database.
     */
    private RecordDocument appendToExistingRecord(RecordDocument existingRecord, List<Float> newValues) {
        String type = existingRecord.getType();
        String version = existingRecord.getVersion();
        if (existingRecord.isCompressed()) {
            // Already compressed: extend the stored runs without decoding the whole series.
            appendToCompressedRecord(existingRecord, newValues);
        } else {
            // Work on a copy and hand the combined series to the compressor explicitly. The
            // loaded document's own list must not be mutated or re-read as the source of truth.
            List<Float> allValues = new ArrayList<>(getAllValues(existingRecord));
            allValues.addAll(newValues);
            if (shouldCompress(allValues)) {
                applyCompression(existingRecord, allValues);
                recordRepository.save(existingRecord);
            } else {
                // Stay uncompressed and push only the new values server-side.
                // NOTE(review): appendValues filters on compressed == false. If another writer
                // compressed this record concurrently, nothing matches and newValues are dropped.
                recordRepository.appendValues(type, version, newValues, new Date());
            }
        }
        return recordRepository.findByTypeAndVersion(type, version).orElse(null);
    }

    /**
     * Appends {@code newValues} to a compressed record by merging their RLE runs onto the stored
     * runs. The first new run is folded into the last stored run when both hold the same value.
     */
    private void appendToCompressedRecord(RecordDocument record, List<Float> newValues) {
        List<RLEEntry> runs = new ArrayList<>();
        if (record.getRleValues() != null) {
            runs.addAll(record.getRleValues());
        }
        for (RLEEntry run : RLEUtil.encode(newValues)) {
            int lastIndex = runs.size() - 1;
            RLEEntry last = lastIndex >= 0 ? runs.get(lastIndex) : null;
            // Float.compare == 0 is the same equality RLEUtil.encode applies (Float.equals).
            if (last != null && Float.compare(last.getValue(), run.getValue()) == 0) {
                runs.set(lastIndex, new RLEEntry(last.getValue(), last.getCount() + run.getCount()));
            } else {
                runs.add(run);
            }
        }
        int originalSize = 0;
        for (RLEEntry run : runs) {
            originalSize += run.getCount();
        }
        recordRepository.updateRleValues(record.getType(), record.getVersion(), runs, new Date(), originalSize);
    }

    /** Returns all values of a record, decompressing if needed; empty if the record is absent. */
    public List<Float> getValues(String type, String version) {
        return recordRepository.findByTypeAndVersion(type, version)
                .map(this::getAllValues)
                .orElseGet(ArrayList::new);
    }

    /** Finds a record by its MongoDB id. */
    public Optional<RecordDocument> findById(String id) {
        return recordRepository.findById(id);
    }

    /** Finds all records of the given type. */
    public List<RecordDocument> findByType(String type) {
        return recordRepository.findByType(type);
    }

    /** Pages through all records. */
    public Page<RecordDocument> findAll(Pageable pageable) {
        return recordRepository.findAll(pageable);
    }

    /** Deletes the record for {@code (type, version)} if it exists. */
    public void deleteRecord(String type, String version) {
        recordRepository.findByTypeAndVersion(type, version).ifPresent(recordRepository::delete);
    }

    /**
     * Manually compresses a record, if it is uncompressed and qualifies for compression.
     *
     * @return the saved record if it was compressed; otherwise the unchanged record, or
     *     {@code null} if it does not exist
     */
    public RecordDocument compressRecord(String type, String version) {
        Optional<RecordDocument> recordOpt = recordRepository.findByTypeAndVersion(type, version);
        if (recordOpt.isPresent() && !recordOpt.get().isCompressed()) {
            RecordDocument record = recordOpt.get();
            List<Float> values = getAllValues(record);
            if (shouldCompress(values)) {
                applyCompression(record, values);
                return recordRepository.save(record);
            }
        }
        return recordOpt.orElse(null);
    }

    /**
     * Manually decompresses a record back into a plain {@code values} array.
     *
     * @return the saved record if it was decompressed; otherwise the unchanged record, or
     *     {@code null} if it does not exist
     */
    public RecordDocument decompressRecord(String type, String version) {
        Optional<RecordDocument> recordOpt = recordRepository.findByTypeAndVersion(type, version);
        if (recordOpt.isPresent()) {
            RecordDocument record = recordOpt.get();
            if (record.isCompressed()) {
                List<Float> values = RLEUtil.decode(record.getRleValues());
                record.setValues(values);
                record.setRleValues(null);
                record.setCompressed(false);
                record.setOriginalSize(null);
                return recordRepository.save(record);
            }
        }
        return recordOpt.orElse(null);
    }

    /**
     * Builds statistics for a record.
     *
     * @return the statistics, or {@code null} if the record does not exist. The compression
     *     ratio is 1.0 for uncompressed records.
     */
    public RecordStats getStats(String type, String version) {
        Optional<RecordDocument> recordOpt = recordRepository.findByTypeAndVersion(type, version);
        if (recordOpt.isEmpty()) {
            return null;
        }
        RecordDocument record = recordOpt.get();
        List<Float> values = getAllValues(record);
        RecordStats stats = new RecordStats();
        stats.setTotalValues(values.size());
        stats.setCompressed(record.isCompressed());
        stats.setCompressionRatio(record.isCompressed()
                ? RLEUtil.calculateCompressionRatio(values, record.getRleValues())
                : 1.0);
        stats.setCreateTime(record.getCreateTime());
        stats.setLastUpdateTime(record.getLastUpdateTime());
        return stats;
    }

    // Internal helpers

    /** Returns the record's values, decoding RLE runs when compressed; never null. */
    private List<Float> getAllValues(RecordDocument record) {
        if (record.isCompressed()) {
            return RLEUtil.decode(record.getRleValues());
        }
        return record.getValues() != null ? record.getValues() : new ArrayList<>();
    }

    /** Size gate plus the data-shape heuristic in {@link RLEUtil#shouldCompress(List)}. */
    private boolean shouldCompress(List<Float> values) {
        return values.size() > COMPRESSION_SIZE_THRESHOLD && RLEUtil.shouldCompress(values);
    }

    /**
     * Switches {@code record} to the compressed representation of {@code values}. The record is
     * modified in memory only; the caller persists it.
     */
    private void applyCompression(RecordDocument record, List<Float> values) {
        record.setRleValues(RLEUtil.encode(values));
        record.setValues(null);
        record.setCompressed(true);
        record.setOriginalSize(values.size());
        record.setLastUpdateTime(new Date());
    }

    /** Statistics returned by {@link #getStats(String, String)}. */
    public static class RecordStats {
        private int totalValues;
        private boolean compressed;
        private double compressionRatio;
        private Date createTime;
        private Date lastUpdateTime;

        // Getters and setters

        public int getTotalValues() {
            return totalValues;
        }

        public void setTotalValues(int totalValues) {
            this.totalValues = totalValues;
        }

        public boolean isCompressed() {
            return compressed;
        }

        public void setCompressed(boolean compressed) {
            this.compressed = compressed;
        }

        public double getCompressionRatio() {
            return compressionRatio;
        }

        public void setCompressionRatio(double compressionRatio) {
            this.compressionRatio = compressionRatio;
        }

        public Date getCreateTime() {
            return createTime;
        }

        public void setCreateTime(Date createTime) {
            this.createTime = createTime;
        }

        public Date getLastUpdateTime() {
            return lastUpdateTime;
        }

        public void setLastUpdateTime(Date lastUpdateTime) {
            this.lastUpdateTime = lastUpdateTime;
        }
    }
}
3.6 控制器类
java
package com.example.controller;
import com.example.entity.RecordDocument;
import com.example.service.RecordService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.util.List;
@RestController
@RequestMapping(“/api/records”)
public class RecordController {
@Autowired
private RecordService recordService;@PostMapping
public ResponseEntity<RecordDocument> createOrUpdateRecord(@RequestParam String type,@RequestParam String version,@RequestBody List<Float> values) {RecordDocument record = recordService.saveOrUpdate(type, version, values);return ResponseEntity.ok(record);
}@GetMapping
public ResponseEntity<RecordDocument> getRecord(@RequestParam String type,@RequestParam String version) {List<Float> values = recordService.getValues(type, version);RecordDocument record = new RecordDocument();record.setType(type);record.setVersion(version);// 注意:这里返回的是包含解压后数据的简化对象record.setValues(values);return ResponseEntity.ok(record);
}@GetMapping("/{id}")
public ResponseEntity<RecordDocument> getRecordById(@PathVariable String id) {return recordService.findById(id).map(ResponseEntity::ok).orElse(ResponseEntity.notFound().build());
}@GetMapping("/type/{type}")
public ResponseEntity<List<RecordDocument>> getRecordsByType(@PathVariable String type) {List<RecordDocument> records = recordService.findByType(type);return ResponseEntity.ok(records);
}@GetMapping("/page")
public ResponseEntity<Page<RecordDocument>> getRecordsPage(@RequestParam(defaultValue = "0") int page,@RequestParam(defaultValue = "10") int size) {Page<RecordDocument> records = recordService.findAll(PageRequest.of(page, size));return ResponseEntity.ok(records);
}@PostMapping("/compress")
public ResponseEntity<RecordDocument> compressRecord(@RequestParam String type,@RequestParam String version) {RecordDocument record = recordService.compressRecord(type, version);return record != null ? ResponseEntity.ok(record) : ResponseEntity.notFound().build();
}@PostMapping("/decompress")
public ResponseEntity<RecordDocument> decompressRecord(@RequestParam String type,@RequestParam String version) {RecordDocument record = recordService.decompressRecord(type, version);return record != null ? ResponseEntity.ok(record) : ResponseEntity.notFound().build();
}@GetMapping("/stats")
public ResponseEntity<RecordService.RecordStats> getRecordStats(@RequestParam String type,@RequestParam String version) {RecordService.RecordStats stats = recordService.getStats(type, version);return stats != null ? ResponseEntity.ok(stats) : ResponseEntity.notFound().build();
}@DeleteMapping
public ResponseEntity<Void> deleteRecord(@RequestParam String type,@RequestParam String version) {recordService.deleteRecord(type, version);return ResponseEntity.ok().build();
}
}
3.7 配置类
java
package com.example.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.config.EnableMongoAuditing;
import org.springframework.data.mongodb.repository.config.EnableMongoRepositories;
@Configuration
@EnableMongoRepositories(basePackages = “com.example.repository”)
@EnableMongoAuditing
public class MongoConfig {
}
4. 使用示例
4.1 插入数据示例
java
// Insert the first batch of values
List<Float> values1 = Arrays.asList(1.23f, 4.21f, 5.90f);
recordService.saveOrUpdate("q", "123", values1);
// Insert a second batch for the same type/version (appended to the existing record)
List<Float> values2 = Arrays.asList(5.78f, 9.88f);
recordService.saveOrUpdate("q", "123", values2);
// Insert a long run of identical values (compressed automatically)
List<Float> repeatedValues = new ArrayList<>();
for (int i = 0; i < 1_000_000; i++) {
    repeatedValues.add(1.0f);
}
recordService.saveOrUpdate("q", "124", repeatedValues);
4.2 查询数据示例
java
// Query values (decompressed automatically)
List<Float> values = recordService.getValues("q", "123");
// Fetch statistics; getStats returns null when the record does not exist
RecordService.RecordStats stats = recordService.getStats("q", "124");
if (stats != null) {
    System.out.println("压缩比: " + stats.getCompressionRatio());
}
5. 性能优化建议
5.1 索引优化
java
// Add the compound index on the entity class. Since Spring Data MongoDB 3.0 it is only created
// automatically when spring.data.mongodb.auto-index-creation=true.
@CompoundIndex(name = "type_version_idx", def = "{'type': 1, 'version': 1}", unique = true)
5.2 批量操作优化
对于大批量数据插入,考虑使用MongoDB的批量操作
实现数据分片策略,避免单个文档过大(MongoDB 单个 BSON 文档的大小上限为 16MB)
5.3 监控与调优
监控压缩比和查询性能
根据实际数据特征调整压缩阈值
定期清理历史数据
6. 总结
本项目实现了基于Spring Boot和MongoDB的智能数据存储系统,具有以下特点:
自动压缩: 根据数据特征自动选择是否使用RLE压缩
透明操作: 提供统一的API,压缩/解压对用户透明
高性能: 针对重复数据优化存储效率
灵活配置: 可调整压缩阈值和策略
完整功能: 提供增删改查、统计等完整功能