中间件
[toc]
中间件
1. redis
1.1 数据类型
string: set key value; 自己构建了一种简单动态字符串(simple dynamic string, SDS), 二进制安全, 不会造成缓冲区溢出, string可以包含任何数据, 获取字符串长度复杂度为O(1), 一个键最大能存储512MB. 一般用于需要计数场景: 用户访问次数, 热点文章点赞转发数等.
hash: hmset name key1 value1 key2 value2; 键值对结合, string类型的field和value的映射表, 适合存储对象. 可以直接仅修改对象某个字段的值.一般用于对象数据的存储
list: 双向链表, 字符串列表, 按照插入顺序排序; lpush name value(给key=name的list头部添加字符串元素); rpush(给key=name的list尾部添加字符串元素);LREM key count VALUE(给key=name的list根据参数COUNT的值,移除列表中与参数VALUE相等的元素). 一般用于发布订阅(消息队列), 慢查询等
set: sadd name value; string类型的无序集合(不允许重复值), 通过hash表实现的, 增加删除查找复杂度都是O(1), 轻易实现交集, 并集, 差集等操作.一般用与存放数据不能重复的以及需要多数据源交集并集等场景
zset(sorted set): zadd name score value; string类型的有序集合, 不允许重复值. 每个元素都会关联一个double类型的分数, 通过分数为集合成员进行排序. 成员唯一但是分数可重复. 一般用于需要对数据根据权重进行排序场景.
bitmap: setbit name length value; 存储连续二进制数字, 通过bitmap, 只需要一个bit位来表示某个元素对应的值或者状态. 极大节省存储空间. 一般用于保持状态信息(是否签到, 是否登录) 并需要进一步对这些信息进行分析的场景.
hyperLogLog: 基数统计算法, 当输入数量或体积非常大, 计算基数所需空间总是固定的, 且很小. 只会根据输入元素计算基数, 而不会存储元素本身, 不能返回输入的各个元素
stream: 流. 用来创建消息队列
1.2 线程模型
基于reactor模式开发了一套高效的事件处理模型, 这套事件处理模型对应的是redis中的文件事件处理器(file event handler). 由于文件事件处理器是单线程的, 所以一般说redis是单线程模型
客户端连接: 使用IO多路复用, 将感兴趣的事件及类型(读,写)注册到内核并监听每个事件是否发生. 多路复用的使用让redis不需要额外创建多余线程来监听客户端的大量连接, 降低了资源消耗(与NIO中的Selector很像)
文件事件处理器
- 多个socket(客户端连接)
- IO多路复用程序(支持多个客户端连接)
- 文件事件分派器(socket关联的相应事件处理器)
- 事件处理器(连接应答处理器, 命令请求处理器, 命令回复处理器)
多线程:
4.0已增加多线程, 主要针对大键值对删除操作的命令, 使用这些命令就会使用主处理之外其他线程来清理脏数据, 无用连接释放, 大key删除等
6.0后引入多线程, 主要是为了提供网络IO读写性能, 只在网络数据读写这类耗时操作才使用, 执行命令仍是单线程执行
6.0如何开启多线程?
配置文件io-threads-do-reads yes
开启多线程, io-thread 4
设置线程数(官方建议4核设置位2到3线程, 8核设置位6个线程, 一定要小于核数; 并不是越大越好, 超过8个没意义了)
流程简述:
- 主线程负责接收建立连接请求, 获取socket放入全局等待读处理队列
- 主线程处理完读事件后, 通过Round Robin将连接分配给IO线程
- 主线程阻塞等待IO线程读取socket完毕
- 主线程通过单线程方式执行请求命令, 请求数据读取并解析完成,但不执行
- 主线程阻塞等待IO线程将数据回写socket完毕
- 解除绑定, 清空等待队列
为何使用单线程?
- 单线程编程容易且更容易维护
- redis性能瓶颈不再CPU, 而是内存和网络
- 多线程存在死锁, 上下文切换等问题, 甚至会影响性能
1.3 事务
redis事务是一组命令的集合, redis最小执行单位, 要么都执行,要么都不执行.
- 批量操作发生EXEC命令放入队列缓存
- 收到EXEC命令进入事务执行, 事务中任意命令执行失败, 其余命令仍然被执行
- 执行过程中, 其他命令请求不会插入到事务执行命令序列中
不具备原子性: 单个redis命令是原子性的, 但是没有事务上增加任何维持原子性机制, 可以理解位打包批量执行脚本, 如果中间某条指令失败不会导致前面已做指令回滚, 也不会阻止后续指令执行
1.4 持久化
将内存中的对象保存到可永久保存的存储设备中
主要2中方式:
RDB(快照 snapshotting)持久化
- 内存中的数据库记录定时dump到磁盘上
- fork一个进程, 遍历hash table ,利用copy on write, 将这个db dump保存
- save, shutdown, slave命令会触发这个操作
- 粒度较大, 如果save, shutdown, slave之前crash了, 中间操作无法恢复
- 默认持久化方式
save 900 1 #在900秒(15分钟)之后,如果至少有1个key发生变化,Redis就会自动触发BGSAVE命令创建快照。
save 300 10 #在300秒(5分钟)之后,如果至少有10个key发生变化,Redis就会自动触发BGSAVE命令创建快照。
save 60 10000 #在60秒(1分钟)之后,如果至少有10000个key发生变化,Redis就会自动触发BGSAVE命令创建快照。
优点:
- 紧凑型二进制文件, 适合备份,全量复制场景
- 恢复速度快与AOF
- 对redis影响小, 可保存redis高性能
缺点:
- 无法实现实时或者秒级持久化
- 新老版本无法兼容RDB格式
- 每次fork子进程来执行RDB快照数据文件生成时, 如果文件特别打, 可能会导致对客户端提供的服务暂停数号码, 或者甚至数秒.
AOF(只追加文件, append-only file)持久化:
- Redis操作日志以追加方式写入文件
- 把写操作指令持续写道一个类似日志文件中
- 粒度较小, crash后, 只有crash之前没来得及做日志的操作没办法恢复
- 开启需要设置
appendonly yes
- 每执行一条更高命令, 会将该命令写入到内存缓存
server.aof_buf
中, 再根据appendfsync
配置来决定何时将其同步到硬盘AOF文件 - 保存位置与RDB相同, 都是同dir参数设置, 默认文件名为
appendonly.aof
配置:
appendfsync always #每次有数据修改发生时都会写入AOF文件,这样会严重降低Redis的速度
appendfsync everysec #每秒钟同步一次,显示地将多个写命令同步到硬盘
appendfsync no #让操作系统决定何时进行同步
一般使用everysec
每秒同步一次AOF文件, 对性能几乎没有任何影响, 最多丢失一秒内产生数据
优点:
- 更好的包含数据不丢失
- appen-only模式写入性能较高
- 适合做灾难性的误删除紧急恢复
- 日志文件即使过大, 出现后台重写操作,也不影响客户端读写. (写入日志时, 会对指令压缩, 创建一份需要恢复数据最小的日志. 创建日志文件时,老文件照常写入, 新日志文件创建完成后, 交换新老日志即可)
缺点:
- AOF文件比RDB大
- AOF开启后, 会对写的QPS有所影响, QPS要下降
- 数据库恢复比较慢, 不适合冷备
4.0开始, 支持RDB+AOF混合持久化(默认关闭, 通过aof-use-rdb-preaamble
开启).
打开时, AOF重写就直接把RDB内容写入AOF开头, 结合RDB和AOF优点, 快速加载同时避免丢失过多数据, 缺点: AOF的RDB是压缩格式不再是AOF格式, 可读性较差
1.5 过期与失效测率
1.5.1 过期
给缓存数据设置过期事件, 有助于缓解内存消耗. 有些数据也著需要再一段时间段存在
如何判断数据释放过期: 通过过期字典表(看作hash表)来保存过期数据. 键指向某个key, 值为一个long long类型, 保存过期时间(毫秒精确的UNIX时间戳)
1.5.2 删除策略
redis采用定期删除+惰性删除. 但是还是存在大量过期key堆积内存, 这时就需要内存淘汰机制了
- 惰性删除: 只会在取出key时才对数据进行过期检查. 对CPU友好, 但可能造成太多过期key没有被删除
- 定期删除: 每隔一段时间抽取一批key执行删除过期key操作. redis会通过限制删除操作执行的时长和频率来减少删除操作对CPU的影响
1.5.3 内存淘汰策略
- volatile-lru(least recently used): 从已设置过期时间的数据集(server.db[i].expires)中挑选最近最少使用的数据淘汰
- volatile-ttl: 从已设置过期时间的数据集(server.db[i].expires)中挑选将于过期的数据淘汰
- volatile-random: 从已设置过期时间的数据集(server.db[i].expires)中随机挑选数据淘汰
- allkeys-lru(least recently used): 当内存不足以容纳写入新数据时, 在键空间中,移除最近最少使用的key(最常用)
- allkeys-random: 从数据集(server.db[i].dict)中随机选择数据淘汰
- no-eviction: 禁止驱逐数据, 当内存不足以容纳新写入数据时, 写操作报错.
- volatile-lfu(least frequently used): 从已设置过期时间的数据集(server.db[i].expires)中挑选最不经常使用的数据淘汰
- allkeys-lfu(least frequently used): 当内存不足以容纳写入新数据时, 在键空间中,移除最不经常使用的数据淘汰
1.6 缓存穿透
大量请求key不存在于数据库中, 自然缓存中也没有, 导致每次都请求到数据库, 进行了2次无用查询.
解决办法:
- 缓存空对象: 如果查询返回结果数据为空(不管是否故障或数据不存在), 仍把结果缓存,但这个缓存过期时间很短, 最长不超过5分钟
- 布隆过滤器: 占用内存很小,位存储, 性能特别高. 使用key的hash判断key是否存在, 将所有可能存在的数据hash到一个足够大的bitmap中, 一个一定不存在的数据会被bitMap拦截掉, 从而避免对底层存储系统的查询压力
1.7 缓存雪崩
缓存同一时间大面积失效, 所有请求都直接落到数据库上, 造成数据库短时间承受大量请求.
解决办法:
- 加锁队列: 通过redis分布式全局锁, 拿到锁后, 加载数据库并设置缓存, 否则重试get
- 数据预热: 数据上线后, 相关缓存数据直接加载到缓存. 通过缓存reload机制,预先更新缓存, 再即将发生大并发访问前手动触发加载缓存不同key
- 双层缓存策略: C1为原始缓存, C2为拷贝缓存, C1失效时, 可访问C2; C1缓存失效时间设置为短期, C2设置为长期
- 定时更新缓存策略: 容器启动初始化加载, 定时任务更新或移除缓存
- 设置不同时期时间, 让缓存失效时间点尽量均匀
缓存预热:
- 数据量不大, 工程启动进行加载缓存动作
- 数据量大, 设置定时任务脚本, 进行缓存刷新
- 数据量太大, 优先保证热点数据进行提前加载到缓存
1.8 缓存击穿
数据库中有数据, 读缓存没读到数据, 去数据库查,导致数据库压力瞬间增大, 造成过大压力
解决方法:
- 让一个线程构建缓存, 其他线程等待构建缓存执行完, 重写获取数据. 单机就是(sychronglized或者lock), 分布式就是分布式锁(redis 分布式锁)
- 永不过期: 给缓存不设置过期时间, 过期时间存到key对应的value中, 如果发现过期, 则后台异步线程进行缓存构建, 逻辑过期.
- 缓存屏障, 类似方法1, 使用CountdownLatch和CAS实现轻量级锁.
1.9 缓存读写策略
1.9.1 旁路缓存模式(Cache Aside Pattern)
比较适合读请求比较多的场景
写:
- 先更新DB
- 直接删除cache
读:
- 从cache读取, 读到就直接返回
- 读不到,从DB读取数据返回
- 数据放入cache中
问题:
- 能否先删除cache, 再更新数据库?
会导致缓存不一致. 多线程时, 会存在cache一直缓存旧数据 - 先更新后缓存没问题么?
有几率出现问题, A先读取数据, B更新数据并删除cache, A执行完看到缓存为空设置了缓存
缺陷:
- 首次请求数据不再cache中, 解决办法: 热点数据提前放入cache
- 写操作频繁的话, cache数据被频繁删除, 影响缓存命中率, 解决办法: 1. 必须强一致: 更新DB同时更新cache, 这样就需要分布式锁来保证更新cache时不存在线程安全性问题. 2. 允许短暂不一致: 更新db同样更新cache, 给缓存加一个短过期时间, 这样即使不一致也影响较小
1.9.2 读写穿透模式(Read/Write Through Pattern)
将cache作为主要数据存储, 从cache中读取数据并写入. cache服务负责将此数据读取和写入db, 减轻应用程序职责. (开发中少见, redis并没有提供cache将数据写入db功能).
是对Cache-Aside Pattern的封装
写:
- 先查cache, cache不存在, 直接更新DB
- cache中存在, 先更新cache, 然后由cache服务自己更新到DB(同步更新cache和DB)
读:
- 从cache中读取数据, 读取到则直接返回
- 读取不到, 先从db加载. 写入到cache后返回相应
缺陷:
- 首次请求数据不再cache中, 解决办法: 热点数据提前放入cache
1.9.3 异步缓存写入模式(Write Behind Pattern)
与读写穿透模式(Read/Write Through Pattern)相似, 都是由cache服务负责cache和DB读写
不同之处: 只更新缓存,不直接更新DB, 使用异步批量方式来更新DB
使用场景: 消息队列中消息异步写入磁盘, mysql innodb buffer poll都使用这种策略
DB写性能很高, 适合数据经常变化又对数据一致性要求没那么高的场景,比如浏览量,点赞量
缺陷:
- cache服务未异步更新db, cache服务挂掉了
1.10 集群架构
1.10.1 主从复制
建立主从关系:
- slaveof命令:
建立主从命令: slaveof ip port
取消主从命令: slaveof no one - redis.conf配置文件
配置slaveof ip port
从节点只读: slave-read-only yes # 配置只读
从节点不会让key过期, 只会等待master过期key, 如果master要过期一个key或者淘汰一个key, 那么会模拟一条del命令发送给slave.
主从会互相发送心跳消息. 主节点每隔10秒发送一次. 从节点每隔1秒发送一次.
1.10.2 复制过程
- 从节点执行slaveof命令
- 只保存slaveof中主节点信息, 没有立即发起复制
- 从节点定时任务发现有主节点信息, 开始socket连接主节点
- 连接建立成功, 发送ping命令, 等待pong相应, 否则重连
- 如果主节点设置权限, 进行权限验证,验证失败,复制终止
- 权限验证通过, 发送PSYNC给主节点
- 从节点初次连接, 触发全量复制(full resynchronization). 主节点启动一个后台进程. 开始生成一份RDB快照文件. 同时还会将从客户端client收到的所有写命令缓存到内存中.
- RDB文件生成完毕, 主节点会将RDB发给从节点, 从节点会先写入磁盘,再从本地磁盘加载到内存中, 开始数据同步
- 后续主节点会持续把写命令发送给从节点, 从节点同步这些数据, 保证主从数据一致性
- 如果从节点与主节点由网络故障, 断开了连接, 会自动重连, 主节点仅会复制从节点缺少的部分
主从复制断点续传: 主节点内存会维护一个backlog, 主从都会保持一个replica offset和master run id, offset就保存再backlog中, 网络断开重连后, slave会从master上次的replica offset开始继续复制, 如果没有找到对应offset, 那么会执行一次 resynchronization.
1.10.3 主从结构
主从结构异步方式复制数据到slave节点, 从redis2.8开始, slave会周周期性确认每次复制数据量.
主要可以进行: 1. 冗余备份, 持久化保证即使redis服务重启也会丢失数据, 通过主从复制机制就可以避免单点故障 , 2. 读写分离
- 一个master可以有多个slave, slave也可以有自己的slave
- 复制在master端是非阻塞的, 即便多个slave首次同步,也可以提供查询服务
- 复制在slave端也是非阻塞的, 如果在redis.conf设置, slave首次同步时仍可使用旧数据集提供查询; 也可以配置当master, slave断开, slave返回客户端一个错误提示
- slave要删掉旧数据集, 加载新版数据集时, 会阻塞连接请求
主从建议开启master节点的持久化(万一关闭后, 主节点重启, 一轮复制后, slave数据丢失. 即使采用高可用, 如果sentinel还没检测到master failure. master就重启完成了. 还是会导致slave数据被清空 ), 确保启动时, 是由数据的.
1.10.4 慢查询配置
统计每条指令执行时间, 超过某个阈值就是慢指令.
slowlog-log-slower-than: 10000 # 慢查询指令阈值, 单位微秒
slowlog-max-len: # 慢查询日志列表长度, 当超过长度, 最早一条命令将从列表移除
1.10.4 架构模式
- 主从模式: 一个主节点, 多个从节点. 主节点数据备份到从节点, 从节点不进行实际操作, 只做同步操作, 并不能起到高并发目的
- 哨兵模式: 一个哨兵集群和一组主从架构. 主节点宕机后, 哨兵自动选举主节点对外提供服务
- 集群架构: 多个小主从聚集一起堆外提供服务, 将16384个插槽切分存储. 并不是一个强一致的集群架构,每个小主从会存在选举机制, 保证对外高可用.
1.11 Redis cluster(redis集群)
集群架构下, 开放2个端口号: 6379(处理业务), 16379(节点通信, cluster bus通信, 用来进行故障检测, 配置更新, 故障转移授权, 使用二进制协议gossip, 用于节点间高效数据交换,占用更少网络带宽和处理时间)
1.11.1 节点网络通信
集群元数据维护分2种方式:
集中式: 将集群元数据(节点信息,故障等)集中存储再某个节点, 如zookeeper.
优点: 元数据读取更新时效性号. 一旦变更, 立即更新到集中式存储种, 其他节点读取时就可以感知到
缺点: 所有元数据更新压力全部集中在一个地方, 可能导致元数据存储压力
gossip协议: redis使用协议.每个节点都持有一份元数据. 不同节点出现元数据变更就不断将元数据发送给其他节点,让其他节点也进行元数据变更.
优点: 元数据更新比较分散, 跟新请求会陆陆续续打到所有节点上去更新, 降低了压力
缺点: 元数据更新延时, 导致集群一些操作会有滞后.
1.11.1.1 gossip协议
包含消息:
- meet: 某个节点发送meet给新加入的节点, 让节点加入集群, 然后新节点就会开始与其他节点进行通信
- ping: 每个节点频繁给其他节点发送ping, 包含自己状态和自己维护的集群元数据, 互相通过ping交换元数据
每个节点每秒会执行10次ping, 每次选择5个最久没有通信的其他节点, 当然如果某节点通信延时达到了cluster_node_timeout/2. 就立即发送ping, 避免数据交换延时过长.
每次ping会带上自己节点信息, 还有带上1/10其他节点信息, 发送出去进行交换, 至少包含3个其他节点西悉尼, 最多包含总节点-2个其他节点信息. - pong: 返回ping和meet, 包含自己的状态和其他信息, 也用于信息广播和更新
- fail: 某一节点判断另一节点fail后, 发送fail给其他节点, 通知某节点宕机.
1.11.2 分布式寻址
- hash算法: key进行hash取模, 一旦宕机会导致大部分请求无法有效拿到缓存.
- 一致性hash算法: 环形hash取模. 如果节点太少, 容易因为节点分布不均匀, 造成缓存热点. 引入了虚拟节点机制, 每个节点计算多个hash, 放到环上多个位置, 实现数据均匀分布, 负载均衡.
- hash slot算法: 固定16384个hash slot. 每个key计算CRC16, 然后取模, 获取对应key的hash slot. 每个master持有部分slot(如有3个master, 那每个master持有5000多个slot). 增加master, 就将其他master的hash slot移动过去, 减少master, 将hash slot移动到其他master上.
1.11.3 高可用与主备切换
- 判断节点宕机: 在cluster-node-timeout内. 某一节点一直没有返回pong, 被任务pfaild. 如果一个节点被认为某个节点pfail, 会gossip ping广播到其他节点, 如果超过半数节点认为pfail, 那么就会变为fail
- 从节点过滤: 对宕机master node. 从其所有slave node, 选择一个切换成master node, 检测每个slave node与master node断开时间, 超过cluster-node-timeout * cluster-slave-validity-factor, 就没资格切换成master了
- 从节点选举: 每个从节点根据对master的offset, 设置选举时间, offset越大(复制数据越多)的从节点,选举时间越靠前, 优先进行选举. 所有master node开始对正在选举的slave选举投票, 如果大部分master node(n/2 +1)都投给了某个从节点, 那么选举通过, 这个从节点切换为主节点.
与哨兵比较. 整体流程与哨兵相似, 但更强大, 集成了replication和sentinel功能.
2. RabbitMQ
2.1 消费模型

