【redis-06】redis的stream流实现消息中间件

redis系列整体栏目


内容链接地址
【一】redis基本数据类型和使用场景https://zhenghuisheng.blog.csdn.net/article/details/142406325
【二】redis的持久化机制和原理https://zhenghuisheng.blog.csdn.net/article/details/142441756
【三】redis缓存穿透、缓存击穿、缓存雪崩https://zhenghuisheng.blog.csdn.net/article/details/142577507
【四】redisson实现分布式锁实战和源码剖析https://zhenghuisheng.blog.csdn.net/article/details/142646301
【五】redis保证和mysql数据一致性https://zhenghuisheng.blog.csdn.net/article/details/142687101
【六】redis的stream流实现消息中间件https://zhenghuisheng.blog.csdn.net/article/details/142721269

如需转载,请输入:https://blog.csdn.net/zhenghuishengq/article/details/142721269

redis的stream流实现消息中间件

  • 一,redis的新特性-队列stream
    • 1,redis的stream流基本使用
    • 2,stream队列消息消费
      • 2.1,单消费者
      • 2.2,消费者组
        • 2.2.1,订阅消费者组
        • 2.2.2,消息消费
    • 3,stream出现之前如何实现消息中间件
      • 3.1,list类型实现
      • 3.2, Pub/Sub 发布订阅模式
    • 4,stream底层设计及优化
      • 4.1,队列设置最大值
      • 4.2,使用消费者组
      • 4.3,消息的应答机制
      • 4.4,优化点

一,redis的新特性-队列stream

在了解这个redis的新特性之前,可以先查看一下官网的详细文档:stream流官方文档

redis的stream流是从5.0版本才开始提出,本人这里安装的是 6.2.6 版本。它的底层原理是借鉴于kafka的底层实现,因此可以参考本人前面的写的kafka的文章。redis 的stream流队列其主要组件有:消息队列、生产者、消费者、消费者组、消息及消息id、偏移量等

在这里插入图片描述

建立这个stream的主要原因,是作者想通过这个redis来取代mq那些中间件,redis在项目中时必不可少的,但是引入mq就会多引入一个第三方的中间件,让系统稳定性没那么高,mq一挂就有可能导致整个系统瘫痪

1,redis的stream流基本使用

创建一个stream队列的命令如下,通过xadd的方式实现往队列中添加消息。如下创建一个商品的队列,然后设置商品type类型为小米手机,商品名称name为小米8,得到的结果如下图

xadd product_queue * type xiaomi name xiaomi8

product_quque 表示队列的名称,***** 表示由服务器自动生成一个id,其id通过时间戳+序号(毫秒时间内第n条消息)

在这里插入图片描述

可以直接通过 xlen 命令查看队列的长度,可以发现已经队列的长度为4

xlen product_queue

在这里插入图片描述

也可以直接通过 xrange 命令将全部的消息展示出来,在后面需要加上 - + 两个命令操作符,也可以在后面加一个id来设置范围 。

  • - 表示在这个队列中的最小的id,
  • + 表示在这个队列里面最大的id
xrange product_queue - +
xrange product_queue - 1728139368101-0   //获取前两个
xrange product_queue 1728139374509-0 +   //获取后两个

在这里插入图片描述

删除命令也比较简单,可以直接通过 xdel 命令实现删除,执行完命令之后,可以发现队列中的数据已被删除。但是这个xdel使用的是逻辑删除消息,而不是物理删除。

xdel product_queue 1728139368101-0

在这里插入图片描述

也可以查看整个stream队列的详细信息,可以直接通过 xinfo 命令来实现。其返回信息如下,length表示返回4条数据,

xinfo stream product_queue

返回的消息如下,会将整个队列的信息详细的返回,并且根据不同的redis版本返回一些不同的额外参数


127.0.0.1:6379> XINFO STREAM product_queue
 1) "length"	
 2) (integer) 5					//表示5条数据
 3) "radix-tree-keys"	
 4) (integer) 1					//用于存储 Stream 元素的 Radix Tree 中的键数量
 5) "radix-tree-nodes"
 6) (integer) 2					//Radix Tree 中的节点数量,反映了树的复杂度
 7) "groups"
 8) (integer) 2					//与此 Stream 相关的消费组(Consumer Group)数量
 9) "last-generated-id"
10) "1608049761947-0"			//Stream 中最后一个生成的消息的 ID
11) "first-entry"
12) 1) "1608049732151-0"
    2) 1) "name"
       2) "item1"
13) "last-entry"
14) 1) "1608049761947-0"
    2) 1) "name"
       2) "item5"

2,stream队列消息消费

由于redis的stream流主要是借鉴于kafka,因此其内部消费方式和kafka一样,主要有单消费者和消费者组 。由于上面已经往 product_queue 队列中投递了消息,因此接下来主要讲解消息如何被消费

