# Hugging Face Spaces app — timeline generator (the Space page header showed status "Runtime error")
import os
import traceback
from datetime import datetime
from typing import Literal

import pandas as pd
from pydantic import BaseModel, Field

from fasthtml.common import *
from fasthtml_hf import setup_hf_backup
from langchain.output_parsers import PydanticOutputParser
from langchain_anthropic import ChatAnthropic
from langchain_community.tools.wikipedia.tool import WikipediaQueryRun
from langchain_community.utilities.wikipedia import WikipediaAPIWrapper
from langchain_core.prompts import PromptTemplate
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_openai import ChatOpenAI
# Set up the app, including daisyui and tailwind for the chat component.
# Fix: the original `tlink = Script(...),` had a stray trailing comma, which
# made tlink a 1-tuple instead of a Script element.
tlink = Script(src="https://cdn.tailwindcss.com")
dlink = Link(rel="stylesheet", href="https://cdn.jsdelivr.net/npm/daisyui@4.11.1/dist/full.min.css")
# Resolve the assets directory relative to this file so the app works outside
# the author's machine (the original hard-coded an absolute /Users/... path,
# which cannot exist on the deployment host).
assets_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "assets")
app = FastHTML(hdrs=(tlink, dlink, picolink))
# Pydantic models
class Event(BaseModel):
    """One dated occurrence extracted from the source context."""

    # All three fields are required (`...`): the parser rejects partial events.
    time: datetime = Field(..., description="When the event occurred")
    description: str = Field(..., description="A summary of what happened. Not more than 20 words.")
    sentiment: Literal["Positive", "Negative"] = Field(..., description="Categorization of the event sentiment")
class EventResponse(BaseModel):
    """Top-level parse target: the list of events the LLM must return."""

    events: List[Event] = Field(
        min_length=5,
        max_length=30,
        description="List of events extracted from the context",
    )
# Set up the Pydantic output parser: converts the raw LLM reply into an EventResponse.
parser = PydanticOutputParser(pydantic_object=EventResponse)

# LangChain prompt template with format instructions
event_extraction_template = """
Extract the time based informations or events from the context and return a list of events with time, event description and event sentiment type whether it was positive or negative event.
The context may contain information about people, organization or any other entity.
<context>
{context}
</context>
The response must follow the following schema strictly. There will be penalty for not following the schema.
<schema>
{format_instructions}
</schema>
Must ensure the event belongs to the topic {topic} and try to get at least {numevents} unique events possible from the context.
Output:
"""

event_prompt = PromptTemplate(
    # Fix: the template references {numevents}, but it was missing from
    # input_variables; PromptTemplate validates declared vs. used variables.
    input_variables=["topic", "context", "numevents"],
    # format_instructions is baked in now since it never varies per request.
    partial_variables={"format_instructions": parser.get_format_instructions()},
    template=event_extraction_template
)
# Function to get the appropriate language model based on user selection
def getModel(model, key):
    """Instantiate the chat model selected in the UI.

    The API key is placed in the provider's environment variable, which is
    where the LangChain wrappers read credentials from. Any selection other
    than OpenAI or Anthropic falls through to Google Gemini.
    """
    if model == 'OpenAI Gpt-4o':
        os.environ['OPENAI_API_KEY'] = key
        # temperature=0 for deterministic output; cap response at 8000 tokens
        return ChatOpenAI(temperature=0, model="gpt-4o-2024-08-06", max_tokens=8000)

    if model == 'Anthropic Claude':
        os.environ['ANTHROPIC_API_KEY'] = key
        return ChatAnthropic(model='claude-3-5-sonnet-20240620')

    # Default branch: Google Gemini
    os.environ['GOOGLE_API_KEY'] = key
    return ChatGoogleGenerativeAI(
        model="gemini-1.5-pro",
        temperature=0,
        max_tokens=8000,
        max_retries=2,
    )
