# BookRecommender / agents.py
# Hugging Face Hub page header captured with the file:
#   uploader: Lui3ui3ui (profile picture) - "Upload 2 files" - commit 61c6352 (verified)
from langgraph.graph import StateGraph, END
from search import duckduckgo_search
import asyncio
import re
import json
import asyncio
import httpx
import os
import ast
import traceback
from huggingface_hub import InferenceClient
# Create a single shared client, reused by every hf_chat() call.
# The token is read from the HF_API_TOKEN environment variable
# (os.getenv yields None when the variable is unset).
client = InferenceClient(token=os.getenv("HF_API_TOKEN"))
async def hf_chat(model: str, messages: list[dict]):
    """Run a chat completion on the shared HF client without blocking the event loop.

    The client call is synchronous, so it is handed to the running loop's
    default executor and awaited from there.

    Args:
        model: Model identifier forwarded to ``client.chat.completions.create``.
        messages: Chat messages, forwarded unchanged.

    Returns:
        ``{"message": {"role": ..., "content": ...}}`` built from the first
        choice of the completion.
    """
    event_loop = asyncio.get_running_loop()
    # Generation parameters (temperature, max_tokens, ...) could also be
    # passed to ``create`` here.
    completion = await event_loop.run_in_executor(
        None,
        lambda: client.chat.completions.create(model=model, messages=messages),
    )
    first_message = completion.choices[0].message
    return {
        "message": {
            "role": first_message.role,
            "content": first_message.content,
        }
    }
# Alias `chat` to the HF-backed implementation; the graph nodes in this file
# call `chat(...)`, so swapping the backend only requires rebinding this name.
chat = hf_chat
class AsyncLogger:
    """In-memory message log whose operations are serialized by an asyncio lock."""

    def __init__(self):
        self._log = []
        self._lock = asyncio.Lock()

    async def log(self, message):
        """Append ``message`` as the newest log entry."""
        async with self._lock:
            self._log += [message]

    async def get_log(self):
        """Return every entry, oldest first, joined with newlines."""
        async with self._lock:
            joined = "\n".join(self._log)
        return joined

    async def clear(self):
        """Remove all entries, keeping the same underlying list object."""
        async with self._lock:
            del self._log[:]
# Module-level shared AsyncLogger instance. Within this file it is only
# referenced by commented-out `await logger.log(...)` calls in recommend_books_node.
logger = AsyncLogger()
# Markdown code fences and the few HTML wrapper tags LLMs put around JSON.
_MARKUP_RE = re.compile(r"```(?:json)?\n?|</?(?:pre|code|p)>", re.IGNORECASE)

# Tokenizer used by the repair step. Alternatives, in priority order:
#   1. a double-quoted string (left untouched),
#   2. a single-quoted string (re-quoted),
#   3. a bare object key, i.e. a word directly before a colon (quoted),
#   4. a trailing comma before a closing brace/bracket (removed).
# Consuming whole string tokens first is what keeps fixes 3 and 4 from firing
# inside string values such as "https://..." or "Note: ...".
_JSON_TOKEN_RE = re.compile(
    r'"(?:\\.|[^"\\])*"'
    r"|(?P<single>'(?:\\.|[^'\\])*')"
    r"|(?P<key>(?<!\w)\w+(?=\s*:))"
    r"|(?P<comma>,(?=\s*[}\]]))",
    re.DOTALL,
)

# Exceptions ast.literal_eval is documented to raise on malformed input.
_LITERAL_EVAL_ERRORS = (ValueError, TypeError, SyntaxError, MemoryError, RecursionError)

# Errors the final repair attempt can end in (JSONDecodeError is a ValueError).
_REPAIR_ERRORS = (ValueError, RecursionError)


def _requote_single_quoted(token):
    """Convert a single-quoted string token into the equivalent JSON string."""

    def _fix(escape):
        piece = escape.group(0)
        if piece == "\\'":
            return "'"  # \' needs no escaping once the delimiters are double quotes
        if piece == '"':
            return '\\"'  # a bare double quote must now be escaped
        return piece  # every other escape sequence is kept as written

    return '"' + re.sub(r'\\.|"', _fix, token[1:-1], flags=re.DOTALL) + '"'


def _normalize_json_token(match):
    """Rewrite one ``_JSON_TOKEN_RE`` match into valid JSON text."""
    if match.group("key") is not None:
        return '"' + match.group("key") + '"'
    if match.group("comma") is not None:
        return ""
    if match.group("single") is not None:
        return _requote_single_quoted(match.group("single"))
    return match.group(0)  # already a proper double-quoted string