2.1,单消费者

单消费者也比较好理解,就是此时不属于任何一个消费者组中的消费者。其消费方式如下,可以直接通过 xread 的方式进行消息的读取

xread count 1 streams product_queue 0-0
  • count表示读取消息的条数,比如后面接1表示只读取一条数据
  • streams表示一个关键字,需要配合xread使用
  • 0-0前面这个0表示读取队列最开始的消息,后面这个0表示只读取一条数据

在这里插入图片描述

除了从前面读取消息之外,也可以直接从后面开始读取数据,可以直接通过 $ 解决

xread count 1 streams product_queue $				//直接读取
xread block 0 count 1 streams product_queue $		//阻塞式读取    

但是直接通过这种单消费者方式实现消息消费的话,也存在着一定的缺陷,因为单消费者消费消息,其消费完成的偏移量是需要手动实现提交的,因此单消费者实现消息消费会比较的复杂。

2.2,消费者组

2.2.1,订阅消费者组

上面提到了单消费者实现消息消费需要手动的提交偏移量,这样下次才知道当前消费者的消息消费到了哪里,在redis内部中,已经提供好了一个可以自动实现消息消费后记录偏移量的功能,不需要开发者自行的去实现。

创建消费者组的命令如下,可以通过xgroup 实现,如为 product_queue 的队列创建一个consumer1的消费者组,设置从头开始读取消息

xgroup create product_queue consumer1 0-0		//从前面开始消费消息

也可以创建一个名称为consumer2的消费者组,从后面开始读取消息

xgroup create product_queue consumer2 $    		//从后面开始消费消息

可以直接通过 xinfo groups 命令来查看该队列对应的全部的消费者组的信息,可以发现此时已经有两个消费者组

xinfo groups product_queue

在这里插入图片描述

2.2.2,消息消费

在实现完消息订阅之后,由于kafka设计的理念是,一个分区下的消息只能被消费者组中的一条消息消费,因此redis中stream流的设计理念也一样。

其消息消费的命令如下,通过 xreadgroup 实现消费者组消费,GROUP表示一个关键字,需要和xreadGroup结合使用,consumer1表示一个消费者组,c1表示消费者中的任意一个消费者,count 1表示只消费一条消息,最后面的 > 表示获取的消息通过 last_delivered_id 后一条开始消费

xreadgroup GROUP consumer1 c1 count 1 streams product_queue >

在这里插入图片描述

在消息被消费完成之后,再来查看一下消费者组的详细消息,在上面执行这个命令时此时的consumer1消费者组对应的value值是0-0,当有消息被消费之后,这个消费者组对应的 last_delivered_id 就发生了改变,其指针往后移动了一位

在这里插入图片描述

其偏移量主要就是通过这个 last_delivered_id 来解决的,每次消费一条消息,偏移量就会往后移动一位,这样就能解决消息重复消费的问题,也不需要像单消费者一样需要手动去记录消息消费完后偏移量的记录。

也可以直接通过命令查看消费者本身的消息,通过 xinfo comsumers 结合使用,查看哪个队列下面的那个消费者组,可以发现此时有一个c1的消费者进行消费,并且有一条消息处于pending未确认的状态

xinfo consumers product_queue consumer1

在这里插入图片描述

当然也可以通过命令的方式手动的进行消息消费的确认机制,通过xack的机制进行手动的确认,再次查询这个消费者详细信息之后,可以发现此时处于pending未确认的状态的消息已经被确认了,此时的值为0

xack product_queue consumer1 1728141022160-0

在这里插入图片描述

3,stream出现之前如何实现消息中间件

3.1,list类型实现

在list的数据类型中,可以通过Lpush+Rprop的方式实现消息中间件,其原理也比较简单,生产者从列表的左边加入消息,消费者从列表的右边消费消息, 这样保证了消息先进先出(FIFO)原则,适用于简单的消息队列系统 。

  • 生产者:使用 LPUSH 向列表的左边插入消息
  • 消费者:使用 RPOP 从列表的右边消费消息。

但是这种数据类型也存在缺陷,只能适用于小型轻量级、快速开发的场景,如果遇到了高并发场景,或者消息需要手动确认机制等场景,那么这种list方式就不太合适

img

3.2, Pub/Sub 发布订阅模式

redis内部也提供了一种发布订阅的模式,其简单使用如下,就是通过publish发送消息,subscribe接收消息

PUBLISH channel message: 发布者通过该命令向 channel 发布 message

SUBSCRIBE channel: 订阅者通过该命令订阅 channel,并接收其发布的消息。

在redis中,这种发布订阅一般比较的适用于实时场景,如实时消息推送,聊天等场景。但是缺陷也明显:

  • 首先内部并没有提供持久化机制,意味着数据会丢失
  • 其次内部也没有提供消息小人机制,某些高可靠场景不适合
  • 消息堆积很可能造成redis宕机问题

