Spaces:
Paused
Paused
| import os | |
| import requests | |
| import json | |
| from io import BytesIO | |
| from quart import Quart, render_template, request, jsonify | |
| import aiohttp | |
# Quart application object used by the handlers in this module and started
# by the ``__main__`` guard.
app = Quart(__name__)

# In-memory chat transcripts keyed by session id. Each value is the list of
# ``{"role": ..., "content": ...}`` messages exchanged in that session.
history = {}
async def index():
    """Render the ``index.html`` template and return the resulting page.

    NOTE(review): no route decorator is visible on this handler in the
    reviewed source -- confirm that it is registered with ``app``.
    """
    page = await render_template('index.html')
    return page
def _format_search_results(results):
    """Flatten search hits into one ``Title/Link/Snippet`` summary string."""
    if not isinstance(results, list):
        return ''
    return ' '.join(
        f"Title: {hit.get('Title', '')}, Link: {hit.get('Link', '')}, "
        f"Snippet: {hit.get('Snippet', '')}"
        for hit in results
        if isinstance(hit, dict)
    )


async def _fetch_search_context(prompt):
    """Look *prompt* up on the search API; return ``''`` if the lookup fails."""
    try:
        async with aiohttp.ClientSession() as session:
            # ``params`` lets aiohttp URL-encode the prompt; interpolating it
            # into the URL breaks on characters such as '&', '#' or '+'.
            async with session.get(
                'https://ddg-api.awam.repl.co/api/search',
                params={'query': prompt},
            ) as response:
                results = await response.json()
    except (aiohttp.ClientError, ValueError):
        # Search is an optional extra: a dead or misbehaving search service
        # must not prevent the chat completion itself.
        return ''
    return _format_search_results(results)


async def _request_completion(messages):
    """Send *messages* to the chat-completions API.

    Returns the assistant's reply text, or ``None`` when the request fails
    or the response does not contain a usable choice.
    """
    url = 'https://api.deepinfra.com/v1/openai/chat/completions'
    headers = {
        'Accept-Language': 'en-US,en;q=0.9',
        'Connection': 'keep-alive',
        'Content-Type': 'application/json',
        'Origin': 'https://deepinfra.com',
        'Referer': 'https://deepinfra.com/',
        'Sec-Fetch-Dest': 'empty',
        'Sec-Fetch-Mode': 'cors',
        'Sec-Fetch-Site': 'same-site',
        'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 16_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Mobile/15E148 Safari/604.1',
        'X-Deepinfra-Source': 'web-page',
        'accept': 'text/event-stream',
    }
    payload = {
        # Was "mwen/Qwen2.5-72B-Instruct"; the model is published under the
        # "Qwen" namespace.
        "model": "Qwen/Qwen2.5-72B-Instruct",
        "messages": messages,
        "max_tokens": 10000,
        "stream": False,
    }
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(url, json=payload, headers=headers) as response:
                response_data = await response.json()
        return response_data['choices'][0]['message']['content']
    except (aiohttp.ClientError, ValueError, KeyError, IndexError, TypeError):
        # Covers network failures, non-JSON bodies, error payloads without
        # 'choices', and an empty 'choices' list.
        return None


async def generate_text():
    """Answer a chat prompt and record the exchange in the session history.

    Reads a JSON object from the request body with the optional keys
    ``prompt`` (default ``''``), ``session_id`` (default ``'default'``) and
    ``toggle`` (default ``False``). When ``toggle`` is truthy the prompt is
    first looked up on the search API and the results are attached to the
    user turn sent to the model; only the plain prompt is stored in
    ``history``.

    Returns:
        A JSON response ``{'result': <assistant message>}``. If the body is
        not a JSON object, a ``{'error': ...}`` response with status 400.

    NOTE(review): no route decorator is visible on this handler in the
    reviewed source -- confirm that it is registered with ``app``.
    NOTE(review): ``history`` is never pruned in the visible code, so it
    grows for as long as the process runs.
    """
    payload = await request.get_json()
    if not isinstance(payload, dict):
        return jsonify({'error': 'Request body must be a JSON object.'}), 400

    prompt = payload.get('prompt', '')
    session_id = payload.get('session_id', 'default')
    toggle = payload.get('toggle', False)

    # Start every new session with the system prompt.
    conversation = history.setdefault(
        session_id,
        [{"role": "system", "content": """ You are Helpful Assistant made by NyX AI and Samir!"""}],
    )

    search_info = await _fetch_search_context(prompt) if toggle else ''

    # The search results accompany this one request only; the stored history
    # keeps the user's original words so later turns are not bloated.
    outgoing = list(conversation)
    if search_info:
        outgoing.append({
            "role": "user",
            "content": f"{prompt}\n\nWeb search results: {search_info}",
        })
    else:
        outgoing.append({"role": "user", "content": prompt})
    conversation.append({"role": "user", "content": prompt})

    assistant_message = await _request_completion(outgoing)
    if assistant_message is None:
        assistant_message = "I'm sorry, I couldn't generate a response."

    conversation.append({
        "role": "assistant",
        "content": assistant_message,
    })
    return jsonify({'result': assistant_message})
if __name__ == "__main__":
    # Start the server only when this file is executed as a script; it
    # listens on all interfaces, port 7860.
    listen_on = {"host": "0.0.0.0", "port": 7860}
    app.run(**listen_on)