Files
fast_api_template/backend/app/api/routes/users.py
T

220 lines
6.4 KiB
Python
Raw Normal View History

2024-02-25 19:39:33 +01:00
from typing import Any
from fastapi import APIRouter, Depends, HTTPException
from sqlmodel import col, delete, func, select
2023-11-29 12:13:15 -05:00
from app import crud
from app.api.deps import (
CurrentUser,
SessionDep,
get_current_active_superuser,
)
from app.core.config import settings
from app.core.security import get_password_hash, verify_password
from app.models import (
Item,
2024-02-13 09:25:58 -05:00
Message,
UpdatePassword,
User,
UserCreate,
UserOut,
UserRegister,
UsersOut,
UserUpdate,
UserUpdateMe,
)
from app.utils import generate_new_account_email, send_email
router = APIRouter()
2023-12-27 13:37:05 -05:00
@router.get(
    "/", dependencies=[Depends(get_current_active_superuser)], response_model=UsersOut
)
def read_users(session: SessionDep, skip: int = 0, limit: int = 100) -> Any:
    """
    Retrieve users.
    """
    # Access is gated by the get_current_active_superuser route dependency.
    #
    # skip:  number of rows to skip before the returned page starts.
    # limit: maximum number of rows in the returned page.
    #
    # Returns a UsersOut whose `count` is the total number of User rows
    # (independent of skip/limit) and whose `data` is the requested page.
    count_statement = select(func.count()).select_from(User)
    count = session.exec(count_statement).one()

    # OFFSET/LIMIT without ORDER BY leaves the row order undefined, so
    # consecutive pages could repeat or skip users. Ordering by id makes
    # pagination stable.
    statement = select(User).order_by(col(User.id)).offset(skip).limit(limit)
    users = session.exec(statement).all()

    return UsersOut(data=users, count=count)
2023-12-27 13:37:05 -05:00
@router.post(
    "/", dependencies=[Depends(get_current_active_superuser)], response_model=UserOut
)
def create_user(*, session: SessionDep, user_in: UserCreate) -> Any:
    """
    Create new user.
    """
    # Access is gated by the get_current_active_superuser route dependency.
    # Raises HTTP 400 when the e-mail address is already registered.
    if crud.get_user_by_email(session=session, email=user_in.email):
        raise HTTPException(
            status_code=400,
            detail="The user with this email already exists in the system.",
        )

    created = crud.create_user(session=session, user_create=user_in)

    # The new-account e-mail is only sent when mail is enabled in settings
    # and the payload actually carries an address.
    if settings.emails_enabled and user_in.email:
        recipient = user_in.email
        email_data = generate_new_account_email(
            email_to=recipient, username=recipient, password=user_in.password
        )
        send_email(
            email_to=recipient,
            subject=email_data.subject,
            html_content=email_data.html_content,
        )

    return created
2023-11-29 12:13:15 -05:00
@router.patch("/me", response_model=UserOut)
def update_user_me(
    *, session: SessionDep, user_in: UserUpdateMe, current_user: CurrentUser
) -> Any:
    """
    Update own user.
    """
    # Raises HTTP 409 when the requested e-mail belongs to a different user.
    requested_email = user_in.email
    if requested_email:
        holder = crud.get_user_by_email(session=session, email=requested_email)
        # Finding the caller's own row is fine: the address is not changing hands.
        if holder and holder.id != current_user.id:
            raise HTTPException(
                status_code=409, detail="User with this email already exists"
            )

    # Only fields the client explicitly sent are applied to the stored user.
    changes = user_in.model_dump(exclude_unset=True)
    current_user.sqlmodel_update(changes)

    session.add(current_user)
    session.commit()
    session.refresh(current_user)
    return current_user
@router.patch("/me/password", response_model=Message)
def update_password_me(
    *, session: SessionDep, body: UpdatePassword, current_user: CurrentUser
) -> Any:
    """
    Update own password.
    """
    old_password, new_password = body.current_password, body.new_password

    # The current password must verify before anything else is checked.
    if not verify_password(old_password, current_user.hashed_password):
        raise HTTPException(status_code=400, detail="Incorrect password")

    # Reject a "change" that would leave the password as it is.
    if old_password == new_password:
        raise HTTPException(
            status_code=400, detail="New password cannot be the same as the current one"
        )

    # Only the hash of the new password is stored on the user.
    current_user.hashed_password = get_password_hash(new_password)
    session.add(current_user)
    session.commit()

    return Message(message="Password updated successfully")
