add connect method

This commit is contained in:
2025-11-16 16:45:40 +01:00
parent a120512baf
commit 622a14ce66

View File

@@ -6,9 +6,17 @@ import jwt
import datetime import datetime
import security import security
connection = sqlite3.connect('database.db')
connection.row_factory = sqlite3.Row def connect() -> (sqlite3.Connection, sqlite3.Cursor):
cursor = connection.cursor() """Connects to the database and returns the connection and cursor."""
connection = sqlite3.connect('database.db')
connection.row_factory = sqlite3.Row
cursor = connection.cursor()
return connection, cursor
connection, cursor = connect()
def init() -> None: def init() -> None:
@@ -46,31 +54,29 @@ def register(user: User) -> None:
cursor.execute( cursor.execute(
"INSERT INTO users (name, password) VALUES (?, ?)", "INSERT INTO users (name, password) VALUES (?, ?)",
(user.name, (user.name,
security.hash_password(user.password)) security.hash_password(user.password)))
connection.commit() connection.commit()
def get_user_by_token(request: Request) -> User: def get_user_by_token(request: Request) -> User:
"""Retrieves a user from the database using a JWT token.""" """Retrieves a user from the database using a JWT token."""
token=request.headers.get("Authorization") token = request.headers.get("Authorization")
if not token or not token.startswith("Bearer "): if not token or not token.startswith("Bearer "):
raise HTTPException( raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not authenticated" detail="Not authenticated"
) )
token=token.split(" ")[1] token = token.split(" ")[1]
payload=jwt.decode( payload = jwt.decode(
token, token,
key=settings.jwt_secret, key=settings.jwt_secret,
algorithms=[ algorithms=[
settings.jwt_algorithm]) settings.jwt_algorithm])
connection=sqlite3.connect('database.db') connection, cursor = connect()
connection.row_factory=sqlite3.Row
cursor=connection.cursor()
cursor.execute( cursor.execute(
"SELECT id, name, password FROM users WHERE id = ?", (payload["id"],)) "SELECT id, name, password FROM users WHERE id = ?", (payload["id"],))
row=cursor.fetchone() row = cursor.fetchone()
connection.close() connection.close()
if not row: if not row:
@@ -86,7 +92,7 @@ def login(user: User) -> str:
"""Logs in a user and returns a JWT token.""" """Logs in a user and returns a JWT token."""
cursor.execute( cursor.execute(
"SELECT id, name, password FROM users WHERE name = ?", (user.name,)) "SELECT id, name, password FROM users WHERE name = ?", (user.name,))
row=cursor.fetchone() row = cursor.fetchone()
if not row or not security.verify_password(user.password, row["password"]): if not row or not security.verify_password(user.password, row["password"]):
raise HTTPException( raise HTTPException(
@@ -94,9 +100,9 @@ def login(user: User) -> str:
detail="Invalid credentials" detail="Invalid credentials"
) )
exp=datetime.datetime.now( exp = datetime.datetime.now(
datetime.timezone.utc) + datetime.timedelta(hours=1) datetime.timezone.utc) + datetime.timedelta(hours=1)
payload={ payload = {
"id": row["id"], "id": row["id"],
"exp": exp "exp": exp
} }