| # Function to generate an HTML table from the summary object | |
| #def generate_timeline_html(timeline): | |
| # rows = [] | |
| # for idx, tline in timeline.iterrows(): | |
| # if(tline['Sentiment'] == "Positive"): | |
| # rows.append(Div(Div( H2(tline['Time']), P(tline['Event']), cls = 'content'), cls = "container left")) | |
| # else: | |
| # rows.append(Div(Div( H2(tline['Time']), P(tline['Event']), cls = 'content'), cls = "container right")) | |
| # | |
| # return Div(*rows, cls="timeline") | |
# Function to generate the DaisyUI vertical-timeline HTML from the dataframe
def generate_timeline_html(timeline):
    """Render the timeline DataFrame as a DaisyUI vertical timeline.

    Expects 'TimeStr' and 'Event' columns. Rows alternate sides: even row
    indices render on the start (left) side, odd indices on the end (right).

    Fix: the original duplicated the whole Li(...) construction in both
    branches; only the side-selecting CSS class actually differed.
    """
    rows = []
    for idx, tline in timeline.iterrows():
        side_cls = "timeline-start mb-10 md:text-end" if idx % 2 == 0 else "timeline-end mb-10"
        rows.append(Li(Div(File("./assets/circle.svg"), cls="timeline-middle"),
                       Div(Time(tline['TimeStr'],
                                cls="font-mono italic"),
                           Div(tline['Event'],
                               cls='text-lg font-black'),
                           cls=side_cls),
                       Hr()))
    return Ul(*rows, cls="timeline timeline-vertical")
def get_timeline_df(result):
    """Convert an EventResponse into a chronologically sorted DataFrame.

    Returns a DataFrame with 'Time', 'Event', 'Sentiment' columns plus a
    'TimeStr' column formatted as dd/mm/YYYY, sorted oldest-first.

    Raises:
        ValueError: if `result` is not an EventResponse. (The original raised
        ValueError via a convoluted raise/catch/re-raise with an unreachable
        duplicate `except` clause, and its message wrongly said "Expected a
        list"; the ValueError contract is preserved, the message corrected.)
    """
    if not isinstance(result, EventResponse):
        msg = f"Expected an EventResponse, but got {type(result)}"
        print(f"An error occurred during analysis: {msg}")
        raise ValueError(msg)

    results_data = [
        {'Time': event.time, 'Event': event.description, 'Sentiment': event.sentiment}
        for event in result.events
    ]
    df = pd.DataFrame(results_data)
    df = df.sort_values("Time", ascending=True).reset_index()
    df['TimeStr'] = df['Time'].map(lambda x: x.strftime('%d/%m/%Y'))
    return df
# Build the timeline: fetch Wikipedia content for the topic and have the LLM
# extract dated events from it.
def generate_timeline(topic, numevents, llm):
    """Fetch Wikipedia context for `topic` and extract up to `numevents` events.

    Returns the timeline DataFrame on success, or None when the LLM output
    could not be parsed/converted — callers must handle None.
    """
    wikipedia = WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper(top_k_results=3, doc_content_chars_max=5000))
    wiki_content = wikipedia.run(topic)
    print(f"wiki_content: {wiki_content}")

    # prompt -> model -> structured parser; the parser yields an EventResponse
    chain = event_prompt | llm | parser
    result = chain.invoke({"context": wiki_content,
                           "topic": topic,
                           "numevents": numevents})
    try:
        print(f"Results: {result}")
        df = get_timeline_df(result)
        # Optionally, save the DataFrame to a CSV file.
        # Fix: the original log message claimed 'results.csv' but the file is
        # named after the topic; report the real filename.
        out_file = f"{topic.replace(' ', '_')}_timeline.csv"
        df.to_csv(out_file, index=True)
        print(f"Results saved to '{out_file}'")
    except Exception as e:
        print(f"Error parsing LLM output: {str(e)}")
        return None
    return df
