查看: 99|回复: 1

Python操作Redis核心方法:连接池、Pipeline与五大类型详解

[复制链接]
发表于 1 小时前 | 显示全部楼层 |阅读模式
本文聚焦 Python 通过 redis-py 库操作 Redis 的核心技术,从环境准备到连接管理、五大数据类型 API、Pipeline 批量加速以及异常重试机制,提供可直接落地的代码实践。适合需要将 Redis 集成到 Python 后端、缓存系统或任务队列的开发人员。

一、环境准备与安装
redis-py 是 Redis 官方推荐的 Python 客户端,纯 Python 实现,API 与 redis-cli 命令几乎一一对应。安装命令:
  1. pip install redis
复制代码

安装后可使用 pip show redis 检查版本。本文以 redis-py 5.x 为例,建议使用 4.0 以上版本以获得更好的类型提示和异步支持。

二、创建连接:从单机到生产规范
2.1 基础连接
最简单的连接方式如下,decode_responses=True 开关必须开启,否则返回值会以 bytes 形式出现,需要手动解码。
  1. import redis
  2. r = redis.Redis(host='localhost', port=6379, decode_responses=True)
  3. print(r.ping())  # True
复制代码

2.2 带密码和数据库编号的连接
通过 db 参数指定数据库编号(默认 16 个数据库,0-15)。
  1. r = redis.Redis(
  2.     host='192.168.1.100',
  3.     port=6379,
  4.     password='your_password',
  5.     db=1,
  6.     decode_responses=True
  7. )
复制代码

2.3 使用 URL 连接(现代推荐)
从配置文件中读取连接信息时,URL 格式更方便。注意密码前加冒号。
  1. import os
  2. r = redis.from_url('redis://:mypassword@localhost:6379/0', decode_responses=True)
  3. print(r.ping())
  4. # 永远不要硬编码密码,使用环境变量
  5. r = redis.from_url(os.getenv('REDIS_URL'))
复制代码

三、五大基础数据类型 API 速查
redis-py 的方法名与 redis-cli 命令一致,参数顺序相同。以下逐一演示。
3.1 String 操作
  1. r = redis.Redis(decode_responses=True)
  2. r.set('name', 'Alice')
  3. print(r.get('name'))  # Alice
  4. r.mset({'age': 30, 'city': 'Beijing'})
  5. print(r.mget(['age', 'city']))  # ['30', 'Beijing']
  6. # 设置过期时间(ex = 秒)、仅当键不存在时设置(nx)
  7. r.set('token', 'abc', ex=10)
  8. r.set('lock', '1', nx=True, ex=30)
  9. # 数字操作
  10. r.set('counter', 0)
  11. r.incr('counter')
  12. r.incrby('counter', 5)
  13. print(r.get('counter'))  # 6
  14. # 字符串操作
  15. r.set('greet', 'Hello')
  16. r.append('greet', ' Redis')
  17. print(r.get('greet'))  # Hello Redis
  18. print(r.strlen('greet'))  # 11
  19. print(r.getrange('greet', 0, 4))  # Hello
复制代码

3.2 Hash 操作
  1. r.hset('user:1001', mapping={'name': 'Bob', 'age': '25', 'city': 'Shanghai'})
  2. print(r.hgetall('user:1001'))  # {'name': 'Bob', 'age': '25', 'city': 'Shanghai'}
  3. print(r.hget('user:1001', 'name'))  # Bob
  4. print(r.hmget('user:1001', ['name', 'age']))  # ['Bob', '25']
  5. r.hincrby('user:1001', 'age', 1)
  6. print(r.hget('user:1001', 'age'))  # 26
  7. print(r.hexists('user:1001', 'city'))  # True
  8. r.hdel('user:1001', 'city')
  9. print(r.hlen('user:1001'))  # 2
复制代码

3.3 List 操作
  1. r.lpush('queue', 'task1', 'task2')  # queue: ['task2', 'task1']
  2. r.rpush('queue', 'task3')          # queue: ['task2', 'task1', 'task3']
  3. print(r.lrange('queue', 0, -1))    # ['task2', 'task1', 'task3']
  4. print(r.lpop('queue'))  # task2
  5. print(r.rpop('queue'))  # task3
  6. # 阻塞弹出:r.brpop('queue', timeout=5)  # 返回 (key, value) 或 None
  7. print(r.llen('queue'))  # 1
  8. print(r.lindex('queue', 0))  # task1
  9. r.lpush('log', *['a','b','c','d','e'])
  10. r.ltrim('log', 0, 2)
  11. print(r.lrange('log', 0, -1))  # ['e', 'd', 'c']
