import gradio as gr
import pandas as pd
import numpy as np
from transformers import pipeline
import re
import datetime
import warnings
warnings.filterwarnings('ignore')
# Load a lightweight text2text model used to answer questions about the data.
# flan-t5-small keeps memory within free-tier limits; device=-1 forces CPU.
qa_model = pipeline(
    "text2text-generation",
    model="google/flan-t5-small",  # Using a smaller model for free tier compatibility
    device=-1,  # Use CPU
)

# Module-level state shared between the upload handler and the query handler.
df = None
df_text_representation = None
def preprocess_dataframe(df):
    """Normalize column dtypes and build a text summary of the dataframe.

    Object columns that parse cleanly as dates are converted to datetime.

    Args:
        df: The raw dataframe loaded from the uploaded CSV.

    Returns:
        A ``(df, description)`` tuple: the (possibly modified) dataframe and
        a human-readable summary of its shape, columns, and per-column value
        ranges, suitable for use as LLM context.
    """
    # Opportunistically convert object columns to datetime. The conversion is
    # applied only when the whole column parses; otherwise the column is left
    # untouched. (Replaces the deprecated errors='ignore' behavior, which is
    # removed in pandas 3.0.)
    for col in df.columns:
        if df[col].dtype == 'object':
            try:
                df[col] = pd.to_datetime(df[col])
            except (ValueError, TypeError):
                pass  # not a date column — keep original values

    column_descriptions = []
    for col in df.columns:
        dtype = df[col].dtype
        # Hoisted: min/max are meaningless when every value is missing.
        all_missing = df[col].isna().all()
        if pd.api.types.is_numeric_dtype(dtype):
            if pd.api.types.is_integer_dtype(dtype):
                min_val = df[col].min() if not all_missing else "unknown"
                max_val = df[col].max() if not all_missing else "unknown"
                column_descriptions.append(
                    f"Column '{col}' contains integer values ranging from {min_val} to {max_val}"
                )
            else:
                min_val = round(df[col].min(), 2) if not all_missing else "unknown"
                max_val = round(df[col].max(), 2) if not all_missing else "unknown"
                column_descriptions.append(
                    f"Column '{col}' contains decimal values ranging from {min_val} to {max_val}"
                )
        elif pd.api.types.is_datetime64_dtype(dtype):
            earliest = df[col].min() if not all_missing else "unknown"
            latest = df[col].max() if not all_missing else "unknown"
            column_descriptions.append(
                f"Column '{col}' contains date/time values from {earliest} to {latest}"
            )
        else:
            unique_count = df[col].nunique()
            if unique_count <= 10:
                # Few distinct values: enumerate them in the description.
                unique_vals = ", ".join(f"'{str(val)}'" for val in df[col].dropna().unique()[:10])
                column_descriptions.append(
                    f"Column '{col}' contains text values including: {unique_vals}"
                )
            else:
                column_descriptions.append(
                    f"Column '{col}' contains text values with {unique_count} unique entries"
                )

    # General dataframe description, followed by the per-column details.
    df_desc = [
        f"This is a dataset with {df.shape[0]} rows and {df.shape[1]} columns.",
        f"The columns are: {', '.join(f'{col}' for col in df.columns)}.",
        "Column details:",
    ]
    return df, "\n".join(df_desc + column_descriptions)
def extract_numeric_values(text):
    """Pull numeric literals out of *text*.

    Decimal numbers take priority: if any are present, only those are
    returned (as floats). Otherwise any integers found are returned.
    An empty list means no numbers were detected.
    """
    decimals = re.findall(r'-?\d+\.\d+', text)
    if decimals:
        return [float(token) for token in decimals]
    integers = re.findall(r'-?\d+', text)
    return [int(token) for token in integers] if integers else []
def extract_dates(text):
    """Collect substrings of *text* that look like dates.

    Recognizes ISO (YYYY-MM-DD), slash/dash numeric forms, and
    'Month DD, YYYY' with an English month abbreviation. Matches are
    returned in pattern order, so ISO-style hits come first.
    """
    patterns = (
        r'\d{4}-\d{1,2}-\d{1,2}',    # YYYY-MM-DD
        r'\d{1,2}/\d{1,2}/\d{2,4}',  # MM/DD/YYYY or DD/MM/YYYY
        r'\d{1,2}-\d{1,2}-\d{2,4}',  # MM-DD-YYYY or DD-MM-YYYY
        r'\b(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)[a-z]* \d{1,2},? \d{4}\b',  # Month DD, YYYY
    )
    found = []
    for pattern in patterns:
        found.extend(re.findall(pattern, text))
    return found
