Files
app-service/backend/app/crud/file.py
matteoscrugli fc605f03c9 Improve file listing and fix notification metadata field
Backend:
- Optimize file listing for non-superusers with dedicated CRUD methods
- Add get_visible_for_user and count_visible_for_user for efficient queries
- Move /allowed-types/ and /max-size/ routes before /{file_id} for proper matching
- Rename notification 'metadata' field to 'extra_data' for clarity
- Fix settings export to use get_value() method

Frontend:
- Update NotificationItem interface to use extra_data field

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-20 22:27:08 +01:00

312 lines
10 KiB
Python

"""CRUD operations and storage service for files."""
import hashlib
import json
import os
import shutil
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, List, BinaryIO

from sqlalchemy.orm import Session
from sqlalchemy import or_

from app.models.file import StoredFile
from app.schemas.file import FileCreate, FileUpdate, ALLOWED_CONTENT_TYPES, MAX_FILE_SIZE
from app.config import settings
class FileStorageService:
    """Service for handling file storage operations on the local filesystem.

    Files are written beneath ``storage_path`` in ``YYYY/MM`` sub-directories
    and are named ``<file_id><extension of the original filename>``.
    """

    def __init__(self, storage_path: Optional[str] = None):
        """Initialize the storage service and create the root directory.

        Args:
            storage_path: Root directory for stored files. When omitted (or
                empty), the ``FILE_STORAGE_PATH`` environment variable is
                used; if that is unset too, ``/config/uploads`` is chosen
                when ``/config`` exists, otherwise ``./uploads``.
        """
        configured_path = storage_path or os.getenv("FILE_STORAGE_PATH")
        if configured_path:
            self.storage_path = Path(configured_path)
        else:
            # Prefer persistent storage when running in the container (bind-mounted /config).
            self.storage_path = Path("/config/uploads") if Path("/config").exists() else Path("./uploads")
        self.storage_path.mkdir(parents=True, exist_ok=True)

    def _get_file_path(self, file_id: str, filename: str) -> Path:
        """Generate the storage path for a file, creating its directory.

        Args:
            file_id: Identifier used as the stored file's base name.
            filename: Original filename; only its extension is reused.

        Returns:
            Absolute-or-relative path (same base as ``storage_path``) of the
            form ``<storage_path>/YYYY/MM/<file_id><ext>``.
        """
        # Organize files by the current UTC year/month for easier management.
        date_prefix = datetime.now(timezone.utc).strftime("%Y/%m")
        dir_path = self.storage_path / date_prefix
        dir_path.mkdir(parents=True, exist_ok=True)
        # Use file ID + original extension; the rest of the name is discarded.
        ext = Path(filename).suffix
        return dir_path / f"{file_id}{ext}"

    def _calculate_hash(self, file: BinaryIO) -> str:
        """Calculate the SHA-256 hash of the whole stream.

        The stream is rewound before hashing so the digest always covers the
        complete content, even if the caller already read from it, and is
        rewound again afterwards so it can be copied from the start.

        Args:
            file: Seekable binary stream.

        Returns:
            Hex-encoded SHA-256 digest.
        """
        file.seek(0)
        sha256 = hashlib.sha256()
        while chunk := file.read(8192):
            sha256.update(chunk)
        file.seek(0)  # Reset file position for the subsequent copy
        return sha256.hexdigest()

    def save_file(
        self,
        file: BinaryIO,
        filename: str,
        file_id: str
    ) -> tuple[str, str]:
        """Save a file to storage.

        Args:
            file: Seekable binary stream holding the content.
            filename: Original filename (used only for its extension).
            file_id: Identifier used as the stored file's base name.

        Returns:
            ``(relative_path, file_hash)`` where ``relative_path`` is relative
            to ``storage_path`` and ``file_hash`` is the SHA-256 hex digest.

        Raises:
            Exception: Whatever the copy raises; a partially written file is
                removed (best effort) before the error is re-raised.
        """
        file_hash = self._calculate_hash(file)
        file_path = self._get_file_path(file_id, filename)
        relative_path = str(file_path.relative_to(self.storage_path))
        try:
            with open(file_path, "wb") as out:
                shutil.copyfileobj(file, out)
        except Exception:
            # Do not leave a truncated file behind if the copy fails midway.
            self.delete_file(relative_path)
            raise
        return relative_path, file_hash

    def get_file_path(self, relative_path: str) -> Path:
        """Get the full path for a stored file.

        The path is joined as-is; no check is made that it exists or that it
        stays inside ``storage_path``.
        """
        return self.storage_path / relative_path

    def delete_file(self, relative_path: str) -> bool:
        """Delete a file from storage (best effort).

        Returns:
            True if the file was removed; False if it did not exist or could
            not be removed (e.g. permission error, path is a directory,
            invalid path value).
        """
        try:
            # Unlink directly instead of checking existence first: avoids a
            # race between the check and the removal.
            (self.storage_path / relative_path).unlink()
        except (OSError, TypeError, ValueError):
            return False
        return True

    def file_exists(self, relative_path: str) -> bool:
        """Check if a file exists in storage."""
        return (self.storage_path / relative_path).exists()
