# Custom AI bot for our specific needs
import asyncio
import os
import sys
import traceback

import aiohttp
import socketio

from env import WEBUI_URL, TOKEN
from utils import send_message, send_typing


def _parse_triggers(raw):
    """Split a comma-separated trigger list, dropping blank entries.

    Surrounding whitespace is trimmed from every entry so that a value such
    as ``"@ai, @bot"`` yields ``["@ai", "@bot"]``. Blank entries are removed
    because an empty trigger is a substring of every message and would make
    the bot answer everything.
    """
    return [part.strip() for part in raw.split(",") if part.strip()]


# Get configuration from environment variables
MODEL_ID = os.getenv("MODEL_ID", "llama3.1")
SYSTEM_PROMPT = os.getenv("SYSTEM_PROMPT", "You are a helpful AI assistant.")
TEMPERATURE = float(os.getenv("TEMPERATURE", "0.7"))
MAX_TOKENS = int(os.getenv("MAX_TOKENS", "2048"))
TOP_P = float(os.getenv("TOP_P", "0.9"))
TRIGGERS = _parse_triggers(os.getenv("TRIGGERS", "@ai,@bot,@assistant,@chatbot"))
RESPOND_TO_ALL = os.getenv("RESPOND_TO_ALL", "false").lower() == "true"

# Upper bound, in seconds, for one chat-completion request (5 minutes)
REQUEST_TIMEOUT_SECONDS = 300

# Create an asynchronous Socket.IO client instance
sio = socketio.AsyncClient(logger=False, engineio_logger=False)


# Event handlers
@sio.event
async def connect():
    """Log that the Socket.IO connection was established."""
    print("Connected to OpenWebUI!")


@sio.event
async def disconnect():
    """Log that the Socket.IO connection was closed."""
    print("Disconnected from OpenWebUI!")


# Function to call the OpenAI-compatible API
async def openai_chat_completion(messages):
    """POST *messages* to the chat-completions endpoint and return the result.

    Args:
        messages: List of ``{"role": ..., "content": ...}`` dicts, sent
            unchanged as the ``messages`` field of the request payload.

    Returns:
        The decoded JSON body when the server answers with HTTP 200.
        Otherwise a dict with an ``"error"`` message and a ``"status"``
        code. This function never raises; every failure is reported through
        that error dict.
    """
    payload = {
        "model": MODEL_ID,
        "messages": messages,
        "stream": False,
        "temperature": TEMPERATURE,
        "max_tokens": MAX_TOKENS,
        "top_p": TOP_P,
    }
    # aiohttp expects a ClientTimeout object rather than a bare number.
    timeout = aiohttp.ClientTimeout(total=REQUEST_TIMEOUT_SECONDS)

    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{WEBUI_URL}/api/chat/completions",
                headers={"Authorization": f"Bearer {TOKEN}"},
                json=payload,
                timeout=timeout,
            ) as response:
                if response.status == 200:
                    return await response.json()

                # Handle errors or return raw response text
                error_text = await response.text()
                print(f"API error: {response.status} - {error_text}")
                return {"error": error_text, "status": response.status}
    except asyncio.TimeoutError:
        # Not an aiohttp.ClientError, and its str() is empty, so it needs
        # its own branch to produce a useful message.
        error_text = f"Request timed out after {REQUEST_TIMEOUT_SECONDS} seconds"
        print(f"HTTP request error: {error_text}")
        return {"error": error_text, "status": 500}
    except aiohttp.ClientError as e:
        print(f"HTTP request error: {str(e)}")
        return {"error": f"HTTP request error: {str(e)}", "status": 500}
    except Exception as e:
        print(f"Unexpected error in openai_chat_completion: {str(e)}")
        return {"error": f"Unexpected error: {str(e)}", "status": 500}


# Helper function to send typing indicators while waiting for a response
async def send_typing_until_complete(channel_id, coro):
    """Run *coro* while emitting a typing indicator roughly once per second.

    Args:
        channel_id: Channel the typing indicator is sent to.
        coro: Coroutine to run; it is wrapped in a task.

    Returns:
        Whatever *coro* returns.

    Raises:
        Any exception raised by *coro* or by ``send_typing``.
    """
    task = asyncio.create_task(coro)  # Begin the provided coroutine task
    try:
        while True:
            await send_typing(sio, channel_id)
            # Wake up as soon as the task finishes instead of always
            # sleeping a full second after the result is ready.
            finished, _ = await asyncio.wait({task}, timeout=1)
            if finished:
                return task.result()
    finally:
        # Also runs when this coroutine itself is cancelled, so the request
        # task is never left running in the background.
        if not task.done():
            task.cancel()


# Prompt used when a message contains nothing but the trigger
_DEFAULT_PROMPT = "Hello, how can I help you?"


def extract_prompt(message_content, triggers, respond_to_all=False):
    """Decide whether to answer a message and return the prompt to send.

    Args:
        message_content: Raw text of the incoming message.
        triggers: Iterable of trigger strings, matched case-insensitively.
            Empty entries are ignored.
        respond_to_all: When true, every message is answered unchanged.

    Returns:
        ``None`` when the bot should stay silent. Otherwise the message with
        the first occurrence of the first matching trigger removed and the
        surrounding whitespace trimmed, or a default greeting when nothing
        is left after removal.
    """
    # Function-scope import: the module's import block is outside this section.
    import re

    if respond_to_all:
        return message_content

    for trigger in triggers:
        if not trigger:
            # An empty trigger would match every message.
            continue
        # Search the original text so the match offsets are valid for it;
        # offsets taken from a lowercased copy can be shifted.
        found = re.search(re.escape(trigger), message_content, re.IGNORECASE)
        if found is None:
            continue
        before = message_content[:found.start()]
        after = message_content[found.end():]
        prompt = (before + after.strip()).strip()
        return prompt or _DEFAULT_PROMPT

    # Skip messages that don't mention the bot
    return None


