GOOGLE ADS

lundi 2 mai 2022

Faire une tornade pour servir une demande sur un fil séparé

J'ai un service Web écrit en Flask, enveloppé dans un WSGIContaineret servi par Tornado en utilisant son FallbackHandlermécanisme. L'une de mes routes dans le service Web Flask exécute une opération très longue (elle prend environ 5 minutes), et lorsque cette route est déclenchée, tous les autres appels vers n'importe quelle route sont bloqués jusqu'à la fin de l'opération. Comment puis-je contourner ce problème ?

Voici comment mon application Flask est servie avec Tornado:

parse_command_line()
frontend_path = os.path.join(os.path.dirname(__file__),"..","webapp")
rest_app = WSGIContainer(app)
tornado_app = Application(
[
(r"/api/(.*)", FallbackHandler, dict(fallback=rest_app)),
(r"/app/(.*)", StaticFileHandler, dict(path=frontend_path))
]
)


Solution du problème

J'ai créé une coutume WSGIHandlerqui prend en charge les requêtes multithreads pour les applications WSGI dans Tornado en utilisant un fichier ThreadPoolExecutor. Tous les appels dans l'application WSGI sont effectués dans des threads séparés, de sorte que la boucle principale reste libre même si votre réponse WSGI prend beaucoup de temps. Le code suivant est basé sur ce Gist et étendu de sorte que :


  • Vous pouvez diffuser une réponse (à l'aide d'une réponse d'itérateur) ou des fichiers volumineux directement depuis l'application WSGI vers le client, de sorte que vous pouvez maintenir une faible utilisation de la mémoire même lorsque vous générez des réponses volumineuses.

  • Vous pouvez télécharger des fichiers volumineux. Si le corps de la demande dépasse 1 Mo, le corps entier de la demande est vidé dans un fichier temporaire qui est ensuite transmis à l'application WSGI.


Actuellement, le code n'a été testé qu'avec Python 3.4, donc je ne sais pas s'il fonctionne avec Python 2.7. Il n'a pas encore été testé sous contrainte, mais semble bien fonctionner jusqu'à présent.

# tornado_wsgi.py
import itertools
import logging
import sys
import tempfile
from concurrent import futures
from io import BytesIO
from tornado import escape, gen, web
from tornado.iostream import StreamClosedError
from tornado.wsgi import to_wsgi_str
_logger = logging.getLogger(__name__)
@web.stream_request_body
class WSGIHandler(web.RequestHandler):
thread_pool_size = 20
def initialize(self, wsgi_application):
self.wsgi_application = wsgi_application
self.body_chunks = []
self.body_tempfile = None
def environ(self, request):
"""
Converts a `tornado.httputil.HTTPServerRequest` to a WSGI environment.
"""
hostport = request.host.split(":")
if len(hostport) == 2:
host = hostport[0]
port = int(hostport[1])
else:
host = request.host
port = 443 if request.protocol == "https" else 80
if self.body_tempfile is not None:
body = self.body_tempfile
body.seek(0)
elif self.body_chunks:
body = BytesIO(b''.join(self.body_chunks))
else:
body = BytesIO()
environ = {
"REQUEST_METHOD": request.method,
"SCRIPT_NAME": "",
"PATH_INFO": to_wsgi_str(escape.url_unescape(request.path, encoding=None, plus=False)),
"QUERY_STRING": request.query,
"REMOTE_ADDR": request.remote_ip,
"SERVER_NAME": host,
"SERVER_PORT": str(port),
"SERVER_PROTOCOL": request.version,
"wsgi.version": (1, 0),
"wsgi.url_scheme": request.protocol,
"wsgi.input": body,
"wsgi.errors": sys.stderr,
"wsgi.multithread": False,
"wsgi.multiprocess": True,
"wsgi.run_once": False,
}
if "Content-Type" in request.headers:
environ["CONTENT_TYPE"] = request.headers.pop("Content-Type")
if "Content-Length" in request.headers:
environ["CONTENT_LENGTH"] = request.headers.pop("Content-Length")
for key, value in request.headers.items():
environ["HTTP_" + key.replace("-", "_").upper()] = value
return environ
def prepare(self):
# Accept up to 2GB upload data.
self.request.connection.set_max_body_size(2 << 30)
@gen.coroutine
def data_received(self, chunk):
if self.body_tempfile is not None:
yield self.executor.submit(lambda: self.body_tempfile.write(chunk))
else:
self.body_chunks.append(chunk)
# When the request body grows larger than 1 MB we dump all receiver chunks into
# a temporary file to prevent high memory use. All subsequent body chunks will
# be directly written into the tempfile.
if sum(len(c) for c in self.body_chunks) > (1 << 20):
self.body_tempfile = tempfile.NamedTemporaryFile('w+b')
def copy_to_file():
for c in self.body_chunks:
self.body_tempfile.write(c)
# Remove the chunks to clear the memory.
self.body_chunks[:] = []
yield self.executor.submit(copy_to_file)
@web.asynchronous
@gen.coroutine
def get(self):
data = {}
response = []
def start_response(status, response_headers, exc_info=None):
data['status'] = status
data['headers'] = response_headers
return response.append
environ = self.environ(self.request)
app_response = yield self.executor.submit(self.wsgi_application, environ, start_response)
app_response = iter(app_response)
if not data:
raise Exception('WSGI app did not call start_response')
try:
exhausted = object()
def next_chunk():
try:
return next(app_response)
except StopIteration:
return exhausted
for i in itertools.count():
chunk = yield self.executor.submit(next_chunk)
if i == 0:
status_code, reason = data['status'].split(None, 1)
status_code = int(status_code)
headers = data['headers']
self.set_status(status_code, reason)
for key, value in headers:
self.set_header(key, value)
c = b''.join(response)
if c:
self.write(c)
yield self.flush()
if chunk is not exhausted:
self.write(chunk)
yield self.flush()
else:
break
except StreamClosedError:
_logger.debug('stream closed early')
finally:
# Close the temporary file to make sure that it gets deleted.
if self.body_tempfile is not None:
try:
self.body_tempfile.close()
except OSError as e:
_logger.warning(e)
if hasattr(app_response, 'close'):
yield self.executor.submit(app_response.close)
post = put = delete = head = options = get
@property
def executor(self):
cls = type(self)
if not hasattr(cls, '_executor'):
cls._executor = futures.ThreadPoolExecutor(cls.thread_pool_size)
return cls._executor

Ce qui suit est une simple application Flask qui illustre le WSGIHandler. La hello()fonction se bloque pendant une seconde, donc si vous ThreadPoolExecutorutilisez 20 threads, vous pourrez charger 20 requêtes en même temps (en une seconde).

La stream()fonction crée une réponse d'itérateur et transmet 50 blocs de données au client en 5 secondes. Il convient de noter qu'il ne sera probablement pas possible d'utiliser le stream_with_contextdécorateur de Flask ici : étant donné que chaque chargement de l'itérateur entraîne un nouveau executor.submit(), il est très probable que différents morceaux de la réponse en continu seront chargés à partir de différents threads, ce qui interrompra l'utilisation de thread par Flask. -des locaux.

import time
from flask import Flask, Response
from tornado import ioloop, log, web
from tornado_wsgi import WSGIHandler
def main():
app = Flask(__name__)
@app.route("/")
def hello():
time.sleep(1)
return "Hello World!"
@app.route("/stream")
def stream():
def generate():
for i in range(50):
time.sleep(0.1)
yield '%d\n' % i
return Response(generate(), mimetype='text/plain')
application = web.Application([
(r'/.*', WSGIHandler, {'wsgi_application': app}),
])
log.enable_pretty_logging()
application.listen(8888)
ioloop.IOLoop.instance().start()
if __name__ == '__main__':
main()

Aucun commentaire:

Enregistrer un commentaire

Comment utiliseriez-vous .reduce() sur des arguments au lieu d'un tableau ou d'un objet spécifique&nbsp;?

Je veux définir une fonction.flatten qui aplatit plusieurs éléments en un seul tableau. Je sais que ce qui suit n'est pas possible, mais...