// reason-flow/server/services/queryModelService.js
// Captured from a repository web view (reported there as 501 lines, 15 KiB, JavaScript);
// revision dated 2025-11-06 11:08:59 +01:00.
const groqService = require('./groqService');
const { Plan, ToolExecution, Document } = require('../models');
const logger = require('../utils/logger');
const graphRagService = require('./graphRagService');
const embeddingService = require('./embeddingService');
const axios = require('axios');
/**
 * QUERYMODEL execution service.
 *
 * Runs approved engineering plans through Groq and dispatches individual tool
 * invocations: query expansion, document extraction, report generation, web
 * search (Tavily), offline encyclopedia search, and an "orchestrate" pipeline
 * that chains several of these.
 */
class QueryModelService {
  constructor() {
    this.modelType = 'QUERYMODEL';
    // Role/instruction text for QUERYMODEL.
    // NOTE(review): not referenced by any method of this class — confirm
    // whether external callers read it.
    this.systemPrompt = `You are QUERYMODEL, an expert engineering execution system. Your primary function is to execute engineering plans using various tools and resources.
Your capabilities include:
- Executing step-by-step engineering plans
- Using specialized tools for calculations, analysis, and reporting
- Coordinating with external resources and databases
- Generating detailed execution reports
- Handling complex engineering workflows
- Ensuring quality and safety standards
Available tools:
- Query Expander: Enhance and clarify engineering queries
- Extraction: Search and extract information from documents
- Report1: Generate formatted engineering reports
- Report2: Create detailed engineering files and documents
- Web Search: Find current engineering information and standards
- Encyclopedia PDF: Search specialized engineering documents
When executing plans, always:
1. Follow the plan steps systematically
2. Use appropriate tools for each step
3. Document all results and findings
4. Ensure quality and safety standards
5. Provide detailed progress updates
6. Handle errors and deviations gracefully`;
  }

  /**
   * Execute an approved plan and persist the outcome on its Plan row.
   *
   * Lifecycle: `approved` -> `executing` -> `completed`. If anything fails after
   * the plan was moved to `executing`, it is marked `failed`. A missing or
   * unapproved plan is rejected without touching the row.
   *
   * @param {*} planId - Primary key of the Plan to execute.
   * @param {Object} [options={}] - Accepted for API compatibility; currently
   *   not forwarded to the model.
   * @returns {Promise<{plan: Object, executionResults: Object, processingTime: number,
   *   tokensUsed: (number|undefined), model: *}>}
   * @throws {Error} `QUERYMODEL execution failed: <reason>`, with the original error as `cause`.
   */
  async executePlan(planId, options = {}) {
    // Declared outside `try` so the catch block can still reach the row.
    let plan = null;
    // Only a plan this call moved into `executing` may be marked `failed`; this
    // prevents clobbering drafts or already-completed plans on a rejected call.
    let executionStarted = false;
    try {
      const startTime = Date.now();

      plan = await Plan.findByPk(planId);
      if (!plan) {
        throw new Error('Plan not found');
      }
      if (plan.status !== 'approved') {
        throw new Error('Plan must be approved before execution');
      }

      await plan.update({ status: 'executing' });
      executionStarted = true;

      const response = await groqService.executePlan(plan.description, plan.tools_required);
      const processingTime = (Date.now() - startTime) / 1000;

      const executionResults = this.parseExecutionResponse(response.content);
      // `usage` may be absent; don't fail a successful execution over bookkeeping.
      const tokensUsed = response.usage?.total_tokens;

      await plan.update({
        status: 'completed',
        execution_result: executionResults,
        metadata: {
          ...plan.metadata,
          execution: {
            processingTime,
            tokensUsed,
            model: response.model,
            finishReason: response.finishReason,
          },
        },
      });

      logger.info(`QUERYMODEL execution completed in ${processingTime}s for plan: ${planId}`);
      return {
        plan,
        executionResults,
        processingTime,
        tokensUsed,
        model: response.model,
      };
    } catch (error) {
      logger.error('QUERYMODEL execution error:', error);
      if (plan && executionStarted) {
        try {
          await plan.update({
            status: 'failed',
            execution_result: { error: error.message },
          });
        } catch (updateError) {
          // A failed status write must not mask the original error.
          logger.error('QUERYMODEL failed to mark plan as failed:', updateError);
        }
      }
      throw new Error(`QUERYMODEL execution failed: ${error.message}`, { cause: error });
    }
  }

