81 lines
2.5 KiB
Python
81 lines
2.5 KiB
Python
from models import User
|
|
from passlib.context import CryptContext
|
|
from settings import settings
|
|
from fastapi import HTTPException, status, Request
|
|
import sqlite3
|
|
import jwt
|
|
import datetime
|
|
|
|
connection = sqlite3.connect('database.db')
|
|
connection.row_factory = sqlite3.Row
|
|
cursor = connection.cursor()
|
|
|
|
password_context = CryptContext(schemes=["sha256_crypt"], deprecated="auto")
|
|
|
|
def init() -> None:
|
|
# create users table
|
|
cursor.execute('''
|
|
CREATE TABLE IF NOT EXISTS users (
|
|
id INTEGER PRIMARY KEY,
|
|
name TEXT NOT NULL UNIQUE,
|
|
password TEXT NOT NULL
|
|
)
|
|
''')
|
|
|
|
def close() -> None:
|
|
connection.close()
|
|
|
|
|
|
def register(user: User) -> None:
|
|
password = password_context.hash(user.password)
|
|
cursor.execute("INSERT INTO users (name, password) VALUES (?, ?)", (user.name, password))
|
|
connection.commit()
|
|
|
|
def get_user_by_token(request: Request) -> User:
|
|
token = request.headers.get("Authorization")
|
|
if not token or not token.startswith("Bearer "):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Not authenticated"
|
|
)
|
|
token = token.split(" ")[1]
|
|
payload = jwt.decode(token, key=settings.jwt_secret, algorithms=[settings.jwt_algorithm])
|
|
|
|
connection = sqlite3.connect('database.db')
|
|
connection.row_factory = sqlite3.Row
|
|
cursor = connection.cursor()
|
|
cursor.execute("SELECT id, name, password FROM users WHERE id = ?", (payload["id"],))
|
|
row = cursor.fetchone()
|
|
connection.close()
|
|
|
|
if not row:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Not authenticated"
|
|
)
|
|
|
|
return User(**row)
|
|
|
|
def login(user: User) -> str:
|
|
cursor.execute("SELECT id, name, password FROM users WHERE name = ?", (user.name,))
|
|
row = cursor.fetchone()
|
|
|
|
if not row:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Invalid credentials"
|
|
)
|
|
if not password_context.verify(user.password, row["password"]):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Invalid credentials"
|
|
)
|
|
|
|
exp = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(hours=1)
|
|
payload = {
|
|
"id": row["id"],
|
|
"exp": exp
|
|
}
|
|
|
|
return jwt.encode(payload=payload, key=settings.jwt_secret, algorithm=settings.jwt_algorithm)
|