# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""
Benchmark Environment Clients.

Provides HTTP and WebSocket clients for the Benchmark Environment.
"""

from typing import Dict

from openenv.core.client_types import StepResult
from openenv.core.env_server.types import State
from openenv.core.http_env_client import HTTPEnvClient
from openenv.core.ws_env_client import WebSocketEnvClient

from .models import BenchmarkAction, BenchmarkObservation


# The HTTP and WebSocket clients below use the same payload encoding and
# decoding, so that logic lives in these private helpers instead of being
# duplicated in each class.


def _encode_action(action: BenchmarkAction) -> Dict:
    """Return the step request body for ``action``.

    The body is a dict with the single key ``"wait_seconds"``, taken from
    ``action.wait_seconds``.
    """
    return {"wait_seconds": action.wait_seconds}


def _decode_step_result(payload: Dict) -> StepResult[BenchmarkObservation]:
    """Build a ``StepResult`` from a step/reset response payload.

    Observation fields are read from ``payload["observation"]``; ``done`` and
    ``reward`` are read from the top level of ``payload`` and are set on both
    the observation and the returned ``StepResult``. Missing keys fall back to
    the defaults spelled out below (``reward`` defaults to ``None``).
    """
    # ``dict.get``'s default only applies when the key is absent. An explicit
    # null "observation" would otherwise come back as None and make the
    # following ``.get`` calls raise AttributeError, so treat it as empty.
    obs_data = payload.get("observation") or {}
    done = payload.get("done", False)
    reward = payload.get("reward")

    observation = BenchmarkObservation(
        host_url=obs_data.get("host_url", ""),
        pid=obs_data.get("pid", 0),
        session_hash=obs_data.get("session_hash", ""),
        waited_seconds=obs_data.get("waited_seconds", 0.0),
        timestamp=obs_data.get("timestamp", 0.0),
        done=done,
        reward=reward,
        metadata=obs_data.get("metadata", {}),
    )
    return StepResult(
        observation=observation,
        reward=reward,
        done=done,
    )


def _decode_state(payload: Dict) -> State:
    """Build a ``State`` from a state response payload.

    ``episode_id`` defaults to ``None`` and ``step_count`` to ``0`` when the
    corresponding key is missing.
    """
    return State(
        episode_id=payload.get("episode_id"),
        step_count=payload.get("step_count", 0),
    )


class BenchmarkEnv(HTTPEnvClient[BenchmarkAction, BenchmarkObservation]):
    """HTTP client for the Benchmark Environment."""

    def _step_payload(self, action: BenchmarkAction) -> Dict:
        """Return the step request body for ``action``."""
        return _encode_action(action)

    def _parse_result(self, payload: Dict) -> StepResult[BenchmarkObservation]:
        """Convert a response payload into a ``StepResult``."""
        return _decode_step_result(payload)

    def _parse_state(self, payload: Dict) -> State:
        """Convert a state payload into a ``State``."""
        return _decode_state(payload)


class BenchmarkEnvWS(WebSocketEnvClient[BenchmarkAction, BenchmarkObservation]):
    """WebSocket client for the Benchmark Environment."""

    def _step_payload(self, action: BenchmarkAction) -> Dict:
        """Return the step request body for ``action``."""
        return _encode_action(action)

    def _parse_result(self, payload: Dict) -> StepResult[BenchmarkObservation]:
        """Convert a response payload into a ``StepResult``."""
        return _decode_step_result(payload)

    def _parse_state(self, payload: Dict) -> State:
        """Convert a state payload into a ``State``."""
        return _decode_state(payload)