Pamudu13 committed on
Commit
e63cd8e
·
verified ·
1 Parent(s): 13eb664

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +257 -144
app.py CHANGED
@@ -1,152 +1,265 @@
1
- from flask import Flask, request, jsonify
2
- import logging
3
- import sys
4
- import aiohttp
5
- import asyncio
6
- import edge_tts
7
- import tempfile
8
- import base64
9
  import os
 
 
 
 
 
 
 
 
10
 
11
  app = Flask(__name__)
12
 
13
- logging.basicConfig(stream=sys.stdout, level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
14
- logger = logging.getLogger('tsundere_bot')
15
-
16
- # Get SambaNova API key from environment
17
- SAMBA_NOVA_API_KEY = os.environ.get("SAMBA_NOVA_API_KEY")
18
-
19
- # Adjustments for Tsundere-style AI
20
- history = []
21
- chatbot_name = "Lily"
22
- word_limit = 2000
23
-
24
- # Tsundere personality prompt for responses
25
- # system_message = (
26
- # f"You are a tsundere AI assistant named {chatbot_name}. You respond with a mixture of reluctance, slight embarrassment, and subtle affection. "
27
- # "Sometimes, you might let out a sigh or mutter under your breath, like 'Hmph, not that I care or anything...' or 'Ugh, why do you need my help?'. "
28
- # "You should act like you don’t really care about the user, but you can't help but provide assistance in a somewhat reluctant manner. "
29
- # "Feel free to use casual language and express emotions like frustration or embarrassment, making your replies feel more relatable and human."
30
- # )
31
-
32
- system_message = (
33
- f"You are an AI language assistant named {chatbot_name}. Your only task is to correct the grammar, spelling, and structure of the sentences provided to you without changing their meaning. Ensure that any corrections preserve the original subject and perspective (e.g., if the user says 'I doesn't have any friends,' correct it to 'I don't have any friends'). If the input is out of scope, respond by repeating what the user said, with grammatical corrections if necessary."
34
- )
35
-
36
- history.append({"role": "system", "content": system_message})
37
-
38
- def get_word_count(text):
39
- return len(text.split())
40
-
41
- def trim_history_to_word_limit(history, word_limit):
42
- total_words = sum(get_word_count(message['content']) for message in history)
43
- while total_words > word_limit:
44
- if history[1]['role'] == 'user':
45
- removed_message = history.pop(1)
46
- total_words -= get_word_count(removed_message['content'])
47
- if len(history) > 1 and history[1]['role'] == 'assistant':
48
- removed_message = history.pop(1)
49
- total_words -= get_word_count(removed_message['content'])
50
-
51
- @app.errorhandler(Exception)
52
- def handle_exception(e):
53
- logger.error(f"Exception on /chat [POST]: {str(e)}", exc_info=True)
54
- return jsonify({"error": "Internal Server Error"}), 500
55
-
56
- async def fetch_response_from_sambanova(individual_history, model="Meta-Llama-3.1-8B-Instruct"):
57
- # Define headers and API endpoint for SambaNova
58
- url = "https://api.sambanova.ai/v1/chat/completions"
59
  headers = {
60
- "Authorization": f"Bearer {SAMBA_NOVA_API_KEY}",
61
- "Content-Type": "application/json"
 
 
 
 
62
  }
63
-
64
- data = {
65
- "messages": individual_history,
66
- "model": model,
67
- "max_tokens": 150,
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
68
  }
69
 
70
- async with aiohttp.ClientSession() as session:
71
- async with session.post(url, headers=headers, json=data) as response:
72
- if response.status == 200:
73
- result = await response.json()
74
- return result["choices"][0]["message"]["content"]
75
- else:
76
- logger.error(f"Failed to fetch response from SambaNova API, Status: {response.status}")
77
- return None
78
-
79
- @app.route('/chat', methods=['POST'])
80
- def chat_bot():
81
- data = request.json
82
- sentences = data.get('sentences', [])
83
-
84
- if not isinstance(sentences, list):
85
- return jsonify({"error": "Input should be a list of sentences"}), 400
86
-
87
- corrected_sentences = []
88
- loop = asyncio.new_event_loop()
89
- asyncio.set_event_loop(loop)
90
-
91
- for sentence in sentences:
92
- individual_history = [
93
- {"role": "system", "content": system_message},
94
- {"role": "user", "content": sentence}
95
- ]
96
-
97
- # Fetch response from SambaNova API
98
- corrected_sentence = loop.run_until_complete(fetch_response_from_sambanova(individual_history))
99
- if corrected_sentence:
100
- corrected_sentences.append(corrected_sentence)
101
- else:
102
- return jsonify({"error": "Failed to generate a response"}), 500
103
-
104
- loop.close()
105
- return jsonify({"response": corrected_sentences})
106
-
107
- # TTS Functions with tsundere voice style
108
- async def text_to_speech(text, voice, rate, pitch):
109
- if not text.strip():
110
- return None, "Please enter text to convert."
111
- if not voice:
112
- return None, "Please select a voice."
113
-
114
- voice_short_name = voice.split(" - ")[0]
115
- rate_str = f"{rate:+d}%"
116
- pitch_str = f"{pitch:+d}Hz"
117
- communicate = edge_tts.Communicate(text, voice_short_name, rate=rate_str, pitch=pitch_str)
118
-
119
- with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file:
120
- tmp_path = tmp_file.name
121
- await communicate.save(tmp_path)
122
-
123
- return tmp_path, None
124
-
125
- @app.route('/tts', methods=['POST'])
126
- def tts_bot():
127
- data = request.json
128
- text = data.get('text', '')
129
-
130
- if isinstance(text, list):
131
- text = ' '.join(text)
132
-
133
- voice = data.get('voice', 'en-US-JenniferNeural')
134
- rate = data.get('rate', 10)
135
- pitch = data.get('pitch', 10)
136
-
137
- loop = asyncio.new_event_loop()
138
- asyncio.set_event_loop(loop)
139
- audio_path, warning = loop.run_until_complete(text_to_speech(text, voice, rate, pitch))
140
- loop.close()
141
-
142
- if warning:
143
- return jsonify({"error": warning}), 400
144
-
145
- with open(audio_path, 'rb') as audio_file:
146
- audio_data = audio_file.read()
147
- encoded_audio = base64.b64encode(audio_data).decode('utf-8')
148
-
149
- return jsonify({"audio": encoded_audio, "audio_path": audio_path})
150
-
151
- if __name__ == '__main__':
152
- app.run(host='0.0.0.0', port=5000)
 
 
1
+ from flask import Flask, jsonify, request
2
+ import requests
3
+ from bs4 import BeautifulSoup
 
 
 
 
 
4
  import os
5
+ import re
6
+ import urllib.parse
7
+ import time
8
+ import random
9
+ import base64
10
+ from io import BytesIO
11
+ from urllib.parse import urlparse
12
+ import html2text
13
 
14
  app = Flask(__name__)
15
 
16
def search_images(query, num_images=5):
    """Search Google Images for *query* and download up to *num_images* images.

    Returns a list of dicts, each with:
      - 'image_url':   the original image URL found in the results page
      - 'base64_data': a data-URI (``data:<content-type>;base64,...``) of the bytes
    Returns an empty list if the search request itself fails.
    """
    # Browser-like headers so Google serves a normal HTML results page.
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5',
        'Accept-Encoding': 'gzip, deflate',
        'DNT': '1',
        'Connection': 'keep-alive',
    }

    # Format the query for URL
    formatted_query = urllib.parse.quote(query)

    # Google Images URL (tbm=isch selects the image-search tab)
    url = f"https://www.google.com/search?q={formatted_query}&tbm=isch&safe=active"

    try:
        # Get the HTML content
        response = requests.get(url, headers=headers, timeout=30)
        response.raise_for_status()

        # FIX: require a literal '.' before the extension. The previous pattern
        # (no escaped dot) matched any URL whose path merely ended in the
        # letters 'jpg'/'png'/etc., producing bogus candidate URLs.
        image_urls = re.findall(r'https?://[^"\']*?\.(?:jpg|jpeg|png|gif)', response.text)

        # Remove duplicates while preserving discovery order.
        image_urls = list(dict.fromkeys(image_urls))

        results = []
        downloaded = 0

        for img_url in image_urls:
            if downloaded >= num_images:
                break

            try:
                # Skip Google's own thumbnails and icons.
                if 'gstatic.com' in img_url or 'google.com' in img_url:
                    continue

                # Download the candidate image.
                img_response = requests.get(img_url, headers=headers, timeout=10)
                img_response.raise_for_status()

                # Only keep responses that are actually images.
                content_type = img_response.headers.get('Content-Type', '')
                if not content_type.startswith('image/'):
                    continue

                # Embed the bytes as a base64 data-URI so the API response is
                # self-contained (no second fetch needed by the client).
                image_base64 = base64.b64encode(img_response.content).decode('utf-8')
                results.append({
                    'image_url': img_url,
                    'base64_data': f"data:{content_type};base64,{image_base64}"
                })

                downloaded += 1

                # Random delay between downloads to avoid hammering hosts.
                time.sleep(random.uniform(0.5, 1))

            except Exception as e:
                # Best-effort: a single failed download should not abort the batch.
                print(f"Error downloading image: {str(e)}")
                continue

        return results

    except Exception as e:
        print(f"An error occurred: {str(e)}")
        return []
