Spaces:
Sleeping
Sleeping
| # Copyright (c) Meta Platforms, Inc. and affiliates. | |
| # All rights reserved. | |
| # | |
| # This source code is licensed under the BSD-style license found in the | |
| # LICENSE file in the root directory of this source tree. | |
| """ | |
| Benchmark Environment Clients. | |
| Provides HTTP and WebSocket clients for the Benchmark Environment. | |
| """ | |
| from typing import Dict | |
| from openenv.core.client_types import StepResult | |
| from openenv.core.env_server.types import State | |
| from openenv.core.http_env_client import HTTPEnvClient | |
| from openenv.core.ws_env_client import WebSocketEnvClient | |
| from .models import BenchmarkAction, BenchmarkObservation | |
class BenchmarkEnv(HTTPEnvClient[BenchmarkAction, BenchmarkObservation]):
    """HTTP client for the Benchmark Environment.

    Implements the three payload hooks used by this client: serializing an
    action, parsing a step result, and parsing the environment state.
    """

    def _step_payload(self, action: BenchmarkAction) -> Dict:
        """Serialize ``action`` into the step request body.

        Args:
            action: Action whose ``wait_seconds`` attribute is sent.

        Returns:
            A dict with the single key ``"wait_seconds"``.
        """
        return {"wait_seconds": action.wait_seconds}

    def _parse_result(self, payload: Dict) -> StepResult[BenchmarkObservation]:
        """Build a ``StepResult`` from a decoded response payload.

        Observation fields are read from ``payload["observation"]``; ``done``
        and ``reward`` are read from the top level of ``payload``. Missing
        fields fall back to empty/zero defaults (``reward`` falls back to
        ``None``).

        Args:
            payload: Decoded response dict.

        Returns:
            A ``StepResult`` wrapping the parsed ``BenchmarkObservation``.
        """
        # ``dict.get(key, default)`` only uses the default when the key is
        # absent, so an explicit null observation would come back as None and
        # break the ``.get`` calls below. ``or {}`` treats null like missing.
        obs_data = payload.get("observation") or {}

        # Read once so the observation and the StepResult always agree.
        reward = payload.get("reward")
        done = payload.get("done", False)

        observation = BenchmarkObservation(
            host_url=obs_data.get("host_url", ""),
            pid=obs_data.get("pid", 0),
            session_hash=obs_data.get("session_hash", ""),
            waited_seconds=obs_data.get("waited_seconds", 0.0),
            timestamp=obs_data.get("timestamp", 0.0),
            done=done,
            reward=reward,
            # Same null-vs-missing guard as above: always hand over a dict.
            metadata=obs_data.get("metadata") or {},
        )
        return StepResult(
            observation=observation,
            reward=reward,
            done=done,
        )

    def _parse_state(self, payload: Dict) -> State:
        """Build a ``State`` from a decoded state payload.

        Args:
            payload: Decoded response dict.

        Returns:
            A ``State`` with ``episode_id`` (``None`` when absent) and
            ``step_count`` (``0`` when absent).
        """
        return State(
            episode_id=payload.get("episode_id"),
            step_count=payload.get("step_count", 0),
        )
class BenchmarkEnvWS(WebSocketEnvClient[BenchmarkAction, BenchmarkObservation]):
    """WebSocket client for the Benchmark Environment.

    Implements the three payload hooks used by this client: serializing an
    action, parsing a step result, and parsing the environment state.
    """

    def _step_payload(self, action: BenchmarkAction) -> Dict:
        """Serialize ``action`` into the step message body.

        Args:
            action: Action whose ``wait_seconds`` attribute is sent.

        Returns:
            A dict with the single key ``"wait_seconds"``.
        """
        return {"wait_seconds": action.wait_seconds}

    def _parse_result(self, payload: Dict) -> StepResult[BenchmarkObservation]:
        """Build a ``StepResult`` from a decoded response payload.

        Observation fields are read from ``payload["observation"]``; ``done``
        and ``reward`` are read from the top level of ``payload``. Missing
        fields fall back to empty/zero defaults (``reward`` falls back to
        ``None``).

        Args:
            payload: Decoded response dict.

        Returns:
            A ``StepResult`` wrapping the parsed ``BenchmarkObservation``.
        """
        # ``dict.get(key, default)`` only uses the default when the key is
        # absent, so an explicit null observation would come back as None and
        # break the ``.get`` calls below. ``or {}`` treats null like missing.
        obs_data = payload.get("observation") or {}

        # Read once so the observation and the StepResult always agree.
        reward = payload.get("reward")
        done = payload.get("done", False)

        observation = BenchmarkObservation(
            host_url=obs_data.get("host_url", ""),
            pid=obs_data.get("pid", 0),
            session_hash=obs_data.get("session_hash", ""),
            waited_seconds=obs_data.get("waited_seconds", 0.0),
            timestamp=obs_data.get("timestamp", 0.0),
            done=done,
            reward=reward,
            # Same null-vs-missing guard as above: always hand over a dict.
            metadata=obs_data.get("metadata") or {},
        )
        return StepResult(
            observation=observation,
            reward=reward,
            done=done,
        )

    def _parse_state(self, payload: Dict) -> State:
        """Build a ``State`` from a decoded state payload.

        Args:
            payload: Decoded response dict.

        Returns:
            A ``State`` with ``episode_id`` (``None`` when absent) and
            ``step_count`` (``0`` when absent).
        """
        return State(
            episode_id=payload.get("episode_id"),
            step_count=payload.get("step_count", 0),
        )