File size: 3,257 Bytes
bee9f12
 
 
 
 
 
 
 
 
 
f051701
 
 
 
 
 
 
bee9f12
 
f051701
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bee9f12
 
46a5ca1
 
 
 
 
 
 
 
bee9f12
 
46a5ca1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import os
import uuid
from PIL import Image
import edge_tts
from duckduckgo_search import DDGS

def clean_history(chat_history):
    return [msg for msg in chat_history if isinstance(msg.get("content"), str)]

def save_image(image: Image.Image) -> str:
    try:
        filename = f"output_{uuid.uuid4()}.png"
        image.save(filename)
        return filename
    except Exception as e:
        print(f"Error saving image: {str(e)}")
        return None

async def text_to_speech(text: str, voice: str):
    try:
        output_file = f"speech_{uuid.uuid4()}.mp3"
        # Add rate and volume parameters
        communicate = edge_tts.Communicate(
            text=text,
            voice=voice,
            rate="+0%",  # Normal speed
            volume="+0%"  # Normal volume
        )
        await communicate.save(output_file)
        return output_file
    except ValueError as ve:
        print(f"Voice error: {str(ve)}")
        # Fallback to default voice if specified voice fails
        try:
            communicate = edge_tts.Communicate(
                text=text,
                voice="en-US-JennyNeural",  # Fallback voice
                rate="+0%",
                volume="+0%"
            )
            await communicate.save(output_file)
            return output_file
        except Exception as e:
            print(f"Fallback voice error: {str(e)}")
            return None
    except Exception as e:
        print(f"TTS error: {str(e)}")
        return None

def web_search(query: str) -> str:
    try:
        with DDGS() as ddgs:
            results = list(ddgs.text(query, max_results=5))
            if not results:
                return "No results found for your query."
            return format_search_results(results)
    except Exception as e:
        return f"Error performing search: {str(e)}"

def format_search_results(results):
    try:
        formatted = "### Search Results\n\n"
        for result in results:
            title = result.get('title', 'No Title')
            body = result.get('body', 'No Description')
            url = result.get('link', result.get('url', '#'))  # Try both 'link' and 'url' keys
            
            # Clean and format the results
            title = title.replace('*', '').replace('_', '')
            body = body.replace('*', '').replace('_', '')
            
            formatted += f"**{title}**\n"
            formatted += f"{body}\n"
            formatted += f"[Read More]({url})\n\n"
            formatted += "---\n\n"
        
        return formatted
    except Exception as e:
        return f"Error formatting results: {str(e)}"

def handle_file_upload(file):
    try:
        if file is None:
            return None
        
        if isinstance(file, str):  # If it's already a file path
            return file
            
        # If it's an uploaded file, save it
        if hasattr(file, 'name'):
            ext = os.path.splitext(file.name)[1].lower()
            filename = f"upload_{uuid.uuid4()}{ext}"
            with open(filename, 'wb') as f:
                f.write(file.read())
            return filename
            
        return None
    except Exception as e:
        print(f"Error handling file upload: {str(e)}")
        return None