当前位置: 首页 > news >正文

构建分布式京东商品数据采集系统:基于 API 的微服务实现方案

随着电子商务的快速发展,商品数据的采集与分析成为企业制定营销策略、优化产品布局的重要依据。京东作为国内领先的电商平台,其商品数据具有极高的商业价值。本文将介绍如何构建一个分布式的京东商品数据采集系统,采用基于 API 的微服务架构,实现高效、稳定、可扩展的数据采集能力。

系统架构设计

分布式京东商品数据采集系统采用微服务架构,主要包含以下几个核心服务:

  1. API 网关服务:统一入口,负责请求路由、负载均衡、认证授权
  2. 任务调度服务:管理采集任务的分发与状态跟踪
  3. 数据采集服务:实际执行商品数据采集的工作节点
  4. 数据存储服务:负责采集数据的持久化存储
  5. 数据清洗服务:对采集的原始数据进行清洗和标准化
  6. 监控告警服务:监控各服务状态,异常时触发告警

系统架构图如下:

plaintext

[客户端] → [API网关] → [任务调度服务]↓
[监控告警] ← [数据清洗] ← [数据采集集群] → [京东API/网页]↓[数据存储服务]

核心服务实现

1. API 网关服务

采用 Spring Cloud Gateway 实现 API 网关,负责请求路由和负载均衡。

spring:cloud:gateway:routes:- id: task-serviceuri: lb://task-servicepredicates:- Path=/api/tasks/**filters:- name: RequestRateLimiterargs:redis-rate-limiter.replenishRate: 10redis-rate-limiter.burstCapacity: 20- id: crawler-serviceuri: lb://crawler-servicepredicates:- Path=/api/crawlers/**- id: data-serviceuri: lb://data-servicepredicates:- Path=/api/data/**eureka:client:serviceUrl:defaultZone: http://eureka-server1:8761/eureka/,http://eureka-server2:8762/eureka/
server:port: 8080

2. 任务调度服务

任务调度服务负责管理采集任务的创建、分发和状态跟踪,采用 Spring Boot 实现。

@Service
public class TaskService {@Autowiredprivate TaskRepository taskRepository;@Autowiredprivate RestTemplate restTemplate;@Autowiredprivate DiscoveryClient discoveryClient;// 创建新任务public Task createTask(TaskRequest request) {Task task = new Task();task.setKeywords(request.getKeywords());task.setStatus(TaskStatus.PENDING);task.setCreatedTime(new Date());task.setPriority(request.getPriority());task.setTotalPages(calculateTotalPages(request));return taskRepository.save(task);}// 分配任务给可用的采集节点@Scheduled(fixedRate = 5000)public void assignTasks() {List<Task> pendingTasks = taskRepository.findByStatus(TaskStatus.PENDING);if (pendingTasks.isEmpty()) {return;}List<String> crawlerInstances = discoveryClient.getInstances("crawler-service").stream().map(instance -> instance.getHost() + ":" + instance.getPort()).collect(Collectors.toList());if (crawlerInstances.isEmpty()) {log.warn("No available crawler instances");return;}// 简单的轮询分配策略for (int i = 0; i < pendingTasks.size(); i++) {Task task = pendingTasks.get(i);String crawlerUrl = "http://" + crawlerInstances.get(i % crawlerInstances.size()) + "/api/crawlers/execute";try {restTemplate.postForObject(crawlerUrl, task, Task.class);task.setStatus(TaskStatus.ASSIGNED);taskRepository.save(task);log.info("Task {} assigned to crawler", task.getId());} catch (Exception e) {log.error("Failed to assign task {} to crawler", task.getId(), e);}}}// 计算总页数private int calculateTotalPages(TaskRequest request) {// 根据关键词预估总页数,实际应用中可根据京东API返回结果动态调整return 10; // 简化示例}// 更新任务状态public void updateTaskStatus(Long taskId, TaskStatus status) {Task task = taskRepository.findById(taskId).orElseThrow(() -> new TaskNotFoundException("Task not found: " + taskId));task.setStatus(status);if (status == TaskStatus.COMPLETED) {task.setCompletedTime(new Date());}taskRepository.save(task);}
}

3. 数据采集服务

数据采集服务是实际执行商品数据采集的工作节点,采用分布式部署以提高采集效率和系统可用性。

