Files
app-service/backend/app/dependencies.py
matteoscrugli 8c4a555b88 Add comprehensive backend features and mobile UI improvements
Backend:
- Add 2FA authentication with TOTP support
- Add API keys management system
- Add audit logging for security events
- Add file upload/management system
- Add notifications system with preferences
- Add session management
- Add webhooks integration
- Add analytics endpoints
- Add export functionality
- Add password policy enforcement
- Add new database migrations for core tables

Frontend:
- Add module position system (top/bottom sidebar sections)
- Add search and notifications module configuration tabs
- Add mobile logo replacing hamburger menu
- Center page title absolutely when no tabs present
- Align sidebar footer toggles with navigation items
- Add lighter icon color in dark theme for mobile
- Add API keys management page
- Add notifications page with context
- Add admin analytics and audit logs pages

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-17 22:27:32 +01:00

137 lines
4.3 KiB
Python

"""Shared dependencies for FastAPI dependency injection."""
from datetime import datetime, timezone
from typing import Generator
from fastapi import Depends, HTTPException, status, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.orm import Session
from jose import JWTError, jwt
from app.db.session import SessionLocal
from app.config import settings
from app.core.security import decode_access_token
from app import models, crud
# Database dependency
def get_db() -> Generator[Session, None, None]:
"""
Dependency that provides a database session.
Automatically closes the session after the request.
"""
db = SessionLocal()
try:
yield db
finally:
db.close()
# Security
security = HTTPBearer()
def _get_client_ip(request: Request) -> str:
forwarded = request.headers.get("X-Forwarded-For")
if forwarded:
return forwarded.split(",")[0].strip()
return request.client.host if request.client else "unknown"
def get_current_user(
request: Request,
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db)
) -> models.User:
"""
Dependency that validates JWT token and returns current user.
Raises HTTPException if token is invalid or user not found.
"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
token = credentials.credentials
# API key authentication (programmatic access)
if token.startswith("sk_"):
api_key = crud.api_key.authenticate(db, plain_key=token, ip_address=_get_client_ip(request))
if not api_key:
raise credentials_exception
user = crud.user.get(db, id=api_key.user_id)
if user is None or not user.is_active:
raise credentials_exception
return user
payload = decode_access_token(token)
user_id: str = payload.get("sub")
if user_id is None:
raise credentials_exception
if payload.get("temp") is True:
raise credentials_exception
except JWTError:
raise credentials_exception
user = crud.user.get(db, id=user_id)
if user is None:
raise credentials_exception
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Inactive user"
)
session = crud.session.get_by_token(db, token)
if not session:
# Backward compatibility: if a valid JWT exists without a session row,
# create a session record on first use so it can be managed/revoked.
exp = payload.get("exp")
expires_at = None
if isinstance(exp, (int, float)):
expires_at = datetime.fromtimestamp(exp, tz=timezone.utc).replace(tzinfo=None)
created = crud.session.create(
db,
user_id=user.id,
token=token,
user_agent=request.headers.get("User-Agent", "")[:500],
ip_address=_get_client_ip(request),
expires_at=expires_at,
)
crud.session.mark_as_current(db, session_id=created.id, user_id=user.id)
else:
if not session.is_active:
raise credentials_exception
if session.expires_at and session.expires_at < datetime.utcnow():
raise credentials_exception
# Update last seen opportunistically to reduce write amplification.
now = datetime.utcnow()
if session.last_active_at and (now - session.last_active_at).total_seconds() >= 60:
crud.session.update_activity(db, token=token, ip_address=_get_client_ip(request))
if not session.is_current:
crud.session.mark_as_current(db, session_id=session.id, user_id=user.id)
return user
def get_current_active_superuser(
current_user: models.User = Depends(get_current_user)
) -> models.User:
"""
Dependency that requires the current user to be a superuser.
"""
if not current_user.is_superuser:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions"
)
return current_user
# Alias for backward compatibility
get_current_superuser = get_current_active_superuser