612 lines
19 KiB
Python
612 lines
19 KiB
Python
"""
|
|
Comprehensive Integration Tests for MCP Client
|
|
|
|
This module contains end-to-end integration tests for the MCP client,
|
|
testing real server connections, tool calling, and AI provider integration.
|
|
"""
|
|
import pytest
|
|
import asyncio
|
|
import subprocess
|
|
import time
|
|
import signal
|
|
import os
|
|
import sys
|
|
from typing import Optional, Dict, Any, List
|
|
from unittest.mock import Mock, patch
|
|
import httpx
|
|
|
|
# Add src to path for imports
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', 'src'))
|
|
|
|
from mcp_llm_client import MCPAIClient, TransportType
|
|
from mcp_template.llm_client.client_factory import AIClientFactory
|
|
|
|
|
|
class MCPServerProcess:
    """Helper class to manage an MCP server process for testing.

    The server is launched as ``run_mcp_server.py`` with the current Python
    interpreter, using the directory two levels above this file as the
    working directory.

    Instance attributes (all assigned in ``__init__``):
        transport: Value passed to the server's ``--transport`` option.
        port: Value passed to the server's ``--port`` option.
        process: The ``subprocess.Popen`` handle, or ``None`` before ``start()``.
        url: ``http://localhost:<port>``.
    """

    def __init__(self, transport: str = "sse", port: int = 8051):
        """Record the launch settings; no process is started yet."""
        self.transport = transport
        self.port = port
        # Stays None until start() launches the server.
        self.process: Optional[subprocess.Popen] = None
        self.url = f"http://localhost:{port}"

    async def start(self):
        """Start the MCP server process and wait until it is ready.

        Returns:
            This ``MCPServerProcess`` instance.

        Raises:
            TimeoutError: If the server does not become ready. The child
                process is stopped before the error propagates.
        """
        cmd = [
            sys.executable,
            "run_mcp_server.py",
            "--transport", self.transport,
            "--port", str(self.port),
        ]

        # NOTE(review): stdout/stderr are captured but only read when startup
        # fails; a very chatty server could fill the pipe buffers and block.
        self.process = subprocess.Popen(
            cmd,
            cwd=os.path.join(os.path.dirname(__file__), '..', '..'),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )

        try:
            await self._wait_for_server()
        except BaseException:
            # Do not leave a half-started server behind when readiness fails
            # (or when the waiting task is cancelled).
            await self.stop()
            raise
        return self

    async def _wait_for_server(self, timeout: int = 30):
        """Wait for the server to be ready.

        For the ``sse`` transport the ``/sse`` endpoint is polled once per
        second until it answers with HTTP 200. For any other transport there
        is nothing to poll, so this simply sleeps for two seconds.

        Args:
            timeout: Maximum number of seconds to wait.

        Raises:
            TimeoutError: If the server is not ready within ``timeout``
                seconds, or if the server process exits while waiting (the
                message then carries the exit code and captured stderr).
        """
        deadline = time.monotonic() + timeout
        failure_detail = ""

        while time.monotonic() < deadline:
            if self.transport != "sse":
                # For stdio, just wait a bit
                await asyncio.sleep(2)
                return

            # Stop polling right away if the server has already died.
            exit_code = self.process.poll() if self.process is not None else None
            if exit_code is not None:
                try:
                    _, captured_stderr = self.process.communicate(timeout=5)
                except subprocess.TimeoutExpired:
                    captured_stderr = ""
                failure_detail = (
                    f" (server process exited with code {exit_code}: "
                    f"{(captured_stderr or '').strip()})"
                )
                break

            try:
                async with httpx.AsyncClient() as client:
                    # The SSE endpoint is an endless stream, so a plain GET
                    # would wait for a body that never completes. Streaming
                    # lets us inspect the status as soon as headers arrive.
                    async with client.stream(
                        "GET", f"{self.url}/sse", timeout=5.0
                    ) as response:
                        if response.status_code == 200:
                            return
                        failure_detail = (
                            f" (last response: HTTP {response.status_code})"
                        )
            except Exception as exc:
                # Connection refused etc. is expected while the server boots;
                # remember the last error so a timeout can report it.
                failure_detail = f" (last error: {exc!r})"

            await asyncio.sleep(1)

        raise TimeoutError(
            f"Server did not start within {timeout} seconds{failure_detail}"
        )

    async def stop(self):
        """Stop the MCP server process.

        Sends terminate, waits up to ten seconds, then kills the process if
        it is still running. The captured stdout/stderr pipes are closed
        afterwards. Does nothing when no process was started.
        """
        proc = self.process
        if proc is None:
            return

        try:
            proc.terminate()
            proc.wait(timeout=10)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait()
        finally:
            # Release the pipe file descriptors held by the Popen object.
            for pipe in (proc.stdout, proc.stderr):
                if pipe is not None:
                    pipe.close()
