import base64
import datetime
import io
import json
import re
from collections import Counter

import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import tiktoken
import yaml
from openai import OpenAI

def extract_json_from_response(text: str) -> str:
    """Extract JSON from a response that might contain markdown code blocks."""
    # Try to find JSON within code blocks first
    json_match = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", text, re.DOTALL)
    if json_match:
        return json_match.group(1)

    # If no code blocks, try to find raw JSON
    json_match = re.search(r"\{.*\}", text, re.DOTALL)
    if json_match:
        return json_match.group(0)

    # If no JSON found, return the original text
    return text

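# A quick illustration (hypothetical model reply): a fenced block is
# unwrapped to the bare JSON payload.
# extract_json_from_response('```json\n{"a": 1}\n```')  ->  '{"a": 1}'
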
def count_tokens(text: str, model: str = "gpt-4") -> int:
    """Count tokens in text using tiktoken."""
    try:
        encoder = tiktoken.encoding_for_model(model)
        return len(encoder.encode(str(text)))
    except Exception as e:
        print(f"Error counting tokens: {e}")
        return 0

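# Illustrative call (exact counts depend on the encoding tiktoken picks for
# the model; with gpt-4's cl100k_base, "hello world" encodes to 2 tokens):
# count_tokens("hello world")  ->  2
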
def create_distribution_plot(data, column):
    """Create a distribution plot using Plotly and convert to image."""
    try:
        # Check if the column contains lists
        if isinstance(data[column].iloc[0], list):
            print(f"Processing list column: {column}")
            value_counts = flatten_list_column(data, column)
            fig = go.Figure(
                [
                    go.Bar(
                        x=value_counts.index,
                        y=value_counts.values,
                        marker=dict(
                            color=value_counts.values,
                            colorscale=px.colors.sequential.Plotly3,
                        ),
                    )
                ]
            )
        else:
            if data[column].dtype in ["int64", "float64"]:
                # Continuous data - use histogram
                fig = go.Figure()
                fig.add_trace(
                    go.Histogram(
                        x=data[column],
                        name="Count",
                        nbinsx=30,
                        marker=dict(
                            color="rgba(110, 68, 255, 0.7)",
                            line=dict(color="rgba(184, 146, 255, 1)", width=1),
                        ),
                    )
                )
            else:
                # Categorical data
                value_counts = data[column].value_counts()
                fig = go.Figure(
                    [
                        go.Bar(
                            x=value_counts.index,
                            y=value_counts.values,
                            marker=dict(
                                color=value_counts.values,
                                colorscale=px.colors.sequential.Plotly3,
                            ),
                        )
                    ]
                )

        # Common layout updates
        fig.update_layout(
            title=dict(text=f"Distribution of {column}", x=0.5, y=0.95),
            xaxis_title=column,
            yaxis_title="Count",
            template="plotly_white",
            margin=dict(t=50, l=50, r=30, b=50),
            width=600,
            height=400,
            showlegend=False,
            plot_bgcolor="rgba(0,0,0,0)",
            paper_bgcolor="rgba(0,0,0,0)",
        )

        # Rotate x-axis labels if needed
        if isinstance(data[column].iloc[0], list) or data[column].dtype not in [
            "int64",
            "float64",
        ]:
            fig.update_layout(xaxis_tickangle=-45)

        # Update grid style
        fig.update_yaxes(gridcolor="rgba(128,128,128,0.1)", gridwidth=1)
        fig.update_xaxes(gridcolor="rgba(128,128,128,0.1)", gridwidth=1)

        # Convert to PNG with moderate resolution
        img_bytes = fig.to_image(format="png", scale=1.5)

        # Encode to base64
        img_base64 = base64.b64encode(img_bytes).decode()
        return img_base64
    except Exception as e:
        print(f"Error creating distribution plot for {column}: {str(e)}")
        raise

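# Usage sketch (hypothetical data; fig.to_image requires the kaleido package
# to be installed alongside plotly):
# df = pd.DataFrame({"label": ["a", "b", "a", "c"]})
# img_b64 = create_distribution_plot(df, "label")
# html = f'<img src="data:image/png;base64,{img_b64}">'
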
def create_wordcloud(data, column):
    """Create a word cloud visualization."""
    import matplotlib.pyplot as plt
    from wordcloud import WordCloud

    try:
        # Handle list columns
        if isinstance(data[column].iloc[0], list):
            text = " ".join(
                [
                    " ".join(map(str, sublist))
                    for sublist in data[column]
                    if isinstance(sublist, list)
                ]
            )
        else:
            # Handle regular columns
            text = " ".join(data[column].astype(str))

        wordcloud = WordCloud(
            width=600,
            height=300,
            background_color="white",
            colormap="plasma",
            max_words=100,
        ).generate(text)

        # Create matplotlib figure
        plt.figure(figsize=(8, 4))
        plt.imshow(wordcloud, interpolation="bilinear")
        plt.axis("off")
        plt.title(f"Word Cloud for {column}")

        # Save to bytes
        buf = io.BytesIO()
        plt.savefig(buf, format="png", bbox_inches="tight", dpi=150)
        plt.close()
        buf.seek(0)

        # Convert to base64
        img_base64 = base64.b64encode(buf.getvalue()).decode()
        return img_base64
    except Exception as e:
        print(f"Error creating word cloud for {column}: {str(e)}")
        raise

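# Usage sketch (hypothetical data; needs the wordcloud and matplotlib
# packages):
# df = pd.DataFrame({"text": ["alpha beta", "beta gamma"]})
# img_b64 = create_wordcloud(df, "text")
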
def analyze_dataset_with_openai(client: OpenAI, data) -> dict:
    """Analyze dataset using OpenAI API with improved type inference and efficient sampling."""
    # Convert dictionary to DataFrame if needed
    if isinstance(data, dict):
        df = pd.DataFrame(data)
    else:
        df = data

    # Take a very small sample for efficiency
    sample_size = min(3, len(df))
    if len(df) > 3:
        # Take first 3 rows instead of random sampling
        sample_indices = df.index[:sample_size]
        sample_df = df.loc[sample_indices]
    else:
        sample_df = df

    dataset_sample = sample_df.to_dict("records")
    single_record = dataset_sample[0]

    # Create type hints dictionary - only process the sample
    type_hints = {}
    for column in sample_df.columns:
        # Get the pandas dtype
        dtype = sample_df[column].dtype

        # Efficiently identify types without complex operations
        if pd.api.types.is_integer_dtype(dtype):
            type_hints[column] = "integer"
        elif pd.api.types.is_float_dtype(dtype):
            type_hints[column] = "number"
        elif pd.api.types.is_bool_dtype(dtype):
            type_hints[column] = "boolean"
        elif pd.api.types.is_datetime64_any_dtype(dtype):
            type_hints[column] = "datetime"
        elif pd.api.types.is_categorical_dtype(dtype):
            type_hints[column] = "categorical"
        elif pd.api.types.is_string_dtype(dtype):
            # Simple check for list-like values
            first_val = sample_df[column].iloc[0]
            if isinstance(first_val, list):
                type_hints[column] = "array"
            else:
                type_hints[column] = "string"
        else:
            type_hints[column] = "unknown"

    # default=str keeps json.dumps from failing on timestamps and other
    # non-JSON-serializable values in the sample records
    prompt = f"""Analyze this dataset sample and provide the following in a JSON response:

1. A concise description that includes:
   - A one-sentence overview of what the dataset contains
   - A bullet-pointed list of key features and statistics
   - A brief statement about potential ML/AI applications

2. A schema showing each field's type and description. Here is the actual DataFrame type information:
{json.dumps(type_hints, indent=2)}

And here's a single record for reference:
{json.dumps(single_record, indent=2, default=str)}

3. A formatted example record

Format your response as JSON with these exact keys:
{{
    "description": {{
        "overview": "One clear sentence describing the dataset...",
        "key_features": [
            "Feature or statistic 1",
            "Feature or statistic 2"
        ],
        "ml_applications": "Brief statement about ML/AI use cases..."
    }},
    "schema": {{
        "field_name": {{
            "type": "use the type from the provided type_hints",
            "description": "Description of what this field contains"
        }}
    }},
    "example": {{"key": "value"}}
}}

For context, here are more sample records:
{json.dumps(dataset_sample, indent=2, default=str)}
"""

    try:
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.7,
            max_tokens=2000,
        )

        # Get the response content
        response_text = response.choices[0].message.content

        # Extract JSON from the response
        json_str = extract_json_from_response(response_text)

        # Parse the JSON
        result = json.loads(json_str)
        return result
    except Exception as e:
        print(f"OpenAI API error: {str(e)}")
        return {
            "description": {
                "overview": "Error analyzing dataset",
                "key_features": ["Error: Failed to analyze dataset"],
                "ml_applications": "Analysis unavailable",
            },
            "schema": {},
            "example": {},
        }

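# Usage sketch (assumes OPENAI_API_KEY is set in the environment, which is
# how the OpenAI client picks up credentials by default):
# client = OpenAI()
# analysis = analyze_dataset_with_openai(client, df)
# print(analysis["description"]["overview"])
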
def analyze_dataset_statistics(df):
    """Generate simplified dataset statistics with token counting."""
    stats = {
        "basic_stats": {
            "total_records": len(df),
            "total_features": len(df.columns),
            "memory_usage": f"{df.memory_usage(deep=True).sum() / (1024 * 1024):.2f} MB",
        },
        "token_stats": {"total": 0, "by_column": {}},
    }

    # Count tokens for each column
    for column in df.columns:
        try:
            if df[column].dtype == "object" or isinstance(df[column].iloc[0], list):
                # For list columns, join items into strings
                if isinstance(df[column].iloc[0], list):
                    token_counts = df[column].apply(
                        lambda x: count_tokens(" ".join(str(item) for item in x))
                    )
                else:
                    token_counts = df[column].apply(lambda x: count_tokens(str(x)))

                total_tokens = int(token_counts.sum())
                stats["token_stats"]["total"] += total_tokens
                stats["token_stats"]["by_column"][column] = total_tokens
        except Exception as e:
            print(f"Error processing column {column}: {str(e)}")
            continue

    return stats

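# Example (hypothetical data): only object/list columns contribute to the
# token counts; numeric columns are skipped.
# stats = analyze_dataset_statistics(pd.DataFrame({"text": ["hi", "there"]}))
# stats["basic_stats"]["total_records"]  ->  2
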
def format_dataset_stats(stats):
    """Format simplified dataset statistics as markdown."""
    md = """## Dataset Overview

### Basic Statistics
* Total Records: {total_records:,}
* Total Features: {total_features}
* Memory Usage: {memory_usage}
""".format(**stats["basic_stats"])

    # Token Statistics
    if stats["token_stats"]["total"] > 0:
        md += "\n### Token Info\n"
        md += f"* Total Tokens: {stats['token_stats']['total']:,}\n"
        if stats["token_stats"]["by_column"]:
            md += "\nTokens by Column:\n"
            for col, count in stats["token_stats"]["by_column"].items():
                md += f"* {col}: {count:,}\n"

    return md

def generate_dataset_card(
    dataset_info: dict,
    distribution_plots: dict,
    wordcloud_plots: dict,
    openai_analysis: dict,
    df: pd.DataFrame,
) -> str:
    """Generate a beautiful and clean dataset card."""
    # Basic dataset metadata
    yaml_content = {
        "language": ["en"],
        "license": "apache-2.0",
        "multilinguality": "monolingual",
        "size_categories": [get_size_category(len(df))],
        "task_categories": ["other"],
    }
    yaml_string = yaml.dump(yaml_content, sort_keys=False)

    # Generate dataset statistics
    stats = analyze_dataset_statistics(df)
    description = openai_analysis["description"]

    # Build the markdown content with proper spacing
    readme_content = f"""---
{yaml_string}---

# {dataset_info['dataset_name']}

{description['overview']}

### Key Features

{chr(10).join(f'* {feature}' for feature in description['key_features'])}

### Potential Applications

{description['ml_applications']}

## Dataset Statistics

* Total Records: {stats['basic_stats']['total_records']:,}
* Total Features: {stats['basic_stats']['total_features']}
* Memory Usage: {stats['basic_stats']['memory_usage']}

## Dataset Schema

| Field | Type | Description |
| --- | --- | --- |
{chr(10).join(f"| {field} | {info['type']} | {info['description']} |" for field, info in openai_analysis['schema'].items())}

## Example Record

```json
{json.dumps(openai_analysis['example'], indent=2)}
```

## Data Distribution Analysis

The following visualizations show the distribution patterns and characteristics of key features in the dataset:

"""

    # Add individual distribution plots with clean spacing
    for col, img_str in distribution_plots.items():
        readme_content += f"""### Distribution of {col}

<img src="data:image/png;base64,{img_str}" alt="Distribution of {col}" style="max-width: 800px;">

"""

    # Add word clouds with clean spacing
    if wordcloud_plots:
        readme_content += "## Feature Word Clouds\n\n"
        for col, img_str in wordcloud_plots.items():
            readme_content += f"""### Word Cloud for {col}

<img src="data:image/png;base64,{img_str}" alt="Word Cloud for {col}" style="max-width: 800px;">

"""

    # Add token statistics if available
    if stats.get("token_stats") and stats["token_stats"]["total"] > 0:
        readme_content += "## Token Statistics\n\n"
        readme_content += f"* Total Tokens: {stats['token_stats']['total']:,}\n"
        if stats["token_stats"].get("by_column"):
            readme_content += "\n**Tokens by Column:**\n"
            for col, count in stats["token_stats"]["by_column"].items():
                readme_content += f"* {col}: {count:,}\n"

    # Add citation section
    clean_name = dataset_info["dataset_name"].replace("/", "_")
    readme_content += f"""
## Citation

```bibtex
@dataset{{{clean_name},
    title = {{{dataset_info['dataset_name']}}},
    year = {{{datetime.datetime.now().year}}},
    publisher = {{Hugging Face}},
    url = {{https://huggingface.co/datasets/{dataset_info['dataset_name']}}}
}}
```

### Usage Guidelines

This dataset is released under the Apache 2.0 License. When using this dataset:

* 📚 Cite the dataset using the BibTeX entry above
* 🤝 Consider contributing improvements or reporting issues
* 💡 Share derivative works with the community when possible
"""

    return readme_content

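# End-to-end sketch (hypothetical names, wiring the helpers above together):
# df = pd.DataFrame({"text": ["hello", "world"], "label": ["a", "b"]})
# analysis = analyze_dataset_with_openai(OpenAI(), df)
# dist = {"label": create_distribution_plot(df, "label")}
# clouds = {"text": create_wordcloud(df, "text")}
# card = generate_dataset_card(
#     {"dataset_name": "user/my-dataset"}, dist, clouds, analysis, df
# )
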
def get_size_category(record_count: int) -> str:
    """Determine the size category based on record count."""
    if record_count < 1000:
        return "n<1K"
    elif record_count < 10000:
        return "1K<n<10K"
    elif record_count < 100000:
        return "10K<n<100K"
    elif record_count < 1000000:
        return "100K<n<1M"
    else:
        return "n>1M"

def format_overview_section(analysis: dict, stats: dict) -> str:
    """Create a comprehensive overview section."""
    description = analysis["description"]

    overview = f"""
{description['overview']}

### Key Features and Characteristics

{chr(10).join(f'* {feature}' for feature in description['key_features'])}

### Potential Applications

{description['ml_applications']}

### Dataset Size

* Total Records: {stats['basic_stats']['total_records']:,}
* Total Features: {stats['basic_stats']['total_features']}
* Memory Usage: {stats['basic_stats']['memory_usage']}
"""
    return overview.strip()

def format_schema_section(schema: dict, df: pd.DataFrame) -> str:
    """Generate an enhanced schema section with statistics."""
    # Table header
    table = "| Field | Type | Description | Non-Null Count | Unique Values |\n"
    table += "| --- | --- | --- | --- | --- |\n"

    # Generate rows with additional statistics
    for field, info in schema.items():
        try:
            non_null = df[field].count()
            unique = df[field].nunique()
            row = f"| {field} | {info['type']} | {info['description']} | {non_null:,} | {unique:,} |"
            table += row + "\n"
        except Exception as e:
            print(f"Error processing field {field}: {e}")
            continue

    return table

def format_visualization_section(
    distribution_plots: dict, wordcloud_plots: dict
) -> str:
    """Format the visualization section with improved layout."""
    content = "The following visualizations show key characteristics of the dataset:\n\n"

    # Add distribution plots
    if distribution_plots:
        content += "### Distribution Plots\n\n"
        content += '<div style="display: grid; grid-template-columns: repeat(auto-fit, minmax(500px, 1fr)); gap: 20px;">\n'
        for col, img_str in distribution_plots.items():
            content += f"""<div>
<h4>Distribution of {col}</h4>
<img src="data:image/png;base64,{img_str}" style="width: 100%; height: auto;">
</div>\n"""
        content += "</div>\n\n"

    # Add word clouds
    if wordcloud_plots:
        content += "### Word Clouds\n\n"
        content += '<div style="display: grid; grid-template-columns: repeat(auto-fit, minmax(400px, 1fr)); gap: 20px;">\n'
        for col, img_str in wordcloud_plots.items():
            content += f"""<div>
<h4>Word Cloud for {col}</h4>
<img src="data:image/png;base64,{img_str}" style="width: 100%; height: auto;">
</div>\n"""
        content += "</div>\n"

    return content

def generate_limitations_section(df: pd.DataFrame, analysis: dict) -> str:
    """Generate a section about dataset limitations and potential biases."""
    limitations = [
        "This dataset may not be representative of all possible scenarios or use cases.",
        f"The dataset contains {len(df):,} records, which may limit its applicability to certain tasks.",
        "There may be inherent biases in the data collection or annotation process.",
    ]

    # Add warnings about missing values if present
    missing_values = df.isnull().sum()
    if missing_values.any():
        limitations.append(
            f"Some fields contain missing values: {', '.join(missing_values[missing_values > 0].index)}"
        )

    return f"""The following limitations and potential biases should be considered when using this dataset:

{chr(10).join(f'* {limitation}' for limitation in limitations)}

Please consider these limitations when using the dataset and validate results accordingly."""

def generate_usage_section(dataset_info: dict, analysis: dict) -> str:
    """Generate comprehensive usage guidelines."""
    return f"""This dataset is released under the Apache 2.0 License. When using this dataset:

* 📚 Cite the dataset using the BibTeX entry provided below
* 🤝 Consider contributing improvements or reporting issues
* 💡 Share derivative works with the community when possible
* 🔍 Validate the dataset's suitability for your specific use case
* ⚠️ Be aware of the limitations and biases discussed above
* 📊 Consider the dataset size and computational requirements for your application

For questions or additional information, please visit the dataset repository on Hugging Face.
"""

def get_task_categories(df: pd.DataFrame, analysis: dict) -> list:
    """Infer potential task categories based on the data and analysis."""
    categories = ["other"]  # Default category

    # Add more sophisticated task inference logic based on column names and content
    text_columns = df.select_dtypes(include=["object"]).columns
    numeric_columns = df.select_dtypes(include=["int64", "float64"]).columns

    if len(text_columns) > 0:
        categories.append("text-classification")
    if len(numeric_columns) > 0:
        categories.append("regression")

    return list(set(categories))  # Remove duplicates

def clean_dataset_name(name: str) -> str:
    """Clean dataset name for citation."""
    return name.replace("/", "_").replace("-", "_").lower()

def generate_schema_table(schema: dict) -> str:
    """Generate a markdown table for the schema, handling nested structures."""
    # Table header
    table = "| Field | Type | Description |\n| --- | --- | --- |\n"

    # Generate rows recursively
    rows = []
    for field, info in schema.items():
        rows.extend(format_schema_item(field, info))

    # Join all rows
    table += "\n".join(rows)
    return table

def format_stats_section(stats: dict) -> str:
    """Format the statistics section of the dataset card."""
    content = "### Basic Statistics\n\n"

    # Add basic stats
    for key, value in stats["basic_stats"].items():
        # Convert key from snake_case to Title Case
        formatted_key = key.replace("_", " ").title()
        content += f"* {formatted_key}: {value}\n"

    # Add token statistics if available
    if stats.get("token_stats") and stats["token_stats"]["total"] > 0:
        content += "\n### Token Statistics\n"
        content += f"* Total Tokens: {stats['token_stats']['total']:,}\n"
        if stats["token_stats"].get("by_column"):
            content += "\n**Tokens by Column:**\n"
            for col, count in stats["token_stats"]["by_column"].items():
                content += f"* {col}: {count:,}\n"

    return content

def format_schema_item(field_name: str, field_info: dict, prefix: str = "") -> list:
    """Recursively format schema items for nested structures."""
    rows = []

    # Handle nested objects
    if isinstance(field_info, dict):
        if "type" in field_info and "description" in field_info:
            # This is a leaf node with type and description
            rows.append(
                f"| {prefix}{field_name} | {field_info['type']} | {field_info['description']} |"
            )
        else:
            # This is a nested object, recurse through its properties
            new_prefix = f"{prefix}{field_name}."
            for subfield, subinfo in field_info.items():
                rows.extend(format_schema_item(subfield, subinfo, new_prefix))

    return rows

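# Example (hypothetical schema): nested fields are flattened to dotted names.
# format_schema_item("meta", {"author": {"type": "string", "description": "Name"}})
#   ->  ["| meta.author | string | Name |"]
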
def flatten_list_column(data, column):
    """Flatten a column containing lists into individual values with counts."""
    # Flatten the lists into individual items
    flattened = [
        item
        for sublist in data[column]
        if isinstance(sublist, list)
        for item in sublist
    ]

    # Count occurrences
    value_counts = pd.Series(Counter(flattened))
    return value_counts

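
if __name__ == "__main__":
    # Smoke test on toy data (a minimal sketch; in the Space these helpers
    # are normally driven by the UI layer). Expected counts: nlp=2, vision=1.
    demo = pd.DataFrame({"tags": [["nlp", "vision"], ["nlp"]]})
    print(flatten_list_column(demo, "tags"))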