
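SpringCloud Project, Stage 7: Comparing Delayed-Task Technologies and Using Redis to Implement a Delay Queue (Adding, Cancelling, and Consuming Tasks)

Preface:

Project repository: https://gitee.com/whltaoin_admin/hmtt_cloud-project.git

Commit hash for the Stage 7 progress: 14201bc24ae77d11efd2ca99923490d4399fb9fe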

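Delayed tasks overview

Scheduled tasks vs. delayed tasks

  1. Scheduled task: runs on a fixed cycle and has a well-defined trigger time.
    1. Example: compute statistics on a table at 3 a.m. every Monday.
  2. Delayed task: has no fixed cycle. It is usually triggered by an event, and some time after that event a second event is triggered. The task may run immediately or after a delay.
    1. Example: an order placed on Taobao is cancelled if it is still unpaid after 30 minutes.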
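The difference can be sketched with the JDK's `ScheduledExecutorService`. This is only an in-process illustration, not the approach the project ends up using; the class name `OrderTimeoutSketch` and its helper methods are made up for this example.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class OrderTimeoutSketch {

    // Scheduled task: repeats on a fixed period.
    static ScheduledFuture<?> everyPeriod(ScheduledExecutorService pool, Runnable job,
                                          long period, TimeUnit unit) {
        return pool.scheduleAtFixedRate(job, period, period, unit);
    }

    // Delayed task: runs once, some time after the triggering event.
    static ScheduledFuture<?> afterDelay(ScheduledExecutorService pool, Runnable job,
                                         long delay, TimeUnit unit) {
        return pool.schedule(job, delay, unit);
    }

    public static void main(String[] args) {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        // Event: an order is placed. Follow-up: cancel it 30 minutes later.
        ScheduledFuture<?> cancelOrder = afterDelay(
                pool, () -> System.out.println("order cancelled"), 30, TimeUnit.MINUTES);
        // The user pays in time, so the pending cancellation is withdrawn.
        boolean withdrawn = cancelOrder.cancel(false);
        System.out.println("cancellation withdrawn: " + withdrawn);
        pool.shutdownNow();
    }
}
```

The handle returned by `schedule` is what makes cancelling a pending delayed task possible, which is the same add/cancel pairing the delay queue needs.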

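Technology comparison

  1. DelayQueue

DelayQueue is a blocking queue. Producers add tasks to it, each with its own expiry time, and consumers take tasks from the queue once they have expired.

Drawback: it lives in memory, so restarting the service or a crash loses the data.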
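A minimal sketch of the DelayQueue option, using only `java.util.concurrent`. The names `DelayQueueSketch` and `DelayedTask` are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

final class DelayQueueSketch {

    /** A task that becomes available once its execution time has passed. */
    static final class DelayedTask implements Delayed {
        final String name;
        final long executeAtMillis;

        DelayedTask(String name, long delayMillis) {
            this.name = name;
            this.executeAtMillis = System.currentTimeMillis() + delayMillis;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Remaining time; zero or negative means the task is due.
            return unit.convert(executeAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayedTask> queue = new DelayQueue<>();
        queue.put(new DelayedTask("slow", 200));
        queue.put(new DelayedTask("fast", 50));
        // take() blocks until the head of the queue is due.
        System.out.println(queue.take().name);
        System.out.println(queue.take().name);
    }
}
```

Because `take()` only returns tasks that are due, "fast" comes out before "slow" even though it was added second. Everything lives in the JVM heap, which is exactly the drawback noted above.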

  2. RabbitMQ

  3. Redis

A Redis zset keeps its members sorted by score, so that ordering can be used to implement the delay.
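To make the idea concrete, here is an in-memory stand-in for a zset whose score is the execution time in milliseconds. It is not a Redis client: the class `ZsetDelaySketch` and its methods are hypothetical, and unlike a real zset it does not deduplicate members.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class ZsetDelaySketch {

    // score (execution time in ms) -> members; TreeMap keeps scores in ascending order.
    private final TreeMap<Long, List<String>> byScore = new TreeMap<>();

    // Roughly what ZADD key score member does.
    void add(String member, long executeAtMillis) {
        byScore.computeIfAbsent(executeAtMillis, k -> new ArrayList<>()).add(member);
    }

    // Roughly ZRANGEBYSCORE key 0 now followed by ZREM of the returned members.
    List<String> pollDue(long nowMillis) {
        List<String> due = new ArrayList<>();
        Map<Long, List<String>> head = byScore.headMap(nowMillis, true);
        for (List<String> members : head.values()) {
            due.addAll(members);
        }
        head.clear(); // the view is backed by the map, so this removes the due entries
        return due;
    }

    public static void main(String[] args) {
        ZsetDelaySketch zset = new ZsetDelaySketch();
        zset.add("task-b", 2000L);
        zset.add("task-a", 1000L);
        zset.add("task-c", 3000L);
        System.out.println(zset.pollDue(2000L)); // task-a and task-b are due, task-c is not
    }
}
```

A consumer that polls with the current time as the upper bound only ever sees tasks whose execution time has arrived, and sees them earliest-first.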

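Summary

Flow for implementing delayed tasks with Redis

  1. First, persist the task to durable storage.
  2. Depending on whether the task should run immediately or be delayed, store it in a different Redis data type.
  3. If the execution time is not later than the current time, store the task in a list.
  4. If the execution time is not later than the preset time (the current time plus 5 minutes), store the task in a zset (the zset holds only tasks due within the next 5 minutes).
  5. All remaining delayed tasks stay in MySQL only and are periodically synchronized from MySQL into the zset.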
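The routing decision in the steps above can be sketched as a pure function. The names `TaskRoutingSketch`, `Target`, and `PRELOAD_WINDOW_MILLIS` are hypothetical; only the 5-minute window and the list/zset/MySQL split come from the text.

```java
final class TaskRoutingSketch {

    enum Target { LIST, ZSET, DB_ONLY }

    // Tasks due within this window are preloaded into the zset.
    static final long PRELOAD_WINDOW_MILLIS = 5 * 60 * 1000L;

    static Target route(long executeTimeMillis, long nowMillis) {
        if (executeTimeMillis <= nowMillis) {
            return Target.LIST;      // already due: ready to be consumed
        }
        if (executeTimeMillis <= nowMillis + PRELOAD_WINDOW_MILLIS) {
            return Target.ZSET;      // due soon: kept sorted by execution time
        }
        return Target.DB_ONLY;       // far in the future: stays in the database for now
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println(route(now, now));
        System.out.println(route(now + 2 * 60 * 1000L, now));
        System.out.println(route(now + 10 * 60 * 1000L, now));
    }
}
```

Keeping only near-term tasks in Redis bounds the size of the zset; the periodic sync from MySQL moves later tasks in once they enter the window.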

  1. Explanation 1

  2. Explanation 2

Setting up the schedule service

  1. Module name:
heima-leadnews-schedule
  2. bootstrap.yml
server:
  port: 51701
spring:
  application:
    name: leadnews-schedule
  cloud:
    nacos:
      discovery:
        server-addr: varin.cn:8848
      config:
        server-addr: varin.cn:8848
        file-extension: yml
  3. nacos

  1. Data name: leadnews_schedule
  2. Startup:

Integrating Redis into the project

  1. Import the dependencies
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- Redis depends on commons-pool; this dependency must be added -->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
</dependency>
  2. Create a redis package in the common module

  3. Wrap the commonly used Redis operations in CacheService.java
package com.heima.common.redis;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.DataType;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.StringRedisConnection;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.ZSetOperations.TypedTuple;
import org.springframework.lang.Nullable;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.*;
import java.util.concurrent.TimeUnit;

@Component
public class CacheService extends CachingConfigurerSupport {

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    public StringRedisTemplate getstringRedisTemplate() {
        return this.stringRedisTemplate;
    }

    /** ------------------- key operations --------------------- */

    /** Deletes a key. */
    public void delete(String key) {
        stringRedisTemplate.delete(key);
    }

    /** Deletes keys in bulk. */
    public void delete(Collection<String> keys) {
        stringRedisTemplate.delete(keys);
    }

    /** Serializes the value stored at key. */
    public byte[] dump(String key) {
        return stringRedisTemplate.dump(key);
    }

    /** Checks whether the key exists. */
    public Boolean exists(String key) {
        return stringRedisTemplate.hasKey(key);
    }

    /** Sets the expiry time. */
    public Boolean expire(String key, long timeout, TimeUnit unit) {
        return stringRedisTemplate.expire(key, timeout, unit);
    }

    /** Sets the expiry time as a date. */
    public Boolean expireAt(String key, Date date) {
        return stringRedisTemplate.expireAt(key, date);
    }

    /** Finds the keys that match the pattern. */
    public Set<String> keys(String pattern) {
        return stringRedisTemplate.keys(pattern);
    }

    /** Moves a key from the current database to the given database. */
    public Boolean move(String key, int dbIndex) {
        return stringRedisTemplate.move(key, dbIndex);
    }

    /** Removes the expiry time of a key so that it persists. */
    public Boolean persist(String key) {
        return stringRedisTemplate.persist(key);
    }

    /** Returns the remaining time to live of a key in the given unit. */
    public Long getExpire(String key, TimeUnit unit) {
        return stringRedisTemplate.getExpire(key, unit);
    }

    /** Returns the remaining time to live of a key. */
    public Long getExpire(String key) {
        return stringRedisTemplate.getExpire(key);
    }

    /** Returns a random key from the current database. */
    public String randomKey() {
        return stringRedisTemplate.randomKey();
    }

    /** Renames a key. */
    public void rename(String oldKey, String newKey) {
        stringRedisTemplate.rename(oldKey, newKey);
    }

    /** Renames oldKey to newKey only if newKey does not exist. */
    public Boolean renameIfAbsent(String oldKey, String newKey) {
        return stringRedisTemplate.renameIfAbsent(oldKey, newKey);
    }

    /** Returns the type of the value stored at key. */
    public DataType type(String key) {
        return stringRedisTemplate.type(key);
    }

    /** ------------------- string operations --------------------- */

    /** Sets the value of the given key. */
    public void set(String key, String value) {
        stringRedisTemplate.opsForValue().set(key, value);
    }

    /** Gets the value of the given key. */
    public String get(String key) {
        return stringRedisTemplate.opsForValue().get(key);
    }

    /** Returns a substring of the string value stored at key. */
    public String getRange(String key, long start, long end) {
        return stringRedisTemplate.opsForValue().get(key, start, end);
    }

    /** Sets key to value and returns the old value. */
    public String getAndSet(String key, String value) {
        return stringRedisTemplate.opsForValue().getAndSet(key, value);
    }

    /** Gets the bit at the given offset of the string value stored at key. */
    public Boolean getBit(String key, long offset) {
        return stringRedisTemplate.opsForValue().getBit(key, offset);
    }

    /** Gets multiple values at once. */
    public List<String> multiGet(Collection<String> keys) {
        return stringRedisTemplate.opsForValue().multiGet(keys);
    }

    /**
     * Sets a bit. The ASCII code of 'a' is 97, which is '01100001' in binary;
     * this method sets the bit at position offset to value.
     *
     * @param value true for 1, false for 0
     */
    public boolean setBit(String key, long offset, boolean value) {
        return stringRedisTemplate.opsForValue().setBit(key, offset, value);
    }

    /**
     * Associates value with key and sets the key's expiry to timeout.
     *
     * @param timeout expiry time
     * @param unit    time unit, e.g. TimeUnit.DAYS, TimeUnit.HOURS, TimeUnit.MINUTES,
     *                TimeUnit.SECONDS, TimeUnit.MILLISECONDS
     */
    public void setEx(String key, String value, long timeout, TimeUnit unit) {
        stringRedisTemplate.opsForValue().set(key, value, timeout, unit);
    }

    /**
     * Sets the value only if the key does not exist.
     *
     * @return false if the key already existed, true otherwise
     */
    public boolean setIfAbsent(String key, String value) {
        return stringRedisTemplate.opsForValue().setIfAbsent(key, value);
    }

    /** Overwrites the string value stored at key with value, starting at offset. */
    public void setRange(String key, String value, long offset) {
        stringRedisTemplate.opsForValue().set(key, value, offset);
    }

    /** Gets the length of the string. */
    public Long size(String key) {
        return stringRedisTemplate.opsForValue().size(key);
    }

    /** Sets multiple key-value pairs at once. */
    public void multiSet(Map<String, String> maps) {
        stringRedisTemplate.opsForValue().multiSet(maps);
    }

    /**
     * Sets one or more key-value pairs only if none of the given keys exist.
     *
     * @return false if any key already existed, true otherwise
     */
    public boolean multiSetIfAbsent(Map<String, String> maps) {
        return stringRedisTemplate.opsForValue().multiSetIfAbsent(maps);
    }

    /** Increments the value; a negative increment decrements it. */
    public Long incrBy(String key, long increment) {
        return stringRedisTemplate.opsForValue().increment(key, increment);
    }

    /** Increments the value by a floating-point amount. */
    public Double incrByFloat(String key, double increment) {
        return stringRedisTemplate.opsForValue().increment(key, increment);
    }

    /** Appends value to the end of the string. */
    public Integer append(String key, String value) {
        return stringRedisTemplate.opsForValue().append(key, value);
    }

    /** ------------------- hash operations ------------------------- */

    /** Gets the value of the given field in the hash. */
    public Object hGet(String key, String field) {
        return stringRedisTemplate.opsForHash().get(key, field);
    }

    /** Gets all fields and values of the hash. */
    public Map<Object, Object> hGetAll(String key) {
        return stringRedisTemplate.opsForHash().entries(key);
    }

    /** Gets the values of all the given fields. */
    public List<Object> hMultiGet(String key, Collection<Object> fields) {
        return stringRedisTemplate.opsForHash().multiGet(key, fields);
    }

    public void hPut(String key, String hashKey, String value) {
        stringRedisTemplate.opsForHash().put(key, hashKey, value);
    }

    public void hPutAll(String key, Map<String, String> maps) {
        stringRedisTemplate.opsForHash().putAll(key, maps);
    }

    /** Sets the field only if hashKey does not exist. */
    public Boolean hPutIfAbsent(String key, String hashKey, String value) {
        return stringRedisTemplate.opsForHash().putIfAbsent(key, hashKey, value);
    }

    /** Deletes one or more hash fields. */
    public Long hDelete(String key, Object... fields) {
        return stringRedisTemplate.opsForHash().delete(key, fields);
    }

    /** Checks whether the given field exists in the hash. */
    public boolean hExists(String key, String field) {
        return stringRedisTemplate.opsForHash().hasKey(key, field);
    }

    /** Adds increment to the integer value of the given hash field. */
    public Long hIncrBy(String key, Object field, long increment) {
        return stringRedisTemplate.opsForHash().increment(key, field, increment);
    }

    /** Adds delta to the floating-point value of the given hash field. */
    public Double hIncrByFloat(String key, Object field, double delta) {
        return stringRedisTemplate.opsForHash().increment(key, field, delta);
    }

    /** Gets all fields of the hash. */
    public Set<Object> hKeys(String key) {
        return stringRedisTemplate.opsForHash().keys(key);
    }

    /** Gets the number of fields in the hash. */
    public Long hSize(String key) {
        return stringRedisTemplate.opsForHash().size(key);
    }

    /** Gets all values of the hash. */
    public List<Object> hValues(String key) {
        return stringRedisTemplate.opsForHash().values(key);
    }

    /** Iterates over the key-value pairs of the hash. */
    public Cursor<Map.Entry<Object, Object>> hScan(String key, ScanOptions options) {
        return stringRedisTemplate.opsForHash().scan(key, options);
    }

    /** ------------------------ list operations ---------------------------- */

    /** Gets the list element at the given index. */
    public String lIndex(String key, long index) {
        return stringRedisTemplate.opsForList().index(key, index);
    }

    /**
     * Gets the list elements in the given range.
     *
     * @param start start position; 0 is the first element
     * @param end   end position; -1 returns everything
     */
    public List<String> lRange(String key, long start, long end) {
        return stringRedisTemplate.opsForList().range(key, start, end);
    }

    /** Stores the value at the head of the list. */
    public Long lLeftPush(String key, String value) {
        return stringRedisTemplate.opsForList().leftPush(key, value);
    }

    public Long lLeftPushAll(String key, String... value) {
        return stringRedisTemplate.opsForList().leftPushAll(key, value);
    }

    public Long lLeftPushAll(String key, Collection<String> value) {
        return stringRedisTemplate.opsForList().leftPushAll(key, value);
    }

    /** Pushes the value only if the list exists. */
    public Long lLeftPushIfPresent(String key, String value) {
        return stringRedisTemplate.opsForList().leftPushIfPresent(key, value);
    }

    /** If pivot exists, inserts the value before pivot. */
    public Long lLeftPush(String key, String pivot, String value) {
        return stringRedisTemplate.opsForList().leftPush(key, pivot, value);
    }

    public Long lRightPush(String key, String value) {
        return stringRedisTemplate.opsForList().rightPush(key, value);
    }

    public Long lRightPushAll(String key, String... value) {
        return stringRedisTemplate.opsForList().rightPushAll(key, value);
    }

    public Long lRightPushAll(String key, Collection<String> value) {
        return stringRedisTemplate.opsForList().rightPushAll(key, value);
    }

    /** Appends the value to an existing list. */
    public Long lRightPushIfPresent(String key, String value) {
        return stringRedisTemplate.opsForList().rightPushIfPresent(key, value);
    }

    /** Inserts the value to the right of the pivot element. */
    public Long lRightPush(String key, String pivot, String value) {
        return stringRedisTemplate.opsForList().rightPush(key, pivot, value);
    }

    /** Sets the list element at the given index. */
    public void lSet(String key, long index, String value) {
        stringRedisTemplate.opsForList().set(key, index, value);
    }

    /** Removes and returns the first element of the list. */
    public String lLeftPop(String key) {
        return stringRedisTemplate.opsForList().leftPop(key);
    }

    /**
     * Removes and returns the first element of the list. If the list is empty,
     * blocks until the timeout elapses or an element becomes available.
     */
    public String lBLeftPop(String key, long timeout, TimeUnit unit) {
        return stringRedisTemplate.opsForList().leftPop(key, timeout, unit);
    }

    /** Removes and returns the last element of the list. */
    public String lRightPop(String key) {
        return stringRedisTemplate.opsForList().rightPop(key);
    }

    /**
     * Removes and returns the last element of the list. If the list is empty,
     * blocks until the timeout elapses or an element becomes available.
     */
    public String lBRightPop(String key, long timeout, TimeUnit unit) {
        return stringRedisTemplate.opsForList().rightPop(key, timeout, unit);
    }

    /** Removes the last element of the list, adds it to another list, and returns it. */
    public String lRightPopAndLeftPush(String sourceKey, String destinationKey) {
        return stringRedisTemplate.opsForList().rightPopAndLeftPush(sourceKey, destinationKey);
    }

    /**
     * Pops a value from the list, inserts it into another list, and returns it. If the
     * list is empty, blocks until the timeout elapses or an element becomes available.
     */
    public String lBRightPopAndLeftPush(String sourceKey, String destinationKey,
                                        long timeout, TimeUnit unit) {
        return stringRedisTemplate.opsForList().rightPopAndLeftPush(sourceKey, destinationKey, timeout, unit);
    }

    /**
     * Removes the elements equal to value from the list.
     *
     * @param index index = 0 removes all elements equal to value; index > 0 removes the
     *              first matching element from the head; index < 0 removes the first
     *              matching element from the tail
     */
    public Long lRemove(String key, long index, String value) {
        return stringRedisTemplate.opsForList().remove(key, index, value);
    }

    /** Trims the list. */
    public void lTrim(String key, long start, long end) {
        stringRedisTemplate.opsForList().trim(key, start, end);
    }

    /** Gets the length of the list. */
    public Long lLen(String key) {
        return stringRedisTemplate.opsForList().size(key);
    }

    /** -------------------- set operations -------------------------- */

    /** Adds elements to the set. */
    public Long sAdd(String key, String... values) {
        return stringRedisTemplate.opsForSet().add(key, values);
    }

    /** Removes elements from the set. */
    public Long sRemove(String key, Object... values) {
        return stringRedisTemplate.opsForSet().remove(key, values);
    }

    /** Removes and returns a random element of the set. */
    public String sPop(String key) {
        return stringRedisTemplate.opsForSet().pop(key);
    }

    /** Moves value from one set to another. */
    public Boolean sMove(String key, String value, String destKey) {
        return stringRedisTemplate.opsForSet().move(key, value, destKey);
    }

    /** Gets the size of the set. */
    public Long sSize(String key) {
        return stringRedisTemplate.opsForSet().size(key);
    }

    /** Checks whether the set contains value. */
    public Boolean sIsMember(String key, Object value) {
        return stringRedisTemplate.opsForSet().isMember(key, value);
    }

    /** Gets the intersection of two sets. */
    public Set<String> sIntersect(String key, String otherKey) {
        return stringRedisTemplate.opsForSet().intersect(key, otherKey);
    }

    /** Gets the intersection of the set at key with several other sets. */
    public Set<String> sIntersect(String key, Collection<String> otherKeys) {
        return stringRedisTemplate.opsForSet().intersect(key, otherKeys);
    }

    /** Stores the intersection of the sets at key and otherKey in destKey. */
    public Long sIntersectAndStore(String key, String otherKey, String destKey) {
        return stringRedisTemplate.opsForSet().intersectAndStore(key, otherKey, destKey);
    }

    /** Stores the intersection of the set at key with several other sets in destKey. */
    public Long sIntersectAndStore(String key, Collection<String> otherKeys, String destKey) {
        return stringRedisTemplate.opsForSet().intersectAndStore(key, otherKeys, destKey);
    }

    /** Gets the union of two sets. */
    public Set<String> sUnion(String key, String otherKeys) {
        return stringRedisTemplate.opsForSet().union(key, otherKeys);
    }

    /** Gets the union of the set at key with several other sets. */
    public Set<String> sUnion(String key, Collection<String> otherKeys) {
        return stringRedisTemplate.opsForSet().union(key, otherKeys);
    }

    /** Stores the union of the sets at key and otherKey in destKey. */
    public Long sUnionAndStore(String key, String otherKey, String destKey) {
        return stringRedisTemplate.opsForSet().unionAndStore(key, otherKey, destKey);
    }

    /** Stores the union of the set at key with several other sets in destKey. */
    public Long sUnionAndStore(String key, Collection<String> otherKeys, String destKey) {
        return stringRedisTemplate.opsForSet().unionAndStore(key, otherKeys, destKey);
    }

    /** Gets the difference of two sets. */
    public Set<String> sDifference(String key, String otherKey) {
        return stringRedisTemplate.opsForSet().difference(key, otherKey);
    }

    /** Gets the difference between the set at key and several other sets. */
    public Set<String> sDifference(String key, Collection<String> otherKeys) {
        return stringRedisTemplate.opsForSet().difference(key, otherKeys);
    }

    /** Stores the difference of the sets at key and otherKey in destKey. */
    public Long sDifference(String key, String otherKey, String destKey) {
        return stringRedisTemplate.opsForSet().differenceAndStore(key, otherKey, destKey);
    }

    /** Stores the difference between the set at key and several other sets in destKey. */
    public Long sDifference(String key, Collection<String> otherKeys, String destKey) {
        return stringRedisTemplate.opsForSet().differenceAndStore(key, otherKeys, destKey);
    }

    /** Gets all elements of the set. */
    public Set<String> setMembers(String key) {
        return stringRedisTemplate.opsForSet().members(key);
    }

    /** Gets a random element of the set. */
    public String sRandomMember(String key) {
        return stringRedisTemplate.opsForSet().randomMember(key);
    }

    /** Gets count random elements of the set. */
    public List<String> sRandomMembers(String key, long count) {
        return stringRedisTemplate.opsForSet().randomMembers(key, count);
    }

    /** Gets count random elements of the set without duplicates. */
    public Set<String> sDistinctRandomMembers(String key, long count) {
        return stringRedisTemplate.opsForSet().distinctRandomMembers(key, count);
    }

    public Cursor<String> sScan(String key, ScanOptions options) {
        return stringRedisTemplate.opsForSet().scan(key, options);
    }

    /** ------------------ zset operations -------------------------------- */

    /** Adds an element; a sorted set is ordered by score from low to high. */
    public Boolean zAdd(String key, String value, double score) {
        return stringRedisTemplate.opsForZSet().add(key, value, score);
    }

    public Long zAdd(String key, Set<TypedTuple<String>> values) {
        return stringRedisTemplate.opsForZSet().add(key, values);
    }

    public Long zRemove(String key, Object... values) {
        return stringRedisTemplate.opsForZSet().remove(key, values);
    }

    public Long zRemove(String key, Collection<String> values) {
        if (values != null && !values.isEmpty()) {
            Object[] objs = values.toArray(new Object[values.size()]);
            return stringRedisTemplate.opsForZSet().remove(key, objs);
        }
        return 0L;
    }

    /** Increases the score of an element and returns the new score. */
    public Double zIncrementScore(String key, String value, double delta) {
        return stringRedisTemplate.opsForZSet().incrementScore(key, value, delta);
    }

    /**
     * Returns the rank of an element, with scores ordered from low to high.
     *
     * @return 0 means first place
     */
    public Long zRank(String key, Object value) {
        return stringRedisTemplate.opsForZSet().rank(key, value);
    }

    /** Returns the rank of an element, with scores ordered from high to low. */
    public Long zReverseRank(String key, Object value) {
        return stringRedisTemplate.opsForZSet().reverseRank(key, value);
    }

    /**
     * Gets the elements of the set, ordered from low to high.
     *
     * @param start start position
     * @param end   end position; -1 returns everything
     */
    public Set<String> zRange(String key, long start, long end) {
        return stringRedisTemplate.opsForZSet().range(key, start, end);
    }

    /** Gets all elements of the zset, ordered from low to high. */
    public Set<String> zRangeAll(String key) {
        return zRange(key, 0, -1);
    }

    /** Gets the elements of the set together with their scores. */
    public Set<TypedTuple<String>> zRangeWithScores(String key, long start, long end) {
        return stringRedisTemplate.opsForZSet().rangeWithScores(key, start, end);
    }

    /**
     * Queries the elements by score.
     *
     * @param min minimum score
     * @param max maximum score
     */
    public Set<String> zRangeByScore(String key, double min, double max) {
        return stringRedisTemplate.opsForZSet().rangeByScore(key, min, max);
    }

    /**
     * Queries the elements by score, ordered from low to high, with their scores.
     *
     * @param min minimum score
     * @param max maximum score
     */
    public Set<TypedTuple<String>> zRangeByScoreWithScores(String key, double min, double max) {
        return stringRedisTemplate.opsForZSet().rangeByScoreWithScores(key, min, max);
    }

    public Set<TypedTuple<String>> zRangeByScoreWithScores(String key, double min, double max,
                                                           long start, long end) {
        return stringRedisTemplate.opsForZSet().rangeByScoreWithScores(key, min, max, start, end);
    }

    /** Gets the elements of the set, ordered from high to low. */
    public Set<String> zReverseRange(String key, long start, long end) {
        return stringRedisTemplate.opsForZSet().reverseRange(key, start, end);
    }

    public Set<String> zReverseRangeByScore(String key, long min, long max) {
        return stringRedisTemplate.opsForZSet().reverseRangeByScore(key, min, max);
    }

    /** Gets the elements of the set, ordered from high to low, with their scores. */
    public Set<TypedTuple<String>> zReverseRangeWithScores(String key, long start, long end) {
        return stringRedisTemplate.opsForZSet().reverseRangeWithScores(key, start, end);
    }

    /** Queries the elements by score, ordered from high to low. */
    public Set<String> zReverseRangeByScore(String key, double min, double max) {
        return stringRedisTemplate.opsForZSet().reverseRangeByScore(key, min, max);
    }

    /** Queries the elements by score, ordered from high to low, with their scores. */
    public Set<TypedTuple<String>> zReverseRangeByScoreWithScores(String key, double min, double max) {
        return stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(key, min, max);
    }

    public Set<String> zReverseRangeByScore(String key, double min, double max, long start, long end) {
        return stringRedisTemplate.opsForZSet().reverseRangeByScore(key, min, max, start, end);
    }

    /** Counts the elements whose score falls in the given range. */
    public Long zCount(String key, double min, double max) {
        return stringRedisTemplate.opsForZSet().count(key, min, max);
    }

    /** Gets the size of the set. */
    public Long zSize(String key) {
        return stringRedisTemplate.opsForZSet().size(key);
    }

    /** Gets the size of the set. */
    public Long zZCard(String key) {
        return stringRedisTemplate.opsForZSet().zCard(key);
    }

    /** Gets the score of value in the set. */
    public Double zScore(String key, Object value) {
        return stringRedisTemplate.opsForZSet().score(key, value);
    }

    /** Removes the members in the given index range. */
    public Long zRemoveRange(String key, long start, long end) {
        return stringRedisTemplate.opsForZSet().removeRange(key, start, end);
    }

    /** Removes the members whose score falls in the given range. */
    public Long zRemoveRangeByScore(String key, double min, double max) {
        return stringRedisTemplate.opsForZSet().removeRangeByScore(key, min, max);
    }

    /** Stores the union of the sets at key and otherKey in destKey. */
    public Long zUnionAndStore(String key, String otherKey, String destKey) {
        return stringRedisTemplate.opsForZSet().unionAndStore(key, otherKey, destKey);
    }

    public Long zUnionAndStore(String key, Collection<String> otherKeys, String destKey) {
        return stringRedisTemplate.opsForZSet().unionAndStore(key, otherKeys, destKey);
    }

    /** Intersection. */
    public Long zIntersectAndStore(String key, String otherKey, String destKey) {
        return stringRedisTemplate.opsForZSet().intersectAndStore(key, otherKey, destKey);
    }

    /** Intersection. */
    public Long zIntersectAndStore(String key, Collection<String> otherKeys, String destKey) {
        return stringRedisTemplate.opsForZSet().intersectAndStore(key, otherKeys, destKey);
    }

    public Cursor<TypedTuple<String>> zScan(String key, ScanOptions options) {
        return stringRedisTemplate.opsForZSet().scan(key, options);
    }

    /** Scans for keys; recommended over keys(). */
    public Set<String> scan(String patten) {
        Set<String> keys = stringRedisTemplate.execute((RedisCallback<Set<String>>) connection -> {
            Set<String> result = new HashSet<>();
            try (Cursor<byte[]> cursor = connection.scan(new ScanOptions.ScanOptionsBuilder()
                    .match(patten).count(10000).build())) {
                while (cursor.hasNext()) {
                    result.add(new String(cursor.next()));
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            return result;
        });
        return keys;
    }

    /** Uses pipelining to improve performance. */
    public List<Object> lRightPushPipeline(String type, Collection<String> values) {
        List<Object> results = stringRedisTemplate.executePipelined(new RedisCallback<Object>() {
            public Object doInRedis(RedisConnection connection) throws DataAccessException {
                StringRedisConnection stringRedisConn = (StringRedisConnection) connection;
                // Convert the collection to an array
                String[] strings = values.toArray(new String[values.size()]);
                // Send everything in one batch
                stringRedisConn.rPush(type, strings);
                return null;
            }
        });
        return results;
    }

    public List<Object> refreshWithPipeline(String future_key, String topic_key, Collection<String> values) {
        List<Object> objects = stringRedisTemplate.executePipelined(new RedisCallback<Object>() {
            @Nullable
            @Override
            public Object doInRedis(RedisConnection redisConnection) throws DataAccessException {
                StringRedisConnection stringRedisConnection = (StringRedisConnection) redisConnection;
                String[] strings = values.toArray(new String[values.size()]);
                stringRedisConnection.rPush(topic_key, strings);
                stringRedisConnection.zRem(future_key, strings);
                return null;
            }
        });
        return objects;
    }
}
  4. Register the class for auto-configuration scanning: src/main/resources/META-INF/spring.factories
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  com.heima.common.exception.ExceptionCatch,\
  com.heima.common.swagger.SwaggerConfig,\
  com.hei.common.knife4j.Swagger2Configuration,\
  cn.varin.tencent.GreenTextScan,\
  cn.varin.tencent.GreenImageScan,\
  com.heima.common.redis.CacheService
  5. Test with a test class
package com.heima.schedule.test;

import com.heima.common.redis.CacheService;
import com.heima.schedule.ScheduleApplication;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest(classes = ScheduleApplication.class)
@RunWith(SpringRunner.class)
public class RedisTest {

    @Autowired
    private CacheService cacheService;

    @Test
    public void test() {
        // Push one element onto a list and add one scored member to a zset
        cacheService.lLeftPush("1", "1");
        cacheService.zAdd("name", "varin", 99);
    }
}
  6. Test result

Adding tasks

Adding the task to the database

Mapper interfaces
  1. TaskinfoMapper
package com.heima.schedule.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.heima.model.schedule.pojos.Taskinfo;
import org.apache.ibatis.annotations.Mapper;

@Mapper
public interface TaskinfoMapper extends BaseMapper<Taskinfo> {
}
  2. TaskInfoLogsMapper
package com.heima.schedule.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.heima.model.schedule.pojos.Taskinfo;
import com.heima.model.schedule.pojos.TaskinfoLogs;
import org.apache.ibatis.annotations.Mapper;

@Mapper
public interface TaskInfoLogsMapper extends BaseMapper<TaskinfoLogs> {
}
Mapper file
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.heima.schedule.mapper.TaskinfoMapper">

    <select id="queryFutureTime" resultType="com.heima.model.schedule.pojos.Taskinfo">
        select *
        from taskinfo
        where task_type = #{taskType}
          and priority = #{priority}
          and execute_time <![CDATA[<]]> #{future,javaType=java.util.Date}
    </select>

</mapper>
TaskService
package com.heima.schedule.service;

import com.heima.model.schedule.dtos.Task;
import com.heima.model.schedule.pojos.Taskinfo;

public interface TaskService {

    // Add a task
    Long addTask(Task task);
}
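TaskServiceImpl

The addTaskToDb method stores the task in the database, and its result is kept in addDbStatus. It returns true when the save succeeds and false when it fails.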

package com.heima.schedule.service.impl;

import com.hei.common.constance.ScheduleConstants;
import com.heima.model.schedule.dtos.Task;
import com.heima.model.schedule.pojos.Taskinfo;
import com.heima.model.schedule.pojos.TaskinfoLogs;
import com.heima.schedule.mapper.TaskInfoLogsMapper;
import com.heima.schedule.mapper.TaskinfoMapper;
import com.heima.schedule.service.TaskService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.Date;

@Service
@Transactional
@Slf4j
public class TaskServiceImpl implements TaskService {

    @Autowired
    private TaskInfoLogsMapper taskInfoLogsMapper;

    @Autowired
    private TaskinfoMapper taskinfoMapper;

    @Override
    public Long addTask(Task task) {
        // Add the task to the database
        Boolean addDbStatus = addTaskToDb(task);
        System.out.println(addDbStatus);
        return 0L;
    }

    /**
     * Adds the task to the database.
     *
     * @return true if both the task and its log record were saved, false otherwise
     */
    private Boolean addTaskToDb(Task task) {
        Boolean flag = false;
        // Task info
        System.out.println(task);
        try {
            Taskinfo taskinfo = new Taskinfo();
            BeanUtils.copyProperties(task, taskinfo);
            // The DTO carries the execution time as epoch milliseconds
            taskinfo.setExecuteTime(new Date(task.getExecuteTime()));
            taskinfoMapper.insert(taskinfo);
            System.out.println(taskinfo);

            // Task log
            TaskinfoLogs taskinfoLogs = new TaskinfoLogs();
            BeanUtils.copyProperties(taskinfo, taskinfoLogs);
            taskinfoLogs.setVersion(1);
            taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);
            taskInfoLogsMapper.insert(taskinfoLogs);
            System.out.println(taskinfoLogs);
            flag = true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return flag;
    }
}
Test class
package com.heima.schedule.test;

import com.heima.model.schedule.dtos.Task;
import com.heima.schedule.ScheduleApplication;
import com.heima.schedule.service.TaskService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Date;

@SpringBootTest(classes = {ScheduleApplication.class})
@RunWith(SpringRunner.class)
public class TaskServiceTest {

    @Autowired
    private TaskService taskService;

    @Test
    public void test() {
        Task task = new Task();
        task.setTaskId(3L);
        task.setParameters(null);
        task.setTaskType(0);
        task.setExecuteTime(new Date().getTime());
        task.setPriority(1);
        taskService.addTask(task);
    }
}

Result:

添加到Redis中

实现方法
// Add the task to Redis
@Autowired
private CacheService cacheService;

private void addTasktoCache(Task task) {
    // The key format must match the one used when cancelling and pulling tasks
    String key = task.getTaskType() + "_" + task.getPriority();
    // Timestamp (in milliseconds) five minutes from now
    Calendar calendar = Calendar.getInstance();
    calendar.add(Calendar.MINUTE, 5);
    long timeInMillis = calendar.getTimeInMillis();
    if (task.getExecuteTime() <= System.currentTimeMillis()) {
        // Execute time <= now: push onto the list for immediate consumption
        cacheService.lLeftPush(ScheduleConstants.TOPIC + key, JSON.toJSONString(task));
    } else if (task.getExecuteTime() <= timeInMillis) {
        // Execute time is within the next five minutes: add to the zset, scored by execute time
        cacheService.zAdd(ScheduleConstants.FUTURE + key, JSON.toJSONString(task), task.getExecuteTime());
    }
}
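The routing rule in addTasktoCache can be looked at in isolation. The following is a minimal sketch in plain Java with no Redis or Spring involved; the class name DelayRoutingSketch, the Target enum, and the PRELOAD_WINDOW_MS constant are hypothetical names introduced only for illustration and are not part of the project. It shows which store a task would land in, given its execute time and the current time.

```java
// Hypothetical illustration of the routing rule; not the project's code.
class DelayRoutingSketch {

    // Where a task ends up after it has been written to the database
    enum Target { LIST, ZSET, DB_ONLY }

    // Assumed preload window: five minutes, as in the article
    static final long PRELOAD_WINDOW_MS = 5 * 60 * 1000L;

    static Target route(long executeTime, long now) {
        if (executeTime <= now) {
            return Target.LIST;      // due now or overdue: consumed immediately
        }
        if (executeTime <= now + PRELOAD_WINDOW_MS) {
            return Target.ZSET;      // due soon: kept in the sorted set, scored by time
        }
        return Target.DB_ONLY;       // far in the future: stays in the database for now
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println(route(now - 1_000, now));    // LIST
        System.out.println(route(now + 120_000, now));  // ZSET
        System.out.println(route(now + 600_000, now));  // DB_ONLY
    }
}
```

Tasks in the DB_ONLY case are written to neither Redis structure by addTasktoCache; per the flow described earlier, they are expected to be synchronized from MySQL into the zset later.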
Calling method
public Long addTask(Task task) {
    // Persist the task to the database
    Boolean addDbStatus = addTaskToDb(task);
    if (!addDbStatus) {
        throw new RuntimeException("TaskServiceImpl: failed to persist the task.");
    }
    addTasktoCache(task);
    return task.getTaskId();
}
Test: immediate task
package com.heima.schedule.test;

import com.heima.model.schedule.dtos.Task;
import com.heima.schedule.ScheduleApplication;
import com.heima.schedule.service.TaskService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Date;

@SpringBootTest(classes = {ScheduleApplication.class})
@RunWith(SpringRunner.class)
public class TaskServiceTest {

    @Autowired
    private TaskService taskService;

    @Test
    public void test() {
        Task task = new Task();
        task.setParameters("varin".getBytes());
        task.setTaskType(0);
        // Execute time is now, so the task goes to the list
        task.setExecuteTime(new Date().getTime());
        task.setPriority(1333);
        taskService.addTask(task);
    }
}

Result:

Test: future task
package com.heima.schedule.test;

import com.heima.model.schedule.dtos.Task;
import com.heima.schedule.ScheduleApplication;
import com.heima.schedule.service.TaskService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Calendar;

@SpringBootTest(classes = {ScheduleApplication.class})
@RunWith(SpringRunner.class)
public class TaskServiceTest {

    @Autowired
    private TaskService taskService;

    @Test
    public void test() {
        Task task = new Task();
        task.setParameters("varin".getBytes());
        task.setTaskType(0);
        // Execute time is two minutes from now, so the task goes to the zset
        Calendar calendar = Calendar.getInstance();
        calendar.add(Calendar.MINUTE, 2);
        task.setExecuteTime(calendar.getTimeInMillis());
        task.setPriority(1333);
        taskService.addTask(task);
    }
}

Result:

Cancelling a task

Flow:

Cancel method: cancelTask()

@Override
public Boolean cancelTask(Long taskId) {
    Boolean flag = false;
    // Delete the task from the database by id and update the status of its log row
    Task task = updateTask(taskId);
    if (task != null) {
        // Use the returned task to locate the entry in Redis and delete it
        removeTaskFromCache(task);
        flag = true;
    }
    return flag;
}

// Remove the task from the cache
private void removeTaskFromCache(Task task) {
    String key = task.getTaskType() + "_" + task.getPriority();
    if (task.getExecuteTime() <= System.currentTimeMillis()) {
        // Execute time <= now: the task is in the list
        cacheService.lRemove(ScheduleConstants.TOPIC + key, 0, JSON.toJSONString(task));
    } else {
        // Execute time is still in the future: the task is in the zset
        cacheService.zRemove(ScheduleConstants.FUTURE + key, JSON.toJSONString(task));
    }
}

// Delete the task from the database by id, then update the status of its log row
private Task updateTask(Long taskId) {
    Task task = null;
    try {
        // 1. Delete the taskinfo row
        taskinfoMapper.deleteById(taskId);
        // 2. Load the log row and mark it as cancelled (status, not task type)
        TaskinfoLogs taskinfoLogs = taskInfoLogsMapper.selectById(taskId);
        taskinfoLogs.setStatus(ScheduleConstants.CANCELLED);
        taskInfoLogsMapper.updateById(taskinfoLogs);
        // 3. Copy the properties into a new Task and return it;
        //    BeanUtils.copyProperties rejects a null target
        task = new Task();
        BeanUtils.copyProperties(taskinfoLogs, task);
        task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime());
    } catch (Exception e) {
        log.error("task cancel exception taskid={}", taskId, e);
    }
    return task;
}
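Both removals above work by value: Redis LREM and ZREM delete an entry only when the supplied string equals the stored member exactly. The sketch below models that with plain Java collections in place of Redis; the class CancelByValueSketch and its method names are hypothetical and exist only for this illustration. It shows that a JSON string with the same fields in a different order would not match, so the cancel path depends on the task being serialized to exactly the same string as when it was added (an assumption about the serializer that is worth verifying in the real project).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for the two Redis structures; not the project's code.
class CancelByValueSketch {

    private final List<String> topicList = new ArrayList<>();      // models the Redis list
    private final Map<String, Long> futureZset = new HashMap<>();  // models zset: member -> score

    void addImmediate(String taskJson) {
        topicList.add(0, taskJson);               // like LPUSH
    }

    void addFuture(String taskJson, long executeTime) {
        futureZset.put(taskJson, executeTime);    // like ZADD with the time as score
    }

    // Mirrors removeTaskFromCache: choose the structure by time, then remove by exact value
    boolean cancel(String taskJson, long executeTime, long now) {
        if (executeTime <= now) {
            return topicList.removeIf(taskJson::equals);  // like LREM key 0 value
        }
        return futureZset.remove(taskJson) != null;       // like ZREM key member
    }

    public static void main(String[] args) {
        CancelByValueSketch cache = new CancelByValueSketch();
        long now = 1_000L;
        long later = now + 120_000L;

        cache.addFuture("{\"priority\":5,\"taskId\":1}", later);
        // Same fields, different order: no match, nothing is removed
        System.out.println(cache.cancel("{\"taskId\":1,\"priority\":5}", later, now)); // false
        // Identical string: the entry is removed
        System.out.println(cache.cancel("{\"priority\":5,\"taskId\":1}", later, now)); // true
    }
}
```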

Test method

package com.heima.schedule.test;

import com.heima.model.schedule.dtos.Task;
import com.heima.schedule.ScheduleApplication;
import com.heima.schedule.service.TaskService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Calendar;

@SpringBootTest(classes = {ScheduleApplication.class})
@RunWith(SpringRunner.class)
public class TaskServiceTest {

    @Autowired
    private TaskService taskService;

    @Test
    public void addTask() {
        Task task = new Task();
        task.setParameters("varin varin varintest".getBytes());
        task.setTaskType(0);
        Calendar calendar = Calendar.getInstance();
        calendar.add(Calendar.MINUTE, 2);
        task.setExecuteTime(calendar.getTimeInMillis());
        task.setPriority(99999);
        taskService.addTask(task);
    }

    @Test
    public void deleteTask() {
        // The id is the task id generated by a previous addTask run
        System.out.println("Cancel task: " + taskService.cancelTask(1970463648249909250L));
    }
}
Result:
![](https://i-blog.csdnimg.cn/img_convert/3e8e9d637deb1fbaca4aa69734f06946.png)

Consuming tasks

pull method

@Override
public Task pull(Integer taskType, Integer priority) {
    Task task = null;
    try {
        String key = taskType + "_" + priority;
        // Pop the next task from the list via CacheService
        String taskJson = cacheService.lRightPop(ScheduleConstants.TOPIC + key);
        if (StringUtils.isNotBlank(taskJson)) {
            task = JSON.parseObject(taskJson, Task.class);
            // Delete the taskinfo row and update the status in the log table
            delTaskInfoAndUpdateTaskLog(task.getTaskId(), ScheduleConstants.EXECUTED);
        }
    } catch (Exception e) {
        log.error("poll task exception", e);
    }
    return task;
}
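Immediate tasks are added with a left push and consumed with a right pop, which makes the Redis list behave as a first-in, first-out queue per key. The sketch below demonstrates that ordering with a java.util.ArrayDeque standing in for the Redis list; the class FifoQueueSketch and its method names are hypothetical and only mirror the lLeftPush and lRightPop calls used above.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical in-memory stand-in for one Redis list; not the project's code.
class FifoQueueSketch {

    private final Deque<String> list = new ArrayDeque<>();

    // Like LPUSH: new elements go to the head (left side)
    void leftPush(String value) {
        list.addFirst(value);
    }

    // Like RPOP: elements leave from the tail (right side); null when the list is empty
    String rightPop() {
        return list.pollLast();
    }

    public static void main(String[] args) {
        FifoQueueSketch queue = new FifoQueueSketch();
        queue.leftPush("task-1");
        queue.leftPush("task-2");
        queue.leftPush("task-3");

        System.out.println(queue.rightPop()); // task-1
        System.out.println(queue.rightPop()); // task-2
        System.out.println(queue.rightPop()); // task-3
        System.out.println(queue.rightPop()); // null
    }
}
```

The null on an empty list corresponds to pull returning null when no task is waiting under the given task type and priority.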

Test

package com.heima.schedule.test;

import com.heima.model.schedule.dtos.Task;
import com.heima.schedule.ScheduleApplication;
import com.heima.schedule.service.TaskService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest(classes = {ScheduleApplication.class})
@RunWith(SpringRunner.class)
public class TaskServiceTest {

    @Autowired
    private TaskService taskService;

    @Test
    public void pull() {
        // Arguments are task type and priority; prints null if no task is queued under that key
        Task task = taskService.pull(1, 993);
        System.out.println(task);
    }
}

Result:


Keep it up!
