"""Login service: credential check and single-active-token issuance."""

from datetime import datetime, timedelta, timezone
from uuid import uuid4

from passlib.context import CryptContext
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select

from database.connection import SessionLocal
from database.models import User
from response import successRes, errorRes

# How long a freshly issued login token stays valid.
TOKEN_LIFETIME = timedelta(hours=4)


async def get_db():
    """Yield a session created by ``SessionLocal``.

    The session is opened with ``async with``, so its context manager is
    exited once the consumer of this generator is finished with it.
    """
    async with SessionLocal() as session:
        yield session


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


async def loginService(username: str, password: str, db: AsyncSession):
    """Authenticate a user and issue a new active token.

    Looks the user up by ``username``, verifies ``password`` against the
    stored ``password_hash``, and on success overwrites the user's
    ``active_token``, ``token_expired_at`` and ``last_login`` fields and
    commits them through ``db``.

    Args:
        username: Login name to look up in ``User.username``.
        password: Plain-text password to verify.
        db: Session used for the lookup and for committing the new token.

    Returns:
        The value of ``successRes(message="Success Login", data=...)`` where
        ``data`` carries the username, role, organization id, token and the
        token expiry as an ISO-8601 string.

    Raises:
        Whatever ``errorRes`` constructs: with ``status_code=401`` when the
        username is unknown or the password does not match, and with
        ``status_code=403`` when a non-admin user has no organization.
    """
    query = select(User).where(User.username == username)
    result = await db.execute(query)
    user = result.scalar_one_or_none()

    if user is None:
        # Spend roughly the same time as a real hash verification so that
        # response latency does not reveal whether the username exists.
        pwd_context.dummy_verify()
        raise errorRes(status_code=401, message="Invalid username or password")

    # Verify password
    # NOTE(review): bcrypt verification is CPU-bound and runs on the event
    # loop here; consider offloading it if login latency becomes an issue.
    if not pwd_context.verify(password, user.password_hash):
        raise errorRes(status_code=401, message="Invalid username or password")

    # Validation for organization user
    if user.role != "admin" and not user.organization_id:
        raise errorRes(status_code=403, message="User must belong to an organization")

    # One timestamp for both fields keeps last_login and the expiry exactly
    # TOKEN_LIFETIME apart. The value is naive UTC (same as the deprecated
    # datetime.utcnow()), so what gets stored and serialized is unchanged.
    now = datetime.now(timezone.utc).replace(tzinfo=None)

    # Generate single active token: overwriting the previous value replaces
    # whatever token was stored for this user before.
    token = str(uuid4())
    expiry = now + TOKEN_LIFETIME

    user.active_token = token
    user.token_expired_at = expiry
    user.last_login = now

    await db.commit()

    payload = {
        "status": "success",
        "username": user.username,
        "role": user.role,
        "organization_id": user.organization_id,
        "token": token,
        "token_expired_at": expiry.isoformat(),
    }

    return successRes(message="Success Login", data=payload)