Python の Web Framework FastAPI でトークン認証

記事
IT・テクノロジー

# はじめに


Web サイトにおいて、アカウント認証が必要となる機会はよくあることかと思います。

先日、FastAPI について調べたので、今回は FastAPI を使ったトークン認証について、調べてみました。

python では JWT(Json Web Token)を使う際に `jose` パッケージを使っていきます。

他のフレームワークを使っても同じような仕組みでトークン認証は可能となりますので、参考になれば幸いです。

# トークン認証とは


一般的なトークン認証&それを使ったリクエストの流れは以下になります。

トークン認証.png


トークンを作る際は、日付情報など一意になるような値を混ぜてハッシュ化する必要があります。

トークンの中身は、`ヘッダー.ペイロード.署名` という形になっています。

また、JWT の中身をチェックするには、ttps://jwt.io/ を使ってみてください。

今回は、なるべく簡潔にトークン発行、検証、更新、削除のコードを示していきます。

# 実際のコード


## 準備

1. requirements.txt に以下を記述

```
uvicorn==0.13.3
fastapi==0.63.0
python-multipart==0.0.5
peewee==3.14.3
python-jose==3.2.0
```

2. 必要なパッケージをインストール

```
$ pip install -r requirements.txt
```

## DBのモデルを作成

User モデルを作成します。 今回は、アカウント作成はスコープ外なので、実行時に tester アカウントを作るようにしています。

```python
from peewee import SqliteDatabase, Model, AutoField, CharField, TextField

db = SqliteDatabase('db.sqlite3')


class User(Model):
id: int = AutoField(primary_key=True)
name: str = CharField(100)
password: str = CharField(100)
refresh_token: str = TextField(null=True)

class Meta:
database = db


db.create_tables([User])
User.create(name='tester', password='tester')
```

## トークン発行

```python
from datetime import datetime, timedelta

from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import OAuth2PasswordRequestForm
from jose import jwt
from peewee import SqliteDatabase, Model, AutoField, CharField, TextField

SECRET_KEY = "4qbqcd_iqxk-y6(gr8l^9elsr1acj+t7zohf7v8reqp&^e7%6p"

db = SqliteDatabase('db.sqlite3')


class User(Model):
id: int = AutoField(primary_key=True)
name: str = CharField(100)
password: str = CharField(100)
refresh_token: str = TextField(null=True)

class Meta:
database = db


db.create_tables([User])
User.create(name='tester', password='tester')

app = FastAPI()


def check_valid_user(username: str, password: str) -> bool:
user = User.get(name=username)
return user.password == password


def create_new_token(user_id: int) -> str:
payload = {
'exp': datetime.utcnow() + timedelta(days=90),
'user_id': user_id,
}
token = jwt.encode(payload, SECRET_KEY, algorithm='HS256')
User.update(refresh_token=token).where(User.id == user_id).execute()
return token


[at]app.post("/token/")
async def create(form: OAuth2PasswordRequestForm = Depends()):
if not check_valid_user(form.username, form.password):
raise HTTPException(status_code=401, detail='Invalid password')

user = User.get(name=form.username)
token = create_new_token(user.id)

return {"detail": "Success", "token": token}
```

## トークン検証

```python
from fastapi import FastAPI, HTTPException
from peewee import SqliteDatabase, Model, AutoField, CharField, TextField

SECRET_KEY = "4qbqcd_iqxk-y6(gr8l^9elsr1acj+t7zohf7v8reqp&^e7%6p"

db = SqliteDatabase('db.sqlite3')


class User(Model):
id: int = AutoField(primary_key=True)
name: str = CharField(100)
password: str = CharField(100)
refresh_token: str = TextField(null=True)

class Meta:
database = db


db.create_tables([User])
User.create(name='tester', password='tester')

app = FastAPI()


def check_valid_token(token: str) -> bool:
return len(User.select().where(User.refresh_token == token)) != 0


[at]app.put("/token/validate/{token}")
async def validate(token: str):
if not check_valid_token(token):
raise HTTPException(status_code=401, detail='Invalid token')

return {"detail": "Valid"}
```

