diff --git a/examples/echo_mcp_demo.py b/examples/echo_mcp_demo.py new file mode 100644 index 00000000..8f465447 --- /dev/null +++ b/examples/echo_mcp_demo.py @@ -0,0 +1,56 @@ +""" +Example: Using Echo Environment with MCP + +This example demonstrates: +1. Connecting to echo_env server +2. Listing available tools via MCP +3. Calling tools using the step() API +""" + +import asyncio + +try: + from core.env_server.types import CallToolAction, ListToolsAction +except ImportError: + from openenv_core.env_server.types import CallToolAction, ListToolsAction + +from envs.echo_env import EchoEnv + + +async def main(): + # Connect to echo_env (assumes server is running on localhost:8000) + # To start the server: uvicorn envs.echo_env.server.app:app + client = EchoEnv(base_url="http://localhost:8000") + + print("=== Echo Environment MCP Demo ===\n") + + # Reset the environment + print("1. Resetting environment...") + result = client.reset() + print(f" Reset result: {result.observation.metadata}\n") + + # List available tools using step API + print("2. Listing available tools...") + list_action = ListToolsAction() + list_result = client.step(list_action) + for tool in list_result.observation.tools: + print(f" - {tool['name']}: {tool['description']}") + print() + + # Call echo_message tool using step API + print("3. Calling echo_message tool...") + call_action = CallToolAction(tool_name="echo_message", parameters={"message": "Hello from MCP!"}) + call_result = client.step(call_action) + print(f" Result: {call_result.observation.result}\n") + + # Check environment state + print("4. Checking environment state...") + state = client.state + print(f" Episode ID: {state.episode_id}") + print(f" Step count: {state.step_count}\n") + + print("Demo complete!") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/test_mcp_integration.py b/examples/test_mcp_integration.py new file mode 100644 index 00000000..5b239353 --- /dev/null +++ b/examples/test_mcp_integration.py @@ -0,0 +1,60 @@ +#!/usr/bin/env python3 +"""Quick test script to verify MCP integration works.""" + +import asyncio +import sys +sys.path.insert(0, 'src') + +from envs.echo_env.server.echo_environment import EchoEnvironment +from core.env_server.types import ListToolsAction, CallToolAction + + +async def main(): + print("=" * 60) + print("Testing MCP Integration") + print("=" * 60) + + # Create echo environment (MCPEnvironment handles MCP setup automatically) + print("\n1. Creating Echo Environment...") + env = EchoEnvironment() + + # Test list tools + print("\n2. Testing ListToolsAction...") + list_action = ListToolsAction() + obs = await env._handle_mcp_action(list_action) + print(f" - Done: {obs.done}") + print(f" - Has 'tools' attribute: {hasattr(obs, 'tools')}") + if hasattr(obs, "tools"): + print(f" - Number of tools: {len(obs.tools)}") + print(f" - Tool names: {[t['name'] for t in obs.tools]}") + else: + print(" - ERROR: No 'tools' attribute!") + return False + + # Test call tool + print("\n3. Testing CallToolAction...") + call_action = CallToolAction( + tool_name="echo_message", + parameters={"message": "Hello MCP!"} + ) + obs = await env._handle_mcp_action(call_action) + print(f" - Done: {obs.done}") + print(f" - Has 'result' attribute: {hasattr(obs, 'result')}") + print(f" - Error: {obs.error}") + if hasattr(obs, "result") and obs.result is not None: + result = obs.result + print(f" - Result type: {type(result)}") + print(f" - Result: {result}") + else: + print(" - ERROR: No 'result' attribute or result is None!") + return False + + print("\n" + "=" * 60) + print("✅ All tests passed!") + print("=" * 60) + return True + + +if __name__ == "__main__": + success = asyncio.run(main()) + sys.exit(0 if success else 1) diff --git a/rfcs/RFC-003-implementation-journal.md b/rfcs/RFC-003-implementation-journal.md new file mode 100644 index 00000000..0fe976bd --- /dev/null +++ b/rfcs/RFC-003-implementation-journal.md @@ -0,0 +1,329 @@ +# RFC-003 Implementation Journal + +## Overview +This journal tracks the implementation of RFC-003: MCP (Model Context Protocol) Support for OpenEnv. + +**RFC Goal**: Integrate MCP as the universal interface for all actions exposed to agents, supporting both tool-calling and CodeAct paradigms. + +**Implementation Phases**: +- **PR #1** (Current): Core MCP infrastructure + echo_env conversion +- **PR #2** (Future): Migrate remaining environments +- **PR #3** (Future): CodeAct environment with MCP integration +- **PR #4** (Future): Fix double marshaling with callable introspection + +--- + +## PR #1: Core MCP Infrastructure + Echo Env + +**Branch**: (will be set when starting sapling commit) + +**Goals**: +1. Add MCP client/server base classes to core package +2. Add new action types (ListToolsAction, CallToolAction) +3. Integrate MCP into Environment base class +4. Add /mcp JSON-RPC endpoint to FastAPI +5. Convert echo_env as reference implementation +6. Maintain backward compatibility with existing envs + +### Architecture Decisions + +#### 1. Dual Interface Model +**Decision**: Environments expose both HTTP (orchestration) and MCP (agent actions) interfaces. + +- **HTTP `/step` endpoint**: Accepts ListToolsAction/CallToolAction, routes to MCP client internally +- **HTTP `/mcp` endpoint**: Direct JSON-RPC access to MCP servers (for production/inference) +- **HTTP `/reset`, `/state`**: Unchanged (orchestration only, not exposed as tools) + +**Rationale**: +- Training code uses `/step` API for gym-style control +- Production agents can use `/mcp` directly without step wrapper +- Minimizes delta between training and production + +#### 2. In-Process MCP Server +**Decision**: Run MCP server in same Python process as FastAPI, using asyncio. + +**Implementation approach**: +- FastMCP server and FastAPI share same event loop +- Communication via stdio using asyncio subprocess +- No threading required + +**Rationale**: +- Simpler deployment (one process) +- Lower latency for local tools +- Easier debugging + +#### 3. Library Choice +**Decision**: Use FastMCP for server creation, MCP SDK for client. + +**Rationale**: +- FastMCP provides high-level decorators for tool registration +- Official MCP SDK for standard client implementation +- Both maintained by Anthropic + +#### 4. No Callable Introspection (Yet) +**Decision**: Defer callable introspection optimization to PR #4. + +**Rationale**: +- Keep PR #1 focused on infrastructure +- Callable introspection only needed for CodeAct (PR #3) +- Easier to review incrementally + +--- + +### Implementation Checklist + +#### Core Infrastructure +- [x] Create `src/core/mcp/__init__.py` +- [x] Create `src/core/mcp/client.py` (MCPClient class) +- [x] Create `src/core/mcp/server.py` (LocalMCPServer class) +- [x] Add dependencies to `src/core/pyproject.toml` + +#### Type System +- [x] Add `ListToolsAction` to `src/core/env_server/types.py` +- [x] Add `CallToolAction` to `src/core/env_server/types.py` +- [x] Export new types from `src/core/env_server/__init__.py` + +#### Environment Integration +- [x] Add `mcp_client` parameter to `Environment.__init__` +- [x] Add `_handle_mcp_action()` method to `Environment` +- [x] Update HTTP server /step endpoint to route MCP actions + +#### HTTP Server +- [x] Add `/mcp` endpoint to `create_fastapi_app()` +- [x] Add mcp_server parameter to HTTPEnvServer +- [x] Update action deserialization to handle MCP actions +- [x] Ensure backward compatibility with existing endpoints + +#### Echo Env Conversion +- [x] Deprecate `src/envs/echo_env/models.py` (EchoAction deprecated) +- [x] Create `src/envs/echo_env/server/mcp_server.py` +- [x] Define `echo_message` tool using FastMCP +- [x] Update `src/envs/echo_env/server/echo_environment.py` +- [x] Update `src/envs/echo_env/server/app.py` to initialize MCP +- [x] Update `src/envs/echo_env/client.py` with MCP actions +- [ ] Update echo_env Dockerfile dependencies + +#### Tests +- [ ] Create `tests/core/mcp/test_client.py` +- [ ] Create `tests/core/mcp/test_server.py` +- [ ] Update `tests/envs/test_echo_env.py` +- [ ] Verify other envs still work (no regressions) + +#### Documentation +- [ ] Create example `examples/echo_mcp_demo.py` +- [ ] Update `CLAUDE.md` with MCP notes +- [ ] Update echo_env README + +--- + +### Implementation Notes + +#### Session: 2025-11-24 (Part 1) + +**Status**: Core infrastructure and echo_env conversion COMPLETED + +**What Was Implemented**: + +1. **Core MCP Infrastructure** (`src/core/mcp/`): + - Created `MCPClient` class for communicating with MCP servers + - Created `LocalMCPServer` wrapper around FastMCP + - Added `compose_servers()` function for combining multiple MCP servers + - In-process communication (FastMCP Server passed directly to client) + +2. **New Action Types** (`src/core/env_server/types.py`): + - `ListToolsAction`: Requests available tools from MCP servers + - `CallToolAction`: Calls a specific tool with parameters + +3. **Environment Integration** (`src/core/env_server/interfaces.py`): + - Added optional `mcp_client` parameter to `Environment.__init__` + - Implemented `_handle_mcp_action()` async method for processing MCP actions + - Returns observations with tools list or tool call results in metadata + +4. **HTTP Server Updates** (`src/core/env_server/http_server.py`): + - Added `/mcp` POST endpoint for direct JSON-RPC access + - Updated `/step` endpoint to handle ListToolsAction and CallToolAction + - Enhanced `_deserialize_action()` to detect and deserialize MCP action types + - Added `mcp_server` parameter to `HTTPEnvServer` and `create_fastapi_app()` + +5. **Echo Env Conversion**: + - Created `mcp_server.py` with `echo_message` tool using FastMCP decorators + - Rewrote `EchoEnvironment` to use MCP client instead of custom actions + - Updated `app.py` to initialize MCP server, client, and wire them together + - Deprecated `EchoAction` and `EchoObservation` in `models.py` with warnings + - Updated `EchoEnv` client to use `CallToolAction` with convenience methods + +**Key Implementation Details**: + +- **MCP Client**: Caches tool list on initialization for performance +- **Error Handling**: Tool call errors returned in observation metadata with "error" key +- **Action Deserialization**: Checks for "type" or "action_type" field to detect MCP actions +- **Backward Compatibility**: Existing environments unaffected (mcp_client is optional) + +**Next Steps**: +1. Write tests for MCP infrastructure +2. Test echo_env end-to-end +3. Create example demonstrating both `/step` and `/mcp` interfaces +4. Update documentation + +--- + +### Open Questions + +1. **Error Handling**: How should MCP errors be surfaced? + - ~~Option A: Raise exceptions (breaks step)~~ + - **Option B: Return error in observation** ✅ CHOSEN + - **Decision**: Errors returned in `observation.metadata["error"]` to maintain step flow + +2. **Tool Result Format**: How to structure tool call results in observations? + - **Current implementation**: `observation.metadata["result"]` contains tool output ✅ + - **List tools**: `observation.metadata["tools"]` contains array of tool schemas ✅ + +3. **Caching**: Should tool results be cached? + - **Current**: Tool list cached on client initialization, tool calls not cached ✅ + - Future: May add tool call caching in later PR + +4. **Testing**: How to test MCP integration without external dependencies? + - **Approach**: Use in-process FastMCP servers for testing + - Mock not needed since FastMCP can be instantiated directly + +--- + +### Testing Strategy + +1. **Unit Tests**: + - Test MCPClient can list tools + - Test MCPClient can call tools + - Test LocalMCPServer tool registration + - Mock not needed since FastMCP can be instantiated directly + +2. **Integration Tests**: + - Test echo_env with ListToolsAction + - Test echo_env with CallToolAction + - Test /mcp endpoint directly + - Verify other envs unchanged + +3. **Manual Testing**: + - Build echo_env Docker image + - Run examples/echo_mcp_demo.py + - Verify both /step and /mcp interfaces work + +--- + +### Running the Server and Tests + +#### Setup + +```bash +# Create virtual environment +uv venv + +# Install core package with MCP dependencies +uv pip install -e src/core/ + +# Clear Python cache (if needed) +find src/envs/echo_env -type d -name "__pycache__" -exec rm -rf {} + +``` + +#### Running Echo Env Server + +```bash +# From project root +cd src +uvicorn envs.echo_env.server.app:app --port 8000 --reload + +# Or using absolute path to uvicorn from venv +../.venv/bin/uvicorn envs.echo_env.server.app:app --port 8000 +``` + +Server will start on http://localhost:8000 + +#### Testing Endpoints + +```bash +# Health check +curl http://localhost:8000/health + +# Reset environment +curl -X POST http://localhost:8000/reset + +# List tools via /step endpoint +curl -X POST http://localhost:8000/step \ + -H "Content-Type: application/json" \ + -d '{"action": {"type": "ListToolsAction"}}' + +# Call tool via /step endpoint +curl -X POST http://localhost:8000/step \ + -H "Content-Type: application/json" \ + -d '{ + "action": { + "type": "CallToolAction", + "tool_name": "echo_message", + "parameters": {"message": "Hello MCP!"} + } + }' + +# Direct MCP access via /mcp endpoint (JSON-RPC) +curl -X POST http://localhost:8000/mcp \ + -H "Content-Type: application/json" \ + -d '{ + "jsonrpc": "2.0", + "method": "tools/list", + "id": 1 + }' + +curl -X POST http://localhost:8000/mcp \ + -H "Content-Type: application/json" \ + -d '{ + "jsonrpc": "2.0", + "method": "tools/call", + "params": { + "name": "echo_message", + "arguments": {"message": "Hello from MCP endpoint!"} + }, + "id": 2 + }' +``` + +#### Running Tests + +```bash +# Install test dependencies +cd src/core +uv pip install -e ".[dev]" + +# Run all tests +pytest + +# Run specific test file +pytest tests/core/mcp/test_mcp.py + +# Run with verbose output +pytest -v tests/core/mcp/test_mcp.py +``` + +--- + +### Future Work (Later PRs) + +**PR #2: Environment Migration** +- Convert coding_env, browsergym_env, etc. to MCP pattern +- Document migration guide for environment authors + +**PR #3: CodeAct Integration** +- Modify PythonCodeActEnv to support MCP tools +- Add `list_tools()` function in code execution context +- Inject tool functions for direct calling + +**PR #4: Fix Double Marshaling** +- Add callable introspection to LocalMCPServer +- Store `_callables` dict alongside MCP protocol handlers +- Inject raw Python functions into CodeAct namespace +- Benchmark performance improvement + +--- + +## References + +- [RFC-003](../../rfcs/003-mcp-support.md) +- [MCP Specification](https://spec.modelcontextprotocol.io/) +- [FastMCP Documentation](https://github.com/jlowin/fastmcp) diff --git a/src/core/env_server/__init__.py b/src/core/env_server/__init__.py index 79e66535..a08e26c5 100644 --- a/src/core/env_server/__init__.py +++ b/src/core/env_server/__init__.py @@ -9,18 +9,32 @@ from .base_transforms import CompositeTransform, NullTransform from .http_server import HTTPEnvServer, create_app, create_fastapi_app from .interfaces import Environment, Message, ModelTokenizer, Transform -from .types import Action, Observation, State +from .mcp_environment import MCPEnvironment +from .types import ( + Action, + CallToolAction, + CallToolObservation, + ListToolsAction, + ListToolsObservation, + Observation, + State, +) from .web_interface import create_web_interface_app, WebInterfaceManager __all__ = [ # Core interfaces "Environment", + "MCPEnvironment", "Transform", "Message", "ModelTokenizer", # Types "Action", + "ListToolsAction", + "CallToolAction", "Observation", + "ListToolsObservation", + "CallToolObservation", "State", # Base transforms "CompositeTransform", diff --git a/src/core/env_server/http_server.py b/src/core/env_server/http_server.py index 207235f6..15748394 100644 --- a/src/core/env_server/http_server.py +++ b/src/core/env_server/http_server.py @@ -17,11 +17,14 @@ import os from concurrent.futures import ThreadPoolExecutor from dataclasses import asdict -from typing import Any, Dict, Type +from typing import Any, Dict, Optional, Type + +from fastapi import Body, FastAPI, Request from .interfaces import Environment -from .types import Action, Observation -from fastapi import Body, FastAPI +from .mcp_environment import MCPEnvironment +from .types import Action, CallToolAction, ListToolsAction, Observation + class HTTPEnvServer: """ @@ -64,6 +67,7 @@ def __init__( self.env = env self.action_cls = action_cls self.observation_cls = observation_cls + # Create thread pool for running sync code in async context # This is needed for environments using sync libraries (e.g., Playwright sync API) self._executor = ThreadPoolExecutor(max_workers=1) @@ -95,14 +99,23 @@ async def step(request: Dict[str, Any]) -> Dict[str, Any]: action_data = request.get("action", request) # TODO: Handle timeout_s, request_id, episode_id from request if provided - # Deserialize action + # Deserialize action (handle MCP actions specially) action = self._deserialize_action(action_data) - # Execute step in thread pool to avoid blocking asyncio loop - loop = asyncio.get_event_loop() - observation = await loop.run_in_executor( - self._executor, self.env.step, action - ) + # Handle MCP actions asynchronously (don't use thread pool for async operations) + if isinstance(action, (ListToolsAction, CallToolAction)): + if not isinstance(self.env, MCPEnvironment): + raise RuntimeError( + f"Environment {type(self.env).__name__} received MCP action " + f"but is not a MCP environment." + ) + observation = await self.env._handle_mcp_action(action) + else: + # Execute regular step in thread pool to avoid blocking asyncio loop + loop = asyncio.get_event_loop() + observation = await loop.run_in_executor( + self._executor, self.env.step, action + ) # Return serialized observation return self._serialize_observation(observation) @@ -118,6 +131,104 @@ async def health() -> Dict[str, str]: """Health check endpoint.""" return {"status": "healthy"} + @app.post("/mcp") + async def mcp_endpoint(request: Request) -> Dict[str, Any]: + """ + MCP JSON-RPC endpoint for direct tool access (production/inference). + + This endpoint provides direct access to the MCP server without going + through the step() wrapper. Used for production agents that want to + call tools directly. + + Accepts JSON-RPC 2.0 requests: + - method: "tools/list" or "tools/call" + - params: method-specific parameters + - id: request ID (echoed in response) + + Returns: + JSON-RPC 2.0 response + """ + if not hasattr(self.env, "mcp_client") or self.env.mcp_client is None: + return { + "jsonrpc": "2.0", + "error": { + "code": -32603, + "message": "MCP server not configured for this environment", + }, + "id": None, + } + + try: + body = await request.json() + except (ValueError, TypeError): + return { + "jsonrpc": "2.0", + "error": { + "code": -32700, + "message": "Parse error: Invalid JSON", + }, + "id": None, + } + + method = body.get("method") + params = body.get("params", {}) + request_id = body.get("id") + + try: + # Reuse MCP client from environment (avoids creating duplicate client) + async with self.env.mcp_client: + if method == "tools/list": + tools = await self.env.mcp_client.list_tools() + return { + "jsonrpc": "2.0", + "result": { + "tools": [ + { + "name": tool.name, + "description": tool.description, + "inputSchema": tool.inputSchema, + } + for tool in tools + ] + }, + "id": request_id, + } + + elif method == "tools/call": + tool_name = params.get("name") + arguments = params.get("arguments", {}) + result = await self.env.mcp_client.call_tool( + tool_name, arguments + ) + + # Extract data from CallToolResult (FastMCP wraps results) + result_data = result.data if hasattr(result, "data") else result + + return { + "jsonrpc": "2.0", + "result": result_data, + "id": request_id, + } + + else: + return { + "jsonrpc": "2.0", + "error": { + "code": -32601, + "message": f"Method not found: {method}", + }, + "id": request_id, + } + + except Exception as e: + return { + "jsonrpc": "2.0", + "error": { + "code": -32603, + "message": f"Internal error: {str(e)}", + }, + "id": request_id, + } def _deserialize_action(self, action_data: Dict[str, Any]) -> Action: """ @@ -130,12 +241,30 @@ def _deserialize_action(self, action_data: Dict[str, Any]) -> Action: Action instance Note: - This is a simple implementation. Subclasses may need to override - for more complex deserialization logic. + This handles both environment-specific actions and MCP actions + (ListToolsAction, CallToolAction). """ - # Remove metadata if present (it will be set via kw_only field) - metadata = action_data.pop("metadata", {}) - action = self.action_cls(**action_data) + # Check if this is an MCP action by looking at the action type + action_type = action_data.get("type") or action_data.get("action_type") + + if action_type == "ListToolsAction": + return ListToolsAction() + + elif action_type == "CallToolAction": + tool_name = action_data.get("tool_name") + if tool_name is None: + raise ValueError("Missing required field 'tool_name' for CallToolAction") + return CallToolAction( + tool_name=tool_name, + parameters=action_data.get("parameters", {}), + ) + + # Otherwise, use the environment-specific action class + # Get metadata if present (don't mutate input dict) + metadata = action_data.get("metadata", {}) + action = self.action_cls( + **{k: v for k, v in action_data.items() if k != "metadata"} + ) action.metadata = metadata return action @@ -161,7 +290,7 @@ def _serialize_observation(self, observation: Observation) -> Dict[str, Any]: # Convert numpy arrays to lists for JSON serialization def _convert_numpy(obj): """Recursively convert numpy arrays to lists.""" - if hasattr(obj, '__array__'): # numpy array + if hasattr(obj, "__array__"): # numpy array return obj.tolist() elif isinstance(obj, dict): return {k: _convert_numpy(v) for k, v in obj.items()} @@ -174,7 +303,6 @@ def _convert_numpy(obj): # Extract reward and done (these are part of StepResult on client side) reward = obs_dict.pop("reward", None) done = obs_dict.pop("done", False) - obs_dict.pop("metadata", None) # Remove metadata from observation # Return in HTTPEnvClient expected format return { @@ -183,6 +311,7 @@ def _convert_numpy(obj): "done": done, } + def create_app( env: Environment, action_cls: Type[Action], @@ -191,33 +320,36 @@ def create_app( ) -> Any: """ Create a FastAPI application with or without web interface. - + This function creates a FastAPI app with the web interface enabled by default, including README integration for better user experience. - + Args: env: The Environment instance to serve action_cls: The Action subclass this environment expects observation_cls: The Observation subclass this environment returns env_name: Optional environment name for README loading - + Returns: FastAPI application instance with or without web interface and README integration """ # Check if web interface should be enabled # This can be controlled via environment variable or build argument - enable_web = ( - os.getenv("ENABLE_WEB_INTERFACE", "false").lower() in ("true", "1", "yes") + enable_web = os.getenv("ENABLE_WEB_INTERFACE", "false").lower() in ( + "true", + "1", + "yes", ) if enable_web: # Import web interface only when needed from .web_interface import create_web_interface_app + return create_web_interface_app(env, action_cls, observation_cls, env_name) else: # Use standard FastAPI app without web interface return create_fastapi_app(env, action_cls, observation_cls) - + def create_fastapi_app( env: Environment, diff --git a/src/core/env_server/mcp_environment.py b/src/core/env_server/mcp_environment.py new file mode 100644 index 00000000..d0064d0d --- /dev/null +++ b/src/core/env_server/mcp_environment.py @@ -0,0 +1,174 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. + +""" +MCPEnvironment base class for environments that expose MCP tools. + +This module provides a base class that handles all the boilerplate for +MCP-based environments, allowing developers to just define their tools +using FastMCP decorators. +""" + +from __future__ import annotations + +from typing import Any + +from .interfaces import Environment +from .types import Action, CallToolAction, ListToolsAction, Observation, State + + +class MCPEnvironment(Environment): + """ + Base class for environments that expose tools via MCP. + + This class handles all the boilerplate of setting up MCP client/server + communication. Subclasses just need to provide a FastMCP server instance + with tools registered via decorators. + + The environment automatically handles ListToolsAction and CallToolAction, + delegating to the configured MCP server. + + IMPORTANT: MCPEnvironment is designed to be used via HTTPEnvServer (FastAPI). + The step() method should not be called directly due to async/await requirements + of the MCP client. HTTPEnvServer automatically routes MCP actions to the async + _handle_mcp_action() method. + + Example: + >>> from fastmcp import FastMCP + >>> from core.env_server import MCPEnvironment + >>> + >>> # Define MCP server with tools + >>> mcp = FastMCP("my_env") + >>> + >>> @mcp.tool() + >>> def my_tool(param: str) -> dict: + >>> return {"result": param} + >>> + >>> # Create environment + >>> env = MCPEnvironment(mcp) + >>> + >>> # Use with HTTPEnvServer + >>> from core.env_server import create_fastapi_app + >>> from core.models import Action, Observation + >>> app = create_fastapi_app(env, Action, Observation) + """ + + def __init__(self, mcp_server: Any): + """ + Initialize MCP environment. + + Args: + mcp_server: FastMCP server instance with tools registered + """ + from fastmcp import Client + + self.mcp_server = mcp_server + self.mcp_client = Client(mcp_server) + super().__init__() + + def reset(self) -> Observation: + """ + Reset the environment. + + Returns initial observation with done=False. MCP environments are + stateless by default (state is managed by the MCP server if needed). + """ + return Observation(done=False) + + def step(self, action: Action) -> Observation: + """ + Execute an MCP action in the environment. + + MCPEnvironment ONLY accepts MCP actions (ListToolsAction, CallToolAction). + + NOTE: This is a sync method that internally runs async MCP operations. + When called from sync context, it uses asyncio.run(). When called from + HTTPEnvServer (async context), HTTPEnvServer intercepts MCP actions and + calls _handle_mcp_action() directly to avoid blocking the event loop. + + Args: + action: MCP action to execute (ListToolsAction or CallToolAction) + + Returns: + Observation from action execution + + Raises: + ValueError: If action is not an MCP action type + """ + import asyncio + + if not isinstance(action, (ListToolsAction, CallToolAction)): + raise ValueError( + f"MCP environments only accept MCP actions (ListToolsAction, CallToolAction). " + f"Got: {type(action).__name__}" + ) + + # Handle MCP action - run async operation in sync context + return asyncio.run(self._handle_mcp_action(action)) + + async def _handle_mcp_action(self, action: Action) -> Observation: + """ + Handle MCP actions asynchronously. + + This method is called by HTTPEnvServer to handle MCP actions without + blocking the asyncio event loop. + + Args: + action: ListToolsAction or CallToolAction + + Returns: + ListToolsObservation or CallToolObservation + + Raises: + ValueError: If MCP client not configured or action type invalid + """ + from .types import CallToolObservation, ListToolsObservation + + if self.mcp_client is None: + raise ValueError("MCP client not configured for this environment") + + async with self.mcp_client: + if isinstance(action, ListToolsAction): + tools = await self.mcp_client.list_tools() + return ListToolsObservation( + done=False, + tools=[ + { + "name": tool.name, + "description": tool.description, + "inputSchema": tool.inputSchema, + } + for tool in tools + ], + ) + + elif isinstance(action, CallToolAction): + try: + result = await self.mcp_client.call_tool( + action.tool_name, action.parameters + ) + # Extract data from CallToolResult (FastMCP wraps results) + result_data = result.data if hasattr(result, "data") else result + return CallToolObservation( + done=False, result=result_data, tool_name=action.tool_name + ) + except Exception as e: + return CallToolObservation( + done=False, error=str(e), tool_name=action.tool_name + ) + + else: + raise ValueError(f"Unsupported MCP action type: {type(action)}") + + @property + def state(self) -> State: + """ + Get current environment state. + + MCP environments are stateless by default. State management can be + implemented in the MCP server using FastMCP's session persistence. + """ + return State() diff --git a/src/core/env_server/types.py b/src/core/env_server/types.py index 70da9f3c..7f58973e 100644 --- a/src/core/env_server/types.py +++ b/src/core/env_server/types.py @@ -19,6 +19,30 @@ class Action: metadata: Dict[str, Any] = field(default_factory=dict) +@dataclass(kw_only=True) +class ListToolsAction(Action): + """ + Action to request available tools from MCP servers. + + This action triggers a tools/list call to all configured MCP servers, + returning their tool schemas in the observation. + """ + + pass + + +@dataclass(kw_only=True) +class CallToolAction(Action): + """ + Action to call a specific tool via MCP. + + Triggers a tools/call request to the appropriate MCP server. + """ + + tool_name: str + parameters: Dict[str, Any] = field(default_factory=dict) + + @dataclass(kw_only=True) class Observation: """Base class for all environment observations.""" @@ -28,6 +52,30 @@ class Observation: metadata: Dict[str, Any] = field(default_factory=dict) +@dataclass(kw_only=True) +class ListToolsObservation(Observation): + """ + Observation returned from ListToolsAction. + + Contains the list of available tools with their schemas. + """ + + tools: List[Dict[str, Any]] = field(default_factory=list) + + +@dataclass(kw_only=True) +class CallToolObservation(Observation): + """ + Observation returned from CallToolAction. + + Contains the result of calling a tool, or an error if the call failed. + """ + + result: Optional[Any] = None + error: Optional[str] = None + tool_name: Optional[str] = None + + @dataclass class State: """Base class for environment state.""" diff --git a/src/core/pyproject.toml b/src/core/pyproject.toml index 39576bba..bb7103ae 100644 --- a/src/core/pyproject.toml +++ b/src/core/pyproject.toml @@ -19,6 +19,8 @@ dependencies = [ "pydantic>=2.0.0", "uvicorn[standard]>=0.24.0", "requests>=2.25.0", + "mcp>=1.0.0", + "fastmcp>=0.1.0", ] [project.optional-dependencies] diff --git a/src/envs/echo_env/__init__.py b/src/envs/echo_env/__init__.py index 6da62ba4..a8700f15 100644 --- a/src/envs/echo_env/__init__.py +++ b/src/envs/echo_env/__init__.py @@ -7,6 +7,5 @@ """Echo Environment - A simple test environment for HTTP server.""" from .client import EchoEnv -from .models import EchoAction, EchoObservation -__all__ = ["EchoAction", "EchoObservation", "EchoEnv"] +__all__ = ["EchoEnv"] diff --git a/src/envs/echo_env/client.py b/src/envs/echo_env/client.py index d8d1615f..e49e2185 100644 --- a/src/envs/echo_env/client.py +++ b/src/envs/echo_env/client.py @@ -5,86 +5,114 @@ # LICENSE file in the root directory of this source tree. """ -Echo Environment HTTP Client. +Echo Environment HTTP Client (MCP-based). This module provides the client for connecting to an Echo Environment server -over HTTP. +over HTTP using MCP actions. """ -from typing import Any, Dict +from typing import Dict -# Support both in-repo and standalone imports try: - # In-repo imports (when running from OpenEnv repository) from core.client_types import StepResult - from core.env_server.types import State + from core.env_server.types import ( + CallToolAction, + CallToolObservation, + ListToolsObservation, + Observation, + State, + ) from core.http_env_client import HTTPEnvClient - from .models import EchoAction, EchoObservation except ImportError: - # Standalone imports (when environment is standalone with openenv-core from pip) from openenv_core.client_types import StepResult - from openenv_core.env_server.types import State + from openenv_core.env_server.types import ( + CallToolAction, + CallToolObservation, + ListToolsObservation, + Observation, + State, + ) from openenv_core.http_env_client import HTTPEnvClient - from models import EchoAction, EchoObservation -class EchoEnv(HTTPEnvClient[EchoAction, EchoObservation]): +class EchoEnv(HTTPEnvClient[CallToolAction, Observation]): """ - HTTP client for the Echo Environment. + HTTP client for the Echo Environment (MCP-based). This client connects to an EchoEnvironment HTTP server and provides - methods to interact with it: reset(), step(), and state access. + methods to interact with it using MCP actions. Example: + >>> from core.env_server.types import CallToolAction >>> # Connect to a running server >>> client = EchoEnv(base_url="http://localhost:8000") >>> result = client.reset() - >>> print(result.observation.echoed_message) >>> - >>> # Send a message - >>> result = client.step(EchoAction(message="Hello!")) - >>> print(result.observation.echoed_message) - >>> print(result.reward) + >>> # Call echo_message tool using step API + >>> action = CallToolAction(tool_name="echo_message", parameters={"message": "Hello!"}) + >>> result = client.step(action) + >>> print(result.observation.result) # {"echoed_message": "Hello!"} Example with Docker: + >>> from core.env_server.types import CallToolAction >>> # Automatically start container and connect >>> client = EchoEnv.from_docker_image("echo-env:latest") >>> result = client.reset() - >>> result = client.step(EchoAction(message="Test")) + >>> action = CallToolAction(tool_name="echo_message", parameters={"message": "Test"}) + >>> result = client.step(action) """ - def _step_payload(self, action: EchoAction) -> Dict: + def _step_payload(self, action: CallToolAction) -> Dict: """ - Convert EchoAction to JSON payload for step request. + Convert CallToolAction to JSON payload for step request. Args: - action: EchoAction instance + action: CallToolAction instance Returns: Dictionary representation suitable for JSON encoding """ return { - "message": action.message, + "type": "CallToolAction", + "tool_name": action.tool_name, + "parameters": action.parameters, } - def _parse_result(self, payload: Dict) -> StepResult[EchoObservation]: + def _parse_result(self, payload: Dict) -> StepResult[Observation]: """ - Parse server response into StepResult[EchoObservation]. + Parse server response into StepResult with typed Observation. Args: payload: JSON response from server Returns: - StepResult with EchoObservation + StepResult with typed Observation (ListToolsObservation or CallToolObservation) """ obs_data = payload.get("observation", {}) - observation = EchoObservation( - echoed_message=obs_data.get("echoed_message", ""), - message_length=obs_data.get("message_length", 0), - done=payload.get("done", False), - reward=payload.get("reward"), - metadata=obs_data.get("metadata", {}), - ) + + # Create appropriate typed observation based on fields present + if "tools" in obs_data: + observation = ListToolsObservation( + done=obs_data.get("done", False), + reward=obs_data.get("reward"), + metadata=obs_data.get("metadata", {}), + tools=obs_data.get("tools", []), + ) + elif "result" in obs_data or "error" in obs_data or "tool_name" in obs_data: + observation = CallToolObservation( + done=obs_data.get("done", False), + reward=obs_data.get("reward"), + metadata=obs_data.get("metadata", {}), + result=obs_data.get("result"), + error=obs_data.get("error"), + tool_name=obs_data.get("tool_name"), + ) + else: + observation = Observation( + done=obs_data.get("done", False), + reward=obs_data.get("reward"), + metadata=obs_data.get("metadata", {}), + ) return StepResult( observation=observation, diff --git a/src/envs/echo_env/models.py b/src/envs/echo_env/models.py deleted file mode 100644 index c962629b..00000000 --- a/src/envs/echo_env/models.py +++ /dev/null @@ -1,36 +0,0 @@ -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the BSD-style license found in the -# LICENSE file in the root directory of this source tree. - -""" -Data models for the Echo Environment. - -The Echo environment is a simple test environment that echoes back messages. -""" - -from dataclasses import dataclass - -# Support both in-repo and standalone imports -try: - # In-repo imports (when running from OpenEnv repository) - from core.env_server.types import Action, Observation -except ImportError: - # Standalone imports (when environment is standalone with openenv-core from pip) - from openenv_core.env_server.types import Action, Observation - - -@dataclass(kw_only=True) -class EchoAction(Action): - """Action for the Echo environment - just a message to echo.""" - - message: str - - -@dataclass(kw_only=True) -class EchoObservation(Observation): - """Observation from the Echo environment - the echoed message.""" - - echoed_message: str - message_length: int = 0 \ No newline at end of file diff --git a/src/envs/echo_env/server/app.py b/src/envs/echo_env/server/app.py index 83d22b5d..4f34dc14 100644 --- a/src/envs/echo_env/server/app.py +++ b/src/envs/echo_env/server/app.py @@ -5,10 +5,10 @@ # LICENSE file in the root directory of this source tree. """ -FastAPI application for the Echo Environment. +FastAPI application for the Echo Environment (MCP-based). This module creates an HTTP server that exposes the EchoEnvironment -over HTTP endpoints, making it compatible with HTTPEnvClient. +over HTTP endpoints with MCP tool support. Usage: # Development (with auto-reload): @@ -21,23 +21,24 @@ uv run --project . server """ -# Support both in-repo and standalone imports try: - # In-repo imports (when running from OpenEnv repository) - from core.env_server.http_server import create_app - from ..models import EchoAction, EchoObservation + from core.env_server import create_app + from core.env_server.types import Action, Observation + from .echo_environment import EchoEnvironment except ImportError: - # Standalone imports (when environment is standalone with openenv-core from pip) - from openenv_core.env_server.http_server import create_app - from models import EchoAction, EchoObservation + from openenv_core.env_server import create_app + from openenv_core.env_server.types import Action, Observation from server.echo_environment import EchoEnvironment -# Create the environment instance + +# Create the environment instance (MCP client is configured internally) env = EchoEnvironment() -# Create the app with web interface and README integration -app = create_app(env, EchoAction, EchoObservation, env_name="echo_env") +# Create the FastAPI app +# Note: We use Action and Observation base classes since echo_env +# uses MCP actions (ListToolsAction, CallToolAction) instead of custom types +app = create_app(env, Action, Observation) def main(): @@ -48,7 +49,6 @@ def main(): uv run --project . server python -m envs.echo_env.server.app openenv serve echo_env - """ import uvicorn diff --git a/src/envs/echo_env/server/echo_environment.py b/src/envs/echo_env/server/echo_environment.py index 53b383af..e652adcd 100644 --- a/src/envs/echo_env/server/echo_environment.py +++ b/src/envs/echo_env/server/echo_environment.py @@ -5,98 +5,39 @@ # LICENSE file in the root directory of this source tree. """ -Echo Environment Implementation. +Echo Environment Implementation (MCP-based). -A simple test environment that echoes back messages sent to it. -Perfect for testing HTTP server infrastructure. +A simple test environment that echoes back messages via MCP tools. +Perfect for testing HTTP server and MCP infrastructure. """ -from uuid import uuid4 - -# Support both in-repo and standalone imports try: - # In-repo imports (when running from OpenEnv repository) - from core.env_server.interfaces import Environment - from core.env_server.types import State - from ..models import EchoAction, EchoObservation + from core.env_server import MCPEnvironment except ImportError: - # Standalone imports (when environment is standalone with openenv-core from pip) - from openenv_core.env_server.interfaces import Environment - from openenv_core.env_server.types import State - from models import EchoAction, EchoObservation + from openenv_core.env_server import MCPEnvironment + +from .mcp_server import mcp -class EchoEnvironment(Environment): +class EchoEnvironment(MCPEnvironment): """ - A simple echo environment that echoes back messages. + A simple echo environment that echoes back messages via MCP tools. - This environment is designed for testing the HTTP server infrastructure. - It maintains minimal state and simply echoes back whatever message it receives. + This environment demonstrates the simplified MCP integration pattern. + All functionality is defined in mcp_server.py using FastMCP decorators, + and MCPEnvironment handles the rest. Example: + >>> from envs.echo_env.server import EchoEnvironment + >>> from core.env_server import create_fastapi_app + >>> from core.env_server.types import Action, Observation + >>> >>> env = EchoEnvironment() - >>> obs = env.reset() - >>> print(obs.echoed_message) # "Echo environment ready!" + >>> app = create_fastapi_app(env, Action, Observation) >>> - >>> obs = env.step(EchoAction(message="Hello")) - >>> print(obs.echoed_message) # "Hello" - >>> print(obs.message_length) # 5 + >>> # Run with: uvicorn app:app --port 8000 """ def __init__(self): - """Initialize the echo environment.""" - self._state = State(episode_id=str(uuid4()), step_count=0) - self._reset_count = 0 - - def reset(self) -> EchoObservation: - """ - Reset the environment. - - Returns: - EchoObservation with a ready message - """ - self._state = State(episode_id=str(uuid4()), step_count=0) - self._reset_count += 1 - - return EchoObservation( - echoed_message="Echo environment ready!", - message_length=0, - done=False, - reward=0.0, - ) - - def step(self, action: EchoAction) -> EchoObservation: # type: ignore[override] - """ - Execute a step in the environment by echoing the message. - - Args: - action: EchoAction containing the message to echo - - Returns: - EchoObservation with the echoed message and its length - """ - self._state.step_count += 1 - - message = action.message - length = len(message) - - # Simple reward: longer messages get higher rewards - reward = length * 0.1 - - return EchoObservation( - echoed_message=message, - message_length=length, - done=False, - reward=reward, - metadata={"original_message": message, "step": self._state.step_count}, - ) - - @property - def state(self) -> State: - """ - Get the current environment state. - - Returns: - Current State with episode_id and step_count - """ - return self._state + """Initialize the echo environment with the MCP server.""" + super().__init__(mcp_server=mcp) diff --git a/src/envs/echo_env/server/mcp_server.py b/src/envs/echo_env/server/mcp_server.py new file mode 100644 index 00000000..853eab91 --- /dev/null +++ b/src/envs/echo_env/server/mcp_server.py @@ -0,0 +1,33 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. + +""" +MCP server for Echo Environment. + +This module defines the MCP tools exposed by the Echo environment. +Developers can add new tools by simply decorating functions with @mcp.tool. +""" + +from fastmcp import FastMCP + +mcp = FastMCP("echo_env") + + +@mcp.tool() +def echo_message(message: str) -> dict: + """ + Echo a message back with metadata. + + Args: + message: The message to echo + + Returns: + Dictionary containing the echoed message and reward + """ + return { + "echoed_message": message, + "reward": len(message) * 0.1, + } diff --git a/tests/core/__init__.py b/tests/core/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/core/mcp/__init__.py b/tests/core/mcp/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/core/mcp/test_mcp.py b/tests/core/mcp/test_mcp.py new file mode 100644 index 00000000..cb1b3d6c --- /dev/null +++ b/tests/core/mcp/test_mcp.py @@ -0,0 +1,67 @@ +""" +Minimal tests for MCP infrastructure. + +Tests the core MCP client/server integration and echo_env as reference. +""" + +import pytest + +from core.env_server.types import CallToolAction, ListToolsAction +from fastmcp import Client, FastMCP + + +@pytest.mark.asyncio +async def test_mcp_client_with_local_server(): + """Test FastMCP Client can list and call tools on a local FastMCP server.""" + # Create a simple MCP server + mcp = FastMCP("test_server") + + @mcp.tool + async def add(a: int, b: int) -> int: + """Add two numbers.""" + return a + b + + # Create client connected to server (in-memory) + client = Client(mcp) + + async with client: + # Test list_tools + tools = await client.list_tools() + assert len(tools) == 1 + assert tools[0].name == "add" + assert "Add two numbers" in tools[0].description + + # Test call_tool + result = await client.call_tool("add", {"a": 5, "b": 3}) + # FastMCP returns the raw result from the function + assert result == 8 + + +@pytest.mark.asyncio +async def test_echo_env_mcp_integration(): + """Test echo_env works with MCP actions (ListToolsAction, CallToolAction).""" + from envs.echo_env.server.echo_environment import EchoEnvironment + + # Setup echo environment (MCPEnvironment handles MCP setup automatically) + env = EchoEnvironment() + + # Test ListToolsAction + list_action = ListToolsAction() + obs = await env._handle_mcp_action(list_action) + assert not obs.done + assert hasattr(obs, "tools") + assert len(obs.tools) == 1 + assert obs.tools[0]["name"] == "echo_message" + + # Test CallToolAction + call_action = CallToolAction( + tool_name="echo_message", parameters={"message": "Hello MCP"} + ) + obs = await env._handle_mcp_action(call_action) + assert not obs.done + assert hasattr(obs, "result") + assert obs.error is None + # Result is the dict returned by echo_message tool + result = obs.result + assert isinstance(result, dict) + assert result["echoed_message"] == "Hello MCP"