portaldacalheta.pt
  • Principal
  • Rise Of Remote
  • Outils Et Tutoriels
  • Équipes Distribuées
  • Mode De Vie
Back-End

Orchestration d'un workflow de travail d'arrière-plan dans Celery pour Python



Les applications Web modernes et leurs systèmes sous-jacents sont plus rapides et plus réactifs que jamais. Cependant, il existe encore de nombreux cas où vous souhaitez décharger l'exécution d'une tâche lourde vers d'autres parties de l'ensemble de l'architecture de votre système au lieu de les aborder sur votre thread principal. Identifier ces tâches est aussi simple que de vérifier si elles appartiennent à l'une des catégories suivantes:

  • Tâches périodiques - Tâches que vous planifierez pour s'exécuter à une heure spécifique ou après un intervalle, par exemple, la génération de rapports mensuels ou un racleur Web qui s'exécute deux fois par jour.
  • Tâches tierces - L'application Web doit servir les utilisateurs rapidement sans attendre que d'autres actions se terminent pendant le chargement de la page, par exemple, envoyer un e-mail ou une notification ou propager des mises à jour vers des outils internes (comme la collecte de données pour les tests A / B ou la journalisation du système ).
  • Tâches de longue durée - Tâches qui coûtent cher en ressources, où les utilisateurs doivent attendre pendant qu'ils calculent leurs résultats, par exemple, exécution de flux de travail complexes (flux de travail DAG), génération de graphiques, tâches de type Map-Reduce et diffusion de contenu multimédia (vidéo, l'audio).

Une solution simple pour exécuter une tâche en arrière-plan serait de l'exécuter dans un thread ou un processus distinct. Python est un langage de programmation complet Turing de haut niveau, qui ne fournit malheureusement pas de concurrence intégrée à une échelle correspondant à celle d'Erlang, Go, Java, Scala ou Akka. Celles-ci sont basées sur les processus séquentiels de communication de Tony Hoare ( CSP ). Les threads Python, quant à eux, sont coordonnés et planifiés par le verrou d'interprétation global ( GIL ), qui empêche plusieurs threads natifs d'exécuter des bytecodes Python à la fois. Se débarrasser du GIL est un sujet de beaucoup de discussion parmi Développeurs Python , mais ce n'est pas l'objet de cet article. La programmation simultanée en Python est démodée, même si vous êtes invité à en savoir plus sur Tutoriel multithreading Python par son compatriote ApeeScapeer Marcus McCurdy. Ainsi, la conception cohérente de la communication entre les processus est un processus sujet aux erreurs et entraîne un couplage de code et une mauvaise maintenabilité du système, sans oublier que cela affecte négativement l'évolutivité. De plus, le processus Python est un processus normal sous un système d'exploitation (OS) et, avec toute la bibliothèque standard Python, il devient un poids lourd. À mesure que le nombre de processus dans l'application augmente, le passage d'un processus à un autre devient une opération qui prend du temps.



Pour mieux comprendre la concurrence avec Python, regardez ce discours incroyable de David Beazley sur PyCon’15 .



Une bien meilleure solution est de servir un file d'attente distribuée ou son paradigme de frère bien connu appelé publier-s'abonner . Comme le montre la figure 1, il existe deux types d'applications dans lesquelles l'une, appelée éditeur , envoie des messages et l'autre, appelé le abonné , reçoit des messages. Ces deux agents n'interagissent pas directement l'un avec l'autre et ne sont même pas conscients l'un de l'autre. Les éditeurs envoient des messages à une file d'attente centrale, ou courtier , et les abonnés reçoivent des messages d'intérêt de ce courtier. Cette méthode présente deux avantages principaux:



  • Évolutivité - les agents n'ont pas besoin de se connaître les uns les autres dans le réseau. Ils sont focalisés par sujet. Cela signifie donc que chacun peut continuer à fonctionner normalement indépendamment de l'autre de manière asynchrone.
  • Couplage lâche - chaque agent représente sa partie du système (service, module). Comme ils sont faiblement couplés, chacun peut évoluer individuellement au-delà du centre de données.

Il existe de nombreux systèmes de messagerie qui prennent en charge de tels paradigmes et fournissent une API soignée, pilotée soit par des protocoles TCP ou HTTP, par exemple JMS, RabbitMQ, Redis Pub / Sub, Apache ActiveMQ, etc.

