satupeta-main/api/deps/auth_dependency.py

35 lines
1.1 KiB
Python

from fastapi import Depends, Header
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from datetime import datetime
from response import errorRes
from database.connection import SessionLocal
from database.models import User
async def get_db():
async with SessionLocal() as session:
yield session
async def get_current_user(
authorization: str = Header(None),
db: AsyncSession = Depends(get_db)
):
if not authorization or not authorization.startswith("Bearer "):
raise errorRes(status_code=401, message="Missing or invalid token")
token = authorization.split(" ")[1]
result = await db.execute(select(User).where(User.active_token == token))
user = result.scalar_one_or_none()
# Case 1: Token not found → maybe replaced by new login
if not user:
raise errorRes(status_code=401, message="Token invalid or used by another login")
# Case 2: Token expired
if user.token_expired_at and user.token_expired_at < datetime.utcnow():
raise errorRes(status_code=401, message="Token expired")
return user