from models import User, TokenResponse
from settings import settings
from fastapi import HTTPException, status, Request
import sqlite3
from typing import Tuple
import security


def connect() -> Tuple[sqlite3.Connection, sqlite3.Cursor]:
    """Open ``database.db`` and return the connection with a cursor on it.

    The connection's ``row_factory`` is set to ``sqlite3.Row`` so fetched
    rows can be read by column name (e.g. ``row["password"]``).

    Returns:
        A ``(connection, cursor)`` pair; the caller owns the connection and
        is responsible for closing it.
    """
    connection = sqlite3.connect('database.db')
    connection.row_factory = sqlite3.Row
    cursor = connection.cursor()
    return connection, cursor


# Module-wide connection and cursor, opened at import time and used by
# init(), close(), register() and login().
connection, cursor = connect()


def init() -> None:
    """Create the ``users`` and ``logs`` tables if they do not exist yet."""
    # Create users table
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL UNIQUE,
            password TEXT NOT NULL
        )
    ''')

    # Create logs table
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS logs (
            id INTEGER PRIMARY KEY,
            user_id INTEGER NOT NULL,
            calories DOUBLE NOT NULL,
            description TEXT,
            timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (user_id) REFERENCES users (id)
        )
    ''')


def close() -> None:
    """Close the module-wide database connection."""
    connection.close()


def register(user: User) -> None:
    """Insert a new user row, storing the hashed password.

    Args:
        user: Supplies ``name`` and the plain ``password``; the password is
            passed through ``security.hash_password`` before being stored.

    Raises:
        HTTPException: 400 "User already exists" when the insert violates a
            table constraint (``users.name`` is declared UNIQUE).
    """
    try:
        cursor.execute(
            "INSERT INTO users (name, password) VALUES (?, ?)",
            (user.name, security.hash_password(user.password)))
        connection.commit()
    except sqlite3.IntegrityError as err:
        # The failed INSERT leaves the implicitly opened transaction pending
        # on the shared connection; end it so it does not linger until the
        # next commit.
        connection.rollback()
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="User already exists"
        ) from err


def get_user_by_token(request: Request) -> User:
    """Resolve the request's ``Authorization`` header to a stored user.

    The header value is handed to ``security.decode_jwt``; the ``id`` entry
    of the resulting payload is then looked up in the ``users`` table.

    Args:
        request: Incoming request whose ``Authorization`` header is read.

    Returns:
        A ``User`` built from the matching row's ``id``, ``name`` and
        ``password`` columns.

    Raises:
        HTTPException: 401 "Not authenticated" when the decoded payload is
            falsy or no user row has the payload's id.
    """
    payload = security.decode_jwt(
        request.headers.get("Authorization"))
    if not payload:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Not authenticated"
        )

    # This function uses its own short-lived connection rather than the
    # module-wide one. NOTE(review): presumably because it can run on a
    # different thread than the one that imported this module - confirm.
    conn, cur = connect()
    try:
        cur.execute(
            "SELECT id, name, password FROM users WHERE id = ?",
            (payload["id"],))
        row = cur.fetchone()
    finally:
        # Always release the connection, even if the lookup raises.
        conn.close()

    if not row:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Not authenticated"
        )
    return User(**row)


def login(user: User) -> TokenResponse:
    """Check a user's credentials and return a signed JWT.

    Args:
        user: Supplies the ``name`` to look up and the plain ``password`` to
            verify against the stored value.

    Returns:
        A ``TokenResponse`` whose ``token`` is ``security.sign_jwt`` applied
        to the matching database row.

    Raises:
        HTTPException: 401 "Invalid credentials" when no user has that name
            or ``security.verify_password`` rejects the password.
    """
    cursor.execute(
        "SELECT id, name, password FROM users WHERE name = ?",
        (user.name,))
    row = cursor.fetchone()
    if not row or not security.verify_password(user.password, row["password"]):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid credentials"
        )
    return TokenResponse(token=security.sign_jwt(row))