Cat

Бот-автоответчик с ChatGPT для Бизнес-аккаунта в Telegram на Aiogram 3

В этом посте, мы напишем бота для "Telegram для бизнеса", который будет отвечать на личные сообщения при помощи ChatGPT.

Все статьи

Icon Link

Реклама

Icon Link
Telegram-бот на AIOgram3 proDream 22 Май 2024 Просмотров: 964

Не так давно в Telegram вышло большое обновление - "Telegram для бизнеса". В данный момент оно доступно для Premium-пользователей, а в будущем, вероятно, станет отдельным режимом.

"Telegram для бизнеса" предоставляет собой новый способ взаимодействия с клиентами через Telegram, вводя для этого новые функции:

  • Адрес - Позволяет указать адрес и геопозицию в профиле.
  • Часы работы - Позволяет указать график работы бизнеса.
  • Быстрые ответы - Позволяет создать набор "шаблонных" ответов.
  • Приветствия - Позволяет установить автоматическое приветствие для новых клиентов.
  • "Нет на месте" - Позволяет отправлять автоматические ответы, в нерабочее время.
  • Ссылки на чат - Позволяет кастомизировать ссылки на чат с вами.
  • Вид нового чата - Позволяет кастомизировать вид чата для клиента, который открыл чат с вами, но ещё не написал сообщение.
  • Чат-боты - Позволяет подключить к учётной записи бота для взаимодействия с клиентами в личных чатах.

 

Из всего этого набора нас интересует только два пункта: Чат-боты и Часы работы.

 

Что мы с вами сделаем?

В этом посте мы создадим Telegram-бота, который будет принимать личные сообщения только в нерабочее время и для ответа использовать ChatGPT от OpenAI.

Поскольку OpenAI недоступен на территории РФ, вместо него будем использовать сервис NeuroAPI. Он предоставляет доступ к OpenAI из России и СНГ по более низким ценам.

 

Как это можно использовать?

Описанный в посте бот можно будет использовать как частному лицу, сделав личного ассистента на время отсутствия в сети, так и бизнесу для взаимодействия с клиентами в нерабочее время.

Главная сложность будет заключаться в составлении грамотного "системного промта", покрывающего ваши потребности.

 

Подключение бота в профиле.

Для проекта вам нужен бот, как его создать рассказано в посте "AIOgram3 1.5. Регистрация бота"

После создания бота и получения токена, в интерфейсе BotFather, выполните команду /mybots для вывода списка всех ботов.
Выберите нужного бота.

Затем в открывшемся меню выберите пункт "Bot Settings".

 

В следующем меню выберите пункт "Business Mode".

 

Включите бизнес режим.

После того, как включили бизнес режим для бота, откройте настройки Telegram и выберите пункт "Telegram для бизнеса", а в нём пункт "Чат-боты".

В открывшемся окне в первое поле пропишите ссылку на бота t.me/mybot или его имя @mybot.

 

Готово.

 

Подготовка проекта.

Создайте новый проект в удобной для вас IDE и активируйте виртуальное окружение.

Если вы пользуетесь PyCharm, то виртуальное окружение создаст IDE для нового проекта.
Если вы пользуетесь VSCode, то его придётся создать вручную, выполнив следующие команды:

python -m venv .venv

# для Windows
venv\Scripts\activate.ps1 или venv\Scripts\activate.bat

# для *NIX-систем
source venv/bin/activate

 

В проекте используются следующие библиотеки:

  • aiogram - Фреймворк для бота.
  • pydantic-settings - Библиотека для создания классов конфигураций.
  • openai - Официальная библиотека OpenAI для Python.
  • pytz - Библиотека для работы с часовыми поясами.
  • httpx - Современная библиотека для создания синхронных/асинхронных запросов.
  • redis - Библиотека для подключения к Redis.

Установите их, выполнив команду:

pip install -U aiogram pydantic-settings openai pytz httpx redis

 

Создайте файл requirements.txt и внесите в него установленные библиотеки:

aiogram==3.6.0
pydantic-settings==2.2.1
openai==1.29.0
pytz==2024.1
httpx==0.27.0
redis==5.0.4

 

Далее создайте файл .env для хранения переменных окружения.
Необходимы следующие переменные:

  • token - Токен бота, полученный от BotFather.
  • admin_id - Telegram-id администратора.
  • openai_key - API-ключ полученный на сайте NeuroAPI или OpenAI.
  • openai_base_url - Адрес прокси-сервера для OpenAI.
  • redis_host - Хост для подключения к Redis. В нашем случае используется Docker compose, поэтому прописываем имя сервиса - redis.
  • delay - Задержка между ответами в минутах. Об этом ниже.

 

