108 lines
2.8 KiB
Python
108 lines
2.8 KiB
Python
from models import User
|
|
from settings import settings
|
|
from fastapi import HTTPException, status, Request
|
|
import sqlite3
|
|
import jwt
|
|
import datetime
|
|
import security
|
|
|
|
connection = sqlite3.connect('database.db')
|
|
connection.row_factory = sqlite3.Row
|
|
cursor = connection.cursor()
|
|
|
|
|
|
def init() -> None:
|
|
"""Initializes the database."""
|
|
|
|
# Create users table
|
|
cursor.execute('''
|
|
CREATE TABLE IF NOT EXISTS users (
|
|
id INTEGER PRIMARY KEY,
|
|
name TEXT NOT NULL UNIQUE,
|
|
password TEXT NOT NULL
|
|
)
|
|
''')
|
|
|
|
# Create logs table
|
|
cursor.execute('''
|
|
CREATE TABLE IF NOT EXISTS logs (
|
|
id INTEGER PRIMARY KEY,
|
|
user_id INTEGER NOT NULL,
|
|
calories DOUBLE NOT NULL,
|
|
description TEXT,
|
|
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
|
|
FOREIGN KEY (user_id) REFERENCES users (id)
|
|
)
|
|
''')
|
|
|
|
|
|
def close() -> None:
|
|
"""Closes the database connection."""
|
|
connection.close()
|
|
|
|
|
|
def register(user: User) -> None:
|
|
"""Registers a new user in the database."""
|
|
cursor.execute(
|
|
"INSERT INTO users (name, password) VALUES (?, ?)",
|
|
(user.name,
|
|
security.hash_password(user.password))
|
|
connection.commit()
|
|
|
|
|
|
def get_user_by_token(request: Request) -> User:
|
|
"""Retrieves a user from the database using a JWT token."""
|
|
token=request.headers.get("Authorization")
|
|
if not token or not token.startswith("Bearer "):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Not authenticated"
|
|
)
|
|
token=token.split(" ")[1]
|
|
payload=jwt.decode(
|
|
token,
|
|
key=settings.jwt_secret,
|
|
algorithms=[
|
|
settings.jwt_algorithm])
|
|
|
|
connection=sqlite3.connect('database.db')
|
|
connection.row_factory=sqlite3.Row
|
|
cursor=connection.cursor()
|
|
cursor.execute(
|
|
"SELECT id, name, password FROM users WHERE id = ?", (payload["id"],))
|
|
row=cursor.fetchone()
|
|
connection.close()
|
|
|
|
if not row:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Not authenticated"
|
|
)
|
|
|
|
return User(**row)
|
|
|
|
|
|
def login(user: User) -> str:
|
|
"""Logs in a user and returns a JWT token."""
|
|
cursor.execute(
|
|
"SELECT id, name, password FROM users WHERE name = ?", (user.name,))
|
|
row=cursor.fetchone()
|
|
|
|
if not row or not security.verify_password(user.password, row["password"]):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Invalid credentials"
|
|
)
|
|
|
|
exp=datetime.datetime.now(
|
|
datetime.timezone.utc) + datetime.timedelta(hours=1)
|
|
payload={
|
|
"id": row["id"],
|
|
"exp": exp
|
|
}
|
|
|
|
return jwt.encode(
|
|
payload=payload,
|
|
key=settings.jwt_secret,
|
|
algorithm=settings.jwt_algorithm)
|