"""FastAPI dependencies for authentication and tenant-scoped authorization."""

from collections.abc import Awaitable, Callable
from typing import Annotated
from uuid import UUID

from fastapi import Depends, Header, HTTPException, Security, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.security import TOKEN_TYPE_ACCESS, decode_token
from app.db.session import get_session
from app.models.user import User, UserTenant, TenantRole

# auto_error=False so that a missing Authorization header reaches
# get_current_user, which raises its own 401 with a custom message.
bearer_scheme = HTTPBearer(auto_error=False)

# Relative ordering of tenant roles; a higher number includes the rights of
# every lower one. Used by require_min_role.
_ROLE_RANK: dict[TenantRole, int] = {
    TenantRole.READ_ONLY: 0,
    TenantRole.READ_WRITE: 1,
    TenantRole.TENANT_ADMIN: 2,
}

# Shape of the value produced by get_tenant_role / require_min_role.
_TenantRoleContext = tuple[User, UUID, TenantRole | None]


def _invalid_token_error() -> HTTPException:
    """Build the 401 response used for every kind of unusable access token."""
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Ungültiger oder abgelaufener Token.",
        headers={"WWW-Authenticate": "Bearer"},
    )


async def get_current_user(
    credentials: Annotated[HTTPAuthorizationCredentials | None, Security(bearer_scheme)],
    db: Annotated[AsyncSession, Depends(get_session)],
) -> User:
    """Resolve the authenticated user from the bearer access token.

    Raises:
        HTTPException: 401 if no credentials were sent, the token cannot be
            decoded, is not an access token, has no usable ``sub`` claim, or
            the referenced user does not exist; 403 if the user is inactive.
    """
    if credentials is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Nicht authentifiziert. Bitte melden Sie sich an.",
            headers={"WWW-Authenticate": "Bearer"},
        )

    try:
        payload = decode_token(credentials.credentials)
        if payload.get("type") != TOKEN_TYPE_ACCESS:
            raise JWTError("Falscher Token-Typ")
        subject = payload.get("sub")
        if subject is None:
            raise JWTError("Kein Subject im Token")
    except JWTError as exc:
        raise _invalid_token_error() from exc

    # A token may carry a subject that is not a UUID (or not even a string).
    # That is an invalid token (401), not a server error, so parse it
    # defensively instead of letting ValueError escape as a 500.
    try:
        user_id = UUID(str(subject))
    except ValueError as exc:
        raise _invalid_token_error() from exc

    # Imported here rather than at module level, as in the original code
    # (presumably to avoid a circular import - confirm before hoisting).
    from app.crud.user import crud_user

    user = await crud_user.get(db, id=user_id)
    if user is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Benutzer nicht gefunden.",
            headers={"WWW-Authenticate": "Bearer"},
        )
    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Benutzerkonto ist deaktiviert.",
        )
    return user


CurrentUser = Annotated[User, Depends(get_current_user)]


def _parse_tenant_id(x_tenant_id: str | None) -> UUID:
    """Convert the raw ``X-Tenant-ID`` header value into a UUID.

    Raises:
        HTTPException: 400 if the header is missing or is not a valid UUID.
    """
    if x_tenant_id is None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Header 'X-Tenant-ID' fehlt.",
        )
    try:
        return UUID(x_tenant_id)
    except ValueError as exc:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Ungültige Tenant-ID.",
        ) from exc


async def _authorize_tenant_access(
    db: AsyncSession,
    user: User,
    x_tenant_id: str | None,
) -> tuple[UUID, UserTenant | None]:
    """Check that ``user`` may act on the tenant named by the header.

    Returns:
        ``(tenant_id, membership)``. ``membership`` is ``None`` for
        superadmins, who only need the tenant to exist; for every other user
        it is their ``UserTenant`` row.

    Raises:
        HTTPException: 400 for a missing/invalid header, 404 if a superadmin
            addresses an unknown tenant, 403 if a regular user is not a
            member of the tenant.
    """
    tenant_id = _parse_tenant_id(x_tenant_id)

    if user.is_superadmin:
        # Superadmin: no membership needed, but the tenant must exist.
        from app.models.tenant import Tenant

        result = await db.execute(select(Tenant).where(Tenant.id == tenant_id))
        if result.scalar_one_or_none() is None:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Tenant nicht gefunden.",
            )
        return tenant_id, None

    # Regular user: must have a membership row for this tenant.
    result = await db.execute(
        select(UserTenant).where(
            UserTenant.user_id == user.id,
            UserTenant.tenant_id == tenant_id,
        )
    )
    membership = result.scalar_one_or_none()
    if membership is None:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Kein Zugriff auf diesen Tenant.",
        )
    return tenant_id, membership


async def get_tenant_context(
    current_user: CurrentUser,
    db: Annotated[AsyncSession, Depends(get_session)],
    x_tenant_id: Annotated[str | None, Header(alias="X-Tenant-ID")] = None,
) -> tuple[User, UUID]:
    """Return ``(current_user, tenant_id)`` after validating tenant access.

    Superadmins may address any existing tenant; other users must be a
    member of the tenant given in the ``X-Tenant-ID`` header.

    Raises:
        HTTPException: 400 for a missing/invalid header, 404 if a superadmin
            addresses an unknown tenant, 403 if the user is not a member.
    """
    tenant_id, _membership = await _authorize_tenant_access(
        db, current_user, x_tenant_id
    )
    return current_user, tenant_id


async def get_tenant_role(
    current_user: CurrentUser,
    db: Annotated[AsyncSession, Depends(get_session)],
    x_tenant_id: Annotated[str | None, Header(alias="X-Tenant-ID")] = None,
) -> tuple[User, UUID, TenantRole | None]:
    """Return ``(current_user, tenant_id, role)``; role is None for superadmins.

    Performs the same validation as ``get_tenant_context`` and raises the
    same HTTP errors. The membership row loaded during that validation is
    reused, so the role is obtained without a second query.
    """
    tenant_id, membership = await _authorize_tenant_access(
        db, current_user, x_tenant_id
    )
    role = membership.role if membership is not None else None
    return current_user, tenant_id, role


def require_min_role(
    min_role: TenantRole,
) -> Callable[..., Awaitable[_TenantRoleContext]]:
    """Dependency factory: ensures the user has at least the given role.

    Args:
        min_role: Lowest tenant role that is allowed through.

    Returns:
        An async dependency yielding the ``(user, tenant_id, role)`` tuple
        from ``get_tenant_role`` when the check passes.

    Raises:
        KeyError: Immediately, if ``min_role`` has no entry in the role
            ranking (a programming error caught at route definition time).
    """
    # Resolve once here instead of on every request.
    required_rank = _ROLE_RANK[min_role]

    async def _check(
        ctx: Annotated[_TenantRoleContext, Depends(get_tenant_role)],
    ) -> _TenantRoleContext:
        user, _tenant_id, role = ctx
        # Superadmins bypass role checks entirely.
        if user.is_superadmin:
            return ctx
        if role is None:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Kein Zugriff auf diesen Tenant.",
            )
        if _ROLE_RANK[role] < required_rank:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Unzureichende Berechtigungen.",
            )
        return ctx

    return _check