"""Two-factor authentication (2FA) endpoints.""" import io import secrets import base64 from typing import Any from fastapi import APIRouter, Depends, HTTPException, status, Request from fastapi.responses import StreamingResponse from sqlalchemy.orm import Session from pydantic import BaseModel, Field import pyotp import qrcode from app.dependencies import get_db, get_current_user from app.models.user import User from app import crud from app.config import settings router = APIRouter() class TOTPSetupResponse(BaseModel): """Response for TOTP setup initiation.""" secret: str uri: str qr_code: str # Base64 encoded QR code image class TOTPVerifyRequest(BaseModel): """Request to verify TOTP code.""" code: str = Field(..., min_length=6, max_length=6) class TOTPDisableRequest(BaseModel): """Request to disable TOTP.""" password: str code: str = Field(..., min_length=6, max_length=6) class BackupCodesResponse(BaseModel): """Response with backup codes.""" backup_codes: list[str] def get_client_ip(request: Request) -> str: """Extract client IP from request.""" forwarded = request.headers.get("X-Forwarded-For") if forwarded: return forwarded.split(",")[0].strip() return request.client.host if request.client else "unknown" def generate_backup_codes(count: int = 10) -> list[str]: """Generate backup codes for 2FA recovery.""" return [secrets.token_hex(4).upper() for _ in range(count)] def generate_qr_code(uri: str) -> str: """Generate QR code as base64 encoded PNG.""" qr = qrcode.QRCode(version=1, box_size=10, border=5) qr.add_data(uri) qr.make(fit=True) img = qr.make_image(fill_color="black", back_color="white") buffer = io.BytesIO() img.save(buffer, format="PNG") buffer.seek(0) return base64.b64encode(buffer.getvalue()).decode() @router.get("/status") def get_2fa_status( current_user: User = Depends(get_current_user), ) -> Any: """ Get 2FA status for the current user. """ return { "enabled": current_user.totp_enabled, "has_backup_codes": bool(current_user.backup_codes) } @router.post("/setup", response_model=TOTPSetupResponse) def setup_2fa( request: Request, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ) -> Any: """ Initiate 2FA setup by generating a new TOTP secret. Returns the secret, URI, and QR code for authenticator app setup. The user must verify the code before 2FA is enabled. """ if current_user.totp_enabled: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="2FA is already enabled" ) # Generate new secret secret = pyotp.random_base32() # Store secret temporarily (not enabled yet) current_user.totp_secret = secret db.add(current_user) db.commit() # Generate URI for authenticator app totp = pyotp.TOTP(secret) uri = totp.provisioning_uri( name=current_user.email, issuer_name=settings.APP_NAME ) # Generate QR code qr_code = generate_qr_code(uri) # Log the action crud.audit_log.log_action( db, user_id=current_user.id, username=current_user.username, action="2fa_setup_initiated", resource_type="user", resource_id=current_user.id, ip_address=get_client_ip(request), user_agent=request.headers.get("User-Agent", "")[:500], status="success" ) return { "secret": secret, "uri": uri, "qr_code": qr_code } @router.post("/verify") def verify_and_enable_2fa( request: Request, verify_request: TOTPVerifyRequest, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ) -> Any: """ Verify TOTP code and enable 2FA. This must be called after setup to complete the 2FA activation. """ if current_user.totp_enabled: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="2FA is already enabled" ) if not current_user.totp_secret: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="2FA setup not initiated. Call /setup first." ) # Verify the code totp = pyotp.TOTP(current_user.totp_secret) if not totp.verify(verify_request.code, valid_window=1): crud.audit_log.log_action( db, user_id=current_user.id, username=current_user.username, action="2fa_verify", resource_type="user", resource_id=current_user.id, details={"reason": "invalid_code"}, ip_address=get_client_ip(request), user_agent=request.headers.get("User-Agent", "")[:500], status="failure" ) raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid verification code" ) # Generate backup codes backup_codes = generate_backup_codes() # Enable 2FA current_user.totp_enabled = True current_user.backup_codes = backup_codes db.add(current_user) db.commit() # Log the action crud.audit_log.log_action( db, user_id=current_user.id, username=current_user.username, action="2fa_enabled", resource_type="user", resource_id=current_user.id, ip_address=get_client_ip(request), user_agent=request.headers.get("User-Agent", "")[:500], status="success" ) return { "message": "2FA enabled successfully", "backup_codes": backup_codes } @router.post("/disable") def disable_2fa( request: Request, disable_request: TOTPDisableRequest, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ) -> Any: """ Disable 2FA. Requires password and current TOTP code. """ if not current_user.totp_enabled: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="2FA is not enabled" ) # Verify password from app.core.security import verify_password if not verify_password(disable_request.password, current_user.hashed_password): crud.audit_log.log_action( db, user_id=current_user.id, username=current_user.username, action="2fa_disable", resource_type="user", resource_id=current_user.id, details={"reason": "invalid_password"}, ip_address=get_client_ip(request), user_agent=request.headers.get("User-Agent", "")[:500], status="failure" ) raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid password" ) # Verify TOTP code totp = pyotp.TOTP(current_user.totp_secret) if not totp.verify(disable_request.code, valid_window=1): crud.audit_log.log_action( db, user_id=current_user.id, username=current_user.username, action="2fa_disable", resource_type="user", resource_id=current_user.id, details={"reason": "invalid_code"}, ip_address=get_client_ip(request), user_agent=request.headers.get("User-Agent", "")[:500], status="failure" ) raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid verification code" ) # Disable 2FA current_user.totp_enabled = False current_user.totp_secret = None current_user.totp_backup_codes = None db.add(current_user) db.commit() # Log the action crud.audit_log.log_action( db, user_id=current_user.id, username=current_user.username, action="2fa_disabled", resource_type="user", resource_id=current_user.id, ip_address=get_client_ip(request), user_agent=request.headers.get("User-Agent", "")[:500], status="success" ) return {"message": "2FA disabled successfully"} @router.post("/regenerate-backup-codes", response_model=BackupCodesResponse) def regenerate_backup_codes( request: Request, verify_request: TOTPVerifyRequest, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ) -> Any: """ Regenerate backup codes. Requires current TOTP code. """ if not current_user.totp_enabled: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="2FA is not enabled" ) # Verify TOTP code totp = pyotp.TOTP(current_user.totp_secret) if not totp.verify(verify_request.code, valid_window=1): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid verification code" ) # Generate new backup codes backup_codes = generate_backup_codes() current_user.backup_codes = backup_codes db.add(current_user) db.commit() # Log the action crud.audit_log.log_action( db, user_id=current_user.id, username=current_user.username, action="2fa_backup_codes_regenerated", resource_type="user", resource_id=current_user.id, ip_address=get_client_ip(request), user_agent=request.headers.get("User-Agent", "")[:500], status="success" ) return {"backup_codes": backup_codes} def verify_totp_or_backup(user: User, code: str, db: Session) -> bool: """ Verify TOTP code or backup code. Returns True if valid, False otherwise. If backup code is used, it's removed from the list. """ if not user.totp_enabled or not user.totp_secret: return True # 2FA not enabled # Try TOTP verification first totp = pyotp.TOTP(user.totp_secret) if totp.verify(code, valid_window=1): return True # Try backup code backup_codes = user.backup_codes code_upper = code.upper().replace("-", "") if code_upper in backup_codes: backup_codes.remove(code_upper) user.backup_codes = backup_codes db.add(user) db.commit() return True return False