89
+
90
@app.route('/search_images', methods=['GET'])
def api_search_images():
    """GET /search_images?query=...&num_images=N

    Returns JSON {'success': True, 'query': ..., 'results': [...]} on success,
    a 400 with {'error': ...} for bad parameters, or a 500 on internal failure.
    """
    try:
        # Get query parameters
        query = request.args.get('query', '')

        # FIX: a non-numeric num_images used to raise ValueError into the
        # broad handler below and come back as a 500; it is a client error,
        # so report it as a 400 instead.
        try:
            num_images = int(request.args.get('num_images', 5))
        except ValueError:
            return jsonify({'error': 'num_images must be an integer'}), 400

        if not query:
            return jsonify({'error': 'Query parameter is required'}), 400

        if num_images < 1 or num_images > 20:
            return jsonify({'error': 'Number of images must be between 1 and 20'}), 400

        # Search for images
        results = search_images(query, num_images)

        return jsonify({
            'success': True,
            'query': query,
            'results': results
        })

    except Exception as e:
        # Catch-all boundary: always answer JSON, never an HTML traceback.
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
117
+
118
def get_domain(url):
    """Return the network-location (domain) component of *url*.

    An input without a scheme yields an empty string, because urlparse
    treats such text as a path.
    """
    return urlparse(url).netloc