  /**
   * Parse the model's free-text execution report into structured sections.
   *
   * A line containing one of the headers `steps completed:`, `results:`,
   * `tools used:`, `issues:`, `recommendations:` (case-insensitive) switches
   * the current section. Within "steps", numbered lines (`1.`) are kept
   * verbatim. Within the other sections, `-` bullets are kept without the
   * leading dash.
   *
   * @param {string} content - Raw model output.
   * @returns {{stepsCompleted: string[], results: Array, toolsUsed: string[], issues: string[],
   *   recommendations: string[], rawContent: *, executionStatus: string}}
   *   If parsing throws (e.g. `content` is not a string), returns a fallback with
   *   `results: [content]` and `executionStatus: 'completed'`.
   */
  parseExecutionResponse(content) {
    try {
      // Checked in order; the first header found on a line wins.
      const headers = [
        ['steps completed:', 'steps'],
        ['results:', 'results'],
        ['tools used:', 'tools'],
        ['issues:', 'issues'],
        ['recommendations:', 'recommendations'],
      ];
      const sections = {
        steps: [],
        results: [],
        tools: [],
        issues: [],
        recommendations: [],
      };
      // Sections whose items are `-` bullets (steps use numbered lines instead).
      const bulletSections = new Set(['results', 'tools', 'issues', 'recommendations']);
      let currentSection = '';

      for (const line of content.split('\n')) {
        const trimmedLine = line.trim();
        const lowerLine = trimmedLine.toLowerCase();
        const header = headers.find(([marker]) => lowerLine.includes(marker));

        if (header) {
          [, currentSection] = header;
        } else if (/^\d+\./.test(trimmedLine)) {
          if (currentSection === 'steps') {
            sections.steps.push(trimmedLine);
          }
        } else if (trimmedLine.startsWith('-') && bulletSections.has(currentSection)) {
          sections[currentSection].push(trimmedLine.substring(1).trim());
        }
      }

      return {
        stepsCompleted: sections.steps,
        results: sections.results,
        toolsUsed: sections.tools,
        issues: sections.issues,
        recommendations: sections.recommendations,
        rawContent: content,
        executionStatus: sections.issues.length > 0 ? 'completed_with_issues' : 'completed_successfully',
      };
    } catch (error) {
      logger.error('Error parsing execution response:', error);
      return {
        stepsCompleted: [],
        results: [content],
        toolsUsed: [],
        issues: [],
        recommendations: [],
        rawContent: content,
        executionStatus: 'completed',
      };
    }
  }

