Projet VanLife — Local First — Sécurisation de l’API et gestion des utilisateurs
Introduction
Cet article est les deuxième article d’une série concernant la mise en place d’un ensemble des capteurs dans un van, liés à un Raspberry Pi et connecté à une app.
Introduction : Projet Van Life — Introduction — Une semaine de « hackaton » pour OnTheBeach.dev
Ces développements ont été fait lors d’un “hackaton/road trip” par OnTheBeach.dev.
I. Sécurisation de l’API
Maintenant que nous avons mis en place la base de notre API, nous pouvons l’utiliser pour communiquer avec nos apps mobile. Mais à ce stade, toute personne ayant accès au réseau peut le faire. Nous allons donc ajouter une couche de sécurité avec la mise en place d’une authentification OAuth 2.
Tout est prévu dans FastAPI. Nous allons donc mettre à jour notre fichier api.py
from typing import Annotated
from fastapi import FastAPI, HTTPException, status, Depends
from fastapi.security import OAuth2PasswordBearer
from tinydb import TinyDB, Query
from pydantic import BaseModelimport uuid#Gestion des DB
temperature_db = TinyDB('DB/temperatures.json')#Gestion du modèle
class Position(BaseModel):
lat: float
long: floatclass Temperature(BaseModel):
id: str
value: float
unity: str
ts: int
position: Position#Gestion de l'API
app = FastAPI()#Gestion de la sécurité
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")@app.post("/temperature", status_code=status.HTTP_201_CREATED)
def add_temperature(temperature: Temperature):
new_entry = temperature.dict()
new_id = uuid.uuid4()
new_entry["id"] = str(new_id)
temperature_db.insert(new_entry)
return {"success": True}@app.get("/temperatures")
async def get_temperature(token: Annotated[str, Depends(oauth2_scheme)]):
all_temperatures = temperature_db.all()
return {"success":True, "data":all_temperatures}
La gestion de la sécurité est gérée par oauth2_scheme, une instance de OAuth2PasswordBearer (fournit par fastapi.security). En modifiant notre méthode get_temperature, en une méthode asynchrone, et en demandant qu’elle reçoive un token en tant que string, et récupéré par l’instance oauth2_scheme. Celle-ci vérifie la valeur de l’attribut Authorization du Header de l’appel fait au endpoint temperatures. Si cet attribut contient un Bearer associé à un token, il renvoie ce token en tant que chaine de caractère.
Une fois le fichier modifié, et le serveur relancé, nous allons tester à nouveau l’appel à notre endpoint temperatures.
On voit que j’ai une réponse 401 Unauthorized, et en aucun cas mes données dans la réponse. Je vais maintenant ajouter mon Bearer avec un token factice (abcdef), et voir ce que cela donne.
On voit que je suis autorisé à passer, et que j’ai bien mes données. La méthode fonctionne, mais ce n’est évidemment que le début, étant donné que n’importe quel token fonctionnera.
Pour sécuriser l’API, il faut commencer par créer une table utilisateur.
II. Gestion des utilisateurs
Je vais donc utiliser à nouveau TinyDB pour stocker les données “utilisateurs”.
Nous allons devoir stocker des mots de passe. Il est évident que nous n’allons pas les stocker en clair, donc nous allons utiliser la bibliothèque cryptography pour Python.
Si ce n’est pas déjà le cas, nous allons installer la bibliothèque.
pip install cryptography
Je me suis créé un bout de code Python pour créer un utilisateur dans la base de données, avec un mot de passe chiffré.
from cryptography.fernet import Fernet
from tinydb import TinyDB, Queryimport uuid#Gestion des DB
users_db = TinyDB('DB/users.json')#crypto_key = Fernet.generate_key()
crypto_key = "ZxI7iSISbOrDnYoc7YWXHVoUAC6UGZsNGorkRwVbfkU="
crypto = Fernet(crypto_key)
password = crypto.encrypt(b"monsupermotdepasse")new_id = str(uuid.uuid4())users_db.insert({"id": new_id, "name":"Admin", "firstname":"Axel", "email":"axel@onthebeach.dev", "password":password.decode("utf-8") })
Attention : Comme vous le voyez j’ai généré une clé de chiffrage grâce à la méthode generates_key. Attention à cette clé. Avec cette méthode, elle doit être utilisée pour chiffrer les mots de passes. Si vous la perdez, vous ne pourrez plus gérer l’authentification de vos utilisateurs. Si quelqu’un d’autre a accès à cette clé, plus aucun compte ne sera protégé.
Nous avons donc maintenant un utilisateur dans notre base. Nous allons pouvoir continuer la sécurisation de l’API.
III. Authentification
Nous allons donc modifier notre API, pour avoir le processus suivant :
- L’utilisateur s’authentifie avec son login et son mot de passe
- On vérifie la validité du mot de passe de l’utilisateur
- Si le mot de passe est valide, et on renvoie les données de l’utilisateur
Pour gérer la réception du username et du password de l’utilisateur, nous allons utiliser OAuth2PasswordRequestForm fournit par fastapi.security. Pour pouvoir l’utiliser il faut s’assurer que python-multipart est installé. Si ce n’est pas le cas, installez le.
pip install python-multipart
On va maintenant modifier le code de notre API pour qu’elle effectue les point 1, 2 et 3.
from typing import Annotated
from cryptography.fernet import Fernetfrom fastapi import FastAPI, HTTPException, status, Depends
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from tinydb import TinyDB, Query
from pydantic import BaseModelimport uuid#Gestion des DB
temperature_db = TinyDB('DB/temperatures.json')
users_db = TinyDB('DB/users.json')
UserQuery = Query()crypto_key = "ZxI7iSISbOrDnYoc7YWXHVoUAC6UGZsNGorkRwVbfkU="
crypto = Fernet(crypto_key)#Gestion du modèle
class Position(BaseModel):
lat: float
long: floatclass Temperature(BaseModel):
id: str
value: float
unity: str
ts: int
position: Positionclass Token(BaseModel):
access_token: str
token_type: strclass User(BaseModel):
id: str
name: str
firstname: str
email: strclass DBUser(User):
password: str#Gestion de l'API
app = FastAPI()#Gestion de la sécurité
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")#Méthodes d'authentification
def verify_password(password: str, encrypted_password: str):
decrypted_password_as_bytes = crypto.decrypt(bytes(encrypted_password, "utf-8"))
decrypted_password_as_str = decrypted_password_as_bytes.decode("utf-8")
return decrypted_password_as_str == passworddef authenticate_user(username: str, password: str):
user = users_db.search(UserQuery.email == username)
if len(user) == 0:
return False
else:
aUser = DBUser(**user[0])
if verify_password(password, aUser.password) == False :
return False
else:
return aUser@app.post("/token")
async def login_for_access_token(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> User:
user = authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
return user#Méthodes CRUD pour les données
@app.post("/temperature", status_code=status.HTTP_201_CREATED)
def add_temperature(temperature: Temperature):
new_entry = temperature.dict()
new_id = uuid.uuid4()
new_entry["id"] = str(new_id)
temperature_db.insert(new_entry)
return {"success": True}@app.get("/temperatures")
async def get_temperature(token: Annotated[str, Depends(oauth2_scheme)]):
all_temperatures = temperature_db.all()
return {"success":True, "data":all_temperatures}
Lorsqu’un utilisateur va saisir son login et son mot de passe, nous allons appeler le endpoint token. L’attribut form_data récupère les données username et password provenant de la requête (la magie opère grâce à OAuth2PasswordRequestForm).
@app.post("/token")
async def login_for_access_token(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> User:
user = authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
return user
On va ensuite appeler la méthode authenticate_user avec les attributes username et password, afin que cette méthode vérifie si un utilisateur existe en base avec ce username et si le mot de passe envoyé est bien celui enregistré en base.
def authenticate_user(username: str, password: str):
user = users_db.search(UserQuery.email == username)
if len(user) == 0:
return False
else:
aUser = DBUser(**user[0])
if verify_password(password, aUser.password) == False :
return False
else:
return aUser
Si aucun utilisateur ne correspond à ce username (que j’ai enregistré sous l’attribut email dans la base users), on renvoie False. Si non, on va appeler la méthode verify_password pour qu’elle compare le mot de passe envoyé avec celui crypté et stocké en base.
def verify_password(password: str, encrypted_password: str):
decrypted_password_as_bytes = crypto.decrypt(bytes(encrypted_password, "utf-8"))
decrypted_password_as_str = decrypted_password_as_bytes.decode("utf-8")
return decrypted_password_as_str == password
On va donc déchiffrer le mot de passe présent en base et le comparer avec celui envoyé. On renvoie le booléen résultant de la comparaison. Si le résultat est égale à True on renvoie les données de l’utilisateur.
On notera que mon objet User a été divisé en 2 objets : User et DBUser. DBUser étant un objet étendu de User, ne contenant en plus que le mot de passe chiffré. Cela permet de renvoyer un objet qui ne contient pas le mot de passe.
Grâce à la documentation FastAPI, on peut facilement tester nos requêtes.
Par exemple, si on envoie une demande avec un mot de passe erroné, nous obtenons une réponse 401.
Alors qu’avec le véritable mot de passe de l’utilisateur, nous obtenons bien les données de celui-ci.
Mission accomplie !!
IV. Génération d’un token grâce à JWT Token
Nous allons maintenant générer un token et le renvoyer à l’utilisateur en lieu et place des données utilisateurs renvoyées dans le code ci-dessus. Ce token sera alors le moyen d’authentifier ce dernier à chaque requête envoyée à l’API durant la durée de validation du token.
Pour générer un token, nous allons encore modifier le code de notre API.
Tout d’abord nous allons importer quelques outils
from datetime import datetime, timedelta, timezone
from typing import Annotated, Union
import jwt
Nous ajoutons quelques méthodes provenant de datetime pour la gestion de la durée de validité du token. Nous ajoutons l’import d’Union pour donner la possibilité à un attribut d’être de différents types.
Enfin nous importons jwt pour la gestion du token.
#Gestion du token
SECRET_KEY = "c5ec639f697d581da9f56c5a6b5e59606c05591383cd66a1560542e2c417b67b"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
Pour générer un token, on va utiliser une clé secrète (attention, à bien protéger et à ne pas perdre), un algorithme de chiffrage et une durée de validité du token. Pour générer une clé secrète, on peut exécuter la commande suivante :
openssl rand -hex 32
Nous allons ensuite créer une méthode de création d’un token d’accès, et modifier le endpoint token pour qu’il crée un token si l’utilisateur est bien identifié, puis le renvoie.
def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
@app.post("/token")
async def login_for_access_token(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
user = authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.email}, expires_delta=access_token_expires
)
return Token(access_token=access_token, token_type="bearer")
Il nous reste plus qu’à tester ces modifications.
Et voilà. L’application peut alors stocker le token et l’utiliser pour interroger toutes les méthodes de l’API.
La prochaine étape va donc consister à gérer l’interprétation de ce token dans l’API.
V. Sécurisation par le token
On va rajouter 2 méthodes permettant de récupérer l’utilisateur correspondant au token envoyé par l’application interrogeant l’API.
def get_user(username: str):
user = users_db.search(UserQuery.email == username)
if len(user) == 0:
return False
else:
return user[0]
async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
#token_data = TokenData(username=username)
except InvalidTokenError:
raise credentials_exception
user = get_user(username)
if user is None:
raise credentials_exception
return user
La méthode get_current_user récupère le token envoyé par la requête, le déchiffre, et récupère le username contenu dans l’attribut sub (voir la méthode login_for_access_token créée précédemment). Une fois récupéré, on vérifie qu’il correspond à un utilisateur via la méthode get_user. Si toutes ces étapes répondent positivement, on renvoie les données à l’app ayant envoyé la requête.
La méthode get_current_user est donc maintenant celle qui gère la sécurité pour chaque endpoint (à l’exception de token évidemment), il faut donc mettre à jour les méthodes pour utiliser get_current_user.
#Méthodes CRUD pour les données
@app.post("/temperature", status_code=status.HTTP_201_CREATED)
def add_temperature(temperature: Temperature, current_user: Annotated[User, Depends(get_current_user)]):
new_entry = temperature.dict()
new_id = uuid.uuid4()
new_entry["id"] = str(new_id)
temperature_db.insert(new_entry)
return {"success": True}
@app.get("/temperatures")
async def get_temperature(current_user: Annotated[User, Depends(get_current_user)]):
all_temperatures = temperature_db.all()
return {"success":True, "data":all_temperatures}
Et voilà, notre API est maintenant sécurisée. Dans ce code, la durée de validité du token est de 30 minutes. Toutes les 30 minutes, on demandera donc à l’utilisateur de se reconnecter. En fonction des situations, on pourra augmenter ou diminuer cette durée en modifiant la constant ACCESS_TOKEN_EXPIRE_MINUTES.
Dans le prochain article, nous verrons simplement comment récupérer les données d’un capteur de température, pour les enregistrer dans la base de données, via l’API créée ici.