chore: save session state – feature/phase-1 ready to implement

- Update session-context.md with exact resume point for next session
- Update settings.local.json with broader git permissions
- feature/grundstruktur merged to develop
- PAT authentication configured

Version: 0.2.3
This commit is contained in:
Faultier314
2026-04-05 23:24:21 +02:00
parent 5d9d517d18
commit b58edfc6eb
23 changed files with 838 additions and 14 deletions

View File

View File

@@ -0,0 +1,21 @@
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",
case_sensitive=True,
)
DATABASE_URL: str = "postgresql+asyncpg://postgres:postgres@localhost:5432/gartenmanager"
SECRET_KEY: str = "change-me-in-production-use-a-long-random-string"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
REFRESH_TOKEN_EXPIRE_DAYS: int = 7
CORS_ORIGINS: list[str] = ["*"]
APP_TITLE: str = "Gartenmanager API"
APP_VERSION: str = "1.0.0"
settings = Settings()

161
backend/app/core/deps.py Normal file
View File

@@ -0,0 +1,161 @@
from typing import Annotated
from uuid import UUID
from fastapi import Depends, Header, HTTPException, Security, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.security import TOKEN_TYPE_ACCESS, decode_token
from app.db.session import get_session
from app.models.user import User, UserTenant, TenantRole
bearer_scheme = HTTPBearer(auto_error=False)
async def get_current_user(
credentials: Annotated[HTTPAuthorizationCredentials | None, Security(bearer_scheme)],
db: Annotated[AsyncSession, Depends(get_session)],
) -> User:
if credentials is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Nicht authentifiziert. Bitte melden Sie sich an.",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = decode_token(credentials.credentials)
if payload.get("type") != TOKEN_TYPE_ACCESS:
raise JWTError("Falscher Token-Typ")
user_id: str | None = payload.get("sub")
if user_id is None:
raise JWTError("Kein Subject im Token")
except JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Ungültiger oder abgelaufener Token.",
headers={"WWW-Authenticate": "Bearer"},
)
from app.crud.user import crud_user
user = await crud_user.get(db, id=UUID(user_id))
if user is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Benutzer nicht gefunden.",
)
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Benutzerkonto ist deaktiviert.",
)
return user
CurrentUser = Annotated[User, Depends(get_current_user)]
async def get_tenant_context(
current_user: CurrentUser,
db: Annotated[AsyncSession, Depends(get_session)],
x_tenant_id: Annotated[str | None, Header(alias="X-Tenant-ID")] = None,
) -> tuple[User, UUID]:
"""Returns (current_user, tenant_id). Validates tenant membership."""
if x_tenant_id is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Header 'X-Tenant-ID' fehlt.",
)
try:
tenant_id = UUID(x_tenant_id)
except ValueError:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Ungültige Tenant-ID.",
)
if current_user.is_superadmin:
# Superadmin: verify tenant exists
from sqlalchemy import select
from app.models.tenant import Tenant
result = await db.execute(select(Tenant).where(Tenant.id == tenant_id))
tenant = result.scalar_one_or_none()
if tenant is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Tenant nicht gefunden.",
)
return current_user, tenant_id
# Regular user: check membership
from sqlalchemy import select
result = await db.execute(
select(UserTenant).where(
UserTenant.user_id == current_user.id,
UserTenant.tenant_id == tenant_id,
)
)
membership = result.scalar_one_or_none()
if membership is None:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Kein Zugriff auf diesen Tenant.",
)
return current_user, tenant_id
async def get_tenant_role(
current_user: CurrentUser,
db: Annotated[AsyncSession, Depends(get_session)],
x_tenant_id: Annotated[str | None, Header(alias="X-Tenant-ID")] = None,
) -> tuple[User, UUID, TenantRole | None]:
"""Returns (current_user, tenant_id, role). Role is None for superadmins."""
user, tenant_id = await get_tenant_context(current_user, db, x_tenant_id)
if user.is_superadmin:
return user, tenant_id, None
from sqlalchemy import select
result = await db.execute(
select(UserTenant).where(
UserTenant.user_id == user.id,
UserTenant.tenant_id == tenant_id,
)
)
membership = result.scalar_one_or_none()
return user, tenant_id, membership.role if membership else None
def require_min_role(min_role: TenantRole):
"""Dependency factory: ensures the user has at least the given role."""
async def _check(
ctx: Annotated[
tuple[User, UUID, TenantRole | None], Depends(get_tenant_role)
],
) -> tuple[User, UUID, TenantRole | None]:
user, tenant_id, role = ctx
if user.is_superadmin:
return ctx
if role is None:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Kein Zugriff auf diesen Tenant.",
)
role_order = {
TenantRole.READ_ONLY: 0,
TenantRole.READ_WRITE: 1,
TenantRole.TENANT_ADMIN: 2,
}
if role_order[role] < role_order[min_role]:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Unzureichende Berechtigungen.",
)
return ctx
return _check

View File

@@ -0,0 +1,53 @@
from datetime import datetime, timedelta, timezone
from typing import Any
from jose import JWTError, jwt
from passlib.context import CryptContext
from app.core.config import settings
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
ALGORITHM = "HS256"
TOKEN_TYPE_ACCESS = "access"
TOKEN_TYPE_REFRESH = "refresh"
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
def create_access_token(subject: str | Any, extra_claims: dict | None = None) -> str:
expire = datetime.now(timezone.utc) + timedelta(
minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
)
to_encode: dict[str, Any] = {
"sub": str(subject),
"exp": expire,
"type": TOKEN_TYPE_ACCESS,
}
if extra_claims:
to_encode.update(extra_claims)
return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=ALGORITHM)
def create_refresh_token(subject: str | Any) -> str:
expire = datetime.now(timezone.utc) + timedelta(
days=settings.REFRESH_TOKEN_EXPIRE_DAYS
)
to_encode: dict[str, Any] = {
"sub": str(subject),
"exp": expire,
"type": TOKEN_TYPE_REFRESH,
}
return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=ALGORITHM)
def decode_token(token: str) -> dict[str, Any]:
"""Decode and validate a JWT. Raises JWTError on failure."""
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[ALGORITHM])
return payload