在需要使用HDFS作为文件存储的大数据场景中,将HTTP上传的文件直接异步写入HDFS是一项常见需求。FastAPI基于async/await的异步特性,加上hdfs库对异步操作的支持,可以高效地实现这一功能。本文从环境准备到实际代码,详细讲解如何构建一条可靠的文件上传通道,并对比临时文件中转与直接流式上传两种方法的差异。
### 环境准备
首先安装必要的依赖:
- pip install fastapi uvicorn hdfs[async]
复制代码
fastapi提供Web框架,uvicorn是ASGI服务器,hdfs[async]安装包含异步扩展的HDFS客户端。确保你的HDFS集群地址和端口(默认8020)可访问。
### FastAPI应用初始化
创建一个Python文件,例如main.py,导入所需模块并设置一个FastAPI实例:
- from fastapi import FastAPI, File, UploadFile
- from fastapi.responses import JSONResponse
- import asyncio
- import tempfile
- import os
- from hdfs import InsecureClient
- app = FastAPI()
- # HDFS连接参数(请替换为实际值)
- HDFS_HOST = 'your_hdfs_host'
- HDFS_PORT = '8020' # 默认端口
- HDFS_USER = 'your_hdfs_user'
复制代码
InsecureClient用于开发测试环境,生产环境应使用基于Kerberos认证的安全Client。
### 异步HDFS客户端工厂
提供异步函数创建客户端连接,避免每次请求都重新初始化:
- async def get_hdfs_client():
- # InsecureClient构造函数需要完整HTTP地址
- hdfs_url = f'http://{HDFS_HOST}:{HDFS_PORT}'
- client = InsecureClient(hdfs_url, user=HDFS_USER)
- return client
复制代码
这里将HOST和PORT拼接为URL格式,这是hdfs库的正确调用方式。
### 文件上传路由:临时文件方案
FastAPI的UploadFile对象提供了异步读取方法。下面先给出基于临时文件中转的实现:
- @app.post('/upload/')
- async def upload_file(file: UploadFile = File(...)):
- client = await get_hdfs_client()
- try:
- # 创建临时文件保存上传内容
- with tempfile.NamedTemporaryFile(delete=False) as tmp:
- content = await file.read()
- tmp.write(content)
- tmp_path = tmp.name
-
- # 上传到HDFS目标目录
- hdfs_path = f'/user/{HDFS_USER}/{file.filename}'
- await client.upload(tmp_path, hdfs_path)
-
- return JSONResponse(content={
- 'message': 'File uploaded successfully',
- 'filename': file.filename
- })
- except Exception as e:
- return JSONResponse(content={'message': str(e)}, status_code=500)
- finally:
- # 删除本地临时文件
- if os.path.exists(tmp_path):
- os.remove(tmp_path)
复制代码
此方案适合中小文件。临时文件作为中间存储可降低对HDFS连接的要求,但增加了磁盘I/O开销。
### 文件上传路由:流式上传方案
对于大文件,推荐直接从上传流中读取数据并逐块写入HDFS,避免临时文件占满磁盘。hdfs库的write方法支持可迭代的字节流,配合UploadFile的异步迭代器可实现流式写入:
- @app.post('/upload/stream')
- async def upload_file_stream(file: UploadFile = File(...)):
- client = await get_hdfs_client()
- try:
- hdfs_path = f'/user/{HDFS_USER}/{file.filename}'
- # 以写入模式打开HDFS文件
- with client.write(hdfs_path, overwrite=True) as writer:
- # 逐块读取上传内容并写入
- while True:
- chunk = await file.read(1024 * 1024) # 每次读取1MB
- if not chunk:
- break
- writer.write(chunk)
- return JSONResponse(content={
- 'message': 'File uploaded via stream successfully',
- 'filename': file.filename
- })
- except Exception as e:
- return JSONResponse(content={'message': str(e)}, status_code=500)
复制代码
client.write返回的writer对象可以在异步上下文中使用,但注意hdfs[async]基于coroutine,实际使用时可能需要结合loop.run_in_executor。更安全的做法是通过asyncio.to_thread或直接同步调用(因为write本质是阻塞I/O,配合线程池可模拟异步)。不过简单场景下同步调用并结合FastAPI的线程池也可以接受。上述代码展示了流式思维,实际生产环境可结合asyncio.to_thread优化。
### 运行和测试
使用uvicorn启动应用:
- uvicorn main:app --reload
复制代码
访问http://127.0.0.1:8000/docs查看交互式文档,或用curl测试:
- curl -X POST -F "file=@test.txt" http://127.0.0.1:8000/upload/
复制代码
### 关键细节与优化建议
- **安全性**:InsecureClient不加密不认证,生产环境应使用hdfs.Client并配置Kerberos票据。
- **错误处理**:真实场景需增加详细日志记录(如logging模块),区分客户端错误与服务器错误。
- **超时与重试**:对大文件上传可设置HDFS连接超时,并实现重试机制。
- **并发控制**:FastAPI一个worker可同时处理多个上传,但HDFS客户端注意连接数限制,可考虑连接池。
- **兼容性**:hdfs库的异步版本可能滞后,如遇阻塞可改用aihdfs或通过线程池包装同步hdfs调用。
通过以上两种方案,你可以根据文件规模和性能要求选择合适方式。流式上传在内存和磁盘占用上更优,是处理大文件的首选。 |