Paradigme de publication-abonnement avec Celery Python
Figure 1: Paradigme de publication-abonnement

Qu'est-ce que le céleri?

Céleri est l'un des gestionnaires de travaux d'arrière-plan les plus populaires dans le monde Python. Celery est compatible avec plusieurs courtiers de messages comme RabbitMQ ou Redis et peut agir à la fois en tant que producteur et consommateur.



Celery est une file d'attente de tâches / file d'attente de tâches asynchrone basée sur la transmission de messages distribués. Il se concentre sur les opérations en temps réel mais prend également en charge la planification. Les unités d'exécution, appelées tâches, sont exécutées simultanément sur un ou plusieurs serveurs worker en multitraitement, Eventlet , ou ventilé . Les tâches peuvent s'exécuter de manière asynchrone (en arrière-plan) ou synchrone (attendre jusqu'à ce qu'elles soient prêtes). - Projet céleri

Pour démarrer avec Céleri, suivez simplement un guide étape par étape sur le documents officiels .



L'objectif de cet article est de vous donner une bonne compréhension des cas d'utilisation pouvant être couverts par Celery. Dans cet article, nous allons non seulement montrer des exemples intéressants, mais également essayer d'apprendre comment appliquer Celery à des tâches du monde réel telles que l'envoi en arrière-plan, la génération de rapports, la journalisation et le rapport d'erreurs. Je partagerai ma façon de tester les tâches au-delà de l'émulation et, enfin, je fournirai quelques astuces qui ne sont pas (bien) documentées dans la documentation officielle qui m'a pris des heures de recherche à découvrir par moi-même.

connaître l'élasticité-prix d'un produit permet aux économistes de :

Si vous n'avez aucune expérience préalable avec Celery, je vous encourage d'abord à l'essayer en suivant le tutoriel officiel.



Aiguiser votre appétit

Si cet article vous intrigue et vous donne envie de plonger immédiatement dans le code, alors suivez ceci Dépôt GitHub pour le code utilisé dans cet article. Le README file là vous donnera une approche rapide et sale pour exécuter et jouer avec les exemples d'applications.

Premiers pas avec le céleri

Pour commencer, nous allons parcourir une série d'exemples pratiques qui montreront au lecteur comment Céleri résout simplement et élégamment des tâches apparemment non triviales. Tous les exemples seront présentés dans le cadre de Django; cependant, la plupart d'entre eux pourraient facilement être portés vers d'autres frameworks Python (Flask, Pyramid).



La mise en page du projet a été générée par Cookiecutter Django ; cependant, je n'ai conservé que quelques dépendances qui, à mon avis, facilitent le développement et la préparation de ces cas d'utilisation. J'ai également supprimé les modules inutiles pour ce poste et les applications afin de réduire le bruit et de rendre le code plus facile à comprendre.

- celery_uncovered/ - celery_uncovered/__init__.py - celery_uncovered/{toyex,tricks,advex} - celery_uncovered/celery.py - config/settings/{base,local,test}.py - config/urls.py - manage.py
  • celery_uncovered/{toyex,tricks,advex} contient différentes applications que nous aborderons dans cet article. Chaque application contient un ensemble d'exemples organisés en fonction du niveau de compréhension du céleri requis.
  • celery_uncovered/celery.py définit une instance Celery.

Fichier: celery_uncovered/celery.py:



from __future__ import absolute_import import os from celery import Celery, signals # set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.local') app = Celery('celery_uncovered') # Using a string here means the worker will not have to # pickle the object when using Windows. app.config_from_object('django.conf:settings', namespace='CELERY') app.autodiscover_tasks()

Ensuite, nous devons nous assurer que Celery commencera avec Django. Pour cette raison, nous importons l'application dans celery_uncovered/__init__.py.

Fichier: celery_uncovered/__init__.py:

from __future__ import absolute_import # This will make sure the app is always imported when # Django starts so that shared_task will use this app. from .celery import app as celery_app # noqa __all__ = ['celery_app'] __version__ = '0.0.1' __version_info__ = tuple([int(num) if num.isdigit() else num for num in __version__.replace('-', '.', 1).split('.')])

