Initial commit

This commit is contained in:
2025-12-04 22:24:47 +01:00
commit 453ce10494
106 changed files with 17145 additions and 0 deletions

View File

@@ -0,0 +1,5 @@
"""API v1 package - exports the main v1 router."""
from app.api.v1.router import router
__all__ = ["router"]

155
backend/app/api/v1/auth.py Normal file
View File

@@ -0,0 +1,155 @@
"""Authentication endpoints."""
from datetime import datetime, timedelta
from typing import Any
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app import crud, schemas
from app.dependencies import get_db, get_current_user
from app.core.security import create_access_token
from app.config import settings
from app.models.user import User
router = APIRouter()
@router.post("/register", response_model=schemas.User, status_code=status.HTTP_201_CREATED)
def register(
*,
db: Session = Depends(get_db),
user_in: schemas.RegisterRequest
) -> Any:
"""
Register a new user.
Creates a new user account with the provided credentials.
Registration can be disabled by administrators via settings.
"""
# Check if this is the first user (always allow for initial setup)
user_count = db.query(User).count()
is_first_user = user_count == 0
# If not the first user, check if registration is enabled
if not is_first_user:
registration_enabled = crud.settings.get_setting_value(
db,
key="registration_enabled",
default=True # Default to enabled if setting doesn't exist
)
if not registration_enabled:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="User registration is currently disabled"
)
# Check if username already exists
user = crud.user.get_by_username(db, username=user_in.username)
if user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Username already registered"
)
# Check if email already exists
user = crud.user.get_by_email(db, email=user_in.email)
if user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email already registered"
)
# Create new user (first user becomes superuser)
user_create = schemas.UserCreate(
username=user_in.username,
email=user_in.email,
password=user_in.password,
is_active=True,
is_superuser=is_first_user # First user is superuser
)
user = crud.user.create(db, obj_in=user_create)
return user
@router.post("/login", response_model=schemas.Token)
def login(
*,
db: Session = Depends(get_db),
credentials: schemas.LoginRequest
) -> Any:
"""
Login and get access token.
Authenticates user and returns a JWT access token.
"""
# Authenticate user
user = crud.user.authenticate(
db,
username=credentials.username,
password=credentials.password
)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
if not crud.user.is_active(user):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Inactive user"
)
# Update last_login timestamp
user.last_login = datetime.utcnow()
db.add(user)
db.commit()
# Create access token
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.id},
expires_delta=access_token_expires
)
return {
"access_token": access_token,
"token_type": "bearer"
}
@router.get("/me", response_model=schemas.User)
def read_users_me(
current_user: User = Depends(get_current_user)
) -> Any:
"""
Get current user.
Returns the currently authenticated user's information.
Requires a valid JWT token in the Authorization header.
"""
return current_user
@router.get("/registration-status")
def get_registration_status(
db: Session = Depends(get_db)
) -> Any:
"""
Check if user registration is enabled.
This is a public endpoint that doesn't require authentication.
Returns the registration_enabled setting value.
"""
registration_enabled = crud.settings.get_setting_value(
db,
key="registration_enabled",
default=True
)
return {"registration_enabled": registration_enabled}

View File

@@ -0,0 +1,34 @@
"""Health check endpoints."""
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from sqlalchemy import text
from app.dependencies import get_db
from app.config import settings
router = APIRouter()
@router.get("")
async def health_check(db: Session = Depends(get_db)):
"""
Health check endpoint that verifies database connectivity.
Returns:
Dictionary with health status and database connectivity
"""
try:
# Test database connection
db.execute(text("SELECT 1"))
db_status = "connected"
except Exception as e:
db_status = f"error: {str(e)}"
return {
"status": "healthy" if db_status == "connected" else "unhealthy",
"app": settings.APP_NAME,
"version": settings.APP_VERSION,
"database": db_status
}

View File

@@ -0,0 +1,15 @@
"""Main API v1 router that aggregates all sub-routers."""
from fastapi import APIRouter
from app.api.v1 import health, auth, users, settings
# Create main API v1 router
router = APIRouter()
# Include all sub-routers
router.include_router(health.router, prefix="/health", tags=["Health"])
router.include_router(auth.router, prefix="/auth", tags=["Authentication"])
router.include_router(users.router, prefix="/users", tags=["Users"])
router.include_router(settings.router, prefix="/settings", tags=["Settings"])

View File