2.1.1 生产者消费者
Producer(生产者): 生成消息的一方
Consumer(消费者): 消费消息的一方
Message(消息): 消息, 一般由2部分组成; 消息头(标签label)和消息体(payload)
消息体不透明. 消息头由一系列可选属性组成: 路由键(routing-key), 优先权(priority, 相对于其他消息优先权), 发送方式(delivery-mode, 指出该消息可能需要持久性存储)等. 生产者把消息交给RabbitMQ后, RabbitMQ会根据消息头把消息发送给感兴趣的消费者
2.1.2 交换器(Exchange)
接收生产者发送的消息并将这些消息路由给服务器中队列. 如果路由不到, 可能返回给生产者, 可能丢弃.
生产者将消息发送给交换器时, 会指定路由键(RoutingKey), 来指定这个消息的路由规则.
RabbitMQ通过绑定(Binding)将交换器(Exchange)和消息对了(Queue)关联. 绑定时会指定一个绑定键(BindingKey). 交换器和消息队列是多对多的关系. 绑定多个队列到同一个交换器时, 允许使用相同的BindingKey
生产者将消息发送给交换器, 交换器会判断RoutingKey与BindingKey, 如果想匹配, 那么消息会被路由到相应队列
RabbitMQ有4中类型交换器. 对应不同的路由策略:
fanout: 所有发送给该交换器的消息路由到所有与她绑定的队列中, 不做任何判断操作. 所有交换器类型中最快. 常用来广播消息
direct: 消息路由到BindingKey与RoutingKey完全匹配的队列中. 常用来处理有优先级的任务, 根据任务优先级把消息发送给对应的队列, 这样可指派更多资源来处理高优先级的队列
top: 消息路由到BindingKey与RoutingKey匹配的队列中.
RoutingKey以"."分隔字符串(分隔开的每一段独立的字符串被称为一个单词)
BindingKey和RoutingKey一样也是点号分隔字符串
BindingKey可以存在2个特殊字符串做模糊匹配, "*"匹配一个单词, "#"匹配多个单词(也可能是零个)
header: 不推荐, 不依赖路由键匹配规则类来路由消息. 而是根据发送消息内容中的headers属性进行匹配. 发送消息到交换器时, 会读取该消息的headers(键值对形式), 对比键值对是否完全匹配队列和交换器绑定时指定的键值对, 如果完全匹配则消息会路由到该队列, 否则不会路由到该队列. 性能很差, 不实用
2.1.3 消息队列(Queue)
保存消息直到发送给消费者. 它是消息的容器,也是消息的终点. 一个消息可投入到一个或者多个队列, 消息会一直在队列中,等待消费者连接到这个队列将其取走.
多个消费者可订阅同一个队列, 这是队列中的消息会被平均分摊(Round-Robin, 即轮询)给多个消息者处理, 避免消息被重复消费
队列中数据无上限, 取决于机器内存.
2.1.4 消息中间件服务节点(Broker)
一个Broker可以简单看作一个服务节点或服务实例.
2.2 消息队列
消息队列作用: 异步处理, 应用解耦, 消息分发, 流量削峰, 消息缓冲
副作用: 系统可用性降低, 系统复杂度提高(消息重复消费,消息顺序消费,消息堆积), 一致性问题(分布式事务)等问题
2.2.1 重复消费, 重复投递
幂等, MQ内部会给每条生产者发送消息生成一个inner-mgs-id, 作为去重和幂等的依据, 必满重复消息进入队列. 消息消费时, 消息体中必须有一个业务id(同一业务全局唯一)作为去重和幂等的依据. 避免消息重复消费
2.2.2 顺序消费
- 按照同一语义消息(比如一个订单)按照唯一id hash取模到同一队列中, 并只用一个消费者消费该队列
- 消息体中添加全局有序标识来实现
2.2.3 分布式事务
2PC, TCC, 事务消息等
2.2.4 消息堆积
先检查是否出现大量的消费错误, 然后水平增加消费者
- 先修复并停掉consumer
- 新建topic, partition为原有10倍,建立临时队列, 数量为原有10倍
- 写临时consumer, 临时征用10倍机器去消费数据
- 消费完成, 恢复原有consumer
2.2.5 回溯消费
查看对应MQ是否支持
2.2.6 确保消息发送给MQ
发送方确认模式: 将信道设置为confirm模式(发送方确认模式), 所以信道发布的消息会指派一个唯一ID. 一旦消息被投递到目标队列或者消息被写入磁盘(可持久化消息), 信道给生产者发送一个确认(ACK, 包含消息唯一ID). 如果内部错误导致消息丢失, 发送nack(not acknowledge, 未确认)消息, 可以在内存维护每个消息id状态, 一定实际内没收到消息回调, 那么就可以重发
2.2.7 确保消息被消费
接收方确认机制: 消费者没接收到一条消息都必须进行确认, 只有消费者确认了消息, MQ才能安全把消息从队列中删除(没有超时机制, 而是根据消费者连接中断来判断是否需要重新发送消息)
特殊请求:
- 如果消费者接收到消息, 在确认之前断开连接或取消订阅, 重新分发到下一个订阅消费者(可能存在消息重复消费, 需要根据业务id去重)
- 如果消费者接收了消息却没有确认消息, 连接也未断开, 认为消费者繁忙, 不会给消费者分发更多消息
2.2.8 确保消息不丢失
交换器/队列的duration属性设置为true(持久化交换器/队列). 服务器崩溃或重启后不需要重建交换器/队列
消息要从崩溃中恢复, 需要:
- 消息发布前, 设置投递模式(deliveryMode)设置为2(持久)来把消息持久化
- 消息发送给持久交换器
- 消息到达持久队列
确保持久性消息能从服务器重启中恢复: 将消息写入持久化日志文件.
- 发布持久性消息到持久交换器上, 会在消息提交到日志文件后才发送响应(如果消息队列到非持久队列, 会自动从持久话日志中移除)
- 一旦消费者从持久队列消费了一条持久化消息, 会把持久化日志中的这条消息标记为等待垃圾收集
- 如果持久化消息在消费之前MQ重启, 那么会重建交换器,队列, 并绑定. 并重播持久化日志中的消息到正确的队列或者交换器上
2.2.8 消息过期处理
RabbitMQ可以设置过期时间(TTL), 如果消息在队列中积压超过一定时间就会被MQ清理. 造成大量数据丢失.
处理方法为: 等待高峰期过后, 找回丢失数据, 重新灌入MQ, 补回数据.
2.2.9 集群部署
- 集群
部署负载均衡器, 生产者,消费者访问负载均衡
负载均衡代理所有MQ. 主要作用提高吞吐量
多台机器部署实例, 创建的队列只会在一台机器上, 所有机器同步queue元数据. 实际连接到一个随机的实例, 这个实例会把对应queue的数据拉取过来.
- 集群可以扩展消息通信吞吐量, 但不会备份消息, 备份消息要通过镜像队列方式解决
- 队列存储在单个阶段, 交换器存储在所有节点
缺点: 如果连接的随机实例, 会有数据拉取开销. 如果是固定queue所在机器, 就会有单点问题. 如果开启了消息实例化, 消息不一定会丢.得等到实例恢复. 才能继续从这个实例拉取消息.
- 镜像队列
将需要消费的队列变为镜像队列, 存于多个节点, 实现HA高可用.
缺点: 集群内部的同步通讯会占用大量网络带宽; 非分布式, 无扩展性(所有消息都同步, 如果某个queue负载很大, 新增机器也包含了所有数据, 没办法线性扩展)
3. Kafka
分布式流处理平台
- 消息队列: 发布和定义消息流, 类似与消息队列
- 容错的持久方式存储记录消息流: kafka会消息持久化到磁盘,有效避免消息丢失风险
- 流式处理平台: 消息发布时候进行处理, 提供完整流式处理类库
应用场景: - 消息队列: 建立实时流数据管道. 以可靠的在系统或应用程序之间获取数据
- 数据处理: 构建实时流数据处理程序来转换或处理数据
与RocketMQ, RabbitMQ相比, 优势:
- 极致性能
- 生态系统兼容性
3.1 队列模型
3.1.1 队列模型
使用队列(Queue)作为消息通信载体, 满足生产者与消费者模式, 一条消息只能被一个消费者使用, 未被消费的消息在队列中保留直到消费或者超时.
存在问题: 消息发给多个消费者, 且每个消费者都能接收到完整消息内容时, 无法处理
3.1.2 发布订阅模型
发布订阅模型(Pub-Sub)使用(Topic)作为消息通信载体, 类似广播模式, 发布者发布一条消息, 该消息通过主题传递给所有的订阅者. 消息广播后才订阅是接收不到消息.
如果在发布订阅模型, 只有一个订阅者, 那么就与队列模型基本一致. 所有发布订阅模型在功能层面可以兼容队列模型
3.2 概念
生产者(Producer): 产生消息的一方
消费者(Consumer): 消费消息的一方
代理(Broker): 可看作一个独立的Kafka实例, 多个Broker组成一个Cluster(集群)
主题(Topic): 生产者将消息发送到特定主题, 消费者订阅特定主题来消费消息
分区(Partition): 属于主题的一部分, 一个Topic可以有多个分区, 并且同一主题下的分区可以分布在不同的broker上(Topic可以横跨多个Broker). 可实际对应上消息队列中的队列.
3.2.1 多副本与分区
多副本: 分区中的多个副本存在leader和follower. 生产者和消费者只与leader交互, 其他follower作为leader的备份, 保证消息存储的安全性, 发送消息会被发送给leader, follower从leader中拉取消息同步. 当leader不可用, 可以选举出新的leader.
分区和多副本的好处:
- 给特定Topic指定多个Partition, 各个Parition可分布在不同Broker上, 提供比较好的并发能力(负载均衡)
- Parittion可以指定对应的Replica数, 极大提高消息存储的安全性, 提高了容灾能力, 不过也相应增加了所需的存储空间
zookeeper在kafka中的作用
- Broker注册: 会有一个专门用来进行Broker服务器列表记录的节点(/broker/ids). 每个Broker启动都会在Zookeeper上注册. 记录Broker的IP地址和端口等信息
- Topic注册: 同一Topic会有多个分区,这些分区信息和Broker对应关系也是Zookeeper维护, 如/brokers/topics/test_topic/Parition/0, /brokers/topics/test_topic/Parition/1
- 负载均衡: 对于同一Topic的不同Parition, 尽量将Partition分到不同的Broker服务器上,生产者产生的消息也尽量投递到不同的Broker的Partition中,当消费者消费时, zookeeper根据当前的Partition数量和消费者数量来实现动态负载均衡
3.2.2 消费有序
- 一个Topic只对应一个Partition, 一个consumer, 内部单线程消费
- 发送消息时指定表/对象id作为key, 根据key发送到指定Partition
3.2.3 消息不丢失
3.2.3.1 生产者不丢失
推荐通过回调方式, 发送失败重新发送:
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, o);
future.addCallback(result -> logger.info("生产者成功发送消息到topic:{} partition:{}的消息", result.getRecordMetadata().topic(), result.getRecordMetadata().partition()),
ex -> logger.error("生产者发送消失败,原因:{}", ex.getMessage()));
推荐生产者设置retries(重试次数)设置一个比较合理的值(一般是3), 为了保证消息不丢失会设置的比较大一点. 当出现网络问题就能够自动重试消息发送. 另外还要设置重试间隔, 间隔太小重试效果不明显.
3.2.3.2 消费者者不丢失
消费者消费时, 会分配一个特定的偏移量(offset), 记录当前消费者对当前分区消费到的位置, 通过偏移量可以保证消息在分区内顺序性
存在刚拉取到消息, 消费者自动提交offset, 但是消费者自己挂掉了.消息就丢失了.
解决办法: 手动关闭自动提交offset, 消费完之后再手动提交offset.(存在问题: 如果刚消费完, 还没提交offset, 消费者挂了, 那么消息会消费两次)
注意: 新版offset存储从zookeeper转移到broker, 并使用内部位移主题__consumer_offsets进行存储
3.2.3.3 kafka消息丢失
leader所在broker突然挂, 从follower中重选leader, 但是也有部分leader数据还没被follower同步, 就会导致消息丢失
- 给生产者端设置acks=all. 默认值为1, 表示消息被leader接收就算发送成功. 设置为all代表所有副本接收到消息后该消息才算真正成功被发送
- 给生产者端设置retries=MAX. 一旦写入失败, 无限重试
- 给topic设置replication.factor >=3. 保证有follower副本能同步消息, 保证每个分区至少有3个副本. 虽造成数据冗余, 但带来数据安全性
- 给服务端设置min.insync.replicas >1. 代表消息至少写入2个副本才算被成功发送. 默认值是1, 生成应避免默认值1. 为了保证高可用, 需设置replication.factor = min.insync.replicas +1. 如果2者相等, 有一个副本挂了, 整个分区就无法工作了.
- 设置unclean.leader.election.enable=false. 表示当leader发送故障, 不会从follower副本中和leader同步程度达不到要求的副本中选择出leader, 降低消息丢失可能性.
4. zookeeper
分布式应用程序协调服务, 集群的管理者,监视集群中各节点的状态, 根据节点提交的反馈进行下一步合理操作.
4.1 znode(数据节点)
4.1.1 节点类型
- PERSISTENT(持久化目录节点): 客户端断开后, 该节点依旧存在
- PERSISTENT_SEQUENTIAL(持久化顺序编号目录节点): 客户端断开后, 该节点依旧存在, Zookeeper给该节点名称进行顺序编号
- EPHEMERAL(临时目录节点): 客户端断开后, 该节点删除
- EPHEMERAL_SEQUENTIAL(临时顺序编号目录节点): 客户端断开后, 该节点删除, Zookeeper给该节点名称进行顺序编号
4.1.2 数据结构
名称 | 状态信息 | 说明 |
---|---|---|
stat | cZxid | create ZXID,即该数据节点被创建时的事务 id |
^ | ctime | create time,即该节点的创建时间 |
^ | mZxid | modified ZXID,即该节点最终一次更新时的事务 id |
^ | mtime | modified time,即该节点最后一次的更新时间 |
^ | pZxid | 该节点的子节点列表最后一次修改时的事务 id,只有子节点列表变更才会更新 pZxid,子节点内容变更不会更新 |
^ | cversion | 子节点版本号,当前节点的子节点每次变化时值增加 1 |
^ | dataVersion | 数据节点内容版本号,节点创建时为 0,每更新一次节点内容(不管内容有无变化)该版本号的值增加 1 |
^ | aclVersion | 节点的 ACL 版本号,表示该节点 ACL 信息变更次数 |
^ | ephemeralOwner | 创建该临时节点的会话的 sessionId;如果当前节点为持久节点,则 ephemeralOwner=0 |
^ | dataLength | 数据节点内容长度 |
^ | numChildren | 当前节点的子节点个数 |
data | - | 节点存放的数据的具体内容 |
4.1.3 ACL
类似UNIX文件系统控制权限
- CREATE: 能创建子节点
- READ: 能获取节点数据和列出其子节点
- WRITE : 能设置/更新节点数据
- DELETE: 能删除子节点
- ADMIN: 能设置节点 ACL 的权限
4.2 通知机制
客户都安对某个znode创建watcher事件, 当znode变化时, 客户端收到zk通知,客户端可根据znode变化来做出业务改变
4.3 用途
- 命名服务(文件系统): 通过名字获取资源或服务地址,利用zk创建全局唯一路径, 指向集群中的服务, 提供服务的地址,或者远程对象等等
- 配置管理: 程序分布式部署在不同机器,将程序配置信息放到zk的znode下, 当匹配值发生变化, 也就是znode发生变化时, 可通过改变zk中某个节点内容, 利用watcher通知各个客户端, 从而更改配置
- 集群管理: 1. 机器加入退出(约定在父目录下创建临时节点, 监听父目录子节点变化消息. 退出时, 临时节点被删除,其他机器都收到消息; 加入时, 收到通知节点增加); 2. 选举master(选取编号为最小的机器作为master就好)
- 分布式锁: 1. 保持独占锁(将znode作为一把锁,所有客户端去创建节点, 最终成功创建的客户端拥有这把锁, 用完后删除分布式锁节点), 2. 控制时序锁(锁节点预先存在, 都在锁节点下创建临时编号目录节点, 编号最小的获得锁, 用完删除)
- 队列管理: 1. 同步队列(成员都聚齐时, 队列才可用, 否则一致等待所有成员到达. 实现方式: 约定目录下创建临时目录节点, 监听节点数目是否达到要求). 2. 队列按照FIFO入队出队(特定目录下创建持久化顺序节点, 创建成功时, watcher通知等待队列, 队列删除序号最小的节点用来消费, znode存储的数据就是消息队列中的消息内容, SEQUENTIAL序列号就是消息编号, 按序取出即可, 由于创建节点是持久化的, 不用担心队列消息丢失问题)
4.4 数据复制
zookeeper集群提供一致的数据服务, 所有机器间做数据复制, 好处:
- 容错: 一个节点出错, 别的节点可以接管他的工作
- 提高系统扩展能力: 负载分布到多个节点上, 或者增加节点来提高系统负载能力
- 提高性能: 让客户端就近访问, 提高访问速度
zk通过写任意(write any, 对数据修改可提交到任意节点). 通过增加机器, 读吞吐能力和响应能力扩展性非常好. 随机器增多,写吞吐能力下降.
4.5 事务顺序一致性
所有提议(proposal)都会加上zxid(64位数字, 高32位是epoch标识leader是否发生改变, 如果新leader产生, epoch自增; 低32位递增计数). 当新proposal时, 先向其他server发出事务指向请求, 过半机器都能指向并且能够成功, 那么就会开始执行
4.6 选举流程
每个Server由3种状态:
LOOKING: 当前server不知道leader,正在搜寻leader
LEADING: 当前server就是选举出来的leader
FOLLOWING: 当前server为选举出的follower
OBSERVING: 当前server为选举出的Observer
为何会有leader: 有些逻辑只需要集群某一台机器进行执行, 其他机器共享结果, 减少计算工作量, 提高性能, 就需要leader选举
4.6.1 基于basic paxos选举
- 选举线程由当前server发起选举的线程担当, 主要对投票结果统计, 并选出推荐server
- 选举线程向所有server发起一次询问(包括自己)
- 选举线程收到回复后,验证是否是自己发起的询问(zxid是否一致), 然后获取对方id(myid), 并存储到当前询问列表中, 最后获取对方提议的leader相关信息(id, zxid), 将这些信息存储到当次选举投票记录表中
- 收到所有server回复后, 计算zxid最大的server, 将这个server相关信息设置为下一次要投票的server
- 线程将当前zxid最大的server设置为当前server要推荐的Leader, 如果此时获胜的server获得n/2+1的票数, 设置当前推荐leader为获胜server,将根据获胜server相关信息设置自己状态, 否则继续这个过程, 知道leader被选举出来
4.6.2 基于fast paxos选举
- 将epoch+1, 某个server向所有server提议自己(zxid, id)要成为Leader发送给集群
- 其他server收到提议后, 解决epoch和zxid冲突(判断自身当前epoch是否合法), 并接收对方提议(更新自身epoch为对方推荐zxid, epoch, id), 向对方发送是哪个接收提议完成的消息
- 重复这个流程, 最后一定能选举出leader
4.6.3 同步流程
选完leader后, zk进入状态同步过程
- leader等待follower连接
- follower连接上, 将最大zxid发送给leader
- leader根据follower的zxid确定同步点
- 完成同步后通知follower以称为uptodate状态
- follower接收到uptodate消息后, 可重新接收client请求进行服务
4.6 集群角色
leader: 为客户端提供读写服务, 负责投票的发起和决议,更新系统状态
follower: 为客户端提供读服务, 写服务则转发给Leader, 参与选举过程中投票
Observer: 为客户端提供读服务, 写服务转发给Leader, 不参与选举过程中投票, 不参与"过半写成功"策略,在不影响写性能的情况下提升集群读性能
5. ElasticSearch
5.1 ES分布式架构原理
ES设计理念为分布式搜索引擎. 基于luncene. 核心思想是在多态机器上启动多个ES进程实例
- index: 存储单位为索引. 相当于数据库表.
- type: 类型. 如订单分为实体物品订单, 虚拟物品订单, ES7后, 概念废除, 所有索引默认只有一个_doc类型
- mapping: 类型的表结构定义. 相当于表字段列表
- document: 写入的一条数据.
- field: 写入数据的一个字段值.
shard: 索引可以拆分为多个shard. 每个shard存储部分数据. 好处:
- 支持横向扩展. 数据增多, 那么可以扩展机器将新增数据迁移到新机器上
- 提高性能. 数据分布在多个shard(服务器)上, 所有操作, 都会在多台机器上分布式执行, 提高吞吐量和性能.
shard还可以分为2类:
- primary shard: 负责写入数据
- replica shard: 负责同步primary shard写入的数据.
通过replica方案, 每个shard有多个备份, 某台机器宕机, 数据副本可用. 实现高可用.
master节点: 集群多个节点, 会自动选举一个节点作为master节点, 负责: 维护索引元数据, 切换primary shard和replica share角色等工作.
- master宕机则会重新选举一个节点为master节点
- 非master宕机, 那么会由master节点将此节点的primary shard角色转移到其他replica shard上. 如果当前接地点恢复. 会将本机shard还是设置为replica shard节点, 并同步后续修改的数据, 集群恢复正常.
5.2 写数据
过程:
- 客户端向任意一个node发送请求, 这个node就是coordinating node(协调节点)
- 由coordinating node对document进行路由, 将请求转发给对应的node(有primary shard)
- 实际由primary node处理请求, 然后数据同步到replica node.
- 当coordinating node发现primary node和所有replica node完成后, 返回响应结果给客户端
5.2.1 写数据底层原理
- 将写入数据写入内存buffer, 在buffer中的数据搜索不到, 同时数据写入translog日志文件
- 如果buffer快满, 或者到达一定时间(默认1s). 将内存buffer数据refresh先写入os cache(操作系统缓存), 然后再写入一个新的segment file磁盘文件. 这个segment file存储1秒内写入的数据. 只要buffer中数据刷入os cache, 数据就可以查询到.
- 写入完成, buffer清空.
- 重复上面步骤, buffer数据写入一个又一个新的segment file. 每次refresh后, buffer清空, translog保留, 随着过程推荐, translog增到, 当translog增到到一定程度, 就会触发commit操作. 此过程可通过restful api手动执行一次refresh操作.
- commit操作首先将buffer refresh到os cache. 清空buffer. 将commit point写入磁盘文件(标识这个commit point对应的所有segment file).同时强行将os cache中所有数据fsync 到磁盘文件中. 清空现有traslog 日志文件. 重启一个translog. commit完成(此操作叫flush, 默认30分钟一次). 此过程可通过restful api手动执行一次commit操作.
translog日志作用: commit之前, 数据要么停留再buffer, 要么停留在os cache. 都是在内存中, 容易丢失.写入专门的translog日志后, 一旦机器重启, es会自动读取translog日志文件数据, 恢复内存buffer和os cache数据.
translog写入: 也是写入os cache中, 定时5秒刷入一次到磁盘. 默认情况下, 5秒数据仅停留在buffer或者translog的os cache. 有可能丢失这5秒数据. 但这样性能最好. 可以这时刷盘时机: index.translog.sync_interval控制fync到磁盘时间间隔, 最小100ms. index.translog.durability有2个取值: request(每次请求都fsync, 需要等translog写入磁盘才成功,性能下降很多)和async(默认值, 每隔一段时间fsync一次)
5.2.1 删除/更新数据底层原理
- 如果是删除操作, commit会生成一个.del文件. 标识某个doc状态为deleted状态. 搜索时根据.del文件就知道这个doc是否被删除
- 如果是跟新操作, 将原doc标识为deleted状态, 写入一条新数据
buffer每refresh一次, 就会产生一个segment file. 默认1秒一个segment file, 文件会越来越多. 需要定期merge. 过程:
- 将多个segment file合并成一个. 同时将标识为deleted的doc物理删除
- 将新的segment file写入磁盘. 这里会写入一个commit point, 标识所有新的semeng file, 然后打开segment file 供搜索使用,同时删除旧的segment file
5.3 读数据
通过doc _id来查询, 根据doc _id来hash, 判断id分配到哪个shard上, 去对应shard查询. 过程:
- 客户端向任意一个node发送请求, 这个node就是coordinating node(协调节点)
- 由coordinating node对doc _id进行hash路由, 将请求转发给对应节点, 此时会使用随机轮询算法(round-robin). 在primary shard和所有replica shard中随机选择一个, 让读负载均衡
- 收到请求的node将document返回给coordinating node
- coordinating node将document返回给客户端
5.4 搜索数据
关键词搜索时, 过程:
- 客户端向任意一个node发送请求, 这个node就是coordinating node(协调节点)
- coordinating node将搜索请求转发给所有shard对应的primary shard或replica shard.
- 查询阶段(query phase): 每个shard将自己的搜索结果(就是一些doc _id)返回给协调节点, 由coordinating node对数据进行合并, 排序, 分页等操作. 产出最终结果
- 拉取节点(fetch phase): 由coordinating node根据doc _id去对应节点拉取实际document 数据, 返回给客户端
5.5 倒排索引
每个文档对应一个文档id, 文档内容被表示为一系列关键词集合. 每隔关键词会记录它在文档中出现的次数和出现位置. 倒排索引就是关键词到文档ID的映射. 每隔关键词都对应一系列的文件. 这些文件中都出现了关键词. 如
wordId | word | DocIds | other |
---|---|---|---|
1 | word | 1,2,3,4 | 文档频率信息(文档集合中有多个个文档包含某个单词) |
查询时: 搜索系统查找倒排索引, 从中读取包含查询条件单词的文档, 这些文档就是提供给用户的搜索结果.
注意:
- 倒排索引中所有词项对应一个或多个文档
- 倒排索引中的词项过根据字典顺序升序排列
5.5 查询优化
- 文件系统缓存(filesystem cache):
es搜索严重依赖文件系统缓存, 给文件系统缓存更多内存, 尽可能内存容纳所有idx segment file, 那么搜索基本走内存, 性能很高. 最佳情况下, 机器内存至少可以容纳总数据量的一半. 索引只存需要搜索的数据, 根据查询结果, 去hbase或者其他地方取出完整数据(减少索引内存占用, 使文件缓存可容纳更多数据). - 数据预热:
es每个机器写入的数据量还是超过文件系统缓存一倍, 可以使用定时任务, 将比较热点的, 经常访问的数据, 专门做一个缓存预热子系统. 对热数据每隔一段时间, 提前访问, 让数据进入filesystem cache. 下次访问性能就高很多 - 冷热分离:
将大量访问很少, 频率很低的数据, 单独写一个索引, 然后将访问很频繁的热数据单独写一个索引. 确保热数据在被预热后, 尽量留在filesystem cache. - document模型设计:
es复杂关联查询尽量别用, 性能一般不太好. 最好java系统完成关联, 数据直接写入es. - 分页性能优化:
es分页会把所有数据都查询到协调节点. 协调节点合并,处理后, 再执行分页. 如果数据量很多, 那么协调节点处理时间就变得很长. 越翻页越慢. 优化办法:- 不允许深度分页(默认深度分页性能很差)
- 使用scroll api: scroll会一次性生成所有数据的快照, 每次滑动后翻页就是通过游标scroll_id移动,获取下一页. 性能比强行分页高很多. 初始化时,必须指定scroll参数, 告诉es保持这次搜索的上下文多长时间.需要确保用户不会持翻页超过这个时间, 导致查询超时失败. 缺点: 不能随意跳到任何一页.
- search_after: 使用前一页的结果来帮助检索下一页数据, 也不允许随意翻页, 初始化使用一个唯一字段作为sort字段.