在这里插入图片描述

4,stream底层设计及优化

再讲解上面的基本使用之后,再来看这幅图就比较简单。

在这里插入图片描述

4.1,队列设置最大值

在redis中一个队列的大小也可以设置最大值,防止因为队列太长导致内存不足而而宕机。再创建队列时可以直接通过 MAXLEN 设置最大值,并且可以通过一个 ~ 设置成一个近似值,也可以不加这个近似值成为一个精确值

//最大值设置成1000,并且是一个近似值
xadd product_queue MAXLEN ~ 1000 * type xiaomi name xiaomi8
    
//最大值设置成1000,并且是一个精确值
xadd product_queue MAXLEN 1000 * type xiaomi name xiaomi8

近似值往往可以更加灵活,在性能上高于精准值。当达到或者超过这个设置的值的时候,redis就会触发内存淘汰策略将数据淘汰。

4.2,使用消费者组

在消费者端应该直接考虑使用消费者组而不是单消费者,每个消费者组内部有一个 last_delivered_id ,可以通过这个字段记录对应的偏移量,这样如果出现宕机或者重启等情况,就能知道消费者消费到了哪个偏移量上面,从而从根本上解决一些消息重复消费等情况

4.3,消息的应答机制

在每个消费者组中,都会有一个pendding ids的数组,这个数组会记录所有未应答的消息id,可以通过确认数组中的id来保证消息确实被消费。当消息长时间不被ack应答时,也会被触发内存淘汰策略被淘汰

4.4,优化点

如果设计的队列太多,可以考虑部署一些cluster集群、哨兵+主从集群来保证整个系统的高可用和高性能。如果一个队列中的消息被大量的生产和消费,可以考虑 写热点分散 的方式将数据多分布在几个队列里面,然后通过hash或者轮询等方式进行消息的消费

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/889649.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

STM32-ADC模数转换

一、概述 ADC(Analog-Digital Converter)模拟-数字转换器 ADC可以将引脚上连续变化的模拟电压转换为内存中存储的数字变量,建立模拟电路到数字电路的桥梁12位逐次逼近型ADC,1us转换时间输入电压范围:0~3.3V&#xff…

vscode配置R语言debugger环境:“vscDebugger“的安装

要在 R 中安装 vscDebugger 包,可以按照以下步骤进行: 方法一:使用命令面板自动安装 打开命令面板: 在 Visual Studio Code 中按 CtrlShiftP 打开命令面板。 运行安装命令: 在命令面板中输入并选择 r.debugger.updat…

大数据新视界 --大数据大厂之 Dremio:改变大数据查询方式的创新引擎

💖💖💖亲爱的朋友们,热烈欢迎你们来到 青云交的博客!能与你们在此邂逅,我满心欢喜,深感无比荣幸。在这个瞬息万变的时代,我们每个人都在苦苦追寻一处能让心灵安然栖息的港湾。而 我的…

前端的全栈混合之路Meteor篇:分布式数据协议DDP深度剖析

本文属于进阶篇,并不是太适合新人阅读,但纯粹的学习还是可以的,因为后续会实现很多个ddp的版本用于web端、nodejs端、安卓端和ios端,提前预习和复习下。ddp协议是一个C/S架构的协议,但是客户端也同时可以是服务端。 什…

SSD |(二)SSD主控

文章目录 📚控制器架构🐇PCIe和NVMe控制器前端子系统🐇NAND闪存控制器后端子系统🐇内存子系统🐇安全子系统🐇CPU计算子系统 📚控制器架构 控制器作为一个片上系统,处理来自用户端的…

【二分算法】——8个题目让你找到二分算法的感觉势如破竹

文章目录 1.二分查找2.在排序数组中查找元素的第一个和最后一个位置3.x的平方根4.搜索插入位置5.山脉数组的峰顶索引6.寻找峰值7.寻找旋转排序数组中的最小值8.JZ53(2) 1.二分查找 https://leetcode.cn/problems/binary-search/ 思路: 标准的二分查找。给定一个有序数组和目…

【2024版本】Mac/Windows IDEA安装教程

IDEA 2024版本真的很强大,此外JDK发布了最新稳定版 JDK21 ,只有新版本支持JDK 21、JDK22。原来数据库插件不支持redis等一些NoSql的数据库的连接,如果要使用需要自己单独装收费的插件。直接打开idea就很吃内存了,再打开其他一大堆…

47 C 语言实战项目——家庭收支记账软件

目录 1 需求说明 1.1 菜单显示 1.2 登记收入 1.3 登记支出 1.4 显示收支明细 1.5 退出 2 流程分析 2.1 总流程图 2.2 登记收入流程图 2.3 登记支出流程图 2.4 收支明细流程图 2.5 退出流程图 3 代码实现 3.1 框架搭建 3.2 收支明细功能 3.3 登记收入功能 3.4 …

