当前位置: 首页 > news >正文

【夜话系列】DelayQueue延迟队列(下):实战应用与面试精讲

🔥 本文是DelayQueue系列的下篇,聚焦实战应用场景和性能优化。通过多个真实案例,带你掌握DelayQueue在项目中的最佳实践和性能调优技巧。

📚 系列专栏推荐

  • JAVA集合专栏 【夜话集】
  • JVM知识专栏
  • 数据库sql理论与实战
  • 小游戏开发

在这里插入图片描述

文章目录

    • 一、DelayQueue实战应用
      • 1.1 订单超时自动取消
        • 1.1.1 业务场景分析
        • 1.1.2 实现方案设计
        • 1.1.3 完整代码示例
        • 1.1.4 注意事项与优化点
      • 1.2 限时优惠券管理
        • 1.2.1 优惠券过期处理
        • 1.2.2 动态失效时间控制
        • 1.2.3 并发安全处理
        • 1.2.4 实现代码与优化
      • 1.3 缓存过期清理
        • 1.3.1 缓存淘汰策略
        • 1.3.2 延迟清理机制
        • 1.3.3 内存占用优化
        • 1.3.4 示例实现
        • 1.3.5 性能优化建议
      • 1.4 实战应用总结
    • 二、面试重点解析
      • 2.1 原理相关题目
        • Q1: DelayQueue的核心原理是什么?它是如何保证元素按照延迟时间顺序被处理的?
        • Q2: DelayQueue与Timer/TimerTask相比有什么优势?
        • Q3: DelayQueue是如何实现线程安全的?
      • 2.2 实现细节考点
        • Q1: 如何实现一个自定义的延时任务放入DelayQueue?
        • Q2: DelayQueue的take()方法是如何实现的?为什么它能够精确地在延迟到期时返回元素?
        • Q3: DelayQueue中的元素可以被更新延迟时间吗?如何实现?
      • 2.3 应用场景案例
        • Q1: 请设计一个基于DelayQueue的限流器,实现令牌桶算法。
        • Q2: 如何使用DelayQueue实现一个支持定时取消的异步任务系统?
        • Q3: 在分布式系统中,如何结合Redis实现类似DelayQueue的功能?
      • 2.4 性能调优问题
        • Q1: 在高并发场景下,DelayQueue可能面临哪些性能问题?如何优化?
        • Q2: 如何设计一个支持持久化的DelayQueue,确保系统重启后任务不丢失?
      • 2.5 面试真题解析
        • 真题1: 如何使用DelayQueue实现一个限流器,要求每个接口每秒最多处理N个请求?
        • 真题2: 在一个电商系统中,如何使用DelayQueue实现秒杀活动的定时开始?
    • 三、思考题
    • 写在最后

一、DelayQueue实战应用

1.1 订单超时自动取消

1.1.1 业务场景分析

在电商系统中,订单创建后通常需要在一定时间内完成支付,否则系统会自动取消订单并释放库存。这是一个典型的延时任务场景:

  • 订单创建后,需要设置一个倒计时(通常为15分钟或30分钟)
  • 如果在倒计时结束前完成支付,需要取消该延时任务
  • 如果倒计时结束时订单仍未支付,则自动取消订单
  • 系统需要支持大量并发订单的超时管理

传统实现方式通常采用定时任务扫描数据库,但这种方式存在以下问题:

  • 数据库压力大,特别是订单量大的场景
  • 实时性不够,可能出现几秒甚至几分钟的延迟
  • 资源消耗高,需要频繁扫描数据库

使用DelayQueue可以很好地解决这些问题,实现内存级的订单超时管理。

1.1.2 实现方案设计

基于DelayQueue的订单超时取消方案设计如下:

  1. 创建一个实现Delayed接口的订单超时任务类
  2. 维护一个全局的DelayQueue,用于管理所有未支付订单的超时任务
  3. 订单创建时,向DelayQueue中添加对应的超时任务
  4. 订单支付成功时,从DelayQueue中移除对应的超时任务
  5. 启动专门的线程从DelayQueue中获取到期的任务,执行订单取消逻辑

这种设计的优势在于:

  • 内存级处理,性能高
  • 精确的超时控制,无需频繁扫描数据库
  • 支持动态取消超时任务
  • 系统重启后可以通过数据库中的订单状态和创建时间重建延时队列
1.1.3 完整代码示例
import java.util.Map;
import java.util.concurrent.*;

/**
 * 基于DelayQueue实现的订单超时自动取消功能
 */
public class OrderTimeoutCancelService {
    // 订单超时时间,单位毫秒
    private final long ORDER_TIMEOUT = 30 * 60 * 1000; // 30分钟
    
    // 延迟队列,用于处理订单超时
    private final DelayQueue<OrderDelayTask> delayQueue = new DelayQueue<>();
    
    // 用于存储订单与任务的映射关系,便于取消任务
    private final Map<String, OrderDelayTask> taskMap = new ConcurrentHashMap<>();
    
    // 订单服务,实际业务中通过依赖注入获取
    private final OrderService orderService;
    
    public OrderTimeoutCancelService(OrderService orderService) {
        this.orderService = orderService;
        // 启动处理线程
        new Thread(this::processTimeoutOrders).start();
    }
    
    /**
     * 添加订单超时任务
     * @param orderId 订单ID
     */
    public void addOrderTimeoutTask(String orderId) {
        // 创建超时任务
        OrderDelayTask task = new OrderDelayTask(orderId, ORDER_TIMEOUT);
        // 添加到延迟队列
        delayQueue.offer(task);
        // 保存映射关系
        taskMap.put(orderId, task);
        
        System.out.println("订单[" + orderId + "]加入超时队列,将在" 
                + ORDER_TIMEOUT/1000 + "秒后自动取消");
    }
    
    /**
     * 订单支付成功,取消超时任务
     * @param orderId 订单ID
     */
    public void orderPaid(String orderId) {
        OrderDelayTask task = taskMap.remove(orderId);
        if (task != null) {
            // 从队列中移除任务(这里利用了equals方法判断)
            delayQueue.remove(task);
            System.out.println("订单[" + orderId + "]已支付,取消超时任务");
        }
    }
    
    /**
     * 处理超时订单的线程任务
     */
    private void processTimeoutOrders() {
        System.out.println("订单超时处理线程已启动");
        while (true) {
            try {
                // 获取超时的订单任务
                OrderDelayTask task = delayQueue.take();
                // 从映射中移除
                taskMap.remove(task.getOrderId());
                // 执行订单取消逻辑
                orderService.cancelOrder(task.getOrderId(), "订单超时未支付");
                
                System.out.println("订单[" + task.getOrderId() + "]超时未支付,已自动取消");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                // 处理异常,实际项目中应该有更完善的异常处理
                System.err.println("处理超时订单异常:" + e.getMessage());
            }
        }
    }
    
    /**
     * 订单延迟任务
     */
    static class OrderDelayTask implements Delayed {
        private final String orderId;
        private final long expireTime; // 过期时间,单位:毫秒
        
        public OrderDelayTask(String orderId, long delayTime) {
            this.orderId = orderId;
            this.expireTime = System.currentTimeMillis() + delayTime;
        }
        
        public String getOrderId() {
            return orderId;
        }
        
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expireTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
        
        @Override
        public int compareTo(Delayed o) {
            return Long.compare(this.expireTime, ((OrderDelayTask) o).expireTime);
        }
        
        @Override
        public boolean equals(Object obj) {
            if (this == obj) return true;
            if (obj == null || getClass() != obj.getClass()) return false;
            OrderDelayTask that = (OrderDelayTask) obj;
            return orderId.equals(that.orderId);
        }
        
        @Override
        public int hashCode() {
            return orderId.hashCode();
        }
    }
    
    /**
     * 模拟订单服务接口
     */
    interface OrderService {
        void cancelOrder(String orderId, String reason);
    }
    
