|
|
|
|
|
|
|
|
|
|
|
|
| """My Env Environment Client."""
|
|
|
| from typing import Dict
|
|
|
| from openenv.core.client_types import StepResult
|
| from openenv.core.env_server.types import State
|
| from openenv.core import EnvClient
|
|
|
| from .models import MyAction, MyObservation
|
|
|
|
|
| class MyEnv(
|
| EnvClient[MyAction, MyObservation]
|
| ):
|
| """
|
| Client for the My Env Environment.
|
|
|
| This client maintains a persistent WebSocket connection to the environment server,
|
| enabling efficient multi-step interactions with lower latency.
|
| Each client instance has its own dedicated environment session on the server.
|
|
|
| Example:
|
| >>> # Connect to a running server
|
| >>> with MyEnv(base_url="http://localhost:8000") as client:
|
| ... result = client.reset()
|
| ... print(result.observation.echoed_message)
|
| ...
|
| ... result = client.step(MyAction(message="Hello!"))
|
| ... print(result.observation.echoed_message)
|
|
|
| Example with Docker:
|
| >>> # Automatically start container and connect
|
| >>> client = MyEnv.from_docker_image("my_env-env:latest")
|
| >>> try:
|
| ... result = client.reset()
|
| ... result = client.step(MyAction(message="Test"))
|
| ... finally:
|
| ... client.close()
|
| """
|
|
|
| def _step_payload(self, action: MyAction) -> Dict:
|
| """
|
| Convert MyAction to JSON payload for step message.
|
|
|
| Args:
|
| action: MyAction instance
|
|
|
| Returns:
|
| Dictionary representation suitable for JSON encoding
|
| """
|
| return {
|
| "message": action.message,
|
| }
|
|
|
| def _parse_result(self, payload: Dict) -> StepResult[MyObservation]:
|
| """
|
| Parse server response into StepResult[MyObservation].
|
|
|
| Args:
|
| payload: JSON response data from server
|
|
|
| Returns:
|
| StepResult with MyObservation
|
| """
|
| obs_data = payload.get("observation", {})
|
| observation = MyObservation(
|
| echoed_message=obs_data.get("echoed_message", ""),
|
| message_length=obs_data.get("message_length", 0),
|
| done=payload.get("done", False),
|
| reward=payload.get("reward"),
|
| metadata=obs_data.get("metadata", {}),
|
| )
|
|
|
| return StepResult(
|
| observation=observation,
|
| reward=payload.get("reward"),
|
| done=payload.get("done", False),
|
| )
|
|
|
| def _parse_state(self, payload: Dict) -> State:
|
| """
|
| Parse server response into State object.
|
|
|
| Args:
|
| payload: JSON response from state request
|
|
|
| Returns:
|
| State object with episode_id and step_count
|
| """
|
| return State(
|
| episode_id=payload.get("episode_id"),
|
| step_count=payload.get("step_count", 0),
|
| )
|
|
|