format code with autopep8
This commit is contained in:
97
database.py
97
database.py
@@ -12,69 +12,86 @@ cursor = connection.cursor()
|
|||||||
|
|
||||||
password_context = CryptContext(schemes=["sha256_crypt"], deprecated="auto")
|
password_context = CryptContext(schemes=["sha256_crypt"], deprecated="auto")
|
||||||
|
|
||||||
|
|
||||||
def init() -> None:
|
def init() -> None:
|
||||||
# create users table
|
# create users table
|
||||||
cursor.execute('''
|
cursor.execute('''
|
||||||
CREATE TABLE IF NOT EXISTS users (
|
CREATE TABLE IF NOT EXISTS users (
|
||||||
id INTEGER PRIMARY KEY,
|
id INTEGER PRIMARY KEY,
|
||||||
name TEXT NOT NULL UNIQUE,
|
name TEXT NOT NULL UNIQUE,
|
||||||
password TEXT NOT NULL
|
password TEXT NOT NULL
|
||||||
)
|
)
|
||||||
''')
|
''')
|
||||||
|
|
||||||
|
|
||||||
def close() -> None:
|
def close() -> None:
|
||||||
connection.close()
|
connection.close()
|
||||||
|
|
||||||
|
|
||||||
def register(user: User) -> None:
|
def register(user: User) -> None:
|
||||||
password = password_context.hash(user.password)
|
password = password_context.hash(user.password)
|
||||||
cursor.execute("INSERT INTO users (name, password) VALUES (?, ?)", (user.name, password))
|
cursor.execute(
|
||||||
connection.commit()
|
"INSERT INTO users (name, password) VALUES (?, ?)",
|
||||||
|
(user.name,
|
||||||
|
password))
|
||||||
|
connection.commit()
|
||||||
|
|
||||||
|
|
||||||
def get_user_by_token(request: Request) -> User:
|
def get_user_by_token(request: Request) -> User:
|
||||||
token = request.headers.get("Authorization")
|
token = request.headers.get("Authorization")
|
||||||
if not token or not token.startswith("Bearer "):
|
if not token or not token.startswith("Bearer "):
|
||||||
raise HTTPException(
|
raise HTTPException(
|
||||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||||
detail="Not authenticated"
|
detail="Not authenticated"
|
||||||
)
|
)
|
||||||
token = token.split(" ")[1]
|
token = token.split(" ")[1]
|
||||||
payload = jwt.decode(token, key=settings.jwt_secret, algorithms=[settings.jwt_algorithm])
|
payload = jwt.decode(
|
||||||
|
token,
|
||||||
|
key=settings.jwt_secret,
|
||||||
|
algorithms=[
|
||||||
|
settings.jwt_algorithm])
|
||||||
|
|
||||||
connection = sqlite3.connect('database.db')
|
connection = sqlite3.connect('database.db')
|
||||||
connection.row_factory = sqlite3.Row
|
connection.row_factory = sqlite3.Row
|
||||||
cursor = connection.cursor()
|
cursor = connection.cursor()
|
||||||
cursor.execute("SELECT id, name, password FROM users WHERE id = ?", (payload["id"],))
|
cursor.execute(
|
||||||
|
"SELECT id, name, password FROM users WHERE id = ?", (payload["id"],))
|
||||||
row = cursor.fetchone()
|
row = cursor.fetchone()
|
||||||
connection.close()
|
connection.close()
|
||||||
|
|
||||||
if not row:
|
if not row:
|
||||||
raise HTTPException(
|
raise HTTPException(
|
||||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||||
detail="Not authenticated"
|
detail="Not authenticated"
|
||||||
)
|
)
|
||||||
|
|
||||||
return User(**row)
|
return User(**row)
|
||||||
|
|
||||||
|
|
||||||
def login(user: User) -> str:
|
def login(user: User) -> str:
|
||||||
cursor.execute("SELECT id, name, password FROM users WHERE name = ?", (user.name,))
|
cursor.execute(
|
||||||
row = cursor.fetchone()
|
"SELECT id, name, password FROM users WHERE name = ?", (user.name,))
|
||||||
|
row = cursor.fetchone()
|
||||||
if not row:
|
|
||||||
raise HTTPException(
|
if not row:
|
||||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
raise HTTPException(
|
||||||
detail="Invalid credentials"
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||||
)
|
detail="Invalid credentials"
|
||||||
if not password_context.verify(user.password, row["password"]):
|
)
|
||||||
raise HTTPException(
|
if not password_context.verify(user.password, row["password"]):
|
||||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
raise HTTPException(
|
||||||
detail="Invalid credentials"
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||||
)
|
detail="Invalid credentials"
|
||||||
|
)
|
||||||
exp = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(hours=1)
|
|
||||||
payload = {
|
exp = datetime.datetime.now(
|
||||||
"id": row["id"],
|
datetime.timezone.utc) + datetime.timedelta(hours=1)
|
||||||
"exp": exp
|
payload = {
|
||||||
}
|
"id": row["id"],
|
||||||
|
"exp": exp
|
||||||
return jwt.encode(payload=payload, key=settings.jwt_secret, algorithm=settings.jwt_algorithm)
|
}
|
||||||
|
|
||||||
|
return jwt.encode(
|
||||||
|
payload=payload,
|
||||||
|
key=settings.jwt_secret,
|
||||||
|
algorithm=settings.jwt_algorithm)
|
||||||
|
|||||||
6
format.sh
Executable file
6
format.sh
Executable file
@@ -0,0 +1,6 @@
|
|||||||
|
#!/usr/bin/env bash
|
||||||
|
which autopep8 &> /dev/null || { echo "autopep8 not found, please install it."; exit 1; }
|
||||||
|
|
||||||
|
autopep8 --in-place --aggressive --aggressive --recursive --exclude .venv,.git,__pycache__ .
|
||||||
|
|
||||||
|
echo "Code formatted with autopep8."
|
||||||
9
main.py
9
main.py
@@ -3,6 +3,7 @@ from contextlib import asynccontextmanager
|
|||||||
import database
|
import database
|
||||||
import models
|
import models
|
||||||
|
|
||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def lifespan(app: FastAPI):
|
async def lifespan(app: FastAPI):
|
||||||
database.init()
|
database.init()
|
||||||
@@ -12,19 +13,17 @@ async def lifespan(app: FastAPI):
|
|||||||
app = FastAPI(lifespan=lifespan)
|
app = FastAPI(lifespan=lifespan)
|
||||||
|
|
||||||
|
|
||||||
@app.get("/")
|
|
||||||
async def root():
|
|
||||||
return {"message": "Hello World"}
|
|
||||||
|
|
||||||
@app.get("/me")
|
@app.get("/me")
|
||||||
async def me(user: models.User = Depends(database.get_user_by_token)):
|
async def me(user: models.User = Depends(database.get_user_by_token)):
|
||||||
return user
|
return user
|
||||||
|
|
||||||
|
|
||||||
@app.post("/users")
|
@app.post("/users")
|
||||||
async def register(user: models.User):
|
async def register(user: models.User):
|
||||||
database.register(user)
|
database.register(user)
|
||||||
return user
|
return user
|
||||||
|
|
||||||
|
|
||||||
@app.post("/login")
|
@app.post("/login")
|
||||||
async def login(user: models.User):
|
async def login(user: models.User):
|
||||||
return database.login(user)
|
return database.login(user)
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
|
|
||||||
|
|
||||||
class User(BaseModel):
|
class User(BaseModel):
|
||||||
name: str
|
name: str
|
||||||
password: str
|
password: str
|
||||||
|
|||||||
@@ -2,3 +2,4 @@ fastapi[standard]==0.121.2
|
|||||||
passlib==1.7.4
|
passlib==1.7.4
|
||||||
pyjwt==2.10.1
|
pyjwt==2.10.1
|
||||||
pydantic-settings==2.12.0
|
pydantic-settings==2.12.0
|
||||||
|
autopep8==2.3.2
|
||||||
|
|||||||
@@ -1,9 +1,12 @@
|
|||||||
from pydantic_settings import BaseSettings, SettingsConfigDict
|
from pydantic_settings import BaseSettings, SettingsConfigDict
|
||||||
|
|
||||||
|
|
||||||
class Settings(BaseSettings):
|
class Settings(BaseSettings):
|
||||||
model_config = SettingsConfigDict(env_file='.env', env_file_encoding='utf-8')
|
model_config = SettingsConfigDict(
|
||||||
|
env_file='.env', env_file_encoding='utf-8')
|
||||||
|
|
||||||
jwt_secret: str
|
jwt_secret: str
|
||||||
jwt_algorithm: str
|
jwt_algorithm: str
|
||||||
|
|
||||||
|
|
||||||
settings = Settings()
|
settings = Settings()
|
||||||
|
|||||||
Reference in New Issue
Block a user