    /**
     * 测试代码
     */
    public static void main(String[] args) throws InterruptedException {
        // 模拟订单服务实现
        OrderService orderService = (orderId, reason) -> 
            System.out.println("执行订单[" + orderId + "]取消操作,原因:" + reason);
        
        // 创建订单超时服务
        OrderTimeoutCancelService service = new OrderTimeoutCancelService(orderService);
        
        // 模拟创建3个订单
        service.addOrderTimeoutTask("ORDER_001");
        service.addOrderTimeoutTask("ORDER_002");
        service.addOrderTimeoutTask("ORDER_003");
        
        // 模拟1秒后支付了订单2
        Thread.sleep(1000);
        service.orderPaid("ORDER_002");
        
        // 等待所有订单处理完成
        Thread.sleep(35000);
    }
}
1.1.4 注意事项与优化点
  1. 任务去重

    • 重写了equals和hashCode方法,确保可以根据订单ID正确移除任务
    • 使用ConcurrentHashMap存储订单ID与任务的映射,便于快速查找和取消任务
  2. 异常处理

    • 处理线程中捕获所有异常,避免因单个任务异常导致整个处理线程终止
    • 实际项目中应该添加更完善的日志记录和异常处理机制
  3. 系统重启恢复

    • 系统重启后,内存中的DelayQueue会丢失所有任务
    • 解决方案:系统启动时,从数据库加载所有未支付且未超时的订单,重新加入DelayQueue
    // 系统启动时恢复未处理的订单超时任务
    public void recoverOrderTasks() {
        List<Order> pendingOrders = orderService.findPendingPaymentOrders();
        for (Order order : pendingOrders) {
            // 计算剩余超时时间
            long createTime = order.getCreateTime().getTime();
            long now = System.currentTimeMillis();
            long remainTimeout = createTime + ORDER_TIMEOUT - now;
            
            // 如果订单还未超时,则加入延迟队列
            if (remainTimeout > 0) {
                OrderDelayTask task = new OrderDelayTask(order.getOrderId(), remainTimeout);
                delayQueue.offer(task);
                taskMap.put(order.getOrderId(), task);
            } else {
                // 已超时但未处理的订单,直接执行取消逻辑
                orderService.cancelOrder(order.getOrderId(), "系统重启,订单超时未支付");
            }
        }
        System.out.println("成功恢复" + pendingOrders.size() + "个未支付订单的超时任务");
    }
    
  4. 性能优化

    • 使用线程池替代单个线程处理超时订单,提高并发处理能力
    • 批量处理超时订单,减少数据库操作次数
    • 考虑使用分布式延迟队列,解决单机容量和可靠性问题

1.2 限时优惠券管理

1.2.1 优惠券过期处理

电商和营销系统中,限时优惠券是常见的营销手段。优惠券通常有固定的有效期,过期后需要自动失效。传统的优惠券过期处理方式有:

  1. 定时任务扫描:定期扫描数据库,将过期优惠券标记为失效
  2. 使用时判断:用户使用优惠券时判断是否过期
  3. 缓存过期:将优惠券信息存入Redis等缓存,设置过期时间

这些方式各有优缺点,但都不够实时或资源消耗较大。使用DelayQueue可以实现内存级的优惠券过期管理,既保证实时性,又减少系统资源消耗。

1.2.2 动态失效时间控制

优惠券的一个特点是失效时间可能是动态的:

  • 固定日期失效:如"2023-12-31 23:59:59"
  • 相对时间失效:如"领取后7天内有效"
  • 活动结束失效:如"双11活动结束后失效"

DelayQueue可以很好地支持这些动态失效时间控制,只需在创建延时任务时计算正确的延迟时间即可。

1.2.3 并发安全处理

优惠券系统面临的并发场景主要有:

  • 大量用户同时领取优惠券
  • 用户使用优惠券的同时,优惠券可能正好过期
  • 系统需要动态调整优惠券的有效期

这些场景需要保证数据一致性和操作的原子性。DelayQueue本身是线程安全的,但与数据库操作的配合需要特别注意事务和锁的使用。

1.2.4 实现代码与优化
import java.util.*;
import java.util.concurrent.*;

/**
 * 基于DelayQueue实现的限时优惠券管理系统
 */
public class CouponExpirationManager {
    // 延迟队列,用于处理优惠券过期
    private final DelayQueue<CouponExpireTask> delayQueue = new DelayQueue<>();
    
    // 用于存储优惠券ID与任务的映射关系
    private final Map<String, CouponExpireTask> taskMap = new ConcurrentHashMap<>();
    
    // 优惠券服务,实际业务中通过依赖注入获取
    private final CouponService couponService;
    
    // 线程池,用于处理过期优惠券
    private final ExecutorService executorService;
    
    public CouponExpirationManager(CouponService couponService) {
        this.couponService = couponService;
        // 创建线程池
        this.executorService = Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors(),
            r -> {
                Thread thread = new Thread(r, "coupon-expiration-thread");
                thread.setDaemon(true); // 设置为守护线程
                return thread;
            }
        );
        
        // 启动处理线程
        this.executorService.execute(this::processExpiredCoupons);
    }
    
    /**
     * 添加优惠券过期任务
     * @param couponId 优惠券ID
     * @param expireTime 过期时间点(时间戳)
     */
    public void addCouponExpireTask(String couponId, long expireTime) {
        // 计算延迟时间
        long delay = expireTime - System.currentTimeMillis();
        if (delay <= 0) {
            // 已过期,直接处理
            couponService.expireCoupon(couponId);
            return;
        }
        
        // 创建过期任务
        CouponExpireTask task = new CouponExpireTask(couponId, delay);
        // 添加到延迟队列
        delayQueue.offer(task);
        // 保存映射关系
        taskMap.put(couponId, task);
        
        System.out.println("优惠券[" + couponId + "]加入过期队列,将在" 
                + new Date(expireTime) + "过期");
    }
    
    /**
     * 更新优惠券过期时间
     * @param couponId 优惠券ID
     * @param newExpireTime 新的过期时间点(时间戳)
     */
    public void updateCouponExpireTime(String couponId, long newExpireTime) {
        // 移除旧任务
        CouponExpireTask oldTask = taskMap.remove(couponId);
        if (oldTask != null) {
            delayQueue.remove(oldTask);
        }
        
        // 添加新任务
        addCouponExpireTask(couponId, newExpireTime);
        System.out.println("优惠券[" + couponId + "]过期时间已更新为" + new Date(newExpireTime));
    }
    
    /**
     * 取消优惠券过期任务(优惠券被使用或手动作废)
     * @param couponId 优惠券ID
     */
    public void cancelExpireTask(String couponId) {
        CouponExpireTask task = taskMap.remove(couponId);
        if (task != null) {
            delayQueue.remove(task);
            System.out.println("优惠券[" + couponId + "]过期任务已取消");
        }
    }
    
    /**
     * 处理过期优惠券的线程任务
     */
    private void processExpiredCoupons() {
        System.out.println("优惠券过期处理线程已启动");
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // 批量处理过期优惠券,提高效率
                List<CouponExpireTask> expiredTasks = new ArrayList<>();
                CouponExpireTask task = delayQueue.take(); // 获取一个过期任务
                expiredTasks.add(task);
                
                // 尝试一次性获取多个过期任务
                delayQueue.drainTo(expiredTasks, 100);
                
                // 批量处理过期优惠券
                List<String> couponIds = new ArrayList<>(expiredTasks.size());
                for (CouponExpireTask expiredTask : expiredTasks) {
                    String couponId = expiredTask.getCouponId();
                    taskMap.remove(couponId);
                    couponIds.add(couponId);
                }
                
                // 批量更新数据库
                couponService.batchExpireCoupons(couponIds);
                
                System.out.println("已处理" + couponIds.size() + "张过期优惠券");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                System.err.println("处理过期优惠券异常:" + e.getMessage());
            }
        }
    }
    
    /**
     * 关闭管理器
     */
    public void shutdown() {
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
    
    /**
     * 优惠券过期任务
     */
    static class CouponExpireTask implements Delayed {
        private final String couponId;
        private final long expireTime; // 过期时间点,单位:毫秒
        
        public CouponExpireTask(String couponId, long delayTime) {
            this.couponId = couponId;
            this.expireTime = System.currentTimeMillis() + delayTime;
        }
        
        public String getCouponId() {
            return couponId;
        }
        
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expireTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
        
        @Override
        public int compareTo(Delayed o) {
            return Long.compare(this.expireTime, ((CouponExpireTask) o).expireTime);
        }
        
        @Override
        public boolean equals(Object obj) {
            if (this == obj) return true;
            if (obj == null || getClass() != obj.getClass()) return false;
            CouponExpireTask that = (CouponExpireTask) obj;
            return couponId.equals(that.couponId);
        }
        
        @Override
        public int hashCode() {
            return couponId.hashCode();
        }
    }
    
    /**
     * 优惠券服务接口
     */
    interface CouponService {
        void expireCoupon(String couponId);
        void batchExpireCoupons(List<String> couponIds);
    }
    
    /**
     * 测试代码
     */
    public static void main(String[] args) throws InterruptedException {
        // 模拟优惠券服务实现
        CouponService couponService = new CouponService() {
            @Override
            public void expireCoupon(String couponId) {
                System.out.println("优惠券[" + couponId + "]已过期");
            }
            
            @Override
            public void batchExpireCoupons(List<String> couponIds) {
                System.out.println("批量处理过期优惠券:" + couponIds);
            }
        };
        
        // 创建优惠券过期管理器
        CouponExpirationManager manager = new CouponExpirationManager(couponService);
        
        // 模拟添加优惠券过期任务
        Calendar calendar = Calendar.getInstance();
        
        // 优惠券1:5秒后过期
        calendar.add(Calendar.SECOND, 5);
        manager.addCouponExpireTask("COUPON_001", calendar.getTimeInMillis());
        
        // 优惠券2:10秒后过期
        calendar.add(Calendar.SECOND, 5);
        manager.addCouponExpireTask("COUPON_002", calendar.getTimeInMillis());
        
        // 优惠券3:15秒后过期
        calendar.add(Calendar.SECOND, 5);
        manager.addCouponExpireTask("COUPON_003", calendar.getTimeInMillis());
        
        // 2秒后更新优惠券1的过期时间为20秒后
        Thread.sleep(2000);
        calendar.add(Calendar.SECOND, 5);
        manager.updateCouponExpireTime("COUPON_001", calendar.getTimeInMillis());
        
        // 3秒后取消优惠券2的过期任务(模拟优惠券被使用)
        Thread.sleep(3000);
        manager.cancelExpireTask("COUPON_002");
        
        // 等待所有优惠券处理完成
        Thread.sleep(25000);
        
        // 关闭管理器
        manager.shutdown();
    }
}

