satupeta-main/app/services/user_service.py
2026-01-27 09:11:58 +07:00

151 lines
5.7 KiB
Python

from typing import Dict, List, Tuple, Union, override
from fastapi import HTTPException, status
from uuid6 import UUID
from app.core.exceptions import NotFoundException
from app.core.security import get_password_hash
from app.models import UserModel
from app.repositories import UserRepository
from app.repositories.role_repository import RoleRepository
from app.schemas.user_schema import UserSchema
from . import BaseService
class UserService(BaseService[UserModel, UserRepository]):
access_control_level = {
"data_viewer": {"data_viewer"},
"data_manager": {"data_manager", "data_viewer"},
"data_validator": {"data_validator", "data_manager", "data_viewer"},
"administrator": {
"administrator",
"data_validator",
"data_manager",
"data_viewer",
},
}
def __init__(self, repository: UserRepository, role_repository: RoleRepository):
super().__init__(UserModel, repository)
self.role_repository = role_repository
self.forbidden_exception = HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You are not authorized to access this resource",
)
async def find_by_username(self, username: str) -> UserModel | None:
user = await self.repository.find_by_username(username)
if not user:
raise NotFoundException("User not found")
return user
async def find_by_email(self, email: str) -> UserModel | None:
user = await self.repository.find_by_email(email)
if not user:
raise NotFoundException("User not found")
return user
@override
async def find_all(
self,
user: UserSchema,
filters: Union[str, list[str]] = None,
sort: Union[str, list[str]] = None,
search: str = "",
group_by: str = None,
limit: int = 100,
offset: int = 0,
relationships: List[str] = None,
searchable_columns: List[str] = None,
) -> Tuple[List[UserModel], int]:
if user.role.name not in self.access_control_level["administrator"]:
raise self.forbidden_exception
role_instances = await self.role_repository.get_list_by_names(self.access_control_level[user.role.name])
temp_filters = []
for role in role_instances:
temp_filters.append(f"role_id={role.id}")
filters.append(temp_filters)
return await super().find_all(
filters,
sort,
search,
group_by,
limit,
offset,
relationships,
searchable_columns,
)
async def find_by_id(self, id: UUID, user: UserSchema = None) -> UserSchema | None:
user_instance = await self.repository.find_by_id(id)
if not user_instance:
raise NotFoundException("User not found")
if user is not None:
if user.role.name not in self.access_control_level["administrator"]:
raise self.forbidden_exception
if user_instance.role.name not in self.access_control_level[user.role.name]:
raise self.forbidden_exception
return user_instance
async def create(self, user_data: Dict, user: UserSchema) -> UserModel:
role_instance = await self.role_repository.find_by_id(user.role.id)
if not role_instance:
raise NotFoundException("Role not found")
role_name = role_instance.name
if role_name not in ["administrator", "data_validator"]:
raise self.forbidden_exception
if await self.repository.find_by_username(user_data["username"]):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Username already exists",
)
if await self.repository.find_by_email(user_data["email"]):
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Email already exists")
user_data["password"] = get_password_hash(user_data["password"])
return await self.repository.create(user_data)
async def update(self, id: UUID, user_data: Dict, user: UserSchema) -> UserModel:
if user.role.name not in self.access_control_level["administrator"]:
raise self.forbidden_exception
user_instance = await self.find_by_id(id)
if not user_instance:
raise NotFoundException("User not found")
if user_instance.role.name not in self.access_control_level[user.role.name]:
raise self.forbidden_exception
if "username" in user_data and await self.repository.find_by_username(user_data["username"]):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Username already exists",
)
if "email" in user_data and await self.repository.find_by_email(user_data["email"]):
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Email already exists")
if "password" in user_data:
user_data["password"] = get_password_hash(user_data["password"])
return await self.repository.update(id, user_data)
async def bulk_update_activation(self, user_ids: List[UUID], is_active: bool, user: UserSchema) -> None:
if user.role.name not in self.access_control_level["administrator"]:
raise self.forbidden_exception
existing_user_ids = await self.repository.find_all_ids(user_ids)
for user_id in user_ids:
if user_id not in existing_user_ids:
raise NotFoundException(f"User with id {user_id} not found")
await self.repository.bulk_update_activation(user_ids, is_active)