• 对于一辆马车来说,米其林轮胎再好,没有轴承和打气筒,手里只有木头时,磨圆就是最务实方案
  • 资源有限时,把现有工具用到极致,才是最优雅的设计

缓存一致性组件

省流版

  • 基于的是旁路缓存(先更新DB后删缓存、读DB后回写)方案
  • 两个极小机制,解决了读DB-回写间隙数据被更新而导致的缓存脏数据问题
    • 不是简单的延迟双删 和 缩短缓存时间
  • 借助逻辑过期机制,预判到要过期的时候,提前去进行下一轮缓存的装载,实现缓存平滑过渡

背景

  1. 需求:请求量太大 or 查询太耗资源,加入缓存层减少下游压力
  2. 没这个轮子前
    • 每次临时造轮子,简单实现有风险、复杂实现代码量大,细节注意不到
    • 大部分实现都是目标是高可用,选择旁路缓存(先更新DB后删缓存、读DB后回写)
      1. 极端情况下会出现缓存到脏数据问题:读DB和写缓存的间隙,数据被更新,缓存内就是旧数据持续到过期(即使延时双删 也会存在交叉读取更新的场景)
      2. key过期或删除,读DB期间,无数据可访问,等待期间=不可用状态(监控上看耗时就是一个波动)

优化方向

  1. 内置击穿、穿透、雪崩防护
  2. AP外,再提供CP模式应对强一致性场景
  3. 解决AP极端情况下写脏数据的问题
  4. 读DB-回写窗口期设计一个平滑过渡的方案
  5. 更友好的接入方式,开箱即用

优化点

内置击穿、穿透、雪崩防护

采用市面常用的方案

  • 击穿:缓存未命中时,只允许一个读请求去查。
    • 靠加锁来实现(这里其实不需要分布式锁,单机保证一个请求就行。除非有几万个节点)
  • 穿透:查到空数据回写空值到缓存
  • 雪崩:将过期时间打散

最终一致性(AP)模式

  1. 更新完DB后,写缓存还是删缓存?

    • 更新缓存
      • 不确定后面是否还要读 浪费内存
      • 两个挨着比较近的更新操作,再混合读请求的写缓存操作,触发乱序的可能性很高
    • 删缓存:逻辑简单、各司其职、冲突较少、业界首选
  2. 删缓存情况下,有什么问题?

    • 读请求读完DB后,没来得及写缓存,在这个期间数据被更新,读请求写进去的就是旧值

        sequenceDiagram
          participant R as 读请求
          participant W as 写请求
          participant C as 缓存(Redis)
          participant D as 数据库(DB)
      
          Note over C, D: 初始状态: DB=100, Cache=Empty
      
          R->>C: 1. Get Key
          C-->>R: Miss
      
          R->>D: 2. Read DB
          D-->>R: Value = 100
      
          Note right of R: 读请求发生阻塞/GC/网络延迟...
      
          W->>D: 3. Update DB
          Note right of D: Value = 200
      
          W->>C: 4. Del Key
          Note right of C: Cache Deleted
      
          R->>C: 5. Set Key = 100
          Note right of C: 写入旧值! (脏数据)
      
    • 一般的应对方案是 延迟双删 或 缩短过期时间。但是延迟双删仍有可能和其它读写请求穿插导致上述问题发生

    要注意,这个情况虽然极端,但这个并不是最终一致性。最终一致性描述的是:可以允许在数据同步期间读到旧值(比如consul、kafka ACK=0 没同步到从节点就立即返回),但是上述这种情形会导致脏数据一直停留在内存,直到过期。

🛑 核心设计:读写锁竞争与脏数据防护

针对这种情况,解决方案如下:

  1. 在读线程构建缓存前,获取一把锁。读完DB准备回写缓存时,需要检测这把锁
    • 如果没有被破坏,读到的肯定是新数据
    • 如果被破坏,表示读到的是旧数据,不允许回写。重新开始抢锁构建缓存
  2. 写线程更新完DB后,立即标记删除缓存,并破坏上面这把锁。等待下个读线程来构建

