|
|
|
|
|
"""
|
|
|
file_pipeline.py
|
|
|
|
|
|
Universal file pipeline for GAKR AI.
|
|
|
|
|
|
Responsibilities:
|
|
|
- Create dataupload/ folder structure (if not present)
|
|
|
- Save uploaded files to disk
|
|
|
- Detect file type
|
|
|
- Run type-specific extractors
|
|
|
- Return structured, text-friendly context for Phi-3
|
|
|
"""
|
|
|
|
|
|
from __future__ import annotations

import mimetypes
import os
from datetime import datetime, timezone
from typing import Any, Dict, List

import pandas as pd
import pdfplumber
import docx
import fitz
from PIL import Image
import pytesseract
import whisper
import ffmpeg

from fastapi import UploadFile
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Absolute directory containing this module; upload paths are anchored here
# so behavior does not depend on the process's current working directory.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))

# Root folder under which every uploaded file is stored.
UPLOAD_ROOT = os.path.join(BASE_DIR, "dataupload")

# Maps a logical file kind (as returned by detect_kind) to its subfolder
# name under dataupload/. Unknown kinds fall back to "other".
FOLDERS = {
    "image": "images",
    "video": "videos",
    "audio": "audio",
    "document": "documents",
    "tabular": "tabular",
    "other": "other",
}
|
|
|
|
|
|
|
|
|
def ensure_folders() -> None:
    """Create the upload root and every per-kind subfolder if missing.

    Idempotent: existing directories are left untouched.
    """
    targets = [UPLOAD_ROOT]
    targets.extend(os.path.join(UPLOAD_ROOT, name) for name in FOLDERS.values())
    for directory in targets:
        os.makedirs(directory, exist_ok=True)
|
|
|
|
|
|
|
|
|
# Build the folder tree at import time so request handlers can assume it exists.
ensure_folders()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def detect_kind(filename: str, content_type: str | None) -> str:
    """Classify an upload into a logical kind.

    The file extension is checked first because browsers often send a
    generic or wrong MIME type (e.g. application/octet-stream); the MIME
    type family is only a fallback for unknown/missing extensions.

    Args:
        filename: Original client-supplied file name.
        content_type: Optional MIME type reported by the client.

    Returns:
        One of: "tabular", "document", "image", "audio", "video", "other".
    """
    ext = os.path.splitext(filename)[1].lower()

    # Single lookup table instead of repeated list-membership chains;
    # the ".pdf"/".txt" and ".docx" branches are merged into "document".
    ext_map = {
        "tabular": {".csv", ".xlsx", ".xls", ".json"},
        "document": {".pdf", ".txt", ".docx"},
        "image": {".png", ".jpg", ".jpeg", ".webp", ".bmp"},
        "audio": {".mp3", ".wav", ".m4a"},
        "video": {".mp4", ".mkv", ".mov", ".avi"},
    }
    for kind, extensions in ext_map.items():
        if ext in extensions:
            return kind

    if content_type:
        for prefix, kind in (
            ("image/", "image"),
            ("audio/", "audio"),
            ("video/", "video"),
        ):
            if content_type.startswith(prefix):
                return kind

    return "other"
