# app.py — Coginline Omni (Hugging Face Space, commit 8a33e64)
import os
import uvicorn
import base64
import nest_asyncio
import shutil
import logging
import tempfile
import io
import requests
# FIX: added 'Any' to the typing imports
from typing import List, Optional, Any
from fastapi import FastAPI, UploadFile, File, Form
from fastapi.responses import HTMLResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from huggingface_hub import InferenceClient
from duckduckgo_search import DDGS
from PIL import Image
# Imports LlamaIndex
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader, Settings, PromptTemplate
from llama_index.core.llms import CustomLLM, CompletionResponse, CompletionResponseGen, LLMMetadata
from llama_index.core.llms.callbacks import llm_completion_callback
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_parse import LlamaParse
# Module-level setup: logging, API tokens, and the FastAPI application.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("CognilineOmni")

# Tokens come from the environment (e.g. Space secrets); either may be None.
HF_TOKEN = os.getenv("HF_TOKEN")
LLAMA_KEY = os.getenv("LLAMA_KEY")

# Allow re-entrant event loops — presumably needed because LlamaParse drives
# its own async loop inside the running server loop; TODO confirm.
nest_asyncio.apply()

app = FastAPI()
# Wide-open CORS so the bundled front-end can call the API from any origin.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
# --- 1. VISION ---
class VisionCore:
    """Image captioning via the Hugging Face Inference API (BLIP large)."""

    def __init__(self, token):
        # Hosted inference endpoint for Salesforce/blip-image-captioning-large.
        self.api_url = "https://api-inference.huggingface.co/models/Salesforce/blip-image-captioning-large"
        self.headers = {"Authorization": f"Bearer {token}"}

    def see(self, image_b64):
        """Return a caption string for a base64-encoded image.

        Accepts either a raw base64 payload or a full data URL
        ("data:image/...;base64,<payload>"). Always returns a string;
        failures are reported inline, never raised.
        """
        try:
            # Strip the "data:...;base64," prefix when a data URL was sent.
            if "," in image_b64:
                image_b64 = image_b64.split(",")[1]
            image_data = base64.b64decode(image_b64)
            # FIX: added a timeout so a stalled inference API call cannot
            # hang the serving worker indefinitely.
            response = requests.post(
                self.api_url, headers=self.headers, data=image_data, timeout=30
            )
            if response.status_code == 200:
                result = response.json()
                if isinstance(result, list) and result:
                    return f"[VISÃO]: {result[0].get('generated_text', 'Objeto técnico.')}"
            return "[Erro Visual: API ocupada]"
        except Exception as e:
            return f"[Erro Vision: {str(e)}]"
# --- 2. WEB SEARCH ---
def search_web(query):
    """Return up to 3 DuckDuckGo text results as a bullet list, or None.

    Returns None both when the search yields nothing and when the search
    backend fails — this is a best-effort helper and never raises.
    """
    try:
        results = DDGS().text(query, max_results=3)
        if not results:
            return None
        return "\n".join(f"• {r['title']}: {r['body'][:200]}..." for r in results)
    except Exception:
        # FIX: narrowed the bare `except:` (it also swallowed SystemExit and
        # KeyboardInterrupt) and made the failure visible in the logs.
        logger.warning("Web search failed for query %r", query, exc_info=True)
        return None
# --- 3. DUAL BRAIN ---
class DoubleBrainLLM(CustomLLM):
    """LlamaIndex CustomLLM wrapper over the HF Inference chat API (Qwen 2.5)."""

    # Pydantic-declared fields (CustomLLM is a pydantic model).
    model_name: str = "Qwen/Qwen2.5-7B-Instruct"
    # NOTE(review): annotated `str` but defaulted to None — pydantic may
    # reject/strict-warn on this; confirm intended optionality.
    token: str = None
    # `Any` because InferenceClient is not a pydantic-validatable type.
    client: Any = None

    def __init__(self, model_name: str, token: str, **kwargs):
        super().__init__(model_name=model_name, token=token, **kwargs)
        self.client = InferenceClient(token=token)

    @property
    def metadata(self) -> LLMMetadata: return LLMMetadata(model_name=self.model_name, num_output=2048, context_window=8192)

    @llm_completion_callback()
    def complete(self, prompt: str, **kwargs) -> CompletionResponse:
        """Single-shot completion; errors are returned as response text, not raised."""
        # System prompt forces LaTeX-only math and technical Portuguese output.
        sys = "Sistema COGNILINE. 1. Use APENAS LaTeX ($...$) para fórmulas. 2. Responda em Português Técnico."
        msgs = [{"role": "system", "content": sys}, {"role": "user", "content": prompt}]
        try:
            resp = self.client.chat_completion(model=self.model_name, messages=msgs, max_tokens=1536, temperature=0.2)
            return CompletionResponse(text=resp.choices[0].message.content)
        except Exception as e: return CompletionResponse(text=f"Erro IA: {str(e)}")

    @llm_completion_callback()
    # Streaming shim: yields one full completion rather than token deltas.
    def stream_complete(self, prompt: str, **kwargs) -> CompletionResponseGen: yield self.complete(prompt, **kwargs)