|
|
|
|
|
|
@pytest.fixture
async def sse_server():
    """Fixture to start and stop an SSE MCP server on port 8051.

    Yields:
        The started ``MCPServerProcess``.
    """
    server = MCPServerProcess(transport="sse", port=8051)
    try:
        await server.start()
        yield server
    finally:
        # Also runs when start() raises, so a half-started server process is
        # never left behind. stop() is a no-op if nothing was launched.
        await server.stop()
|
|
|
|
|
|
@pytest.fixture
async def stdio_server():
    """Fixture to start and stop a STDIO MCP server (configured with port 8052).

    Yields:
        The started ``MCPServerProcess``.
    """
    server = MCPServerProcess(transport="stdio", port=8052)
    try:
        await server.start()
        yield server
    finally:
        # Also runs when start() raises, so a half-started server process is
        # never left behind. stop() is a no-op if nothing was launched.
        await server.stop()
|
|
|
|
|
|
class TestMCPClientIntegration:
    """Comprehensive integration tests for MCP client.

    Tests that talk to OpenAI skip themselves when ``OPENAI_API_KEY`` is not
    set. Most tests use the ``sse_server`` fixture and always disconnect the
    client in a ``finally`` block.
    """

    @staticmethod
    def _require_openai_key():
        """Skip the calling test when no OpenAI API key is configured."""
        if not os.getenv("OPENAI_API_KEY"):
            pytest.skip("OpenAI API key not available")

    @staticmethod
    def _make_client(**overrides):
        """Build an ``MCPAIClient`` with the settings shared by these tests.

        Defaults are ``model="gpt-4o"``, ``transport=TransportType.SSE`` and
        ``provider="openai"``; keyword arguments override or extend them.
        Only the arguments given are passed on, so the client's own defaults
        apply to everything else.
        """
        settings = {
            "model": "gpt-4o",
            "transport": TransportType.SSE,
            "provider": "openai",
        }
        settings.update(overrides)
        return MCPAIClient(**settings)

    @pytest.mark.integration
    @pytest.mark.requires_api_key
    @pytest.mark.asyncio
    async def test_sse_transport_connection(self, sse_server):
        """Test MCP client can connect to server using SSE transport"""
        self._require_openai_key()
        client = self._make_client(temperature=0.1, max_tokens=100)

        try:
            # Test connection
            await client.connect(sse_server.url)

            # Test getting tools
            tools = await client.get_mcp_tools()
            assert isinstance(tools, list)
            assert len(tools) > 0

            # Verify tool structure
            for tool in tools:
                assert "name" in tool
                assert "description" in tool
                assert "inputSchema" in tool
        finally:
            await client.disconnect()

    @pytest.mark.integration
    @pytest.mark.requires_api_key
    @pytest.mark.asyncio
    async def test_stdio_transport_connection(self):
        """Test MCP client can connect using STDIO transport"""
        self._require_openai_key()
        client = self._make_client(
            transport=TransportType.STDIO,
            temperature=0.1,
            max_tokens=100,
        )

        # Resolve the server script from this file's location (project root is
        # two levels up, as in MCPServerProcess) so the test does not depend
        # on the directory pytest was started from.
        server_script = os.path.abspath(
            os.path.join(os.path.dirname(__file__), '..', '..', "run_mcp_server.py")
        )

        try:
            # For STDIO, we need to provide server command
            await client.connect_stdio(
                server_command=[sys.executable, server_script, "--transport", "stdio"]
            )

            # Test getting tools
            tools = await client.get_mcp_tools()
            assert isinstance(tools, list)
            assert len(tools) > 0
        finally:
            await client.disconnect()

    @pytest.mark.integration
    @pytest.mark.requires_api_key
    @pytest.mark.slow
    @pytest.mark.asyncio
    async def test_end_to_end_tool_calling_sse(self, sse_server):
        """Test complete end-to-end tool calling with SSE transport"""
        self._require_openai_key()
        client = self._make_client(temperature=0.1, max_tokens=200)

        try:
            await client.connect(sse_server.url)

            # Test a simple mathematical query that should trigger tool calls
            query = "Calculate 15 + 27 and then multiply the result by 2"
            response = await client.process_query(query)

            assert isinstance(response, str)
            assert len(response) > 0

            # The response should contain the calculation result.
            # We can't predict exact wording but it should contain numbers.
            assert any(char.isdigit() for char in response)
        finally:
            await client.disconnect()

    @pytest.mark.integration
    @pytest.mark.requires_api_key
    @pytest.mark.asyncio
    async def test_multiple_provider_support(self, sse_server):
        """Test MCP client works with different AI providers"""
        self._require_openai_key()

        providers_to_test = ["openai"]

        for provider in providers_to_test:
            client = self._make_client(
                model="gpt-4o" if provider == "openai" else "claude-3-opus-20240229",
                provider=provider,
                temperature=0.1,
                max_tokens=100,
            )

            try:
                await client.connect(sse_server.url)

                # Test basic tool listing
                tools = await client.get_mcp_tools()
                assert len(tools) > 0

                # Test simple query
                response = await client.process_query("What tools are available?")
                assert isinstance(response, str)
            finally:
                await client.disconnect()

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_error_handling_connection_failure(self):
        """Test error handling when server connection fails"""
        client = self._make_client()

        # Try to connect to non-existent server
        with pytest.raises(Exception):
            await client.connect("http://localhost:9999")

    @pytest.mark.integration
    @pytest.mark.requires_api_key
    @pytest.mark.asyncio
    async def test_error_handling_invalid_query(self, sse_server):
        """Test error handling with invalid queries"""
        self._require_openai_key()
        client = self._make_client(temperature=0.1)

        try:
            await client.connect(sse_server.url)

            # Test with empty query
            response = await client.process_query("")
            assert isinstance(response, str)

            # Test with very long query
            long_query = "test " * 1000
            response = await client.process_query(long_query)
            assert isinstance(response, str)
        finally:
            await client.disconnect()

    @pytest.mark.integration
    @pytest.mark.requires_api_key
    @pytest.mark.asyncio
    async def test_interactive_session_mode(self, sse_server):
        """Test interactive session functionality"""
        self._require_openai_key()
        client = self._make_client(temperature=0.1, max_tokens=100)

        try:
            await client.connect(sse_server.url)

            # Mock user inputs for interactive session
            inputs = ["Calculate 5 + 3", "quit"]

            # NOTE(review): no interactive loop is started here, so the
            # patched input() is never called; only readiness is verified.
            with patch('builtins.input', side_effect=inputs):
                # This would normally run an interactive loop
                # For testing, we'll just verify the client is ready
                assert client.session is not None
                assert client.ai_client is not None
        finally:
            await client.disconnect()

    @pytest.mark.integration
    @pytest.mark.requires_api_key
    @pytest.mark.asyncio
    async def test_tool_schema_validation(self, sse_server):
        """Test that tool schemas are properly formatted for AI providers"""
        self._require_openai_key()
        client = self._make_client()

        try:
            await client.connect(sse_server.url)
            tools = await client.get_mcp_tools()

            # Verify OpenAI-specific tool formatting
            # NOTE(review): test_sse_transport_connection expects top-level
            # "name"/"description"/"inputSchema" keys from the same
            # get_mcp_tools() call, while this test expects the nested
            # {"type": "function", "function": {...}} shape. Confirm which
            # format the client returns; both cannot hold for one tool dict
            # unless it carries both sets of keys.
            for tool in tools:
                assert "type" in tool
                assert tool["type"] == "function"
                assert "function" in tool
                assert "name" in tool["function"]
                assert "description" in tool["function"]
                assert "parameters" in tool["function"]
        finally:
            await client.disconnect()

    @pytest.mark.integration
    @pytest.mark.requires_api_key
    @pytest.mark.slow
    @pytest.mark.asyncio
    async def test_concurrent_connections(self, sse_server):
        """Test multiple clients can connect simultaneously"""
        self._require_openai_key()

        async def create_and_test_client(client_id: int):
            """Connect one client, list its tools and return how many it saw."""
            client = self._make_client(temperature=0.1)

            try:
                await client.connect(sse_server.url)
                tools = await client.get_mcp_tools()
                return len(tools)
            finally:
                await client.disconnect()

        # Test 3 concurrent connections
        tasks = [create_and_test_client(i) for i in range(3)]
        results = await asyncio.gather(*tasks)

        # All should succeed and return same number of tools
        assert all(r > 0 for r in results)
        assert len(set(results)) == 1  # All should return same count

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_configuration_parameters(self):
        """Test various configuration parameters work correctly"""
        # Test different temperature settings
        for temp in [0.1, 0.5, 0.9]:
            client = self._make_client(temperature=temp, max_tokens=100)

            assert client.temperature == temp
            assert client.ai_client.temperature == temp

    @pytest.mark.integration
    @pytest.mark.requires_api_key
    @pytest.mark.asyncio
    async def test_resource_access(self, sse_server):
        """Test accessing MCP resources"""
        self._require_openai_key()
        client = self._make_client()

        try:
            await client.connect(sse_server.url)

            # Test listing resources
            resources = await client.session.list_resources()
            assert isinstance(resources.resources, list)

            # Test reading resources if any exist
            if resources.resources:
                for resource in resources.resources[:2]:  # Test first 2 resources
                    content = await client.session.read_resource(resource.uri)
                    assert content is not None
        finally:
            await client.disconnect()

    @pytest.mark.integration
    @pytest.mark.requires_api_key
    @pytest.mark.asyncio
    async def test_prompt_access(self, sse_server):
        """Test accessing MCP prompts"""
        self._require_openai_key()
        client = self._make_client()

        try:
            await client.connect(sse_server.url)

            # Test listing prompts
            prompts = await client.session.list_prompts()
            assert isinstance(prompts.prompts, list)

            # Test getting prompts if any exist
            if prompts.prompts:
                for prompt in prompts.prompts[:2]:  # Test first 2 prompts
                    prompt_content = await client.session.get_prompt(prompt.name)
                    assert prompt_content is not None
        finally:
            await client.disconnect()
