Долгие сообщения в RabbitMQ
Предположим, что у вас появилось желание перекодировать фильмы на вашем медиасервере, и вы решили использовать production-ready решение для хранения заданий. Вы взяли RabbitMQ для управления очередями сообщений и Python для их обработки. Но почему-то сообщения обрабатываются нестабильно, клиент падает без всяких видимых причин. Попробуем понять почему такое может быть.
Возьмём готовый код из официального туториала RabbitMQ и немного его модифицируем, чтобы он обрабатывал сообщения за различное время.
Producer
#!/usr/bin/python
# -*- coding: utf-8 -*-
__author__ = "Aleksey Lobanov"
__license__ = "MIT"
import sys
import pika
if __name__ == "__main__":
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost')
)
channel = connection.channel()
channel.queue_declare(queue='demo.hello')
# чтение первого аргумента командной строки, если он есть
if len(sys.argv) > 1:
delay_to_send = int(sys.argv[1])
else:
delay_to_send = 3
channel.basic_publish(
exchange='',
routing_key='demo.hello',
body=str(delay_to_send)
)
print(" [x] Sent " + str(delay_to_send))
connection.close()
Consumer
#!/usr/bin/python
# -*- coding: utf-8 -*-
__author__ = "Aleksey Lobanov"
__license__ = "MIT"
import sys
import time
import math
import pika
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
delay = int(body)
begin_at = time.time()
time.sleep(delay)
print(" [x] Finished {}".format(body))
if __name__ == "__main__":
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='demo.hello')
channel.basic_consume(
callback,
queue='demo.hello',
no_ack=True
)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
Пока мы посылаем маленькие числа, всё будет хорошо работать. Но если послать какое-то большое (в моём случае достаточно 200), то consumer потеряет соединение с сервером, будет ошибка. Скорее всего это будет pika.exceptions.ConnectionClosed: (-1, 'EOF')
или Socket Error 104
(тут есть обсуждение на GitHub библиотеки, но установка prefetch_count=1
тоже не поможет). Эта проблема актуальна для обоих веток Python.
Реальная причина в том, что при обработке сообщения не происходит необходимого взаимодействия с RabbitMQ, не отправляются hearbeats, а без них сервер считает, что этот клиент погиб окончательно. Нужно отметить, что переход на другой тип соединения не помогает. Например использование примера на Twisted из официальной документации ничего не изменит.
У этой проблемы есть много решений. Будем считать, что сообщение, которое мы обрабатываем не разделяется на подзадачи и рассмотрим некоторые из них:
- Отключить hearbeats/увеличить их интервал так, чтобы самprefetch_countая долгая обработка сообщения вела к потере не более, чем одного. Это самое простое решение, но в таком случае мы теряем в надёжности. Внешнем сервисам мониторинга будет сложнее понять, обрабатываются ли сейчас сообщения или уже нет. И чем больше интервал, тем серьёзнее проблема. Не подходит
- Разбить обработку сообщения на несколько этапов. Часто это хорошее решение, но в предположении, что данная обработка не разбивается на более мелкие тоже не подходит.
- Использовать
connection.sleep
вместоtime.sleep
, а также регулярный вызовBlockingConnection.process_data_events
. Оба эти решения помогают, но их использование плохо по многим причинам. Во-первых это явное протекании абстракции, когда код для обработки сообщения вынужден работать с очередью. Во-вторых не всегда можно гарантировать, что эти функции вызываются достаточно часто, а это главная проблема. Не подходит - Выделить отдельный процесс под
pika
. Вероятно, единственно универсальное решение. Если исходный код был правильно написан, то адаптация будет простой. Но у этого решения есть минусы, обязательные при использовании нескольких потоков/процессов. Также сама библиотекаpika
не является потокобезопасной.
Возможное решение будет заключаться в вынесение обработки данных в отдельный метод:
def real_work(body):
delay = int(body)
begin_at = time.time()
time.sleep(delay)
и небольшой доработке callback-метода:
# Конструктор, при запуске процесс запустит
# функцию с указанными параметрами
work_process = multiprocessing.Process(
target=real_work,
args=(body, )
)
work_process.start() # процесс нужно явно запустить
while True:
ch.connection.sleep(1)
# пока поток работает, используем функцию pika
# для обработки необходимых сообщений
if not work_process.is_alive():
break
Тогда готовый код будет выглядеть так:
#!/usr/bin/python
# -*- coding: utf-8 -*-
__author__ = "Aleksey Lobanov"
__license__ = "MIT"
import sys
import time
import math
import multiprocessing
import pika
def real_work(body):
delay = int(body)
begin_at = time.time()
time.sleep(delay)
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
work_process = multiprocessing.Process(
target=real_work,
args=(body, )
)
work_process.start()
while True:
ch.connection.sleep(1)
if not work_process.is_alive():
break
print(" [x] Finished " + body)
if __name__ == "__main__":
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='demo.hello')
channel.basic_consume(
callback,
queue='demo.hello',
no_ack=True
)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()