Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import time | |
| import re | |
| from datetime import datetime | |
| from typing import List, Dict, Any, Generator, Tuple | |
| import google.generativeai as genai | |
| from tavily import TavilyClient | |
| from sentence_transformers import SentenceTransformer, CrossEncoder | |
| import numpy as np | |
| from urllib.parse import urlparse | |
| import hashlib | |
class RAGPipeline:
    """In-memory RAG pipeline for document indexing, retrieval and re-ranking.

    Documents are split into overlapping chunks, embedded with
    ``embedding_model`` and kept in memory. Queries are matched against the
    stored embeddings and the best candidates are re-scored with ``reranker``.
    """

    def __init__(self, embedding_model, reranker):
        """Store the two models and start with an empty index.

        Args:
            embedding_model: Object exposing ``encode(texts, ...)``.
            reranker: Object exposing ``predict(pairs)`` for
                ``(query, chunk_text)`` pairs.
        """
        self.embedding_model = embedding_model
        self.reranker = reranker
        self.documents = []
        self.embeddings = None

    def chunk_text(self, text: str, chunk_size: int = 1000, overlap: int = 200) -> List[str]:
        """Split ``text`` into overlapping segments.

        A chunk that does not reach the end of the text is shortened to end
        just after the last ``'. '`` it contains, provided that boundary lies
        in the second half of the chunk.

        Args:
            text: Text to split.
            chunk_size: Maximum chunk length in characters; must be positive.
            overlap: Number of characters shared between consecutive chunks.

        Returns:
            The whitespace-stripped chunks in document order. Text no longer
            than ``chunk_size`` is returned unchanged as a single chunk.

        Raises:
            ValueError: If ``chunk_size`` is not positive.
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be positive")
        if len(text) <= chunk_size:
            return [text]

        text_len = len(text)
        chunks = []
        start = 0
        while start < text_len:
            end = start + chunk_size
            chunk = text[start:end]

            # Try to end on a sentence boundary
            if end < text_len:
                last_period = chunk.rfind('. ')
                if last_period > chunk_size // 2:
                    end = start + last_period + 2
                    chunk = text[start:end]

            chunks.append(chunk.strip())

            # Once a chunk reaches the end of the text, stepping back by
            # `overlap` would only re-emit a tail already contained in it.
            if end >= text_len:
                break

            # Guarantee forward progress even when `overlap` is as large as
            # the chunk that was just produced.
            next_start = end - overlap
            start = next_start if next_start > start else end
        return chunks

    def index_research(self, research_items: List[Dict]):
        """Replace the index with chunks built from ``research_items``.

        Args:
            research_items: Dicts read via the ``'content'``, ``'url'`` and
                ``'title'`` keys. Missing or ``None`` content is treated as
                empty text.
        """
        self.documents = []
        # Drop the previous embeddings so they can never outlive the
        # documents they were computed for.
        self.embeddings = None

        for item in research_items:
            content = item.get('content') or ''
            source = item.get('url', 'Unknown')
            title = item.get('title', 'Untitled')

            for i, chunk in enumerate(self.chunk_text(content)):
                if len(chunk.strip()) > 100:  # Skip very short chunks
                    self.documents.append({
                        'content': chunk,
                        'source': source,
                        'title': title,
                        'chunk_id': i,
                    })

        if self.documents:
            texts = [doc['content'] for doc in self.documents]
            self.embeddings = self.embedding_model.encode(texts, show_progress_bar=False)

    def retrieve_and_rerank(self, query: str, top_k: int = 10) -> List[Dict]:
        """Return the indexed chunks most relevant to ``query``.

        Candidates are picked by dot product between the query embedding and
        the stored embeddings (``top_k * 2`` of them), then re-scored with
        ``reranker.predict``.

        Args:
            query: Search text.
            top_k: Maximum number of chunks to return.

        Returns:
            Copies of the chunk dicts with an added float
            ``'relevance_score'``, sorted by that score in descending order.
            Empty when nothing is indexed or ``top_k`` is not positive.
        """
        if not self.documents or self.embeddings is None or top_k <= 0:
            return []

        # Semantic search
        query_embedding = self.embedding_model.encode([query])
        similarities = np.dot(query_embedding, self.embeddings.T)[0]

        # Take more candidates than the final top_k so re-ranking has a choice
        top_indices = np.argsort(similarities)[::-1][:top_k * 2]
        candidates = [self.documents[i] for i in top_indices]

        # Re-rank with cross-encoder
        pairs = [(query, doc['content']) for doc in candidates]
        scores = self.reranker.predict(pairs)

        ranked_results = []
        for doc, score in zip(candidates, scores):
            doc_copy = doc.copy()
            doc_copy['relevance_score'] = float(score)
            ranked_results.append(doc_copy)

        ranked_results.sort(key=lambda x: x['relevance_score'], reverse=True)
        return ranked_results[:top_k]
def gather_research(tavily_client, queries: List[str], max_results_per_query: int = 5) -> List[Dict]:
    """Run every query through the search client and merge the hits.

    Results are de-duplicated by URL across all queries: the first hit for a
    URL is kept and later ones are dropped. A query whose search raises is
    reported on stdout and skipped.

    Args:
        tavily_client: Object exposing ``search(...)`` that returns a mapping
            with a ``'results'`` list.
        queries: Search strings, processed in order.
        max_results_per_query: Forwarded to the client as ``max_results``.

    Returns:
        One dict per unique URL with the keys ``title``, ``url``, ``content``,
        ``raw_content``, ``score`` and ``query``.
    """
    collected = []
    visited = set()

    for query in queries:
        try:
            print(f" Searching: {query[:50]}...")
            response = tavily_client.search(
                query=query,
                max_results=max_results_per_query,
                search_depth="advanced",
                include_answer=True,
                include_raw_content=True,
            )

            for hit in response.get('results', []):
                url = hit.get('url', '')
                if not url or url in visited:
                    continue
                visited.add(url)
                collected.append({
                    'title': hit.get('title', 'Unknown'),
                    'url': url,
                    'content': hit.get('content', ''),
                    'raw_content': hit.get('raw_content', ''),
                    'score': hit.get('score', 0.0),
                    'query': query,
                })

            # Rate limiting: pause only after a search that completed
            time.sleep(0.5)
        except Exception as exc:
            print(f" Search error for '{query}': {str(exc)}")

    return collected
def run_verification_step(writer_model, section_text: str, research_context: str) -> str:
    """Ask the model to fact-check a section against its research context.

    Args:
        writer_model: Object exposing ``generate_content(prompt,
            generation_config=...)`` whose result has a ``text`` attribute.
        section_text: The section to verify.
        research_context: Source material; only the first 3000 characters are
            placed in the prompt.

    Returns:
        ``section_text`` unchanged when the reply starts with the
        ``VERIFIED`` verdict, is empty, or the call fails. Otherwise
        ``section_text`` followed by a ``*Verification Note: ...*`` paragraph
        carrying the model's reply.
    """
    verification_prompt = f"""