def _unwrap_quoted_objects(text):
    """Undo stray quotes around objects and add commas missing between objects."""
    text = re.sub(r'"\s*{\s*"', '{"', text)
    text = re.sub(r'"\s*}\s*"', '"}', text)
    text = re.sub(r'"\s*}\s*{', '"},{', text)
    return re.sub(r'"\s*({[^}]+})\s*"', r'\1', text)


def _loads_with_repair(text):
    """Parse ``text`` as JSON after best-effort repairs.

    Token-level fixes are tried alone first because they cannot damage valid
    strings. The regex-based structural fixes are only applied when that is
    not enough.

    Raises:
        json.JSONDecodeError: if the text is still invalid after both passes.
    """
    try:
        return json.loads(_JSON_TOKEN_RE.sub(_normalize_json_token, text))
    except json.JSONDecodeError:
        pass
    # Unwrap first so quote-wrapped objects tokenize with correct string boundaries.
    unwrapped = _unwrap_quoted_objects(text)
    return json.loads(_JSON_TOKEN_RE.sub(_normalize_json_token, unwrapped))


def extract_json_array(text: str):
    """Find the first JSON-looking array in ``text`` and parse it.

    Markdown fences and ``<pre>/<code>/<p>`` tags are stripped, then the first
    ``[{...}]`` span (or, failing that, the first ``[...]`` span) is parsed
    with ``json.loads``, ``ast.literal_eval`` and finally a repair pass.

    Args:
        text: Raw text, typically an LLM response.

    Returns:
        The parsed list, or ``[]`` when no array is found, nothing parses, or
        ``text`` is not a string.
    """
    if not isinstance(text, str):
        return []
    text = _MARKUP_RE.sub("", text)

    # Prefer an array of objects; otherwise accept any bracketed span.
    match = re.search(r"(\[\s*{.*?}\s*\])", text, re.DOTALL) or re.search(
        r"(\[.*?\])", text, re.DOTALL
    )
    if not match:
        return []
    json_str = match.group(1)

    try:
        return json.loads(json_str)
    except json.JSONDecodeError as e:
        print("[extract_json_array] JSON decode error:", e)

    # Python-literal syntax (single quotes, trailing commas) is common in LLM output.
    try:
        return ast.literal_eval(json_str)
    except _LITERAL_EVAL_ERRORS as e2:
        print("[extract_json_array] literal_eval failed:", e2)

    try:
        return _loads_with_repair(json_str)
    except _REPAIR_ERRORS as e3:
        print("[extract_json_array] JSON fixing failed:", e3)
        return []


def safe_json_parse(content: str, fallback_value=None):
    """Safely parse JSON content with multiple fallback strategies.

    Order of attempts: direct ``json.loads`` on the cleaned text,
    ``extract_json_array``, ``ast.literal_eval``, then a repair pass.

    Args:
        content: Raw text, typically an LLM response.
        fallback_value: Returned when nothing parses; defaults to a new ``[]``.

    Returns:
        The parsed value (whatever type the text encodes) or ``fallback_value``.
        Non-string ``content`` (e.g. ``None``) yields ``fallback_value``.
    """
    if fallback_value is None:
        fallback_value = []
    if not isinstance(content, str):
        return fallback_value

    cleaned_content = _MARKUP_RE.sub("", content).strip()

    try:
        return json.loads(cleaned_content)
    except json.JSONDecodeError as e:
        print(f"[safe_json_parse] JSONDecodeError: {e}")

    # The array may be embedded in surrounding prose.
    extracted = extract_json_array(cleaned_content)
    if extracted:
        return extracted

    try:
        return ast.literal_eval(cleaned_content)
    except _LITERAL_EVAL_ERRORS as e2:
        print(f"[safe_json_parse] literal_eval failed: {e2}")

    try:
        return _loads_with_repair(cleaned_content)
    except _REPAIR_ERRORS as e3:
        print(f"[safe_json_parse] JSON fixing failed: {e3}")
        return fallback_value