config/settings est la source de configuration pour notre application et Celery. En fonction de l'environnement d'exécution, Django lancera les paramètres correspondants: local.py pour le développement ou test.py pour tester. Vous pouvez également définir votre propre environnement si vous le souhaitez en créant un nouveau module python (par exemple, prod.py). Les configurations de céleri sont préfixées par CELERY_. Pour ce post, j'ai configuré RabbitMQ comme courtier et SQLite comme résultat bac-end.

Fichier: config/local.py:

CELERY_BROKER_URL = env('CELERY_BROKER_URL', default='amqp://guest: [email protected] :5672//') CELERY_RESULT_BACKEND = 'django-db+sqlite:///results.sqlite'

Scénario 1 - Génération et exportation de rapports

Le premier cas que nous allons couvrir est la génération et l'exportation de rapports. Dans cet exemple, vous apprendrez à définir une tâche qui produit un rapport CSV et à la planifier à intervalles réguliers avec céleri .

Description du cas d'utilisation: récupérez les cinq cents référentiels les plus chauds de GitHub par période choisie (jour, semaine, mois), regroupez-les par thèmes et exportez le résultat vers le fichier CSV.

Si nous fournissons un service HTTP qui exécutera cette fonctionnalité déclenchée en cliquant sur un bouton intitulé «Générer un rapport», l'application s'arrêterait et attendrait la fin de la tâche avant de renvoyer une réponse HTTP. C'est mauvais. Nous voulons que notre application Web soit rapide et nous ne voulons pas que nos utilisateurs attendent pendant que notre back-end calcule les résultats. Au lieu d'attendre que les résultats soient produits, nous préférerions mettre en file d'attente la tâche vers les processus de travail via une file d'attente enregistrée dans Celery et répondre par un task_id au front-end. Ensuite, le front-end utiliserait le task_id pour interroger le résultat de la tâche de manière asynchrone (par exemple, AJAX) et tiendra l'utilisateur informé de la progression de la tâche. Enfin, une fois le processus terminé, les résultats peuvent être servis sous forme de fichier à télécharger via HTTP.

Détails d'implémentation

Tout d'abord, décomposons le processus en ses plus petites unités possibles et créons un pipeline:

Les fonctions c++ autres que main sont exécutées :
  1. Récupérateurs sont les travailleurs chargés d'obtenir les référentiels du service GitHub.
  2. La Agrégateur est le travailleur responsable de la consolidation des résultats en une seule liste.
  3. La Importateur est le worker qui produit des rapports CSV des référentiels les plus chauds de GitHub.
Un pipeline de travailleurs Celery Python
Figure 2: Un pipeline de nœuds de calcul avec Celery et Python

La récupération des référentiels est une requête HTTP utilisant le API de recherche GitHub GET /search/repositories. Cependant, il existe une limitation du service de l'API GitHub qui doit être gérée: l'API renvoie jusqu'à 100 référentiels par requête au lieu de 500. Nous pourrions envoyer cinq requêtes une à la fois, mais nous ne voulons pas faire attendre notre utilisateur. pour cinq demandes individuelles puisque les demandes HTTP sont une opération liée aux E / S. Au lieu de cela, nous pouvons exécuter cinq requêtes HTTP simultanées avec un paramètre de page approprié. La page sera donc dans la plage [1..5]. Définissons une tâche appelée fetch_hot_repos/3 -> list dans le toyex/tasks.py module:

Fichier: celery_uncovered/toyex/local.py

@shared_task def fetch_hot_repos(since, per_page, page): payload = { 'sort': 'stars', 'order': 'desc', 'q': 'created:>={date}'.format(date=since), 'per_page': per_page, 'page': page, 'access_token': settings.GITHUB_OAUTH} headers = {'Accept': 'application/vnd.github.v3+json'} connect_timeout, read_timeout = 5.0, 30.0 r = requests.get( 'https://api.github.com/search/repositories', params=payload, headers=headers, timeout=(connect_timeout, read_timeout)) items = r.json()[u'items'] return items

Donc fetch_hot_repos crée une requête à l'API GitHub et répond à l'utilisateur avec une liste de référentiels. Il reçoit trois paramètres qui définiront notre charge utile de requête:

  • since - Filtre les référentiels à la date de création.
  • per_page - Nombre de résultats à renvoyer par demande (limité par 100).
  • page —- Numéro de page demandé (dans la plage [1..5]).