23年408数据结构

第一题: 解析: 第一点,我们要知道顺序存储的特点:优点就是随用随取,就是你想要查询第几个元素可以直接查询出来,时间复杂度就是O(1),缺点就是不适合删除和插入,因为每次删除和插入一…

【数据分享】2000—2023年我国省市县三级逐年植被覆盖度(FVC)数据(Shp/Excel格式)

之前我们分享过2000—2023年逐月植被覆盖度(FVC)栅格数据(可查看之前的文章获悉详情)和Excel和Shp格式的省市县三级逐月FVC数据(可查看之前的文章获悉详情),原始的逐月栅格数据来源于高吉喜学者…

青云AI算力创新:直击AI落地痛点 打造企业数智化基石

在当今这个数字化、智能化的时代,企业数字化转型、智能化升级应用实践在加速,AI算力已经成为企业数字化转型和智能化升级的重要基石,而AI算力在推动技术创新和业务增长中起到了关键作用。青云科技近日举办的AI算力发布会,标志着AI…

知识图谱入门——5:Neo4j Desktop安装和使用手册(小白向:Cypher 查询语言:逐步教程!Neo4j 优缺点分析)

Neo4j简介 Neo4j 是一个基于图结构的 NoSQL 数据库,专门用于存储、查询和管理图形数据。它的核心思想是使用节点、关系和属性来描述数据。图数据库非常适合那些需要处理复杂关系的数据集,如社交网络、推荐系统、知识图谱等领域。 与传统的关系型数据库…

如何快速给word文件加拼音?请跟着步骤完成吧

如何快速给word文件加拼音?在日常工作中,我们时常会遇到需要为Word文件中的文字添加拼音的情况,这尤其在教育、出版或国际交流等领域显得尤为重要。为文字配上拼音,不仅能帮助学习者准确发音,还能提升文档的可读性和普…

【JavaEE初阶】深入理解不同锁的意义,synchronized的加锁过程理解以及CAS的原子性实现(面试经典题);

前言 🌟🌟本期讲解关于锁的相关知识了解,这里涉及到高频面试题哦~~~ 🌈上期博客在这里:【JavaEE初阶】深入理解线程池的概念以及Java标准库提供的方法参数分析-CSDN博客 🌈感兴趣的小伙伴看一看小编主页&am…

【STM32单片机_(HAL库)】4-2-1【定时器TIM】定时器输出PWM实现呼吸灯实验

1.硬件 STM32单片机最小系统LED灯模块 2.软件 pwm驱动文件添加定时器HAL驱动层文件添加GPIO常用函数定时器输出PWM配置步骤main.c程序 #include "sys.h" #include "delay.h" #include "led.h" #include "pwm.h"int main(void) {HA…

【瑞萨RA8D1 CPK开发板】串口的使用和STDOUT输出重定向

串口 本次串口的使用关于时钟导致串口的波特率不对,坑了我很久的时间 使能时钟 串口发现一个问题就是,只能使用下边的时钟配置,修改时钟源和分频系数都会导致串口波特率不正常,这种问题出现在mdkrasc的使用场景之下&#xff1b…

adaptor lora基础

https://www.zhihu.com/question/508658141/answer/3340979311 adaptor和PEFT的区别:前者在模型子层后加一个小型的dense;后者直接稀疏化模型本身; Loading Pre-Trained Adapters — AdapterHub documentation CVPR 2024 | SD-DiT&#xff…

Python | Leetcode Python题解之第468题验证IP地址

题目: 题解: class Solution:def validIPAddress(self, queryIP: str) -> str:if queryIP.find(".") ! -1:# IPv4last -1for i in range(4):cur (len(queryIP) if i 3 else queryIP.find(".", last 1))if cur -1:return &q…

每日OJ题_牛客_小乐乐改数字_模拟_C++_Java

目录 牛客_小乐乐改数字_模拟 题目解析 C代码 Java代码 牛客_小乐乐改数字_模拟 小乐乐改数字_牛客题霸_牛客网 (nowcoder.com) 描述: 小乐乐喜欢数字,尤其喜欢0和1。他现在得到了一个数,想把每位的数变成0或1。如果某一位是奇数&#…

【网络安全】CVE-2024-46990: Directus环回IP过滤器绕过实现SSRF

未经许可,不得转载。 文章目录 背景漏洞详情受影响版本解决方案背景 Directus 是一款开源 CMS,提供强大的内容管理 API,使开发人员能够轻松创建自定义应用程序,凭借其灵活的数据模型和用户友好的界面备受欢迎。然而,Directus 存在一个漏洞,允许攻击者绕过默认的环回 IP …