# Function to generate the configuration form for the web interface
def getConfigForm():
    # Card holding a form that POSTs to /submit via HTMX: the response swaps
    # into #result, and #indicator is shown while the request is in flight.
    return Card(Form(hx_post="/submit", hx_target="#result", hx_swap_oob="innerHTML", hx_indicator="#indicator")(
        Div(
            Label(Strong("Model and Topic: "), style="color:#3498db; font-size:25px;")
        ),
        # LLM provider selector — values must match the names getModel() checks
        Div(
            Span(Strong('Model: '), cls ="badge"),
            Select(Option("OpenAI Gpt-4o"), Option("Anthropic Claude"), Option("Google Gemini"), id="model", cls = 'select w-full max-w-xs')
        ),
        # Provider API key (masked input; forwarded to getModel via d['secret'])
        Div(
            Span(Strong('API Key: '), cls ="badge"),
            Input(id="secret", type="password", placeholder="Key: "),
        ),
        # Free-text topic used both for the Wikipedia lookup and the prompt
        Div(
            Span(Strong('Topic for timeline (Person/Organization/Event): '), cls ="badge"),
            Input(type = 'text',
                  id="topic",
                  cls = "input w-full max-w-xs",
                  placeholder = "Type here")
        ),
        # Requested number of events ({numevents} in the prompt template)
        Div(
            Span(Strong('How many events: '), cls ="badge"),
            Select(Option("5"), Option("10"), Option("20"), Option("30"), id="numevents", cls = 'select w-full max-w-xs')
        ),
        Div(
            Button("Generate Timeline", cls = 'btn')
        ),
        # Attribution footer
        Div(
            Br(),
            A("Developed by Manaranjan Pradhan", href="http://www.manaranjanp.com/",
              target="_blank",
              style = 'color: red; font-size: 16px;')
        )))
# Define the route for the homepage
# NOTE(review): no route registration was visible anywhere in this file, so
# serve() exposed no pages at all; registering the handler here — confirm the
# intended path if the original had a decorator that was lost.
@app.get("/")
def homepage():
    """Landing page: config form on the left, result/progress pane on the right."""
    return Titled(Card(H2('Generate a Timeline Dashboard using AI', cls = 'text-4xl font-bold')),
                  Grid(getConfigForm(),
                       Div(
                           # Target container for the HTMX swap from /submit
                           Div(id="result"),
                           # Progress label, hidden until HTMX marks a request in-flight
                           Div(Label(Strong('Generating timeline for the topic.... take a deep breath....')),
                               Progress(), id="indicator", cls="htmx-indicator")
                       ),
                       style="grid-template-columns: 400px 1000px; gap: 50px;"
                  ))
# Route for serving static asset files (e.g. the circle.svg used by the timeline).
# NOTE(review): no route registration was visible; using FastHTML's static-file
# path pattern — confirm against the original decorator if it existed.
@app.get("/{fname:path}.{ext:static}")
async def get(fname: str, ext: str):
    """Serve `<assets_dir>/<fname>.<ext>`, or 404 when the file is missing.

    Fixes vs. original: the FileResponse was built but never returned, and the
    404 HTTPException was assigned to a variable instead of being raised — the
    handler always implicitly returned None.
    """
    fpath: str = (assets_dir)+'/'+str(fname)+'.'+str(ext)
    if os.path.isfile(fpath):
        print("file sent:"+fpath)
        # media_type kept from the original; the assets served here are SVGs
        return FileResponse(fpath, media_type="image/svg")
    print("file failed:"+fpath)
    raise HTTPException(status_code=404, detail="File not found")
# Define the route for form submission
# NOTE(review): the form posts to /submit (see getConfigForm) but no route was
# registered; adding the registration here.
@app.post("/submit")
async def post(d: dict):
    """Handle the config form: build the model, generate and render the timeline.

    Returns the rendered timeline HTML on success, or an error message string
    (shown in #result) on failure.
    """
    try:
        # Get the appropriate language model
        model = getModel(d['model'], d['secret'])
        # Run the Wikipedia lookup + LLM extraction pipeline
        timeline_df = generate_timeline(d['topic'],
                                        d['numevents'],
                                        model)
        # generate_timeline returns None when the LLM output could not be
        # parsed; the original then crashed on a dead `timeline_df.head(10)`
        # call and surfaced the AttributeError text to the user.
        if timeline_df is None:
            return "Could not generate a timeline for this topic. Please try again."
        # Generate and return the HTML table with the summaries
        return generate_timeline_html(timeline_df)
    except Exception as e:  # was BaseException: also swallowed KeyboardInterrupt/SystemExit
        print(traceback.format_exc())
        return str(e)
# Register the fasthtml_hf hook that backs the app's data up to a HF dataset
setup_hf_backup(app)
# Start the FastAPI server
serve()