def merge_state(current_state: dict, new_data: dict) -> dict:
    """Merge ``new_data`` into a copy of ``current_state`` without mutating either.

    Per key: two lists are concatenated, two strings are joined with a
    newline, and any other combination (or a new key) takes the new value.

    Args:
        current_state: Existing state; left untouched, including its lists.
        new_data: Values to merge in.

    Returns:
        A new dict holding the merged state.
    """
    merged_state = dict(current_state)
    for key, value in new_data.items():
        existing = merged_state.get(key)
        if isinstance(existing, list) and isinstance(value, list):
            # Build a new list: the copy above is shallow, so extending in
            # place would also modify the list held by current_state.
            merged_state[key] = existing + value
        elif isinstance(existing, str) and isinstance(value, str):
            merged_state[key] = existing + "\n" + value
        else:
            merged_state[key] = value
    return merged_state
# Node 1: Extract books from user input
_BOOK_TITLE_FIELD_RE = re.compile(r'"title":\s*"([^"]+)"')
_BOOK_AUTHOR_FIELD_RE = re.compile(r'"author":\s*"([^"]+)"')


def _book_from_fragment(fragment):
    """Return one title/author dict found in a JSON-like fragment, or None."""
    title_match = _BOOK_TITLE_FIELD_RE.search(fragment)
    if not title_match:
        return None
    author_match = _BOOK_AUTHOR_FIELD_RE.search(fragment)
    return {
        "title": title_match.group(1),
        "author": author_match.group(1) if author_match else "Unknown",
    }


def _books_from_raw_text(content):
    """Regex fallback used when the LLM reply cannot be parsed as JSON at all."""
    # First try line by line so a title is paired with the author on its own line.
    per_line = []
    for line in content.split("\n"):
        book = _book_from_fragment(line.strip())
        if book:
            per_line.append(book)
    if per_line:
        print("[extract_books_node] Manual extraction successful:", per_line)
        return per_line

    # Last resort: scan the whole reply and pair titles/authors by position.
    print("[extract_books_node] Attempting pattern-based extraction")
    titles = _BOOK_TITLE_FIELD_RE.findall(content)
    authors = _BOOK_AUTHOR_FIELD_RE.findall(content)
    paired = [
        {"title": title, "author": authors[i] if i < len(authors) else "Unknown"}
        for i, title in enumerate(titles)
    ]
    if paired:
        print("[extract_books_node] Pattern-based extraction successful:", paired)
    return paired


def _coerce_book_entry(entry):
    """Turn one parsed list item into a dict, or None if it cannot be a book."""
    if isinstance(entry, dict):
        return entry
    if not isinstance(entry, str):
        return None
    # Some replies wrap each object in an extra pair of quotes.
    candidate = entry.strip()
    if candidate.startswith('"') and candidate.endswith('"'):
        candidate = candidate[1:-1]
    try:
        parsed = json.loads(candidate)
    except json.JSONDecodeError:
        return _book_from_fragment(entry)
    return parsed if isinstance(parsed, dict) else None


async def extract_books_node(state):
    """Ask the LLM to extract the books mentioned in ``state["user_input"]``.

    The reply is parsed with ``safe_json_parse``. If that yields nothing, the
    raw text is scanned with regexes. Every entry is then normalized to
    ``{"title": str, "author": str}`` and entries without a title are dropped.

    Args:
        state: Graph state; only ``user_input`` is read.

    Returns:
        ``{"extracted_books": [...]}`` with the validated book dicts.

    Raises:
        Exception: anything raised by the chat call or processing is logged
            with a traceback and re-raised.
    """
    try:
        print("[extract_books_node] 👉 enter")
        user_input = state.get("user_input", "")
        prompt = (
            "Extract all book titles and authors from the user input. Do not add books on your own, just take the user input. "
            "If a book is mentioned but the author is missing, try to fill the missing author in using reasoning with your knowledge.\n"
            "IMPORTANT: Output ONLY a valid JSON array with this exact format:\n"
            '[{"title": "Book Title", "author": "Author Name"}]\n'
            "Rules:\n"
            "- Use double quotes for all strings\n"
            "- No trailing commas\n"
            "- No markdown formatting or code blocks\n"
            "- No explanations or extra text\n"
            "- If no books found, return empty array: []\n\n"
            f"User input: {user_input}"
        )
        print("[extract_books_node] Prompt sent to LLM:\n", prompt)
        response = await chat(
            model="mistralai/Mistral-7B-Instruct-v0.2",
            messages=[{"role": "user", "content": prompt}],
        )
        # The API may return no content; treat that as an empty reply.
        content = response["message"]["content"] or ""
        print("[extract_books_node] Raw LLM response:\n", repr(content))
        print(f"[extract_books_node] Response type: {type(content)}, length: {len(content)}")

        books = safe_json_parse(content, fallback_value=[])
        if not books and content:
            print("[extract_books_node] JSON parsing failed, attempting manual extraction")
            books = _books_from_raw_text(content) or books

        # A single object instead of an array is still one usable book.
        if isinstance(books, dict):
            books = [books]
        elif not isinstance(books, list):
            books = []
        print("[extract_books_node] Parsed books:", books)

        validated_books = []
        for entry in books:
            book = _coerce_book_entry(entry)
            if book is None:
                continue
            # `or ""` keeps a JSON null from becoming the literal string "None".
            title = str(book.get("title") or "").strip()
            if not title:
                continue
            validated_books.append(
                {"title": title, "author": str(book.get("author") or "").strip()}
            )

        print("[extract_books_node] Validated books:", validated_books)
        print("[extract_books_node] 👈 exit with", {"extracted_books": validated_books})
        return {"extracted_books": validated_books}
    except Exception as e:
        print("[extract_books_node] ❌ exception:", repr(e))
        print("[extract_books_node] Traceback:\n", traceback.format_exc())
        raise
