form id added for generating theme
This commit is contained in:
@@ -1,195 +1,13 @@
|
||||
import os
|
||||
import requests
|
||||
import json
|
||||
from typing import List, Dict, Optional, TypedDict, Sequence, Annotated
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
|
||||
from langchain_core.messages import HumanMessage, AIMessage, BaseMessage
|
||||
from langgraph.checkpoint.memory import MemorySaver
|
||||
from langgraph.graph import START, MessagesState, StateGraph
|
||||
from utils.utils import format_questions_text
|
||||
from src.prompts import chat_prompt
|
||||
from langchain_openai import ChatOpenAI
|
||||
@dataclass
|
||||
class Message:
|
||||
role: str # 'human' or 'ai'
|
||||
content: str
|
||||
timestamp: str
|
||||
from dotenv import load_dotenv
|
||||
load_dotenv()
|
||||
doc_id = 2
|
||||
|
||||
QUESTIONS_PATH = "./data/config_files/questions.json"
|
||||
with open(QUESTIONS_PATH, "r") as f:
|
||||
questions = json.load(f)
|
||||
x_api_key = os.getenv("BACKEND_XAPI_KEY")
|
||||
url = f"{os.getenv('BACKEND_BASE_URL')}/v3/api/custom/theme-document/answer/{doc_id}?x-project={x_api_key}"
|
||||
|
||||
prompt_template = None
|
||||
MODEL = "gpt-4o"
|
||||
def initialize_workflow(model) -> StateGraph:
|
||||
"""Initialize LangGraph workflow"""
|
||||
workflow = StateGraph(state_schema=MessagesState)
|
||||
memory = MemorySaver()
|
||||
|
||||
def call_model(state: MessagesState):
|
||||
prompt = prompt_template.invoke({"messages": state["messages"], "language": state["language"]})
|
||||
response = model.invoke(prompt)
|
||||
return {"messages": [response]}
|
||||
|
||||
workflow.add_edge(START, "model")
|
||||
workflow.add_node("model", call_model)
|
||||
return workflow.compile(checkpointer=memory)
|
||||
|
||||
|
||||
def setup_prompt_template(theme: int, resume: str) -> ChatPromptTemplate:
|
||||
"""Set up the prompt template"""
|
||||
return ChatPromptTemplate.from_messages([
|
||||
("system", chat_prompt(theme, resume)),
|
||||
MessagesPlaceholder(variable_name="messages")
|
||||
])
|
||||
|
||||
def parse_ai_response(content: str) -> Dict:
|
||||
"""Parse AI response content into expected format"""
|
||||
try:
|
||||
response = json.loads(content)
|
||||
return {
|
||||
"message": response.get("message", ""),
|
||||
"end": response.get("end", "no") == "yes"
|
||||
}
|
||||
except json.JSONDecodeError:
|
||||
return {
|
||||
"message": content,
|
||||
"end": False
|
||||
}
|
||||
|
||||
def add_message(storage_path: Path, conversation_id: str, role: str, content: str) -> None:
|
||||
"""Add a message to the conversation history"""
|
||||
message_data = {
|
||||
"role": role,
|
||||
"content": content,
|
||||
"timestamp": datetime.now().isoformat()
|
||||
}
|
||||
|
||||
conversations = load_conversations(storage_path)
|
||||
if conversation_id not in conversations:
|
||||
conversations[conversation_id] = {"messages": []}
|
||||
conversations[conversation_id]["messages"].append(message_data)
|
||||
save_conversations(storage_path, conversations)
|
||||
|
||||
|
||||
def get_conversation_history(conversation_id: str, storage_path: Path) -> List[Message]:
|
||||
"""Get the conversation history"""
|
||||
conversations = load_conversations(storage_path)
|
||||
if conversation_id not in conversations:
|
||||
return None
|
||||
|
||||
return [
|
||||
Message(
|
||||
role=msg["role"],
|
||||
content=msg["content"],
|
||||
timestamp=msg["timestamp"]
|
||||
)
|
||||
for msg in conversations[conversation_id]["messages"]
|
||||
]
|
||||
|
||||
def load_conversations(storage_path: Path) -> Dict:
|
||||
"""Load conversations from storage file"""
|
||||
try:
|
||||
with open(storage_path, 'r') as f:
|
||||
return json.load(f)
|
||||
except FileNotFoundError:
|
||||
return {}
|
||||
|
||||
def save_conversations(storage_path: Path, conversations: Dict) -> None:
|
||||
"""Save conversations to storage file"""
|
||||
with open(storage_path, 'w') as f:
|
||||
json.dump(conversations, f, indent=2)
|
||||
|
||||
def convert_to_langchain_messages(messages: List[Message]) -> List[HumanMessage | AIMessage]:
|
||||
"""Convert our Message objects to LangChain message objects"""
|
||||
converted_messages = []
|
||||
for msg in messages:
|
||||
if msg.role == "human":
|
||||
converted_messages.append(HumanMessage(content=msg.content))
|
||||
else:
|
||||
converted_messages.append(AIMessage(content=msg.content))
|
||||
return converted_messages
|
||||
def ai_chat(query: str, conversation_id: str, theme_id: int, resume: str) -> str:
|
||||
"""Main chat function that processes queries and manages conversation"""
|
||||
storage_path = Path("conversations.json")
|
||||
|
||||
class State(TypedDict):
|
||||
messages: Annotated[Sequence[BaseMessage], "The messages in the conversation"]
|
||||
language: str
|
||||
# Initialize model and workflow
|
||||
model = ChatOpenAI(model=MODEL)
|
||||
workflow = StateGraph(state_schema=State)
|
||||
|
||||
|
||||
|
||||
def call_model(state: State):
|
||||
prompt_template = setup_prompt_template(theme_id, resume)
|
||||
prompt = prompt_template.invoke({
|
||||
"messages": state["messages"],
|
||||
"language": state["language"]
|
||||
})
|
||||
response = model.invoke(prompt)
|
||||
return {"messages": [response]}
|
||||
|
||||
workflow.add_edge(START, "model")
|
||||
workflow.add_node("model", call_model)
|
||||
|
||||
memory = MemorySaver()
|
||||
app = workflow.compile(checkpointer=memory)
|
||||
|
||||
# Get conversation history
|
||||
history = get_conversation_history(conversation_id, storage_path)
|
||||
|
||||
config = {"configurable": {"thread_id": conversation_id}}
|
||||
language = "English"
|
||||
|
||||
if not history:
|
||||
# New conversation
|
||||
input_messages = [HumanMessage(content=query)] if query else [HumanMessage(content="Let's get started")]
|
||||
output = app.invoke(
|
||||
{"messages": input_messages, "language": language},
|
||||
config
|
||||
)
|
||||
else:
|
||||
# Existing conversation
|
||||
history = convert_to_langchain_messages(history)
|
||||
input_messages = history + [HumanMessage(content=query)] if query else history
|
||||
output = app.invoke(
|
||||
{"messages": input_messages, "language": language},
|
||||
config
|
||||
)
|
||||
|
||||
# Store messages
|
||||
if query:
|
||||
add_message(storage_path, conversation_id, "human", query)
|
||||
add_message(storage_path, conversation_id, "ai", output["messages"][-1].content)
|
||||
|
||||
return output["messages"][-1].content
|
||||
|
||||
|
||||
# Example usage:
|
||||
if __name__ == "__main__":
|
||||
# Sample resume
|
||||
sample_resume = """
|
||||
John Doe
|
||||
EMT-B Certified
|
||||
5 years experience as volunteer firefighter
|
||||
Bachelor's in Fire Science
|
||||
"""
|
||||
|
||||
# Sample conversation
|
||||
conversation_id = "12345"
|
||||
theme_id = 1 # Customer Service theme
|
||||
|
||||
# Start conversation
|
||||
|
||||
|
||||
# Continue conversation
|
||||
follow_up = ai_chat(
|
||||
query="What was my last questions?",
|
||||
conversation_id=conversation_id,
|
||||
theme_id=theme_id,
|
||||
resume=sample_resume
|
||||
)
|
||||
print("AI:", follow_up)
|
||||
result = requests.get(url)
|
||||
response_json = result.json() # Return response in JSON format
|
||||
print(response_json)
|
||||
Reference in New Issue
Block a user