Compare commits

..

2 Commits

Author SHA1 Message Date
2c57c9cdc9 handle already existing users 2025-11-16 17:00:40 +01:00
2d751551b7 add logging to security 2025-11-16 16:59:35 +01:00
3 changed files with 72 additions and 26 deletions

View File

@@ -2,8 +2,6 @@ from models import User
from settings import settings
from fastapi import HTTPException, status, Request
import sqlite3
import jwt
import datetime
import security
@@ -51,27 +49,30 @@ def close() -> None:
def register(user: User) -> None:
"""Registers a new user in the database."""
cursor.execute(
"INSERT INTO users (name, password) VALUES (?, ?)",
(user.name,
security.hash_password(user.password)))
connection.commit()
try:
cursor.execute(
"INSERT INTO users (name, password) VALUES (?, ?)",
(user.name,
security.hash_password(user.password)))
connection.commit()
except sqlite3.IntegrityError:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="User already exists"
)
def get_user_by_token(request: Request) -> User:
"""Retrieves a user from the database using a JWT token."""
token = request.headers.get("Authorization")
if not token or not token.startswith("Bearer "):
payload = security.decode_jwt(
request.headers.get("Authorization"))
if not payload:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not authenticated"
)
token = token.split(" ")[1]
payload = jwt.decode(
token,
key=settings.jwt_secret,
algorithms=[
settings.jwt_algorithm])
connection, cursor = connect()
cursor.execute(
@@ -100,14 +101,4 @@ def login(user: User) -> str:
detail="Invalid credentials"
)
exp = datetime.datetime.now(
datetime.timezone.utc) + datetime.timedelta(hours=1)
payload = {
"id": row["id"],
"exp": exp
}
return jwt.encode(
payload=payload,
key=settings.jwt_secret,
algorithm=settings.jwt_algorithm)
return security.sign_jwt(row)

View File

@@ -2,6 +2,13 @@ from fastapi import FastAPI, Depends
from contextlib import asynccontextmanager
import database
import models
import logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
@asynccontextmanager

View File

@@ -1,6 +1,11 @@
from passlib.context import CryptContext
from settings import settings
import jwt
import datetime
import logging
password_context = CryptContext(schemes=["sha256_crypt"], deprecated="auto")
logger = logging.getLogger(__name__)
def hash_password(password: str) -> str:
@@ -11,3 +16,46 @@ def hash_password(password: str) -> str:
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verifies a plain text password against a hashed password."""
return password_context.verify(plain_password, hashed_password)
def sign_jwt(row: dict) -> str:
"""Signs a JWT token with the given payload."""
exp = datetime.datetime.now(
datetime.timezone.utc) + datetime.timedelta(hours=1)
payload = {
"id": row["id"],
"exp": exp
}
return jwt.encode(
payload,
key=settings.jwt_secret,
algorithm=settings.jwt_algorithm
)
def decode_jwt(token: str | None) -> dict | None:
"""Decodes a JWT token and returns the payload."""
if not token or not token.startswith("Bearer "):
logger.warning(
"No token provided or token does not start with 'Bearer '")
return None
try:
payload = jwt.decode(
token.replace("Bearer ", ""),
key=settings.jwt_secret,
algorithms=[settings.jwt_algorithm]
)
return payload
except jwt.ExpiredSignatureError:
logger.warning("Token has expired")
return None
except jwt.InvalidTokenError:
logger.warning("Invalid token")
return None
logger.warning("Unexpected error in token decoding")
return None