import json
import logging
import os
from typing import Any, Dict, List, Optional
from zoneinfo import ZoneInfo

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import plotly.graph_objects as go

from agenteval import compute_summary_statistics
from agenteval.config import SuiteConfig
from agenteval.models import EvalResult

logger = logging.getLogger(__name__)


class DataTransformer:
    """Load and visualize a leaderboard from a single, local JSON result file."""

    _INFORMAL_TO_FORMAL_NAME_MAP = {
        "lit": "Literature Understanding",
        "data": "Data Analysis",
        "code": "Code Execution",
        "discovery": "Discovery",
        "arxivdigestables_validation": "Arxivdigestables Validation",
        "sqa_dev": "Sqa Dev",
        "litqa2_validation": "Litqa2 Validation",
        "paper_finder_validation": "Paper Finder Validation",
        "discoverybench_validation": "Discoverybench Validation",
        "core_bench_validation": "Core Bench Validation",
        "ds1000_validation": "DS1000 Validation",
        "e2e_discovery_validation": "E2E Discovery Validation",
        "super_validation": "Super Validation",
        # Add any other raw names that can appear in task.name or task.tags.
    }
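    # Names missing from the map fall back to title-casing (see
    # _get_formal_display_name_static below). The fallback is usually good
    # enough -- a hypothetical "e2e_discovery_test" would render as
    # "E2E Discovery Test" -- but not always: str.title() would turn
    # "ds1000_validation" into "Ds1000 Validation", which is why that name
    # gets an explicit entry above.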
    def __init__(
        self,
        json_file_path: str,  # Path to the local JSON result file (mandatory).
        split: str,           # Split to select within the JSON's suite_config.
        is_internal: bool = False,
    ):
        self._json_file_path = json_file_path
        self._split = split
        self._internal = is_internal
        self._loaded_json_data: Optional[Dict[str, Any]] = None
        self._cfg: Optional[SuiteConfig] = None

        logger.info(f"Initializing DataTransformer with local JSON file: {self._json_file_path}")

        # --- Load and validate the JSON data ---
        if not os.path.exists(self._json_file_path):
            raise FileNotFoundError(f"JSON file not found at path: {self._json_file_path}")
        try:
            with open(self._json_file_path, "r", encoding="utf-8") as f:
                self._loaded_json_data = json.load(f)
        except json.JSONDecodeError as e:
            raise ValueError(f"Failed to parse JSON from local file {self._json_file_path}: {e}")
        except Exception as e:
            raise ValueError(f"Error reading local file {self._json_file_path}: {e}")

        if not self._loaded_json_data:
            raise ValueError(f"No data loaded from JSON file {self._json_file_path}.")

        try:
            eval_result = EvalResult.model_validate(self._loaded_json_data)
        except Exception as e:
            raise ValueError(
                f"Failed to validate JSON data from file '{self._json_file_path}' "
                f"against the EvalResult model: {e}"
            )

        self._cfg = eval_result.suite_config
        if not isinstance(self._cfg, SuiteConfig):
            raise TypeError(
                f"Expected a SuiteConfig after loading '{self._json_file_path}', got {type(self._cfg)}."
            )

        # --- Populate the tag map: formal tag name -> sorted formal task names ---
        self.tag_map: dict[str, list[str]] = {}
        tasks_for_split: List[Any] = self._cfg.get_tasks(self._split)
        for task in tasks_for_split:
            if not hasattr(task, "name") or not hasattr(task, "tags"):
                logger.warning(f"Task object {task} is missing 'name' or 'tags' attribute. Skipping.")
                continue
            if not (task.tags or []):
                continue
            formal_task_display_name = self._get_formal_display_name_static(task.name)
            for raw_tag_name in task.tags:
                formal_tag_display_name = self._get_formal_display_name_static(raw_tag_name)
                self.tag_map.setdefault(formal_tag_display_name, []).append(formal_task_display_name)
        for key in self.tag_map:
            self.tag_map[key] = sorted(set(self.tag_map[key]))

    @staticmethod
    def _get_formal_display_name_static(raw_name: str) -> str:
        """Return the formal display name for a raw tag or task name.

        Uses the class's map and falls back to title-casing for unknown names.
        """
        return DataTransformer._INFORMAL_TO_FORMAL_NAME_MAP.get(
            raw_name, raw_name.replace("_", " ").title()
        )
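    # For illustration only: in a suite whose "lit" tag covered the two
    # validation tasks "litqa2_validation" and "sqa_dev" (hypothetical
    # contents), the resulting tag_map would be:
    #   {"Literature Understanding": ["Litqa2 Validation", "Sqa Dev"]}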
    def _load(self) -> tuple[pd.DataFrame, dict[str, list[str]]]:
        """Prepare the overview DataFrame from the JSON data loaded in __init__."""
        if self._loaded_json_data is None or self._cfg is None:
            # Should not happen if __init__ completed successfully.
            raise RuntimeError("DataTransformer not properly initialized: JSON data or SuiteConfig is missing.")

        # _get_dataframe expects a list of records. We have a single JSON file
        # representing one result, so wrap it in a list.
        records_list: list[dict] = [self._loaded_json_data]
        overview_df = _get_dataframe(
            records_list=records_list,
            split=self._split,
            is_internal=self._internal,
            suite_config=self._cfg,
        )
        return overview_df, self.tag_map

    def view(
        self,
        tag: Optional[str] = None,
        with_plots: bool = False,
        use_plotly: bool = False,
    ) -> tuple[pd.DataFrame, dict[str, Any]]:
        data, tag_map = self._load()
        logger.debug(f"Loaded tag map: {tag_map}")
        logger.debug(f"Loaded overview columns: {data.columns.tolist()}")

        if data.empty or (len(data) == 1 and data.iloc[0].get("Agent") == "No data"):
            logger.warning("No data available to view. Returning empty DataFrame and plots.")
            return data, {}

        base_cols = ["Agent", "Submitter", "Date", "Logs"]
        existing_cols = [col for col in base_cols if col in data.columns]

        if tag is None:
            primary = "Overall"
            group = list(tag_map.keys())
        else:
            primary = tag
            group = tag_map.get(tag, [])

        if f"{primary} Score" in data.columns:
            data = data.sort_values(f"{primary} Score", ascending=False)
        else:
            logger.warning(f"Primary metric '{primary}' for sorting not found. Data will not be sorted by it.")

        metrics_to_display = []
        if f"{primary} Cost" in data.columns:
            metrics_to_display.append(f"{primary} Cost")
        if f"{primary} Score" in data.columns:
            metrics_to_display.append(f"{primary} Score")
        for g_item in group:
            if g_item in data.columns:
                metrics_to_display.append(g_item)
            if f"{g_item} Cost" in data.columns:
                metrics_to_display.append(f"{g_item} Cost")
            if f"{g_item} Score" in data.columns:
                metrics_to_display.append(f"{g_item} Score")

        final_cols_to_display = existing_cols + [m for m in metrics_to_display if m in data.columns]
        # Deduplicate while preserving first-seen order.
        final_cols_to_display = sorted(set(final_cols_to_display), key=final_cols_to_display.index)
        df_view = data.loc[:, final_cols_to_display].reset_index(drop=True)

        plots: dict[str, Any] = {}
        if with_plots:
            plot_metric_names = [primary] + [g_item for g_item in group if g_item in data.columns]
            for metric_name in plot_metric_names:
                score_col = f"{metric_name} Score"
                cost_col = f"{metric_name} Cost"
                if score_col in df_view.columns and cost_col in df_view.columns:
                    if use_plotly:
                        fig = _plot_scatter_plotly(df_view, x=cost_col, y=score_col, agent_col="Agent")
                    else:
                        # Matplotlib fallback; _plot_scatter_matplotlib is a
                        # sketch defined at the end of this module.
                        fig = _plot_scatter_matplotlib(df_view, x=cost_col, y=score_col, agent_col="Agent")
                    plots[f"scatter_{metric_name}"] = fig
                else:
                    logger.warning(
                        f"Skipping plot for '{metric_name}': score column '{score_col}' "
                        f"or cost column '{cost_col}' not found."
                    )
        return df_view, plots


def _safe_round(value, digits=2):
    """Round numeric values; pass through None/NaN and non-numeric values unchanged."""
    return round(value, digits) if isinstance(value, (float, int)) and pd.notna(value) else value
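# _safe_round pass-through behavior, for reference:
#   _safe_round(0.12345)       -> 0.12
#   _safe_round(None)          -> None   (not numeric)
#   _safe_round(float("nan"))  -> nan    (pd.notna is False, returned as-is)
#   _safe_round("n/a")         -> "n/a"  (not numeric)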
def _get_dataframe(
    records_list: list[dict],
    split: str,
    is_internal: bool,
    suite_config: SuiteConfig,
    timezone: str = "US/Pacific",
) -> pd.DataFrame:
    """Build the overview DataFrame from a list of result records.

    The list will typically contain a single item: the dict loaded from the
    local JSON file.
    """
    if not records_list:
        logger.warning(
            f"No records provided to _get_dataframe for split '{split}'. "
            f"Returning empty DataFrame with placeholder."
        )
        expected_pretty_cols = ["Agent", "Submitter", "Date", "Overall Score", "Overall Cost", "Logs"]
        return pd.DataFrame({p_col: ["No data"] for p_col in expected_pretty_cols})

    cfg = suite_config
    rows = []
    for itm_idx, itm in enumerate(records_list):
        if not isinstance(itm, dict):
            logger.warning(f"Item {itm_idx} in records_list is not a dict, skipping.")
            continue
        try:
            ev = EvalResult.model_validate(itm)
        except Exception as e:
            logger.error(f"Failed to validate item {itm_idx} with EvalResult: {itm}. Error: {e}")
            continue

        sub = ev.submission
        date_str = None
        if sub.submit_time is not None:
            # Normalize to an aware timestamp (assume UTC if naive), then
            # render in the requested display timezone.
            submit_dt = pd.Timestamp(sub.submit_time)
            if submit_dt.tzinfo is None:
                logger.debug(f"Submission time for {sub.agent_name} is timezone-naive, assuming UTC.")
                submit_dt = submit_dt.tz_localize("UTC")
            date_str = submit_dt.tz_convert(ZoneInfo(timezone)).strftime("%Y-%m-%d")

        if not ev.results:
            logger.warning(
                f"Skipping submission {sub.agent_name} ({sub.username or 'N/A'}) "
                f"({sub.submit_time or 'N/A'}) due to no results."
            )
            continue

        stats = compute_summary_statistics(suite_config=cfg, split=split, results=ev.results)
        logger.debug(f"Summary statistics keys: {list(stats)}")

        flat = {}
        for key, s_obj in stats.items():
            parts = key.split("/")
            if parts[0] == "overall":
                flat["overall/score"] = _safe_round(getattr(s_obj, "score", np.nan))
                flat["overall/cost"] = _safe_round(getattr(s_obj, "cost", np.nan))
            elif parts[0] == "tag" and len(parts) > 1:
                tag_name = parts[1]
                flat[f"tag/{tag_name}/score"] = _safe_round(getattr(s_obj, "score", np.nan))
                flat[f"tag/{tag_name}/cost"] = _safe_round(getattr(s_obj, "cost", np.nan))
            elif parts[0] == "task" and len(parts) > 1:
                task_name = parts[1]
                score = getattr(s_obj, "score", np.nan)
                cost = getattr(s_obj, "cost", np.nan)
                score_stderr = getattr(s_obj, "score_stderr", np.nan)
                cost_stderr = getattr(s_obj, "cost_stderr", np.nan)
                flat[f"task/{task_name}/score"] = _safe_round(score)
                # 95% confidence interval half-width: 1.96 * stderr.
                flat[f"task/{task_name}/score_ci"] = _safe_round(
                    score_stderr * 1.96 if pd.notna(score_stderr) else np.nan
                )
                flat[f"task/{task_name}/cost"] = _safe_round(cost)
                flat[f"task/{task_name}/cost_ci"] = _safe_round(
                    cost_stderr * 1.96 if pd.notna(cost_stderr) else np.nan
                )
            else:
                logger.debug(f"Uncommon key structure from compute_summary_statistics: '{key}'. Attempting generic add.")
                if hasattr(s_obj, "score"):
                    flat[f"{key}/score"] = _safe_round(s_obj.score)
                if hasattr(s_obj, "cost"):
                    flat[f"{key}/cost"] = _safe_round(s_obj.cost)

        current_logs_url = None
        if is_internal and sub.logs_url:
            current_logs_url = str(sub.logs_url)
        elif not is_internal and sub.logs_url_public:
            current_logs_url = str(sub.logs_url_public)

        rows.append(
            {
                "agent_name": sub.agent_name or "N/A",
                "username": sub.username or "N/A",
                "submit_time": date_str,
                **flat,
                "logs_url": current_logs_url,
            }
        )
    if not rows:
        logger.warning(
            f"No valid rows generated from records_list for split '{split}'. "
            f"Returning empty DataFrame with placeholder."
        )
        expected_pretty_cols = ["Agent", "Submitter", "Date", "Overall Score", "Overall Cost", "Logs"]
        return pd.DataFrame({p_col: ["No data"] for p_col in expected_pretty_cols})

    df = pd.DataFrame(rows)
    pretty_cols = {c: _pretty_column_name(c) for c in df.columns}
    overview = df.rename(columns=pretty_cols)
    return overview


def _pretty_column_name(col: str) -> str:
    """Map a raw column name to its display name."""
    # Step 1: fixed, direct mappings.
    fixed_mappings = {
        "submit_time": "Date",
        "agent_name": "Agent",
        "username": "Submitter",
        "logs_url": "Logs",
        "overall/score": "Overall Score",
        "overall/cost": "Overall Cost",
    }
    if col in fixed_mappings:
        return fixed_mappings[col]

    # Step 2: the map from informal names to descriptive display names.
    informal_map = DataTransformer._INFORMAL_TO_FORMAL_NAME_MAP

    # Step 3: dynamic mappings for "type/name/metric" task or tag columns.
    parts = col.split("/")
    if len(parts) == 3:
        _item_type, informal_name, metric_suffix = parts
        formal_name = informal_map.get(informal_name)
        if formal_name is None:
            formal_name = informal_name.replace("_", " ").title()
            logger.debug(f"Informal name '{informal_name}' not in map, using fallback: '{formal_name}'")
        if metric_suffix == "score":
            return f"{formal_name} Score"
        if metric_suffix == "cost":
            return f"{formal_name} Cost"
        if metric_suffix == "score_ci":
            return f"{formal_name} Score 95% CI"
        if metric_suffix == "cost_ci":
            return f"{formal_name} Cost 95% CI"

    # Step 4: fallback for columns that don't match the "type/name/metric" pattern.
    if "/" not in col:
        return col.replace("_", " ").title()
    return parts[-1].replace("_", " ").title()
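# Illustrative examples of _pretty_column_name (using the map defined on
# DataTransformer above):
#   "tag/lit/cost"                     -> "Literature Understanding Cost"
#   "task/sqa_dev/score"               -> "Sqa Dev Score"
#   "task/ds1000_validation/score_ci"  -> "DS1000 Validation Score 95% CI"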
DEFAULT_Y_COLUMN = "Overall Score"
DUMMY_X_VALUE_FOR_MISSING_COSTS = 0  # x-value to use if cost data is missing


def _plot_scatter_plotly(
    data: pd.DataFrame,
    x: Optional[str],
    y: str,
    agent_col: str = "Agent",
) -> go.Figure:
    x_col_to_use = x
    y_col_to_use = y

    # 1. The y-column must exist.
    if y_col_to_use not in data.columns:
        logger.error(
            f"y-axis column '{y_col_to_use}' must exist in the DataFrame. "
            f"Cannot generate plot. Available columns: {data.columns.tolist()}"
        )
        return go.Figure()

    # 2. The agent column must exist.
    if agent_col not in data.columns:
        logger.warning(
            f"Agent column '{agent_col}' not found in DataFrame. "
            f"Available columns: {data.columns.tolist()}. Returning empty figure."
        )
        return go.Figure()

    # 3. Prepare data: work on a copy, coerce y to numeric.
    data_plot = data.copy()
    try:
        data_plot[y_col_to_use] = pd.to_numeric(data_plot[y_col_to_use], errors="coerce")
    except Exception as e:
        logger.error(f"Error converting y-column '{y_col_to_use}' to numeric: {e}. Returning empty figure.")
        return go.Figure()

    # 4. Handle the x-column (costs), which may be missing or invalid.
    x_axis_label = x_col_to_use if x_col_to_use else "Cost (Data N/A)"
    x_data_is_valid = False
    if x_col_to_use and x_col_to_use in data_plot.columns:
        try:
            data_plot[x_col_to_use] = pd.to_numeric(data_plot[x_col_to_use], errors="coerce")
            if data_plot[x_col_to_use].notna().any():
                x_data_is_valid = True
            else:
                logger.info(
                    f"x-axis column '{x_col_to_use}' exists but contains all NaN/None values "
                    f"after numeric conversion."
                )
        except Exception as e:
            logger.warning(f"Error converting x-column '{x_col_to_use}' to numeric: {e}. Will use dummy x-values.")
    elif x_col_to_use:
        # A name was provided but the column doesn't exist.
        logger.warning(f"x-axis column '{x_col_to_use}' not found in DataFrame.")
    else:
        logger.info("x-axis column name was not provided (is None).")

    if not x_data_is_valid:
        logger.info(
            f"Using dummy x-value '{DUMMY_X_VALUE_FOR_MISSING_COSTS}' for all data points "
            f"as x-data is missing or invalid."
        )
        # Plot every point at the dummy x-value, under a column name that
        # cannot clash with real columns. The axis label keeps the original
        # column name if one was provided.
        dummy_x_col_name = "__dummy_x_for_plotting__"
        data_plot[dummy_x_col_name] = DUMMY_X_VALUE_FOR_MISSING_COSTS
        x_col_to_use = dummy_x_col_name

    # 5. Drop rows where y is NaN (x is now guaranteed to have values, either real or dummy).
    data_plot.dropna(subset=[y_col_to_use], inplace=True)

    fig = go.Figure()
    if data_plot.empty:
        logger.warning(
            f"No valid data to plot for y='{y_col_to_use}' (and x='{x_col_to_use}') after cleaning NaNs from y."
        )
        # Still return a figure object, with a layout that makes the absence of data clear.
        fig.update_layout(
            title=f"{y_col_to_use} vs. {x_axis_label} (No Data)",
            xaxis=dict(
                title=x_axis_label,
                range=[DUMMY_X_VALUE_FOR_MISSING_COSTS - 1, DUMMY_X_VALUE_FOR_MISSING_COSTS + 1]
                if not x_data_is_valid
                else None,
            ),
            yaxis=dict(title=y_col_to_use),
        )
        return fig
    for agent, group in data_plot.groupby(agent_col):
        hover_x_display = "%{x:.2f}" if x_data_is_valid else str(DUMMY_X_VALUE_FOR_MISSING_COSTS) + " (fixed)"
        fig.add_trace(
            go.Scatter(
                x=group[x_col_to_use],
                y=group[y_col_to_use],
                mode="markers",
                name=str(agent),
                hovertemplate=(
                    f"{x_axis_label}: {hover_x_display}<br>"
                    f"{y_col_to_use}: %{{y:.2f}}<extra>{str(agent)}</extra>"
                ),
                marker=dict(size=10),
            )
        )

    # Configure the layout.
    xaxis_config = dict(title=x_axis_label)
    if not x_data_is_valid:
        # With dummy x-values, pin the x-axis to a tight, fixed range with a
        # single tick at the dummy value.
        xaxis_config["range"] = [DUMMY_X_VALUE_FOR_MISSING_COSTS - 1, DUMMY_X_VALUE_FOR_MISSING_COSTS + 1]
        xaxis_config["tickvals"] = [DUMMY_X_VALUE_FOR_MISSING_COSTS]
        xaxis_config["ticktext"] = [str(DUMMY_X_VALUE_FOR_MISSING_COSTS)]
    else:
        # Real x-data: extend the axis to include zero.
        xaxis_config["rangemode"] = "tozero"

    fig.update_layout(
        title=f"{y_col_to_use} vs. {x_axis_label}",
        xaxis=xaxis_config,
        yaxis=dict(title=y_col_to_use, rangemode="tozero"),
        legend_title_text=agent_col,
    )
    return fig
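
# NOTE: only the plotly path is spelled out above; this matplotlib counterpart
# is a minimal sketch (assuming the same "score vs. cost, one series per agent"
# shape) so that view(..., use_plotly=False) has something to call. It mirrors
# only the happy path of _plot_scatter_plotly, with missing-cost handling
# reduced to the same dummy x-value.
def _plot_scatter_matplotlib(
    data: pd.DataFrame,
    x: Optional[str],
    y: str,
    agent_col: str = "Agent",
) -> plt.Figure:
    fig, ax = plt.subplots(figsize=(8, 5))
    if y not in data.columns or agent_col not in data.columns:
        logger.warning(f"Column '{y}' or '{agent_col}' missing; returning empty matplotlib figure.")
        return fig

    data_plot = data.copy()
    data_plot[y] = pd.to_numeric(data_plot[y], errors="coerce")
    if x and x in data_plot.columns:
        data_plot[x] = pd.to_numeric(data_plot[x], errors="coerce")
        x_col, x_label = x, x
    else:
        # Same fallback idea as the plotly helper: plot all points at a dummy x.
        data_plot["__dummy_x__"] = DUMMY_X_VALUE_FOR_MISSING_COSTS
        x_col, x_label = "__dummy_x__", "Cost (Data N/A)"
    data_plot = data_plot.dropna(subset=[y])

    for agent, group in data_plot.groupby(agent_col):
        ax.scatter(group[x_col], group[y], s=60, label=str(agent))
    ax.set_xlabel(x_label)
    ax.set_ylabel(y)
    ax.set_title(f"{y} vs. {x_label}")
    ax.legend(title=agent_col)
    return fig


if __name__ == "__main__":
    # Minimal usage sketch; "results.json" and the "validation" split are
    # placeholder values, not files or names shipped with this module.
    logging.basicConfig(level=logging.INFO)
    viewer = DataTransformer(json_file_path="results.json", split="validation")
    df, figures = viewer.view(with_plots=True, use_plotly=False)
    print(df.head())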