# Node 1.1: Complete missing authors
_AUTHOR_BYLINE_RE = re.compile(r"by ([A-Z][a-z]+(?: [A-Z][a-z]+)*)")


def _author_is_missing(author):
    """True for an empty/None author or the "Unknown" placeholder used in this file."""
    text = str(author or "").strip()
    return not text or text.casefold() == "unknown"


def _title_lookup_key(title):
    """Case- and whitespace-insensitive key for matching LLM titles to ours."""
    return " ".join(str(title or "").split()).casefold()


async def _search_author_online(title):
    """Look for a "by <Name>" byline in DuckDuckGo results; "Unknown" if none."""
    query = f"{title} book author"
    print(f"[complete_authors_node] Searching DuckDuckGo for author: {query}")
    search_results = await duckduckgo_search(query)
    for res in search_results or []:
        haystack = f"{res.get('snippet') or ''} {res.get('title') or ''}"
        match = _AUTHOR_BYLINE_RE.search(haystack)
        if match:
            print(f"[complete_authors_node] Found author '{match.group(1)}' for book '{title}'")
            return match.group(1)
    return "Unknown"


async def complete_authors_node(state):
    """Fill in authors that are missing from ``state["extracted_books"]``.

    Books whose author is empty or "Unknown" are sent to the LLM. Any book
    the LLM could not complete falls back to a DuckDuckGo byline search, and
    finally to the author "Unknown".

    Args:
        state: Graph state; only ``extracted_books`` is read.

    Returns:
        ``{"extracted_books": [...]}``. The input list is returned unchanged
        when no author is missing; otherwise a new list of
        ``{"title": str, "author": str}`` dicts, without title-less entries.

    Raises:
        Exception: anything raised by the chat/search calls is logged with a
            traceback and re-raised.
    """
    try:
        print("[complete_authors_node] 👉 enter")
        books = state.get("extracted_books", []) or []
        incomplete_books = [book for book in books if _author_is_missing(book.get("author"))]
        if not incomplete_books:
            print("[complete_authors_node] No missing authors to complete.")
            return {"extracted_books": books}

        prompt = (
            "You are given a list of books with some missing authors. "
            "For each book, fill in the correct author using your knowledge. "
            "IMPORTANT: Output ONLY a valid JSON array with this exact format:\n"
            '[{"title": "Book Title", "author": "Author Name"}]\n'
            "Rules:\n"
            "- Use double quotes for all strings\n"
            "- No trailing commas\n"
            "- No markdown formatting or code blocks\n"
            "- No explanations or extra text\n"
            "- Return all books, not just the ones with missing authors\n\n"
            f"Books with missing authors:\n{json.dumps(incomplete_books, ensure_ascii=False)}"
        )
        print("[complete_authors_node] Prompt sent to LLM:\n", prompt)
        response = await chat(
            model="mistralai/Mistral-7B-Instruct-v0.2",
            messages=[{"role": "user", "content": prompt}],
        )
        content = response["message"]["content"] or ""
        print("[complete_authors_node] Raw LLM response:\n", repr(content))
        print(f"[complete_authors_node] Response type: {type(content)}, length: {len(content)}")

        completed_books_from_llm = safe_json_parse(content, fallback_value=[])
        print("[complete_authors_node] Parsed completed books:", completed_books_from_llm)

        # The parser can return anything the LLM emitted, so index only
        # well-formed entries that actually carry a usable author.
        if isinstance(completed_books_from_llm, dict):
            completed_books_from_llm = [completed_books_from_llm]
        title_to_author = {}
        if isinstance(completed_books_from_llm, list):
            for entry in completed_books_from_llm:
                if not isinstance(entry, dict):
                    continue
                key = _title_lookup_key(entry.get("title"))
                if key and not _author_is_missing(entry.get("author")):
                    title_to_author[key] = str(entry["author"]).strip()

        validated_books = []
        for book in books:
            title = str(book.get("title") or "").strip()
            if not title:
                continue  # entries without a title are dropped
            author = str(book.get("author") or "").strip()
            if _author_is_missing(author):
                author = title_to_author.get(_title_lookup_key(title)) or await _search_author_online(title)
            validated_books.append({"title": title, "author": author})

        print("[complete_authors_node] Validated completed books:", validated_books)
        return {"extracted_books": validated_books}
    except Exception as e:
        print("[complete_authors_node] ❌ exception:", repr(e))
        print("[complete_authors_node] Traceback:\n", traceback.format_exc())
        raise