@Service
public class JdCrawlerService {private static final Logger log = LoggerFactory.getLogger(JdCrawlerService.class);@Autowiredprivate RestTemplate restTemplate;@Value("${jd.api.url}")private String jdApiUrl;@Value("${jd.api.appkey}")private String appKey;@Value("${jd.api.secret}")private String secret;@Autowiredprivate DataServiceClient dataServiceClient;// 执行采集任务public void executeTask(Task task) {log.info("Starting to crawl task: {}", task.getId());try {// 更新任务状态为运行中updateTaskStatus(task.getId(), TaskStatus.RUNNING);// 分页采集商品数据for (int page = 1; page <= task.getTotalPages(); page++) {List<Product> products = crawlPage(task.getKeywords(), page);if (products.isEmpty()) {log.info("No more products found for task {}, stopping early", task.getId());break;}// 保存采集的数据dataServiceClient.saveProducts(products, task.getId());log.info("Crawled page {} for task {}, {} products", page, task.getId(), products.size());// 随机休眠,避免触发反爬机制Thread.sleep(new Random().nextInt(3000) + 2000);}// 采集完成,更新任务状态updateTaskStatus(task.getId(), TaskStatus.COMPLETED);log.info("Task {} completed successfully", task.getId());} catch (Exception e) {log.error("Error crawling task {}", task.getId(), e);updateTaskStatus(task.getId(), TaskStatus.FAILED);}}// 采集单页商品数据private List<Product> crawlPage(String keywords, int page) {try {// 构建API请求参数String timestamp = String.valueOf(System.currentTimeMillis());String sign = generateSign(keywords, page, timestamp);Map<String, String> params = new HashMap<>();params.put("keyword", keywords);params.put("page", String.valueOf(page));params.put("appkey", appKey);params.put("timestamp", timestamp);params.put("sign", sign);// 调用京东API获取商品数据String url = buildUrlWithParams(jdApiUrl, params);JdApiResponse response = restTemplate.getForObject(url, JdApiResponse.class);if (response.isSuccess()) {return convertToProducts(response.getData());} else {log.error("JD API request failed: {}", response.getMsg());return Collections.emptyList();}} catch (Exception e) {log.error("Error crawling page {} for keyword {}", page, keywords, e);return Collections.emptyList();}}// 生成签名,防止API滥用private String generateSign(String keyword, int page, String timestamp) {String baseString = appKey + keyword + page + timestamp + secret;return DigestUtils.md5DigestAsHex(baseString.getBytes(StandardCharsets.UTF_8));}// 构建带参数的URLprivate String buildUrlWithParams(String baseUrl, Map<String, String> params) {UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl(baseUrl);params.forEach(builder::queryParam);return builder.toUriString();}// 转换API响应数据为Product对象private List<Product> convertToProducts(List<Map<String, Object>> dataList) {List<Product> products = new ArrayList<>();for (Map<String, Object> data : dataList) {Product product = new Product();product.setSkuId(data.get("sku_id").toString());product.setName(data.get("name").toString());product.setPrice(new BigDecimal(data.get("price").toString()));product.setShopName(data.get("shop_name").toString());product.setCommentCount(Integer.parseInt(data.get("comment_count").toString()));product.setGoodRate(new BigDecimal(data.get("good_rate").toString()));product.setCrawlTime(new Date());products.add(product);}return products;}// 更新任务状态private void updateTaskStatus(Long taskId, TaskStatus status) {// 调用任务服务更新状态restTemplate.put("http://task-service/api/tasks/" + taskId + "/status", status);}
}

4. 数据存储服务

数据存储服务负责将采集到的商品数据进行持久化存储,采用 MySQL 作为主数据库,Redis 作为缓存。

@Service
public class ProductService {@Autowiredprivate ProductRepository productRepository;@Autowiredprivate RedisTemplate<String, Object> redisTemplate;// 批量保存商品数据@Transactionalpublic void saveProducts(List<Product> products, Long taskId) {if (CollectionUtils.isEmpty(products)) {return;}// 批量保存到数据库List<Product> savedProducts = productRepository.saveAll(products);// 缓存热门商品saveHotProductsToCache(savedProducts);// 记录任务关联的商品IDsaveProductTaskRelation(savedProducts, taskId);}// 缓存热门商品private void saveHotProductsToCache(List<Product> products) {// 筛选评论数多的商品作为热门商品List<Product> hotProducts = products.stream().sorted((p1, p2) -> Integer.compare(p2.getCommentCount(), p1.getCommentCount())).limit(10).collect(Collectors.toList());// 存入Redis缓存,过期时间1小时for (Product product : hotProducts) {redisTemplate.opsForValue().set("product:hot:" + product.getSkuId(), product, 1, TimeUnit.HOURS);}}// 记录任务与商品的关联关系private void saveProductTaskRelation(List<Product> products, Long taskId) {List<ProductTaskRelation> relations = products.stream().map(product -> {ProductTaskRelation relation = new ProductTaskRelation();relation.setProductId(product.getId());relation.setTaskId(taskId);return relation;}).collect(Collectors.toList());// 保存关联关系// productTaskRelationRepository.saveAll(relations);}// 根据关键词查询商品public Page<Product> findProductsByKeyword(String keyword, Pageable pageable) {// 先查缓存String cacheKey = "products:keyword:" + keyword + ":" + pageable.getPageNumber() + ":" + pageable.getPageSize();Page<Product> cachedPage = (Page<Product>) redisTemplate.opsForValue().get(cacheKey);if (cachedPage != null) {return cachedPage;}// 缓存未命中,查询数据库Page<Product> productPage = productRepository.findByNameContaining(keyword, pageable);// 存入缓存,过期时间30分钟redisTemplate.opsForValue().set(cacheKey, productPage, 30, TimeUnit.MINUTES);return productPage;}// 根据SKU ID查询商品详情public Product findBySkuId(String skuId) {// 先查缓存Product cachedProduct = (Product) redisTemplate.opsForValue().get("product:sku:" + skuId);if (cachedProduct != null) {return cachedProduct;}// 缓存未命中,查询数据库Product product = productRepository.findBySkuId(skuId).orElseThrow(() -> new ProductNotFoundException("Product not found: " + skuId));// 存入缓存,过期时间1小时redisTemplate.opsForValue().set("product:sku:" + skuId, product, 1, TimeUnit.HOURS);return product;}
}