简而言之:

  • 读线程的锁是互相抢的,保证只有一个线程读DB
  • 更新线程可以最高优先级移除这个锁,让在更新DB期间,准备但未执行的写缓存操作失效

这些改动,只是在现有防击穿锁的基础上增加了两个操作

  • 写缓存前多了一个 GET lock
  • 更新DB后,删缓存DEL key改为了DEL key lock
  sequenceDiagram
    participant A as 线程A (读旧值)
    participant Redis
    participant DB
    participant B as 线程B (写新值)
    participant C as 线程C (读新值)

    Note over Redis: 初始状态: Key不存在 (Cold)

    %% --- 阶段1: A 抢锁查旧值 ---
    A->>Redis: 1. LuaGet (抢锁 owner=UUID_A)
    Redis-->>A: 返回 "LOCKED" (抢锁成功)
    
    A->>DB: 2. 查数据库 (耗时操作...)
    
    %% --- 阶段2: B 更新并删锁 ---
    B->>DB: 3. Update DB (Val = v2)
    B->>Redis: 4. 标记删除 (删除owner, lockEx=0)
    Note over Redis: 锁被强制释放! Key变脏

    %% --- 阶段3: A 尝试回写 (失败) ---
    DB-->>A: 返回 v1 (旧值)
    A->>Redis: 5. LuaSet (Val=v1, owner=UUID_A)
    Note right of Redis: 检查 owner: <br/>Redis存的是nil <br/>A传的是UUID_A <br/>不匹配!
    Redis-->>A: 写入失败 (旧值被丢弃)
    
    %% --- 阶段4: A 和 C 新一轮竞争 ---
    par A 重试 / C 新请求
        A->>Redis: 6. Retry LuaGet (抢锁 owner=UUID_A2)
        C->>Redis: 6. LuaGet (抢锁 owner=UUID_C)
    end

    alt A 抢到了
        Redis-->>A: "LOCKED"
        Redis-->>C: {nil, "LOCKED"} (等待)
        A->>DB: 查 DB (v2)
        A->>Redis: Set (Val=v2)
    else C 抢到了
        Redis-->>C: "LOCKED"
        Redis-->>A: {nil, "LOCKED"} (等待)
        C->>DB: 查 DB (v2)
        C->>Redis: Set (Val=v2)
    end

强一致性(CP)模式

对于一些一些不能容忍旧数据的场景,可选择强一致性模式

  • 上述AP场景下,为了可用性,读到窗口期的过期数据是直接返回的。但在强一致性场景下,所有读请求需要在更新窗口内等待,直到有查到最新数据或报错

🛑 核心设计:逻辑过期保证平滑过渡

逻辑过期

过期前主动刷新来代替过期后被动重建

  • 把每次的GET命令改为Lua: GET+TTL
  • 读线程返回Value后,根据TTL的值来做决定,如果ttl还剩2s以内,那么异步触发一个读DB+写缓存的操作
  • 假设读请求一直存在,那么读线程始终能拿到有效数据,无需等待DB查询

这样就不用等待key实际过期后,所有线程无数据可拿,开始竞争进行读DB+写缓存操作了。

  flowchart TD
    Start(读请求-最终一致模式) --> LuaCheck["Lua脚本 (Get+PTTL)"]
    LuaCheck --> CheckResult{结果分析}
    
    CheckResult -- "Value存在" --> ReturnData["直接返回 (Return)"]

    CheckResult -- "Value空" --> TryLockSync{尝试加锁}
    
    TryLockSync -- "抢到锁 (Locked)" --> SyncFetch["同步查DB更新"]
    SyncFetch --> ReturnNew["返回新值"]
    
    TryLockSync -- "没抢到 (Others holding)" --> Wait["自旋重试 (Spin & Retry)"]
    Wait --> LuaCheck

    ReturnData -.-> |"若TTL 0.5-2s (逻辑过期)"| TryLockAsync{尝试加锁}
    TryLockAsync -- "抢到锁" --> AsyncFetch["异步查DB更新"]