这个实现包含了以下优化点:

  1. 使用线程池:使用线程池替代单个线程,提高并发处理能力
  2. 批量处理:使用drainTo方法批量获取过期任务,减少数据库操作次数
  3. 动态调整过期时间:支持更新优惠券的过期时间
  4. 任务取消:支持取消过期任务,适用于优惠券被使用或手动作废的场景
  5. 守护线程:将处理线程设置为守护线程,避免阻止JVM退出
  6. 优雅关闭:提供shutdown方法,确保线程池正确关闭

1.3 缓存过期清理

1.3.1 缓存淘汰策略

缓存系统通常需要实现一定的淘汰策略,防止内存无限增长。常见的淘汰策略有:

  1. LRU (Least Recently Used):最近最少使用,淘汰最长时间未被访问的数据
  2. LFU (Least Frequently Used):最不经常使用,淘汰访问次数最少的数据
  3. FIFO (First In First Out):先进先出,淘汰最先加入的数据
  4. TTL (Time To Live):基于时间的淘汰策略,数据项在一定时间后自动过期

其中,TTL策略最适合使用DelayQueue实现,可以精确控制每个缓存项的过期时间,并在过期后自动清理。

1.3.2 延迟清理机制

传统的缓存过期实现通常有两种方式:

  • 惰性删除:在访问缓存时检查是否过期,过期则删除
  • 定期删除:定期扫描缓存,删除过期项

DelayQueue可以实现第三种方式:精确删除,即在缓存项到达过期时间点时精确触发删除操作。这种方式既避免了惰性删除可能导致的内存占用问题,又避免了定期删除的资源消耗和不精确问题。

1.3.3 内存占用优化

在实现基于DelayQueue的缓存过期清理时,需要注意内存占用问题:

  • DelayQueue本身会占用内存,每个缓存项都对应一个延迟任务
  • 缓存数据和延迟任务的引用关系需要谨慎处理,避免内存泄漏
  • 对于大型缓存,可能需要分片或分级管理,避免单个DelayQueue过大
1.3.4 示例实现

下面是一个基于DelayQueue实现的简单缓存系统,支持TTL过期策略:

import java.util.Map;
import java.util.concurrent.*;

/**
 * 基于DelayQueue实现的缓存系统,支持自动过期清理
 * @param <K> 键类型
 * @param <V> 值类型
 */
public class DelayedCache<K, V> {
    // 缓存数据存储
    private final Map<K, V> cache = new ConcurrentHashMap<>();
    
    // 过期任务队列
    private final DelayQueue<ExpireTask<K>> delayQueue = new DelayQueue<>();
    
    // 默认过期时间(毫秒)
    private final long defaultExpiration;
    
    // 清理线程
    private final Thread cleanerThread;
    
    // 是否关闭
    private volatile boolean closed = false;
    
    /**
     * 创建缓存,指定默认过期时间
     * @param defaultExpiration 默认过期时间(毫秒)
     */
    public DelayedCache(long defaultExpiration) {
        this.defaultExpiration = defaultExpiration;
        
        // 创建并启动清理线程
        this.cleanerThread = new Thread(this::cleanExpiredEntries);
        this.cleanerThread.setDaemon(true);
        this.cleanerThread.setName("cache-cleaner-thread");
        this.cleanerThread.start();
    }
    
    /**
     * 添加缓存项,使用默认过期时间
     * @param key 键
     * @param value 值
     */
    public void put(K key, V value) {
        put(key, value, defaultExpiration);
    }
    
    /**
     * 添加缓存项,指定过期时间
     * @param key 键
     * @param value 值
     * @param expiration 过期时间(毫秒)
     */
    public void put(K key, V value, long expiration) {
        if (closed) {
            throw new IllegalStateException("Cache is closed");
        }
        
        // 先移除旧的过期任务(如果存在)
        removeExpireTask(key);
        
        // 添加到缓存
        cache.put(key, value);
        
        // 创建过期任务
        if (expiration > 0) {
            ExpireTask<K> task = new ExpireTask<>(key, expiration);
            delayQueue.offer(task);
        }
    }
    
    /**
     * 获取缓存项
     * @param key 键
     * @return 值,如果不存在或已过期则返回null
     */
    public V get(K key) {
        if (closed) {
            throw new IllegalStateException("Cache is closed");
        }
        return cache.get(key);
    }
    
    /**
     * 移除缓存项
     * @param key 键
     * @return 被移除的值,如果不存在则返回null
     */
    public V remove(K key) {
        if (closed) {
            throw new IllegalStateException("Cache is closed");
        }
        
        // 移除过期任务
        removeExpireTask(key);
        
        // 从缓存中移除并返回值
        return cache.remove(key);
    }
    
    /**
     * 移除指定键的过期任务
     * @param key 键
     */
    private void removeExpireTask(K key) {
        // 创建一个临时任务用于比较和删除
        // 注意:这里利用了equals方法只比较key的特性
        ExpireTask<K> task = new ExpireTask<>(key, 0);
        delayQueue.remove(task);
    }
    
      /**
     * 清理过期缓存项的线程任务
     */
    private void cleanExpiredEntries() {
        while (!closed) {
            try {
                // 获取过期任务
                ExpireTask<K> task = delayQueue.take();
                // 从缓存中移除
                K key = task.getKey();
                cache.remove(key);
                System.out.println("缓存项[" + key + "]已过期,自动清理");
            } catch (InterruptedException e) {
                if (closed) {
                    break;
                }
            } catch (Exception e) {
                System.err.println("清理过期缓存异常:" + e.getMessage());
            }
        }
    }
    
    /**
     * 获取缓存大小
     * @return 缓存中的项数
     */
    public int size() {
        return cache.size();
    }
    
    /**
     * 关闭缓存,清理资源
     */
    public void close() {
        closed = true;
        cleanerThread.interrupt();
        clear();
    }
    
    /**
     * 清空缓存
     */
    public void clear() {
        delayQueue.clear();
        cache.clear();
    }
    
    /**
     * 缓存过期任务
     * @param <K> 键类型
     */
    static class ExpireTask<K> implements Delayed {
        private final K key;
        private final long expireTime;
        
        public ExpireTask(K key, long delayMillis) {
            this.key = key;
            this.expireTime = System.currentTimeMillis() + delayMillis;
        }
        
        public K getKey() {
            return key;
        }
        
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expireTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
        
        @Override
        public int compareTo(Delayed o) {
            return Long.compare(this.expireTime, ((ExpireTask<?>) o).expireTime);
        }
        
        @Override
        public boolean equals(Object obj) {
            if (this == obj) return true;
            if (obj == null || getClass() != obj.getClass()) return false;
            ExpireTask<?> that = (ExpireTask<?>) obj;
            return key.equals(that.key);
        }
        
        @Override
        public int hashCode() {
            return key.hashCode();
        }
    }
    
    /**
     * 测试代码
     */
    public static void main(String[] args) throws InterruptedException {
        // 创建缓存,默认过期时间5秒
        DelayedCache<String, String> cache = new DelayedCache<>(5000);
        
        // 添加缓存项
        cache.put("key1", "value1");                    // 使用默认过期时间(5秒)
        cache.put("key2", "value2", 10000);  // 10秒后过期
        cache.put("key3", "value3", 3000);   // 3秒后过期
        
        System.out.println("初始缓存大小:" + cache.size());
        
        // 等待3.5秒,key3应该过期
        Thread.sleep(3500);
        System.out.println("3.5秒后,key1=" + cache.get("key1") + ", key2=" + cache.get("key2") + ", key3=" + cache.get("key3"));
        System.out.println("当前缓存大小:" + cache.size());
        
        // 等待2秒,key1应该过期
        Thread.sleep(2000);
        System.out.println("再过2秒后,key1=" + cache.get("key1") + ", key2=" + cache.get("key2"));
        System.out.println("当前缓存大小:" + cache.size());
        
        // 手动移除key2
        cache.remove("key2");
        System.out.println("手动移除key2后,缓存大小:" + cache.size());
        
        // 添加新的缓存项
        cache.put("key4", "value4", 2000);   // 2秒后过期
        System.out.println("添加key4后,缓存大小:" + cache.size());
        
        // 等待3秒,key4应该过期
        Thread.sleep(3000);
        System.out.println("3秒后,key4=" + cache.get("key4"));
        System.out.println("最终缓存大小:" + cache.size());
        
        // 关闭缓存
        cache.close();
    }
}