Пример:

token=12345:abcd
admin_id=123456789
openai_key=sk-abcd
openai_base_url=https://lk.neuroapi.host/v1
redis_host=redis  
delay=10

 

Также создайте файл main.py и пакет (Python package) app.

 

Файл конфигурации.

В пакете app создайте файл settings.py.
В нём будем получать данные из .env-файла и определим инстанс бота и Redis.

Создайте класс Secrets, унаследованный от BaseSettings. Этот класс будет получать из .env-файла данные и преобразовывать их в Python-объекты. Для этого используется библиотека pydantic-settings.

В теле класса пропишите шесть полей с указанием типа данных:

token: str
admin_id: int
openai_key: str
openai_base_url: str
redis_host: str
delay: int

 

После полей, внутри класса напишите внутренний класс Config, в котором укажите из какого файла брать данные и его кодировку:

class Config:  
    env_file = ".env"  
    env_file_encoding = "utf-8"

 

Под классом создадим переменную secrets и объявим её экземпляром класса Secrets.

Далее создайте переменную redis_conn, это будет экземпляр класса Redis, в который передаём адрес хоста. Будьте внимательны во время импорта класса! Нам нужен асинхронный Redis.

redis_conn = Redis(host=secrets.redis_host)

 

Последней будет переменная bot. Объявите её экземпляром класса Bot, передав в него токен и режим форматирования сообщений.

bot = Bot(token=secrets.token, parse_mode="Markdown")

 

Про parse_mode: Поскольку в ответе ChatGPT может находиться блок кода или другое форматирование, для корректного отображения его необходимо "распарсить". Передав параметр parse_mode="Markdown", мы сообщаем боту, что все сообщения будут с Markdown-форматированием.

 

Полный код файла:

from aiogram import Bot  
from pydantic_settings import BaseSettings  
from redis.asyncio import Redis  


class Secrets(BaseSettings):  
    token: str  
    admin_id: int  
    openai_key: str  
    openai_base_url: str
    redis_host: str  
    delay: int  

    class Config:  
        env_file = ".env"  
        env_file_encoding = "utf-8"  


secrets = Secrets()  

redis_conn = Redis(host=secrets.redis_host)  

bot = Bot(token=secrets.token, parse_mode="Markdown")

 

Хранилище строк.

Для хранения текстовых строк в одном месте в пакете app создайте файл views.py.

Этого можно и не делать. Кроме того, вариант с функциями можно заменить на получение текста из файла или иной способ.

Создайте три простые функции, которые ничего не принимают и возвращают текстровую строку:

  • start_bot_message - Сообщение о запуске бота для администратора.
  • stop_bot_message - Сообщение об остановке бота для администратора.
  • system_prompt - Системный промт, описывающий поведение ChatGPT.

 

Код:

def start_bot_message():  
    return "Бот запущен"  


def stop_bot_message():  
    return "Бот остановлен"  


def system_prompt():  
    return """Ты бот помощник и ты должен помогать людям."""

 

Проверка рабочего времени.

В Telegram часы работы указываются по дням с понедельника по воскресенье. В коде же это выглядит как список объектов класса BusinessOpeningHoursInterval.

В объекте класса BusinessOpeningHoursInterval есть два поля: opening_minute и closing_minute, представленные в виде количества минут прошедших с 00:00 ближайшего понедельника, с учётом указанной временной зоны.

Необходимо получить текущее количество минут, прошедших с понедельника, и пройтись по списку, проверяя, входит ли текущее число в один из диапазонов.
Если входит, то бот будет игнорировать сообщения.
Если не входит, бот будет отвечать на сообщения.

В пакете app, создайте новый пакет utils. В этом пакете создайте файл opening_hours.py.

Создайте функцию check_opening_hours, принимающую opening_hours - объект класса BusinessOpeningHours.

Класс BusinessOpeningHours содержит два поля:

  • time_zone_name - Название временной зоны. Определяется в профиле Telegram при заполнении графика работы.
  • opening_hours - Упомянутый выше список с объектами класса BusinessOpeningHoursInterval.