|
|
|
|
|
|
|
|
|
def make_target_path(kind: str, filename: str) -> str:
    """Build a unique destination path under dataupload/{subfolder}/.

    Args:
        kind: Logical kind from detect_kind(); unknown kinds fall back
            to the "other" subfolder.
        filename: Original file name. Only its basename is used, which
            blocks path traversal via names like "../../etc/passwd".

    Returns:
        Absolute path whose final name is prefixed with a UTC timestamp
        (microsecond resolution) so near-simultaneous uploads of the
        same file name do not collide.
    """
    sub = FOLDERS.get(kind, "other")
    safe_name = os.path.basename(filename)
    # datetime.utcnow() is deprecated since Python 3.12; an aware UTC
    # datetime produces the identical strftime output.
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S_%f")
    final_name = f"{timestamp}_{safe_name}"
    return os.path.join(UPLOAD_ROOT, sub, final_name)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def process_files(files: List[UploadFile]) -> Dict[str, Any]:
    """Save all uploaded files, run per-kind analyses, return structured context.

    Each file is handled independently: a failure saving or analyzing one
    file is recorded in its own entry and never aborts the batch.

    Returns:
        {
            "files": [
                {
                    "original_name": "...",
                    "stored_path": "dataupload/documents/..." or None,
                    "kind": "document" | "image" | ... | "unknown",
                    "summary": { ... }  # extractor output or {"error": ...}
                },
                ...
            ]
        }
    """
    ensure_folders()
    results: List[Dict[str, Any]] = []

    for uf in files:
        try:
            kind = detect_kind(uf.filename, uf.content_type)
            target_path = make_target_path(kind, uf.filename)

            try:
                # Stream to disk in 1 MiB chunks instead of buffering the
                # whole upload in memory (large videos could exhaust RAM).
                with open(target_path, "wb") as out:
                    while True:
                        chunk = await uf.read(1024 * 1024)
                        if not chunk:
                            break
                        out.write(chunk)
            except Exception as save_err:
                results.append(
                    {
                        "original_name": uf.filename,
                        "stored_path": None,
                        "kind": kind,
                        "summary": {
                            "error": f"Failed to save file: {save_err}"
                        },
                    }
                )
                continue

            try:
                summary = analyze_file(target_path, kind)
            except Exception as analyze_err:
                # Extractors catch their own errors; this is a last resort.
                summary = {
                    "error": f"Unexpected error in analysis: {analyze_err}"
                }

            results.append(
                {
                    "original_name": uf.filename,
                    # Relative path keeps the response portable across hosts.
                    "stored_path": os.path.relpath(target_path, BASE_DIR),
                    "kind": kind,
                    "summary": summary,
                }
            )

        except Exception as outer_err:
            # getattr: uf may be malformed enough to lack .filename.
            results.append(
                {
                    "original_name": getattr(uf, "filename", "unknown"),
                    "stored_path": None,
                    "kind": "unknown",
                    "summary": {
                        "error": f"Fatal error while handling file: {outer_err}"
                    },
                }
            )

    return {"files": results}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def analyze_file(path: str, kind: str) -> Dict[str, Any]:
    """Route a stored file to the extractor matching its logical kind."""
    dispatch = {
        "tabular": analyze_tabular,
        "document": analyze_document,
        "image": analyze_image,
        "audio": analyze_audio,
        "video": analyze_video,
    }
    handler = dispatch.get(kind)
    if handler is None:
        return {"type": "other", "note": "Unsupported or unknown file type"}
    return handler(path)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def analyze_tabular(path: str) -> Dict[str, Any]:
    """Load a CSV/Excel/JSON table and summarize its structure.

    Returns:
        Dict with "type": "tabular" plus either an "error" message or the
        row count, column names, per-column missing-value counts, and
        basic numeric statistics.
    """
    ext = os.path.splitext(path)[1].lower()

    loaders = {
        ".csv": pd.read_csv,
        ".xlsx": pd.read_excel,
        ".xls": pd.read_excel,
        ".json": pd.read_json,
    }
    loader = loaders.get(ext)
    if loader is None:
        return {
            "type": "tabular",
            "error": f"Unsupported tabular format: {ext}",
        }

    try:
        df = loader(path)
    except Exception as e:
        return {"type": "tabular", "error": f"Failed to load table: {e}"}

    summary: Dict[str, Any] = {
        "type": "tabular",
        "rows": int(df.shape[0]),
        "columns": [str(c) for c in df.columns],
    }

    try:
        # Cast numpy integers to plain int (and labels to str) so the
        # summary stays JSON-serializable in the API response.
        summary["missing_values"] = {
            str(col): int(n) for col, n in df.isna().sum().items()
        }
    except Exception as e:
        summary["missing_values_error"] = str(e)

    try:
        summary["numeric_stats"] = df.describe(include="number").to_dict()
    except Exception:
        # No numeric columns (or describe failed) -> empty stats.
        summary["numeric_stats"] = {}

    return summary
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def analyze_document(path: str) -> Dict[str, Any]:
    """Extract text from a PDF, DOCX, or plain-text document.

    PDF extraction tries pdfplumber first and falls back to PyMuPDF;
    both readers look at the first 10 pages only. Anything that is not
    .pdf/.docx is read as best-effort UTF-8 text.

    Returns:
        Dict with char count and a 4000-character preview, or an
        "error" entry if extraction failed entirely.
    """
    ext = os.path.splitext(path)[1].lower()
    text = ""

    try:
        if ext == ".pdf":
            try:
                with pdfplumber.open(path) as pdf:
                    pages = []
                    for page in pdf.pages[:10]:
                        t = page.extract_text()
                        if t:
                            pages.append(t)
                    text = "\n".join(pages)
            except Exception:
                # pdfplumber failed (scanned/unusual PDF); retry with PyMuPDF.
                doc = fitz.open(path)
                try:
                    # Index explicitly: bounded loop that also works on
                    # PyMuPDF versions where Document does not support slicing.
                    chunks = [
                        doc[i].get_text()
                        for i in range(min(10, doc.page_count))
                    ]
                    text = "\n".join(chunks)
                finally:
                    # fix: the original leaked the document handle.
                    doc.close()
        elif ext == ".docx":
            d = docx.Document(path)
            paras = [p.text for p in d.paragraphs if p.text.strip()]
            text = "\n".join(paras)
        else:
            # .txt and anything else: best-effort UTF-8 read.
            with open(path, "r", encoding="utf-8", errors="ignore") as f:
                text = f.read()
    except Exception as e:
        return {
            "type": "document",
            "error": f"Failed to extract document text: {e}",
        }

    return {
        "type": "document",
        "char_count": len(text),
        "preview": text[:4000],
    }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def analyze_image(path: str) -> Dict[str, Any]:
    """Open an image, record its dimensions, and OCR it with Tesseract.

    OCR failure is non-fatal: the size is still returned, together with
    an "ocr_error" entry describing what went wrong.
    """
    try:
        img = Image.open(path)
    except Exception as e:
        return {"type": "image", "error": f"Failed to open image: {e}"}

    # fix: close the image when done (the original leaked the file handle).
    with img:
        size = {"width": img.width, "height": img.height}
        try:
            text = pytesseract.image_to_string(img)
            ocr_error = None
        except Exception as e:
            text = ""
            ocr_error = str(e)

    result: Dict[str, Any] = {
        "type": "image",
        "size": size,
        "ocr_preview": text[:2000],
    }
    if ocr_error:
        result["ocr_error"] = ocr_error

    return result
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Process-wide cache for the loaded Whisper model (see get_whisper_model).
_whisper_model = None
|
|
|
|
|
|
|
|
|
def get_whisper_model():
    """Return the lazily-loaded, process-wide Whisper "small" model.

    Loading is expensive (downloads weights on first use), so the model
    is cached in a module-level global after the first successful load.

    Raises:
        RuntimeError: if the model cannot be loaded.
    """
    global _whisper_model
    if _whisper_model is None:
        try:
            _whisper_model = whisper.load_model("small")
        except Exception as e:
            # fix: chain the cause so the original traceback stays visible.
            raise RuntimeError(f"Failed to load Whisper model: {e}") from e
    return _whisper_model
