Redis使用模式
数据结构选型
| 数据结构 | 使用场景 | 常用命令 |
|---|---|---|
| String | 缓存、计数器、标志位、JSON 数据 | GET/SET/INCR/SETEX |
| Hash | 用户信息、对象属性 | HGET/HSET/HMGET/HINCRBY |
| List | 消息队列、动态列表、栈 | LPUSH/RPUSH/LPOP/LRANGE/BLPOP |
| Set | 独立访客、标签、社交关系 | SADD/SMEMBERS/SINTER/SUNION |
| Sorted Set | 排行榜、限流、优先级队列 | ZADD/ZRANGE/ZRANK/ZINCRBY |
| HyperLogLog | 近似唯一计数(误差 0.81%) | PFADD/PFCOUNT/PFMERGE |
| Stream | 事件日志、消息代理 | XADD/XREAD/XGROUP/XACK |
| Bitmap | 日活用户、按用户的功能标志 | SETBIT/GETBIT/BITCOUNT |
缓存模式
# Cache-Aside(旁路缓存)——最常用
def get_user(user_id):
key = f"user:{user_id}"
cached = redis.get(key)
if cached:
return json.loads(cached) # 命中缓存
user = db.query("SELECT * FROM users WHERE id = ?", user_id)
redis.setex(key, 3600, json.dumps(user)) # TTL 1小时
return user
# 写穿(Write-Through)——写时同步更新缓存
def update_user(user_id, data):
db.update("users", user_id, data)
redis.setex(f"user:{user_id}", 3600, json.dumps(data))
# 防缓存击穿(互斥锁)
def get_with_lock(key, ttl, fetch_fn):
val = redis.get(key)
if val: return val
lock_key = f"lock:{key}"
if redis.set(lock_key, 1, nx=True, ex=10):
try:
val = fetch_fn()
redis.setex(key, ttl, val)
return val
finally:
redis.delete(lock_key)
else:
time.sleep(0.1)
return redis.get(key)
排行榜与限流
# 排行榜(有序集合)
ZADD leaderboard:weekly 1500 "user:42" # 设置分数
ZINCRBY leaderboard:weekly 100 "user:42" # 增加分数
ZREVRANK leaderboard:weekly "user:42" # 名次(从0开始)
ZREVRANGE leaderboard:weekly 0 9 WITHSCORES # 前10名
# 滑动窗口限流
def is_rate_limited(user_id, max_requests=100, window_sec=60):
key = f"ratelimit:{user_id}"
now = time.time()
pipe = redis.pipeline()
pipe.zremrangebyscore(key, 0, now - window_sec)
pipe.zadd(key, {str(now): now})
pipe.zcard(key)
pipe.expire(key, window_sec)
_, _, count, _ = pipe.execute()
return count > max_requests
分布式锁
# 单实例锁(简单,非 HA)
lock_key = "lock:resource:42"
lock_val = str(uuid.uuid4()) # 唯一值防止误删
# 原子获取锁:SET NX + EX
acquired = redis.set(lock_key, lock_val, nx=True, ex=30)
# 释放锁(Lua 保证原子性)
RELEASE_SCRIPT = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
"""
redis.eval(RELEASE_SCRIPT, 1, lock_key, lock_val)
# 会话存储
def create_session(user_id):
token = secrets.token_hex(32)
data = json.dumps({"user_id": user_id, "created": time.time()})
redis.setex(f"session:{token}", 86400, data) # 24小时
return token