本文聚焦 Python 通过 redis-py 库操作 Redis 的核心技术,从环境准备到连接管理、五大数据类型 API、Pipeline 批量加速以及异常重试机制,提供可直接落地的代码实践。适合需要将 Redis 集成到 Python 后端、缓存系统或任务队列的开发人员。
一、环境准备与安装
redis-py 是 Redis 官方推荐的 Python 客户端,纯 Python 实现,API 与 redis-cli 命令几乎一一对应。安装命令:
安装后可使用 pip show redis 检查版本。本文以 redis-py 5.x 为例,建议使用 4.0 以上版本以获得更好的类型提示和异步支持。
二、创建连接:从单机到生产规范
2.1 基础连接
最简单的连接方式如下,decode_responses=True 开关必须开启,否则返回值会以 bytes 形式出现,需要手动解码。
- import redis
- r = redis.Redis(host='localhost', port=6379, decode_responses=True)
- print(r.ping()) # True
复制代码
2.2 带密码和数据库编号的连接
通过 db 参数指定数据库编号(默认 16 个数据库,0-15)。
- r = redis.Redis(
- host='192.168.1.100',
- port=6379,
- password='your_password',
- db=1,
- decode_responses=True
- )
复制代码
2.3 使用 URL 连接(现代推荐)
从配置文件中读取连接信息时,URL 格式更方便。注意密码前加冒号。
- import os
- r = redis.from_url('redis://:mypassword@localhost:6379/0', decode_responses=True)
- print(r.ping())
- # 永远不要硬编码密码,使用环境变量
- r = redis.from_url(os.getenv('REDIS_URL'))
复制代码
三、五大基础数据类型 API 速查
redis-py 的方法名与 redis-cli 命令一致,参数顺序相同。以下逐一演示。
3.1 String 操作- r = redis.Redis(decode_responses=True)
- r.set('name', 'Alice')
- print(r.get('name')) # Alice
- r.mset({'age': 30, 'city': 'Beijing'})
- print(r.mget(['age', 'city'])) # ['30', 'Beijing']
- # 设置过期时间(ex = 秒)、仅当键不存在时设置(nx)
- r.set('token', 'abc', ex=10)
- r.set('lock', '1', nx=True, ex=30)
- # 数字操作
- r.set('counter', 0)
- r.incr('counter')
- r.incrby('counter', 5)
- print(r.get('counter')) # 6
- # 字符串操作
- r.set('greet', 'Hello')
- r.append('greet', ' Redis')
- print(r.get('greet')) # Hello Redis
- print(r.strlen('greet')) # 11
- print(r.getrange('greet', 0, 4)) # Hello
复制代码
3.2 Hash 操作- r.hset('user:1001', mapping={'name': 'Bob', 'age': '25', 'city': 'Shanghai'})
- print(r.hgetall('user:1001')) # {'name': 'Bob', 'age': '25', 'city': 'Shanghai'}
- print(r.hget('user:1001', 'name')) # Bob
- print(r.hmget('user:1001', ['name', 'age'])) # ['Bob', '25']
- r.hincrby('user:1001', 'age', 1)
- print(r.hget('user:1001', 'age')) # 26
- print(r.hexists('user:1001', 'city')) # True
- r.hdel('user:1001', 'city')
- print(r.hlen('user:1001')) # 2
复制代码
3.3 List 操作- r.lpush('queue', 'task1', 'task2') # queue: ['task2', 'task1']
- r.rpush('queue', 'task3') # queue: ['task2', 'task1', 'task3']
- print(r.lrange('queue', 0, -1)) # ['task2', 'task1', 'task3']
- print(r.lpop('queue')) # task2
- print(r.rpop('queue')) # task3
- # 阻塞弹出:r.brpop('queue', timeout=5) # 返回 (key, value) 或 None
- print(r.llen('queue')) # 1
- print(r.lindex('queue', 0)) # task1
- r.lpush('log', *['a','b','c','d','e'])
- r.ltrim('log', 0, 2)
- print(r.lrange('log', 0, -1)) # ['e', 'd', 'c']
复制代码
3.4 Set 操作- r.sadd('tags:python', 'redis', 'django', 'flask')
- print(r.smembers('tags:python')) # {'flask', 'django', 'redis'}(无序)
- print(r.sismember('tags:python', 'django')) # True
- print(r.scard('tags:python')) # 3
- r.srem('tags:python', 'flask')
- # 集合运算
- r.sadd('tags:web', 'django', 'fastapi')
- print(r.sinter('tags:python', 'tags:web')) # {'django'}
- print(r.sunion('tags:python', 'tags:web')) # {'redis', 'django', 'fastapi'}
- print(r.sdiff('tags:python', 'tags:web')) # {'redis'}
- print(r.spop('tags:python')) # 随机弹出一个元素
复制代码
3.5 Sorted Set 操作- r.zadd('leaderboard', {'Alice': 100, 'Bob': 85, 'Charlie': 92})
- print(r.zrange('leaderboard', 0, -1, withscores=True))
- # [('Bob', 85.0), ('Charlie', 92.0), ('Alice', 100.0)]
- print(r.zrevrange('leaderboard', 0, 1, withscores=True))
- # [('Alice', 100.0), ('Charlie', 92.0)]
- r.zincrby('leaderboard', 10, 'Bob')
- print(r.zscore('leaderboard', 'Bob')) # 95.0
- print(r.zrank('leaderboard', 'Bob')) # 0(从低到高排名)
- print(r.zrevrank('leaderboard', 'Bob')) # 2(从高到低排名)
- print(r.zrangebyscore('leaderboard', 90, 100))
- # ['Charlie', 'Alice']
复制代码
四、连接池管理:告别频繁握手
每次新建 Redis 连接都需要 TCP 三次握手和认证,高并发场景下会浪费资源并可能超过 maxclients 限制。连接池预先创建一组连接,请求时借、用完还,连接复用。
- pool = redis.ConnectionPool(
- host='localhost',
- port=6379,
- decode_responses=True,
- max_connections=20 # 池中最大连接数
- )
- r1 = redis.Redis(connection_pool=pool)
- r2 = redis.Redis(connection_pool=pool)
- print(r1.ping())
- print(r2.ping())
- pool.disconnect() # 应用退出时释放所有连接
复制代码
生产环境建议封装为工厂函数,从环境变量读取配置,并设置超时和健康检查:
- import os
- def get_redis_client():
- pool = redis.ConnectionPool(
- host=os.getenv('REDIS_HOST', 'localhost'),
- port=int(os.getenv('REDIS_PORT', 6379)),
- password=os.getenv('REDIS_PASSWORD', None),
- db=int(os.getenv('REDIS_DB', 0)),
- decode_responses=True,
- max_connections=int(os.getenv('REDIS_MAX_CONNS', 50)),
- socket_timeout=5,
- socket_connect_timeout=5,
- health_check_interval=30,
- )
- return redis.Redis(connection_pool=pool)
复制代码
注意:在多线程环境下应全局共享一个连接池和一个 Redis 实例(redis-py 的 Redis 对象是线程安全的)。多进程(如 gunicorn)则每个 worker 进程创建自己的连接池。
五、Pipeline:批量命令的加速器
每次 Redis 命令都要经历一次网络往返(RTT)。Pipeline 将多个命令打包一次性发送,大幅减少网络开销。性能对比:
- import time
- r = redis.Redis(decode_responses=True)
- # 无 Pipeline:1000 次 SET
- start = time.perf_counter()
- for i in range(1000):
- r.set(f'key:{i}', f'value:{i}')
- print(f'无 Pipeline 耗时: {time.perf_counter() - start:.3f}s')
- # 使用 Pipeline
- r.flushdb()
- start = time.perf_counter()
- pipe = r.pipeline()
- for i in range(1000):
- pipe.set(f'key:{i}', f'value:{i}')
- pipe.execute()
- print(f'使用 Pipeline 耗时: {time.perf_counter() - start:.3f}s')
复制代码 本地测试中 Pipeline 耗时往往仅为循环写法的几十分之一。Pipeline 还支持链式调用并获取每个命令的返回结果:
- pipe = r.pipeline()
- pipe.set('name', 'Alice').incr('counter').get('name')
- results = pipe.execute()
- print(results) # [True, 1, 'Alice']
复制代码
如果需要原子性事务,可在 Pipeline 中启用 transaction=True:
- pipe = r.pipeline(transaction=True)
- pipe.set('balance', 100)
- pipe.incrby('balance', 50)
- pipe.execute() # 在 MULTI/EXEC 中执行
复制代码
最佳实践:一次 Pipeline 中的命令数量建议控制在 1000 以内,避免客户端内存溢出和长时间阻塞 Redis 主线程。Pipeline 会独占一个连接,执行完毕后归还连接池。
六、异常处理与重试机制
网络抖动时,需要捕获 ConnectionError 和 TimeoutError 并实现重试。手工实现退避重试:
- from redis.exceptions import ConnectionError, TimeoutError
- import time
- def safe_incr(r, key, retries=3):
- for attempt in range(retries):
- try:
- return r.incr(key)
- except (ConnectionError, TimeoutError) as e:
- print(f'第 {attempt+1} 次重试: {e}')
- time.sleep(0.1 * (attempt + 1))
- raise Exception(f'操作失败,已重试 {retries} 次')
复制代码
redis-py 4.x 以上版本也内置了 retry_on_timeout 和 retry_on_error 参数,可直接在连接时配置:
- r = redis.Redis(
- host='localhost',
- retry_on_timeout=True,
- retry_on_error=[ConnectionError, TimeoutError],
- health_check_interval=30,
- )
复制代码
七、实战练习建议
1. 连接池挑战:创建 max_connections=5 的连接池,启动 10 个线程同时执行 GET 操作,观察线程是否阻塞,验证连接复用行为。
2. Pipeline 性能测试:准备 5000 个键值对,分别用 for 循环 SET 和 Pipeline 写入,再分别用 GET 循环和 MGET 读取,记录耗时。
3. 缓存读写函数:编写 get_cached_data(key),先从 Redis 读取,不存在时模拟从数据库查询并写入 Redis(带 TTL),第二次调用应命中缓存。
八、总结
本文完整覆盖了 Python 通过 redis-py 操作 Redis 的核心技术:安装、连接(包括 URL 方式)、五大数据类型 API、连接池管理、Pipeline 批量加速以及异常重试。掌握这些内容后,你已能写出工程级的 Redis 交互逻辑。后续可深入事务、Lua 脚本、序列化以及异步 redis.asyncio 等进阶主题。 |