一、原理
消息入队(消息生产):
第一次 A
第二次 B A
第三次 C B A消息出队(消息消费):
从右往左开始,依次消费A、B、C结论:简易队列,先进先出
二、代码
class Program
{// Redis 连接配置private static readonly ConnectionMultiplexer Redis =ConnectionMultiplexer.Connect("127.0.0.1:6379");private static readonly IDatabase Database = Redis.GetDatabase();private const string QueueKey = "simple_queue"; // 队列名称(Redis List 的 key,不要和现有的重复)/// <summary>/// 发送消息(生产者)/// </summary>/// <param name="message">消息内容(字符串)</param>public static async Task<bool> ProduceAsync(string message){if (string.IsNullOrEmpty(message)) return false;try{await Database.ListLeftPushAsync(QueueKey, message);Console.WriteLine($"[生产者] 已发送消息: {message}");return true;}catch (Exception ex){Console.WriteLine($"[生产者] 发送失败: {ex.Message}");return false;}}/// <summary>/// 接收消息(消费者) - 阻塞模式(推荐)/// </summary>public static async Task ConsumeAsync(){Console.WriteLine("[消费者] 开始监听消息队列...");while (true){try{var result = await Database.ListRightPopAsync(QueueKey);if (!result.IsNull){string message = result.ToString();Console.WriteLine($"[消费者] 收到消息: {message}");// 在此处处理你的业务逻辑ProcessMessage(message);}}catch (Exception ex){Console.WriteLine($"[消费者] 读取消息出错: {ex.Message}");await Task.Delay(1000); }}}/// <summary>/// 模拟处理消息(业务逻辑方法)/// </summary>private static void ProcessMessage(string message){// 例如:保存到数据库、调用 API 等Console.WriteLine($"[Processor] 已处理消息: {message}");Thread.Sleep(5000);}// --- 主程序示例 ---static async Task Main(string[] args){var isConsume = true;//是否为消费者if (isConsume){// 启动消费者await ConsumeAsync();}else{//启动生产者for (int i = 1; i <= 5; i++){string msg = $"Test message {i} at {DateTime.Now:HH:mm:ss}";await ProduceAsync(msg);await Task.Delay(1000); // 每秒发一条}}}
}
三、效果