  /**
   * Run one tool and record the invocation in a ToolExecution row.
   *
   * The row is created with status `running`. On success it is updated to
   * `completed` with the result, elapsed seconds and tokens used. On failure,
   * if the row exists, it is updated to `failed` with the error message.
   *
   * @param {string} toolName - Human-readable name recorded on the row and in logs.
   * @param {string} toolType - One of: query_expander, extraction, report1,
   *   report2, web_search, encyclopedia_pdf, orchestrate.
   * @param {Object} inputParameters - Passed through to the selected tool.
   * @param {*} planId - Plan the execution belongs to.
   * @returns {Promise<Object>} The updated ToolExecution row.
   * @throws {Error} `Tool execution failed: <reason>`, with the original error as `cause`.
   */
  async executeTool(toolName, toolType, inputParameters, planId) {
    // Declared outside `try` so the catch block can record the failure.
    let toolExecution = null;
    try {
      const startTime = Date.now();

      toolExecution = await ToolExecution.create({
        plan_id: planId,
        tool_name: toolName,
        tool_type: toolType,
        input_parameters: inputParameters,
        status: 'running',
      });

      let result;
      switch (toolType) {
        case 'query_expander':
          result = await this.executeQueryExpander(inputParameters);
          break;
        case 'extraction':
          result = await this.executeExtraction(inputParameters);
          break;
        case 'report1':
          result = await this.executeReport1(inputParameters);
          break;
        case 'report2':
          result = await this.executeReport2(inputParameters);
          break;
        case 'web_search':
          result = await this.executeWebSearch(inputParameters);
          break;
        case 'encyclopedia_pdf':
          result = await this.executeEncyclopediaPdf(inputParameters);
          break;
        case 'orchestrate':
          result = await this.executeOrchestrate(inputParameters);
          break;
        default:
          throw new Error(`Unknown tool type: ${toolType}`);
      }

      const executionTime = (Date.now() - startTime) / 1000;
      await toolExecution.update({
        output_result: result,
        status: 'completed',
        execution_time: executionTime,
        tokens_used: result?.tokensUsed || 0,
      });

      logger.info(`Tool executed: ${toolName} in ${executionTime}s`);
      return toolExecution;
    } catch (error) {
      logger.error(`Tool execution error: ${toolName}`, error);
      if (toolExecution) {
        try {
          await toolExecution.update({
            status: 'failed',
            error_message: error.message,
          });
        } catch (updateError) {
          // A failed status write must not mask the original error.
          logger.error(`Failed to record tool execution failure: ${toolName}`, updateError);
        }
      }
      throw new Error(`Tool execution failed: ${error.message}`, { cause: error });
    }
  }

  /**
   * Expand/clarify a query via `groqService.expandQuery`.
   *
   * @param {{query: string, context: *}} inputParameters
   * @returns {Promise<{expandedQuery: *, tokensUsed: (number|undefined), processingTime: *}>}
   */
  async executeQueryExpander(inputParameters) {
    const { query, context } = inputParameters;
    const response = await groqService.expandQuery(query, context);
    return {
      expandedQuery: response.content,
      tokensUsed: response.usage?.total_tokens,
      processingTime: response.processingTime,
    };
  }

  /**
   * Orchestrated pipeline with four steps:
   * 1. Expand the query.
   * 2. Retrieve documents for it.
   * 3. If retrieval confidence is below 0.7, add a web search (best effort).
   * 4. Optionally build a summary report from the top three documents.
   *
   * @param {{query: string, category?: string, topK?: number, generateReport?: boolean}} inputParameters
   * @returns {Promise<{query, expandedQuery, extraction, web, report, decision: {usedWeb: boolean, confidence: number}}>}
   */
  async executeOrchestrate(inputParameters) {
    const { query, category, topK = 5, generateReport = true } = inputParameters;

    // 1) Expand the query; fall back to the original text if expansion is empty.
    const expanded = await this.executeQueryExpander({ query, context: { category } });
    const expandedQuery = (expanded.expandedQuery || '').trim() || query;

    // 2) Retrieve with the ORIGINAL query. The 'general' category is hard-coded
    //    for now, so the caller's `category` only informs the expansion step.
    const extraction = await this.executeExtraction({ query, category: 'general', topK });

    // 3) Low retrieval confidence: augment with a web search, best effort.
    let web = null;
    if (extraction.confidence < 0.7) {
      try {
        web = await this.executeWebSearch({ query, maxResults: 5, searchDepth: 'basic', includeAnswer: true });
      } catch (e) {
        // Continue without web results, but leave a trace instead of swallowing.
        logger.error('Orchestrate web search failed, continuing without web results:', e.message);
      }
    }

    // 4) Optionally generate a brief report from the top 3 hits.
    let report = null;
    if (generateReport) {
      const documentDetails = await Promise.all(
        extraction.results.slice(0, 3).map(async (result) => {
          // Prefer the full extracted text for the report; fall back to the
          // snippet if the row is missing or the lookup fails.
          let content = result.snippet;
          try {
            const doc = await Document.findByPk(result.id, {
              attributes: ['id', 'original_filename', 'extracted_text', 'category'],
            });
            content = doc?.extracted_text || result.snippet;
          } catch (error) {
            content = result.snippet;
          }
          return {
            filename: result.original_filename,
            content,
            score: result.score,
            category: result.category,
          };
        }),
      );

      const reportData = {
        query,
        expandedQuery,
        relevantDocuments: documentDetails,
        webAnswer: web?.answer || null,
        webCount: web?.totalResults || 0,
      };
      const reportResp = await groqService.generateReport(reportData, 'summary');
      report = {
        content: reportResp.content,
        tokensUsed: reportResp.usage?.total_tokens,
        processingTime: reportResp.processingTime,
      };
    }

    return {
      query,
      expandedQuery,
      extraction,
      web,
      report,
      decision: {
        usedWeb: !!web,
        confidence: extraction.confidence,
      },
    };
  }

