Files
gartenmanager/backend/app/core/deps.py

162 lines
5.2 KiB
Python
Raw Normal View History

from typing import Annotated
from uuid import UUID
from fastapi import Depends, Header, HTTPException, Security, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.security import TOKEN_TYPE_ACCESS, decode_token
from app.db.session import get_session
from app.models.user import User, UserTenant, TenantRole
# Bearer-token extractor shared by the dependencies below.
# auto_error=False: per FastAPI's documented behaviour the scheme then yields
# None instead of raising when no bearer credentials are supplied, which lets
# get_current_user (which checks `credentials is None`) raise its own 401.
bearer_scheme = HTTPBearer(auto_error=False)
async def get_current_user(
    credentials: Annotated[
        HTTPAuthorizationCredentials | None, Security(bearer_scheme)
    ],
    db: Annotated[AsyncSession, Depends(get_session)],
) -> User:
    """Resolve the authenticated, active user from the request's bearer token.

    Args:
        credentials: Bearer credentials extracted by ``bearer_scheme``, or
            ``None`` when the request carried none.
        db: Async database session used to load the user.

    Returns:
        The ``User`` whose id matches the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when credentials are missing, when decoding raises
            ``JWTError``, when the token's ``type`` is not
            ``TOKEN_TYPE_ACCESS``, when ``sub`` is absent or not a valid UUID,
            or when no user with that id exists; 403 when the user is not
            active.
    """
    if credentials is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Nicht authentifiziert. Bitte melden Sie sich an.",
            headers={"WWW-Authenticate": "Bearer"},
        )

    try:
        payload = decode_token(credentials.credentials)
        if payload.get("type") != TOKEN_TYPE_ACCESS:
            raise JWTError("Falscher Token-Typ")
        subject = payload.get("sub")
        if subject is None:
            raise JWTError("Kein Subject im Token")
        # Parse the subject inside the try block: a signed token with a
        # malformed (or non-string) ``sub`` must yield 401, not an unhandled
        # ValueError/AttributeError surfacing as a 500.
        user_id = UUID(str(subject))
    except (JWTError, ValueError) as exc:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Ungültiger oder abgelaufener Token.",
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc

    # Imported locally, as in the original code (presumably to avoid a
    # circular import between the deps and crud modules -- confirm).
    from app.crud.user import crud_user

    user = await crud_user.get(db, id=user_id)
    if user is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Benutzer nicht gefunden.",
            # Keep every 401 from this dependency consistent: a 401 response
            # should carry a WWW-Authenticate challenge.
            headers={"WWW-Authenticate": "Bearer"},
        )
    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Benutzerkonto ist deaktiviert.",
        )
    return user
# Reusable parameter annotation: a dependency parameter typed as `CurrentUser`
# is resolved through get_current_user (see its use in get_tenant_context).
CurrentUser = Annotated[User, Depends(get_current_user)]
async def get_tenant_context(
    current_user: CurrentUser,
    db: Annotated[AsyncSession, Depends(get_session)],
    x_tenant_id: Annotated[str | None, Header(alias="X-Tenant-ID")] = None,
) -> tuple[User, UUID]:
    """Validate the ``X-Tenant-ID`` header and the caller's access to it.

    Superadmins only need the tenant to exist; every other user needs a
    ``UserTenant`` row linking them to the tenant.

    Returns:
        ``(current_user, tenant_id)`` with ``tenant_id`` parsed as a UUID.

    Raises:
        HTTPException: 400 when the header is missing or not a valid UUID;
            404 when a superadmin names a tenant that does not exist; 403
            when a regular user has no membership in the tenant.
    """
    if x_tenant_id is None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Header 'X-Tenant-ID' fehlt.",
        )

    try:
        tenant_id = UUID(x_tenant_id)
    except ValueError:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Ungültige Tenant-ID.",
        )

    from sqlalchemy import select

    # Choose the lookup and the failure response for the caller's kind;
    # the execute-and-check step below is shared by both paths.
    if current_user.is_superadmin:
        from app.models.tenant import Tenant

        lookup = select(Tenant).where(Tenant.id == tenant_id)
        failure_status = status.HTTP_404_NOT_FOUND
        failure_detail = "Tenant nicht gefunden."
    else:
        lookup = select(UserTenant).where(
            UserTenant.user_id == current_user.id,
            UserTenant.tenant_id == tenant_id,
        )
        failure_status = status.HTTP_403_FORBIDDEN
        failure_detail = "Kein Zugriff auf diesen Tenant."

    found = (await db.execute(lookup)).scalar_one_or_none()
    if found is None:
        raise HTTPException(status_code=failure_status, detail=failure_detail)
    return current_user, tenant_id
async def get_tenant_role(
    current_user: CurrentUser,
    db: Annotated[AsyncSession, Depends(get_session)],
    x_tenant_id: Annotated[str | None, Header(alias="X-Tenant-ID")] = None,
) -> tuple[User, UUID, TenantRole | None]:
    """Resolve the caller's role within the tenant named by ``X-Tenant-ID``.

    Header and access validation are delegated to ``get_tenant_context``, so
    any ``HTTPException`` it raises propagates unchanged.

    Returns:
        ``(user, tenant_id, role)``. ``role`` is ``None`` for superadmins,
        and also when no membership row is found.
    """
    user, tenant_id = await get_tenant_context(current_user, db, x_tenant_id)

    if not user.is_superadmin:
        from sqlalchemy import select

        membership_query = select(UserTenant).where(
            UserTenant.user_id == user.id,
            UserTenant.tenant_id == tenant_id,
        )
        membership = (await db.execute(membership_query)).scalar_one_or_none()
        if membership:
            role = membership.role
        else:
            role = None
        return user, tenant_id, role

    # Superadmins are not looked up in UserTenant; they carry no tenant role.
    return user, tenant_id, None
def require_min_role(min_role: TenantRole):
    """Build a dependency that enforces a minimum tenant role.

    The returned coroutine depends on ``get_tenant_role`` and hands its
    ``(user, tenant_id, role)`` tuple back unchanged when access is granted.
    Superadmins always pass. Other users are rejected with 403 when they
    have no role or when their role ranks below ``min_role`` in the order
    READ_ONLY < READ_WRITE < TENANT_ADMIN.
    """

    async def _check(
        ctx: Annotated[
            tuple[User, UUID, TenantRole | None], Depends(get_tenant_role)
        ],
    ) -> tuple[User, UUID, TenantRole | None]:
        user, _tenant_id, role = ctx

        if not user.is_superadmin:
            if role is None:
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail="Kein Zugriff auf diesen Tenant.",
                )

            # Position in this tuple is the privilege level (lowest first).
            ascending = (
                TenantRole.READ_ONLY,
                TenantRole.READ_WRITE,
                TenantRole.TENANT_ADMIN,
            )
            rank = {member: level for level, member in enumerate(ascending)}

            granted = rank[role]
            required = rank[min_role]
            if granted < required:
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail="Unzureichende Berechtigungen.",
                )

        return ctx

    return _check