SDK函数签名

  • 封装好的函数,开箱即用,屏蔽所有干扰业务开发的逻辑,传入查询函数即可
  • 用泛型省去了各种序列化、反序列化、断言,返回的就是最终可用的类型
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
type (
	Option struct {
		// 强一致性or最终一致性、过期时间、锁时间、等待redis从库时间...
	}
	OptionFunc        func(*Option)
	CacheStore[K, V any] struct {
		key    string
		redis *redis.Client
		Option
        Codec  // 编解码器 默认是json。对于值为基础类型的情况,也可以自定义 
	}
)

func (c *CacheStore[K, V]) Get(ctx context.Context, key K, getFn func(ctx context.Context, key K) (*V, error)) (*V, error){
    ...
}

func (c *CacheStore[K, V]) Delete(ctx context.Context, key string) error {
    ...
}

type User struct {
    ...
}

func example(ctx context.Context) {
	cache := CacheStore[int, User]{}

	user, err := cache.Get(ctx, 123, func(ctx context.Context, key int) (*User, error) {
		// 查询DB或下游
		return nil, nil
	})
}

轻量分布式延时队列

省流版

  • 使用Redis+Lua实现
  • 实现了无锁抢占任务,解决了传统定时任务抢锁 扫ZSET只能单机执行的问题,
  • 增加ACK和重试机制,实现At Least Once语义,保证消息不丢
  • 死信队列+各种监控,出故及时感知和处理
  • 实现方式是多个redis队列,用Lua保证任务在各个队列间流转的原子性

背景

  1. 需求:某一时刻产生任务、过一段时间后才执行
  2. 没这个轮子前
    • 内存里面实现时间轮算法
      • 直接pass:单点问题、这个调度服务无法加减机器
    • mysql延时队列表
      • qps上去了扛不住
      • 这种阅后即焚的场景,不需要一直存着
    • ZSET存储任务paylod和过期时间,起定时任务,拿出来一批后执行
      • 多个机器抢锁,谁抢到谁执行
        • 负载不均衡、网络快性能好的一直抢到锁
        • 超长任务+没有确认机制,遇到一个宕机或服务发布,后面任务全丢
      • 失败了没有好的处理方法,要么直接丢掉,要么写一大堆重试的代码
      • 实现方式不规范,导致各种隐藏bug,这里不展开
      • 常见的把body直接塞到member中,且没注意分片,执行一次就是一个大key读
        • zset分数相同的时候,会对member进行字典序,member越长开销越大
  3. 为什么不用RocketMQ、RabbitMQ等?
    • 公司现有的运维体系里面没这个东西
    • 成本:同配置价格相差十倍(RocketMQ vs Redis)
    • 宗旨:基于现有的东西,借鉴优秀方案,满足当下的需求

优化和架构

优化点

🛑 核心设计:无锁化并发消费与高可靠保障

  1. 方向:扩展性和高性能
    • 实现并发消费任务
      • 去掉了锁,在Lua内执行 取任务+删任务
      • 依然用zset做调度,但是取出来后不直接给上层使用,而是推到list
      • 消费节点用无锁的方式去list抢任务,这样就能横向扩容提升消费能力了
    • 其它优化
      • member存任务id,payload单独存string,减小zset体积
      • hashTag 支持集群版
  2. 方向:高可靠
    • 增加了ACK机制,保证每个任务至少被处理一次
    • 内部设有重试机制,失败次数到达设定值移入死信队列
    • 死信队列
      • 增加监控,有问题及时被感知
      • 配套的后台能直接看到body,处理好bug后一键推回重试队列,自动执行
    • 不同topic互相隔离、支持队列key分片,避免大key。
    • 队列长度加监控,有堆积就告警