|
|
|
|
|
|
|
|
|
def analyze_audio(path: str) -> Dict[str, Any]:
    """Transcribe an audio file with Whisper and summarize the result."""
    try:
        model = get_whisper_model()
    except Exception as e:
        return {"type": "audio", "error": str(e)}

    try:
        transcription = model.transcribe(path)
    except Exception as e:
        return {"type": "audio", "error": f"Whisper transcription failed: {e}"}

    transcript = transcription.get("text", "") or ""

    # Duration = end timestamp of the final segment, when segments exist.
    duration = None
    try:
        segments = transcription.get("segments")
        if segments:
            duration = segments[-1].get("end", None)
    except Exception:
        duration = None

    return {
        "type": "audio",
        "duration_sec": duration,
        "transcript_preview": transcript[:4000],
    }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def analyze_video(path: str) -> Dict[str, Any]:
    """Extract a video's audio track and run the audio analysis on it.

    The track is dumped to a temporary mono 16 kHz WAV (the format
    Whisper expects) next to the video, and removed afterwards.
    """
    audio_path = path + ".tmp_audio.wav"

    try:
        try:
            (
                ffmpeg
                .input(path)
                .output(audio_path, ac=1, ar=16000)
                .overwrite_output()
                .run(quiet=True)
            )
        except Exception as e:
            return {
                "type": "video",
                "error": f"Failed to extract audio from video: {e}",
            }

        audio_summary = analyze_audio(audio_path)
    finally:
        # fix: runs on the error path too, so a partially written WAV
        # left behind by a failed ffmpeg run is also removed.
        try:
            if os.path.exists(audio_path):
                os.remove(audio_path)
        except Exception:
            pass

    return {
        "type": "video",
        "audio_analysis": audio_summary,
    }
|
|
|
|