|
|
|
|
|
|
class TestMCPClientUtilities:
    """Unit-level checks for client utilities and edge cases."""

    @pytest.mark.unit
    def test_transport_type_enum(self):
        """Each TransportType member carries its lowercase wire name."""
        expected_values = (
            (TransportType.SSE, "sse"),
            (TransportType.STDIO, "stdio"),
        )
        for member, wire_name in expected_values:
            assert member.value == wire_name

    @pytest.mark.unit
    def test_client_initialization_validation(self):
        """Defaults and explicit constructor arguments are stored on the client."""
        # No arguments: the client falls back to its defaults.
        default_client = MCPAIClient()
        assert default_client.model == "gpt-4o"
        assert default_client.provider == "openai"
        assert default_client.transport == TransportType.SSE

        # Explicit arguments are reflected back as attributes.
        custom_settings = dict(
            model="gpt-4o-mini",
            provider="openai",
            transport=TransportType.STDIO,
            temperature=0.5,
            max_tokens=500,
            top_p=0.9,
        )
        custom_client = MCPAIClient(**custom_settings)

        assert custom_client.model == "gpt-4o-mini"
        assert custom_client.provider == "openai"
        assert custom_client.transport == TransportType.STDIO
        assert custom_client.temperature == 0.5
        assert custom_client.max_tokens == 500

    @pytest.mark.unit
    @patch.dict(os.environ, {"OPENAI_API_KEY": "test-key"})
    def test_ai_client_factory_integration(self):
        """The factory returns an OpenAI client exposing the expected methods."""
        # A placeholder API key is patched into the environment for this test.
        openai_client = AIClientFactory.create_client(
            provider="openai",
            model_name="gpt-4o",
            temperature=0.5,
        )

        assert openai_client is not None
        for required_attr in ('chat_completion', '_format_tools_for_provider'):
            assert hasattr(openai_client, required_attr)

    @pytest.mark.unit
    def test_missing_llm_client_handling(self):
        """Constructing the client fails with ImportError when the LLM client is flagged unavailable."""
        llm_unavailable = patch('mcp_llm_client.LLM_CLIENT_AVAILABLE', False)
        with llm_unavailable:
            with pytest.raises(ImportError, match="LLM client not available"):
                MCPAIClient()