def interpret_query(query, df_text):
    """Ask the language model to answer *query* given the dataset summary."""
    prompt = f"""
You are an AI assistant that helps analyze data. Given the following dataset description and a user question,
provide a clear and concise answer based only on the information in the dataset.
Dataset information:
{df_text}
User question: {query}
Your answer should be informative and direct. If the question cannot be answered with the available information,
explain why. If the question requires finding specific data, provide the relevant values or data points.
"""
    # do_sample=False keeps decoding greedy/deterministic.
    output = qa_model(prompt, max_length=512, do_sample=False)
    return output[0]['generated_text']
def get_data_insight(query, df):
    """Compute direct statistics or filtered rows from *df* based on *query*.

    Heuristically detects which columns the query mentions by name, whether
    it asks for summary statistics, and whether it describes a filter
    (numeric comparison, date comparison, or text containment).

    Args:
        query: The user's natural-language question.
        df: The preprocessed dataframe.

    Returns:
        A text report of statistics or matching rows; empty string when no
        heuristic applies.
    """
    response = ""
    query_lower = query.lower()  # hoisted: reused many times below

    # Columns referenced by name in the query (substring match on purpose,
    # so 'sales' also matches 'total sales').
    column_matches = [col for col in df.columns if str(col).lower() in query_lower]

    # Literal values that might drive a filter.
    numeric_values = extract_numeric_values(query)
    date_values = extract_dates(query)

    # Substring matching is intentional for stats terms: 'max' should also
    # catch 'maximum', 'mean' catches 'means', etc.
    compute_stats = any(term in query_lower for term in
                        ['average', 'mean', 'max', 'min', 'sum', 'count', 'total'])
    filtering_query = any(term in query_lower for term in
                          ['where', 'greater than', 'less than', 'equal to',
                           'between', 'filter', 'show'])

    try:
        # Case 1: statistics over explicitly mentioned numeric columns.
        if column_matches and compute_stats:
            for col in column_matches:
                if pd.api.types.is_numeric_dtype(df[col].dtype):
                    response += f"\nStatistics for {col}:\n"
                    response += f"Mean: {df[col].mean():.2f}\n"
                    response += f"Min: {df[col].min()}\n"
                    response += f"Max: {df[col].max()}\n"
                    response += f"Sum: {df[col].sum()}\n"
                    response += f"Count: {df[col].count()}\n"
        # Case 2: row filtering on mentioned columns.
        elif filtering_query and column_matches:
            filtered_df = df.copy()
            filter_applied = False
            for col in column_matches:
                # Numeric comparisons driven by numbers found in the query.
                if pd.api.types.is_numeric_dtype(df[col].dtype) and numeric_values:
                    if any(t in query_lower for t in ('greater than', 'more than', 'above')):
                        filtered_df = filtered_df[filtered_df[col] > numeric_values[0]]
                        filter_applied = True
                    elif any(t in query_lower for t in ('less than', 'smaller than', 'below')):
                        filtered_df = filtered_df[filtered_df[col] < numeric_values[0]]
                        filter_applied = True
                    elif 'equal' in query_lower:
                        filtered_df = filtered_df[filtered_df[col] == numeric_values[0]]
                        filter_applied = True
                    elif 'between' in query_lower and len(numeric_values) >= 2:
                        lo, hi = min(numeric_values), max(numeric_values)
                        filtered_df = filtered_df[(filtered_df[col] >= lo) & (filtered_df[col] <= hi)]
                        filter_applied = True
                # Date comparisons driven by date strings found in the query.
                elif pd.api.types.is_datetime64_dtype(df[col].dtype) and date_values:
                    try:
                        date_obj = pd.to_datetime(date_values[0])
                        if any(t in query_lower for t in ('after', 'later than')):
                            filtered_df = filtered_df[filtered_df[col] > date_obj]
                            filter_applied = True
                        elif any(t in query_lower for t in ('before', 'earlier than')):
                            filtered_df = filtered_df[filtered_df[col] < date_obj]
                            filter_applied = True
                        # Whole-word 'on' check: the old substring test fired
                        # for words like 'month' or 'containing'.
                        elif re.search(r'\bon\b', query_lower) or 'equal' in query_lower:
                            filtered_df = filtered_df[filtered_df[col].dt.date == date_obj.date()]
                            filter_applied = True
                    except (ValueError, TypeError):
                        # Unparseable date reference — skip date filtering.
                        pass
                # Text containment: every meaningful query word must appear.
                elif df[col].dtype == 'object':
                    stop_words = {"show", "where", "with", "contains", "containing", "that", "have"}
                    for word in query_lower.split():
                        if len(word) > 3 and word not in stop_words:
                            filtered_df = filtered_df[filtered_df[col].str.lower().str.contains(word, na=False)]
                            filter_applied = True

            if filter_applied:
                if len(filtered_df) > 0:
                    if len(filtered_df) <= 10:  # Show full results if 10 or fewer rows
                        response += f"\nFound {len(filtered_df)} matching rows:\n{filtered_df.to_string()}"
                    else:
                        response += f"\nFound {len(filtered_df)} matching rows. Here are the first 5:\n{filtered_df.head().to_string()}"
                else:
                    response += "\nNo data found matching your criteria."
    except Exception as e:
        # Last-resort guard: report the failure in the reply instead of
        # crashing the Gradio handler.
        response += f"\nError processing query: {str(e)}"
    return response
