查看: 211|回复: 1

Python FastAPI异步上传文件到HDFS:InsecureClient实现与流式优化

[复制链接]
发表于 昨天 17:00 | 显示全部楼层 |阅读模式
在需要使用HDFS作为文件存储的大数据场景中,将HTTP上传的文件直接异步写入HDFS是一项常见需求。FastAPI基于async/await的异步特性,加上hdfs库对异步操作的支持,可以高效地实现这一功能。本文从环境准备到实际代码,详细讲解如何构建一条可靠的文件上传通道,并对比临时文件中转与直接流式上传两种方法的差异。

### 环境准备

首先安装必要的依赖:
  1. pip install fastapi uvicorn hdfs[async]
复制代码

fastapi提供Web框架,uvicorn是ASGI服务器,hdfs[async]安装包含异步扩展的HDFS客户端。确保你的HDFS集群地址和端口(默认8020)可访问。

### FastAPI应用初始化

创建一个Python文件,例如main.py,导入所需模块并设置一个FastAPI实例:
  1. from fastapi import FastAPI, File, UploadFile
  2. from fastapi.responses import JSONResponse
  3. import asyncio
  4. import tempfile
  5. import os
  6. from hdfs import InsecureClient
  7. app = FastAPI()
  8. # HDFS连接参数(请替换为实际值)
  9. HDFS_HOST = 'your_hdfs_host'
  10. HDFS_PORT = '8020'  # 默认端口
  11. HDFS_USER = 'your_hdfs_user'
复制代码

InsecureClient用于开发测试环境,生产环境应使用基于Kerberos认证的安全Client。

### 异步HDFS客户端工厂

提供异步函数创建客户端连接,避免每次请求都重新初始化:
  1. async def get_hdfs_client():
  2.     # InsecureClient构造函数需要完整HTTP地址
  3.     hdfs_url = f'http://{HDFS_HOST}:{HDFS_PORT}'
  4.     client = InsecureClient(hdfs_url, user=HDFS_USER)
  5.     return client
复制代码

这里将HOST和PORT拼接为URL格式,这是hdfs库的正确调用方式。

### 文件上传路由:临时文件方案

FastAPI的UploadFile对象提供了异步读取方法。下面先给出基于临时文件中转的实现:
  1. @app.post('/upload/')
  2. async def upload_file(file: UploadFile = File(...)):
  3.     client = await get_hdfs_client()
  4.     try:
  5.         # 创建临时文件保存上传内容
  6.         with tempfile.NamedTemporaryFile(delete=False) as tmp:
  7.             content = await file.read()
  8.             tmp.write(content)
  9.             tmp_path = tmp.name
  10.         
  11.         # 上传到HDFS目标目录
  12.         hdfs_path = f'/user/{HDFS_USER}/{file.filename}'
  13.         await client.upload(tmp_path, hdfs_path)
  14.         
  15.         return JSONResponse(content={
  16.             'message': 'File uploaded successfully',
  17.             'filename': file.filename
  18.         })
  19.     except Exception as e:
  20.         return JSONResponse(content={'message': str(e)}, status_code=500)
  21.     finally:
  22.         # 删除本地临时文件
  23.         if os.path.exists(tmp_path):
  24.             os.remove(tmp_path)
复制代码

此方案适合中小文件。临时文件作为中间存储可降低对HDFS连接的要求,但增加了磁盘I/O开销。

### 文件上传路由:流式上传方案

对于大文件,推荐直接从上传流中读取数据并逐块写入HDFS,避免临时文件占满磁盘。hdfs库的write方法支持可迭代的字节流,配合UploadFile的异步迭代器可实现流式写入:
  1. @app.post('/upload/stream')
  2. async def upload_file_stream(file: UploadFile = File(...)):
  3.     client = await get_hdfs_client()
  4.     try:
  5.         hdfs_path = f'/user/{HDFS_USER}/{file.filename}'
  6.         # 以写入模式打开HDFS文件
  7.         with client.write(hdfs_path, overwrite=True) as writer:
  8.             # 逐块读取上传内容并写入
  9.             while True:
  10.                 chunk = await file.read(1024 * 1024)  # 每次读取1MB
  11.                 if not chunk:
  12.                     break
  13.                 writer.write(chunk)
  14.         return JSONResponse(content={
  15.             'message': 'File uploaded via stream successfully',
  16.             'filename': file.filename
  17.         })
  18.     except Exception as e:
  19.         return JSONResponse(content={'message': str(e)}, status_code=500)
复制代码

client.write返回的writer对象可以在异步上下文中使用,但注意hdfs[async]基于coroutine,实际使用时可能需要结合loop.run_in_executor。更安全的做法是通过asyncio.to_thread或直接同步调用(因为write本质是阻塞I/O,配合线程池可模拟异步)。不过简单场景下同步调用并结合FastAPI的线程池也可以接受。上述代码展示了流式思维,实际生产环境可结合asyncio.to_thread优化。

### 运行和测试

使用uvicorn启动应用:
  1. uvicorn main:app --reload
复制代码

访问http://127.0.0.1:8000/docs查看交互式文档,或用curl测试:
  1. curl -X POST -F "file=@test.txt" http://127.0.0.1:8000/upload/
复制代码

### 关键细节与优化建议

- **安全性**:InsecureClient不加密不认证,生产环境应使用hdfs.Client并配置Kerberos票据。
- **错误处理**:真实场景需增加详细日志记录(如logging模块),区分客户端错误与服务器错误。
- **超时与重试**:对大文件上传可设置HDFS连接超时,并实现重试机制。
- **并发控制**:FastAPI一个worker可同时处理多个上传,但HDFS客户端注意连接数限制,可考虑连接池。
- **兼容性**:hdfs库的异步版本可能滞后,如遇阻塞可改用aihdfs或通过线程池包装同步hdfs调用。

通过以上两种方案,你可以根据文件规模和性能要求选择合适方式。流式上传在内存和磁盘占用上更优,是处理大文件的首选。
回复

使用道具 举报

发表于 昨天 17:10 | 显示全部楼层

Re: Python FastAPI异步上传文件到HDFS:InsecureClient实现与流式优化

感谢楼主分享,讲得很清晰!我正好在做一个类似的项目,用FastAPI上传文件到HDFS。你的流式方案对大文件确实更合理,不过有个疑问:`client.write` 和 `writer.write` 在原生的 `hdfs` 库里似乎是同步阻塞的(即使 `AsyncClient` 反而底层是异步?),如果在FastAPI的协程里直接调用,会不会阻塞事件循环导致其他请求响应变慢?你是建议用 `asyncio.to_thread` 丢到线程池,还是说 `InsecureClient` 的 write 内部已经做了异步处理?期望进一步聊聊最佳实践。
回复 支持 反对

使用道具 举报

您需要登录后才可以回帖 登录 | 注册

本版积分规则

指导单位

江苏省公安厅

江苏省通信管理局

浙江省台州刑侦支队

DEFCON GROUP 86025

Hacking Group 021A

旗下站点

态势感知中心

应急响应中心

红盟安全

联系我们

官方QQ群:112851260

官方邮箱:security#ihonker.org(#改成@)

官方核心成员

关注微信公众号

Archiver|手机版|小黑屋| ( 沪ICP备2021026908号 )

GMT+8, 2026-6-15 01:57 , Processed in 0.027379 second(s), 17 queries , Gzip On, Redis On.

Powered by ihonker.com

Copyright © 2015-现在.

  • 返回顶部