python:Fastapi - BackgroundTasks (后台任务)
今天主要说后台任务,它主要就是在返回响应后运行任务。对于需要在请求之后发生的操作很有用,但客户端实际上不必在接收响应之前等待操作完成。这包括,例如:电子邮件发送的通知,需要连接电子邮件服务器很慢的数据文件处理,因文件大写入时太慢使用BackgroundTasks首先,导入BackgroundTasks其次在路径操作函数中定义一个参数,其类型声明为:BackgroundTasksfromfastap
·
今天主要说后台任务,它主要就是在返回响应后运行任务。
对于需要在请求之后发生的操作很有用,但客户端实际上不必在接收响应之前等待操作完成。
这包括,例如:
- 电子邮件发送的通知,需要连接电子邮件服务器
- 很慢的数据文件处理,因文件大写入时太慢
使用BackgroundTasks
首先,导入BackgroundTasks
其次在路径操作函数中定义一个参数,其类型声明为:BackgroundTasks
from fastapi import BackgroundTasks, FastAPI
app = FastAPI()
def write_file(data: str):
with open("log.txt", mode="w") as w:
w.write(data)
@app.post("/write-file/")
async def send_notification(data: str, background_tasks: BackgroundTasks):
background_tasks.add_task(write_notification, data)
return {"message": "Notification sent in the background"}
在上述示例中,消息将在发送响应后log.txt写入文件。
如果请求成功,它将在后台任务中写入日志。
然后在路径操作函数中生成一个后台任务将使用data数据写入一条消息。
应用到项目中的话,可以放在登录往redis
中写入token
的操作,如下:
# -*- encoding: utf-8 -*-
"""
@__Author__: lifeng
@__Software__: PyCharm
@__File__: login.py
@__Date__: 2022/5/3 15:19
"""
from datetime import timedelta
from sc_app.databases import get_db
from sqlalchemy.orm import Session
from sc_app.model import user_models
from sc_app.redispy import redispy
from sc_app.dependencies import generate_access_token
from sc_app.schemas.users.login import UserPwd, UserToken
from sc_app.dependencies import encryption_password_or_decode
from fastapi import APIRouter, HTTPException, Depends, status, BackgroundTasks
ACCESS_TOKEN_EXPIRE_DAYS = 3
def check_user(user: str, pwd: str, db: Session = Depends(get_db)):
"""
根据输入的用户信息,数据库查询比对账号和密码,并对密码进行解密操作
:param user:
:param pwd:
:param db:
:return:
"""
username, password = db.query(
user_models.Users.username, user_models.Users.password
).filter(user_models.Users.username == user).first()
if not username and not encryption_password_or_decode(pwd=pwd, hashed_password=password):
return False
return True
router = APIRouter(
prefix="/user"
)
@router.post("/login/", response_model=UserToken)
def api_login_user(user: UserPwd, background_tasks:BackgroundTasks, db: Session = Depends(get_db)):
"""
登录接口
:param user:
:param background_tasks:
:param db:
:return:
"""
# 如果返回False则抛出错误
if not check_user(user.username, user.password, db):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail={
"status": 0,
"data": {},
"error_msg": "密码或账号错误 !",
"error_code": 1000
}
)
# 判断redis中key是否存在
if not redispy.get_exists(user.username):
access_token_expires = timedelta(days=ACCESS_TOKEN_EXPIRE_DAYS)
# 创建token
access_token = generate_access_token(
data={"username": user.username}, expiration=access_token_expires
)
# 后台任务执行往redis中写入token
background_tasks.add_task(redispy.set_value, user.username, access_token, is_data=True)
return {"token": access_token, "username": user.username}
# 从redis中读取token
access_token = redispy.get_value(user.username, is_data=True)
return {"token": access_token, "username": user.username}
首先定义一个参数,并声明为:BackgroundTasks
@router.post("/login/", response_model=UserToken)
def api_login_user(user: UserPwd, background_tasks:BackgroundTasks, db: Session = Depends(get_db)):
其次调用并执行token
写入
# 后台任务执行往redis中写入token
background_tasks.add_task(redispy.set_value, user.username, access_token, is_data=True)
信息注释:
.add_task()
作为参数接收:redispy.set_value, user.username, access_token, is_data=True
测试一下,看看是否正常
登录接口
POST http://127.0.0.1:8000/user/login/
请求参数
{
"username": "debugfeng15@qq.com",
"password": "123456"
}
请求结果
{
"username": "debugfeng15@qq.com",
"token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VybmFtZSI6ImRlYnVnZmVuZzE1QHFxLmNvbSIsImV4cCI6MTY1MjUyNjU5NH0.xdyLPzhMaQ9T1gWMslDCqfmhmwXaVC4Dbu3KCd4rcl8",
"x_token": "debugfeng"
}
首页接口
GET http://127.0.0.1:8000/index/
请求头
{
"x-token": "debugfeng",
"token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VybmFtZSI6ImRlYnVnZmVuZzE1QHFxLmNvbSIsImV4cCI6MTY1MjUyNjU5NH0.xdyLPzhMaQ9T1gWMslDCqfmhmwXaVC4Dbu3KCd4rcl8",
"x_token": "debugfeng"
}
请求结果
{
"message": "Welcome to the home page !"
}
今天先聊到这里吧,以上总结或许能帮助到你,或许帮助不到你,但还是希望能帮助到你,如有疑问、歧义,直接私信留言会及时修正发布;非常期待你的一键 3 连【 点赞、收藏、分享 】哟,谢谢!
未完成,待续……
一直在努力,希望你也是!
微信搜索公众号:就用python
更多推荐
所有评论(0)