""" Comprehensive Integration Tests for MCP Client This module contains end-to-end integration tests for the MCP client, testing real server connections, tool calling, and AI provider integration. """ import pytest import asyncio import subprocess import time import signal import os import sys from typing import Optional, Dict, Any, List from unittest.mock import Mock, patch import httpx # Add src to path for imports sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', 'src')) from mcp_llm_client import MCPAIClient, TransportType from mcp_template.llm_client.client_factory import AIClientFactory class MCPServerProcess: """Helper class to manage MCP server process for testing""" def __init__(self, transport: str = "sse", port: int = 8051): self.transport = transport self.port = port self.process: Optional[subprocess.Popen] = None self.url = f"http://localhost:{port}" async def start(self): """Start the MCP server process""" cmd = [ sys.executable, "run_mcp_server.py", "--transport", self.transport, "--port", str(self.port) ] self.process = subprocess.Popen( cmd, cwd=os.path.join(os.path.dirname(__file__), '..', '..'), stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) # Wait for server to be ready await self._wait_for_server() return self async def _wait_for_server(self, timeout: int = 30): """Wait for server to be ready""" start_time = time.time() while time.time() - start_time < timeout: try: async with httpx.AsyncClient() as client: if self.transport == "sse": response = await client.get(f"{self.url}/sse", timeout=5.0) if response.status_code == 200: return else: # For stdio, just wait a bit await asyncio.sleep(2) return except Exception: pass await asyncio.sleep(1) raise TimeoutError(f"Server did not start within {timeout} seconds") async def stop(self): """Stop the MCP server process""" if self.process: try: self.process.terminate() self.process.wait(timeout=10) except subprocess.TimeoutExpired: self.process.kill() self.process.wait() @pytest.fixture async def sse_server(): """Fixture to start and stop SSE MCP server""" server = MCPServerProcess(transport="sse", port=8051) await server.start() yield server await server.stop() @pytest.fixture async def stdio_server(): """Fixture to start and stop STDIO MCP server""" server = MCPServerProcess(transport="stdio", port=8052) await server.start() yield server await server.stop() class TestMCPClientIntegration: """Comprehensive integration tests for MCP client""" @pytest.mark.integration @pytest.mark.requires_api_key @pytest.mark.asyncio async def test_sse_transport_connection(self, sse_server): """Test MCP client can connect to server using SSE transport""" # Skip if no OpenAI API key if not os.getenv("OPENAI_API_KEY"): pytest.skip("OpenAI API key not available") client = MCPAIClient( model="gpt-4o", transport=TransportType.SSE, provider="openai", temperature=0.1, max_tokens=100 ) try: # Test connection await client.connect(sse_server.url) # Test getting tools tools = await client.get_mcp_tools() assert isinstance(tools, list) assert len(tools) > 0 # Verify tool structure for tool in tools: assert "name" in tool assert "description" in tool assert "inputSchema" in tool finally: await client.disconnect() @pytest.mark.integration @pytest.mark.requires_api_key @pytest.mark.asyncio async def test_stdio_transport_connection(self): """Test MCP client can connect using STDIO transport""" # Skip if no OpenAI API key if not os.getenv("OPENAI_API_KEY"): pytest.skip("OpenAI API key not available") client = MCPAIClient( model="gpt-4o", transport=TransportType.STDIO, provider="openai", temperature=0.1, max_tokens=100 ) try: # For STDIO, we need to provide server command await client.connect_stdio( server_command=[sys.executable, "run_mcp_server.py", "--transport", "stdio"] ) # Test getting tools tools = await client.get_mcp_tools() assert isinstance(tools, list) assert len(tools) > 0 finally: await client.disconnect() @pytest.mark.integration @pytest.mark.requires_api_key @pytest.mark.slow @pytest.mark.asyncio async def test_end_to_end_tool_calling_sse(self, sse_server): """Test complete end-to-end tool calling with SSE transport""" if not os.getenv("OPENAI_API_KEY"): pytest.skip("OpenAI API key not available") client = MCPAIClient( model="gpt-4o", transport=TransportType.SSE, provider="openai", temperature=0.1, max_tokens=200 ) try: await client.connect(sse_server.url) # Test a simple mathematical query that should trigger tool calls query = "Calculate 15 + 27 and then multiply the result by 2" response = await client.process_query(query) assert isinstance(response, str) assert len(response) > 0 # The response should contain the calculation result # We can't predict exact wording but should contain numbers assert any(char.isdigit() for char in response) finally: await client.disconnect() @pytest.mark.integration @pytest.mark.requires_api_key @pytest.mark.asyncio async def test_multiple_provider_support(self, sse_server): """Test MCP client works with different AI providers""" if not os.getenv("OPENAI_API_KEY"): pytest.skip("OpenAI API key not available") providers_to_test = ["openai"] for provider in providers_to_test: client = MCPAIClient( model="gpt-4o" if provider == "openai" else "claude-3-opus-20240229", transport=TransportType.SSE, provider=provider, temperature=0.1, max_tokens=100 ) try: await client.connect(sse_server.url) # Test basic tool listing tools = await client.get_mcp_tools() assert len(tools) > 0 # Test simple query response = await client.process_query("What tools are available?") assert isinstance(response, str) finally: await client.disconnect() @pytest.mark.integration @pytest.mark.asyncio async def test_error_handling_connection_failure(self): """Test error handling when server connection fails""" client = MCPAIClient( model="gpt-4o", transport=TransportType.SSE, provider="openai" ) # Try to connect to non-existent server with pytest.raises(Exception): await client.connect("http://localhost:9999") @pytest.mark.integration @pytest.mark.requires_api_key @pytest.mark.asyncio async def test_error_handling_invalid_query(self, sse_server): """Test error handling with invalid queries""" if not os.getenv("OPENAI_API_KEY"): pytest.skip("OpenAI API key not available") client = MCPAIClient( model="gpt-4o", transport=TransportType.SSE, provider="openai", temperature=0.1 ) try: await client.connect(sse_server.url) # Test with empty query response = await client.process_query("") assert isinstance(response, str) # Test with very long query long_query = "test " * 1000 response = await client.process_query(long_query) assert isinstance(response, str) finally: await client.disconnect() @pytest.mark.integration @pytest.mark.requires_api_key @pytest.mark.asyncio async def test_interactive_session_mode(self, sse_server): """Test interactive session functionality""" if not os.getenv("OPENAI_API_KEY"): pytest.skip("OpenAI API key not available") client = MCPAIClient( model="gpt-4o", transport=TransportType.SSE, provider="openai", temperature=0.1, max_tokens=100 ) try: await client.connect(sse_server.url) # Mock user inputs for interactive session inputs = ["Calculate 5 + 3", "quit"] with patch('builtins.input', side_effect=inputs): # This would normally run an interactive loop # For testing, we'll just verify the client is ready assert client.session is not None assert client.ai_client is not None finally: await client.disconnect() @pytest.mark.integration @pytest.mark.requires_api_key @pytest.mark.asyncio async def test_tool_schema_validation(self, sse_server): """Test that tool schemas are properly formatted for AI providers""" if not os.getenv("OPENAI_API_KEY"): pytest.skip("OpenAI API key not available") client = MCPAIClient( model="gpt-4o", transport=TransportType.SSE, provider="openai" ) try: await client.connect(sse_server.url) tools = await client.get_mcp_tools() # Verify OpenAI-specific tool formatting for tool in tools: assert "type" in tool assert tool["type"] == "function" assert "function" in tool assert "name" in tool["function"] assert "description" in tool["function"] assert "parameters" in tool["function"] finally: await client.disconnect() @pytest.mark.integration @pytest.mark.requires_api_key @pytest.mark.slow @pytest.mark.asyncio async def test_concurrent_connections(self, sse_server): """Test multiple clients can connect simultaneously""" if not os.getenv("OPENAI_API_KEY"): pytest.skip("OpenAI API key not available") async def create_and_test_client(client_id: int): client = MCPAIClient( model="gpt-4o", transport=TransportType.SSE, provider="openai", temperature=0.1 ) try: await client.connect(sse_server.url) tools = await client.get_mcp_tools() return len(tools) finally: await client.disconnect() # Test 3 concurrent connections tasks = [create_and_test_client(i) for i in range(3)] results = await asyncio.gather(*tasks) # All should succeed and return same number of tools assert all(r > 0 for r in results) assert len(set(results)) == 1 # All should return same count @pytest.mark.integration @pytest.mark.asyncio async def test_configuration_parameters(self): """Test various configuration parameters work correctly""" # Test different temperature settings for temp in [0.1, 0.5, 0.9]: client = MCPAIClient( model="gpt-4o", transport=TransportType.SSE, provider="openai", temperature=temp, max_tokens=100 ) assert client.temperature == temp assert client.ai_client.temperature == temp @pytest.mark.integration @pytest.mark.requires_api_key @pytest.mark.asyncio async def test_resource_access(self, sse_server): """Test accessing MCP resources""" if not os.getenv("OPENAI_API_KEY"): pytest.skip("OpenAI API key not available") client = MCPAIClient( model="gpt-4o", transport=TransportType.SSE, provider="openai" ) try: await client.connect(sse_server.url) # Test listing resources resources = await client.session.list_resources() assert isinstance(resources.resources, list) # Test reading resources if any exist if resources.resources: for resource in resources.resources[:2]: # Test first 2 resources content = await client.session.read_resource(resource.uri) assert content is not None finally: await client.disconnect() @pytest.mark.integration @pytest.mark.requires_api_key @pytest.mark.asyncio async def test_prompt_access(self, sse_server): """Test accessing MCP prompts""" if not os.getenv("OPENAI_API_KEY"): pytest.skip("OpenAI API key not available") client = MCPAIClient( model="gpt-4o", transport=TransportType.SSE, provider="openai" ) try: await client.connect(sse_server.url) # Test listing prompts prompts = await client.session.list_prompts() assert isinstance(prompts.prompts, list) # Test getting prompts if any exist if prompts.prompts: for prompt in prompts.prompts[:2]: # Test first 2 prompts prompt_content = await client.session.get_prompt(prompt.name) assert prompt_content is not None finally: await client.disconnect() class TestMCPClientUtilities: """Test utility functions and edge cases""" @pytest.mark.unit def test_transport_type_enum(self): """Test TransportType enum values""" assert TransportType.SSE.value == "sse" assert TransportType.STDIO.value == "stdio" @pytest.mark.unit def test_client_initialization_validation(self): """Test client initialization with various parameters""" # Test with minimal parameters client = MCPAIClient() assert client.model == "gpt-4o" assert client.provider == "openai" assert client.transport == TransportType.SSE # Test with custom parameters client = MCPAIClient( model="gpt-4o-mini", provider="openai", transport=TransportType.STDIO, temperature=0.5, max_tokens=500, top_p=0.9 ) assert client.model == "gpt-4o-mini" assert client.provider == "openai" assert client.transport == TransportType.STDIO assert client.temperature == 0.5 assert client.max_tokens == 500 @pytest.mark.unit @patch.dict(os.environ, {"OPENAI_API_KEY": "test-key"}) def test_ai_client_factory_integration(self): """Test that AI client factory creates correct client types""" # Test OpenAI client creation openai_client = AIClientFactory.create_client( provider="openai", model_name="gpt-4o", temperature=0.5 ) assert openai_client is not None assert hasattr(openai_client, 'chat_completion') assert hasattr(openai_client, '_format_tools_for_provider') @pytest.mark.unit def test_missing_llm_client_handling(self): """Test behavior when LLM client is not available""" with patch('mcp_llm_client.LLM_CLIENT_AVAILABLE', False): with pytest.raises(ImportError, match="LLM client not available"): MCPAIClient() class TestPerformanceAndLoad: """Performance and load testing for MCP client""" @pytest.mark.integration @pytest.mark.requires_api_key @pytest.mark.slow @pytest.mark.asyncio async def test_multiple_rapid_queries(self, sse_server): """Test handling multiple rapid queries""" if not os.getenv("OPENAI_API_KEY"): pytest.skip("OpenAI API key not available") client = MCPAIClient( model="gpt-4o", transport=TransportType.SSE, provider="openai", temperature=0.1, max_tokens=50 ) try: await client.connect(sse_server.url) queries = [ "What is 2 + 2?", "Calculate 10 * 5", "What tools do you have?", "Test query 4" ] # Execute queries concurrently tasks = [client.process_query(query) for query in queries] responses = await asyncio.gather(*tasks) # Verify all responses assert len(responses) == len(queries) assert all(isinstance(r, str) for r in responses) assert all(len(r) > 0 for r in responses) finally: await client.disconnect() @pytest.mark.integration @pytest.mark.requires_api_key @pytest.mark.asyncio async def test_connection_reuse(self, sse_server): """Test reusing connection for multiple operations""" if not os.getenv("OPENAI_API_KEY"): pytest.skip("OpenAI API key not available") client = MCPAIClient( model="gpt-4o", transport=TransportType.SSE, provider="openai", temperature=0.1 ) try: await client.connect(sse_server.url) # Perform multiple operations on same connection for i in range(5): tools = await client.get_mcp_tools() assert len(tools) > 0 response = await client.process_query(f"Test query {i}") assert isinstance(response, str) finally: await client.disconnect() # Cleanup fixture to ensure no processes are left running @pytest.fixture(scope="session", autouse=True) async def cleanup_processes(): """Clean up any remaining MCP server processes""" yield # Kill any remaining MCP server processes try: # Find and kill any remaining server processes result = subprocess.run( ["pgrep", "-f", "run_mcp_server.py"], capture_output=True, text=True ) if result.returncode == 0: pids = result.stdout.strip().split('\n') for pid in pids: if pid: try: os.kill(int(pid), signal.SIGTERM) except (ProcessLookupError, OSError): pass # Process already dead except (subprocess.SubprocessError, FileNotFoundError): pass # pgrep not available or no processes found