# Cogniline Omni — FastAPI application (Hugging Face Space).
# Standard library
import base64
import io
import logging
import os
import shutil
import tempfile
from typing import Any, List, Optional

# Third-party
import nest_asyncio
import requests
import uvicorn
from duckduckgo_search import DDGS
from fastapi import FastAPI, File, Form, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse
from huggingface_hub import InferenceClient
from PIL import Image
from pydantic import BaseModel

# LlamaIndex
from llama_index.core import PromptTemplate, Settings, SimpleDirectoryReader, VectorStoreIndex
from llama_index.core.llms import CompletionResponse, CompletionResponseGen, CustomLLM, LLMMetadata
from llama_index.core.llms.callbacks import llm_completion_callback
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_parse import LlamaParse
# Module-level setup: logging, secrets, event-loop patch, FastAPI app.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("CognilineOmni")

# Secrets injected via the Space's environment; either may be None if unset.
HF_TOKEN = os.getenv("HF_TOKEN")
LLAMA_KEY = os.getenv("LLAMA_KEY")

# Allow re-entrant event loops — presumably needed because LlamaParse runs
# async work inside the already-running server loop; TODO confirm.
nest_asyncio.apply()

app = FastAPI()
# Wide-open CORS: any origin/method/header may call this API.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
# --- 1. VISION ---
class VisionCore:
    """Image captioning via the Hugging Face Inference API (BLIP large)."""

    def __init__(self, token):
        self.api_url = "https://api-inference.huggingface.co/models/Salesforce/blip-image-captioning-large"
        self.headers = {"Authorization": f"Bearer {token}"}

    def see(self, image_b64):
        """Caption a base64-encoded image and return a descriptive string.

        Accepts either a raw base64 payload or a full data URL
        ("data:image/...;base64,<payload>"). Never raises: failures are
        reported inline as "[Erro ...]" strings so callers can embed the
        result directly in a prompt.
        """
        try:
            # Strip the data-URL prefix if one is present.
            if "," in image_b64:
                image_b64 = image_b64.split(",")[1]
            image_data = base64.b64decode(image_b64)
            # timeout= prevents the worker hanging forever when the
            # inference endpoint is slow or unreachable (original had none).
            response = requests.post(self.api_url, headers=self.headers, data=image_data, timeout=60)
            if response.status_code == 200:
                result = response.json()
                if isinstance(result, list) and len(result) > 0:
                    return f"[VISÃO]: {result[0].get('generated_text', 'Objeto técnico.')}"
            return "[Erro Visual: API ocupada]"
        except Exception as e:
            return f"[Erro Vision: {str(e)}]"
# --- 2. WEB SEARCH ---
def search_web(query):
    """Search DuckDuckGo for *query* and return up to 3 formatted hits.

    Returns a newline-joined bullet list ("• title: body…") or None when
    the search fails or yields nothing — best-effort by design.
    """
    try:
        results = DDGS().text(query, max_results=3)
        if not results:
            return None
        return "\n".join(f"• {r['title']}: {r['body'][:200]}..." for r in results)
    except Exception:
        # Narrowed from a bare `except:` so SystemExit/KeyboardInterrupt
        # still propagate; any search failure degrades to "no web data".
        return None
# --- 3. DOUBLE BRAIN ---
class DoubleBrainLLM(CustomLLM):
    """LlamaIndex CustomLLM backed by the HF InferenceClient chat API."""

    model_name: str = "Qwen/Qwen2.5-7B-Instruct"
    token: Optional[str] = None  # default is None, so the annotation must be Optional
    client: Any = None

    def __init__(self, model_name: str, token: str, **kwargs):
        super().__init__(model_name=model_name, token=token, **kwargs)
        self.client = InferenceClient(token=token)

    @property
    def metadata(self) -> LLMMetadata:
        # LlamaIndex reads `llm.metadata` as an attribute; without
        # @property the original returned a bound method to callers.
        return LLMMetadata(model_name=self.model_name, num_output=2048, context_window=8192)

    @llm_completion_callback()
    def complete(self, prompt: str, **kwargs) -> CompletionResponse:
        """Run one chat completion; errors are returned as text, not raised."""
        sys = "Sistema COGNILINE. 1. Use APENAS LaTeX ($...$) para fórmulas. 2. Responda em Português Técnico."
        msgs = [{"role": "system", "content": sys}, {"role": "user", "content": prompt}]
        try:
            resp = self.client.chat_completion(model=self.model_name, messages=msgs, max_tokens=1536, temperature=0.2)
            return CompletionResponse(text=resp.choices[0].message.content)
        except Exception as e:
            return CompletionResponse(text=f"Erro IA: {str(e)}")

    @llm_completion_callback()
    def stream_complete(self, prompt: str, **kwargs) -> CompletionResponseGen:
        # No true token streaming: yield the full completion in one chunk.
        yield self.complete(prompt, **kwargs)
