from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
import gradio as gr
import time
import psutil
import tracemalloc
import gc
import pandas as pd
import dask.dataframe as dd
import polars as pl
import duckdb
import seaborn as sns
import matplotlib
matplotlib.use("Agg")  # Headless backend: render figures without a display server
import matplotlib.pyplot as plt
import io
import os
from PIL import Image
import numpy as np
import wandb
from datasets import load_dataset

# Authenticate with Weights & Biases and start a benchmarking run
# (WANDB_API_KEY must be set in the environment, e.g. as a Space secret)
wandb.login(key=os.getenv("WANDB_API_KEY"))
wandb.init(project="billion-row-analysis", name="benchmarking")

# Load the dataset once at startup and cache it to Parquet so the
# benchmark loaders below read from local disk instead of re-downloading
dataset = load_dataset("AnnsKhan/jan_2024_nyc", split="train")
parquet_path = "jan_2024.parquet"
if not os.path.exists(parquet_path):
    dataset.to_pandas().to_parquet(parquet_path)  # Save to disk
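# (Note: datasets.Dataset also exposes a to_parquet() method, which would
# avoid materialising the intermediate pandas DataFrame.)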

os.environ["MODIN_ENGINE"] = "dask"  # Select Dask as Modin's engine (Modin itself is not imported here)

# Initialize FastAPI app
app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
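
# Note: allow_origins=["*"] opens the API to any origin, which is convenient
# for a demo Space but should be narrowed to trusted domains in production.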

# Performance measurement function
def measure_performance(load_function, *args):
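    """Run load_function(*args) and return (data, wall time, CPU delta,
    RSS delta, peak traced memory), with memory figures expressed as a
    percentage of total system RAM.

    Caveats: psutil.cpu_percent(interval=1) blocks for one second per call,
    so the timer is started only after the first sample completes; and
    tracemalloc only sees Python-level allocations, so native buffers
    (e.g. Arrow memory used by Polars or DuckDB) may not show in the peak.
    """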
    gc.collect()
    tracemalloc.start()
    total_memory = psutil.virtual_memory().total  # Total system memory
    start_memory = psutil.Process().memory_info().rss / total_memory * 100  # RSS as % of RAM
    start_cpu = psutil.cpu_percent(interval=1)  # Blocking 1 s sample, taken before the timer starts
    start_time = time.time()

    data = load_function(*args)

    end_time = time.time()  # Stop the clock before the second blocking CPU sample
    end_cpu = psutil.cpu_percent(interval=1)
    end_memory = psutil.Process().memory_info().rss / total_memory * 100  # RSS as % of RAM

    _, peak_memory = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    peak_memory_percentage = peak_memory / total_memory * 100  # Peak as % of RAM

    return data, end_time - start_time, max(end_cpu - start_cpu, 0), max(end_memory - start_memory, 0), peak_memory_percentage

# Data loading functions
def load_data_python_vectorized():
    df = pd.read_parquet(parquet_path)
    # Convert numerical columns to NumPy arrays for vectorized operations
    num_cols = df.select_dtypes(include=['number']).columns
    np_data = {col: df[col].to_numpy() for col in num_cols}
    return np_data

def load_data_pandas():
    return pd.read_parquet(parquet_path)

def load_data_dask():
    return dd.read_parquet(parquet_path)

def load_data_polars():
    return pl.read_parquet(parquet_path)

def load_data_duckdb():
    return duckdb.read_parquet(parquet_path)
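
# Caveat: dd.read_parquet and duckdb.read_parquet are lazy -- they return a
# Dask task graph and a DuckDB relation, respectively, without reading the
# full file. Their "load times" below therefore measure setup cost; forcing
# materialisation (e.g. .compute() on Dask, .df() on DuckDB) would give a
# fairer comparison against the eager pandas/Polars loaders.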

# Loaders list
loaders = [
    (load_data_pandas, "Pandas"),
    (load_data_dask, "Dask"),
    (load_data_polars, "Polars"),
    (load_data_duckdb, "DuckDB"),
    (load_data_python_vectorized, "Python Vectorized"),
]
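
# Every loader reads the same Parquet file, so after the first pass the OS
# page cache is warm; repeated benchmark runs will typically look faster
# than a cold first run.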

def run_benchmark():
    benchmark_results = []
    error_messages = []
    for loader, lib_name in loaders:
        try:
            data, load_time, cpu_load, mem_load, peak_mem_load = measure_performance(loader)
            # Log metrics to Weights & Biases
            wandb.log({
                "Library": lib_name,
                "Load Time (s)": load_time,
                "CPU Load (%)": cpu_load,
                "Memory Load (%)": mem_load,
                "Peak Memory (%)": peak_mem_load
            })
            benchmark_results.append({
                "Library": lib_name,
                "Load Time (s)": load_time,
                "CPU Load (%)": cpu_load,
                "Memory Load (%)": mem_load,
                "Peak Memory (%)": peak_mem_load
            })
        except Exception as e:
            error_messages.append(f"{lib_name} Error: {str(e)}")
    if error_messages:
        return '\n'.join(error_messages), None

    benchmark_df = pd.DataFrame(benchmark_results)
    sns.set(style="whitegrid")
    fig, axes = plt.subplots(2, 2, figsize=(14, 10))
    fig.suptitle("Benchmark Results", fontsize=16)
    sns.barplot(x="Library", y="Load Time (s)", data=benchmark_df, ax=axes[0, 0])
    sns.barplot(x="Library", y="CPU Load (%)", data=benchmark_df, ax=axes[0, 1])
    sns.barplot(x="Library", y="Memory Load (%)", data=benchmark_df, ax=axes[1, 0])
    sns.barplot(x="Library", y="Peak Memory (%)", data=benchmark_df, ax=axes[1, 1])

    buf = io.BytesIO()
    plt.savefig(buf, format='png')
    plt.close(fig)  # Release the figure so repeated runs don't leak memory
    buf.seek(0)

    # Convert plot to an image and log it to wandb
    image = Image.open(buf)
    wandb.log({"Benchmark Results": wandb.Image(image)})
    image_array = np.array(image)
    return benchmark_df.to_markdown(), image_array  # Return NumPy array for Gradio
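
# Note: DataFrame.to_markdown() relies on the optional `tabulate` package,
# so it must be installed alongside the imports above (e.g. listed in
# requirements.txt).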

def explore_dataset():
    try:
        df = pd.read_parquet(parquet_path)

        # Convert float64 columns to float32 to halve their memory footprint
        for col in df.select_dtypes(include=['float64']).columns:
            df[col] = df[col].astype('float32')

        # If the dataset is large, work on a 50% sample
        if len(df) > 1_000_000:
            df = df.sample(frac=0.5, random_state=42)

        # Generate dataset summary
        summary = df.describe(include='all').T
        summary["missing_values"] = df.isnull().sum()
        summary["unique_values"] = df.nunique()
        summary_text = summary.to_markdown()

        # Log the summary to Weights & Biases (wandb.Html expects HTML, not markdown)
        wandb.log({"Dataset Summary": wandb.Html(summary.to_html())})

        # Prepare for visualization
        fig, axes = plt.subplots(2, 2, figsize=(16, 10))
        fig.suptitle("Dataset Overview", fontsize=16)

        # Plot data type distribution
        data_types = df.dtypes.value_counts()
        sns.barplot(x=data_types.index.astype(str), y=data_types.values, ax=axes[0, 0])
        axes[0, 0].set_title("Column Count by Data Type")
        axes[0, 0].set_ylabel("Count")
        axes[0, 0].set_xlabel("Column Type")

        # Plot mean values of numeric columns
        num_cols = df.select_dtypes(include=['number']).columns
        if len(num_cols) > 0:
            mean_values = df[num_cols].mean()
            sns.barplot(x=mean_values.index, y=mean_values.values, ax=axes[0, 1])
            axes[0, 1].set_title("Mean Values of Numeric Columns")
            axes[0, 1].set_xlabel("Column Name")
            axes[0, 1].tick_params(axis='x', rotation=45)

            # Log mean values to Weights & Biases
            for col, mean_val in mean_values.items():
                wandb.log({f"Mean Values/{col}": mean_val})

        # Plot a histogram for the first numeric column
        if len(num_cols) > 0:
            selected_col = num_cols[0]
            sns.histplot(df[selected_col], bins=30, kde=True, ax=axes[1, 0])
            axes[1, 0].set_title(f"Distribution of {selected_col}")
            axes[1, 0].set_xlabel(selected_col)
            axes[1, 0].set_ylabel("Frequency")

        # Plot correlation heatmap
        if len(num_cols) > 1:
            corr = df[num_cols].corr()
            sns.heatmap(corr, annot=True, cmap="coolwarm", fmt=".2f", linewidths=0.5, ax=axes[1, 1])
            axes[1, 1].set_title("Correlation Heatmap")

        # Save figure to buffer
        buf = io.BytesIO()
        plt.tight_layout()
        plt.savefig(buf, format='png', bbox_inches='tight')
        plt.close(fig)
        buf.seek(0)

        # Convert the figure to a NumPy array and log it to Weights & Biases
        image = Image.open(buf)
        image_array = np.array(image)
        wandb.log({"Dataset Overview": wandb.Image(image)})

        return summary_text, image_array
    except Exception as e:
        return f"Error loading data: {str(e)}", None

# Gradio interface setup
def gradio_interface():
    def run_and_plot():
        results, plot = run_benchmark()
        return results, plot

    def explore_data():
        summary, plot = explore_dataset()
        return summary, plot

    with gr.Blocks() as demo:
        gr.Markdown("## Explore Dataset")
        explore_button = gr.Button("Explore Data")
        summary_text = gr.Textbox(label="Dataset Summary")
        explore_image = gr.Image(label="Feature Distributions")
        explore_button.click(explore_data, outputs=[summary_text, explore_image])

        gr.Markdown("## Benchmarking Different Data Loading Libraries")
        run_button = gr.Button("Run Benchmark")
        result_text = gr.Textbox(label="Benchmark Results")
        plot_image = gr.Image(label="Performance Graph")
        run_button.click(run_and_plot, outputs=[result_text, plot_image])

    return demo

demo = gradio_interface()

# Run the Gradio app
demo.launch(share=False)  # No need for share=True when running locally; local access is sufficient
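
# Note: the FastAPI `app` defined above is not actually served by
# demo.launch(), which starts Gradio's own server. If the API and UI are
# meant to run together, one option (a sketch, not what this file does)
# is to mount the Blocks app onto FastAPI and serve it with uvicorn:
#
#   app = gr.mount_gradio_app(app, demo, path="/")
#   # then: uvicorn <module_name>:app --host 0.0.0.0 --port 7860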