@@ -0,0 +1,184 @@
"""Settings endpoints."""
from typing import Any
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app import crud, schemas
from app.dependencies import get_db, get_current_superuser, get_current_user
from app.models.user import User
from app.core.settings_registry import (
THEME_KEYS,
MODULE_KEYS,
SETTINGS_REGISTRY,
get_default_value,
)
router = APIRouter()
@router.get("/theme", response_model=dict[str, Any])
def get_theme_settings(
*,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
) -> Any:
"""
Get theme settings (accessible to all authenticated users).
Returns theme-related settings that apply to all users.
"""
result = {}
for key in THEME_KEYS:
setting = crud.settings.get_setting(db, key=key)
if setting:
result[key] = setting.get_value()
else:
# Use default from registry
result[key] = get_default_value(key)
return result
@router.put("/theme", response_model=dict[str, Any])
def update_theme_settings(
*,
db: Session = Depends(get_db),
theme_data: dict[str, Any],
current_user: User = Depends(get_current_superuser)
) -> Any:
"""
Update theme settings (admin only).
Updates multiple theme settings at once.
"""
result = {}
for key, value in theme_data.items():
if key in THEME_KEYS or key.startswith("theme_"):
setting = crud.settings.update_setting(db, key=key, value=value)
result[key] = setting.get_value()
return result
@router.get("/modules", response_model=dict[str, Any])
def get_module_settings(
*,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
) -> Any:
"""
Get module settings (accessible to all authenticated users).
Returns module enabled/disabled states.
"""
result = {}
for key in MODULE_KEYS:
setting = crud.settings.get_setting(db, key=key)
if setting:
result[key] = setting.get_value()
else:
# Use default from registry
result[key] = get_default_value(key)
return result
@router.put("/modules", response_model=dict[str, Any])
def update_module_settings(
*,
db: Session = Depends(get_db),
module_data: dict[str, Any],
current_user: User = Depends(get_current_superuser)
) -> Any:
"""
Update module settings (admin only).
Enable or disable modules for all users.
"""
result = {}
for key, value in module_data.items():
if key in MODULE_KEYS or key.startswith("module_"):
setting = crud.settings.update_setting(db, key=key, value=value)
result[key] = setting.get_value()
return result
@router.get("/user_mode_enabled", response_model=dict[str, Any])
def get_user_mode_enabled(
*,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
) -> Any:
"""
Get user mode enabled setting (accessible to all authenticated users).
"""
setting = crud.settings.get_setting(db, key="user_mode_enabled")
if setting:
return {"key": "user_mode_enabled", "value": setting.get_value()}
# Use default from registry
return {"key": "user_mode_enabled", "value": get_default_value("user_mode_enabled")}
@router.put("/user_mode_enabled", response_model=dict[str, Any])
def update_user_mode_enabled(
*,
db: Session = Depends(get_db),
data: dict[str, Any],
current_user: User = Depends(get_current_superuser)
) -> Any:
"""
Update user mode enabled setting (admin only).
"""
value = data.get("value", True)
setting = crud.settings.update_setting(db, key="user_mode_enabled", value=value)
return {"key": "user_mode_enabled", "value": setting.get_value()}
@router.get("", response_model=dict[str, Any])
def get_all_settings(
*,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_superuser)
) -> Any:
"""
Get all settings (admin only).
Returns all application settings as a dictionary.
"""
settings_list = crud.settings.get_all_settings(db)
return {s.key: s.get_value() for s in settings_list}
@router.get("/{key}", response_model=schemas.Setting)
def get_setting(
*,
db: Session = Depends(get_db),
key: str,
current_user: User = Depends(get_current_superuser)
) -> Any:
"""
Get a specific setting by key (admin only).
"""
setting = crud.settings.get_setting(db, key=key)
if not setting:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Setting '{key}' not found"
)
return schemas.Setting.from_orm(setting)
@router.put("/{key}", response_model=schemas.Setting)
def update_setting(
*,
db: Session = Depends(get_db),
key: str,
setting_in: schemas.SettingUpdate,
current_user: User = Depends(get_current_superuser)
) -> Any:
"""
Update a setting (admin only).
Creates the setting if it doesn't exist.
"""
setting = crud.settings.update_setting(db, key=key, value=setting_in.value)
return schemas.Setting.from_orm(setting)

160
backend/app/api/v1/users.py Normal file
View File

@@ -0,0 +1,160 @@
"""User management endpoints (admin only)."""
from typing import Any, List
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app import crud, schemas
from app.dependencies import get_db, get_current_user, get_current_active_superuser
from app.models.user import User
router = APIRouter()
@router.get("", response_model=List[schemas.User])
def list_users(
*,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_superuser),
skip: int = 0,
limit: int = 100
) -> Any:
"""
List all users (admin only).
Requires superuser privileges.
"""
users = crud.user.get_multi(db, skip=skip, limit=limit)
return users
@router.get("/{user_id}", response_model=schemas.User)
def get_user(
*,
db: Session = Depends(get_db),
user_id: str,
current_user: User = Depends(get_current_user)
) -> Any:
"""
Get user by ID.
Users can view their own profile, admins can view any profile.
"""
user = crud.user.get(db, id=user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
# Check if user is viewing their own profile or is superuser
if user.id != current_user.id and not current_user.is_superuser:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions"
)
return user
@router.put("/{user_id}", response_model=schemas.User)
def update_user(
*,
db: Session = Depends(get_db),
user_id: str,
user_in: schemas.UserUpdate,
current_user: User = Depends(get_current_user)
) -> Any:
"""
Update user.
Users can update their own profile, admins can update any profile.
Only admins can change is_superuser and is_active flags.
"""
user = crud.user.get(db, id=user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
# Check permissions
if user.id != current_user.id and not current_user.is_superuser:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions"
)
# Only superusers can change is_superuser and is_active
if not current_user.is_superuser:
if user_in.is_superuser is not None or user_in.is_active is not None:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Only admins can change user status"
)
user = crud.user.update(db, db_obj=user, obj_in=user_in)
return user
@router.delete("/{user_id}", response_model=schemas.User)
def delete_user(
*,
db: Session = Depends(get_db),
user_id: str,
current_user: User = Depends(get_current_active_superuser)
) -> Any:
"""
Delete user (admin only).
Requires superuser privileges.
Users cannot delete themselves.
"""
if user_id == current_user.id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Users cannot delete themselves"
)
user = crud.user.get(db, id=user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
user = crud.user.remove(db, id=user_id)
return user
@router.post("", response_model=schemas.User, status_code=status.HTTP_201_CREATED)
def create_user(
*,
db: Session = Depends(get_db),
user_in: schemas.UserCreate,
current_user: User = Depends(get_current_active_superuser)
) -> Any:
"""
Create new user (admin only).
Requires superuser privileges.
"""
# Check if username already exists
user = crud.user.get_by_username(db, username=user_in.username)
if user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Username already registered"
)
# Check if email already exists
user = crud.user.get_by_email(db, email=user_in.email)
if user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email already registered"
)
user = crud.user.create(db, obj_in=user_in)
return user