def process_file_and_query(file, query):
    """Gradio handler: load the uploaded CSV and answer *query* about it.

    Args:
        file: The uploaded file — a tempfile-like object with a ``.name``
            attribute (older Gradio) or a plain filepath string (newer
            Gradio with ``type="filepath"``).
        query: The user's natural-language question.

    Returns:
        A combined answer string (LLM response plus any direct data
        analysis), or an error/usage message.
    """
    global df, df_text_representation
    if file is None:
        return "Please upload a CSV file first."
    if query.strip() == "":
        return "Please enter a question about the data."
    try:
        # Newer Gradio passes a filepath string; older versions pass an
        # object carrying the temp path in .name. Support both.
        path = file.name if hasattr(file, "name") else file
        df = pd.read_csv(path)
        # Normalize dtypes and build the LLM-facing text summary.
        df, df_text_representation = preprocess_dataframe(df)
        # LLM interpretation of the question.
        llm_response = interpret_query(query, df_text_representation)
        # Direct, deterministic data insights (stats / filters).
        data_insights = get_data_insight(query, df)
        final_response = f"Response: {llm_response}"
        if data_insights:
            final_response += f"\n\nAdditional Data Analysis:{data_insights}"
        return final_response
    except Exception as e:
        # Top-level UI boundary: surface the error to the user.
        return f"Error processing file or query: {str(e)}"
# ---------------------------------------------------------------------------
# Gradio interface: file upload + question box on the left, answer on the
# right, plus clickable example questions and usage notes.
# ---------------------------------------------------------------------------
with gr.Blocks(title="CSV Question Answering Bot") as demo:
    gr.Markdown("# CSV Question Answering Bot")
    gr.Markdown("Upload a CSV file and ask questions about it in natural language.")

    with gr.Row():
        with gr.Column():
            csv_upload = gr.File(label="Upload CSV File")
            question_box = gr.Textbox(
                label="Ask a question about your data",
                placeholder="Example: What's the average sales value?",
            )
            ask_button = gr.Button("Submit")
        with gr.Column():
            answer_box = gr.Textbox(label="Answer", lines=10)

    ask_button.click(
        fn=process_file_and_query,
        inputs=[csv_upload, question_box],
        outputs=answer_box,
    )

    gr.Examples(
        examples=[
            ["What columns are in this dataset?"],
            ["What's the average of the numeric columns?"],
            ["Show me the first 5 rows"],
            ["How many rows have values greater than 100?"],
            ["What's the date range in this dataset?"],
        ],
        inputs=question_box,
    )

    gr.Markdown("""
## How to use this app
1. Upload a CSV file of any kind (numerical data, dates, text, etc.)
2. Type a natural language question about the data
3. Click 'Submit' to get your answer
Examples of questions you can ask:
- "What are the columns in this dataset?"
- "What's the average salary?"
- "Show me rows where age is greater than 30"
- "What's the minimum and maximum price?"
- "How many entries were recorded after January 2023?"
- "Find all products in the Electronics category"
""")

# Launch the app when run as a script.
if __name__ == "__main__":
    demo.launch()