Далее создайте четыре переменные:

  1. tz - В ней при помощи библиотеки pytz получаем информацию об указанной временной зоне.
  2. now - В ней получаем текущее время с учётом временной зоны.
  3. monday_start - В ней высчитываем время до начала понедельника.
  4. minutes_since_monday - В ней высчитываем сколько прошло минут с начала недели.
tz = pytz.timezone(opening_hours.time_zone_name)  
now = datetime.datetime.now(tz)  
monday_start = now - datetime.timedelta(  
    days=now.weekday(),  
    hours=now.hour,  
    minutes=now.minute,  
    seconds=now.second,  
    microseconds=now.microsecond,  
)  
minutes_since_monday = (now - monday_start).total_seconds() / 60

 

Далее создайте цикл, в котором будем итерироваться по списку интервалов и проверять, входит ли текущее время в этот список.

for day in opening_hours.opening_hours:  
    if day.opening_minute <= minutes_since_monday <= day.closing_minute:  
        return False  

return True

 

Полный код:

import datetime  

import pytz  
from aiogram.types import BusinessOpeningHours  


def check_opening_hours(opening_hours: BusinessOpeningHours):  
    tz = pytz.timezone(opening_hours.time_zone_name)  
    now = datetime.datetime.now(tz)  
    monday_start = now - datetime.timedelta(  
        days=now.weekday(),  
        hours=now.hour,  
        minutes=now.minute,  
        seconds=now.second,  
        microseconds=now.microsecond,  
    )  
    minutes_since_monday = (now - monday_start).total_seconds() / 60  

    for day in opening_hours.opening_hours:  
        if day.opening_minute <= minutes_since_monday <= day.closing_minute:  
            return False  

    return True

 

Проверка входящих сообщений.

При получении входящего сообщения необходимо проверить актуальный режим работы и либо передать сообщение дальше в обработчик, либо "сбросить" его, тем самым никак не реагируя.

Для этого будем использовать миддлвари (middleware) - это так называзываемые "посредники", срабатывающие до передачи сообщения в обработчик и в зависимости от логики выполняющие различные действия, например, запись в БД, проверку аутентификации и многое другое.

В пакете app создайте пакет middlewares. В нём создайте файл business_middleware.py.

В этом файле создайте класс BusinessMiddleware, унаследованный от BaseMiddleware.

В нём нам нужно переопределить dunder-метод __call__, принимающий self, handler, event, data.

Далее нам необходимо получить из текущего чата объект класса BusinessOpeningHours.

Для получения актуального графика работы мы обратимся к объекту бота из файла settings и вызовем метод .get_chat, передав в него Telegram-ID администратора.

Затем в переменную hours помещаем объект класса BusinessOpeningHours.

from app.settings import secrets, bot


chat = await bot.get_chat(secrets.admin_id)  
hours = chat.business_opening_hours  

 

Далее в блоке if вызываем ранее написанную функцию check_opening_hours, передав в неё переменную hours.

Если возвращается True, мы продолжаем.

Внутри условия создаём переменную context, в которую присваиваем значение ключа event_context из переменной data.

Дальше ещё одно условие if, в котором проверяем, что сообщение содержит business_connection_id, т.е. является личным и что отправитель сообщения не админ, иначе бот будет реагировать и на ваши сообщения тоже.
Если условия соблюдаются, передаём сообщение дальше в обработчик.

if check_opening_hours(full_chat.business_opening_hours):  
    context: EventContext = data.get("event_context")  

    if (  
        context.user.id != secrets.admin_id  
        and context.business_connection_id  
    ):  
        return await handler(event, data)

 

Полный код файла:

from typing import Callable, Dict, Any, Awaitable  
  
import httpx  
from aiogram import BaseMiddleware  
from aiogram.dispatcher.middlewares.user_context import EventContext  
from aiogram.types import TelegramObject, ChatFullInfo  
  
from app.settings import secrets  
from app.utils.opening_hours import check_opening_hours  
  
  
class BusinessMiddleware(BaseMiddleware):  
    async def __call__(  
        self,  
        handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]],  
        event: TelegramObject,  
        data: Dict[str, Any],  
    ) -> Any:  
        async with httpx.AsyncClient() as client:  
            response = await client.get(  
                f"https://api.telegram.org/bot{secrets.token}/getChat?chat_id={secrets.admin_id}"  
            )  
            chat = response.json()  
            full_chat = ChatFullInfo(**chat["result"])  
  
            if check_opening_hours(full_chat.business_opening_hours):  
                context: EventContext = data.get("event_context")  
  
                if (  
                    context.user.id != secrets.admin_id  
                    and context.business_connection_id  
                ):  
                    return await handler(event, data)

 