复制代码

3.4 Set 操作
  1. r.sadd('tags:python', 'redis', 'django', 'flask')
  2. print(r.smembers('tags:python'))  # {'flask', 'django', 'redis'}(无序)
  3. print(r.sismember('tags:python', 'django'))  # True
  4. print(r.scard('tags:python'))  # 3
  5. r.srem('tags:python', 'flask')
  6. # 集合运算
  7. r.sadd('tags:web', 'django', 'fastapi')
  8. print(r.sinter('tags:python', 'tags:web'))  # {'django'}
  9. print(r.sunion('tags:python', 'tags:web'))  # {'redis', 'django', 'fastapi'}
  10. print(r.sdiff('tags:python', 'tags:web'))  # {'redis'}
  11. print(r.spop('tags:python'))  # 随机弹出一个元素
复制代码

3.5 Sorted Set 操作
  1. r.zadd('leaderboard', {'Alice': 100, 'Bob': 85, 'Charlie': 92})
  2. print(r.zrange('leaderboard', 0, -1, withscores=True))
  3. # [('Bob', 85.0), ('Charlie', 92.0), ('Alice', 100.0)]
  4. print(r.zrevrange('leaderboard', 0, 1, withscores=True))
  5. # [('Alice', 100.0), ('Charlie', 92.0)]
  6. r.zincrby('leaderboard', 10, 'Bob')
  7. print(r.zscore('leaderboard', 'Bob'))  # 95.0
  8. print(r.zrank('leaderboard', 'Bob'))   # 0(从低到高排名)
  9. print(r.zrevrank('leaderboard', 'Bob')) # 2(从高到低排名)
  10. print(r.zrangebyscore('leaderboard', 90, 100))
  11. # ['Charlie', 'Alice']
复制代码

四、连接池管理:告别频繁握手
每次新建 Redis 连接都需要 TCP 三次握手和认证,高并发场景下会浪费资源并可能超过 maxclients 限制。连接池预先创建一组连接,请求时借、用完还,连接复用。
  1. pool = redis.ConnectionPool(
  2.     host='localhost',
  3.     port=6379,
  4.     decode_responses=True,
  5.     max_connections=20  # 池中最大连接数
  6. )
  7. r1 = redis.Redis(connection_pool=pool)
  8. r2 = redis.Redis(connection_pool=pool)
  9. print(r1.ping())
  10. print(r2.ping())
  11. pool.disconnect()  # 应用退出时释放所有连接
复制代码

生产环境建议封装为工厂函数,从环境变量读取配置,并设置超时和健康检查:
  1. import os
  2. def get_redis_client():
  3.     pool = redis.ConnectionPool(
  4.         host=os.getenv('REDIS_HOST', 'localhost'),
  5.         port=int(os.getenv('REDIS_PORT', 6379)),
  6.         password=os.getenv('REDIS_PASSWORD', None),
  7.         db=int(os.getenv('REDIS_DB', 0)),
  8.         decode_responses=True,
  9.         max_connections=int(os.getenv('REDIS_MAX_CONNS', 50)),
  10.         socket_timeout=5,
  11.         socket_connect_timeout=5,
  12.         health_check_interval=30,
  13.     )
  14.     return redis.Redis(connection_pool=pool)
复制代码

注意:在多线程环境下应全局共享一个连接池和一个 Redis 实例(redis-py 的 Redis 对象是线程安全的)。多进程(如 gunicorn)则每个 worker 进程创建自己的连接池。

五、Pipeline:批量命令的加速器
每次 Redis 命令都要经历一次网络往返(RTT)。Pipeline 将多个命令打包一次性发送,大幅减少网络开销。性能对比:
  1. import time
  2. r = redis.Redis(decode_responses=True)
  3. # 无 Pipeline:1000 次 SET
  4. start = time.perf_counter()
  5. for i in range(1000):
  6.     r.set(f'key:{i}', f'value:{i}')
  7. print(f'无 Pipeline 耗时: {time.perf_counter() - start:.3f}s')
  8. # 使用 Pipeline
  9. r.flushdb()
  10. start = time.perf_counter()
  11. pipe = r.pipeline()
  12. for i in range(1000):
  13.     pipe.set(f'key:{i}', f'value:{i}')
  14. pipe.execute()
  15. print(f'使用 Pipeline 耗时: {time.perf_counter() - start:.3f}s')