系统扩展性与可靠性设计

1. 水平扩展能力

系统设计支持水平扩展,通过增加服务实例即可提高系统的处理能力:

  • 采集服务可根据任务量动态扩缩容
  • 采用服务注册与发现机制(如 Eureka),新增节点自动加入集群
  • 任务调度服务采用分布式锁避免任务重复分配
@Component
public class RedisDistributedLock {@Autowiredprivate StringRedisTemplate redisTemplate;private static final String LOCK_PREFIX = "distributed:lock:";private static final int LOCK_EXPIRE = 30000; // 锁默认过期时间30秒// 获取锁public boolean lock(String key) {return lock(key, LOCK_EXPIRE);}// 获取锁,指定过期时间public boolean lock(String key, int expireMillis) {String lockKey = LOCK_PREFIX + key;String value = UUID.randomUUID().toString();// 使用SET NX EX命令获取锁Boolean success = redisTemplate.opsForValue().setIfAbsent(lockKey, value, expireMillis, TimeUnit.MILLISECONDS);return Boolean.TRUE.equals(success);}// 释放锁public void unlock(String key) {String lockKey = LOCK_PREFIX + key;String value = redisTemplate.opsForValue().get(lockKey);if (value != null) {// 使用Lua脚本保证删除操作的原子性String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";redisTemplate.execute(new DefaultRedisScript<>(script, Integer.class), Collections.singletonList(lockKey), value);}}// 尝试获取锁,带超时时间public boolean tryLock(String key, long waitTime, int leaseTime) throws InterruptedException {long startTime = System.currentTimeMillis();String lockKey = LOCK_PREFIX + key;String value = UUID.randomUUID().toString();while (true) {// 尝试获取锁Boolean success = redisTemplate.opsForValue().setIfAbsent(lockKey, value, leaseTime, TimeUnit.MILLISECONDS);if (Boolean.TRUE.equals(success)) {return true;}// 判断是否超时if (System.currentTimeMillis() - startTime > waitTime) {return false;}// 短暂休眠后重试Thread.sleep(100);}}
}

2. 容错与重试机制

为提高系统可靠性,设计了多层次的容错机制:

  • 服务间调用采用熔断机制(如 Resilience4j)
  • 采集任务失败自动重试
  • 节点故障时任务自动转移到其他节点
@Configuration
public class RetryConfig {@Beanpublic RetryRegistry retryRegistry() {RetryConfig config = RetryConfig.custom().maxAttempts(3) // 最大重试次数.waitDuration(Duration.ofSeconds(2)) // 重试间隔.retryExceptions(IOException.class, RestClientException.class) // 需要重试的异常.ignoreExceptions(IllegalArgumentException.class) // 不需要重试的异常.build();return RetryRegistry.of(config);}@Beanpublic Retry taskRetry(RetryRegistry retryRegistry) {return retryRegistry.retry("taskRetry");}@Beanpublic CircuitBreakerRegistry circuitBreakerRegistry() {CircuitBreakerConfig config = CircuitBreakerConfig.custom().failureRateThreshold(50) // 失败率阈值,超过此时打开断路器.waitDurationInOpenState(Duration.ofSeconds(60)) // 断路器打开状态持续时间.slidingWindowSize(10) // 滑动窗口大小.build();return CircuitBreakerRegistry.of(config);}@Beanpublic CircuitBreaker jdApiCircuitBreaker(CircuitBreakerRegistry registry) {return registry.circuitBreaker("jdApi");}
}

监控与告警

系统集成了 Spring Boot Actuator 和 Prometheus、Grafana 实现监控功能,主要监控指标包括:

