Files
CalCalculator/database.py

105 lines
2.7 KiB
Python

from models import User, TokenResponse
from settings import settings
from fastapi import HTTPException, status, Request
import sqlite3
import security
def connect() -> (sqlite3.Connection, sqlite3.Cursor):
"""Connects to the database and returns the connection and cursor."""
connection = sqlite3.connect('database.db')
connection.row_factory = sqlite3.Row
cursor = connection.cursor()
return connection, cursor
connection, cursor = connect()
def init() -> None:
"""Initializes the database."""
# Create users table
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL UNIQUE,
password TEXT NOT NULL
)
''')
# Create logs table
cursor.execute('''
CREATE TABLE IF NOT EXISTS logs (
id INTEGER PRIMARY KEY,
user_id INTEGER NOT NULL,
calories DOUBLE NOT NULL,
description TEXT,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users (id)
)
''')
def close() -> None:
"""Closes the database connection."""
connection.close()
def register(user: User) -> None:
"""Registers a new user in the database."""
try:
cursor.execute(
"INSERT INTO users (name, password) VALUES (?, ?)",
(user.name,
security.hash_password(user.password)))
connection.commit()
except sqlite3.IntegrityError:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="User already exists"
)
def get_user_by_token(request: Request) -> User:
"""Retrieves a user from the database using a JWT token."""
payload = security.decode_jwt(
request.headers.get("Authorization"))
if not payload:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not authenticated"
)
connection, cursor = connect()
cursor.execute(
"SELECT id, name, password FROM users WHERE id = ?", (payload["id"],))
row = cursor.fetchone()
connection.close()
if not row:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not authenticated"
)
return User(**row)
def login(user: User) -> str:
"""Logs in a user and returns a JWT token."""
cursor.execute(
"SELECT id, name, password FROM users WHERE name = ?", (user.name,))
row = cursor.fetchone()
if not row or not security.verify_password(user.password, row["password"]):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid credentials"
)
return TokenResponse(token=security.sign_jwt(row))