Подключение ChatGPT.

В этой функции будем отправлять запрос к ChatGPT и возвращать полученный ответ.

В пакете utils, создайте файл openai_actions.py.

Создайте асинхронную функцию get_chat_completion, принимающую message - объект класса Message.

В переменной http_client определите объект класса httpx.AsyncClient. Это объект HTTP-клиента, используя который будет произведён запрос.

В переменной client определите объект класса AsyncOpenAI, передав в него аргументы: api_key, http_client и base_url. Это объект клиента для OpenAI.

http_client = httpx.AsyncClient(  
    limits=httpx.Limits(max_connections=100, max_keepalive_connections=20)  
)  
client = AsyncOpenAI(  
    api_key=secrets.openai_key,  
    http_client=http_client,  
    base_url=secrets.openai_base_url,  
)

 

Далее в переменной messages создайте список словарей, где первый словарь – это системный промт, а второй – сообщение от пользователя:

messages = [  
    {"role": "system", "content": system_prompt()},  
    {"role": "user", "content": message.text},  
]

 

В переменную response создайте запрос, передав в него:

  • model - Выбранная модель ChatGPT, например, gpt-3.5-turbo, gpt-4-turbo, gpt-4o или любую другую поддерживаемую OpenAI.
  • messages - Список словарей с сообщениями.
  • max_tokens - Ограничение на максимальное количество токенов в ответе.
  • temperature - Температура в диапазоне от 0 до 1. Определяет уровень "фантазии" бота. Чем ближе число к нулю, тем более предсказуемы будут ответы и наоборот, чем ближе к единице, тем более случайными будут ответы.

И возвращаем результат запроса в обработчик:

response = await client.chat.completions.create(  
    model="gpt-3.5-turbo", messages=messages, max_tokens=1000, temperature=0.8  
)  

return response.choices[0].message.content

 

Полный код:

import httpx  
from aiogram.types import Message  
from openai import AsyncOpenAI  

from app.settings import secrets  
from app.views import system_prompt  


async def get_chat_completion(message: Message):  
    http_client = httpx.AsyncClient(  
        limits=httpx.Limits(max_connections=100, max_keepalive_connections=20)  
    )  
    client = AsyncOpenAI(  
        api_key=secrets.openai_key,  
        http_client=http_client,  
        base_url=secrets.openai_base_url,  
    )  

    messages = [  
        {"role": "system", "content": system_prompt()},  
        {"role": "user", "content": message.text},  
    ]  

    response = await client.chat.completions.create(  
        model="gpt-3.5-turbo", messages=messages, max_tokens=1000, temperature=0.8  
    )  

    return response.choices[0].message.content

 

Задержка обработки сообщений.

Для того, чтобы пользователи не спамили и не использовали личные сообщения как "бесплатный GPT", добавим задержку в обработке сообщений.

В вашей реализации логики она может быть не нужна.

В пакете utils создайте файл check_delay.py, а в нём асинхронную функцию check_user_delay, принимающую user_id.

Тут-то нам и понадобится Redis для хранения пользовательских ID и времени последнего сообщения. Вы можете использовать для этого другую БД или вовсе словарь в коде, это не принципиально.

В переменную last_message_time получаем из Redis по user_id время последнего сообщения, если оно есть. Если его нет - вернётся None.

В блоке if проверяем, что last_message_time True (проще говоря, не None).
Внутри блока в переменную time_since_last_message получаем разницу между текущим временем и полученным из хранилища.
Ниже проверяем, если оно меньше указанной в .env допустимой задержки, то возвращаем False.

Во всех остальных случаях возвращаем True.

 

Полный код:

import asyncio  

from app.settings import redis_conn, secrets  


async def check_user_delay(user_id: int):  
    last_message_time = await redis_conn.get(f"users:{user_id}")  
    if last_message_time:  
        time_since_last_message = asyncio.get_event_loop().time() - float(  
            last_message_time  
        )  
        if time_since_last_message < secrets.delay * 60:  
            return False  
    return True

 

Обработчик бизнес сообщений.

Осталось написать обработчик, в который middleware будет передавать сообщение.

В пакете app создайте пакет handlers, а в нём файл business_handler.py.

В этом файле создайте асинхронную функцию handle_business_message, принимающую message - объект класса Message.