class CoreSystem:
    """App-wide holder for the LLM, vision and RAG state (module singleton)."""

    def __init__(self):
        self.rag = None        # query engine, set by start_rag()
        self.llm = None
        self.vision = None
        self.mode = "OFFLINE"  # OFFLINE until init(); then FREE or RAG

    def init(self):
        """Lazily build the LLM / vision / embedding stack (idempotent)."""
        if not self.llm:
            t = HF_TOKEN or ""
            self.llm = DoubleBrainLLM(model_name="Qwen/Qwen2.5-7B-Instruct", token=t)
            self.vision = VisionCore(t)
            Settings.llm = self.llm
            Settings.embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5")

    def start_rag(self, key, path):
        """Parse documents under *path* with LlamaParse and build a RAG index.

        Returns (ok, message). Fails fast with a clear message when *key*
        is unset — previously `os.environ[...] = None` raised an opaque
        TypeError that surfaced as the error message.
        """
        if not key:
            return False, "LLAMA_CLOUD_API_KEY ausente."
        try:
            os.environ["LLAMA_CLOUD_API_KEY"] = key
            docs = SimpleDirectoryReader(path, file_extractor={".pdf": LlamaParse(result_type="markdown")}).load_data()
            self.rag = VectorStoreIndex.from_documents(docs).as_query_engine()
            self.mode = "RAG"
            return True, "Base Ativa."
        except Exception as e:
            return False, str(e)
core = CoreSystem()  # module-level singleton shared by all route handlers
# NOTE(review): the route decorator (e.g. @app.post("/init")) appears to have
# been lost in transcription — confirm against the deployed Space.
async def init_api(files: List[UploadFile] = File(default=[])):
    """Initialise the core stack; optionally ingest uploads into a RAG index.

    With no files the app enters FREE (pure chat) mode; otherwise the
    uploads are written to a temp dir, indexed via start_rag, and the
    dir is removed (the original leaked one temp dir per call).
    """
    core.init()
    if not files:
        core.mode = "FREE"
        return {"status": "ok", "message": "Modo Livre.", "mode": "FREE"}
    tmp = tempfile.mkdtemp()
    try:
        for f in files:
            # Guard against a client that sends no filename.
            name = f.filename or "upload.bin"
            with open(os.path.join(tmp, name), "wb") as b:
                b.write(await f.read())
        ok, m = core.start_rag(LLAMA_KEY, tmp)
    finally:
        # The index keeps the parsed content in memory, so the on-disk
        # copies are no longer needed after start_rag returns.
        shutil.rmtree(tmp, ignore_errors=True)
    return {"status": "ok" if ok else "error", "message": m, "mode": "RAG"}
class Q(BaseModel):
    """Request payload for the ask endpoint."""

    query: str
    generate_title: bool = False   # also produce a short chat title
    web_search: bool = False       # augment the prompt with DuckDuckGo hits
    image_base64: Optional[str] = None  # optional image (raw b64 or data URL)
# NOTE(review): the route decorator (e.g. @app.post("/ask")) appears to have
# been lost in transcription — confirm against the deployed Space.
async def api_ask(q: Q):
    """Answer a question, optionally grounded by vision, web search and RAG.

    Returns {"ans": ..., "title": ...} (title only when requested), or
    {"ans": "Offline."} when neither FREE nor RAG mode is active.
    """
    if not core.llm:
        core.init()
    # Optional image context: caption the image and prepend it to the prompt.
    vis = core.vision.see(q.image_base64) if q.image_base64 else ""
    web = ""
    if q.web_search:
        try:
            w = DDGS().text(q.query, max_results=2)
            if w:
                web = "\n" + str(w)
        except Exception:
            # Best-effort: a failed web search must not block the answer.
            # Narrowed from a bare `except:` so SystemExit/KeyboardInterrupt
            # still propagate.
            pass
    full = f"{vis}{web} PERGUNTA: {q.query}"
    if core.mode == "FREE":
        ans = str(core.llm.complete(full).text).strip()
    elif core.mode == "RAG" and core.rag:
        ans = str(core.rag.query(full)).strip()
    else:
        return {"ans": "Offline."}
    if web:
        ans += "\n> Fonte Web."
    tit = str(core.llm.complete(f"Título curto: {q.query}").text).replace('"', '') if q.generate_title else None
    return {"ans": ans, "title": tit}
# NOTE(review): the route decorator (e.g. @app.get("/")) appears to have
# been lost in transcription — confirm against the deployed Space.
async def home():
    """Serve index.html with the logo inlined as a base64 data URL."""
    try:
        with open("index.html", "r", encoding="utf-8") as f:
            html = f.read()
    except OSError:
        # Narrowed from a bare `except:`: only file-access errors mean
        # "index.html missing"; anything else should surface.
        return HTMLResponse("<h1>ERRO CRÍTICO: Crie o arquivo index.html na aba Files!</h1>")
    logo = ""
    if os.path.exists("CGL.png"):
        with open("CGL.png", "rb") as f:
            logo = f"data:image/png;base64,{base64.b64encode(f.read()).decode()}"
    # The template is expected to contain a literal LOGO_B64 placeholder.
    return HTMLResponse(content=html.replace("LOGO_B64", logo))
if __name__ == "__main__":
    # 7860 is the default port Hugging Face Spaces expects the app on.
    uvicorn.run(app, host="0.0.0.0", port=7860)