Блокирующие операции в Tornado

/
2014-03-06 14:04
Просмотры: 1838

Tornado frameworkВ рассылке Tornado возникло обсуждение на счет блокирующих операций и способов решения обхода блокировки при выполнении. Оказывается, есть 3 приемлемые варианты, в порядке возрастания сложности:

- Оптимизация блокирующих вызовов. Зачастую это медленные запросы к базе данных или чрезмерно сложные шаблоны для отображения контента. В первую очередь нужно подумать над ускорением этих операций. Они могут занимать до 99% времени выполнения программы.

- Выполнение медленных задач в отдельных потоках или процессах. Это означает, что можно разгрузить задачу выполнением иной задачи в другом треде или процессе по отношению к одному работающему IOLoop, который после освобождения способен принимать другие запросы.

- Использование асинхронных драйверов или библиотек для запуска задач или выполнения операций. Например, gevent, motor или что то подобное.

Этот пост о втором способе реализации с частичным использованием Python пакета concurrent.futures.

Для примера, рассмотрим веб сервер с простым блокирующим обработчиком HTTP запросов(SleepHandler):

import time
import tornado.ioloop
import tornado.web


class MainHandler(tornado.web.RequestHandler):

    def get(self):
        self.write("Hello, world %s" % time.time())


class SleepHandler(tornado.web.RequestHandler):

    def get(self, n):
        time.sleep(float(n))
        self.write("Awake! %s" % time.time())


application = tornado.web.Application([
    (r"/", MainHandler),
    (r"/sleep/(\d+)", SleepHandler),
])


if __name__ == "__main__":
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()

Попробуйте открыть в одной вкладке браузера http://localhost:8888/sleep/10, а в другой http://localhost:8888/. Вы увидите, что "Hello, world" не выведется во второй вкладке, это произойдет лишь после того, как закончиться обработка запроса в первой вкладке, то есть через 10 секунд. Фактически первый вызов блокирует IOLoop, как результат, обработка во второй вкладке невозможна.

Вы можете сделать "SleepHandler" дружественным по отношению к Tornado, вынести выполнение в отдельный тред. Ниже приведен декоратор, который может использоваться как "неблокирующий":

from concurrent.futures import ThreadPoolExecutor
from functools import partial, wraps
import tornado.ioloop
import tornado.web


EXECUTOR = ThreadPoolExecutor(max_workers=4)


def unblock(f):

    @tornado.web.asynchronous
    @wraps(f)
    def wrapper(*args, **kwargs):
        self = args[0]

        def callback(future):
            self.write(future.result())
            self.finish()

        EXECUTOR.submit(
            partial(f, *args, **kwargs)
        ).add_done_callback(
            lambda future: tornado.ioloop.IOLoop.instance().add_callback(
                partial(callback, future)))

    return wrapper


class SleepHandler(tornado.web.RequestHandler):

    @unblock
    def get(self, n):
        time.sleep(float(n))
        return "Awake! %s" % time.time()

Все очень просто, декоратор @unblock отправляет выполнение декорируемой функции в пул потоков, который возвращает future результат; обратный вызов возвращает управление в IOLoop, передавая future результат выполнения функции, который в конечном счете вызывает self.finish и завершает обработку запроса.

Обратите внимание, что декорируемая функция должна быть декорируемая tornado.web.asynchronous, для того что бы self.finish не вызвался раньше, чем нужно. Кроме того, self.write не потоко безопасна, поэтому должна вызываться в главном потоке с результатом future в качестве параметра.

Полный код:

from concurrent.futures import ThreadPoolExecutor
from functools import partial, wraps
import time
 
import tornado.ioloop
import tornado.web
 
 
EXECUTOR = ThreadPoolExecutor(max_workers=4)
 
 
def unblock(f):
 
@tornado.web.asynchronous
@wraps(f)
    def wrapper(*args, **kwargs):
    self = args[0]
     
    def callback(future):
    self.write(future.result())
    self.finish()
 
    EXECUTOR.submit(
        partial(f, *args, **kwargs)
    ).add_done_callback(
        lambda future: tornado.ioloop.IOLoop.instance().add_callback(
            partial(callback, future)))
     
    return wrapper
 
 
class MainHandler(tornado.web.RequestHandler):
 
    def get(self):
        self.write("Hello, world %s" % time.time())
 
 
class SleepHandler(tornado.web.RequestHandler):
 
    @unblock
    def get(self, n):
        time.sleep(float(n))
        return "Awake! %s" % time.time()
 
 
class SleepAsyncHandler(tornado.web.RequestHandler):
 
@tornado.web.asynchronous
def get(self, n):
 
    def callback(future):
        self.write(future.result())
        self.finish()
 
    EXECUTOR.submit(
        partial(self.get_, n)
    ).add_done_callback(
        lambda future: tornado.ioloop.IOLoop.instance().add_callback(
            partial(callback, future)))
 
    def get_(self, n):
        time.sleep(float(n))
        return "Awake! %s" % time.time()
 
application = tornado.web.Application([
    (r"/", MainHandler),
    (r"/sleep/(\d+)", SleepHandler),
    (r"/sleep_async/(\d+)", SleepAsyncHandler),
])
 
if __name__ == "__main__":
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()

Источник: Blocking tasks in Tornado

Добавить комментарий

comments powered by Disqus