具体实现

  • 数据结构(redis)
    1. 延迟调度 zset:delay
      • member: task_id
      • score: 时间戳
    2. 任务信息 string:message
      • key: task_id
      • value: payload
    3. 待处理队列 list:todo
    4. 待确认列表 zset:ack
    5. 死信队列: set:dead
    6. 重试次数计数器 retry_counter
  • 流转过程
    1. 生产者生产消息
      • delay_zset添加一条任务
      • 消息存储到payload
    2. 消费节点定时执行下面的过程
      1. 搬运:从delay_zset搬任务到todo_list,同时添加到ack_zset
      2. 获取任务:从todo_list pop任务出来执行
      3. 执行和确认
        • 执行函数返回err=nil表示确认
        • 从ack_zset移除
      4. 失败重试
        • 执行函数返回err!=nil表示失败
        • ack_zset取出来放回todo_list,并在retry_counter计数
        • 当retry_counter到达阈值,不再放回todo_list,而是放到dead_set
          • 触发告警
    3. 上述任务在不同队列中的流转,都用Lua脚本保证原子执行
  • 定时任务
    1. 上述展示的是消费任务
    2. 消费者还需要go出来一个监控定时任务,监测 delay_zset 和 todo_list,积压超过某个量级就告警

架构图

  flowchart
    %% --- 节点定义 ---
    subgraph Keys[Redis 核心数据结构]
        DelayZSet[("<b>ZSet: delay</b><br>延时等待队列")]
        TodoList[("<b>List: todo</b><br>就绪队列")]
        DeadSet[("<b>Set: dead</b><br>死信队列")]
        AckZSet[("<b>ZSet: ack</b><br>等待确认队列")]
        MsgString[("<b>String: message_string</b><br>消息体存储")]
    end

    subgraph Producer [生产者]
        StartProd(开始投递)
        GenID[生成 UUID]
        StorePayload[存储消息体<br>TTL=Delay+7d]
        AddToPending[加入延时队列<br>Score=Now+Delay]
    end

    subgraph Worker ["消费者(多节点并行)"]
        Transfer(1.任务搬运)
        CheckTimeout(6.兜底/超时检查)
        Fetch(2.拉取任务)
        RunHandler[3.执行业务回调]
        Ack["<b>4. ACK</b>"]
        Fail{5. 重试次数 < 阈值?}
        Retry["5a. 重回就绪队列"]
        Die["5b. 移入死信"]
    end

    %% --- 连线定义 (按颜色分组以保证 linkStyle 顺序) ---

    %% 1. 🔵 生产流 (Blue)
    StartProd --> GenID
    GenID --> StorePayload
    GenID --> AddToPending
    StorePayload -.-> |SET xx payload EX TTL|MsgString
    AddToPending -.-> |ZAdd xx Score UUID| DelayZSet

    %% 2. 🟠 调度流 - 搬运 & 兜底 (Orange)
    %%Transfer -->|"触发搬运Lua:<b>ZRangeByScore+ZRemRangeByScore+LPush</b><br>如果出现超大堆积,可以考虑改为<br>ZRangeByScore limit n+ZREM 1...n+LPush"| DelayZSet
    Transfer -->|"触发搬运Lua:<b>ZRangeByScore+ZRemRangeByScore+LPush</b>"| DelayZSet
    DelayZSet -.->|转移| TodoList
    CheckTimeout -->|"扫描超时"| AckZSet
    AckZSet -.->|"重回队列"| TodoList
    
    %% 3. 🟢 消费流 - 拉取与成功 (Green)
    Fetch -->|Lua: RPOP + ZADD| TodoList
    TodoList -.->|2.1 原子移动| AckZSet
    AckZSet -.- MsgString
    MsgString --> |2.2 GET payload+task_id|RunHandler
    RunHandler -->|Success| Ack
    Ack -.->|ZRem<br>移除 UnAck| AckZSet
    Ack -.->|Del<br>移除 MsgKey| MsgString

    %% 4. 🔴 失败与重试流 (Red)
    RunHandler -->|Error| Fail
    Fail -->|"是"| Retry
    Retry -->|"计数+1"| TodoList
    Fail -->|"否"| Die
    Die --> DeadSet
    
    %% 5. 🟣 管理流 (Purple)
    subgraph Admin [后台管理]
        ManualRepush(死信重推/清理)
    end
    DeadSet -.-> |介入| ManualRepush
    ManualRepush -.-> |SREM+LPUSH| TodoList

    %% --- 样式定义 ---
    %% 🔵 生产流样式 (索引 0-4)
    linkStyle 0,1,2,3,4 stroke:#2962FF,stroke-width:2px;
    
    %% 🟠 调度流样式 (索引 5-8)
    linkStyle 5,6,7,8 stroke:#FBC02D,stroke-width:2px;
    
    %% 🟢 消费流样式 (索引 9-15)
    linkStyle 9,10,11,13,14,15 stroke:#00C853,stroke-width:2px;
    
    %% 🔴 失败流样式 (索引 16-20)
    linkStyle 16,17,18,19,20 stroke:#D50000,stroke-width:2px,stroke-dasharray: 5 5;

    %% 🟣 管理流样式 (索引 21-22)
    linkStyle 21,22 stroke:#AA00FF,stroke-width:2px,stroke-dasharray: 5 5;

    %% 节点样式微调
    style TodoList fill:#c8e6c9,stroke:#00C853,stroke-width:2px
    style AckZSet fill:#ffecb3,stroke:#FF6D00,stroke-width:2px
    style DelayZSet fill:#fff9c4,stroke:#FBC02D,stroke-width:2px
    style ManualRepush fill:#e1bee7,stroke:#AA00FF,stroke-width:2px
    style CheckTimeout fill:#ffe0b2,stroke:#FF6D00,stroke-width:2px,stroke-dasharray: 5 5
    style DeadSet fill:#ffcdd2,stroke:#D50000,stroke-width:2px
  sequenceDiagram
    autonumber
    participant C as 消费者(Consumer)
    participant Z as ZSet(delay_zset)
    participant L as List(todo_list)
    participant A as Ack(ack_zset)
    participant S as String(payload)
    participant D as Set(dead_set)

    Note over C, D: 1. 搬运阶段
    C->>Z: ZRANGEBYSCORE AND ZREMBYSCORE
    Z-->>C: List[task_id]

    loop Every Task
        C->>L: Lua: LPUSH
        C->>A: Lua: ZADD
        Note right of C: 原子操作: 搬运+进入Ack队列
    end

    Note over C, D: 2. 消费阶段 (Loop)
    C->>L: BRPOP
    L-->>C: task_id

    C->>S: GET task_id
    S-->>C: payload_content

    C->>C: 执行业务逻辑(Execute)

    Note over C, S: 3.1 执行成功+ACK
    alt Success (无报错)
        C->>A: ZREM task_id (Ack)
        C->>S: DEL task message
    Note over C, D: 3.2 执行失败 触发重试
    else Failed (报错)
        C->>S: INCR retry_counter
        S-->>C: current_count

        alt count < limit
            C->>A: Lua: ZREM
            C->>L: Lua: RPUSH
            Note right of C: 重新放回待处理队列
        else count >= limit
            C->>A: Lua: ZREM
            C->>D: Lua: SADD
            Note right of C: 移入死信队列
            C->>C: Alert/Log
        end
    end

函数签名

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
type (
	Option struct {
		// 重试次数、超时时间、是否死信等参数
		...
	}
	OptionFunc        func(*Option)
	DelayQueue[T any] struct {
		topic string
		redis *redis.Client
		Options
	}
)

// 生产
func (d *DelayQueue[T]) Produce(ctx context.Context, payload *T, delay time.Duration) error {
    ...
}

// 消费
func (d *DelayQueue[T]) Consume(ctx context.Context, taskFunc func(message *T) error, options ...OptionFunc) error {

}