# Define a function to handle channel events
def events(user_id):
    """Register the ``channel-events`` handler on the Socket.IO client.

    Args:
        user_id: ID of the bot's own user; events from this user are ignored.
    """

    @sio.on("channel-events")
    async def channel_events(data):
        if data["user"]["id"] == user_id:
            # Ignore events from the bot itself
            return
        if data["data"]["type"] != "message":
            return

        message_content = data["data"]["data"]["content"]
        channel_id = data["channel_id"]
        sender_name = data["user"]["name"]
        print(f"{sender_name}: {message_content}")

        prompt = extract_prompt(message_content, TRIGGERS, RESPOND_TO_ALL)
        if prompt is None:
            return

        try:
            # Prepare the messages for the API
            messages = [
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": prompt},
            ]

            # Call the API while showing typing indicators; the helper emits
            # the first indicator immediately.
            response = await send_typing_until_complete(
                channel_id, openai_chat_completion(messages)
            )

            # Process the response
            if response.get("choices"):
                completion = response["choices"][0]["message"]["content"]
                # Add a robot emoji to the response
                formatted_response = f"🤖 {completion}"
                await send_message(channel_id, formatted_response)
            else:
                error_message = response.get(
                    "error", "I'm sorry, I couldn't generate a response."
                )
                await send_message(channel_id, f"🤖 Error: {error_message}")
        except Exception as e:
            print(f"Error generating response: {str(e)}")
            await send_message(
                channel_id,
                "🤖 Something went wrong while processing your request.",
            )


# Define an async function for the main workflow
async def main():
    """Connect, authenticate, and wait for events until disconnected.

    The connection is attempted up to three times, five seconds apart. Once
    connected, the client is always disconnected in a ``finally`` clause, so
    the disconnect runs on the same event loop as the connection, including
    when the task is cancelled by Ctrl+C.
    """
    max_retries = 3
    retry_delay = 5  # seconds

    for attempt in range(1, max_retries + 1):
        try:
            print(f"Connecting to {WEBUI_URL}... (Attempt {attempt}/{max_retries})")
            await sio.connect(
                WEBUI_URL, socketio_path="/ws/socket.io", transports=["websocket"]
            )
        except Exception as e:
            print(f"Failed to connect: {e}")
            if attempt == max_retries:
                print("Maximum connection attempts reached. Exiting.")
                return
            print(f"Retrying in {retry_delay} seconds...")
            await asyncio.sleep(retry_delay)
        else:
            print("Connection established!")
            break  # Connection successful, exit the retry loop

    # Callback function for user-join
    async def join_callback(*args):
        try:
            if args:
                data = args[0]
                print(f"Received callback data: {data}")
                if isinstance(data, dict) and "id" in data:
                    bot_id = data["id"]
                    print(f"Bot connected with ID: {bot_id}")
                    events(bot_id)  # Attach the event handlers dynamically
                else:
                    print(f"Invalid callback data format: {data}")
            else:
                print("No callback data received")
                # If no data is received, use a default ID
                print("Using default bot ID")
                events("bot-default-id")
        except Exception as e:
            print(f"Error in join_callback: {str(e)}")
            print(traceback.format_exc())

    try:
        # Authenticate with the server
        print("Authenticating with the server...")
        await sio.emit("user-join", {"auth": {"token": TOKEN}}, callback=join_callback)
        print("Authentication request sent")

        # Wait indefinitely to keep the connection open
        print("Waiting for events...")
        await sio.wait()
    except Exception as e:
        print(f"Error in main loop: {str(e)}")
    finally:
        # Disconnect here rather than in a second asyncio.run(), which would
        # run on a fresh event loop after this one has been closed.
        try:
            await shutdown()
        except Exception as e:
            print(f"Error during shutdown: {str(e)}")


async def shutdown():
    """Gracefully shut down the bot."""
    print("\nShutting down bot...")
    if sio.connected:
        print("Disconnecting from OpenWebUI...")
        await sio.disconnect()
    print("Bot shutdown complete.")


# Actually run the async `main` function using `asyncio`
if __name__ == "__main__":
    print("Starting custom AI bot...")
    print(f"OpenWebUI URL: {WEBUI_URL}")
    print(f"Model: {MODEL_ID}")
    print(
        f"System prompt: {SYSTEM_PROMPT[:50]}..."
        if len(SYSTEM_PROMPT) > 50
        else f"System prompt: {SYSTEM_PROMPT}"
    )
    print(f"Temperature: {TEMPERATURE}")
    print(f"Max tokens: {MAX_TOKENS}")
    print(f"Top-p: {TOP_P}")
    print(f"Triggers: {TRIGGERS}")
    print(f"Respond to all: {RESPOND_TO_ALL}")
    print("Press Ctrl+C to stop")

    try:
        # Run the main function; it disconnects the client itself on exit
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nBot stopped by user")
    except Exception as e:
        print(f"Error running bot: {str(e)}")