  /**
   * Retrieve up to `topK` documents for a query.
   *
   * Graph RAG results are used first. If there are fewer than `topK`, the gap
   * is filled with the best cosine-similarity matches over indexed documents.
   * Documents already returned by the graph search are skipped, so no id
   * appears twice.
   *
   * @param {{query: string, topK?: number, category?: string}} inputParameters
   * @returns {Promise<{query: string, topK: number, results: Object[], confidence: number}>}
   *   `confidence` is the top score clamped to [0.5, 0.99], or 0 when nothing matched.
   */
  async executeExtraction(inputParameters) {
    const { query, topK = 5, category } = inputParameters;

    // 1) Graph RAG first.
    const graph = await graphRagService.graphSearch({ query, category });
    let results = (graph?.results || []).map((r) => ({
      id: r.id,
      original_filename: r.original_filename,
      snippet: r.snippet,
      category: r.category,
      score: r.score,
      source: 'graph',
    }));

    // 2) Not enough hits: top up with semantic (embedding) search.
    if (results.length < topK) {
      const queryEmbedding = await embeddingService.embedText(query);
      const where = { is_indexed: true };
      if (category) where.category = category;

      // NOTE(review): loads every matching indexed document with its embedding
      // into memory; consider a vector index if the corpus grows.
      const docs = await Document.findAll({
        where,
        attributes: ['id', 'original_filename', 'extracted_text', 'embeddings', 'category'],
      });

      const alreadyReturned = new Set(results.map((r) => r.id));
      const scored = [];
      for (const d of docs) {
        if (alreadyReturned.has(d.id)) continue;
        const emb = d.embeddings || [];
        if (!Array.isArray(emb) || emb.length === 0) continue;
        scored.push({
          id: d.id,
          original_filename: d.original_filename,
          snippet: (d.extracted_text || '').slice(0, 400),
          category: d.category,
          score: embeddingService.cosineSimilarity(queryEmbedding, emb),
          source: 'semantic',
        });
      }
      scored.sort((a, b) => b.score - a.score);
      const need = topK - results.length;
      results = results.concat(scored.slice(0, Math.max(0, need)));
    }

    // Rank the merged list and trim to topK.
    results.sort((a, b) => b.score - a.score);
    results = results.slice(0, topK);

    // Confidence heuristic: best score clamped to [0.5, 0.99]; 0 with no results.
    const confidence = results.length > 0 ? Math.min(0.99, Math.max(0.5, results[0].score)) : 0;

    return {
      query,
      topK,
      results,
      confidence,
    };
  }