You are a fact-checker. Review this section and the research context to identify any potential inaccuracies, unsupported claims, or hallucinations.
SECTION TO VERIFY:
{section_text}
RESEARCH CONTEXT:
{research_context[:3000]}
Check for:
1. Claims not supported by the research
2. Factual inaccuracies
3. Misleading statements
4. Missing context
If the section is accurate and well-supported, respond with "VERIFIED: Section is accurate."
If issues are found, respond with "ISSUES FOUND:" followed by specific problems and suggested corrections.
"""
    try:
        response = writer_model.generate_content(
            verification_prompt,
            generation_config=genai.types.GenerationConfig(temperature=0.1)
        )
        verification_result = response.text
    except Exception as e:
        # Verification is best-effort: keep the section, but make the
        # failure visible instead of hiding it.
        print(f"Verification error: {str(e)}")
        return section_text

    # Look at how the reply *starts* (ignoring Markdown decoration). A plain
    # substring test would also match "UNVERIFIED" or "could not be verified"
    # inside an "ISSUES FOUND" reply and wrongly accept the section.
    verdict = (verification_result or "").strip().lstrip("*_#>`\"' ").upper()
    if not verdict or verdict.startswith("VERIFIED"):
        return section_text
    return f"{section_text}\n\n*Verification Note: {verification_result}*"
def get_clarifying_questions(model, topic: str) -> str:
    """Ask the model for clarifying questions that narrow the research topic.

    Args:
        model: Object exposing ``generate_content(prompt)`` whose result has
            a ``text`` attribute.
        topic: The research topic supplied by the user.

    Returns:
        The model's reply text, or a fixed list of six generic questions
        mentioning ``topic`` when the model call raises.
    """
    question_prompt = f"""
