import json
import logging
import os
from typing import Any, Dict, List, Optional
from zoneinfo import ZoneInfo

import numpy as np
import pandas as pd
import plotly.graph_objects as go

from agenteval import compute_summary_statistics
from agenteval.config import SuiteConfig
from agenteval.models import EvalResult

logger = logging.getLogger(__name__)
class DataTransformer:
"""
Load and visualize leaderboard from a single, local JSON result file.
"""
_INFORMAL_TO_FORMAL_NAME_MAP = {
"lit": "Literature Understanding",
"data": "Data Analysis",
"code": "Code Execution",
"discovery": "Discovery",
"arxivdigestables_validation": "Arxivdigestables Validation",
"sqa_dev": "Sqa Dev",
"litqa2_validation": "Litqa2 Validation",
"paper_finder_validation": "Paper Finder Validation",
"discoverybench_validation": "Discoverybench Validation",
"core_bench_validation": "Core Bench Validation",
"ds1000_validation": "DS1000 Validation",
"e2e_discovery_validation": "E2E Discovery Validation",
"super_validation": "Super Validation",
# Add any other raw names that can appear in task.name or task.tags
}
def __init__(
self,
json_file_path: str, # Mandatory: path to the local JSON file
split: str, # Still needed for context within the JSON's suite_config
is_internal: bool = False
):
self._json_file_path = json_file_path
self._split = split
self._internal = is_internal
self._loaded_json_data: Optional[Dict[str, Any]] = None
self._cfg: Optional[SuiteConfig] = None
logger.info(f"Initializing LeaderboardViewer with local JSON file: {self._json_file_path}")
# --- Load and Validate JSON data ---
if not os.path.exists(self._json_file_path):
raise FileNotFoundError(f"JSON file not found at path: {self._json_file_path}")
try:
with open(self._json_file_path, 'r', encoding='utf-8') as f:
self._loaded_json_data = json.load(f)
except json.JSONDecodeError as e:
raise ValueError(f"Failed to parse JSON from local file {self._json_file_path}: {e}")
except Exception as e:
raise ValueError(f"Error reading local file {self._json_file_path}: {e}")
if not self._loaded_json_data:
raise ValueError(f"No data loaded from JSON file {self._json_file_path}.")
try:
eval_result = EvalResult.model_validate(self._loaded_json_data)
except Exception as e:
raise ValueError(f"Failed to validate JSON data from file '{self._json_file_path}' against EvalResult model: {e}")
self._cfg = eval_result.suite_config
if not isinstance(self._cfg, SuiteConfig):
raise TypeError(f"self._cfg is not a SuiteConfig object after loading from '{self._json_file_path}', got {type(self._cfg)}.")
# --- Populate Tag Map (Corrected Placement and Helper Function Access) ---
self.tag_map: dict[str, list[str]] = {}
# Access tasks from the loaded config
tasks_for_split: List[Any] = self._cfg.get_tasks(self._split) # Assuming get_tasks returns a list of task-like objects
for task in tasks_for_split:
# Ensure task object has 'name' and 'tags' attributes
if not hasattr(task, 'name') or not hasattr(task, 'tags'):
logger.warning(f"Task object {task} is missing 'name' or 'tags' attribute. Skipping.")
continue
            if not task.tags:
                continue
            formal_task_display_name = self._get_formal_display_name_static(task.name)
            for raw_tag_name in task.tags:
                formal_tag_display_name = self._get_formal_display_name_static(raw_tag_name)
                self.tag_map.setdefault(formal_tag_display_name, []).append(formal_task_display_name)
        for key in self.tag_map:
            self.tag_map[key] = sorted(set(self.tag_map[key]))
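        # Resulting shape (sketch; actual keys depend on the suite config), e.g.:
        #   {"Literature Understanding": ["Litqa2 Validation", "Paper Finder Validation", ...], ...}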
    # --- Helper: map raw task/tag names to formal display names ---
@staticmethod
def _get_formal_display_name_static(raw_name: str) -> str:
"""
Helper function to get the formal display name for a raw tag or task name.
Uses the class's map and provides a fallback.
"""
return DataTransformer._INFORMAL_TO_FORMAL_NAME_MAP.get(raw_name, raw_name.replace("_", " ").title())
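    # Example (sketch): mapped names use the table above; unmapped names fall back
    # to title-casing, e.g. "lit" -> "Literature Understanding", "foo_task" -> "Foo Task".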
def _load(self) -> tuple[pd.DataFrame, dict[str, list[str]]]:
"""
Prepares the DataFrame from the loaded JSON data.
The JSON data is already loaded and validated in __init__.
"""
if self._loaded_json_data is None or self._cfg is None:
# This should not happen if __init__ completed successfully
raise RuntimeError("LeaderboardViewer2 not properly initialized. JSON data or SuiteConfig is missing.")
# The _get_dataframe function expects a list of records.
# Since we have a single JSON file representing one result, wrap it in a list.
records_list: list[dict] = [self._loaded_json_data]
overview_df = _get_dataframe(
records_list=records_list,
split=self._split,
is_internal=self._internal,
suite_config=self._cfg, # Pass the SuiteConfig loaded in __init__
)
return overview_df, self.tag_map
def view(
self,
tag: Optional[str] = None,
with_plots: bool = False,
use_plotly: bool = False,
) -> tuple[pd.DataFrame, dict[str, Any]]:
        data, tag_map = self._load()
if data.empty or (len(data) == 1 and data.iloc[0].get("Agent") == "No data"):
logger.warning("No data available to view. Returning empty DataFrame and plots.")
return data, {}
base_cols = ["Agent", "Submitter", "Date", "Logs"]
existing_cols = [col for col in base_cols if col in data.columns]
        primary: str
        group: list[str]
if tag is None:
primary = "Overall"
group = list(tag_map.keys())
else:
primary = tag
group = tag_map.get(tag, [])
if f"{primary} Score" in data.columns:
data = data.sort_values(f"{primary} Score", ascending=False)
else:
logger.warning(f"Primary metric '{primary}' for sorting not found. Data will not be sorted by it.")
metrics_to_display = []
if f"{primary} Cost" in data.columns:
metrics_to_display.append(f"{primary} Cost")
if f"{primary} Score" in data.columns:
metrics_to_display.append(f"{primary} Score")
        for g_item in group:
            # Columns carry display names like "<Name> Score"; check for those, not the bare name.
            if f"{g_item} Cost" in data.columns:
                metrics_to_display.append(f"{g_item} Cost")
            if f"{g_item} Score" in data.columns:
                metrics_to_display.append(f"{g_item} Score")
final_cols_to_display = existing_cols + [m for m in metrics_to_display if m in data.columns]
        final_cols_to_display = list(dict.fromkeys(final_cols_to_display))  # dedupe, keep first-seen order
df_view = data.loc[:, final_cols_to_display].reset_index(drop=True)
plots: dict[str, Any] = {}
if with_plots:
            plot_metric_names = [primary] + [g_item for g_item in group if f"{g_item} Score" in data.columns]
for metric_name in plot_metric_names:
score_col = f"{metric_name} Score"
cost_col = f"{metric_name} Cost"
                if score_col in df_view.columns and cost_col in df_view.columns:
                    if use_plotly:
                        fig = _plot_scatter_plotly(df_view, x=cost_col, y=score_col, agent_col="Agent")
                        plots[f"scatter_{metric_name}"] = fig
                    else:
                        logger.info(f"Non-plotly plotting is not implemented; skipping plot for '{metric_name}'.")
                else:
                    logger.warning(
                        f"Skipping plot for '{metric_name}': score column '{score_col}' "
                        f"or cost column '{cost_col}' not found."
                    )
return df_view, plots
def _safe_round(value, digits=2):
    """Round numeric values to `digits` places; pass NaN and non-numeric values through unchanged."""
    return round(value, digits) if isinstance(value, (float, int)) and pd.notna(value) else value
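# Example behaviour (sketch):
#   _safe_round(0.12345)      -> 0.12
#   _safe_round(float("nan")) -> nan   (passed through)
#   _safe_round("N/A")        -> "N/A" (non-numeric, passed through)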
def _get_dataframe(
records_list: list[dict],
split: str,
is_internal: bool,
suite_config: SuiteConfig,
timezone: str = "US/Pacific",
) -> pd.DataFrame:
    """Build the overview DataFrame from a list of EvalResult-shaped records.

    For the single-file viewer, `records_list` is a one-element list wrapping
    the loaded JSON document.
    """
if not records_list:
logger.warning(f"No records provided to _get_dataframe for split '{split}'. Returning empty DataFrame with placeholder.")
expected_pretty_cols = ["Agent Name", "Submitter", "Date", "Overall Score", "Logs"]
empty_df = pd.DataFrame({p_col: ["No data"] for p_col in expected_pretty_cols})
return empty_df
cfg = suite_config
rows = []
for itm_idx, itm in enumerate(records_list):
if not isinstance(itm, dict):
logger.warning(f"Item {itm_idx} in records_list is not a dict, skipping.")
continue
try:
ev = EvalResult.model_validate(itm)
except Exception as e:
logger.error(f"Failed to validate item {itm_idx} with EvalResult: {itm}. Error: {e}")
continue
sub = ev.submission
        date_str = None
        if sub.submit_time is not None:
            submit_dt = sub.submit_time
            if submit_dt.tzinfo is None:
                logger.debug(f"Submission time for {sub.agent_name} is timezone-naive; assuming UTC.")
                submit_dt = submit_dt.replace(tzinfo=ZoneInfo("UTC"))
            date_str = pd.Timestamp(submit_dt).tz_convert(ZoneInfo(timezone)).strftime("%Y-%m-%d")
if not ev.results:
logger.warning(
f"Skipping submission {sub.agent_name} ({sub.username or 'N/A'}) "
f"({sub.submit_time or 'N/A'}) due to no results."
)
continue
stats = compute_summary_statistics(
suite_config=cfg, split=split, results=ev.results
)
flat = {}
print(f"STATS STATS ASTATAS SD T S T A A {stats}")
for key, s_obj in stats.items():
parts = key.split("/")
if parts[0] == "overall":
flat["overall/score"] = _safe_round(getattr(s_obj, 'score', np.nan))
flat["overall/cost"] = _safe_round(getattr(s_obj, 'cost', np.nan))
elif parts[0] == "tag" and len(parts) > 1:
tag_name = parts[1]
flat[f"tag/{tag_name}/score"] = _safe_round(getattr(s_obj, 'score', np.nan))
flat[f"tag/{tag_name}/cost"] = _safe_round(getattr(s_obj, 'cost', np.nan))
elif parts[0] == "task" and len(parts) > 1:
task_name = parts[1]
score = getattr(s_obj, 'score', np.nan)
cost = getattr(s_obj, 'cost', np.nan)
score_stderr = getattr(s_obj, 'score_stderr', np.nan)
cost_stderr = getattr(s_obj, 'cost_stderr', np.nan)
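                # Report 95% confidence intervals: half-width = 1.96 * standard error.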
flat[f"task/{task_name}/score"] = _safe_round(score)
flat[f"task/{task_name}/score_ci"] = _safe_round(score_stderr * 1.96 if pd.notna(score_stderr) else np.nan)
flat[f"task/{task_name}/cost"] = _safe_round(cost)
flat[f"task/{task_name}/cost_ci"] = _safe_round(cost_stderr * 1.96 if pd.notna(cost_stderr) else np.nan)
else:
logger.debug(f"Uncommon key structure from compute_summary_statistics: '{key}'. Attempting generic add.")
if hasattr(s_obj, 'score'):
flat[f"{key}/score"] = _safe_round(s_obj.score)
if hasattr(s_obj, 'cost'):
flat[f"{key}/cost"] = _safe_round(s_obj.cost)
current_logs_url = None
if is_internal and sub.logs_url:
current_logs_url = str(sub.logs_url)
elif not is_internal and sub.logs_url_public:
current_logs_url = str(sub.logs_url_public)
rows.append(
{
"agent_name": sub.agent_name or "N/A",
"username": sub.username or "N/A",
"submit_time": date_str,
**flat,
"logs_url": current_logs_url,
}
)
if not rows:
logger.warning(f"No valid rows generated from records_list for split '{split}'. Returning empty DataFrame with placeholder.")
expected_pretty_cols = ["Agent", "Submitter", "Date", "Overall Score", "Overall Cost", "Logs"]
empty_df = pd.DataFrame({p_col: ["No data"] for p_col in expected_pretty_cols})
return empty_df
df = pd.DataFrame(rows)
    pretty_cols = {c: _pretty_column_name(c) for c in df.columns}
overview = df.rename(columns=pretty_cols)
return overview
def _pretty_column_name(col: str) -> str:
"""Map raw column name to display name."""
# --- Step 1: Fixed, direct mappings ---
fixed_mappings = {
"submit_time": "Date",
"agent_name": "Agent",
"username": "Submitter",
"logs_url": "Logs",
"overall/score": "Overall Score",
"overall/cost": "Overall Cost",
}
if col in fixed_mappings:
return fixed_mappings[col]
# --- Step 2: Define your mapping for informal names to descriptive names ---
informal_map = DataTransformer._INFORMAL_TO_FORMAL_NAME_MAP
# --- Step 3: Dynamic mappings for task or tag columns using the informal_to_formal_name_map ---
parts = col.split("/")
if len(parts) == 3:
        item_type, informal_name, metric_suffix = parts
formal_name = informal_map.get(informal_name)
        if formal_name is None:
            formal_name = informal_name.replace("_", " ").title()
            logger.debug(f"Informal name '{informal_name}' not in map; using fallback '{formal_name}'.")
if metric_suffix == "score":
return f"{formal_name} Score"
if metric_suffix == "cost":
return f"{formal_name} Cost"
if metric_suffix == "score_ci":
return f"{formal_name} Score 95% CI"
if metric_suffix == "cost_ci":
return f"{formal_name} Cost 95% CI"
# --- Step 4: Fallback for columns that don't match the "type/name/metric" pattern ---
if "/" not in col:
return col.replace("_", " ").title()
else:
return parts[-1].replace("_", " ").title()
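# Example mappings (sketch):
#   "overall/score"         -> "Overall Score"
#   "tag/lit/cost"          -> "Literature Understanding Cost"
#   "task/sqa_dev/score_ci" -> "Sqa Dev Score 95% CI"
#   "some_other_field"      -> "Some Other Field"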
DEFAULT_Y_COLUMN = "Overall Score"
DUMMY_X_VALUE_FOR_MISSING_COSTS = 0 # Value to use if x-axis data (costs) is missing
def _plot_scatter_plotly(
data: pd.DataFrame,
x: Optional[str],
y: str,
agent_col: str = "Agent"
) -> go.Figure:
x_col_to_use = x
y_col_to_use = y
# 1. Check if y-column exists
if y_col_to_use not in data.columns:
logger.error(
f"y-axis column '{y_col_to_use}' MUST exist in DataFrame. "
f"Cannot generate plot. Available columns: {data.columns.tolist()}"
)
return go.Figure()
# 2. Check if agent_col exists
if agent_col not in data.columns:
logger.warning(
f"Agent column '{agent_col}' not found in DataFrame. "
f"Available columns: {data.columns.tolist()}. Returning empty figure."
)
return go.Figure()
# 3. Prepare data (make a copy, handle numeric conversion for y)
data_plot = data.copy()
try:
data_plot[y_col_to_use] = pd.to_numeric(data_plot[y_col_to_use], errors='coerce')
except Exception as e:
logger.error(f"Error converting y-column '{y_col_to_use}' to numeric: {e}. Returning empty figure.")
return go.Figure()
# 4. Handle x-column (costs)
x_axis_label = x_col_to_use if x_col_to_use else "Cost (Data N/A)" # Label for the x-axis
x_data_is_valid = False
if x_col_to_use and x_col_to_use in data_plot.columns:
try:
data_plot[x_col_to_use] = pd.to_numeric(data_plot[x_col_to_use], errors='coerce')
# Check if there's any non-NaN data after coercion for x
if data_plot[x_col_to_use].notna().any():
x_data_is_valid = True
else:
logger.info(f"x-axis column '{x_col_to_use}' exists but contains all NaN/None values after numeric conversion.")
except Exception as e:
logger.warning(f"Error converting x-column '{x_col_to_use}' to numeric: {e}. Will use dummy x-values.")
# x_data_is_valid remains False
else:
if x_col_to_use: # Name was provided but column doesn't exist
logger.warning(f"x-axis column '{x_col_to_use}' not found in DataFrame.")
else: # x (column name) was None
logger.info("x-axis column name was not provided (is None).")
if not x_data_is_valid:
logger.info(f"Using dummy x-value '{DUMMY_X_VALUE_FOR_MISSING_COSTS}' for all data points as x-data is missing or invalid.")
# Create a new column with the dummy x-value for all rows
# Use a unique name for this dummy column to avoid potential clashes
dummy_x_col_name = "__dummy_x_for_plotting__"
data_plot[dummy_x_col_name] = DUMMY_X_VALUE_FOR_MISSING_COSTS
x_col_to_use = dummy_x_col_name # Update x_col_to_use to point to our dummy data
x_axis_label = x if x else "Cost (Data N/A)" # Use original x name for label if provided
# or a generic label if x was None.
# Could also be f"Cost (Fixed at {DUMMY_X_VALUE_FOR_MISSING_COSTS})"
# 5. Drop rows where y is NaN (x is now guaranteed to have values, either real or dummy)
data_plot.dropna(subset=[y_col_to_use], inplace=True)
fig = go.Figure()
if data_plot.empty:
logger.warning(f"No valid data to plot for y='{y_col_to_use}' (and x='{x_col_to_use}') after cleaning NaNs from y.")
# Still return a figure object, but it will be empty. Update layout for clarity.
fig.update_layout(
title=f"{y_col_to_use} vs. {x_axis_label} (No Data)",
xaxis=dict(title=x_axis_label, range=[DUMMY_X_VALUE_FOR_MISSING_COSTS - 1, DUMMY_X_VALUE_FOR_MISSING_COSTS + 1] if not x_data_is_valid else None),
yaxis=dict(title=y_col_to_use)
)
return fig
for agent, group in data_plot.groupby(agent_col):
hover_x_display = "%{x:.2f}" if x_data_is_valid else str(DUMMY_X_VALUE_FOR_MISSING_COSTS) + " (fixed)"
fig.add_trace(go.Scatter(
x=group[x_col_to_use],
y=group[y_col_to_use],
mode='markers',
name=str(agent),
hovertemplate=f"{x_axis_label}: {hover_x_display}
{y_col_to_use}: %{{y:.2f}}{str(agent)}",
marker=dict(size=10)
))
# Configure layout
xaxis_config = dict(title=x_axis_label)
if not x_data_is_valid: # If using dummy x, set a tighter, fixed range for x-axis
xaxis_config['range'] = [DUMMY_X_VALUE_FOR_MISSING_COSTS - 1, DUMMY_X_VALUE_FOR_MISSING_COSTS + 1]
xaxis_config['tickvals'] = [DUMMY_X_VALUE_FOR_MISSING_COSTS] # Show only one tick at the dummy value
xaxis_config['ticktext'] = [str(DUMMY_X_VALUE_FOR_MISSING_COSTS)]
else: # Real x-data
xaxis_config['rangemode'] = "tozero"
fig.update_layout(
title=f"{y_col_to_use} vs. {x_axis_label}",
xaxis=xaxis_config,
yaxis=dict(title=y_col_to_use, rangemode="tozero"),
legend_title_text=agent_col
)
return fig
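

# Minimal usage sketch (assumptions: a local result file named "results.json"
# that validates against EvalResult, and a split name such as "validation"
# present in its suite_config; adjust both for your data).
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    viewer = DataTransformer(
        json_file_path="results.json",  # hypothetical path; replace with a real result file
        split="validation",             # assumed split name; must exist in the suite config
        is_internal=False,
    )
    overview, figures = viewer.view(with_plots=True, use_plotly=True)
    print(overview.head())
    for plot_name, fig in figures.items():
        fig.show()  # Plotly figures open in the default browser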