Integrate OpenWebUI bot with AI service

This commit is contained in:
Iyeoluwa Akinrinola
2025-05-20 02:18:46 +01:00
parent 730009ae87
commit 0a27103875
46 changed files with 1749 additions and 3012 deletions
+171
View File
@@ -0,0 +1,171 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# UV
# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
#uv.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
# PyPI configuration file
.pypirc
+21
View File
@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2024 Open WebUI
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
+30
View File
@@ -0,0 +1,30 @@
# open-webui/bot
This repository provides an experimental boilerplate for building bots compatible with the **Open WebUI** "Channels" feature (introduced in version 0.5.0). It serves as a proof of concept to demonstrate bot-building capabilities while highlighting the potential of asynchronous communication enabled by Channels.
## ⚡ Key Highlights
- **Highly Experimental**: This is an early-stage project showcasing basic bot-building functionality. Expect major API changes in the future.
- **Extensible Framework**: Designed as a foundation for further development, with plans to enhance APIs, developer tooling, and usability.
- **Asynchronous Communication**: Leverages Open WebUI Channels for event-driven workflows.
## 🛠️ Getting Started with Examples
This repository includes an `/examples` folder containing runnable example bots that demonstrate basic functionality.
To run an example, execute the corresponding module using the `-m` flag in Python. For example, to run the `ai` example:
```bash
python -m examples.ai
```
> **Note**: Ensure that your current working directory (PWD) is the root of this repository when running examples, as this is required for proper execution.
Replace `ai` in the command above with the specific example youd like to execute from the `/examples` folder.
## 🚧 Disclaimer
This project is an early-stage proof of concept. **APIs will break** and existing functionality may change as Open WebUI evolves to include native bot support. This repository is not production-ready and primarily serves experimental and exploratory purposes.
## 🎯 Future Vision
We aim to introduce improved APIs, enhanced developer tooling, and seamless native support for bots directly within Open WebUI. The ultimate goal is to make building bots easier, faster, and more intuitive.
---
Contributions, feedback, and experimentation are encouraged. Join us in shaping the future of bot-building on Open WebUI!
View File
+12
View File
@@ -0,0 +1,12 @@
import os
try:
from dotenv import load_dotenv
load_dotenv()
except ImportError:
print("dotenv not installed, skipping...")
WEBUI_URL = os.getenv("WEBUI_URL", "http://localhost:8080")
TOKEN = os.getenv("TOKEN", "")
+132
View File
@@ -0,0 +1,132 @@
# WARNING: This might not work in the future. Do NOT use this in production.
import asyncio
import socketio
from env import WEBUI_URL, TOKEN
from utils import send_message, send_typing
MODEL_ID = "llama3.2:latest"
# Create an asynchronous Socket.IO client instance
sio = socketio.AsyncClient(logger=False, engineio_logger=False)
# Event handlers
@sio.event
async def connect():
print("Connected!")
@sio.event
async def disconnect():
print("Disconnected from the server!")
import aiohttp
import asyncio
async def openai_chat_completion(messages):
payload = {
"model": MODEL_ID,
"messages": messages,
"stream": False,
}
async with aiohttp.ClientSession() as session:
async with session.post(
f"{WEBUI_URL}/api/chat/completions",
headers={"Authorization": f"Bearer {TOKEN}"},
json=payload,
) as response:
if response.status == 200:
return await response.json()
else:
# Optional: Handle errors or return raw response text
return {"error": await response.text(), "status": response.status}
# Define a function to handle channel events
def events(user_id):
@sio.on("channel-events")
async def channel_events(data):
if data["user"]["id"] == user_id:
# Ignore events from the bot itself
return
if data["data"]["type"] == "message":
print(f'{data["user"]["name"]}: {data["data"]["data"]["content"]}')
await send_typing(sio, data["channel_id"])
async def send_typing_until_complete(channel_id, coro):
"""
Sends typing indicators every second until the provided coroutine completes.
"""
task = asyncio.create_task(coro) # Begin the provided coroutine task
try:
# While the task is running, send typing indicators every second
while not task.done():
await send_typing(sio, channel_id)
await asyncio.sleep(1)
# Await the actual result of the coroutine
return await task
except Exception as e:
task.cancel()
raise e # Propagate any exceptions that occurred in the coroutine
# OpenAI API coroutine
# This uses naive implementation of OpenAI API, that does not utilize the context of the conversation
openai_task = openai_chat_completion(
[
{"role": "system", "content": "You are a friendly AI."},
{"role": "user", "content": data["data"]["data"]["content"]},
]
)
try:
# Run OpenAI coroutine while showing typing indicators
response = await send_typing_until_complete(
data["channel_id"], openai_task
)
if response.get("choices"):
completion = response["choices"][0]["message"]["content"]
await send_message(data["channel_id"], completion)
else:
await send_message(
data["channel_id"], "I'm sorry, I don't understand."
)
except Exception:
await send_message(
data["channel_id"],
"Something went wrong while processing your request.",
)
# Define an async function for the main workflow
async def main():
try:
print(f"Connecting to {WEBUI_URL}...")
await sio.connect(
WEBUI_URL, socketio_path="/ws/socket.io", transports=["websocket"]
)
print("Connection established!")
except Exception as e:
print(f"Failed to connect: {e}")
return
# Callback function for user-join
async def join_callback(data):
events(data["id"]) # Attach the event handlers dynamically
# Authenticate with the server
await sio.emit("user-join", {"auth": {"token": TOKEN}}, callback=join_callback)
# Wait indefinitely to keep the connection open
await sio.wait()
# Actually run the async `main` function using `asyncio`
if __name__ == "__main__":
asyncio.run(main())
+247
View File
@@ -0,0 +1,247 @@
# Custom AI bot for our specific needs
import asyncio
import socketio
import os
import sys
from env import WEBUI_URL, TOKEN
from utils import send_message, send_typing
import aiohttp
# Get configuration from environment variables
MODEL_ID = os.getenv("MODEL_ID", "llama3.1")
SYSTEM_PROMPT = os.getenv("SYSTEM_PROMPT", "You are a helpful AI assistant.")
TEMPERATURE = float(os.getenv("TEMPERATURE", "0.7"))
MAX_TOKENS = int(os.getenv("MAX_TOKENS", "2048"))
TOP_P = float(os.getenv("TOP_P", "0.9"))
TRIGGERS = os.getenv("TRIGGERS", "@ai,@bot,@assistant,@chatbot").split(",")
RESPOND_TO_ALL = os.getenv("RESPOND_TO_ALL", "false").lower() == "true"
# Create an asynchronous Socket.IO client instance
sio = socketio.AsyncClient(logger=False, engineio_logger=False)
# Event handlers
@sio.event
async def connect():
print("Connected to OpenWebUI!")
@sio.event
async def disconnect():
print("Disconnected from OpenWebUI!")
# Function to call the OpenAI-compatible API
async def openai_chat_completion(messages):
payload = {
"model": MODEL_ID,
"messages": messages,
"stream": False,
"temperature": TEMPERATURE,
"max_tokens": MAX_TOKENS,
"top_p": TOP_P
}
try:
async with aiohttp.ClientSession() as session:
try:
async with session.post(
f"{WEBUI_URL}/api/chat/completions",
headers={"Authorization": f"Bearer {TOKEN}"},
json=payload,
timeout=300 # 5-minute timeout
) as response:
if response.status == 200:
return await response.json()
else:
# Handle errors or return raw response text
error_text = await response.text()
print(f"API error: {response.status} - {error_text}")
return {"error": error_text, "status": response.status}
except aiohttp.ClientError as e:
print(f"HTTP request error: {str(e)}")
return {"error": f"HTTP request error: {str(e)}", "status": 500}
except Exception as e:
print(f"Unexpected error in openai_chat_completion: {str(e)}")
return {"error": f"Unexpected error: {str(e)}", "status": 500}
# Helper function to send typing indicators while waiting for a response
async def send_typing_until_complete(channel_id, coro):
"""
Sends typing indicators every second until the provided coroutine completes.
"""
task = asyncio.create_task(coro) # Begin the provided coroutine task
try:
# While the task is running, send typing indicators every second
while not task.done():
await send_typing(sio, channel_id)
await asyncio.sleep(1)
# Await the actual result of the coroutine
return await task
except Exception as e:
task.cancel()
raise e # Propagate any exceptions that occurred in the coroutine
# Define a function to handle channel events
def events(user_id):
# Use the configured triggers and respond_to_all setting
global TRIGGERS, RESPOND_TO_ALL
@sio.on("channel-events")
async def channel_events(data):
if data["user"]["id"] == user_id:
# Ignore events from the bot itself
return
if data["data"]["type"] == "message":
message_content = data["data"]["data"]["content"]
channel_id = data["channel_id"]
sender_name = data["user"]["name"]
print(f"{sender_name}: {message_content}")
# Check if we should respond
should_respond = RESPOND_TO_ALL
message_lower = message_content.lower()
if not should_respond:
# Check if the message mentions the bot
for trigger in TRIGGERS:
trigger_lower = trigger.lower()
if trigger_lower in message_lower:
should_respond = True
break
if not should_respond:
# Skip messages that don't mention the bot
return
# Remove the trigger from the message
processed_message = message_content
# Only try to remove triggers if we're not responding to all messages
if not RESPOND_TO_ALL:
for trigger in TRIGGERS:
trigger_lower = trigger.lower()
if trigger_lower in message_lower:
# Find the trigger in the message
index = message_lower.find(trigger_lower)
if index != -1:
# Remove the trigger
processed_message = processed_message[:index] + processed_message[index + len(trigger):].strip()
# If the message is empty after removing the trigger, use a default prompt
if not processed_message.strip():
processed_message = "Hello, how can I help you?"
break
# Show typing indicator
await send_typing(sio, channel_id)
try:
# Prepare the messages for the API
messages = [
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": processed_message}
]
# Call the API while showing typing indicators
response = await send_typing_until_complete(
channel_id, openai_chat_completion(messages)
)
# Process the response
if response.get("choices"):
completion = response["choices"][0]["message"]["content"]
# Add a robot emoji to the response
formatted_response = f"🤖 {completion}"
await send_message(channel_id, formatted_response)
else:
error_message = response.get("error", "I'm sorry, I couldn't generate a response.")
await send_message(channel_id, f"🤖 Error: {error_message}")
except Exception as e:
print(f"Error generating response: {str(e)}")
await send_message(
channel_id,
"🤖 Something went wrong while processing your request."
)
# Define an async function for the main workflow
async def main():
max_retries = 3
retry_delay = 5 # seconds
for attempt in range(1, max_retries + 1):
try:
print(f"Connecting to {WEBUI_URL}... (Attempt {attempt}/{max_retries})")
await sio.connect(
WEBUI_URL, socketio_path="/ws/socket.io", transports=["websocket"]
)
print("Connection established!")
break # Connection successful, exit the retry loop
except Exception as e:
print(f"Failed to connect: {e}")
if attempt < max_retries:
print(f"Retrying in {retry_delay} seconds...")
await asyncio.sleep(retry_delay)
else:
print("Maximum connection attempts reached. Exiting.")
return
try:
# Callback function for user-join
async def join_callback(data):
try:
bot_id = data["id"]
print(f"Bot connected with ID: {bot_id}")
events(bot_id) # Attach the event handlers dynamically
except Exception as e:
print(f"Error in join_callback: {str(e)}")
# Authenticate with the server
print("Authenticating with the server...")
await sio.emit("user-join", {"auth": {"token": TOKEN}}, callback=join_callback)
print("Authentication request sent")
# Wait indefinitely to keep the connection open
print("Waiting for events...")
await sio.wait()
except Exception as e:
print(f"Error in main loop: {str(e)}")
# Actually run the async `main` function using `asyncio`
async def shutdown():
"""Gracefully shut down the bot."""
print("\nShutting down bot...")
if sio.connected:
print("Disconnecting from OpenWebUI...")
await sio.disconnect()
print("Bot shutdown complete.")
if __name__ == "__main__":
print("Starting custom AI bot...")
print(f"OpenWebUI URL: {WEBUI_URL}")
print(f"Model: {MODEL_ID}")
print(f"System prompt: {SYSTEM_PROMPT[:50]}..." if len(SYSTEM_PROMPT) > 50 else f"System prompt: {SYSTEM_PROMPT}")
print(f"Temperature: {TEMPERATURE}")
print(f"Max tokens: {MAX_TOKENS}")
print(f"Top-p: {TOP_P}")
print(f"Triggers: {TRIGGERS}")
print(f"Respond to all: {RESPOND_TO_ALL}")
print("Press Ctrl+C to stop")
try:
# Run the main function
asyncio.run(main())
except KeyboardInterrupt:
print("\nBot stopped by user")
# Run the shutdown function
try:
asyncio.run(shutdown())
except Exception as e:
print(f"Error during shutdown: {str(e)}")
except Exception as e:
print(f"Error running bot: {str(e)}")
# Try to shut down gracefully
try:
asyncio.run(shutdown())
except Exception as shutdown_error:
print(f"Error during shutdown: {str(shutdown_error)}")
+100
View File
@@ -0,0 +1,100 @@
# WARNING: This might not work in the future. Do NOT use this in production.
import asyncio
import socketio
from smolagents import ToolCallingAgent, LiteLLMModel, DuckDuckGoSearchTool
from env import WEBUI_URL, TOKEN
from utils import send_message, send_typing
search_tool = DuckDuckGoSearchTool()
MODEL_ID = "gpt-4o"
model = LiteLLMModel(
model_id=f"openai/{MODEL_ID}", api_base=f"{WEBUI_URL}/api/", api_key=TOKEN
)
agent = ToolCallingAgent(tools=[search_tool], model=model)
# Create an asynchronous Socket.IO client instance
sio = socketio.AsyncClient(logger=False, engineio_logger=False)
# Event handlers
@sio.event
async def connect():
print("Connected!")
@sio.event
async def disconnect():
print("Disconnected from the server!")
# Define a function to handle channel events
def events(user_id):
@sio.on("channel-events")
async def channel_events(data):
if data["user"]["id"] == user_id:
# Ignore events from the bot itself
return
if data["data"]["type"] == "message":
print(f'{data["user"]["name"]}: {data["data"]["data"]["content"]}')
# Send typing events every second while processing the input
async def simulate_typing(channel_id):
try:
while not processing_event.is_set():
await send_typing(sio, channel_id)
await asyncio.sleep(1)
except asyncio.CancelledError:
pass
# Create an asyncio.Event to manage typing simulation
processing_event = asyncio.Event()
typing_task = asyncio.create_task(simulate_typing(data["channel_id"]))
try:
# Run the blocking agent.run in a non-blocking way using asyncio
loop = asyncio.get_running_loop()
output = await loop.run_in_executor(
None, agent.run, data["data"]["data"]["content"]
)
finally:
# Signal that typing simulation should stop
processing_event.set()
# Wait for the typing task to finish
await typing_task
# Send the generated output as a message
await send_message(data["channel_id"], f"{output}")
# Define an async function for the main workflow
async def main():
try:
print(f"Connecting to {WEBUI_URL}...")
await sio.connect(
WEBUI_URL, socketio_path="/ws/socket.io", transports=["websocket"]
)
print("Connection established!")
except Exception as e:
print(f"Failed to connect: {e}")
return
# Callback function for user-join
async def join_callback(data):
events(data["id"]) # Attach the event handlers dynamically
# Authenticate with the server
await sio.emit("user-join", {"auth": {"token": TOKEN}}, callback=join_callback)
# Wait indefinitely to keep the connection open
await sio.wait()
# Actually run the async `main` function using `asyncio`
if __name__ == "__main__":
asyncio.run(main())
+103
View File
@@ -0,0 +1,103 @@
# WARNING: This might not work in the future. Do NOT use this in production.
import asyncio
import socketio
from smolagents import CodeAgent, LiteLLMModel, DuckDuckGoSearchTool
from env import WEBUI_URL, TOKEN
from utils import send_message, send_typing
# search_tool = DuckDuckGoSearchTool()
MODEL_ID = "llama3.2:latest"
model = LiteLLMModel(
model_id=f"openai/{MODEL_ID}", api_base=f"{WEBUI_URL}/api/", api_key=TOKEN
)
agent = CodeAgent(
tools=[], model=model, additional_authorized_imports=["requests", "bs4"]
)
# Create an asynchronous Socket.IO client instance
sio = socketio.AsyncClient(logger=False, engineio_logger=False)
# Event handlers
@sio.event
async def connect():
print("Connected!")
@sio.event
async def disconnect():
print("Disconnected from the server!")
# Define a function to handle channel events
def events(user_id):
@sio.on("channel-events")
async def channel_events(data):
if data["user"]["id"] == user_id:
# Ignore events from the bot itself
return
if data["data"]["type"] == "message":
print(f'{data["user"]["name"]}: {data["data"]["data"]["content"]}')
# Send typing events every second while processing the input
async def simulate_typing(channel_id):
try:
while not processing_event.is_set():
await send_typing(sio, channel_id)
await asyncio.sleep(1)
except asyncio.CancelledError:
pass
# Create an asyncio.Event to manage typing simulation
processing_event = asyncio.Event()
typing_task = asyncio.create_task(simulate_typing(data["channel_id"]))
try:
# Run the blocking agent.run in a non-blocking way using asyncio
loop = asyncio.get_running_loop()
output = await loop.run_in_executor(
None, agent.run, data["data"]["data"]["content"]
)
finally:
# Signal that typing simulation should stop
processing_event.set()
# Wait for the typing task to finish
await typing_task
# Send the generated output as a message
await send_message(data["channel_id"], f"{output}")
# Define an async function for the main workflow
async def main():
try:
print(f"Connecting to {WEBUI_URL}...")
await sio.connect(
WEBUI_URL, socketio_path="/ws/socket.io", transports=["websocket"]
)
print("Connection established!")
except Exception as e:
print(f"Failed to connect: {e}")
return
# Callback function for user-join
async def join_callback(data):
events(data["id"]) # Attach the event handlers dynamically
# Authenticate with the server
await sio.emit("user-join", {"auth": {"token": TOKEN}}, callback=join_callback)
# Wait indefinitely to keep the connection open
await sio.wait()
# Actually run the async `main` function using `asyncio`
if __name__ == "__main__":
asyncio.run(main())
+61
View File
@@ -0,0 +1,61 @@
import asyncio
import socketio
from env import WEBUI_URL, TOKEN
from utils import send_message, send_typing
# Create an asynchronous Socket.IO client instance
sio = socketio.AsyncClient(logger=False, engineio_logger=False)
# Event handlers
@sio.event
async def connect():
print("Connected!")
@sio.event
async def disconnect():
print("Disconnected from the server!")
# Define a function to handle channel events
def events(user_id):
@sio.on("channel-events")
async def channel_events(data):
if data["user"]["id"] == user_id:
# Ignore events from the bot itself
return
if data["data"]["type"] == "message":
print(f'{data["user"]["name"]}: {data["data"]["data"]["content"]}')
await send_typing(sio, data["channel_id"])
await asyncio.sleep(1) # Simulate a delay
await send_message(data["channel_id"], "Pong!")
# Define an async function for the main workflow
async def main():
try:
print(f"Connecting to {WEBUI_URL}...")
await sio.connect(
WEBUI_URL, socketio_path="/ws/socket.io", transports=["websocket"]
)
print("Connection established!")
except Exception as e:
print(f"Failed to connect: {e}")
return
# Callback function for user-join
async def join_callback(data):
events(data["id"]) # Attach the event handlers dynamically
# Authenticate with the server
await sio.emit("user-join", {"auth": {"token": TOKEN}}, callback=join_callback)
# Wait indefinitely to keep the connection open
await sio.wait()
# Actually run the async `main` function using `asyncio`
if __name__ == "__main__":
asyncio.run(main())
+33
View File
@@ -0,0 +1,33 @@
import aiohttp
import socketio
from env import WEBUI_URL, TOKEN
async def send_message(channel_id: str, message: str):
url = f"{WEBUI_URL}/api/v1/channels/{channel_id}/messages/post"
headers = {"Authorization": f"Bearer {TOKEN}"}
data = {"content": str(message)}
async with aiohttp.ClientSession() as session:
async with session.post(url, headers=headers, json=data) as response:
if response.status != 200:
# Raise an exception if the request fails
raise aiohttp.ClientResponseError(
request_info=response.request_info,
history=response.history,
status=response.status,
message=await response.text(),
headers=response.headers,
)
# Return response JSON if successful
return await response.json()
async def send_typing(sio: socketio.AsyncClient, channel_id: str):
await sio.emit(
"channel-events",
{
"channel_id": channel_id,
"data": {"type": "typing", "data": {"typing": True}},
},
)