"""Persistence helpers for ``Document`` rows: create, fetch, list, delete, search."""

import uuid

from fastapi import HTTPException, status
from sqlalchemy import func, select, text
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.document import Document
from app.utils.file_storage import delete_file

# SQL fragments shared by the match predicate and the rank expression in
# ``search_documents`` so the two cannot drift apart.
_TS_DOCUMENT = "to_tsvector('english', coalesce(extracted_text, ''))"
_TS_QUERY = "plainto_tsquery('english', :query)"


async def create_document(
    db: AsyncSession,
    user_id: uuid.UUID,
    filename: str,
    original_filename: str,
    storage_path: str,
    mime_type: str,
    file_size: int,
    doc_type: str = "other",
) -> Document:
    """Add a new ``Document`` to the session and flush it.

    The row is created with ``processing_status="pending"``. Only ``flush`` is
    issued here; committing is left to the caller.

    Returns:
        The newly added ``Document`` instance.
    """
    doc = Document(
        user_id=user_id,
        filename=filename,
        original_filename=original_filename,
        storage_path=storage_path,
        mime_type=mime_type,
        file_size=file_size,
        doc_type=doc_type,
        processing_status="pending",
    )
    db.add(doc)
    await db.flush()
    return doc


async def get_document(db: AsyncSession, doc_id: uuid.UUID, user_id: uuid.UUID) -> Document:
    """Return the document with ``doc_id`` that belongs to ``user_id``.

    Raises:
        HTTPException: 404 when no row matches both the id and the owner.
    """
    result = await db.execute(
        select(Document).where(Document.id == doc_id, Document.user_id == user_id)
    )
    doc = result.scalar_one_or_none()
    if doc is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Document not found")
    return doc


async def get_user_documents(
    db: AsyncSession,
    user_id: uuid.UUID,
    doc_type: str | None = None,
    processing_status: str | None = None,
) -> list[Document]:
    """List a user's documents, newest first (by ``created_at``).

    ``doc_type`` and ``processing_status`` are optional equality filters; a
    falsy value (``None`` or an empty string) means "do not filter".
    """
    stmt = select(Document).where(Document.user_id == user_id)
    if doc_type:
        stmt = stmt.where(Document.doc_type == doc_type)
    if processing_status:
        stmt = stmt.where(Document.processing_status == processing_status)
    stmt = stmt.order_by(Document.created_at.desc())

    result = await db.execute(stmt)
    return list(result.scalars().all())


async def delete_document(db: AsyncSession, doc_id: uuid.UUID, user_id: uuid.UUID) -> None:
    """Delete a user's document row, then remove its stored file.

    Raises:
        HTTPException: 404 (from ``get_document``) when the document does not
            exist or is owned by another user.
    """
    doc = await get_document(db, doc_id, user_id)
    # Capture the path before the instance is marked deleted and flushed.
    storage_path = doc.storage_path
    await db.delete(doc)
    await db.flush()
    # NOTE(review): the file is removed after flush but before any commit made
    # by the caller; if that transaction is rolled back the row survives while
    # the file is gone. Confirm this ordering is acceptable.
    delete_file(storage_path)


async def search_documents(
    db: AsyncSession, user_id: uuid.UUID, query: str, limit: int = 5
) -> list[Document]:
    """Full-text search over a user's processed documents.

    Only rows with ``processing_status == "completed"`` are considered. The
    match is evaluated by the database using ``to_tsvector`` /
    ``plainto_tsquery`` (English configuration) against ``extracted_text``, and
    results are ordered by ``ts_rank`` descending.

    Args:
        query: Free-text search string; always sent as a bound parameter.
        limit: Maximum number of rows returned.
    """
    matches_query = text(f"{_TS_DOCUMENT} @@ {_TS_QUERY}")
    rank_desc = text(f"ts_rank({_TS_DOCUMENT}, {_TS_QUERY}) DESC")

    stmt = (
        select(Document)
        .where(
            Document.user_id == user_id,
            Document.processing_status == "completed",
            matches_query,
        )
        .order_by(rank_desc)
        .limit(limit)
    )
    # Bind ``:query`` once at execution time so it covers both the WHERE and
    # the ORDER BY fragment, regardless of the order the clauses were attached.
    result = await db.execute(stmt, {"query": query})
    return list(result.scalars().all())


async def update_document_text(
    db: AsyncSession, doc_id: uuid.UUID, extracted_text: str, status_val: str = "completed"
) -> None:
    """Store extracted text on a document and update its processing status.

    The lookup is by ``doc_id`` only (no owner filter). If no such document
    exists the call is a silent no-op.
    """
    result = await db.execute(select(Document).where(Document.id == doc_id))
    doc = result.scalar_one_or_none()
    if doc is None:
        # NOTE(review): a missing row is ignored rather than reported —
        # presumably deliberate; confirm against the caller.
        return

    doc.extracted_text = extracted_text
    doc.processing_status = status_val
    await db.flush()