Files
2025-09-11 23:13:58 +01:00

256 lines
8.7 KiB
Python

"""
End-to-end tests for complete MCP workflow
"""
import pytest
import asyncio
import tempfile
import os
from pathlib import Path
from unittest.mock import Mock, AsyncMock, patch
from mcp_template.core.types import MCPTool, MCPServerConfig, TransportType
from mcp_template.server.fastmcp_server import FastMCPServer
from mcp_template.llm_client.openai_client import OpenAIClient
from mcp_template.config.config_manager import ConfigManager
class TestEndToEnd:
"""End-to-end tests for complete MCP workflow"""
@pytest.fixture
async def knowledge_base_server(self):
"""Create a knowledge base server for e2e testing"""
kb_data = {
"company_policy": "Our company offers 20 days of paid vacation annually.",
"benefits": "We provide health insurance, 401k matching, and flexible work hours.",
"faq": "Common questions about our policies and procedures."
}
async def get_kb():
formatted = "Knowledge Base:\n\n"
for key, value in kb_data.items():
formatted += f"**{key.replace('_', ' ').title()}:**\n{value}\n\n"
return formatted
async def search_kb(query: str):
query_lower = query.lower()
results = []
for key, value in kb_data.items():
if query_lower in key.lower() or query_lower in value.lower():
results.append(f"**{key.replace('_', ' ').title()}:**\n{value}")
if not results:
return f"No information found for: {query}"
return "\n\n".join(results)
tools = [
MCPTool(
name="get_knowledge_base",
description="Retrieve the entire knowledge base",
input_schema={"type": "object", "properties": {}},
handler=get_kb
),
MCPTool(
name="search_kb",
description="Search the knowledge base for specific information",
input_schema={
"type": "object",
"properties": {
"query": {"type": "string", "description": "Search query"}
},
"required": ["query"]
},
handler=search_kb
)
]
config = MCPServerConfig(
name="Knowledge Base Server",
tools=tools
)
server = FastMCPServer(config)
await server.initialize()
return server
@pytest.fixture
def mock_openai_response(self):
"""Mock OpenAI API response with tool calls"""
return {
"choices": [
{
"message": {
"role": "assistant",
"content": "I'll search the knowledge base for vacation policy information.",
"tool_calls": [
{
"id": "call_123",
"function": {
"name": "search_kb",
"arguments": '{"query": "vacation"}'
}
}
]
}
}
]
}
@patch('src.clients.openai_client.AsyncOpenAI')
@pytest.mark.asyncio
async def test_complete_mcp_workflow(self, mock_openai_class, knowledge_base_server, mock_openai_response):
"""Test complete MCP workflow from query to response"""
# Mock OpenAI client setup
mock_client = Mock()
mock_response = Mock()
mock_response.choices = [
Mock(message=Mock(
role="assistant",
content="I'll search the knowledge base for vacation policy information.",
tool_calls=[
Mock(
id="call_123",
function=Mock(
name="search_kb",
arguments='{"query": "vacation"}'
)
)
]
))
]
# Second call response (after tool execution)
mock_final_response = Mock()
mock_final_response.choices = [
Mock(message=Mock(
content="Based on the knowledge base, our company offers 20 days of paid vacation annually."
))
]
mock_client.chat.completions.create = AsyncMock(side_effect=[mock_response, mock_final_response])
mock_openai_class.return_value = mock_client
# Create AI client
ai_client = OpenAIClient("gpt-4o", "test-key")
await ai_client.initialize()
# Create mock MCP client that uses the real server
class MockMCPClient:
def __init__(self, server):
self.server = server
async def call_tool(self, name, arguments):
return await self.server.call_tool(name, arguments)
async def list_tools(self):
return await self.server.list_tools()
mcp_client = MockMCPClient(knowledge_base_server)
# Test the complete workflow
query = "What's the company vacation policy?"
response = await ai_client.process_with_tools(query, [], mcp_client)
# Verify the workflow
assert "vacation" in response.lower() or "20 days" in response
# Verify OpenAI was called twice (initial + after tool execution)
assert mock_client.chat.completions.create.call_count == 2
# Verify the tool was called correctly
tool_result = await mcp_client.call_tool("search_kb", {"query": "vacation"})
assert "vacation" in tool_result.lower()
@pytest.mark.asyncio
async def test_configuration_workflow(self, knowledge_base_server):
"""Test configuration loading and usage"""
# Create temporary config file
config_data = {
"server": {
"name": "Test Server",
"port": 8080,
"transport": "sse"
},
"client": {
"provider": "openai",
"model": "gpt-4o"
},
"api_keys": {
"openai": "test-key"
}
}
with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
import json
json.dump(config_data, f)
config_file = f.name
try:
# Test config manager
config_manager = ConfigManager(config_file)
server_config = await config_manager.get_server_config()
assert server_config["name"] == "Test Server"
assert server_config["port"] == 8080
client_config = await config_manager.get_client_config()
assert client_config["provider"] == "openai"
assert client_config["model"] == "gpt-4o"
api_key = await config_manager.get_api_key("openai")
assert api_key == "test-key"
finally:
# Clean up
os.unlink(config_file)
@pytest.mark.asyncio
async def test_error_recovery(self, knowledge_base_server):
"""Test error recovery in MCP operations"""
# Test with invalid tool call
try:
await knowledge_base_server.call_tool("nonexistent_tool", {})
assert False, "Should have raised ValueError"
except ValueError as e:
assert "not found" in str(e).lower()
# Test with valid tool but invalid arguments
try:
await knowledge_base_server.call_tool("search_kb", {"invalid": "args"})
assert False, "Should have raised error"
except Exception:
# Should handle gracefully
pass
# Verify server still works after errors
result = await knowledge_base_server.call_tool("get_knowledge_base", {})
assert "Knowledge Base" in result
@pytest.mark.asyncio
async def test_multiple_queries_workflow(self, knowledge_base_server):
"""Test multiple queries in sequence"""
queries = [
"vacation policy",
"benefits",
"faq"
]
for query in queries:
# Direct tool call (simulating what AI would do)
result = await knowledge_base_server.call_tool("search_kb", {"query": query})
# Verify we get relevant results
assert query.lower() in result.lower() or len(result) > 50
# Test getting full knowledge base
full_kb = await knowledge_base_server.call_tool("get_knowledge_base", {})
assert "company_policy" in full_kb
assert "benefits" in full_kb
assert "faq" in full_kb