复制代码
本地测试中 Pipeline 耗时往往仅为循环写法的几十分之一。Pipeline 还支持链式调用并获取每个命令的返回结果:
  1. pipe = r.pipeline()
  2. pipe.set('name', 'Alice').incr('counter').get('name')
  3. results = pipe.execute()
  4. print(results)  # [True, 1, 'Alice']
复制代码

如果需要原子性事务,可在 Pipeline 中启用 transaction=True:
  1. pipe = r.pipeline(transaction=True)
  2. pipe.set('balance', 100)
  3. pipe.incrby('balance', 50)
  4. pipe.execute()  # 在 MULTI/EXEC 中执行
复制代码

最佳实践:一次 Pipeline 中的命令数量建议控制在 1000 以内,避免客户端内存溢出和长时间阻塞 Redis 主线程。Pipeline 会独占一个连接,执行完毕后归还连接池。

六、异常处理与重试机制
网络抖动时,需要捕获 ConnectionError 和 TimeoutError 并实现重试。手工实现退避重试:
  1. from redis.exceptions import ConnectionError, TimeoutError
  2. import time
  3. def safe_incr(r, key, retries=3):
  4.     for attempt in range(retries):
  5.         try:
  6.             return r.incr(key)
  7.         except (ConnectionError, TimeoutError) as e:
  8.             print(f'第 {attempt+1} 次重试: {e}')
  9.             time.sleep(0.1 * (attempt + 1))
  10.     raise Exception(f'操作失败,已重试 {retries} 次')
复制代码

redis-py 4.x 以上版本也内置了 retry_on_timeout 和 retry_on_error 参数,可直接在连接时配置:
  1. r = redis.Redis(
  2.     host='localhost',
  3.     retry_on_timeout=True,
  4.     retry_on_error=[ConnectionError, TimeoutError],
  5.     health_check_interval=30,
  6. )
复制代码

七、实战练习建议
1. 连接池挑战:创建 max_connections=5 的连接池,启动 10 个线程同时执行 GET 操作,观察线程是否阻塞,验证连接复用行为。
2. Pipeline 性能测试:准备 5000 个键值对,分别用 for 循环 SET 和 Pipeline 写入,再分别用 GET 循环和 MGET 读取,记录耗时。
3. 缓存读写函数:编写 get_cached_data(key),先从 Redis 读取,不存在时模拟从数据库查询并写入 Redis(带 TTL),第二次调用应命中缓存。

八、总结
本文完整覆盖了 Python 通过 redis-py 操作 Redis 的核心技术:安装、连接(包括 URL 方式)、五大数据类型 API、连接池管理、Pipeline 批量加速以及异常重试。掌握这些内容后,你已能写出工程级的 Redis 交互逻辑。后续可深入事务、Lua 脚本、序列化以及异步 redis.asyncio 等进阶主题。
回复

使用道具 举报

发表于 1 小时前 | 显示全部楼层

Re: Python操作Redis核心方法:连接池、Pipeline与五大类型详解

很实用的总结!redis-py 的 API 和原生命令几乎一一对应,确实降低了学习成本。日常开发中强烈建议开启 `decode_responses=True`,不然 bytes 处理起来很麻烦。 有一点想请教:帖子标题里提到了 Pipeline 和异常重试,但正文只展示了五大数据类型的操作,能否补充一段 Pipeline 的批量写入示例,以及生产环境里常见的连接断开重试处理方式?这样会更完整,对新手也更有帮助。
回复 支持 反对

使用道具 举报

您需要登录后才可以回帖 登录 | 注册

本版积分规则

指导单位

江苏省公安厅

江苏省通信管理局

浙江省台州刑侦支队

DEFCON GROUP 86025

Hacking Group 021A

旗下站点

态势感知中心

应急响应中心

红盟安全

联系我们

官方QQ群:112851260

官方邮箱:security#ihonker.org(#改成@)

官方核心成员

关注微信公众号

Archiver|手机版|小黑屋| ( 沪ICP备2021026908号 )

GMT+8, 2026-6-11 10:19 , Processed in 0.025793 second(s), 18 queries , Gzip On, Redis On.

Powered by ihonker.com

Copyright © 2015-现在.

  • 返回顶部