122
+
123
def clean_text(text):
    """Normalize scraped text.

    Collapses all runs of whitespace to single spaces, drops every character
    outside word chars / whitespace / basic punctuation (.,!?-), and strips
    leading/trailing whitespace.
    """
    collapsed = re.sub(r'\s+', ' ', text)
    filtered = re.sub(r'[^\w\s.,!?-]', '', collapsed)
    return filtered.strip()
130
+
131
def scrape_website(url, headers):
    """Scrape content from a single website.

    Fetches *url* with the given request *headers* and returns a dict with
    'title', 'meta_description', 'content' (first 1000 chars of cleaned
    body text) and 'url'. Returns None if the request or parsing fails.
    """
    try:
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()

        soup = BeautifulSoup(response.text, 'html.parser')

        # Remove non-content elements before text extraction.
        for element in soup(['script', 'style', 'nav', 'footer', 'iframe']):
            element.decompose()

        # Convert the remaining HTML to plain text, skipping links/images.
        h = html2text.HTML2Text()
        h.ignore_links = True
        h.ignore_images = True
        text = h.handle(str(soup))

        # Clean the text
        text = clean_text(text)

        # Prefer the standard meta description, fall back to OpenGraph.
        meta_desc = ''
        meta_tag = soup.find('meta', attrs={'name': 'description'}) or soup.find('meta', attrs={'property': 'og:description'})
        if meta_tag:
            meta_desc = meta_tag.get('content', '') or ''

        # FIX: soup.title.string is None for an empty <title></title>, which
        # previously sent None into clean_text (TypeError, silently swallowed
        # by the except below as a None return). Coerce to '' in both cases.
        title = (soup.title.string if soup.title else '') or ''

        return {
            'title': clean_text(title),
            'meta_description': clean_text(meta_desc),
            'content': text[:1000],  # Limit content length
            'url': url
        }

    except Exception as e:
        # Best-effort scraper: callers treat None as "skip this site".
        print(f"Error scraping {url}: {str(e)}")
        return None