这个缓存实现具有以下特点:

  1. 自动过期清理:利用DelayQueue精确控制缓存项的过期时间
  2. 线程安全:使用ConcurrentHashMap存储数据,支持并发访问
  3. 灵活的过期策略:支持默认过期时间和自定义过期时间
  4. 资源管理:提供close方法释放资源,清理线程设置为守护线程
  5. 动态管理:支持手动移除缓存项,自动取消对应的过期任务
1.3.5 性能优化建议

在实际应用中,可以对上述实现进行以下优化:

  1. 分段锁设计:对于高并发场景,可以实现分段锁机制,减少锁竞争
  2. 批量处理:定期批量处理过期任务,而不是每次只处理一个
  3. 弱引用/软引用:使用WeakReference或SoftReference存储值,支持内存敏感的缓存策略
  4. 统计信息:添加命中率、过期率等统计信息,便于监控和调优
  5. 多级缓存:实现内存+磁盘的多级缓存,提高容量和持久性
  6. 事件通知:添加缓存项过期的事件通知机制,支持自定义过期处理逻辑
// 使用软引用优化的缓存实现示例
public void put(K key, V value, long expiration) {
    if (closed) {
        throw new IllegalStateException("Cache is closed");
    }
    
    // 先移除旧的过期任务(如果存在)
    removeExpireTask(key);
    
    // 使用软引用包装值
    SoftReference<V> softRef = new SoftReference<>(value);
    
    // 添加到缓存
    cache.put(key, softRef);
    
    // 创建过期任务
    if (expiration > 0) {
        ExpireTask<K> task = new ExpireTask<>(key, expiration);
        delayQueue.offer(task);
    }
}

// 获取时需要处理软引用
public V get(K key) {
    if (closed) {
        throw new IllegalStateException("Cache is closed");
    }
    
    SoftReference<V> softRef = cache.get(key);
    if (softRef == null) {
        return null;
    }
    
    V value = softRef.get();
    if (value == null) {
        // 值已被GC回收,从缓存中移除
        cache.remove(key);
        removeExpireTask(key);
        return null;
    }
    
    return value;
}

1.4 实战应用总结

通过以上三个实战案例,我们可以看到DelayQueue在处理延时任务方面的强大能力。总结DelayQueue的适用场景和优势:

  1. 精确的延时处理:DelayQueue能够精确控制任务的执行时间,适合对时间精度要求较高的场景
  2. 内存级处理:相比数据库扫描等方式,DelayQueue的内存级处理性能更高
  3. 动态管理:支持动态添加、移除和更新延时任务,适应业务变化
  4. 优先级排序:自动按照到期时间排序,确保最先到期的任务最先处理

同时,在使用DelayQueue时也需要注意以下几点:

  1. 内存限制:所有任务都存储在内存中,需要控制任务数量,避免内存溢出
  2. 任务持久化:系统重启后DelayQueue中的任务会丢失,需要结合数据库等持久化方案
  3. 分布式支持:DelayQueue不支持分布式,在分布式环境下需要结合Redis、ZooKeeper等工具
  4. 异常处理:任务处理过程中的异常需要妥善处理,避免影响其他任务

在实际项目中,可以根据具体需求选择合适的延时任务处理方案,DelayQueue是单机环境下处理延时任务的优秀选择。

二、面试重点解析

在Java并发编程的面试中,DelayQueue作为一个特殊的阻塞队列,经常成为考察重点。本章将从原理、实现细节、应用场景和性能调优四个方面,梳理DelayQueue相关的面试题,帮助读者在面试中游刃有余。

2.1 原理相关题目

Q1: DelayQueue的核心原理是什么?它是如何保证元素按照延迟时间顺序被处理的?

参考答案
DelayQueue的核心原理是结合了优先级队列(PriorityQueue)和阻塞队列(BlockingQueue)的特性。它通过以下机制保证元素按照延迟时间顺序处理:

  1. Delayed接口:所有放入DelayQueue的元素必须实现Delayed接口,该接口定义了获取剩余延迟时间的方法getDelay()和元素之间比较的方法compareTo()。

  2. 优先级排序:内部使用PriorityQueue存储元素,并根据延迟时间进行排序,确保延迟时间最小的元素位于队列头部。

  3. 阻塞机制:take()方法会阻塞直到队列头部的元素延迟时间到期。如果队列为空或队列头部元素尚未到期,线程将被阻塞。

  4. Leader-Follower模式:使用leader线程优化,避免所有线程都等待相同的时间。

// DelayQueue核心属性
private final PriorityQueue<E> q = new PriorityQueue<E>();
private final ReentrantLock lock = new ReentrantLock();
private final Condition available = lock.newCondition();
private Thread leader = null;
Q2: DelayQueue与Timer/TimerTask相比有什么优势?

参考答案
DelayQueue相比Timer/TimerTask有以下优势:

  1. 线程安全:DelayQueue是线程安全的,而Timer只有一个工作线程,一个任务异常会影响其他任务。

  2. 任务隔离:DelayQueue中的任务相互独立,一个任务异常不会影响其他任务的执行。

  3. 灵活性:DelayQueue可以与线程池结合使用,实现更灵活的任务调度。

  4. 动态管理:可以动态添加、移除和修改延时任务,而Timer的任务一旦提交就不易取消。

  5. 精确调度:DelayQueue基于优先级队列实现,调度更精确,而Timer可能因为某个任务执行时间过长导致后续任务延迟执行。

Q3: DelayQueue是如何实现线程安全的?

参考答案
DelayQueue通过以下机制实现线程安全:

  1. ReentrantLock:使用可重入锁保护所有对队列的操作,确保线程安全。
private final ReentrantLock lock = new ReentrantLock();
  1. Condition:使用条件变量实现线程等待和唤醒机制。
private final Condition available = lock.newCondition();
  1. 原子操作:所有对队列的修改操作都在锁的保护下进行,确保原子性。
public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        q.offer(e);
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}
  1. Leader-Follower模式:使用leader线程优化并发性能,减少不必要的线程唤醒。

2.2 实现细节考点

Q1: 如何实现一个自定义的延时任务放入DelayQueue?

参考答案
实现自定义延时任务需要完成以下步骤:

  1. 实现Delayed接口,包括getDelay()和compareTo()方法:
public class CustomDelayedTask implements Delayed {
    private final String taskId;
    private final long executeTime; // 任务执行时间点
    
    public CustomDelayedTask(String taskId, long delayInMillis) {
        this.taskId = taskId;
        this.executeTime = System.currentTimeMillis() + delayInMillis;
    }
    
    @Override
    public long getDelay(TimeUnit unit) {
        // 返回剩余延迟时间
        return unit.convert(executeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }
    
    @Override
    public int compareTo(Delayed other) {
        if (other == this) {
            return 0;
        }
        if (other instanceof CustomDelayedTask) {
            CustomDelayedTask otherTask = (CustomDelayedTask) other;
            return Long.compare(this.executeTime, otherTask.executeTime);
        }
        // 比较剩余延迟时间
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
    }
    
    public String getTaskId() {
        return taskId;
    }
    
    @Override
    public String toString() {
        return "CustomDelayedTask{taskId='" + taskId + "', executeTime=" + executeTime + "}";
    }
}
  1. 创建DelayQueue并添加任务:
DelayQueue<CustomDelayedTask> delayQueue = new DelayQueue<>();
delayQueue.offer(new CustomDelayedTask("task-1", 5000)); // 5秒后执行
  1. 启动消费线程处理到期任务:
new Thread(() -> {
    while (true) {
        try {
            CustomDelayedTask task = delayQueue.take(); // 阻塞直到有任务到期
            System.out.println("执行任务: " + task);
            // 执行任务逻辑...
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            break;
        }
    }
}).start();
Q2: DelayQueue的take()方法是如何实现的?为什么它能够精确地在延迟到期时返回元素?

参考答案
DelayQueue的take()方法实现精确延迟的关键在于以下几点:

  1. 检查队列头部元素:首先检查队列是否为空,如果为空则等待。

  2. 检查延迟是否到期:获取队列头部元素,检查其延迟是否已到期(getDelay() <= 0)。

  3. 精确等待:如果头部元素延迟未到期,使用awaitNanos()方法精确等待剩余的延迟时间。

