File size: 4,267 Bytes
a60683b 08c7be8 15dc739 a60683b 08c7be8 15dc739 a60683b 4adcefb 1843c3a 08c7be8 4adcefb 735359c 4adcefb 735359c a60683b 735359c 15dc739 a60683b 08c7be8 b6186d4 1843c3a a60683b b6186d4 4adcefb b6186d4 4adcefb 08c7be8 a60683b 15dc739 08c7be8 4adcefb a60683b 4adcefb a60683b 4adcefb a60683b b6186d4 735359c b6186d4 a60683b 08c7be8 4adcefb a60683b 4adcefb a60683b b6186d4 a60683b b6186d4 a60683b 4adcefb b6186d4 a60683b b6186d4 a60683b 4adcefb a60683b 4adcefb a60683b b6186d4 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
from fastapi import FastAPI, UploadFile, File, Form
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from groq import Groq
from langchain_community.document_loaders import WebBaseLoader
import os
import io
from dotenv import load_dotenv
from PIL import Image
import pytesseract
import whisper
from docx import Document
import pandas as pd
import PyPDF2
# Load environment variables
load_dotenv()
# Set paths for OCR & audio
pytesseract.pytesseract.tesseract_cmd = os.getenv("TESSERACT_CMD", "/usr/bin/tesseract")
os.environ["PATH"] += os.pathsep + os.getenv("FFMPEG_PATH", "/usr/bin")
app = FastAPI()
client = Groq(api_key=os.getenv("GROQ_API_KEY"))
UPLOAD_DIR = "uploaded_files"
os.makedirs(UPLOAD_DIR, exist_ok=True)
MAX_FILE_SIZE_MB = 10
def extract_text_from_file(file_path):
ext = os.path.splitext(file_path)[-1].lower()
if ext == ".txt":
with open(file_path, "r", encoding="utf-8") as f:
return f.read()
elif ext == ".docx":
doc = Document(file_path)
return "\n".join([p.text for p in doc.paragraphs])
elif ext == ".csv":
df = pd.read_csv(file_path)
return df.to_string(index=False)
elif ext == ".pdf":
with open(file_path, "rb") as f:
reader = PyPDF2.PdfReader(f)
return "\n".join([page.extract_text() for page in reader.pages if page.extract_text()])
return "❌ Unsupported file type."
@app.post("/chat-with-file")
async def chat_with_file(file: UploadFile = File(...), question: str = Form(...)):
try:
contents = await file.read()
if len(contents) > MAX_FILE_SIZE_MB * 1024 * 1024:
return JSONResponse(status_code=400, content={"error": "❌ File too large. Max 10MB."})
path = os.path.join(UPLOAD_DIR, file.filename)
with open(path, "wb") as f:
f.write(contents)
text = extract_text_from_file(path)
response = client.chat.completions.create(
model="llama3-8b-8192",
messages=[
{"role": "system", "content": "You are a helpful assistant. Answer using the uploaded file."},
{"role": "user", "content": f"{text}\n\nQuestion: {question}"}
]
)
return {"answer": response.choices[0].message.content}
except Exception as e:
return JSONResponse(status_code=500, content={"error": str(e)})
class URLQuery(BaseModel):
url: str
question: str
@app.post("/chat-with-url")
async def chat_with_url(data: URLQuery):
try:
loader = WebBaseLoader(data.url, header_template={"User-Agent": "Mozilla/5.0"})
docs = loader.load()
content = "\n".join([doc.page_content for doc in docs])
response = client.chat.completions.create(
model="llama3-8b-8192",
messages=[
{"role": "system", "content": "You are a helpful assistant. Answer using the webpage content."},
{"role": "user", "content": f"Web Content:\n{content}\n\nQuestion: {data.question}"}
]
)
return {"answer": response.choices[0].message.content}
except Exception as e:
return JSONResponse(status_code=500, content={"error": str(e)})
@app.post("/extract-text-from-image")
async def extract_text_from_image(file: UploadFile = File(...)):
try:
contents = await file.read()
image = Image.open(io.BytesIO(contents)).convert("L")
image = image.resize((image.width * 2, image.height * 2))
text = pytesseract.image_to_string(image, lang="eng")
return {"answer": text.strip() or "⚠️ No text extracted."}
except Exception as e:
return JSONResponse(status_code=500, content={"error": str(e)})
@app.post("/transcribe-audio")
async def transcribe_audio(file: UploadFile = File(...)):
try:
contents = await file.read()
path = os.path.join(UPLOAD_DIR, file.filename)
with open(path, "wb") as f:
f.write(contents)
model = whisper.load_model("base")
result = model.transcribe(path)
return {"answer": result.get("text", "⚠️ No transcript returned.")}
except Exception as e:
return JSONResponse(status_code=500, content={"error": str(e)})
|