## トークン更新

```python
from datetime import datetime, timedelta

from fastapi import FastAPI, HTTPException
from jose import jwt
from peewee import SqliteDatabase, Model, AutoField, CharField, TextField

SECRET_KEY = "4qbqcd_iqxk-y6(gr8l^9elsr1acj+t7zohf7v8reqp&^e7%6p"

db = SqliteDatabase('db.sqlite3')


class User(Model):
id: int = AutoField(primary_key=True)
name: str = CharField(100)
password: str = CharField(100)
refresh_token: str = TextField(null=True)

class Meta:
database = db


db.create_tables([User])
User.create(name='tester', password='tester')

app = FastAPI()


def check_valid_token(token: str) -> bool:
return len(User.select().where(User.refresh_token == token)) != 0


def create_new_token(user_id: int) -> str:
payload = {
'exp': datetime.utcnow() + timedelta(days=90),
'user_id': user_id,
}
token = jwt.encode(payload, SECRET_KEY, algorithm='HS256')
User.update(refresh_token=token).where(User.id == user_id).execute()
return token


def get_user_from_token(token: str) -> User:
users = User.select().where(User.refresh_token == token)

return None if len(users) == 0 else users[0]


[at]app.put("/token/refresh/{token}")
async def refresh(token: str):
if not check_valid_token(token):
raise HTTPException(status_code=401, detail='Invalid token')

user = get_user_from_token(token)
token = create_new_token(user.id)

return {"detail": "Success", "token": token}
```

## トークン削除

```python
from datetime import datetime, timedelta

from fastapi import FastAPI, HTTPException
from jose import jwt
from peewee import SqliteDatabase, Model, AutoField, CharField, TextField

SECRET_KEY = "4qbqcd_iqxk-y6(gr8l^9elsr1acj+t7zohf7v8reqp&^e7%6p"

db = SqliteDatabase('db.sqlite3')


class User(Model):
id: int = AutoField(primary_key=True)
name: str = CharField(100)
password: str = CharField(100)
refresh_token: str = TextField(null=True)

class Meta:
database = db


db.create_tables([User])
User.create(name='tester', password='tester')

app = FastAPI()


def check_valid_token(token: str) -> bool:
return len(User.select().where(User.refresh_token == token)) != 0


def create_new_token(user_id: int) -> str:
payload = {
'exp': datetime.utcnow() + timedelta(days=90),
'user_id': user_id,
}
token = jwt.encode(payload, SECRET_KEY, algorithm='HS256')
User.update(refresh_token=token).where(User.id == user_id).execute()
return token


def get_user_from_token(token: str) -> User:
users = User.select().where(User.refresh_token == token)

return None if len(users) == 0 else users[0]


def delete_token(user_id: int):
User.update(refresh_token=None).where(User.id == user_id).execute()


[at]app.delete("/token/{token}")
async def delete(token: str):
if not check_valid_token(token):
raise HTTPException(status_code=401, detail='Invalid token')

user = get_user_from_token(token)
delete_token(user.id)

return {"detail": "Success"}
```

## 実行

```
$ uvicorn main:app
```

# おわりに

いかがでしたか。

そんなに詳しくない方でも比較的簡単に理解出来たかと思います。

Django などのフルスタックフレームワークを使うとデフォルトのアカウント管理機能がありますが、

バックエンドのフレームワークでは、自分でアカウント管理機能を実装しないといけないこともあるかと思います。

今回示した例では、出来る限り簡単に、理解できるように記述しましたので、リファクタリングしたい点はいくつもあるかと思います・・

参考程度に見ていただけると良いかと思います。

# 参考

ttps://fabeee.co.jp/blog/mattsun01/

サービス数40万件のスキルマーケット、あなたにぴったりのサービスを探す