You are a research strategist. For the topic "{topic}", generate 4-6 specific clarifying questions that will help create a more focused and comprehensive research report.
Focus on:
- Specific aspects or subtopics of interest
- Target audience and use case
- Geographical or temporal scope
- Depth and technical level required
- Particular perspectives or angles
- Current vs historical focus
Format as numbered questions. Be specific and actionable.
Topic: {topic}
"""
    try:
        return model.generate_content(question_prompt).text
    except Exception:
        # Any failure of the model call falls back to generic questions
        return f"""
1. What specific aspects of {topic} are you most interested in exploring?
2. Who is the intended audience for this research?
3. Are you looking for recent developments, historical analysis, or both?
4. What geographic regions or markets should be the focus?
5. What level of technical detail is appropriate?
6. Are there particular challenges or opportunities you want to emphasize?
"""
def _extract_plan_json(response_text: str) -> Dict[str, Any]:
    """Parse the outermost ``{...}`` span of ``response_text`` as a plan dict."""
    json_start = response_text.find('{')
    json_end = response_text.rfind('}')
    # Both braces must exist and be in order; comparing `rfind(...) + 1`
    # against -1 can never detect a missing closing brace.
    if json_start == -1 or json_end <= json_start:
        raise ValueError("No valid JSON found")

    plan_data = json.loads(response_text[json_start:json_end + 1])
    sections = plan_data.get('sections') if isinstance(plan_data, dict) else None
    if not isinstance(sections, list) or not sections:
        raise ValueError("Plan JSON contains no sections")
    return plan_data


def _build_fallback_plan(topic: str, detailed_topic: str) -> Dict[str, Any]:
    """Build the generic six-section plan used when the model's plan is unusable."""
    # Use the current year so the "recent" queries do not go stale
    year = datetime.now().year
    return {
        "detailed_topic": detailed_topic,
        "sections": [
            {
                "title": "Introduction and Background",
                "description": "Historical context and foundational overview",
                "search_queries": [f"{topic} history", f"{topic} background", f"what is {topic}"],
                "key_questions": [f"What is {topic}?", f"How did {topic} develop?"]
            },
            {
                "title": "Current State and Recent Developments",
                "description": "Present situation and latest updates",
                "search_queries": [f"{topic} {year}", f"{topic} recent news", f"{topic} current trends"],
                "key_questions": [f"What is the current state of {topic}?", "What are recent developments?"]
            },
            {
                "title": "Key Players and Market Analysis",
                "description": "Important organizations, companies, and market dynamics",
                "search_queries": [f"{topic} companies", f"{topic} market leaders", f"{topic} industry analysis"],
                "key_questions": ["Who are the key players?", "What is the market structure?"]
            },
            {
                "title": "Challenges and Opportunities",
                "description": "Current challenges and future opportunities",
                "search_queries": [f"{topic} challenges", f"{topic} opportunities", f"{topic} problems"],
                "key_questions": ["What are the main challenges?", "What opportunities exist?"]
            },
            {
                "title": "Future Outlook and Trends",
                "description": "Predictions and emerging trends",
                "search_queries": [f"{topic} future", f"{topic} predictions", f"{topic} trends {year}"],
                "key_questions": ["What does the future hold?", "What trends are emerging?"]
            },
            {
                "title": "Conclusion and Implications",
                "description": "Summary and broader implications",
                "search_queries": [f"{topic} implications", f"{topic} impact", f"{topic} summary"],
                "key_questions": ["What are the key takeaways?", "What are the broader implications?"]
            }
        ]
    }


