#!/usr/bin/env python3
"""
file_pipeline.py
Universal file pipeline for GAKR AI.
Responsibilities:
- Create dataupload/ folder structure (if not present)
- Save uploaded files to disk
- Detect file type
- Run type-specific extractors
- Return structured, text-friendly context for Phi-3
"""
from __future__ import annotations

import mimetypes
import os
from datetime import datetime, timezone
from typing import Any, Dict, List

import docx
import ffmpeg
import fitz  # PyMuPDF
import pandas as pd
import pdfplumber
import pytesseract
import whisper
from fastapi import UploadFile
from PIL import Image
# ============================================================
# PATHS & FOLDERS
# ============================================================
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
UPLOAD_ROOT = os.path.join(BASE_DIR, "dataupload")
FOLDERS = {
"image": "images",
"video": "videos",
"audio": "audio",
"document": "documents",
"tabular": "tabular",
"other": "other",
}
def ensure_folders() -> None:
"""
Ensure base upload folder and all subfolders exist.
"""
os.makedirs(UPLOAD_ROOT, exist_ok=True)
for sub in FOLDERS.values():
os.makedirs(os.path.join(UPLOAD_ROOT, sub), exist_ok=True)
ensure_folders()
# ============================================================
# TYPE DETECTION & PATHS
# ============================================================
def detect_kind(filename: str, content_type: str | None) -> str:
"""
Decide logical kind: tabular, document, image, audio, video, other.
"""
ext = os.path.splitext(filename)[1].lower()
if ext in [".csv", ".xlsx", ".xls", ".json"]:
return "tabular"
if ext in [".pdf", ".txt"]:
return "document"
if ext in [".docx"]:
return "document"
if ext in [".png", ".jpg", ".jpeg", ".webp", ".bmp"]:
return "image"
if ext in [".mp3", ".wav", ".m4a"]:
return "audio"
if ext in [".mp4", ".mkv", ".mov", ".avi"]:
return "video"
if content_type:
if content_type.startswith("image/"):
return "image"
if content_type.startswith("audio/"):
return "audio"
if content_type.startswith("video/"):
return "video"
return "other"
def make_target_path(kind: str, filename: str) -> str:
"""
Build a safe, timestamped filepath under dataupload/{sub}/.
"""
sub = FOLDERS.get(kind, "other")
safe_name = os.path.basename(filename)
timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S_%f")
final_name = f"{timestamp}_{safe_name}"
return os.path.join(UPLOAD_ROOT, sub, final_name)
# ============================================================
# MAIN MULTI-FILE ENTRY POINT
# ============================================================
async def process_files(files: List[UploadFile]) -> Dict[str, Any]:
"""
Save all files, run analyses, and return structured context.
Output example:
{
"files": [
{
"original_name": "...",
"stored_path": "dataupload/documents/...",
"kind": "document",
"summary": { ... }
},
...
]
}
"""
ensure_folders()
results: List[Dict[str, Any]] = []
for uf in files:
try:
kind = detect_kind(uf.filename, uf.content_type)
target_path = make_target_path(kind, uf.filename)
# Save file to disk
try:
with open(target_path, "wb") as out:
data = await uf.read()
out.write(data)
except Exception as save_err:
results.append(
{
"original_name": uf.filename,
"stored_path": None,
"kind": kind,
"summary": {
"error": f"Failed to save file: {save_err}"
},
}
)
continue
# Analyze by type
try:
summary = analyze_file(target_path, kind)
except Exception as analyze_err:
summary = {
"error": f"Unexpected error in analysis: {analyze_err}"
}
results.append(
{
"original_name": uf.filename,
"stored_path": os.path.relpath(target_path, BASE_DIR),
"kind": kind,
"summary": summary,
}
)
except Exception as outer_err:
results.append(
{
"original_name": getattr(uf, "filename", "unknown"),
"stored_path": None,
"kind": "unknown",
"summary": {
"error": f"Fatal error while handling file: {outer_err}"
},
}
)
return {"files": results}
# ============================================================
# TYPE-SPECIFIC ANALYSIS
# ============================================================
def analyze_file(path: str, kind: str) -> Dict[str, Any]:
if kind == "tabular":
return analyze_tabular(path)
if kind == "document":
return analyze_document(path)
if kind == "image":
return analyze_image(path)
if kind == "audio":
return analyze_audio(path)
if kind == "video":
return analyze_video(path)
return {"type": "other", "note": "Unsupported or unknown file type"}
# ---------- TABULAR: CSV / Excel / JSON ----------
def analyze_tabular(path: str) -> Dict[str, Any]:
ext = os.path.splitext(path)[1].lower()
df = None
try:
if ext == ".csv":
df = pd.read_csv(path)
elif ext in [".xlsx", ".xls"]:
df = pd.read_excel(path)
elif ext == ".json":
df = pd.read_json(path)
else:
return {
"type": "tabular",
"error": f"Unsupported tabular format: {ext}",
}
except Exception as e:
return {"type": "tabular", "error": f"Failed to load table: {e}"}
summary: Dict[str, Any] = {
"type": "tabular",
"rows": int(df.shape[0]),
"columns": [str(c) for c in df.columns],
}
try:
summary["missing_values"] = df.isna().sum().to_dict()
except Exception as e:
summary["missing_values_error"] = str(e)
try:
summary["numeric_stats"] = df.describe(include="number").to_dict()
except Exception:
summary["numeric_stats"] = {}
return summary
# ---------- DOCUMENTS: PDF / DOCX / TXT ----------
def analyze_document(path: str) -> Dict[str, Any]:
ext = os.path.splitext(path)[1].lower()
text = ""
try:
if ext == ".pdf":
# First try pdfplumber
try:
with pdfplumber.open(path) as pdf:
pages = []
for page in pdf.pages[:10]:
t = page.extract_text()
if t:
pages.append(t)
text = "\n".join(pages)
except Exception:
# Fallback to PyMuPDF
doc = fitz.open(path)
chunks = []
for page in doc[:10]:
chunks.append(page.get_text())
text = "\n".join(chunks)
elif ext == ".docx":
d = docx.Document(path)
paras = [p.text for p in d.paragraphs if p.text.strip()]
text = "\n".join(paras)
else: # .txt or unknown plain-text
with open(path, "r", encoding="utf-8", errors="ignore") as f:
text = f.read()
except Exception as e:
return {
"type": "document",
"error": f"Failed to extract document text: {e}",
}
short = text[:4000]
return {
"type": "document",
"char_count": len(text),
"preview": short,
}
# ---------- IMAGES ----------
def analyze_image(path: str) -> Dict[str, Any]:
try:
img = Image.open(path)
except Exception as e:
return {"type": "image", "error": f"Failed to open image: {e}"}
try:
text = pytesseract.image_to_string(img)
except Exception as e:
text = ""
ocr_error = str(e)
else:
ocr_error = None
short = text[:2000]
result: Dict[str, Any] = {
"type": "image",
"size": {"width": img.width, "height": img.height},
"ocr_preview": short,
}
if ocr_error:
result["ocr_error"] = ocr_error
return result
# ---------- AUDIO (Whisper) ----------
_whisper_model = None
def get_whisper_model():
global _whisper_model
if _whisper_model is None:
try:
_whisper_model = whisper.load_model("small")
except Exception as e:
raise RuntimeError(f"Failed to load Whisper model: {e}")
return _whisper_model
def analyze_audio(path: str) -> Dict[str, Any]:
try:
model = get_whisper_model()
except Exception as e:
return {"type": "audio", "error": str(e)}
try:
result = model.transcribe(path)
except Exception as e:
return {"type": "audio", "error": f"Whisper transcription failed: {e}"}
text = result.get("text", "") or ""
short = text[:4000]
duration = None
try:
if result.get("segments"):
duration = result["segments"][-1].get("end", None)
except Exception:
duration = None
return {
"type": "audio",
"duration_sec": duration,
"transcript_preview": short,
}
# ---------- VIDEO (audio extraction + Whisper) ----------
def analyze_video(path: str) -> Dict[str, Any]:
audio_path = path + ".tmp_audio.wav"
audio_summary: Dict[str, Any]
try:
(
ffmpeg
.input(path)
.output(audio_path, ac=1, ar=16000)
.overwrite_output()
.run(quiet=True)
)
except Exception as e:
return {
"type": "video",
"error": f"Failed to extract audio from video: {e}",
}
try:
audio_summary = analyze_audio(audio_path)
finally:
try:
if os.path.exists(audio_path):
os.remove(audio_path)
except Exception:
pass
return {
"type": "video",
"audio_analysis": audio_summary,
}