  /**
   * Generate a technical report via `groqService.generateReport(data, 'technical')`.
   * The `format` input only changes the label echoed back in the result.
   *
   * @param {{data: *, format?: string, context?: *}} inputParameters
   */
  async executeReport1(inputParameters) {
    const { data, format } = inputParameters;
    const response = await groqService.generateReport(data, 'technical');
    return {
      report: response.content,
      format: format || 'technical',
      tokensUsed: response.usage?.total_tokens,
      processingTime: response.processingTime,
    };
  }

  /**
   * Generate a report in the requested format, with a default filename.
   * NOTE(review): `format` is passed through as-is (possibly undefined), while
   * the result reports `'general'` as the fallback label — confirm that matches
   * groqService's default.
   *
   * @param {{data: *, format?: string, filename?: string}} inputParameters
   */
  async executeReport2(inputParameters) {
    const { data, format, filename } = inputParameters;
    const response = await groqService.generateReport(data, format);
    return {
      report: response.content,
      filename: filename || `report_${Date.now()}.txt`,
      format: format || 'general',
      tokensUsed: response.usage?.total_tokens,
      processingTime: response.processingTime,
    };
  }

  /**
   * Search the web through the Tavily `/search` API.
   *
   * @param {{query: string, maxResults?: number, searchDepth?: string, includeAnswer?: boolean}} inputParameters
   *   `maxResults` is clamped to [1, 10].
   * @returns {Promise<{query, answer, results: Object[], totalResults: number, source: 'tavily'}>}
   * @throws {Error} If `TAVILY_API_KEY` is unset, or `Web search failed` (with `cause`) on request errors.
   */
  async executeWebSearch(inputParameters) {
    const { query, maxResults = 5, searchDepth = 'basic', includeAnswer = true } = inputParameters;
    const apiKey = process.env.TAVILY_API_KEY;
    if (!apiKey) {
      throw new Error('TAVILY_API_KEY is not set');
    }

    const url = 'https://api.tavily.com/search';
    const payload = {
      query,
      search_depth: searchDepth,
      include_answer: includeAnswer,
      max_results: Math.min(10, Math.max(1, maxResults)),
    };

    try {
      const resp = await axios.post(url, payload, {
        headers: { Authorization: `Bearer ${apiKey}` },
        timeout: 15000,
      });
      const data = resp.data || {};
      const results = (data.results || []).map((r) => ({
        title: r.title,
        url: r.url,
        snippet: r.content || r.snippet || '',
        score: r.score ?? undefined,
        published: r.published_date || undefined,
      }));
      return {
        query,
        answer: data.answer || null,
        results,
        totalResults: results.length,
        source: 'tavily',
      };
    } catch (error) {
      logger.error('Tavily web search error:', error?.response?.data || error.message);
      throw new Error('Web search failed', { cause: error });
    }
  }

  /**
   * Offline "encyclopedia" search: graph RAG restricted to the 'encyclopedia' category.
   * `documents` and `context` inputs are currently ignored.
   *
   * @param {{query: string, documents?: *, context?: *}} inputParameters
   */
  async executeEncyclopediaPdf(inputParameters) {
    const { query } = inputParameters;
    const graph = await graphRagService.graphSearch({ query, category: 'encyclopedia' });
    const results = graph?.results || [];
    return {
      query,
      results,
      totalResults: results.length,
      source: 'offline_rag',
    };
  }

  /**
   * Report Groq model info and connectivity. Never throws; errors are
   * reported as `status: 'error'`.
   *
   * @returns {Promise<Object>}
   */
  async getModelStatus() {
    try {
      const groqInfo = await groqService.getModelInfo();
      const connectionTest = await groqService.testConnection();
      return {
        modelType: this.modelType,
        status: connectionTest.success ? 'active' : 'error',
        groqInfo,
        connectionTest,
        lastChecked: new Date().toISOString(),
      };
    } catch (error) {
      logger.error('QUERYMODEL status check error:', error);
      return {
        modelType: this.modelType,
        status: 'error',
        error: error.message,
        lastChecked: new Date().toISOString(),
      };
    }
  }
}
module.exports = new QueryModelService();