def research_and_plan(config, planner_model, tavily_client, topic: str, clarifications: str) -> Dict[str, Any]:
    """Create a research plan with per-section search strategies.

    The function refines the topic into a short brief, runs a few broad
    searches for context, then asks the model for a JSON section plan.

    Args:
        config: Accepted for interface compatibility; not read here.
        planner_model: Object exposing ``generate_content(...)`` whose result
            has a ``text`` attribute.
        tavily_client: Search client forwarded to ``gather_research``.
        topic: The user's initial topic.
        clarifications: The user's answers to the clarifying questions.

    Returns:
        A dict with ``'detailed_topic'`` and a non-empty ``'sections'`` list.
        Each section is expected to carry ``title``, ``description``,
        ``search_queries`` and ``key_questions``. A built-in six-section plan
        is returned when the model call fails or its reply is not a usable
        plan.
    """
    default_brief = f"Comprehensive analysis of {topic}"

    # Step 1: Construct detailed research brief
    brief_prompt = f"""
Based on the initial topic and user clarifications, create a detailed, focused research brief.
Initial Topic: {topic}
User Clarifications: {clarifications}
Create a refined, specific research focus that incorporates the user's requirements. Be precise about scope, angle, and key areas to investigate.
Respond with just the refined research brief (2-3 sentences):
"""
    try:
        response = planner_model.generate_content(brief_prompt)
        # An empty reply is as useless as a failed call
        detailed_topic = response.text.strip() or default_brief
    except Exception:
        detailed_topic = default_brief

    # Step 2: Initial broad research for context
    print("Conducting initial research for planning...")
    initial_queries = [detailed_topic, f"{topic} overview", f"{topic} recent developments"]
    initial_research = gather_research(tavily_client, initial_queries, 3)
    # Search results may carry missing or None fields; never let that abort planning
    planning_context = "\n\n".join(
        f"Source: {item.get('title', 'Unknown')}\n{(item.get('content') or '')[:500]}"
        for item in initial_research[:10]
    )

    # Step 3: Generate detailed section plan
    # json.dumps keeps the example valid JSON when the brief contains
    # quotes or line breaks.
    planning_prompt = f"""
Create a comprehensive research plan for: {detailed_topic}
Research Context:
{planning_context}
Generate 6-8 detailed sections with specific search strategies for each.
Respond in JSON format:
{{
"detailed_topic": {json.dumps(detailed_topic)},
"sections": [
{{
"title": "Section Title",
"description": "Detailed description of what this section will cover",
"search_queries": ["specific query 1", "specific query 2", "specific query 3"],
"key_questions": ["key question 1", "key question 2"]
}}
]
}}
Make search queries specific and varied to capture different perspectives and sources.
"""
    try:
        response = planner_model.generate_content(
            planning_prompt,
            generation_config=genai.types.GenerationConfig(temperature=0.3)
        )
        plan_data = _extract_plan_json(response.text.strip())
        plan_data.setdefault('detailed_topic', detailed_topic)
        return plan_data
    except Exception as e:
        print(f"Planning error: {str(e)}")
        return _build_fallback_plan(topic, detailed_topic)