|
|
|
|
|
|
class TestPerformanceAndLoad:
    """Load-style checks: concurrent queries and repeated use of one connection."""

    @pytest.mark.integration
    @pytest.mark.requires_api_key
    @pytest.mark.slow
    @pytest.mark.asyncio
    async def test_multiple_rapid_queries(self, sse_server):
        """Several queries issued concurrently on one client all yield non-empty strings."""
        api_key = os.getenv("OPENAI_API_KEY")
        if not api_key:
            pytest.skip("OpenAI API key not available")

        client = MCPAIClient(
            model="gpt-4o",
            transport=TransportType.SSE,
            provider="openai",
            temperature=0.1,
            max_tokens=50,
        )

        try:
            await client.connect(sse_server.url)

            queries = [
                "What is 2 + 2?",
                "Calculate 10 * 5",
                "What tools do you have?",
                "Test query 4",
            ]

            # Fire every query at once and collect the answers in order.
            pending = [client.process_query(text) for text in queries]
            responses = await asyncio.gather(*pending)

            # One answer per query, each a non-empty string.
            assert len(responses) == len(queries)
            assert all(isinstance(answer, str) for answer in responses)
            assert all(len(answer) > 0 for answer in responses)
        finally:
            await client.disconnect()

    @pytest.mark.integration
    @pytest.mark.requires_api_key
    @pytest.mark.asyncio
    async def test_connection_reuse(self, sse_server):
        """One connection serves five rounds of tool listing plus a query."""
        api_key = os.getenv("OPENAI_API_KEY")
        if not api_key:
            pytest.skip("OpenAI API key not available")

        client = MCPAIClient(
            model="gpt-4o",
            transport=TransportType.SSE,
            provider="openai",
            temperature=0.1,
        )

        try:
            await client.connect(sse_server.url)

            # Same connection, repeated operations.
            round_number = 0
            while round_number < 5:
                available_tools = await client.get_mcp_tools()
                assert len(available_tools) > 0

                answer = await client.process_query(f"Test query {round_number}")
                assert isinstance(answer, str)

                round_number += 1
        finally:
            await client.disconnect()
|
|
|
|
|
|
# Cleanup fixture to ensure no processes are left running
|
|
@pytest.fixture(scope="session", autouse=True)
def cleanup_processes():
    """Clean up any remaining MCP server processes at the end of the session.

    After the last test, every process whose command line matches
    ``run_mcp_server.py`` (as reported by ``pgrep -f``) is sent SIGTERM.
    The cleanup is best-effort: a missing ``pgrep`` or an already-dead
    process is ignored.

    This is deliberately a synchronous fixture: nothing here awaits, and a
    session-scoped ``async`` fixture would need a session-scoped event loop.
    """
    yield

    # NOTE(review): pgrep -f matches on the command line only, so this also
    # signals matching server processes that this test run did not start.
    try:
        result = subprocess.run(
            ["pgrep", "-f", "run_mcp_server.py"],
            capture_output=True,
            text=True,
        )
    except (subprocess.SubprocessError, FileNotFoundError):
        return  # pgrep not available

    if result.returncode != 0:
        return  # no matching processes found

    for pid in result.stdout.split():
        try:
            os.kill(int(pid), signal.SIGTERM)
        except OSError:
            pass  # Process already dead (or not ours to signal)
|