# Node 2: Search the web for books similar to each extracted book
async def recommend_books_node(state):
    """Run one DuckDuckGo search per extracted book and collect the raw hits.

    Args:
        state: Graph state; only ``extracted_books`` is read.

    Returns:
        ``{"recommendations": [...], "reasoning": str}``. Each recommendation
        is a ``{"title", "link", "snippet"}`` dict copied from a search
        result, and ``reasoning`` is a newline-joined trace of the searches.

    Raises:
        Exception: anything raised by the search call is logged with a
            traceback and re-raised.
    """
    try:
        print("[recommend_books_node] 👉 enter")
        extracted_books = state.get("extracted_books", [])
        reasoning_steps = []
        recommended_books = []
        print("[recommend_books_node] Extracted books:", extracted_books)
        if not extracted_books:
            reasoning_steps.append("No books extracted from the input. Check if the extraction failed.")
            return {"recommendations": [], "reasoning": "\n".join(reasoning_steps)}

        for book in extracted_books:
            title = book.get("title", "")
            author = str(book.get("author") or "").strip()
            query = f"Books similar to '{title}'"
            # Leave the author out when it is missing or the "Unknown"
            # placeholder; "by Unknown" only pollutes the search.
            if author and author.casefold() != "unknown":
                query += f" by {author}"
            reasoning_steps.append(f"Searching DuckDuckGo with query: {query}")
            print(f"[recommend_books_node] Searching with query: {query}")
            search_results = await duckduckgo_search(query)
            if not search_results:
                reasoning_steps.append(f"No results found for: {query}")
                print(f"[recommend_books_node] No results found for query: {query}")
                continue
            print(f"[recommend_books_node] Results for query '{query}': {search_results}")
            for res in search_results:
                hit_title = res.get("title", "No Title")
                hit_link = res.get("link", "")
                recommended_books.append(
                    {"title": hit_title, "link": hit_link, "snippet": res.get("snippet", "")}
                )
                reasoning_steps.append(f"✅ Found: {hit_title} ({hit_link})")

        if not recommended_books:
            reasoning_steps.append("No recommendations found across all queries.")
        print("[recommend_books_node] Final recommendations:", recommended_books)
        result = {
            "recommendations": recommended_books,
            "reasoning": "\n".join(reasoning_steps),
        }
        print("[recommend_books_node] 👈 exit with", result)
        return result
    except Exception as e:
        print("[recommend_books_node] ❌ exception:", repr(e))
        print("[recommend_books_node] Traceback:\n", traceback.format_exc())
        raise
