50 lines
1.6 KiB
Python
50 lines
1.6 KiB
Python
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy.future import select
|
|
from passlib.context import CryptContext
|
|
from uuid import uuid4
|
|
from datetime import datetime, timedelta
|
|
from database.connection import SessionLocal
|
|
from database.models import User
|
|
from response import successRes, errorRes
|
|
|
|
async def get_db():
|
|
async with SessionLocal() as session:
|
|
yield session
|
|
|
|
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
|
|
|
|
async def loginService(username: str, password: str, db: AsyncSession):
|
|
result = await db.execute(select(User).where(User.username == username))
|
|
user = result.scalar_one_or_none()
|
|
|
|
if not user:
|
|
raise errorRes(status_code=401, message="Invalid username or password")
|
|
|
|
# Verify password
|
|
if not pwd_context.verify(password, user.password_hash):
|
|
raise errorRes(status_code=401, message="Invalid username or password")
|
|
|
|
# Validation for institution user
|
|
if user.role != "admin" and not user.institution_id:
|
|
raise errorRes(status_code=403, message="User must belong to an institution")
|
|
|
|
# Generate single active token
|
|
token = str(uuid4())
|
|
expiry = datetime.utcnow() + timedelta(hours=4)
|
|
|
|
user.active_token = token
|
|
user.token_expired_at = expiry
|
|
user.last_login = datetime.utcnow()
|
|
await db.commit()
|
|
|
|
res = {
|
|
"status": "success",
|
|
"username": user.username,
|
|
"role": user.role,
|
|
"institution_id": user.institution_id,
|
|
"token": token,
|
|
"token_expired_at": expiry.isoformat()
|
|
}
|
|
|
|
return successRes(message="Success Login", data=res)
|