  4. Leader-Follower模式:使用leader线程优化,只有一个线程(leader)会等待特定的时间,其他线程无限期等待,直到leader线程被唤醒或超时后,会唤醒一个follower成为新的leader。

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            if (first == null) {
                // 队列为空,等待
                available.await();
            } else {
                long delay = first.getDelay(TimeUnit.NANOSECONDS);
                if (delay <= 0) {
                    // 延迟已到期,返回元素
                    return q.poll();
                }
                
                // 延迟未到期,需要等待
                first = null; // 避免内存泄漏
                if (leader != null) {
                    // 已有leader线程,无限期等待
                    available.await();
                } else {
                    // 成为leader线程,精确等待
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 精确等待剩余延迟时间
                        available.awaitNanos(delay);
                    } finally {
                        // 等待结束,取消leader身份
                        if (leader == thisThread) {
                            leader = null;
                        }
                    }
                }
            }
        }
    } finally {
        // 如果队列非空且没有leader,唤醒一个等待线程
        if (leader == null && q.peek() != null) {
            available.signal();
        }
        lock.unlock();
    }
}
Q3: DelayQueue中的元素可以被更新延迟时间吗?如何实现?

参考答案
DelayQueue本身不直接支持更新元素的延迟时间,因为PriorityQueue的排序是在元素插入时确定的。但可以通过以下方式实现延迟时间的更新:

  1. 移除并重新添加:从队列中移除元素,更新延迟时间后重新添加。
public boolean updateDelay(DelayedTask task, long newDelayInMillis) {
    // 从队列中移除任务
    boolean removed = delayQueue.remove(task);
    if (removed) {
        // 更新延迟时间
        task.updateDelay(newDelayInMillis);
        // 重新添加到队列
        return delayQueue.offer(task);
    }
    return false;
}
  1. 使用任务ID映射:维护一个任务ID到任务对象的映射,通过ID查找和更新任务。
public class DelayQueueManager {
    private final DelayQueue<MutableDelayedTask> delayQueue = new DelayQueue<>();
    private final Map<String, MutableDelayedTask> taskMap = new ConcurrentHashMap<>();
    
    public boolean addTask(String taskId, long delayInMillis) {
        MutableDelayedTask task = new MutableDelayedTask(taskId, delayInMillis);
        taskMap.put(taskId, task);
        return delayQueue.offer(task);
    }
    
    public boolean updateDelay(String taskId, long newDelayInMillis) {
        MutableDelayedTask task = taskMap.get(taskId);
        if (task != null) {
            // 从队列中移除
            delayQueue.remove(task);
            // 更新延迟时间
            task.updateDelay(newDelayInMillis);
            // 重新添加到队列
            return delayQueue.offer(task);
        }
        return false;
    }
    
    public boolean cancelTask(String taskId) {
        MutableDelayedTask task = taskMap.remove(taskId);
        if (task != null) {
            return delayQueue.remove(task);
        }
        return false;
    }
    
    // 可变延迟任务类
    static class MutableDelayedTask implements Delayed {
        private final String taskId;
        private long executeTime;
        
        public MutableDelayedTask(String taskId, long delayInMillis) {
            this.taskId = taskId;
            this.executeTime = System.currentTimeMillis() + delayInMillis;
        }
        
        public void updateDelay(long delayInMillis) {
            this.executeTime = System.currentTimeMillis() + delayInMillis;
        }
        
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(executeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
        
        @Override
        public int compareTo(Delayed other) {
            if (other instanceof MutableDelayedTask) {
                return Long.compare(this.executeTime, ((MutableDelayedTask) other).executeTime);
            }
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
        }
        
        @Override
        public boolean equals(Object obj) {
            if (this == obj) return true;
            if (obj == null || getClass() != obj.getClass()) return false;
            MutableDelayedTask that = (MutableDelayedTask) obj;
            return taskId.equals(that.taskId);
        }
        
        @Override
        public int hashCode() {
            return taskId.hashCode();
        }
    }
}

注意:更新延迟时间需要小心处理,确保在并发环境下的线程安全。

2.3 应用场景案例

Q1: 请设计一个基于DelayQueue的限流器,实现令牌桶算法。

参考答案
基于DelayQueue实现的令牌桶限流器设计如下:

public class DelayQueueRateLimiter {
    // 令牌桶容量
    private final int capacity;
    // 令牌生成速率(个/秒)
    private final int rate;
    // 当前可用令牌数
    private final AtomicInteger tokens;
    // 延迟队列,用于定时生成令牌
    private final DelayQueue<TokenTask> delayQueue = new DelayQueue<>();
    
    public DelayQueueRateLimiter(int capacity, int rate) {
        this.capacity = capacity;
        this.rate = rate;
        this.tokens = new AtomicInteger(capacity); // 初始填满令牌桶
        
        // 启动令牌生成线程
        new Thread(this::generateTokens).start();
    }
    
    /**
     * 尝试获取指定数量的令牌
     * @param count 需要的令牌数量
     * @return 是否获取成功
     */
    public boolean tryAcquire(int count) {
        if (count <= 0) {
            return true;
        }
        
        // 尝试减少令牌
        int current, next;
        do {
            current = tokens.get();
            if (current < count) {
                return false; // 令牌不足
            }
            next = current - count;
        } while (!tokens.compareAndSet(current, next));
        
        // 如果令牌桶未满,添加令牌生成任务
        if (current < capacity) {
            scheduleTokenGeneration();
        }
        
        return true;
    }
    
    /**
     * 安排令牌生成任务
     */
    private void scheduleTokenGeneration() {
        // 计算生成一个令牌需要的时间(毫秒)
        long tokenGenerationTime = 1000 / rate;
        delayQueue.offer(new TokenTask(tokenGenerationTime));
    }
    
    /**
     * 令牌生成线程
     */
    private void generateTokens() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // 等待下一个令牌生成时间
                TokenTask task = delayQueue.take();
                
                // 生成令牌,但不超过容量
                int current, next;
                do {
                    current = tokens.get();
                    if (current >= capacity) {
                        break; // 令牌桶已满
                    }
                    next = current + 1;
                } while (!tokens.compareAndSet(current, next));
                