# Node 3: Reason about the search results and generate recommendations
async def reasoning_node(state):
    """Have the LLM pick and justify recommendations from the search results.

    Args:
        state: Graph state; ``recommendations`` (list of title/link/snippet
            dicts) and ``reasoning`` (text so far) are read.

    Returns:
        ``{"final_recommendations": [...], "final_reasoning": str}``. Each
        recommendation is a ``{"title", "reason", "link"}`` dict of strings
        with a non-empty title. This node never raises: on any exception it
        returns an empty list and an error message as the reasoning.
    """
    try:
        recommendations = state.get("recommendations", [])
        initial_reasoning = state.get("reasoning", "") or ""
        if not recommendations:
            final_reasoning = initial_reasoning + "\nNo recommendations found to reason about."
            print("[reasoning_node] No recommendations to process.")
            return {"final_recommendations": [], "final_reasoning": final_reasoning}

        # Format the search hits as input for the LLM.
        recommendations_text = "\n".join(
            f"Title: {rec.get('title', '')}\nLink: {rec.get('link', '')}\nSnippet: {rec.get('snippet', '')}\n"
            for rec in recommendations
        )
        prompt = (
            "You are a helpful book recommendation expert. You are given a web search result. "
            "Analyze it and select the most relevant book recommendations. Explain why you recommend each book. "
            "Do not recommend the same books from the user input!\n"
            "IMPORTANT: Output ONLY a valid JSON array with this exact format:\n"
            '[{"title": "Book Title", "reason": "Why this book is recommended", "link": "URL"}]\n'
            "Rules:\n"
            "- Use double quotes for all strings\n"
            "- No trailing commas\n"
            "- No markdown formatting or code blocks\n"
            "- No explanations or extra text\n"
            "- If no good recommendations, return empty array: []\n\n"
            f"Books found from search:\n{recommendations_text}"
        )
        print("[reasoning_node] Prompt sent to LLM:\n", prompt)
        response = await chat(
            model="mistralai/Mistral-7B-Instruct-v0.2",
            messages=[{"role": "user", "content": prompt}],
        )
        content = response["message"]["content"] or ""
        print("[reasoning_node] Raw LLM response:\n", repr(content))
        print(f"[reasoning_node] Response type: {type(content)}, length: {len(content)}")

        final_recommendations = safe_json_parse(content, fallback_value=[])
        print("[reasoning_node] Parsed final recommendations:", final_recommendations)
        print("[reasoning_node] Type of final_recommendations:", type(final_recommendations))

        # Validate BEFORE using the entries: the parser returns whatever the
        # LLM produced, which may be a single object or contain non-dict items.
        if isinstance(final_recommendations, dict):
            final_recommendations = [final_recommendations]
        elif not isinstance(final_recommendations, list):
            final_recommendations = []
        validated_recommendations = []
        for rec in final_recommendations:
            if not isinstance(rec, dict):
                continue
            # `or ""` keeps a JSON null from becoming the literal string "None".
            title = str(rec.get("title") or "").strip()
            if not title:
                continue
            validated_recommendations.append(
                {
                    "title": title,
                    "reason": str(rec.get("reason") or "").strip(),
                    "link": str(rec.get("link") or "").strip(),
                }
            )

        # Describe exactly the recommendations that are returned.
        final_reasoning = initial_reasoning + "\n\nFinal reasoning:\n" + "".join(
            f"✅ Recommended: {rec['title']} - {rec['reason'] or 'No reason provided.'}\n"
            for rec in validated_recommendations
        )

        print("[reasoning_node] Validated final recommendations:", validated_recommendations)
        print("[reasoning_node] Final reasoning:\n", final_reasoning)
        result_state = {
            "final_recommendations": validated_recommendations,
            "final_reasoning": final_reasoning,
        }
        print("[reasoning_node] Returning state with keys:", list(result_state.keys()))
        print("[reasoning_node] 👈 exit with", result_state)
        return result_state
    except Exception as e:
        print("[reasoning_node] ❌ exception:", repr(e))
        print("[reasoning_node] Traceback:\n", traceback.format_exc())
        # Return a safe fallback state instead of raising.
        print("[reasoning_node] Returning fallback state due to exception")
        return {
            "final_recommendations": [],
            "final_reasoning": f"Error in reasoning node: {str(e)}",
        }
# Build the graph
def build_graph():
    """Assemble and compile the linear four-node recommendation pipeline.

    Flow: extract_books -> complete_authors -> recommend_books -> reasoning -> END,
    with ``extract_books`` as the entry point.

    Returns:
        The object returned by ``StateGraph.compile()``.
    """
    workflow = StateGraph(dict)

    # Nodes in execution order; each one is wired to the next below.
    pipeline = (
        ("extract_books", extract_books_node),
        ("complete_authors", complete_authors_node),
        ("recommend_books", recommend_books_node),
        ("reasoning", reasoning_node),
    )
    for node_name, node_fn in pipeline:
        workflow.add_node(node_name, node_fn)

    hops = [node_name for node_name, _ in pipeline] + [END]
    for source, target in zip(hops, hops[1:]):
        workflow.add_edge(source, target)

    workflow.set_entry_point("extract_books")
    return workflow.compile()