Remarque: Pour utiliser l'API GitHub Search, vous aurez besoin d'un jeton OAuth pour passer les contrôles d'authentification. Dans notre cas, il est enregistré dans les paramètres sous GITHUB_OAUTH.

Ensuite, nous devons définir une tâche maître qui sera chargée d'agréger les résultats et de les exporter dans un fichier CSV: produce_hot_repo_report_task/2->filepath:

Fichier: celery_uncovered/toyex/local.py

@shared_task def produce_hot_repo_report(period, ref_date=None): # 1. parse date ref_date_str = strf_date(period, ref_date=ref_date) # 2. fetch and join fetch_jobs = group([ fetch_hot_repos.s(ref_date_str, 100, 1), fetch_hot_repos.s(ref_date_str, 100, 2), fetch_hot_repos.s(ref_date_str, 100, 3), fetch_hot_repos.s(ref_date_str, 100, 4), fetch_hot_repos.s(ref_date_str, 100, 5) ]) # 3. group by language and # 4. create csv return chord(fetch_jobs)(build_report_task.s(ref_date_str)).get() @shared_task def build_report_task(results, ref_date): all_repos = [] for repos in results: all_repos += [Repository(repo) for repo in repos] # 3. group by language grouped_repos = {} for repo in all_repos: if repo.language in grouped_repos: grouped_repos[repo.language].append(repo.name) else: grouped_repos[repo.language] = [repo.name] # 4. create csv lines = [] for lang in sorted(grouped_repos.keys()): lines.append([lang] + grouped_repos[lang]) filename = '{media}/github-hot-repos-{date}.csv'.format(media=settings.MEDIA_ROOT, date=ref_date) return make_csv(filename, lines)

Cette tâche utilise celery.canvas.group pour exécuter cinq appels simultanés de fetch_hot_repos/3. Ces résultats sont attendus puis réduits à une liste d'objets du référentiel. Ensuite, notre jeu de résultats est regroupé par sujet et finalement exporté dans un fichier CSV généré sous le MEDIA_ROOT/ annuaire.

Afin de planifier périodiquement la tâche, vous pouvez ajouter une entrée à la liste de planification dans le fichier de configuration:

Fichier: config/local.py