2023-11-29 12:13:15 -05:00
2023-12-27 13:37:05 -05:00
@router.get("/me", response_model=UserOut)
def read_user_me(session: SessionDep, current_user: CurrentUser) -> Any:
    """
    Get current user.
    """
    # `session` is part of the signature but is not used in this body; the
    # handler simply echoes the user supplied by the CurrentUser dependency.
    return current_user
@router.post("/signup", response_model=UserOut)
def register_user(session: SessionDep, user_in: UserRegister) -> Any:
    """
    Create new user without the need to be logged in.
    """
    # Self-service sign-up is only available when the server setting allows it.
    registration_open = settings.USERS_OPEN_REGISTRATION
    if not registration_open:
        raise HTTPException(
            status_code=403,
            detail="Open user registration is forbidden on this server",
        )

    # Raises HTTP 400 when the e-mail address is already registered.
    if crud.get_user_by_email(session=session, email=user_in.email):
        raise HTTPException(
            status_code=400,
            detail="The user with this email already exists in the system",
        )

    # Convert the sign-up payload into the UserCreate model crud expects.
    validated = UserCreate.model_validate(user_in)
    return crud.create_user(session=session, user_create=validated)
2023-12-27 13:37:05 -05:00
@router.get("/{user_id}", response_model=UserOut)
def read_user_by_id(
    user_id: int, session: SessionDep, current_user: CurrentUser
) -> Any:
    """
    Get a specific user by id.
    """
    # user_id: id of the user to fetch.
    #
    # Returns the matching user. Any user may fetch their own record; other
    # records require superuser rights.
    #
    # Raises:
    #   HTTP 403 - caller is not a superuser and asked for someone else.
    #   HTTP 404 - caller is a superuser but no user has this id.
    user = session.get(User, user_id)
    if user == current_user:
        return user

    # The privilege check runs before the existence check on purpose, so
    # non-superusers get 403 either way and cannot probe which ids exist.
    if not current_user.is_superuser:
        raise HTTPException(
            status_code=403,
            detail="The user doesn't have enough privileges",
        )

    # session.get returns None for an unknown id. Returning None would not
    # satisfy response_model=UserOut, so report the missing user explicitly.
    if not user:
        raise HTTPException(
            status_code=404,
            detail="The user with this id does not exist in the system",
        )
    return user
2023-11-29 12:13:15 -05:00
@router.patch(
    "/{user_id}",
    dependencies=[Depends(get_current_active_superuser)],
    response_model=UserOut,
)
def update_user(
    *,
    session: SessionDep,
    user_id: int,
    user_in: UserUpdate,
) -> Any:
    """
    Update a user.
    """
    # Access is gated by the get_current_active_superuser route dependency.
    # Raises HTTP 404 when no user has this id.
    target = session.get(User, user_id)
    if not target:
        raise HTTPException(
            status_code=404,
            detail="The user with this id does not exist in the system",
        )

    # Raises HTTP 409 when the requested e-mail belongs to a different user.
    requested_email = user_in.email
    if requested_email:
        holder = crud.get_user_by_email(session=session, email=requested_email)
        if holder and holder.id != user_id:
            raise HTTPException(
                status_code=409, detail="User with this email already exists"
            )

    return crud.update_user(session=session, db_user=target, user_in=user_in)
2024-02-13 09:25:58 -05:00
@router.delete("/{user_id}")
def delete_user(
    session: SessionDep, current_user: CurrentUser, user_id: int
) -> Message:
    """
    Delete a user.
    """
    # Raises HTTP 404 when no user has this id.
    target = session.get(User, user_id)
    if not target:
        raise HTTPException(status_code=404, detail="User not found")

    deleting_self = target == current_user
    if not deleting_self:
        # Removing someone else's account requires superuser rights.
        if not current_user.is_superuser:
            raise HTTPException(
                status_code=403, detail="The user doesn't have enough privileges"
            )
    elif current_user.is_superuser:
        # A superuser may not remove their own account.
        raise HTTPException(
            status_code=403, detail="Super users are not allowed to delete themselves"
        )

    # Delete the items owned by this user first, then the user row itself,
    # and commit both together.
    session.exec(delete(Item).where(col(Item.owner_id) == user_id))  # type: ignore
    session.delete(target)
    session.commit()

    return Message(message="User deleted successfully")