学习过程分为几个部分

提示: 本文章为个人笔记,可能存在较多的错误理解

  • OAuth2在FastAPI的使用及OAuth2是什么?
  • ORM sqlalchemy 在FastAPI
  • template jinja2 在FastAPI
  • FastAPI对websocket的支持及自定义websocket适配器

还有一些小的知识点: - Depends - 全局依赖注入 - 中间件 @app.middleware('http')

参考链接

理解OAuth 2.0-阮一峰的网络日志
Fastapi之OAuth2登录认证-马上读初一

OAuth2与JWT

我的理解是,在用户登录后将token信息响应给用户,token使用JWT生成与验证。里面可以包含用户id等信息,在下次用户请求时解析token字段即可。 在这里我用的是OAuth2PasswordBearer类来获取请求头中的token字段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
from typing import List, Optional
from datetime import timedelta, datetime
from jose import jwt
from fastapi import Depends, FastAPI
from fastapi.security import OAuth2PasswordBearer

oauth2_scheme = OAuth2PasswordBearer(tokenUrl='token')

def create_jwt_token(data: dict, expire_delta: Optional[timedelta] = None):
expire = datetime.utcnow() + expire_delta if expire_delta else \
datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
data.update({'exp': expire})
token = jwt.encode(claims=data, key=SECRET_KEY, algorithm=ALGORITHM)
return token


async def auth_token(token: str = Depends(oauth2_scheme)):
try:
payload = jwt.decode(
token=token, key=SECRET_KEY, algorithms=[ALGORITHM])
username = payload.get('username')
user_type = payload.get('user_type')
return schemas.AdminAuthorization(username=username, user_type=user_type)
except:
return None


@app.post('/login')
async def login(user: schemas.AdminAuthorization,
db: Session = Depends(get_db)):
admin_user = crud.get_admin_authorization(
db, user.username, user.password)
if admin_user:
token = create_jwt_token(
{'username': admin_user.username, 'user_type': admin_user.admin_type})
return {'username': admin_user.username,
'user_type': admin_user.admin_type,
'Token': token}
return {}

# 在使用时
@app.get('/guns', response_model=List[schemas.GunDetail])
async def get_guns(user: str = Depends(auth_token),
db: Session = Depends(get_db)):
pass

解释解释Depends

是FastAPI实现的一个依赖注入的方法,可以传入function或者是class。
这里解释一下

1
2
3
oauth2_scheme = OAuth2PasswordBearer(tokenUrl='token')
async def test(token: str = Depends(oauth2_scheme)):
pass

首先发现OAuth2PasswordBearer的__call__方法, __call__方法是可以将一个类的实例变成一个可调用对象。
所以oauth2_scheme也相当于是一个class, 它的__init__返回param

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# fastapi程序源码
class OAuth2PasswordBearer(OAuth2):
def __init__(
self,
tokenUrl: str,
scheme_name: Optional[str] = None,
scopes: Optional[Dict[str, str]] = None,
description: Optional[str] = None,
auto_error: bool = True,
):
if not scopes:
scopes = {}
flows = OAuthFlowsModel(password={"tokenUrl": tokenUrl, "scopes": scopes})
super().__init__(
flows=flows,
scheme_name=scheme_name,
description=description,
auto_error=auto_error,
)

async def __call__(self, request: Request) -> Optional[str]:
authorization: str = request.headers.get("Authorization")
print('in oauth2.py: ', authorization)
scheme, param = get_authorization_scheme_param(authorization)
if not authorization or scheme.lower() != "bearer":
if self.auto_error:
raise HTTPException(
status_code=HTTP_401_UNAUTHORIZED,
detail="Not authenticated",
headers={"WWW-Authenticate": "Bearer"},
)
else:
return None
return param
在上面源码处增加一个print可以发现输出
1
in oauth2.py:  Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VybmFtZSI6ImNoZW5nIiwiZXhwIjoxNjU3NjE3OTg3fQ.jL0DUdXCIz7TK-UTNd1QW5_SXIYohAzBTg4thIoZbIU
以及token字段为
1
token:  eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VybmFtZSI6ImNoZW5nIiwiZXhwIjoxNjU3NjE3OTg3fQ.jL0DUdXCIz7TK-UTNd1QW5_SXIYohAzBTg4thIoZbIU

FUNCTION get_authorization_scheme_param

1
2
3
4
5
6
7
8
from typing import Tuple


def get_authorization_scheme_param(authorization_header_value: str) -> Tuple[str, str]:
if not authorization_header_value:
return "", ""
scheme, _, param = authorization_header_value.partition(" ")
return scheme, param

其中FUNCTION partition

1
2
3
4
5
6
e.g.
str = "www.runoob.com"
print str.partition(".")

# 输出
('www', '.', 'runoob.com')

所以可以看到源程序CLASS OAuth2PasswordBearer 中__call__的返回param就是请求头中带的Bearer类型的Token。

以上大部分都是源码程序

ORM sqlalchemy

在FastAPI中使用时,一般结构如下: - 配置连接 - 数据模型 models - schemas 继承pydantic.BaseModel - CRUD 执行数据库操作的自定义函数

database

  • create_engine
  • declarative_base
  • sessionmaker

models

  • Column

schemas

crud

参考

操作关系型数据库-小菠萝测试笔记
关于HTTP基本原理

子查询

