Python asyncio实战:并发请求REST API并优雅处理错误
103
0
0
0
Python asyncio实战:并发请求REST API并优雅处理错误
在现代Web开发中,我们经常需要从多个REST API获取数据。如果串行请求这些API,效率会非常低下。Python的asyncio库提供了一种优雅的方式来实现并发请求,从而显著提高数据获取速度。本文将通过一个实际案例,演示如何使用asyncio并发请求多个REST API,并有效地处理潜在的错误。
场景设定
假设我们需要从两个不同的API获取数据:
- 用户API:
https://jsonplaceholder.typicode.com/users/{user_id},用于获取用户信息。 - 产品API:
https://jsonplaceholder.typicode.com/todos/{todo_id},用于获取产品信息。
我们需要并发地获取多个用户和产品的信息,并将它们整合起来。
准备工作
首先,我们需要安装aiohttp库,它是一个基于asyncio的HTTP客户端。
pip install aiohttp
核心代码
以下是使用asyncio并发请求REST API并处理错误的核心代码:
import asyncio
import aiohttp
import json
async def fetch_user(session, user_id):
url = f'https://jsonplaceholder.typicode.com/users/{user_id}'
try:
async with session.get(url) as response:
response.raise_for_status() # 检查HTTP状态码
return await response.json()
except aiohttp.ClientError as e:
print(f'Error fetching user {user_id}: {e}')
return None
async def fetch_todo(session, todo_id):
url = f'https://jsonplaceholder.typicode.com/todos/{todo_id}'
try:
async with session.get(url) as response:
response.raise_for_status()
return await response.json()
except aiohttp.ClientError as e:
print(f'Error fetching todo {todo_id}: {e}')
return None
async def main():
async with aiohttp.ClientSession() as session:
user_ids = range(1, 4) # 获取用户ID 1, 2, 3
todo_ids = range(1, 4) # 获取产品ID 1, 2, 3
user_tasks = [fetch_user(session, user_id) for user_id in user_ids]
todo_tasks = [fetch_todo(session, todo_id) for todo_id in todo_ids]
# 并发执行所有任务
users = await asyncio.gather(*user_tasks)
todos = await asyncio.gather(*todo_tasks)
# 处理返回结果
print('Users:')
for user in users:
if user:
print(json.dumps(user, indent=4))
print('\nTodos:')
for todo in todos:
if todo:
print(json.dumps(todo, indent=4))
if __name__ == '__main__':
asyncio.run(main())
代码详解
fetch_user和fetch_todo函数: 这两个函数分别负责从用户API和产品API获取数据。它们使用aiohttp.ClientSession发送GET请求,并使用response.raise_for_status()检查HTTP状态码。如果发生任何aiohttp.ClientError异常(例如网络错误、连接超时等),则捕获异常并打印错误信息,然后返回None。main函数: 这是主函数,负责创建aiohttp.ClientSession,定义需要获取的用户ID和产品ID,创建任务列表,并使用asyncio.gather并发执行所有任务。asyncio.gather函数接受一个任务列表,并返回一个包含所有任务结果的列表。如果某个任务发生异常,asyncio.gather会抛出该异常。但由于我们在fetch_user和fetch_todo函数中已经处理了异常,所以asyncio.gather不会抛出异常,而是返回None。- 错误处理: 在
fetch_user和fetch_todo函数中,我们使用了try...except块来捕获aiohttp.ClientError异常。这使得程序在遇到网络错误或API异常时不会崩溃,而是能够继续执行其他任务。同时,我们打印了错误信息,方便调试。 - 结果处理: 在
main函数中,我们遍历asyncio.gather返回的结果列表,并检查每个结果是否为None。如果结果不为None,则打印结果。这样可以避免因某个API请求失败而导致整个程序崩溃。
运行结果
运行上述代码,你将会看到类似以下的输出:
Users:
{
"id": 1,
"name": "Leanne Graham",
"username": "Bret",
"email": "Sincere@april.biz",
"address": {
"street": "Kulas Light",
"suite": "Apt. 556",
"city": "Gwenborough",
"zipcode": "92998-3874",
"geo": {
"lat": "-37.3159",
"lng": "81.1496"
}
},
"phone": "1-770-736-8031 x56442",
"website": "hildegard.org",
"company": {
"name": "Romaguera-Crona",
"catchPhrase": "Multi-layered client-server neural-net",
"bs": "harness real-time e-markets"
}
}
{
"id": 2,
"name": "Ervin Howell",
"username": "Antonette",
"email": "Shanna@melissa.tv",
"address": {
"street": "Victor Plains",
"suite": "Suite 879",
"city": "Wisokyburgh",
"zipcode": "90566-7771",
"geo": {
"lat": "-43.9509",
"lng": "-34.4618"
}
},
"phone": "010-692-6593 x09125",
"website": "anastasia.net",
"company": {
"name": "Deckow-Crist",
"catchPhrase": "Proactive didactic contingency",
"bs": "synergize scalable supply-chains"
}
}
{
"id": 3,
"name": "Clementine Bauch",
"username": "Samantha",
"email": "Nathan@yesenia.net",
"address": {
"street": "Douglas Extension",
"suite": "Suite 847",
"city": "McKenziehaven",
"zipcode": "59590-4157",
"geo": {
"lat": "-68.6102",
"lng": "-47.0653"
}
},
"phone": "1-463-123-4447",
"website": "ramiro.info",
"company": {
"name": "Romaguera-Jacobson",
"catchPhrase": "Face to face bifurcated interface",
"bs": "e-enable strategic applications"
}
}
Todos:
{
"userId": 1,
"id": 1,
"title": "delectus aut autem",
"completed": false
}
{
"userId": 1,
"id": 2,
"title": "quis ut nam facilis et officia qui",
"completed": false
}
{
"userId": 1,
"id": 3,
"title": "fugiat veniam minus",
"completed": false
}
进阶技巧
- 使用信号量限制并发数: 如果API有并发限制,可以使用
asyncio.Semaphore来限制并发请求的数量。 - 添加重试机制: 对于可能因网络波动而失败的请求,可以添加重试机制,例如使用
asyncio.sleep在重试之间添加延迟。 - 使用更复杂的错误处理策略: 可以根据不同的错误类型采取不同的处理策略,例如对于404错误,可以忽略该错误;对于500错误,可以重试请求。
总结
本文演示了如何使用Python的asyncio库并发请求多个REST API,并有效地处理潜在的错误。通过使用aiohttp库发送异步HTTP请求,并使用try...except块捕获异常,我们可以编写出高效、健壮的并发程序。希望本文能够帮助你更好地理解和应用asyncio库。