Files
ds_zagres_ai/ai_service/openwebui_channels.py
T
Iyeoluwa Akinrinola 463923278a ..
2025-05-16 15:52:20 +01:00

301 lines
9.0 KiB
Python

"""
OpenWebUI channels integration for team chats.
This module provides functions to interact with OpenWebUI channels API
for creating and managing team chats through OpenWebUI's channels feature.
It also includes helpers for registering webhooks that receive channel messages
and for posting messages to OpenWebUI channels.
"""
import json
from typing import List, Dict, Any, Optional
from urllib.parse import quote

import requests

from ai_service.config import config
class OpenWebUIChannels:
    """Thin client for the OpenWebUI channels and webhooks HTTP endpoints.

    Every public method is best-effort: transport errors, unexpected status
    codes and undecodable response bodies are reported with ``print`` and
    turned into a fallback return value (``None``, ``[]`` or ``False``)
    instead of an exception.
    """

    # Seconds to wait on each HTTP call before giving up.
    REQUEST_TIMEOUT = 30

    # Status codes treated as success, per kind of operation.
    _OK = (200,)
    _OK_OR_CREATED = (200, 201)
    _OK_OR_NO_CONTENT = (200, 204)

    def __init__(self):
        """Read the base URL and API key from ``config`` and build the headers."""
        self.openwebui_url = config.OPENWEBUI_URL
        self.openwebui_api_key = config.OPENWEBUI_API_KEY
        self.headers = {"Content-Type": "application/json"}
        # The Authorization header is only sent when a key is configured.
        if self.openwebui_api_key:
            self.headers["Authorization"] = f"Bearer {self.openwebui_api_key}"

    def _build_url(self, *segments: str) -> str:
        """Return ``<base URL>/api/<segments joined by '/'>``.

        A trailing slash on the configured base URL is dropped so the result
        never contains ``//api``, and every segment is percent-encoded so an
        ID containing ``/``, ``?`` or ``#`` cannot change which endpoint is
        addressed.

        Args:
            *segments: Path components that follow ``/api/``.

        Returns:
            The absolute URL as a string.
        """
        base = str(self.openwebui_url).rstrip("/")
        path = "/".join(quote(str(segment), safe="") for segment in segments)
        return f"{base}/api/{path}"

    def _request(
        self,
        method: str,
        segments: tuple,
        action: str,
        *,
        payload: Optional[Dict[str, Any]] = None,
        ok_statuses: tuple = (200,),
        parse_json: bool = False,
    ) -> tuple:
        """Perform one HTTP call and report any failure with ``print``.

        Args:
            method: Name of the ``requests`` function to call
                (``"get"``, ``"post"`` or ``"delete"``).
            segments: Path components that follow ``/api/``.
            action: Phrase inserted after ``"Error "`` in failure messages,
                e.g. ``"creating channel"``.
            payload: JSON body to send; omitted from the call when ``None``.
            ok_statuses: Status codes that count as success.
            parse_json: Whether to decode the response body on success.

        Returns:
            ``(True, data)`` on success, where ``data`` is the decoded body
            when ``parse_json`` is set and ``None`` otherwise;
            ``(False, None)`` on any failure.
        """
        try:
            # Dispatch to requests.get/post/delete by name and pass ``json``
            # only when there is a body, so each call uses the same function
            # and keyword arguments as a direct requests.<method>(...) call.
            send = getattr(requests, method)
            kwargs: Dict[str, Any] = {
                "headers": self.headers,
                "timeout": self.REQUEST_TIMEOUT,
            }
            if payload is not None:
                kwargs["json"] = payload
            response = send(self._build_url(*segments), **kwargs)
            if response.status_code not in ok_statuses:
                print(f"Error {action}: {response.status_code} - {response.text}")
                return False, None
            return True, (response.json() if parse_json else None)
        except Exception as e:
            # Deliberately broad: callers rely on these methods never raising.
            print(f"Error {action}: {str(e)}")
            return False, None

    def create_channel(self, name: str, description: str = "", is_private: bool = False) -> Optional[Dict[str, Any]]:
        """Create a new channel in OpenWebUI.

        Args:
            name: Name of the channel.
            description: Description of the channel.
            is_private: Whether the channel is private.

        Returns:
            The decoded response body on status 200/201, None otherwise.
        """
        ok, data = self._request(
            "post",
            ("channels",),
            "creating channel",
            payload={
                "name": name,
                "description": description,
                "is_private": is_private,
            },
            ok_statuses=self._OK_OR_CREATED,
            parse_json=True,
        )
        return data if ok else None

    def get_channel(self, channel_id: str) -> Optional[Dict[str, Any]]:
        """Get a channel by ID.

        Args:
            channel_id: ID of the channel.

        Returns:
            The decoded response body on status 200, None otherwise.
        """
        ok, data = self._request(
            "get",
            ("channels", channel_id),
            "getting channel",
            ok_statuses=self._OK,
            parse_json=True,
        )
        return data if ok else None

    def get_channels(self) -> List[Dict[str, Any]]:
        """Get all channels.

        Returns:
            The decoded response body on status 200, an empty list otherwise.
        """
        ok, data = self._request(
            "get",
            ("channels",),
            "getting channels",
            ok_statuses=self._OK,
            parse_json=True,
        )
        return data if ok else []

    def add_member(self, channel_id: str, user_id: str) -> bool:
        """Add a user to a channel.

        Args:
            channel_id: ID of the channel.
            user_id: ID of the user to add.

        Returns:
            True on status 200/201, False otherwise.
        """
        ok, _ = self._request(
            "post",
            ("channels", channel_id, "members"),
            "adding member to channel",
            payload={"user_id": user_id},
            ok_statuses=self._OK_OR_CREATED,
        )
        return ok

    def remove_member(self, channel_id: str, user_id: str) -> bool:
        """Remove a user from a channel.

        Args:
            channel_id: ID of the channel.
            user_id: ID of the user to remove.

        Returns:
            True on status 200/204, False otherwise.
        """
        ok, _ = self._request(
            "delete",
            ("channels", channel_id, "members", user_id),
            "removing member from channel",
            ok_statuses=self._OK_OR_NO_CONTENT,
        )
        return ok

    def delete_channel(self, channel_id: str) -> bool:
        """Delete a channel.

        Args:
            channel_id: ID of the channel.

        Returns:
            True on status 200/204, False otherwise.
        """
        ok, _ = self._request(
            "delete",
            ("channels", channel_id),
            "deleting channel",
            ok_statuses=self._OK_OR_NO_CONTENT,
        )
        return ok

    def register_webhook(self, webhook_url: str) -> bool:
        """Register a webhook for the ``channel_message`` event.

        Args:
            webhook_url: URL of the webhook endpoint.

        Returns:
            True on status 200/201, False otherwise.
        """
        ok, _ = self._request(
            "post",
            ("webhooks",),
            "registering webhook",
            payload={"url": webhook_url, "events": ["channel_message"]},
            ok_statuses=self._OK_OR_CREATED,
        )
        if ok:
            print(f"Successfully registered webhook: {webhook_url}")
        return ok

    def send_message(self, channel_id: str, message: str, user_id: str) -> Optional[Dict[str, Any]]:
        """Send a message to a channel.

        Args:
            channel_id: ID of the channel.
            message: Message content.
            user_id: ID of the user sending the message.

        Returns:
            The decoded response body on status 200/201, None otherwise.
        """
        ok, data = self._request(
            "post",
            ("channels", channel_id, "messages"),
            "sending message to channel",
            payload={"content": message, "user_id": user_id},
            ok_statuses=self._OK_OR_CREATED,
            parse_json=True,
        )
        return data if ok else None

    def get_webhooks(self) -> List[Dict[str, Any]]:
        """Get all registered webhooks.

        Returns:
            The decoded response body on status 200, an empty list otherwise.
        """
        ok, data = self._request(
            "get",
            ("webhooks",),
            "getting webhooks",
            ok_statuses=self._OK,
            parse_json=True,
        )
        return data if ok else []
# Module-level shared instance, created at import time; constructing it reads
# config.OPENWEBUI_URL and config.OPENWEBUI_API_KEY.
openwebui_channels = OpenWebUIChannels()