  • 各服务节点的 CPU、内存、磁盘使用率
  • 任务执行成功率、平均执行时间
  • API 调用频率、响应时间
  • 数据采集量、存储量
management:endpoints:web:exposure:include: health,info,metrics,prometheusmetrics:export:prometheus:enabled: trueendpoint:health:show-details: alwaysprobes:enabled: true# 自定义监控指标
custom:metrics:task:enabled: truecrawler:enabled: true

总结与展望

本文介绍了基于微服务架构的分布式京东商品数据采集系统的设计与实现方案。该系统具有以下特点:

  1. 高可扩展性:采用微服务架构,支持各组件独立扩展
  2. 高可靠性:通过熔断、重试、分布式锁等机制保证系统稳定运行
  3. 高效采集:分布式部署的采集节点可并行处理大量采集任务
  4. 易于维护:各服务职责单一,便于开发和维护

未来可以从以下几个方面进行优化:

  1. 引入机器学习算法优化任务分配策略,提高系统整体效率
  2. 增加数据挖掘和分析功能,提供更有价值的商业洞察
  3. 优化反爬机制的应对策略,提高数据采集的稳定性
  4. 引入流处理技术,实现实时数据采集与分析

通过这个分布式商品数据采集系统,企业可以高效、稳定地获取京东平台的商品数据,为业务决策提供有力支持。


文章转载自:

http://L91JSFFX.jhwwr.cn
http://k67iq8EO.jhwwr.cn
http://GH33INyr.jhwwr.cn
http://q6n3tdFN.jhwwr.cn
http://9OXA2HxY.jhwwr.cn
http://qNjiRDiQ.jhwwr.cn
http://0zytUlBs.jhwwr.cn
http://5BaQerUL.jhwwr.cn
http://0X3HpllI.jhwwr.cn
http://5p5sCYqQ.jhwwr.cn
http://Ii6GaPjG.jhwwr.cn
http://ftfyM1Mu.jhwwr.cn
http://sS6aTxMM.jhwwr.cn
http://6Ey1LfCN.jhwwr.cn
http://xz5Upk74.jhwwr.cn
http://bkAOIvuG.jhwwr.cn
http://Fq0JN42d.jhwwr.cn
http://4dtbPMcN.jhwwr.cn
http://PsQtSD7y.jhwwr.cn
http://iyQRNRgZ.jhwwr.cn
http://ZFYvqD5T.jhwwr.cn
http://nce1VCOK.jhwwr.cn
http://F4hnuPOf.jhwwr.cn
http://J9fxXFrX.jhwwr.cn
http://c8KLcccX.jhwwr.cn
http://PStuwdiA.jhwwr.cn
http://WAAXFI2r.jhwwr.cn
http://YN2HS0N9.jhwwr.cn
http://2GCBuaCG.jhwwr.cn
http://zqHpO8ig.jhwwr.cn
http://www.dtcms.com/a/374778.html

相关文章:

  • HTML5点击转圈圈 | 深入了解HTML5技术中的动态效果与用户交互设计
  • springboot rabbitmq 延时队列消息确认收货订单已完成
  • CString(MFC/ATL 框架)和 QString(Qt 框架)
  • Sklearn(机器学习)实战:鸢尾花数据集处理技巧
  • 工具框架:Scikit-learn、Pandas、NumPy预测鸢尾花的种类
  • AI GEO 优化能否快速提升网站在搜索引擎的排名?​
  • nvm和nrm的详细安装配置,从卸载nodejs到安装NVM管理nodejs版本,以及安装nrm管理npm版本
  • 对口型视频怎么制作?从脚本到成片的全流程解析
  • 从“能说话”到“会做事”:AI Agent如何重构日常工作流?
  • 洛谷 P1249 最大乘积-普及/提高-
  • 小红书获取笔记详情API接口会返回哪些数据?
  • JAVA Spring Boot maven导入使用本地SDK(jar包)
  • Linux/UNIX系统编程手册笔记:SOCKET
  • F5和Nginx的区别
  • 9.9网编简单TCP,UDP的实现day2
  • Day39 SQLite数据库操作与HTML核心API及页面构建
  • Vue3 与 AntV X6 节点传参、自动布局及边颜色控制教程
  • 线程与进程的区别
  • RAC概念笔记
  • 如何将视频从安卓手机传输到电脑?
  • Day04_苍穹外卖——套餐管理(实战)
  • ElementUI 组件概览
  • fifo之读写指针
  • 【第三次全国土壤普查】一键制备土壤三普环境变量23项遥感植被指数神器
  • Java反射机制详解
  • PDF文件中的广告二维码图片该怎么批量删除
  • 记一次 .NET 某中医药附属医院门诊系统 崩溃分析
  • WPF/Prism 中计算属性的通知机制详解 —— SetProperty 与 RaisePropertyChanged
  • jmeter使用指南
  • 硬件(六)arm指令