Backend: - Add 2FA authentication with TOTP support - Add API keys management system - Add audit logging for security events - Add file upload/management system - Add notifications system with preferences - Add session management - Add webhooks integration - Add analytics endpoints - Add export functionality - Add password policy enforcement - Add new database migrations for core tables Frontend: - Add module position system (top/bottom sidebar sections) - Add search and notifications module configuration tabs - Add mobile logo replacing hamburger menu - Center page title absolutely when no tabs present - Align sidebar footer toggles with navigation items - Add lighter icon color in dark theme for mobile - Add API keys management page - Add notifications page with context - Add admin analytics and audit logs pages 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
275 lines
8.1 KiB
Python
275 lines
8.1 KiB
Python
"""CRUD operations for User Session model."""
|
|
|
|
import hashlib
|
|
import re
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional, List
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy import desc
|
|
|
|
from app.models.session import UserSession
|
|
from app.schemas.session import SessionCreate
|
|
|
|
|
|
def hash_token(token: str) -> str:
|
|
"""Hash a token for secure storage."""
|
|
return hashlib.sha256(token.encode()).hexdigest()
|
|
|
|
|
|
def parse_user_agent(user_agent: str) -> dict:
|
|
"""Parse user agent string to extract device info."""
|
|
result = {
|
|
"device_type": "desktop",
|
|
"browser": "Unknown",
|
|
"os": "Unknown"
|
|
}
|
|
|
|
if not user_agent:
|
|
return result
|
|
|
|
ua_lower = user_agent.lower()
|
|
|
|
# Detect device type
|
|
if "mobile" in ua_lower or "android" in ua_lower and "mobile" in ua_lower:
|
|
result["device_type"] = "mobile"
|
|
elif "tablet" in ua_lower or "ipad" in ua_lower:
|
|
result["device_type"] = "tablet"
|
|
|
|
# Detect OS
|
|
if "windows" in ua_lower:
|
|
result["os"] = "Windows"
|
|
elif "mac os" in ua_lower or "macintosh" in ua_lower:
|
|
result["os"] = "macOS"
|
|
elif "linux" in ua_lower:
|
|
result["os"] = "Linux"
|
|
elif "android" in ua_lower:
|
|
result["os"] = "Android"
|
|
elif "iphone" in ua_lower or "ipad" in ua_lower:
|
|
result["os"] = "iOS"
|
|
|
|
# Detect browser
|
|
if "firefox" in ua_lower:
|
|
result["browser"] = "Firefox"
|
|
elif "edg" in ua_lower:
|
|
result["browser"] = "Edge"
|
|
elif "chrome" in ua_lower:
|
|
result["browser"] = "Chrome"
|
|
elif "safari" in ua_lower:
|
|
result["browser"] = "Safari"
|
|
elif "opera" in ua_lower:
|
|
result["browser"] = "Opera"
|
|
|
|
return result
|
|
|
|
|
|
class CRUDSession:
|
|
"""CRUD operations for User Session model."""
|
|
|
|
def create(
|
|
self,
|
|
db: Session,
|
|
*,
|
|
user_id: str,
|
|
token: str,
|
|
user_agent: Optional[str] = None,
|
|
ip_address: Optional[str] = None,
|
|
expires_at: Optional[datetime] = None
|
|
) -> UserSession:
|
|
"""Create a new session."""
|
|
token_hash = hash_token(token)
|
|
parsed_ua = parse_user_agent(user_agent or "")
|
|
|
|
# Generate device name
|
|
device_name = f"{parsed_ua['browser']} on {parsed_ua['os']}"
|
|
|
|
db_obj = UserSession(
|
|
user_id=user_id,
|
|
token_hash=token_hash,
|
|
device_name=device_name,
|
|
device_type=parsed_ua["device_type"],
|
|
browser=parsed_ua["browser"],
|
|
os=parsed_ua["os"],
|
|
user_agent=user_agent[:500] if user_agent else None,
|
|
ip_address=ip_address,
|
|
expires_at=expires_at,
|
|
is_active=True
|
|
)
|
|
db.add(db_obj)
|
|
db.commit()
|
|
db.refresh(db_obj)
|
|
return db_obj
|
|
|
|
def get(self, db: Session, id: str) -> Optional[UserSession]:
|
|
"""Get a session by ID."""
|
|
return db.query(UserSession).filter(UserSession.id == id).first()
|
|
|
|
def get_by_token(self, db: Session, token: str) -> Optional[UserSession]:
|
|
"""Get a session by token."""
|
|
token_hash = hash_token(token)
|
|
return db.query(UserSession).filter(UserSession.token_hash == token_hash).first()
|
|
|
|
def get_multi_by_user(
|
|
self,
|
|
db: Session,
|
|
*,
|
|
user_id: str,
|
|
active_only: bool = True
|
|
) -> List[UserSession]:
|
|
"""Get all sessions for a user."""
|
|
query = db.query(UserSession).filter(UserSession.user_id == user_id)
|
|
|
|
if active_only:
|
|
query = query.filter(UserSession.is_active == True)
|
|
|
|
return query.order_by(desc(UserSession.last_active_at)).all()
|
|
|
|
def count_by_user(self, db: Session, user_id: str, active_only: bool = True) -> int:
|
|
"""Count sessions for a user."""
|
|
query = db.query(UserSession).filter(UserSession.user_id == user_id)
|
|
if active_only:
|
|
query = query.filter(UserSession.is_active == True)
|
|
return query.count()
|
|
|
|
def count_active_by_user(self, db: Session, user_id: str) -> int:
|
|
"""Count active sessions for a user."""
|
|
return self.count_by_user(db, user_id, active_only=True)
|
|
|
|
def update_activity(
|
|
self,
|
|
db: Session,
|
|
*,
|
|
token: str,
|
|
ip_address: Optional[str] = None
|
|
) -> Optional[UserSession]:
|
|
"""Update session last activity."""
|
|
token_hash = hash_token(token)
|
|
db_obj = db.query(UserSession).filter(UserSession.token_hash == token_hash).first()
|
|
|
|
if db_obj and db_obj.is_active:
|
|
db_obj.last_active_at = datetime.utcnow()
|
|
if ip_address:
|
|
db_obj.ip_address = ip_address
|
|
db.add(db_obj)
|
|
db.commit()
|
|
db.refresh(db_obj)
|
|
|
|
return db_obj
|
|
|
|
def mark_as_current(
|
|
self,
|
|
db: Session,
|
|
*,
|
|
session_id: str,
|
|
user_id: str
|
|
) -> None:
|
|
"""Mark a session as current and unmark others."""
|
|
# Unmark all sessions for user
|
|
db.query(UserSession)\
|
|
.filter(UserSession.user_id == user_id)\
|
|
.update({"is_current": False})
|
|
|
|
# Mark specific session as current
|
|
db.query(UserSession)\
|
|
.filter(UserSession.id == session_id)\
|
|
.update({"is_current": True})
|
|
|
|
db.commit()
|
|
|
|
def revoke(self, db: Session, *, id: str, user_id: str) -> Optional[UserSession]:
|
|
"""Revoke a specific session."""
|
|
db_obj = db.query(UserSession)\
|
|
.filter(UserSession.id == id)\
|
|
.filter(UserSession.user_id == user_id)\
|
|
.first()
|
|
|
|
if db_obj:
|
|
db_obj.is_active = False
|
|
db_obj.revoked_at = datetime.utcnow()
|
|
db.add(db_obj)
|
|
db.commit()
|
|
db.refresh(db_obj)
|
|
|
|
return db_obj
|
|
|
|
def revoke_all_except(
|
|
self,
|
|
db: Session,
|
|
*,
|
|
user_id: str,
|
|
except_session_id: Optional[str] = None
|
|
) -> int:
|
|
"""Revoke all sessions for a user except the specified one."""
|
|
query = db.query(UserSession)\
|
|
.filter(UserSession.user_id == user_id)\
|
|
.filter(UserSession.is_active == True)
|
|
|
|
if except_session_id:
|
|
query = query.filter(UserSession.id != except_session_id)
|
|
|
|
count = query.update({
|
|
"is_active": False,
|
|
"revoked_at": datetime.utcnow()
|
|
})
|
|
db.commit()
|
|
return count
|
|
|
|
def revoke_multiple(
|
|
self,
|
|
db: Session,
|
|
*,
|
|
user_id: str,
|
|
session_ids: List[str]
|
|
) -> int:
|
|
"""Revoke multiple sessions."""
|
|
count = db.query(UserSession)\
|
|
.filter(UserSession.id.in_(session_ids))\
|
|
.filter(UserSession.user_id == user_id)\
|
|
.filter(UserSession.is_active == True)\
|
|
.update({
|
|
"is_active": False,
|
|
"revoked_at": datetime.utcnow()
|
|
}, synchronize_session=False)
|
|
db.commit()
|
|
return count
|
|
|
|
def cleanup_expired(self, db: Session) -> int:
|
|
"""Clean up expired sessions."""
|
|
now = datetime.utcnow()
|
|
count = db.query(UserSession)\
|
|
.filter(UserSession.expires_at < now)\
|
|
.filter(UserSession.is_active == True)\
|
|
.update({
|
|
"is_active": False,
|
|
"revoked_at": now
|
|
})
|
|
db.commit()
|
|
return count
|
|
|
|
def is_valid(self, db: Session, token: str) -> bool:
|
|
"""Check if a session token is valid."""
|
|
session = self.get_by_token(db, token)
|
|
if not session:
|
|
return False
|
|
|
|
if not session.is_active:
|
|
return False
|
|
|
|
if session.expires_at and session.expires_at < datetime.utcnow():
|
|
return False
|
|
|
|
return True
|
|
|
|
def delete_old_inactive(self, db: Session, days: int = 30) -> int:
|
|
"""Delete old inactive sessions."""
|
|
cutoff = datetime.utcnow() - timedelta(days=days)
|
|
count = db.query(UserSession)\
|
|
.filter(UserSession.is_active == False)\
|
|
.filter(UserSession.revoked_at < cutoff)\
|
|
.delete()
|
|
db.commit()
|
|
return count
|
|
|
|
|
|
# Create instance
|
|
session = CRUDSession()
|