class CoreSystem:
    """Application state: lazily-built LLM, vision core and optional RAG index.

    `mode` is one of "OFFLINE" (not initialized), "FREE" (plain LLM chat)
    or "RAG" (document index active).
    """

    def __init__(self):
        self.rag = None
        self.llm = None
        self.vision = None
        self.mode = "OFFLINE"

    def init(self):
        """Idempotently build the LLM, the vision core and the embed model."""
        if not self.llm:
            t = HF_TOKEN if HF_TOKEN else ""
            self.llm = DoubleBrainLLM(model_name="Qwen/Qwen2.5-7B-Instruct", token=t)
            self.vision = VisionCore(t)
            Settings.llm = self.llm
            Settings.embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5")

    def start_rag(self, key, path):
        """Parse documents under `path` with LlamaParse and build a query engine.

        Returns (ok: bool, message: str); never raises.
        """
        # FIX: a missing key used to crash the os.environ assignment with an
        # opaque TypeError; fail fast with an explicit message instead.
        if not key:
            return False, "LLAMA_KEY não configurada."
        try:
            os.environ["LLAMA_CLOUD_API_KEY"] = key
            docs = SimpleDirectoryReader(path, file_extractor={".pdf": LlamaParse(result_type="markdown")}).load_data()
            self.rag = VectorStoreIndex.from_documents(docs).as_query_engine()
            self.mode = "RAG"
            return True, "Base Ativa."
        except Exception as e:
            return False, str(e)
# Single application-wide state instance shared by all endpoints.
core = CoreSystem()
@app.post("/api/init")
async def init_api(files: List[UploadFile] = File(default=[])):
    """Initialize the core; when files are uploaded, build a RAG index on them.

    Without files the app switches to "FREE" mode (plain LLM chat).
    """
    core.init()
    if not files:
        core.mode = "FREE"
        return {"status": "ok", "message": "Modo Livre.", "mode": "FREE"}
    tmp = tempfile.mkdtemp()
    try:
        for f in files:
            # FIX: basename() blocks path traversal through a crafted
            # client-supplied filename (e.g. "../../app.py").
            safe_name = os.path.basename(f.filename or "upload.bin")
            with open(os.path.join(tmp, safe_name), "wb") as b:
                b.write(await f.read())
        ok, m = core.start_rag(LLAMA_KEY, tmp)
    finally:
        # FIX: the temp dir leaked on every call; the index holds the parsed
        # documents in memory, so the files are safe to delete afterwards.
        shutil.rmtree(tmp, ignore_errors=True)
    return {"status": "ok" if ok else "error", "message": m, "mode": "RAG"}
class Q(BaseModel):
    """Request payload for the /api/ask endpoint."""
    query: str
    generate_title: bool = False
    web_search: bool = False
    image_base64: Optional[str] = None
@app.post("/api/ask")
async def api_ask(q: Q):
    """Answer a question, optionally enriched with vision and web context."""
    if not core.llm:
        core.init()
    # Optional image caption prepended to the prompt.
    vis = core.vision.see(q.image_base64) if q.image_base64 else ""
    web = ""
    if q.web_search:
        # FIX: reuse the search_web helper (one formatting path, no
        # duplicated DDGS call behind a bare `except: pass`).
        found = search_web(q.query)
        if found:
            web = "\n" + found
    full = f"{vis}{web} PERGUNTA: {q.query}"
    if core.mode == "FREE":
        ans = str(core.llm.complete(full).text).strip()
    elif core.mode == "RAG" and core.rag:
        ans = str(core.rag.query(full)).strip()
    else:
        return {"ans": "Offline."}
    if web:
        ans += "\n> Fonte Web."
    # Optional short conversation title generated from the raw question.
    tit = str(core.llm.complete(f"Título curto: {q.query}").text).replace('"', '') if q.generate_title else None
    return {"ans": ans, "title": tit}
@app.get("/", response_class=HTMLResponse)
async def home():
    """Serve index.html with the logo inlined as a base64 data URL."""
    try:
        with open("index.html", "r", encoding="utf-8") as f:
            html = f.read()
    except OSError:
        # FIX: narrowed the bare `except:` to OSError so programming errors
        # are no longer masked by the "missing file" message.
        return HTMLResponse("<h1>ERRO CRÍTICO: Crie o arquivo index.html na aba Files!</h1>")
    logo = ""
    if os.path.exists("CGL.png"):
        with open("CGL.png", "rb") as f:
            logo = f"data:image/png;base64,{base64.b64encode(f.read()).decode()}"
    # The template contains a LOGO_B64 placeholder that gets substituted here.
    return HTMLResponse(content=html.replace("LOGO_B64", logo))
if __name__ == "__main__":
    # 7860 is the conventional Hugging Face Spaces port — presumably why it
    # was chosen here; confirm against the Space configuration.
    uvicorn.run(app, host="0.0.0.0", port=7860)