В самом начале создайте блок if, проверяющий задержку и наличие текста в сообщении (отправить могут картинку или видео, а это другая логика работы с ChatGPT).

Если условие не выполняется, то сообщение просто игнорируется.

Если условие выполнено, переходим к обработке.

В переменной answer вызываем функцию get_chat_completion, передав в неё message.

Затем отвечаем пользователю полученным сообщением.

Сохраняем в Redis время текущего сообщения.

 

Полный код:

import asyncio  

from aiogram.types import Message  

from app.settings import redis_conn  
from app.utils.check_delay import check_user_delay  
from app.utils.openai_actions import get_chat_completion  


async def handle_business_message(message: Message):  
    if await check_user_delay(message.from_user.id) and message.text:  
        answer = await get_chat_completion(message)  
        await message.reply(answer)  
        await redis_conn.set(  
            f"users:{message.from_user.id}", asyncio.get_event_loop().time()  
        )

 

Обработка уведомлений о запуске/остановке бота.

Небольшое, но удобное дополнение.

В пакете handlers создайте файл events.py.

В нём создайте две асинхронные функции: start_bot и stop_bot.

В функциях отправляем сообщение администратору.

from app.settings import bot, secrets  
from app import views  


async def start_bot():  
    await bot.send_message(secrets.admin_id, views.start_bot_message())  


async def stop_bot():  
    await bot.send_message(secrets.admin_id, views.stop_bot_message())

 

Основной файл.

Логику написали. Теперь осталось соединить всё вместе.

Откройте созданный ранее файл main.py. Он должен находиться в корне проекта рядом с файлом .env.

В нём создайте асинхронную функцию start.

В переменной dp объявите экземпляр класса Dispatcher.

Далее в несколько строк зарегистрируйте middleware и обработчики:

dp = Dispatcher()  

dp.update.middleware(BusinessMiddleware())  

dp.startup.register(start_bot)  
dp.shutdown.register(stop_bot)  

dp.business_message.register(handle_business_message)

 

Обратите внимание на dp.business_message.register. Регистрируется обработка business_message, а не обычного message.

Далее в блоке try вызывается очистка сообщений, отправленных, когда бот был офлайн, и запуск пуллинга, а в блоке finally выполняется остановка бота.

Вне функции в блоке if __name__ == "__main__" запускаем функцию старт.

 

Полный код:

import asyncio  
  
from aiogram import Dispatcher  
from aiogram.methods import DeleteWebhook  
  
from app.handlers.business_handler import handle_business_message  
from app.handlers.events import start_bot, stop_bot  
from app.middlewares.business_middleware import BusinessMiddleware  
from app.settings import bot  
  
  
async def start():  
    dp = Dispatcher()  
  
    dp.update.middleware(BusinessMiddleware())  
  
    dp.startup.register(start_bot)  
    dp.shutdown.register(stop_bot)  
  
    dp.business_message.register(handle_business_message)  
  
    try:  
        await bot(DeleteWebhook(drop_pending_updates=True))  
        await dp.start_polling(bot)  
    finally:  
        await bot.session.close()  
  
  
if __name__ == "__main__":  
    asyncio.run(start())

 

Запуск бота.

Для запуска бота и Redis будем использовать Docker compose.

Сперва необходимо создать образ с ботом, для этого создайте файл Dockerfile со следующим содержимым:

FROM python:3.11-slim  

WORKDIR /code  

COPY requirements.txt /code  

RUN pip install --upgrade pip && pip install -r requirements.txt  

COPY . /code  

CMD [ "python", "./main.py" ]

 

В нём создаётся Docker-образ, в котором устанавливаются все зависимости из файла requirements.txt. Затем копируются файлы проекта и выполняется команда запуска бота.

Затем создайте файл docker-compose.yaml со следующим содержимым:

services:  
  bot:  
    build: .  
    restart: always  
    env_file:  
      - .env  
    volumes:  
      - .:/code  

  redis:  
    image: redis  
    restart: always  
    volumes:  
      - ./redis_data:/data  

 

В нём описываются два сервиса:

Первый bot. Указываем, что необходимо создать образ из Dockerfile, передать в него .env-файл и подключить текущую папку внутри контейнера.

Второй redis. Указываем, что будет использоваться официальный образ redis последней версии, и подключаем папку redis_data внутри контейнера, чтобы не потерять данные.

Готово. 

Запустить бота можно командой:

docker compose up -d

Автор

    Нет комментариев

    Реклама