Предположим, что у вас появилось желание перекодировать фильмы на вашем медиасервере, и вы решили использовать production-ready решение для хранения заданий. Вы взяли RabbitMQ для управления очередями сообщений и Python для их обработки. Но почему-то сообщения обрабатываются нестабильно, клиент падает без всяких видимых причин. Попробуем понять почему такое может быть.

Возьмём готовый код из официального туториала RabbitMQ и немного его модифицируем, чтобы он обрабатывал сообщения за различное время.

Producer

#!/usr/bin/python
# -*- coding: utf-8 -*-

__author__ = "Aleksey Lobanov"
__license__ = "MIT"

import sys

import pika

if __name__ == "__main__":
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost')
    )
    channel = connection.channel()


    channel.queue_declare(queue='demo.hello')

    # чтение первого аргумента командной строки, если он есть
    if len(sys.argv) > 1:
        delay_to_send = int(sys.argv[1])
    else:
        delay_to_send = 3
    channel.basic_publish(
        exchange='',
        routing_key='demo.hello',
        body=str(delay_to_send)
    )
    print(" [x] Sent " + str(delay_to_send))
    connection.close()

Consumer

#!/usr/bin/python
# -*- coding: utf-8 -*-

__author__ = "Aleksey Lobanov"
__license__ = "MIT"

import sys
import time
import math

import pika


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    delay = int(body)
    begin_at = time.time()
    time.sleep(delay)
    print(" [x] Finished {}".format(body))


if __name__ == "__main__":
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    channel.queue_declare(queue='demo.hello')

    channel.basic_consume(
        callback,
        queue='demo.hello',
        no_ack=True
    )

    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

Пока мы посылаем маленькие числа, всё будет хорошо работать. Но если послать какое-то большое (в моём случае достаточно 200), то consumer потеряет соединение с сервером, будет ошибка. Скорее всего это будет pika.exceptions.ConnectionClosed: (-1, 'EOF') или Socket Error 104 (тут есть обсуждение на GitHub библиотеки, но установка prefetch_count=1 тоже не поможет). Эта проблема актуальна для обоих веток Python.

Реальная причина в том, что при обработке сообщения не происходит необходимого взаимодействия с RabbitMQ, не отправляются hearbeats, а без них сервер считает, что этот клиент погиб окончательно. Нужно отметить, что переход на другой тип соединения не помогает. Например использование примера на Twisted из официальной документации ничего не изменит.

У этой проблемы есть много решений. Будем считать, что сообщение, которое мы обрабатываем не разделяется на подзадачи и рассмотрим некоторые из них:

  1. Отключить hearbeats/увеличить их интервал так, чтобы самprefetch_countая долгая обработка сообщения вела к потере не более, чем одного. Это самое простое решение, но в таком случае мы теряем в надёжности. Внешнем сервисам мониторинга будет сложнее понять, обрабатываются ли сейчас сообщения или уже нет. И чем больше интервал, тем серьёзнее проблема. Не подходит
  2. Разбить обработку сообщения на несколько этапов. Часто это хорошее решение, но в предположении, что данная обработка не разбивается на более мелкие тоже не подходит.
  3. Использовать connection.sleep вместо time.sleep, а также регулярный вызов BlockingConnection.process_data_events. Оба эти решения помогают, но их использование плохо по многим причинам. Во-первых это явное протекании абстракции, когда код для обработки сообщения вынужден работать с очередью. Во-вторых не всегда можно гарантировать, что эти функции вызываются достаточно часто, а это главная проблема. Не подходит
  4. Выделить отдельный процесс под pika. Вероятно, единственно универсальное решение. Если исходный код был правильно написан, то адаптация будет простой. Но у этого решения есть минусы, обязательные при использовании нескольких потоков/процессов. Также сама библиотека pika не является потокобезопасной.

Возможное решение будет заключаться в вынесение обработки данных в отдельный метод:

def real_work(body):
    delay = int(body)
    begin_at = time.time()
    time.sleep(delay)

и небольшой доработке callback-метода:

# Конструктор, при запуске процесс запустит
# функцию с указанными параметрами
work_process = multiprocessing.Process(
    target=real_work,
    args=(body, )
)
work_process.start()  # процесс нужно явно запустить
while True:
    ch.connection.sleep(1)
    # пока поток работает, используем функцию pika
    # для обработки необходимых сообщений
    if not work_process.is_alive():
        break

Тогда готовый код будет выглядеть так:

#!/usr/bin/python
# -*- coding: utf-8 -*-

__author__ = "Aleksey Lobanov"
__license__ = "MIT"

import sys
import time
import math
import multiprocessing

import pika


def real_work(body):
    delay = int(body)
    begin_at = time.time()
    time.sleep(delay)


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    work_process = multiprocessing.Process(
        target=real_work,
        args=(body, )
    )
    work_process.start()
    while True:
        ch.connection.sleep(1)
        if not work_process.is_alive():
            break

    print(" [x] Finished " + body)


if __name__ == "__main__":
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    channel.queue_declare(queue='demo.hello')

    channel.basic_consume(
        callback,
        queue='demo.hello',
        no_ack=True
    )

    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()