def write_report_stream(config, writer_model, tavily_client, embedding_model, reranker, plan: Dict[str, Any]) -> Generator[str, None, None]:
    """Research and write the report section by section, streaming progress.

    Each section is searched, indexed, retrieved from, written and
    fact-checked in turn. Sources are numbered once across the whole report
    so a URL keeps the same citation number in every section.

    Args:
        config: Settings object; ``DEEP_DIVE_SEARCH_RESULTS``,
            ``CHUNKS_TO_USE_FOR_WRITING`` and ``WRITER_TEMPERATURE`` are read.
        writer_model: Object exposing ``generate_content(...)``; used for
            writing and for the verification step.
        tavily_client: Search client forwarded to ``gather_research``.
        embedding_model: Embedding model handed to ``RAGPipeline``.
        reranker: Re-ranking model handed to ``RAGPipeline``.
        plan: Dict with ``'detailed_topic'`` and ``'sections'``.

    Yields:
        Markdown progress messages, and as the final item the complete
        report text.
    """
    # The number of queries shown in the progress output and the number
    # actually searched must be the same.
    max_queries_per_section = 3

    detailed_topic = plan.get('detailed_topic', 'Research Topic')
    sections = plan.get('sections', [])

    # Initialize report state
    report_content = f"# Deep Research Report: {detailed_topic}\n\n"
    report_content += f"*Generated on {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}*\n\n"
    all_sources = {}
    citation_counter = 1
    rag_pipeline = RAGPipeline(embedding_model, reranker)

    yield f"π¬ **Starting Deep Research Process**\n\n**Topic:** {detailed_topic}\n**Sections:** {len(sections)}\n\n---\n\n"

    for i, section in enumerate(sections):
        section_title = section.get('title', f'Section {i+1}')
        section_desc = section.get('description', '')
        # `or` also covers a present-but-empty query list
        search_queries = section.get('search_queries') or [f"{detailed_topic} {section_title}"]
        search_queries = search_queries[:max_queries_per_section]

        yield f"### π Section {i+1}/{len(sections)}: {section_title}\n\n"

        # Gather research for this section
        yield f"π **Searching web sources...**\n"
        for j, query in enumerate(search_queries):
            yield f" β Query {j+1}: `{query}`\n"
        section_research = gather_research(tavily_client, search_queries, config.DEEP_DIVE_SEARCH_RESULTS)

        if not section_research:
            yield f"β οΈ No sources found for this section\n\n"
            continue

        yield f"β **Found {len(section_research)} sources**\n\n"
        yield f"π **Processing and ranking content...**\n"

        # Index and retrieve relevant content; fall back to the title so the
        # retrieval query is never empty.
        rag_pipeline.index_research(section_research)
        relevant_chunks = rag_pipeline.retrieve_and_rerank(
            section_desc or section_title,
            top_k=config.CHUNKS_TO_USE_FOR_WRITING
        )

        # Build context with citations
        context_for_llm = ""
        section_sources = {}
        for chunk in relevant_chunks:
            source_url = chunk['source']
            if source_url not in all_sources:
                all_sources[source_url] = {
                    'number': citation_counter,
                    'title': chunk.get('title', 'Unknown Title'),
                    'url': source_url
                }
                citation_counter += 1
            source_num = all_sources[source_url]['number']
            section_sources[source_url] = source_num
            context_for_llm += f"[Source {source_num}] {chunk['content']}\n\n"

        yield f"βοΈ **Writing section content...**\n"

        # Generate section content
        writer_prompt = f"""
Write a comprehensive section titled "{section_title}" for a research report on "{detailed_topic}".
Section Description: {section_desc}
Research Context:
{context_for_llm}
Requirements:
- Write 4-6 well-structured paragraphs
- Use information from the provided sources
- Include in-text citations using [Source X] format
- Maintain academic writing style
- Ensure accuracy and relevance
- Connect logically to the overall topic
Write only the section content (without the title - it will be added automatically).
Include proper citations for all claims using the [Source X] format provided in the context.
"""
        try:
            response = writer_model.generate_content(
                writer_prompt,
                generation_config=genai.types.GenerationConfig(
                    temperature=config.WRITER_TEMPERATURE,
                    max_output_tokens=1500
                )
            )
            section_content = response.text.strip()
        except Exception as e:
            section_content = f"Error generating content: {str(e)}"

        yield f"π **Fact-checking content...**\n"

        # Verification step
        verified_content = run_verification_step(writer_model, section_content, context_for_llm[:2000])

        # Add section to report
        section_bibliography = "\n".join([
            f"[{num}] {all_sources[url]['title']} - {url}"
            for url, num in section_sources.items()
        ])
        final_section = f"## {section_title}\n\n{verified_content}\n\n**Section Sources:**\n{section_bibliography}\n\n"
        report_content += final_section

        yield f"β **Section {i+1} completed**\n\n---\n\n"

    # Add master bibliography
    yield f"π **Compiling final bibliography...**\n"
    master_bibliography = "## Complete Bibliography\n\n"
    for source_data in sorted(all_sources.values(), key=lambda x: x['number']):
        master_bibliography += f"[{source_data['number']}] {source_data['title']}\n {source_data['url']}\n\n"
    report_content += master_bibliography

    # Add methodology section
    methodology = f"""## Research Methodology
This report was generated using a comprehensive research methodology:
1. **Topic Refinement**: Initial topic was refined based on user clarifications
2. **Multi-Query Search**: Each section used 3-5 targeted search queries
3. **Source Gathering**: Collected {len(all_sources)} unique sources using advanced web search
4. **Content Processing**: Documents were chunked and embedded for semantic retrieval
5. **Relevance Ranking**: Used cross-encoder re-ranking for optimal content selection
6. **Citation Integration**: All claims are supported by cited sources
7. **Fact Verification**: Each section underwent verification for accuracy
8. **Quality Assurance**: Final review for coherence and completeness
*Generated on {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} using AI-powered research pipeline*
"""
    report_content += methodology

    yield f"π **Research Complete!**\n\n**Final Report:**\n- {len(sections)} sections\n- {len(all_sources)} sources cited\n- {len(report_content.split())} words\n\n---\n\n"

    # Final yield with complete report
    yield report_content