class CRUDFile:
    """CRUD operations for stored files.

    Database rows are ``StoredFile`` records; the file content itself is
    handled by the ``FileStorageService`` held in ``self.storage``.
    """

    def __init__(self):
        """Create the CRUD helper with a default-configured storage service."""
        self.storage = FileStorageService()

    @staticmethod
    def _active_files(db: Session):
        """Return a query over files that are not soft-deleted."""
        # `== False` is intentional: it builds a SQL expression, not a Python bool.
        return db.query(StoredFile).filter(StoredFile.is_deleted == False)  # noqa: E712

    @staticmethod
    def _visible_to(query, user_id: str):
        """Restrict a query to files uploaded by ``user_id`` or marked public."""
        return query.filter(
            or_(StoredFile.uploaded_by == user_id, StoredFile.is_public == True)  # noqa: E712
        )

    @staticmethod
    def _apply_filters(
        query,
        *,
        uploaded_by: Optional[str] = None,
        is_public: Optional[bool] = None,
        content_type: Optional[str] = None
    ):
        """Apply the optional listing filters shared by the get/count methods."""
        if uploaded_by:
            query = query.filter(StoredFile.uploaded_by == uploaded_by)
        if is_public is not None:
            query = query.filter(StoredFile.is_public == is_public)
        if content_type:
            # Prefix match; autoescape keeps '%' and '_' in the caller's value
            # from acting as LIKE wildcards.
            query = query.filter(
                StoredFile.content_type.startswith(content_type, autoescape=True)
            )
        return query

    def get(self, db: Session, id: str) -> Optional[StoredFile]:
        """Get a non-deleted file by ID, or None if there is no such file."""
        return self._active_files(db).filter(StoredFile.id == id).first()

    def get_by_hash(self, db: Session, file_hash: str) -> Optional[StoredFile]:
        """Get a non-deleted file by its content hash (for deduplication)."""
        return self._active_files(db).filter(StoredFile.file_hash == file_hash).first()

    def get_multi(
        self,
        db: Session,
        *,
        skip: int = 0,
        limit: int = 100,
        uploaded_by: Optional[str] = None,
        is_public: Optional[bool] = None,
        content_type: Optional[str] = None
    ) -> List[StoredFile]:
        """Get multiple non-deleted files, newest first, with optional filtering.

        Args:
            db: Database session.
            skip: Number of rows to skip.
            limit: Maximum number of rows to return.
            uploaded_by: Only files uploaded by this user, when given.
            is_public: Only files with this public flag, when not None.
            content_type: Only files whose content type starts with this prefix.
        """
        query = self._apply_filters(
            self._active_files(db),
            uploaded_by=uploaded_by,
            is_public=is_public,
            content_type=content_type,
        )
        return query.order_by(StoredFile.created_at.desc()).offset(skip).limit(limit).all()

    def get_visible_for_user(
        self,
        db: Session,
        *,
        user_id: str,
        skip: int = 0,
        limit: int = 100,
        is_public: Optional[bool] = None,
        content_type: Optional[str] = None
    ) -> List[StoredFile]:
        """Get files visible to a user (own + public), newest first.

        Args:
            db: Database session.
            user_id: The user whose own files are included alongside public ones.
            skip: Number of rows to skip.
            limit: Maximum number of rows to return.
            is_public: Only files with this public flag, when not None.
            content_type: Only files whose content type starts with this prefix.
        """
        query = self._apply_filters(
            self._visible_to(self._active_files(db), user_id),
            is_public=is_public,
            content_type=content_type,
        )
        return query.order_by(StoredFile.created_at.desc()).offset(skip).limit(limit).all()

    def count(
        self,
        db: Session,
        *,
        uploaded_by: Optional[str] = None,
        is_public: Optional[bool] = None,
        content_type: Optional[str] = None
    ) -> int:
        """Count non-deleted files, using the same filters as ``get_multi``."""
        return self._apply_filters(
            self._active_files(db),
            uploaded_by=uploaded_by,
            is_public=is_public,
            content_type=content_type,
        ).count()

    def count_visible_for_user(
        self,
        db: Session,
        *,
        user_id: str,
        is_public: Optional[bool] = None,
        content_type: Optional[str] = None
    ) -> int:
        """Count files visible to a user, using the same filters as ``get_visible_for_user``."""
        return self._apply_filters(
            self._visible_to(self._active_files(db), user_id),
            is_public=is_public,
            content_type=content_type,
        ).count()

    def create(
        self,
        db: Session,
        *,
        file: BinaryIO,
        filename: str,
        content_type: Optional[str],
        size_bytes: int,
        uploaded_by: Optional[str] = None,
        metadata: Optional[FileCreate] = None
    ) -> StoredFile:
        """Create a new file record and save the file.

        The content is written to storage first, then the database row is
        inserted. If the insert fails, the transaction is rolled back and the
        stored file is removed so no orphan is left on disk.

        Args:
            db: Database session.
            file: Binary stream with the file content.
            filename: Original filename.
            content_type: MIME type reported for the upload, if any.
            size_bytes: Size recorded on the row (taken from the caller as-is).
            uploaded_by: ID of the uploading user, if any.
            metadata: Optional description, tags and public flag.

        Returns:
            The persisted, refreshed ``StoredFile``.

        Raises:
            Exception: Any storage or database error is propagated.
        """
        file_id = str(uuid.uuid4())
        # Save file to storage
        storage_path, file_hash = self.storage.save_file(file, filename, file_id)
        try:
            # Create database record
            db_obj = StoredFile(
                id=file_id,
                original_filename=filename,
                content_type=content_type,
                size_bytes=size_bytes,
                storage_path=storage_path,
                storage_type="local",
                file_hash=file_hash,
                uploaded_by=uploaded_by,
                description=metadata.description if metadata else None,
                # Tags are stored as a JSON string; empty/missing tags become NULL.
                tags=json.dumps(metadata.tags) if metadata and metadata.tags else None,
                is_public=metadata.is_public if metadata else False
            )
            db.add(db_obj)
            db.commit()
        except Exception:
            # The record was not persisted: undo the transaction and remove the
            # file that was just written (best effort), then surface the error.
            db.rollback()
            self.storage.delete_file(storage_path)
            raise
        db.refresh(db_obj)
        return db_obj

    def update(
        self,
        db: Session,
        *,
        db_obj: StoredFile,
        obj_in: FileUpdate
    ) -> StoredFile:
        """Update file metadata.

        Only fields explicitly set on ``obj_in`` are written. A non-None
        ``tags`` value is serialized to a JSON string before being stored.
        """
        update_data = obj_in.model_dump(exclude_unset=True)
        if update_data.get("tags") is not None:
            update_data["tags"] = json.dumps(update_data["tags"])
        for field, value in update_data.items():
            setattr(db_obj, field, value)
        db.add(db_obj)
        db.commit()
        db.refresh(db_obj)
        return db_obj

    def soft_delete(self, db: Session, *, id: str) -> Optional[StoredFile]:
        """Soft delete a file (marks as deleted but keeps record and content).

        The lookup does not filter on ``is_deleted``, so calling this on an
        already-deleted file refreshes its ``deleted_at``.

        Returns:
            The updated record, or None if no file has this ID.
        """
        obj = db.query(StoredFile).filter(StoredFile.id == id).first()
        if obj is None:
            return None
        obj.is_deleted = True
        # Naive UTC timestamp, same shape as the previous datetime.utcnow() value.
        obj.deleted_at = datetime.now(timezone.utc).replace(tzinfo=None)
        db.add(obj)
        db.commit()
        db.refresh(obj)
        return obj

    def hard_delete(self, db: Session, *, id: str) -> bool:
        """Permanently delete a file and its record.

        The database row is removed and committed first; the physical file is
        deleted only afterwards, so a failed commit never leaves a record
        pointing at missing content.

        Returns:
            True if a record with this ID existed and was deleted, else False.
        """
        obj = db.query(StoredFile).filter(StoredFile.id == id).first()
        if obj is None:
            return False
        # Read the path before committing: the instance is gone afterwards.
        storage_path = obj.storage_path
        # Delete database record
        db.delete(obj)
        db.commit()
        # Delete physical file (best effort; the result is not checked)
        self.storage.delete_file(storage_path)
        return True

    def get_file_content(self, db_obj: StoredFile) -> Optional[Path]:
        """Get the path to the actual file, or None if it is missing on disk."""
        file_path = self.storage.get_file_path(db_obj.storage_path)
        return file_path if file_path.exists() else None

    def validate_upload(
        self,
        content_type: Optional[str],
        size_bytes: int,
        allowed_types: Optional[List[str]] = None,
        max_size: Optional[int] = None
    ) -> tuple[bool, Optional[str]]:
        """Validate a file upload.

        Args:
            content_type: MIME type of the upload. A missing/empty value is
                not checked against the allow list.
            size_bytes: Size of the upload in bytes.
            allowed_types: Allowed MIME types; a falsy value (None or empty)
                falls back to ``ALLOWED_CONTENT_TYPES``.
            max_size: Maximum size in bytes; a falsy value (None or 0) falls
                back to ``MAX_FILE_SIZE``.

        Returns:
            ``(is_valid, error_message)``; ``error_message`` is None when valid.
        """
        allowed = allowed_types or ALLOWED_CONTENT_TYPES
        max_size = max_size or MAX_FILE_SIZE
        if size_bytes > max_size:
            return False, f"File size exceeds maximum allowed ({max_size // (1024*1024)} MB)"
        if content_type and content_type not in allowed:
            return False, f"File type '{content_type}' is not allowed"
        return True, None
# Shared module-level CRUDFile instance.
# NOTE: constructing it builds a FileStorageService, which creates the
# storage directory on disk as soon as this module is imported.
file_storage = CRUDFile()