                // 如果令牌桶未满,继续安排令牌生成
                if (next < capacity) {
                    scheduleTokenGeneration();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
    
    /**
     * 令牌生成任务
     */
    private static class TokenTask implements Delayed {
        private final long executeTime;
        
        public TokenTask(long delayInMillis) {
            this.executeTime = System.currentTimeMillis() + delayInMillis;
        }
        
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(executeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
        
        @Override
        public int compareTo(Delayed other) {
            if (other instanceof TokenTask) {
                return Long.compare(this.executeTime, ((TokenTask) other).executeTime);
            }
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
        }
    }
}

使用示例:

// 创建限流器:容量为100,每秒生成10个令牌
DelayQueueRateLimiter limiter = new DelayQueueRateLimiter(100, 10);

// 尝试获取令牌
if (limiter.tryAcquire(1)) {
    // 获取成功,执行业务逻辑
    processRequest();
} else {
    // 获取失败,触发限流
    rejectRequest();
}
Q2: 如何使用DelayQueue实现一个支持定时取消的异步任务系统?

参考答案
基于DelayQueue实现支持定时取消的异步任务系统:

public class TimedAsyncTaskSystem {
    // 任务执行线程池
    private final ExecutorService executorService;
    // 任务取消队列
    private final DelayQueue<CancellationTask> cancellationQueue = new DelayQueue<>();
    // 任务ID到Future的映射
    private final ConcurrentMap<String, Future<?>> taskFutures = new ConcurrentHashMap<>();
    
    public TimedAsyncTaskSystem(int threadPoolSize) {
        this.executorService = Executors.newFixedThreadPool(threadPoolSize);
        // 启动任务取消线程
        new Thread(this::processCancellations).start();
    }
    
    /**
     * 提交异步任务,并设置超时时间
     * @param taskId 任务ID
     * @param task 任务
     * @param timeout 超时时间
     * @param unit 时间单位
     * @return 任务ID
     */
    public String submitTask(String taskId, Runnable task, long timeout, TimeUnit unit) {
        // 包装任务,记录任务ID
        Runnable wrappedTask = () -> {
            try {
                task.run();
            } finally {
                // 任务完成后,从映射中移除
                taskFutures.remove(taskId);
            }
        };
        
        // 提交任务到线程池
        Future<?> future = executorService.submit(wrappedTask);
        // 保存Future
        taskFutures.put(taskId, future);
        
        // 如果设置了超时,添加取消任务
        if (timeout > 0) {
            long timeoutMillis = unit.toMillis(timeout);
            cancellationQueue.offer(new CancellationTask(taskId, timeoutMillis));
        }
        
        return taskId;
    }
    
    /**
     * 手动取消任务
     * @param taskId 任务ID
     * @return 是否成功取消
     */
    public boolean cancelTask(String taskId) {
        Future<?> future = taskFutures.remove(taskId);
        if (future != null) {
            return future.cancel(true);
        }
        return false;
    }
    
    /**
     * 处理超时取消
     */
    private void processCancellations() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // 获取到期的取消任务
                CancellationTask task = cancellationQueue.take();
                String taskId = task.getTaskId();
                
                // 尝试取消任务
                Future<?> future = taskFutures.remove(taskId);
                if (future != null) {
                    boolean cancelled = future.cancel(true);
                    System.out.println("任务[" + taskId + "]超时取消: " + (cancelled ? "成功" : "失败"));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
    
    /**
     * 关闭任务系统
     */
    public void shutdown() {
        executorService.shutdownNow();
        Thread.currentThread().interrupt();
    }
    
    /**
     * 取消任务
     */
    private static class CancellationTask implements Delayed {
        private final String taskId;
        private final long executeTime;
        
        public CancellationTask(String taskId, long delayInMillis) {
            this.taskId = taskId;
            this.executeTime = System.currentTimeMillis() + delayInMillis;
        }
        
        public String getTaskId() {
            return taskId;
        }
        
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(executeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
        
        @Override
        public int compareTo(Delayed other) {
            if (other instanceof CancellationTask) {
                return Long.compare(this.executeTime, ((CancellationTask) other).executeTime);
            }
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
        }
    }
}

使用示例:

TimedAsyncTaskSystem taskSystem = new TimedAsyncTaskSystem(10);

// 提交一个5秒后超时的任务
String taskId = taskSystem.submitTask("task-1", () -> {
    try {
        System.out.println("任务开始执行");
        Thread.sleep(10000); // 模拟长时间运行
        System.out.println("任务执行完成");
    } catch (InterruptedException e) {
        System.out.println("任务被中断");
    }
}, 5, TimeUnit.SECONDS);

// 也可以手动取消任务
// taskSystem.cancelTask(taskId);
Q3: 在分布式系统中,如何结合Redis实现类似DelayQueue的功能?

参考答案
在分布式系统中,可以使用Redis的有序集合(Sorted Set)实现类似DelayQueue的功能:

public class RedisDelayQueue {
    private final JedisPool jedisPool;
    private final String queueKey;
    private volatile boolean running = true;
    
    public RedisDelayQueue(JedisPool jedisPool, String queueKey) {
        this.jedisPool = jedisPool;
        this.queueKey = queueKey;
    }
    
    /**
     * 添加延时任务
     * @param taskId 任务ID
     * @param delayInMillis 延迟时间(毫秒)
     */
    public void addTask(String taskId, long delayInMillis) {
        try (Jedis jedis = jedisPool.getResource()) {
            // 计算任务执行时间
            double score = System.currentTimeMillis() + delayInMillis;
            // 添加到有序集合,分数为执行时间
            jedis.zadd(queueKey, score, taskId);
        }
    }
    
    /**
     * 移除任务
     * @param taskId 任务ID
     * @return 是否成功移除
     */
    public boolean removeTask(String taskId) {
        try (Jedis jedis = jedisPool.getResource()) {
            return jedis.zrem(queueKey, taskId) > 0;
        }
    }
    
    /**
     * 启动任务处理
     * @param processor 任务处理器
     */
    public void start(Consumer<String> processor) {
        new Thread(() -> {
            while (running) {
                try {
                    // 获取当前时间
                    long now = System.currentTimeMillis();
                    
                    try (Jedis jedis = jedisPool.getResource()) {
                        // 获取所有到期的任务
                        Set<String> tasks = jedis.zrangeByScore(queueKey, 0, now);
                        
                        if (!tasks.isEmpty()) {
                            // 移除这些任务
                            jedis.zremrangeByScore(queueKey, 0, now);
                            
                            // 处理任务
                            for (String taskId : tasks) {
                                try {
                                    processor.accept(taskId);
                                } catch (Exception e) {
                                    System.err.println("处理任务[" + taskId + "]异常: " + e.getMessage());
                                }
                            }
                        }
                    }
                    
                    // 休眠一段时间,避免频繁查询Redis
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                } catch (Exception e) {
                    System.err.println("处理延时任务异常: " + e.getMessage());
                }
            }
        }).start();
    }
    
    /**
     * 停止任务处理
     */
    public void stop() {
        running = false;
    }
}

使用示例:

// 创建Redis连接池
JedisPool jedisPool = new JedisPool("localhost", 6379);

// 创建分布式延时队列
RedisDelayQueue delayQueue = new RedisDelayQueue(jedisPool, "delayed_tasks");

// 添加任务
delayQueue.addTask("order-timeout-123", 30000); // 30秒后执行

// 启动任务处理
delayQueue.start(taskId -> {
    System.out.println("处理任务: " + taskId);
    // 执行实际的任务处理逻辑...
});

// 应用关闭时停止处理
// delayQueue.stop();

这种实现方式的优势在于:

  1. 支持分布式环境,多个节点可以共享同一个延时队列
  2. 任务持久化,即使系统重启任务也不会丢失
  3. 可以方便地集成到现有的Redis架构中

缺点是需要定期轮询Redis,无法做到像DelayQueue那样精确的延时触发。

2.4 性能调优问题

Q1: 在高并发场景下,DelayQueue可能面临哪些性能问题?如何优化?

参考答案
在高并发场景下,DelayQueue可能面临以下性能问题及优化方案:

  1. 锁竞争问题
    • 问题:DelayQueue使用单一的ReentrantLock保护所有操作,在高并发下会导致严重的锁竞争。
    • 优化:使用分片技术,将一个大的DelayQueue拆分为多个小队列,减少锁竞争。
public class ShardedDelayQueue<E extends Delayed> {
    private final int shardCount;
    private final DelayQueue<E>[] queues;
    private final ExecutorService[] executors;
    
    @SuppressWarnings("unchecked")
    public ShardedDelayQueue(int shardCount) {
        this.shardCount = shardCount;
        this.queues = new DelayQueue[shardCount];
        this.executors = new ExecutorService[shardCount];
        
        for (int i = 0; i < shardCount; i++) {
            queues[i] = new DelayQueue<>();
            executors[i] = Executors.newSingleThreadExecutor();
        }
    }
    
    public boolean offer(E element, Object shardKey) {
        int shard = selectShard(shardKey);
        return queues[shard].offer(element);
    }
    
    public void start(Consumer<E> processor) {
        for (int i = 0; i < shardCount; i++) {
            final int shard = i;
            executors[i].submit(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        E element = queues[shard].take();
                        processor.accept(element);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            });
        }
    }
    
    private int selectShard(Object shardKey) {
        return Math.abs(shardKey.hashCode() % shardCount);
    }
    
    public void shutdown() {
        for (ExecutorService executor : executors) {
            executor.shutdownNow();
        }
    }
}
  1. 内存占用问题
    • 问题:大量延时任务会占用大量内存,可能导致GC压力增大。
    • 优化:使用对象池复用任务对象,减少对象创建和GC压力。
public class DelayedTaskPool<T> {
    private final Queue<DelayedTask<T>> pool = new ConcurrentLinkedQueue<>();
    
    public DelayedTask<T> obtain(T data, long delayInMillis) {
        DelayedTask<T> task = pool.poll();
        if (task == null) {
            task = new DelayedTask<>();
        }
        task.setData(data);
        task.setDelay(delayInMillis);
        return task;
    }
    
    public void recycle(DelayedTask<T> task) {
        task.reset();
        pool.offer(task);
    }
    
    public static class DelayedTask<T> implements Delayed {
        private T data;
        private long executeTime;
        
        public void setData(T data) {
            this.data = data;
        }
        
        public void setDelay(long delayInMillis) {
            this.executeTime = System.currentTimeMillis() + delayInMillis;
        }
        
        public void reset() {
            this.data = null;
            this.executeTime = 0;
        }
        
        public T getData() {
            return data;
        }
        
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(executeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
        
        @Override
        public int compareTo(Delayed o) {
            return Long.compare(this.executeTime, ((DelayedTask<?>) o).executeTime);
        }
    }
}
  1. 任务堆积问题
    • 问题:如果消费速度跟不上生产速度,会导致任务堆积。
    • 优化:使用动态线程池,根据队列大小动态调整消费线程数量。
public class AdaptiveDelayQueueConsumer<E extends Delayed> {
    private final DelayQueue<E> delayQueue;
    private final Consumer<E> processor;
    private final ThreadPoolExecutor executor;
    
    private final int minThreads;
    private final int maxThreads;
    private final int queueSizePerThread;
    
    public AdaptiveDelayQueueConsumer(
            DelayQueue<E> delayQueue, 
            Consumer<E> processor,
            int minThreads,
            int maxThreads,
            int queueSizePerThread) {
        this.delayQueue = delayQueue;
        this.processor = processor;
        this.minThreads = minThreads;
        this.maxThreads = maxThreads;
        this.queueSizePerThread = queueSizePerThread;
        
        this.executor = new ThreadPoolExecutor(
            minThreads, maxThreads,
            60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(),
            new ThreadPoolExecutor.CallerRunsPolicy());
        
        // 启动监控线程
        startMonitoring();
    }
    
    private void startMonitoring() {
        new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(1000); // 每秒检查一次
                    
                    int queueSize = delayQueue.size();
                    int targetThreads = Math.min(
                        maxThreads,
                        Math.max(minThreads, queueSize / queueSizePerThread + 1)
                    );
                    
                    // 调整线程池大小
                    if (executor.getCorePoolSize() != targetThreads) {
                        executor.setCorePoolSize(targetThreads);
                        executor.setMaximumPoolSize(targetThreads);
                        System.out.println("调整线程池大小为: " + targetThreads);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }).start();
        
        // 启动消费线程
        for (int i = 0; i < minThreads; i++) {
            startConsumerThread();
        }
    }
    
    private void startConsumerThread() {
        executor.execute(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    E element = delayQueue.take();
                    processor.accept(element);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                } catch (Exception e) {
                    System.err.println("处理任务异常: " + e.getMessage());
                }
            }
        });
    }
    
