Backend: - Add 2FA authentication with TOTP support - Add API keys management system - Add audit logging for security events - Add file upload/management system - Add notifications system with preferences - Add session management - Add webhooks integration - Add analytics endpoints - Add export functionality - Add password policy enforcement - Add new database migrations for core tables Frontend: - Add module position system (top/bottom sidebar sections) - Add search and notifications module configuration tabs - Add mobile logo replacing hamburger menu - Center page title absolutely when no tabs present - Align sidebar footer toggles with navigation items - Add lighter icon color in dark theme for mobile - Add API keys management page - Add notifications page with context - Add admin analytics and audit logs pages 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
137 lines
4.3 KiB
Python
137 lines
4.3 KiB
Python
"""Shared dependencies for FastAPI dependency injection."""
|
|
|
|
from datetime import datetime, timezone
|
|
from typing import Generator
|
|
from fastapi import Depends, HTTPException, status, Request
|
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
|
from sqlalchemy.orm import Session
|
|
from jose import JWTError, jwt
|
|
|
|
from app.db.session import SessionLocal
|
|
from app.config import settings
|
|
from app.core.security import decode_access_token
|
|
from app import models, crud
|
|
|
|
|
|
# Database dependency
|
|
def get_db() -> Generator[Session, None, None]:
    """
    Yield a database session for the lifetime of one request.

    A fresh session is opened from ``SessionLocal`` and handed to the
    caller. The ``finally`` clause closes it once the caller is done,
    including when an exception was raised while it was in use.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
|
|
|
|
|
|
# Security
|
|
# HTTP Bearer scheme instance; get_current_user declares it via
# Depends(security) to receive the request's HTTPAuthorizationCredentials.
security = HTTPBearer()
|
|
|
|
def _get_client_ip(request: Request) -> str:
    """
    Return the best-known client IP address for ``request``.

    The first non-empty entry of the ``X-Forwarded-For`` header wins. When
    the header is absent, or holds only blank entries, the address of the
    direct peer (``request.client.host``) is returned, and ``"unknown"``
    when no peer information is available.

    NOTE(review): ``X-Forwarded-For`` is client-supplied unless a trusted
    proxy overwrites it, so the returned value is not authenticated --
    confirm the deployment sits behind such a proxy.
    """
    forwarded = request.headers.get("X-Forwarded-For")
    if forwarded:
        # Skip blank entries (a header of " " or ", 10.0.0.1") instead of
        # handing an empty string back as the address.
        for entry in forwarded.split(","):
            candidate = entry.strip()
            if candidate:
                return candidate
    return request.client.host if request.client else "unknown"
|
|
|
|
|
|
# Minimum age (seconds) of ``last_active_at`` before it is written again.
_SESSION_ACTIVITY_REFRESH_SECONDS = 60


def _naive_utcnow() -> datetime:
    """Return the current UTC time as a naive ``datetime``."""
    # datetime.utcnow() is deprecated since Python 3.12. This produces the
    # same naive-UTC value, matching how ``expires_at`` is stored below
    # (tzinfo stripped after converting from the token's ``exp``).
    return datetime.now(timezone.utc).replace(tzinfo=None)


def _user_from_api_key(
    db: Session,
    token: str,
    request: Request,
    credentials_exception: HTTPException,
) -> models.User:
    """Return the active user owning the ``sk_``-prefixed API key ``token``."""
    api_key = crud.api_key.authenticate(
        db, plain_key=token, ip_address=_get_client_ip(request)
    )
    if not api_key:
        raise credentials_exception

    user = crud.user.get(db, id=api_key.user_id)
    # Unlike the JWT path, an inactive owner yields 401 here, not 403.
    if user is None or not user.is_active:
        raise credentials_exception
    return user


def _track_session(
    db: Session,
    request: Request,
    user: models.User,
    token: str,
    payload: dict,
    credentials_exception: HTTPException,
) -> None:
    """Validate the session row for ``token``, creating it on first use."""
    session = crud.session.get_by_token(db, token)

    if not session:
        # Backward compatibility: if a valid JWT exists without a session row,
        # create a session record on first use so it can be managed/revoked.
        exp = payload.get("exp")
        expires_at = None
        if isinstance(exp, (int, float)):
            expires_at = datetime.fromtimestamp(exp, tz=timezone.utc).replace(tzinfo=None)

        created = crud.session.create(
            db,
            user_id=user.id,
            token=token,
            user_agent=request.headers.get("User-Agent", "")[:500],
            ip_address=_get_client_ip(request),
            expires_at=expires_at,
        )
        crud.session.mark_as_current(db, session_id=created.id, user_id=user.id)
        return

    if not session.is_active:
        raise credentials_exception

    now = _naive_utcnow()
    if session.expires_at and session.expires_at < now:
        raise credentials_exception

    # Update last seen opportunistically to reduce write amplification.
    # A missing ``last_active_at`` counts as stale; otherwise such a row
    # would never have its activity recorded.
    last_seen = session.last_active_at
    if not last_seen or (now - last_seen).total_seconds() >= _SESSION_ACTIVITY_REFRESH_SECONDS:
        crud.session.update_activity(db, token=token, ip_address=_get_client_ip(request))
    if not session.is_current:
        crud.session.mark_as_current(db, session_id=session.id, user_id=user.id)


def get_current_user(
    request: Request,
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: Session = Depends(get_db)
) -> models.User:
    """
    Dependency that authenticates the bearer token and returns its user.

    Two kinds of token are accepted:

    * Tokens starting with ``sk_`` are checked through
      ``crud.api_key.authenticate`` and resolve to the key's owner.
    * Any other token is decoded with ``decode_access_token``. Its ``sub``
      claim identifies the user, and a payload with ``temp`` set to ``True``
      is rejected (NOTE(review): presumably the intermediate 2FA token --
      confirm). The matching session row is then validated, or created when
      none exists yet.

    Args:
        request: Incoming request; supplies the client IP and User-Agent.
        credentials: Bearer credentials extracted by ``security``.
        db: Database session.

    Returns:
        The authenticated ``models.User``.

    Raises:
        HTTPException: 401 when the token, API key, user or session cannot
            be validated; 403 when a JWT-authenticated user is inactive.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    token = credentials.credentials

    # API key authentication (programmatic access)
    if token.startswith("sk_"):
        return _user_from_api_key(db, token, request, credentials_exception)

    try:
        payload = decode_access_token(token)
    except JWTError as err:
        raise credentials_exception from err

    # NOTE(review): decode_access_token's contract is not visible here; a
    # falsy result (None / empty payload) is treated as an invalid token
    # rather than allowed to fail on ``.get`` below.
    if not payload:
        raise credentials_exception

    user_id: str = payload.get("sub")
    if user_id is None:
        raise credentials_exception
    if payload.get("temp") is True:
        raise credentials_exception

    user = crud.user.get(db, id=user_id)
    if user is None:
        raise credentials_exception

    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Inactive user"
        )

    _track_session(db, request, user, token, payload, credentials_exception)

    return user
|
|
|
|
|
|
def get_current_active_superuser(
    current_user: models.User = Depends(get_current_user)
) -> models.User:
    """
    Dependency that lets only superusers through.

    Returns the user resolved by ``get_current_user`` when its
    ``is_superuser`` flag is truthy; otherwise raises an HTTP 403
    ``HTTPException`` with detail "Not enough permissions".
    """
    if current_user.is_superuser:
        return current_user

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Not enough permissions",
    )
|
|
|
|
|
|
# Alias for backward compatibility
|
|
# Both names are bound to the same function object, so either can be used
# as the superuser dependency.
get_current_superuser = get_current_active_superuser
|