目的:我需要将内连接后的表数据分组,且筛选组里面的数据,再筛选分组后的数据。 数据库关系图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
-- 合并charger、gun_status、price、admin_chargers
-- 根据admin_chargers.username筛选charger
-- 获取每个charger的最新price
-- 思路是先查询出每个桩最新price的datetime(根据GROUP BY charger_id)
-- 再筛选合并后的数据
-- 所以是一个子查询
SELECT
charger.charger_id,
charger.location,
price.unit_price,
gun_status.`status`,
gun_status.gun_id,
gun_status.last_update
FROM
charger
INNER JOIN gun_status ON charger.charger_id = gun_status.charger_id
INNER JOIN price ON charger.charger_id = price.charger_id
INNER JOIN admin_chargers ON charger.charger_id = admin_chargers.charger_id
INNER JOIN (
SELECT
charger.charger_id,
max( price.datetime ) AS current_datetime
FROM
charger
INNER JOIN price ON charger.charger_id = price.charger_id
GROUP BY
charger.charger_id
) current_price ON charger.charger_id = current_price.charger_id
WHERE
admin_chargers.username = 'cheng'
AND price.datetime = current_price.current_datetime

对应代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
current_price = db.query(
models.Charger.charger_id.label('charger_id'),
func.max(models.Price.datetime).label('max_datetime')
).filter(
models.Charger.charger_id == models.Price.charger_id
).group_by(models.Charger.charger_id).subquery()

cps = db.query(
models.Charger.charger_id,
models.GunStatus.gun_id,
models.Charger.charger_point,
models.Charger.location,
models.GunStatus.status,
models.GunStatus.meter_value,
models.GunStatus.last_update,
models.Price.unit_price
).filter(
models.Charger.charger_id == models.GunStatus.charger_id,
models.Charger.charger_id == models.Price.charger_id,
models.Charger.charger_id == models.AdminChargers.charger_id,
models.AdminChargers.username == username,
current_price.c.max_datetime == models.Price.datetime,
current_price.c.charger_id == models.Charger.charger_id
).all()

add 与 update 与 delete

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# models
class Test(Base):
__tablename__ = 'test'
uuid = Column(Integer, primary_key=True, autoincrement=True)
detail = Column(String(128), nullable=False)
last_update = Column(DateTime, nullable=False)

# schemas
class Test(BaseModel):
uuid: int = None
detail: str

# add
def post_test(db: Session, test: schemas.Test):
db.add(
models.Test(
detail=test.detail,
last_update=datetime.now()
))
db.commit()

# put
def put_test(db: Session, test: schemas.Test):
db.query(models.Test).filter(
models.Test.uuid == test.uuid
).update({
models.Test.detail: test.detail,
models.Test.last_update: datetime.now()
})
db.commit()

FastAPI 中间件与全局依赖

一开始我想使用中间件来实现token验证,结果发现FastAPI的中间件的返回值时一个<function BaseHTTPMiddleware.__call__.<locals>.call_next, return None是不行的...(我太菜了...),发现应该使用全局依赖而不是中间件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
app = FastAPI(dependencies=[Depends(auth)])

# 而不是
# 中间件是需要await call_next(request)
# 是<function BaseHTTPMiddleware.__call__.<locals>.call_next

@app.middleware('http')
async def authorization(request: Request, call_next):
token: str = await oauth2_scheme(request)
try:
payload = jwt.decode(
token=token, key=SECRET_KEY, algorithms=[ALGORITHM])
username = payload.get('username')
if not username:
pass
except:
pass
return await call_next(request)

jinja2

官网

FastAPI内置的websocket

pass

APIRouter

作用:为了FastAPI中路由汇总管理,类似于Django中的urls
我这里的项目结构:
项目结构

1
2
3
4
5
6
7
8
9
10
11
12
# 使用方式
# admin.py
from fastapi import APIRouter

router = APIRouter()
@router.post('/dome')
async def dome()
return {'code': 1000}

# main.py
from api.v1.admin import router as admin_router
app.include_router(admin_router, prefix='/admin')

CORS(跨域资源共享)

官方文档

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()

origins = [
"http://localhost.tiangolo.com",
"https://localhost.tiangolo.com",
"http://localhost",
"http://localhost:8080",
]

app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)

报错与解决

sqlalchemy.exc.OperationalError: pymysql.err.OperationalError 1055

参考链接
报错解释,是mysql group by引发的错误。

解决方法 修改my.cnf文件在[mysqld]下添加如下:

1
2
# 设置sql_mode,关闭ONLY_FULL_GROUP_BY,避免使用group by函数导致1055错误
sql_mode=STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION
my.cnf文件地址一般在/etc/mysql下

我这里的mysql放在了docker里面。所以有一些麻烦。 - 进入docker - 修改docker内的my.cnf - restart

由于docker容器内部没有vim,安装vim可能也有很多问题。所以我先将docker容器内部的my.cnf文件copy出来,修改后再copy进去。

1
2
3
4
5
6
7
8
9
10
11
12
# 2cf83af5c391是容器id
# 进入容器
docker exec -it 2cf83af5c391 /bin/bash

#将容器中的文件拷贝出来
docker cp 2cf83af5c391:/etc/my.cnf /home/docker_mysql/

#将容器中的文件拷贝回去
docker cp /home/docker_mysql/my.cnf 2cf83af5c391:/etc/

# 重启容器
docker restart 2cf83af5c391

放对位置

结尾

更新历史 第一次写文章的时间 2022-07-15 23:00
第二次写文章的时间 未知
第三次写文章的时间 2022-07-23 18:00 成都成华区疫情挺严重,所在小区被封了...
第四次写文章的时间 2022-07-29 17:00 成华区封了,不知道需要等多久,今天是最后一天上班了。现在这种情况怎么离职呢...