    public void shutdown() {
        executor.shutdownNow();
    }
}
  1. GC暂停影响
    • 问题:GC暂停会导致延时任务执行不准确。
    • 优化:使用G1或ZGC等低延迟垃圾收集器,减少GC暂停时间。
// JVM参数示例
// -XX:+UseG1GC -XX:MaxGCPauseMillis=50 -XX:InitiatingHeapOccupancyPercent=70
Q2: 如何设计一个支持持久化的DelayQueue,确保系统重启后任务不丢失?

参考答案
设计支持持久化的DelayQueue需要结合外部存储,主要有以下几种方案:

  1. 数据库持久化方案
public class PersistentDelayQueue<T> {
    private final DelayQueue<PersistentTask<T>> memoryQueue = new DelayQueue<>();
    private final TaskRepository taskRepository;
    private final TaskProcessor<T> processor;
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    
    public PersistentDelayQueue(TaskRepository taskRepository, TaskProcessor<T> processor) {
        this.taskRepository = taskRepository;
        this.processor = processor;
        
        // 启动消费线程
        new Thread(this::processTask).start();
        
        // 定期从数据库加载任务
        scheduler.scheduleAtFixedRate(this::loadTasksFromDB, 0, 1, TimeUnit.MINUTES);
    }
    
    public void addTask(T data, long delayInMillis) {
        // 创建任务
        String taskId = UUID.randomUUID().toString();
        long executeTime = System.currentTimeMillis() + delayInMillis;
        
        // 持久化到数据库
        TaskEntity entity = new TaskEntity(taskId, serialize(data), executeTime, TaskStatus.PENDING);
        taskRepository.save(entity);
        
        // 添加到内存队列
        PersistentTask<T> task = new PersistentTask<>(taskId, data, executeTime);
        memoryQueue.offer(task);
    }
    
    private void processTask() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // 获取到期任务
                PersistentTask<T> task = memoryQueue.take();
                
                // 更新任务状态为处理中
                taskRepository.updateStatus(task.getTaskId(), TaskStatus.PROCESSING);
                