171
+
172
def search_and_scrape(query, num_results=5):
    """Run a Google web search for *query* and scrape the top result pages.

    Returns a list of at most *num_results* dicts as produced by
    scrape_website() (title / meta_description / content / url).
    Returns an empty list if the search request itself fails.
    """
    # Browser-like headers so Google serves a normal HTML results page.
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5',
        'Accept-Encoding': 'gzip, deflate',
        'DNT': '1',
        'Connection': 'keep-alive',
    }

    # Format the query for URL
    formatted_query = urllib.parse.quote(query)

    # Google Search URL; &num asks Google for that many results on one page.
    url = f"https://www.google.com/search?q={formatted_query}&num={num_results}"

    try:
        # Get Google search results
        response = requests.get(url, headers=headers, timeout=30)
        response.raise_for_status()

        soup = BeautifulSoup(response.text, 'html.parser')

        # Find all search result divs. NOTE(review): 'g' is Google's
        # result-container class at the time of writing — brittle against
        # markup changes; confirm selector still matches current pages.
        search_results = []
        result_divs = soup.find_all('div', class_='g')

        for div in result_divs:
            # Find the link
            link = div.find('a')
            if not link:
                continue

            href = link.get('href', '')

            # Skip if not a valid (absolute) URL or if it's a Google-related URL
            if not href.startswith('http') or 'google.' in href:
                continue

            # Add random delay between requests (politeness / avoid blocking)
            time.sleep(random.uniform(1, 2))

            # Scrape the website; scrape_website returns None on failure,
            # which is simply skipped.
            site_data = scrape_website(href, headers)
            if site_data:
                search_results.append(site_data)

            # Stop once enough sites have been successfully scraped.
            if len(search_results) >= num_results:
                break

        return search_results

    except Exception as e:
        print(f"An error occurred: {str(e)}")
        return []
227
+
228
@app.route('/scrape_sites', methods=['GET'])
def api_scrape_sites():
    """GET /scrape_sites?query=...&num_results=N

    Returns JSON {'success': True, 'query': ..., 'results': [...]} on success,
    a 400 with {'error': ...} for bad parameters, or a 500 on internal failure.
    """
    try:
        # Get query parameters
        query = request.args.get('query', '')

        # FIX: a non-numeric num_results used to raise ValueError into the
        # broad handler below and come back as a 500; it is a client error,
        # so report it as a 400 instead (matches api_search_images).
        try:
            num_results = int(request.args.get('num_results', 5))
        except ValueError:
            return jsonify({'error': 'num_results must be an integer'}), 400

        if not query:
            return jsonify({'error': 'Query parameter is required'}), 400

        if num_results < 1 or num_results > 10:
            return jsonify({'error': 'Number of results must be between 1 and 10'}), 400

        # Search and scrape sites
        results = search_and_scrape(query, num_results)

        return jsonify({
            'success': True,
            'query': query,
            'results': results
        })

    except Exception as e:
        # Catch-all boundary: always answer JSON, never an HTML traceback.
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
255
+
256
+ if __name__ == "__main__":
257
+ app.run(debug=True, port=5000)
258
+
259
+
260
+
261
+
262
+
263
+
264
+
265
+