from celery.schedules import crontab CELERY_BEAT_SCHEDULE = { 'produce-csv-reports': { 'task': 'celery_uncovered.toyex.tasks.produce_hot_repo_report_task', 'schedule': crontab(minute=0, hour=0) # midnight, 'args': ('today',) }, }

Essayer

Afin de lancer et de tester le fonctionnement de la tâche, nous devons d'abord démarrer le processus Celery:

$ celery -A celery_uncovered worker -l info

Ensuite, nous devons créer le celery_uncovered/media/ annuaire. Ensuite, vous pourrez tester sa fonctionnalité soit via Shell ou Celerybeat:

coquille :

from datetime import date from celery_uncovered.toyex.tasks import produce_hot_repo_report_task produce_hot_repo_report_task.delay('today').get(timeout=5)

Céleri :

# Start celerybeat with the following command $ celery -A celery_uncovered beat -l info

Vous pouvez regarder les résultats sous le MEDIA_ROOT/ annuaire.

Scénario 2 - Rapport sur Serveur 500 Erreurs par e-mail

L'un des cas d'utilisation les plus courants de Celery est l'envoi de notifications par e-mail. La notification par e-mail est une opération liée aux E / S hors ligne qui exploite un serveur SMTP local ou un SES tiers. Il existe de nombreux cas d'utilisation qui impliquent l'envoi d'un e-mail et, pour la plupart d'entre eux, l'utilisateur n'a pas besoin d'attendre la fin de ce processus avant de recevoir une réponse HTTP. C’est pourquoi il est préférable d’exécuter ces tâches en arrière-plan et de répondre immédiatement à l’utilisateur.

Description du cas d'utilisation: Signalez les erreurs 50X au courrier électronique de l'administrateur via Celery.

Python et Django ont l'expérience nécessaire pour effectuer journalisation du système . Je n’entrerai pas dans les détails du fonctionnement réel de la journalisation de Python. Cependant, si vous ne l'avez jamais essayé auparavant ou si vous avez besoin d'un rappel, lisez la documentation du enregistrement module. Vous voulez certainement cela dans votre environnement de production. Django a un gestionnaire de journalisation spécial appelé AdminEmailHandler qui envoie un e-mail aux administrateurs pour chaque message de journal qu'il reçoit.

Détails d'implémentation

L'idée principale est d'étendre le send_mail méthode du AdminEmailHandler classe de manière à pouvoir envoyer du courrier via Celery. Cela pourrait être fait comme illustré dans la figure ci-dessous:

Gérer les e-mails des administrateurs avec Celery et Python
Figure 3: Gestion des e-mails des administrateurs avec Celery et Python

Tout d'abord, nous devons configurer une tâche appelée report_error_task qui appelle mail_admins avec l'objet et le message fournis:

Fichier: celery_uncovered/toyex/tasks.py

@shared_task def report_error_task(subject, message, *args, **kwargs): mail_admins(subject, message, *args, **kwargs)

Ensuite, nous étendons AdminEmailHandler afin qu'il n'appelle en interne que la tâche Celery définie:

Fichier: celery_uncovered/toyex/admin_email.py

from django.utils.log import AdminEmailHandler from celery_uncovered.handlers.tasks import report_error_task class CeleryHandler(AdminEmailHandler): def send_mail(self, subject, message, *args, **kwargs): report_error_task.delay(subject, message, *args, **kwargs)

Enfin, nous devons configurer la journalisation. La configuration de la journalisation dans Django est assez simple. Ce dont vous avez besoin est de remplacer LOGGING afin que le moteur de journalisation démarre en utilisant un gestionnaire nouvellement défini:

Fichier config/settings/local.py

LOGGING = { 'version': 1, 'disable_existing_loggers': False, ..., 'handlers': { ... 'mail_admins': { 'level': 'ERROR', 'filters': ['require_debug_true'], 'class': 'celery_uncovered.toyex.log_handlers.admin_email.CeleryHandler' } }, 'loggers': { 'django': { 'handlers': ['console', 'mail_admins'], 'level': 'INFO', }, ... } }

Notez que j'ai volontairement configuré des filtres de gestionnaire require_debug_true afin de tester cette fonctionnalité pendant que l'application s'exécute en mode débogage.

Essayer

Pour le tester, j'ai préparé une vue Django qui sert une opération «division par zéro» à localhost:8000/report-error. Vous devez également démarrer un conteneur MailHog Docker pour tester que l'e-mail est réellement envoyé.

$ docker run -d -p 1025:1025 -p 8025:8025 mailhog/mailhog $ CELERY_TASKSK_ALWAYS_EAGER=False python manage.py runserver $ # with your browser navigate to [http://localhost:8000](http://localhost:8000) $ # now check your outgoing emails by vising web UI [http://localhost:8025](http://localhost:8025)

Détails supplémentaires

En tant qu'outil de test de messagerie, j'ai configuré MailHog et configuré le mailing Django pour l'utiliser pour la livraison SMTP. Il existe de nombreuses façons de déployer et exécuter MailHog. J'ai décidé d'aller avec un conteneur Docker. Vous pouvez trouver les détails dans le fichier README correspondant:

Fichier: docker/mailhog/README.md

$ docker build . -f docker/mailhog/Dockerfile -t mailhog/mailhog:latest $ docker run -d -p 1025:1025 -p 8025:8025 mailhog/mailhog $ # navigate with your browser to localhost:8025

Pour configurer votre application pour utiliser MailHog, vous devez ajouter les lignes suivantes dans votre configuration:

Fichier: config/settings/local.py

EMAIL_BACKEND = env('DJANGO_EMAIL_BACKEND', default='django.core.mail.backends.smtp.EmailBackend') EMAIL_PORT = 1025 EMAIL_HOST = env('EMAIL_HOST', default='mailhog')

Au-delà des tâches de céleri par défaut

Les tâches de céleri peuvent être créées à partir de n'importe quelle fonction appelable. Par défaut, toute tâche définie par l'utilisateur est injectée avec celery.app.task.Task en tant que classe parent (abstraite). Cette classe contient la fonctionnalité d'exécution de tâches de manière asynchrone (en la passant via le réseau à un ouvrier Celery) ou de manière synchrone (à des fins de test), en créant des signatures et de nombreux autres utilitaires. Dans les exemples suivants, nous allons essayer d'étendre Celery.app.task.Task puis utilisez-le comme classe de base afin d'ajouter quelques comportements utiles à nos tâches.

les modifications apportées au code source d'un programme open source ne peuvent pas être ajoutées au produit.

Scénario 3 - Journalisation des fichiers par tâche

Dans l'un de mes projets, je développais une application qui fournit à l'utilisateur final un outil de type Extract, Transform, Load (ETL) capable d'ingérer puis de filtrer une énorme quantité de données hiérarchiques. Le back-end a été divisé en deux modules:

  • Orchestration d'un pipeline de traitement de données avec Celery
  • Traitement des données avec Go

Celery a été déployé avec une instance de Celerybeat et plus de 40 ouvriers. Il y avait plus de vingt tâches différentes qui composaient les activités de pipeline et d'orchestration. Chacune de ces tâches peut échouer à un moment donné. Tous ces échecs ont été sauvegardés dans le journal système de chaque travailleur. À un moment donné, le débogage et la maintenance de la couche Céleri ont commencé à devenir peu pratiques. Finalement, nous avons décidé d'isoler le journal des tâches dans un fichier spécifique à la tâche.

Description du cas d'utilisation: Étendez Celery afin que chaque tâche enregistre sa sortie standard et ses erreurs dans des fichiers

Celery fournit aux applications Python un excellent contrôle sur ce qu'il fait en interne. Il est livré avec un cadre de signaux familier. Les applications qui utilisent Celery peuvent s'abonner à quelques-unes de celles-ci afin d'augmenter le comportement de certaines actions. Nous allons exploiter les signaux au niveau des tâches pour fournir un suivi détaillé des cycles de vie des tâches individuelles. Le céleri est toujours livré avec un back-end de journalisation, et nous allons en profiter tout en ne dépassant que légèrement à quelques endroits pour atteindre nos objectifs.

Détails d'implémentation

Celery prend déjà en charge la journalisation par tâche. Pour enregistrer dans un fichier, il est nécessaire d'envoyer la sortie du journal à l'emplacement approprié. Dans notre cas, l'emplacement approprié de la tâche est un fichier correspondant au nom de la tâche. Sur l'instance Celery, nous remplacerons la configuration de journalisation intégrée avec des gestionnaires de journalisation déduits dynamiquement. Il est possible de souscrire à la celeryd_after_setup signal, puis configurez-y la journalisation du système:

Fichier: celery_uncovered/toyex/celery_conf.py

@signals.celeryd_after_setup.connect def configure_task_logging(instance=None, **kwargs): tasks = instance.app.tasks.keys() LOGS_DIR = settings.ROOT_DIR.path('logs') if not os.path.exists(str(LOGS_DIR)): os.makedirs(str(LOGS_DIR)) print 'dir created' default_handler = { 'level': 'DEBUG', 'filters': None, 'class': 'logging.FileHandler', 'filename': '' } default_logger = { 'handlers': [], 'level': 'DEBUG', 'propogate': True } LOG_CONFIG = { 'version': 1, # 'incremental': True, 'disable_existing_loggers': False, 'handlers': {}, 'loggers': {} } for task in tasks: task = str(task) if not task.startswith('celery_uncovered.'): continue task_handler = copy_dict(default_handler) task_handler['filename'] = str(LOGS_DIR.path(task + '.log')) task_logger = copy_dict(default_logger) task_logger['handlers'] = [task] LOG_CONFIG['handlers'][task] = task_handler LOG_CONFIG['loggers'][task] = task_logger logging.config.dictConfig(LOG_CONFIG)

Notez que pour chaque tâche enregistrée dans l'application Celery, nous construisons un enregistreur correspondant avec son gestionnaire. Chaque gestionnaire est du type logging.FileHandler, et par conséquent, chaque instance de ce type reçoit un nom de fichier en entrée. Tout ce dont vous avez besoin pour le faire fonctionner est d'importer ce module dans celery_uncovered/celery.py à la fin du fichier:

import celery_uncovered.tricks.celery_conf

Un enregistreur de tâches particulier peut être reçu en appelant get_task_logger(task_name). Afin de généraliser ce comportement pour chaque tâche, il est nécessaire d'étendre légèrement celery.current_app.Task avec quelques méthodes utilitaires:

Fichier: celery_uncovered/tricks/celery_ext.py

class LoggingTask(current_app.Task): abstract = True ignore_result = False @property def logger(self): logger = get_task_logger(self.name) return logger def log_msg(self, msg, *msg_args): self.logger.debug(msg, *msg_args)

Désormais, dans le cas d'un appel à task.log_msg('Hello, my name is: %s', task.request.id), la sortie du journal sera acheminée vers le fichier correspondant sous le nom de la tâche.

Essayer

Afin de lancer et de tester le fonctionnement de cette tâche, commencez par démarrer le processus Celery:

$ celery -A celery_uncovered worker -l info

Ensuite, vous pourrez tester les fonctionnalités via Shell:

from datetime import date from celery_uncovered.tricks.tasks import add add.delay(1, 3)

Enfin, pour voir le résultat, accédez au celery_uncovered/logs répertoire et ouvrez le fichier journal correspondant appelé celery_uncovered.tricks.tasks.add.log. Vous pouvez voir quelque chose de similaire à celui ci-dessous après avoir exécuté cette tâche plusieurs fois:

Result of 1 + 2 = 3 Result of 1 + 2 = 3 ...

Scénario 4 - Tâches sensibles à l'étendue

Imaginons une application Python pour les utilisateurs internationaux basée sur Celery et Django. Les utilisateurs peuvent définir la langue (locale) dans laquelle ils utilisent votre application.

Vous devez concevoir un système de notification par e-mail multilingue et compatible avec les paramètres régionaux. Pour envoyer des notifications par e-mail, vous avez enregistré une tâche spéciale de céleri qui est gérée par une file d'attente spécifique. Cette tâche reçoit certains arguments clés en entrée et les paramètres régionaux actuels de l'utilisateur afin que l'e-mail soit envoyé dans la langue choisie par l'utilisateur.

Imaginez maintenant que nous avons de nombreuses tâches de ce type, mais que chacune de ces tâches accepte un argument de locale. Dans ce cas, ne serait-il pas préférable de le résoudre à un niveau d'abstraction plus élevé? Ici, nous voyons comment faire cela.

Description du cas d'utilisation: Hérite automatiquement de la portée d'un contexte d'exécution et injecte-la dans le contexte d'exécution actuel en tant que paramètre.

Détails d'implémentation

Encore une fois, comme nous l'avons fait avec la journalisation des tâches, nous voulons étendre une classe de tâches de base celery.current_app.Task et remplacez quelques méthodes chargées d'appeler des tâches. Pour les besoins de cette démonstration, je remplace le celery.current_app.Task::apply_async méthode. Il existe des tâches supplémentaires pour ce module qui vous aideront à produire un remplacement pleinement fonctionnel.

Fichier: celery_uncovered/tricks/celery_ext.py

comment écrire un langage de programmation
class ScopeBasedTask(current_app.Task): abstract = True ignore_result = False default_locale_id = DEFAULT_LOCALE_ID scope_args = ('locale_id',) def __init__(self, *args, **kwargs): super(ScopeBasedTask, self).__init__(*args, **kwargs) self.set_locale(locale=kwargs.get('locale_id', None)) def set_locale(self, scenario_id=None): self.locale_id = self.default_locale_id if locale_id: self.locale_id = locale_id else: self.locale_id = get_current_locale().id def apply_async(self, args=None, kwargs=None, **other_kwargs): self.inject_scope_args(kwargs) return super(ScopeBasedTask, self).apply_async(args=args, kwargs=kwargs, **other_kwargs) def __call__(self, *args, **kwargs): task_rv = super(ScopeBasedTask, self).__call__(*args, **kwargs) return task_rv def inject_scope_args(self, kwargs): for arg in self.scope_args: if arg not in kwargs: kwargs[arg] = getattr(self, arg)

L'indice clé est de transmettre les paramètres régionaux actuels en tant qu'argument clé-valeur dans une tâche appelante par défaut. Si une tâche a été appelée avec un certain paramètre régional comme argument, elle reste inchangée.

Essayer

Pour tester cette fonctionnalité, définissons une tâche factice de type ScopeBasedTask. Il localise un fichier par ID de paramètres régionaux et lit son contenu au format JSON:

Fichier: celery_uncovered/tricks/tasks.py

@shared_task(bind=True, base=ScopeBasedTask) def read_scenario_file_task(self, **kwargs): fixture_parts = ['locales', 'sc_%i.json' % kwargs['scenario_id']] return read_fixture(*fixture_parts)

Maintenant, ce que vous devez faire est de répéter les étapes de lancement de Celery, de démarrage du shell et de test de l'exécution de cette tâche sur différents scénarios. Les luminaires sont situés sous le celery_uncovered/tricks/fixtures/locales/ annuaire.

Conclusion

Cet article visait à explorer le céleri sous différents angles. J'ai fait la démonstration du céleri dans des exemples conventionnels tels que l'envoi et la génération de rapports, ainsi que des astuces partagées pour des cas d'utilisation commerciaux de niche intéressants. Celery repose sur une philosophie axée sur les données et votre équipe peut simplifier considérablement sa vie en l'introduisant dans sa pile système. Développer des services basés sur Celery n'est pas très compliqué si vous avez une expérience de base en Python, et vous devriez être en mesure de le récupérer assez rapidement. La configuration par défaut est suffisante pour la plupart des utilisations, mais si nécessaire, elles peuvent être très flexibles.

Notre équipe a fait le choix d'utiliser Celery comme back-end d'orchestration pour les travaux d'arrière-plan et les tâches de longue durée. Nous l'utilisons largement pour une variété de cas d'utilisation, dont seuls quelques-uns ont été mentionnés dans cet article. Nous ingérons et analysons des gigaoctets de données chaque jour, mais ce n'est que le début des techniques de mise à l'échelle horizontale.

Comprendre les bases

Qu'est-ce que le céleri pour Python?

Celery est l'un des gestionnaires de travail d'arrière-plan les plus populaires dans le monde Python. Celery est compatible avec plusieurs courtiers de messages comme RabbitMQ ou Redis et peut agir à la fois en tant que producteur et consommateur.

Qu'est-ce que Pub-Sub?

Le modèle de publication-abonnement (ou producteur-consommateur) est un modèle de messagerie distribuée dans les systèmes informatiques où les éditeurs diffusent des messages via un courtier de messages et les abonnés écoutent les messages. Les deux peuvent être des composants isolés du système, ni conscients ni en communication directe avec l'autre.

Plugin Zeplin Sketch - Le pont de flux de travail entre la conception et l'ingénierie

Outils Et Tutoriels

Plugin Zeplin Sketch - Le pont de flux de travail entre la conception et l'ingénierie
Brands Still Matter - Boom sans marque à éclater

Brands Still Matter - Boom sans marque à éclater

Conception Ux

Articles Populaires
Création d'une API REST Node.js / TypeScript, partie 2: modèles, middleware et services
Création d'une API REST Node.js / TypeScript, partie 2: modèles, middleware et services
Un didacticiel sur la radio définie par logiciel: images de la Station spatiale internationale et écoute de jambons avec un RTL-SDR
Un didacticiel sur la radio définie par logiciel: images de la Station spatiale internationale et écoute de jambons avec un RTL-SDR
Les drones commerciaux révolutionnent les opérations commerciales
Les drones commerciaux révolutionnent les opérations commerciales
Faire des affaires dans l'Union européenne
Faire des affaires dans l'Union européenne
AI vs BI: différences et synergies
AI vs BI: différences et synergies
 
Stratège de contenu produit
Stratège de contenu produit
Risque vs récompense: un guide pour comprendre les conteneurs logiciels
Risque vs récompense: un guide pour comprendre les conteneurs logiciels
Explorer SMACSS: architecture évolutive et modulaire pour CSS
Explorer SMACSS: architecture évolutive et modulaire pour CSS
Si vous n'utilisez pas de données UX, ce n'est pas de la conception UX
Si vous n'utilisez pas de données UX, ce n'est pas de la conception UX
Simplification de l'utilisation des API RESTful et de la persistance des données sur iOS avec Mantle et Realm
Simplification de l'utilisation des API RESTful et de la persistance des données sur iOS avec Mantle et Realm
Articles Populaires
  • mood board dans la création de mode
  • tailles d'appareils pour un design réactif
  • meilleurs cours de c++
  • meilleur cours de programmation c en ligne
  • site de rencontre numéro 1 2015
  • ce que les investisseurs recherchent dans les startups
  • différence entre ai et vi
Catégories
  • Rise Of Remote
  • Outils Et Tutoriels
  • Équipes Distribuées
  • Mode De Vie
  • © 2022 | Tous Les Droits Sont Réservés

    portaldacalheta.pt