                try {
                    // 处理任务
                    processor.process(task.getData());
                    
                    // 更新任务状态为已完成
                    taskRepository.updateStatus(task.getTaskId(), TaskStatus.COMPLETED);
                } catch (Exception e) {
                    // 处理失败,记录异常信息
                    taskRepository.updateStatus(task.getTaskId(), TaskStatus.FAILED, e.getMessage());
                    
                    // 可以实现重试逻辑
                    if (shouldRetry(task)) {
                        rescheduleTask(task);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
    
    private void loadTasksFromDB() {
        try {
            // 获取所有未处理的任务
            List<TaskEntity> pendingTasks = taskRepository.findByStatus(TaskStatus.PENDING);
            
            for (TaskEntity entity : pendingTasks) {
                // 检查是否已在内存队列中
                if (!isInMemoryQueue(entity.getTaskId())) {
                    // 反序列化数据
                    T data = deserialize(entity.getData());
                    
                    // 添加到内存队列
                    PersistentTask<T> task = new PersistentTask<>(
                        entity.getTaskId(), data, entity.getExecuteTime());
                    memoryQueue.offer(task);
                }
            }
        } catch (Exception e) {
            System.err.println("从数据库加载任务异常: " + e.getMessage());
        }
    }
    
    private boolean isInMemoryQueue(String taskId) {
        // 实际实现可能需要维护一个任务ID集合
        return false;
    }
    
    private boolean shouldRetry(PersistentTask<T> task) {
        // 实现重试策略
        return false;
    }
    
    private void rescheduleTask(PersistentTask<T> task) {
        // 实现重试逻辑
    }
    
    private byte[] serialize(T data) {
        // 实现序列化逻辑
        return null;
    }
    
    private T deserialize(byte[] bytes) {
        // 实现反序列化逻辑
        return null;
    }
    
    public void shutdown() {
        scheduler.shutdownNow();
    }
    
    static class PersistentTask<T> implements Delayed {
        private final String taskId;
        private final T data;
        private final long executeTime;
        
        public PersistentTask(String taskId, T data, long executeTime) {
            this.taskId = taskId;
            this.data = data;
            this.executeTime = executeTime;
        }
        
        public String getTaskId() {
            return taskId;
        }
        
        public T getData() {
            return data;
        }
        
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(executeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
        
        @Override
        public int compareTo(Delayed o) {
            return Long.compare(this.executeTime, ((PersistentTask<?>) o).executeTime);
        }
    }
    
    enum TaskStatus {
        PENDING, PROCESSING, COMPLETED, FAILED
    }
    
    interface TaskRepository {
        void save(TaskEntity entity);
        void updateStatus(String taskId, TaskStatus status);
        void updateStatus(String taskId, TaskStatus status, String errorMessage);
        List<TaskEntity> findByStatus(TaskStatus status);
    }
    
    static class TaskEntity {
        private final String taskId;
        private final byte[] data;
        private final long executeTime;
        private final TaskStatus status;
        
        public TaskEntity(String taskId, byte[] data, long executeTime, TaskStatus status) {
            this.taskId = taskId;
            this.data = data;
            this.executeTime = executeTime;
            this.status = status;
        }
        
        public String getTaskId() {
            return taskId;
        }
        
        public byte[] getData() {
            return data;
        }
        
        public long getExecuteTime() {
            return executeTime;
        }
        
        public TaskStatus getStatus() {
            return status;
        }
    }
    
    interface TaskProcessor<T> {
        void process(T data) throws Exception;
    }
}
  1. 文件系统持久化方案
public class FileBackedDelayQueue<T extends Serializable> {
    private final DelayQueue<SerializableTask<T>> memoryQueue = new DelayQueue<>();
    private final String storageDir;
    private final Consumer<T> processor;
    
    public FileBackedDelayQueue(String storageDir, Consumer<T> processor) {
        this.storageDir = storageDir;
        this.processor = processor;
        
        // 创建存储目录
        File dir = new File(storageDir);
        if (!dir.exists()) {
            dir.mkdirs();
        }
        
        // 从文件系统恢复任务
        recoverTasksFromFiles();
        
        // 启动消费线程
        new Thread(this::processTask).start();
    }
    
    public void addTask(T data, long delayInMillis) {
        String taskId = UUID.randomUUID().toString();
        long executeTime = System.currentTimeMillis() + delayInMillis;
        
        // 创建任务
        SerializableTask<T> task = new SerializableTask<>(taskId, data, executeTime);
        
        // 持久化到文件
        saveTaskToFile(task);
        
        // 添加到内存队列
        memoryQueue.offer(task);
    }
    
    private void processTask() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // 获取到期任务
                SerializableTask<T> task = memoryQueue.take();
                
                try {
                    // 处理任务
                    processor.accept(task.getData());
                    
                    // 删除任务文件
                    deleteTaskFile(task.getTaskId());
                } catch (Exception e) {
                    System.err.println("处理任务异常: " + e.getMessage());
                    // 可以实现重试逻辑
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
    
    private void saveTaskToFile(SerializableTask<T> task) {
        File file = new File(storageDir, task.getTaskId() + ".task");
        try (ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(file))) {
            oos.writeObject(task);
        } catch (IOException e) {
            System.err.println("保存任务到文件异常: " + e.getMessage());
        }
    }
    
    private void deleteTaskFile(String taskId) {
        File file = new File(storageDir, taskId + ".task");
        if (file.exists()) {
            file.delete();
        }
    }
    
    @SuppressWarnings("unchecked")
    private void recoverTasksFromFiles() {
        File dir = new File(storageDir);
        File[] files = dir.listFiles((d, name) -> name.endsWith(".task"));
        
        if (files != null) {
            for (File file : files) {
                try (ObjectInputStream ois = new ObjectInputStream(new FileInputStream(file))) {
                    SerializableTask<T> task = (SerializableTask<T>) ois.readObject();
                    
                    // 检查任务是否已过期
                    if (task.getDelay(TimeUnit.MILLISECONDS) > 0) {
                        // 添加到内存队列
                        memoryQueue.offer(task);
                    } else {
                        // 已过期的任务直接处理
                        try {
                            processor.accept(task.getData());
                        } catch (Exception e) {
                            System.err.println("处理恢复的过期任务异常: " + e.getMessage());
                        }
                        // 删除任务文件
                        deleteTaskFile(task.getTaskId());
                    }
                } catch (Exception e) {
                    System.err.println("恢复任务异常: " + e.getMessage());
                    // 删除损坏的任务文件
                    file.delete();
                }
            }
        }
    }
    
    static class SerializableTask<T extends Serializable> implements Delayed, Serializable {
        private static final long serialVersionUID = 1L;
        
        private final String taskId;
        private final T data;
        private final long executeTime;
        
        public SerializableTask(String taskId, T data, long executeTime) {
            this.taskId = taskId;
            this.data = data;
            this.executeTime = executeTime;
        }
        
        public String getTaskId() {
            return taskId;
        }
        
        public T getData() {
            return data;
        }
        
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(executeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
        
        @Override
        public int compareTo(Delayed o) {
            return Long.compare(this.executeTime, ((SerializableTask<?>) o).executeTime);
        }
    }
}

这两种方案各有优缺点:

  • 数据库方案:适合分布式环境,支持事务,但性能较低
  • 文件系统方案:性能较高,但不适合分布式环境,且需要处理文件锁等问题

在实际应用中,可以根据具体需求选择合适的持久化方案。

2.5 面试真题解析

以下是一些来自真实面试的DelayQueue相关问题及其解析:

真题1: 如何使用DelayQueue实现一个限流器,要求每个接口每秒最多处理N个请求?

解析
这是一个典型的令牌桶限流算法应用场景,可以使用DelayQueue实现:

public class DelayQueueRateLimiter {
    private final int permitsPerSecond;
    private final DelayQueue<DelayedPermit> queue = new DelayQueue<>();
    
    public DelayQueueRateLimiter(int permitsPerSecond) {
        this.permitsPerSecond = permitsPerSecond;
        // 初始化令牌
        refillTokens();
    }
    
    private void refillTokens() {
        // 添加N个令牌,每个令牌延迟1秒
        for (int i = 0; i < permitsPerSecond; i++) {
            queue.offer(new DelayedPermit(1000)); // 1秒后可用
        }
    }
    
    public boolean tryAcquire() {
        // 尝试获取一个可用的令牌
        DelayedPermit permit = queue.poll();
        if (permit != null) {
            // 获取成功,添加一个新的延迟令牌
            queue.offer(new DelayedPermit(1000));
            return true;
        }
        return false;
    }
    
    public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {
        // 尝试在指定时间内获取令牌
        DelayedPermit permit = queue.poll(timeout, unit);
        if (permit != null) {
            queue.offer(new DelayedPermit(1000));
            return true;
        }
        return false;
    }
    
    static class DelayedPermit implements Delayed {
        private final long availableTime;
        
        public DelayedPermit(long delayInMillis) {
            this.availableTime = System.currentTimeMillis() + delayInMillis;
        }
        
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(availableTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
        
        @Override
        public int compareTo(Delayed o) {
            return Long.compare(this.availableTime, ((DelayedPermit) o).availableTime);
        }
    }
}

使用示例:

// 创建限流器,每秒最多处理10个请求
DelayQueueRateLimiter limiter = new DelayQueueRateLimiter(10);

// 在API接口中使用
public Response handleRequest(Request request) {
    if (limiter.tryAcquire()) {
        // 处理请求
        return processRequest(request);
    } else {
        // 限流,返回错误
        return Response.error("请求过于频繁,请稍后再试");
    }
}
真题2: 在一个电商系统中,如何使用DelayQueue实现秒杀活动的定时开始?

解析
秒杀活动定时开始是一个典型的定时任务场景,可以使用DelayQueue实现:

public class SecKillScheduler {
    private final DelayQueue<SecKillTask> delayQueue = new DelayQueue<>();
    private final SecKillService secKillService;
    
    public SecKillScheduler(SecKillService secKillService) {
        this.secKillService = secKillService;
        // 启动任务处理线程
        new Thread(this::processSecKillTasks).start();
    }
    
    public void scheduleSecKill(String activityId, Date startTime) {
        // 计算延迟时间
        long delay = startTime.getTime() - System.currentTimeMillis();
        if (delay < 0) {
            // 活动时间已过,直接开始
            secKillService.startSecKill(activityId);
            return;
        }
        
        // 创建秒杀任务
        SecKillTask task = new SecKillTask(activityId, startTime.getTime());
        delayQueue.offer(task);
        
        System.out.println("秒杀活动[" + activityId + "]已调度,将在" 
                + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(startTime) + "开始");
    }
    
    private void processSecKillTasks() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // 获取到期的秒杀任务
                SecKillTask task = delayQueue.take();
                
                // 开始秒杀活动
                secKillService.startSecKill(task.getActivityId());
                
                System.out.println("秒杀活动[" + task.getActivityId() + "]已开始");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                System.err.println("处理秒杀任务异常: " + e.getMessage());
            }
        }
    }
    
    static class SecKillTask implements Delayed {
        private final String activityId;
        private final long startTime;
        
        public SecKillTask(String activityId, long startTime) {
            this.activityId = activityId;
            this.startTime = startTime;
        }
        
        public String getActivityId() {
            return activityId;
        }
        
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(startTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
        
        @Override
        public int compareTo(Delayed o) {
            return Long.compare(this.startTime, ((SecKillTask) o).startTime);
        }
    }
    
    interface SecKillService {
        void startSecKill(String activityId);
    }
}

这种实现的优势在于:

  1. 精确控制秒杀活动的开始时间
  2. 系统资源占用低,不需要频繁检查数据库
  3. 可以动态添加和取消秒杀活动

三、思考题

🤔 设计一个商城的订单系统,要求:

  1. 订单创建15分钟内未支付自动取消
  2. 支持每天百万级订单量
  3. 取消操作要高效且准确
  4. 系统重启后订单状态不丢失

如何使用DelayQueue实现这个功能?欢迎在评论区分享你的方案!

写在最后

🎉 通过这两篇文章的学习,相信大家已经完整掌握了DelayQueue的原理和实战应用。从理论到实践,从基础到进阶,让我们能够在实际项目中熟练运用这个强大的工具。

📚 推荐几篇很有趣的文章

  • DeepSeek详解:探索下一代语言模型
  • 算法模型从入门到起飞系列——递归(探索自我重复的奇妙之旅)

📚博主匠心之作,强推专栏

  • JAVA集合专栏 【夜话集】
  • JVM知识专栏
  • 数据库sql理论与实战【博主踩坑之道】
  • 小游戏开发【博主强推 匠心之作 拿来即用无门槛】

如果觉得有帮助的话,别忘了点个赞 👍 收藏 ⭐ 关注 🔖 哦!


🎯 我是果冻~,一个热爱技术、乐于分享的开发者
📚 更多精彩内容,请关注我的博客
🌟 我们下期再见!

相关文章:

  • 51. “闲转易”交易平台小程序(基于springbootvue)
  • 两个有序序列的合并-手摇算法
  • 【深度学习新浪潮】Grok过去两周的进展一览(2025.04.01)
  • [学成在线]09-课程预览
  • 论文阅读笔记:Denoising Diffusion Implicit Models (2)
  • Spring Boot 快速入手
  • node.js、npm相关知识
  • Redis 在 Linux 系统的安装指南
  • 【Docker项目实战】使用Docker部署NoteFlow笔记工具
  • 本地RAG知识库,如何进行数据结构化和清洗?
  • 在 Vue2 项目中配置自定义属性并在组件中使用,可按以下步骤进行:
  • 探秘 LPC 接收端重建:从理论根基到 Matlab 仿真实战
  • Android 应用程序包的 adb 命令
  • android studio 安装flutter插件
  • kubectl 命令
  • LLM的Sink(水槽) Token
  • 【漫话机器学习系列】168.最大最小值缩放(Min-Max Scaling)
  • 解锁兰亭妙微桌面端 UE/UI 设计,抢占数字先机
  • 主流数据库的存储引擎/存储机制的详细对比分析,涵盖关系型数据库、NoSQL数据库和分布式数据库
  • go游戏后端开发21:处理nats消息
  • 成都公积金新政征求意见:购买保障性住房最高贷款额度上浮50%
  • 深入贯彻中央八项规定精神学习教育中央第六指导组指导督导中国工商银行见面会召开
  • 太原一高中生指出博物馆多件藏品标识不当,馆方已邀请他和专家共同探讨
  • 印媒证实:至少3架印军战机7日在印控克什米尔地区坠毁
  • 习近平致电祝贺默茨当选德国联邦总理
  • 特朗普称不会为了和中国谈判而取消对华关税,外交部回应