celeryは、非同期でタスクを実行できるqueue/job queueのフレームワークです。
celeryは、ジョブキューイングシステムです。
ジョブキューイングシステムとは、何かしらの処理をジョブ単位で監視して、それをキューに積んでおき、あとでキューから1つずつジョブを取り出して実行していくシステムのことを指します。
キューに積んで1つずつ処理していくため、サーバーの負荷軽減に利用できたりします。
<目次>
前提条件
Macで動作確認しています。
事前準備
rabbitmqをインストールしてください。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
mkdir brew install rabbitmq # version確認 ls /usr/local/Cellar/rabbitmq/ 3.7.12 # bash登録 export PATH=/usr/local/Cellar/rabbitmq/3.7.12/sbin:$PATH # rabbitmq起動 rabbitmq-server start ## ## ## ## RabbitMQ 3.7.12. Copyright (C) 2007-2019 Pivotal Software, Inc. ########## Licensed under the MPL. See http://www.rabbitmq.com/ ###### ## ########## Logs: /usr/local/var/log/rabbitmq/rabbit@localhost.log /usr/local/var/log/rabbitmq/rabbit@localhost_upgrade.log Starting broker... completed with 6 plugins. # ステータス確認 # 停止 # rabbitmqctl stop Stopping and halting node rabbit@localhost ... Gracefully halting Erlang VM |
rabbitmqは、メッセージキューイング処理を行うことができるオープンソースソフトウェアです。
pythonからceleryを利用するために、以下のモジュールをインストールしてください。
1 2 3 4 |
pip install celery pip freeze | grep celery celery==4.4.2 |
実装
早速実装していきましょう。先ほどインストールしたrabbitmqは起動したままにしておいてください。
また、以下のフォルダとファイルを作成してください。
1 2 3 |
mkdir async touch async/tasks.py touch async/main.py |
バックグラウンド実行
最初に、celeryのworkerを書きます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
# tasks.py import time import random import celery # celeryの基本設定 app = celery.Celery( 'tasks', broker='amqp://guest@localhost', # rabbitmqをブローカーに指定 backend='amqp://guest@localhost', # rabbitmqをbackendに指定。Task実行後のデータを保存する(radisでも可能) ) # taskのデコレータ @app.task def build_server(): """擬似Server 構築""" print('wait 10 sec') time.sleep(10) server_id = random.randint(1, 100) return server_id |
別のterminalを立ち上げて、以下のコマンドを実行してください。
1 2 3 4 5 6 |
pwd /Users/hoge/Desktop/django-db/async # 必ずtasks.pyがある場所で実行 # workerが作成される celery -A tasks worker --loglevel=info |
workerが動いたらmain.pyに実行処理を書きます。
1 2 3 4 5 6 7 |
# main.py import tasks # delayは、バックグラウンド実行 result = tasks.build_server.delay() print('doing...') print(result) |
プログラムを実行します。
1 2 |
doing… 6da55c24-9fa2-4923-8d00-6401f0ac1731 |
workerを動かしているterminalを確認すると以下の出力がされています。
1 2 |
[2020-03-31 06:07:18,360: WARNING/ForkPoolWorker-2] wait 10 sec [2020-03-31 06:07:18,428: INFO/ForkPoolWorker-2] Task tasks.build_server[6da55c24-9fa2-4923-8d00-6401f0ac1731] succeeded in 0.07167153699998607s: 89 |
実行いただけるとわかると思いますが、tasks.pyのbuild_serverは、
関数で10秒間処理を待っています。time.sleep(10)
しかし、main.pyの
関数は、直ぐに実行されたと思います。print('doing...')
build_serverのタスク処理は、バックグラウンドで処理されているため、main.pyの処理は即座に終了したわけです。
ループで処理を確認するとよりわかりやすいかもしれません。
1 2 3 4 5 6 7 8 9 10 |
# main.py import tasks for i in range(100): # delayは、バックグラウンド実行 result = tasks.build_server.delay() print('doing...') print(result) |
グルーピング
先ほどのbuild_serverをグルーピングすることもできます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
# tasks.py import time import random import celery app = celery.Celery( 'tasks', broker='amqp://guest@localhost', # rabbitmqをブローカーに指定 backend='amqp://guest@localhost', # rabbitmqをbackendに指定。Task実行後のデータを保存する(radisでも可能) ) @app.task def build_server(): """擬似Server 構築""" print('wait 10 sec') time.sleep(10) server_id = random.randint(1, 100) return server_id # 追加 @app.task def build_servers(): g = celery.group( # s -> async(非同期で実行) build_server.s() for _ in range(10)) return g() |
workerをctrl+cで止めて、再び下記のコマンドを実行します(taskを読み直す)。
1 |
celery -A tasks worker --loglevel=info |
main.pyの呼び出し先を変更します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
# main.py import tasks def start_build_server(): for i in range(100): # delayは、バックグラウンド実行 result = tasks.build_sevrer.delay() print('doing...') print(result) # 追加 def start_build_servers(): result = tasks.build_servers.delay() print('doing...') print(result) if __name__ == '__main__': # start_build_server() start_build_servers() |
実行します。
1 2 |
doing... 7e1f4a08-e87d-4335-9998-72aa4c1de2ff |
1 |
[2020-03-31 06:33:07,179: INFO/ForkPoolWorker-2] Task tasks.build_servers[7c78b212-c72e-46d4-89f0-f2d8744e7bd0] succeeded in 0.06139554300000061s: <GroupResult: 21ed6404-6eee-45d3-afb5-d8e24edbefa9 [ad209d47-a784-4de6-98f4-f8707eda3251, 6829fc12-b2d2-4c36-a856-333e7db746d8, f44ef254-dff8-4fb8-9263-536765debd91, 35856d5c-37ec-435b-b444-b7017a99057f, 71e69503-bfda-496b-b8df-0691b85e1773, 891398c4-f633-4358-9c2e-56ec7f862d43, 67c60310-bd10-4aed-845b-bb7672ce3e7d, 7119d4ca-3929-4f27-af51-11c9e1ceae16, 484b12ce-3032-4f36-a007-f6bfa7d807a4, 82f5ef8f-567d-4fb1-a97c-d7935f73ab66]> |
無事にグルーピングされました。
コールバック
コールバックも実装できます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# tasks.py import time import random import celery app = celery.Celery( 'tasks', broker='amqp://guest@localhost', # rabbitmqをブローカーに指定 backend='amqp://guest@localhost', # rabbitmqをbackendに指定。Task実行後のデータを保存する(radisでも可能) ) @app.task def build_server(): """擬似Server 構築""" print('wait 10 sec') time.sleep(10) server_id = random.randint(1, 100) return server_id @app.task def build_servers(): g = celery.group( # s -> async(非同期で実行) build_server.s() for _ in range(10)) return g() # 追加 @app.task def callback(result): for server_id in result: print(server_id) print('clean up') return 'done' # 追加 @app.task def build_servers_with_cleanup(): c = celery.chord( (build_server.s() for _ in range(10)), callback.s()) return c() |
workerをctrl+cで止めて、再び下記のコマンドを実行します(taskを読み直す)。
1 |
celery -A tasks worker --loglevel=info |
main.pyも以下のように変更します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
# main.py import tasks def start_build_server(): for i in range(100): # delayは、バックグラウンド実行 result = tasks.build_sevrer.delay() print('doing...') print(result) def start_build_servers(): result = tasks.build_servers.delay() print('doing...') print(result) def start_build_servers_with_cleanup(): result = tasks.build_servers_with_cleanup.delay() print('doing...') print(result) if __name__ == '__main__': # start_build_server() # start_build_servers() start_build_servers_with_cleanup() |
実行してみましょう。
1 2 3 4 5 6 7 8 9 10 11 12 |
[2020-03-31 06:39:42,216: INFO/MainProcess] Received task: tasks.callback[18970158-9de9-4a40-8c24-4b07bbe291b3] [2020-03-31 06:39:42,217: WARNING/ForkPoolWorker-2] 68 [2020-03-31 06:39:42,217: WARNING/ForkPoolWorker-2] 75 [2020-03-31 06:39:42,218: WARNING/ForkPoolWorker-2] 22 [2020-03-31 06:39:42,218: WARNING/ForkPoolWorker-2] 62 [2020-03-31 06:39:42,219: WARNING/ForkPoolWorker-2] 14 [2020-03-31 06:39:42,219: WARNING/ForkPoolWorker-2] 23 [2020-03-31 06:39:42,219: WARNING/ForkPoolWorker-2] 11 [2020-03-31 06:39:42,220: WARNING/ForkPoolWorker-2] 36 [2020-03-31 06:39:42,220: WARNING/ForkPoolWorker-2] 100 [2020-03-31 06:39:42,220: WARNING/ForkPoolWorker-2] 74 [2020-03-31 06:39:42,221: WARNING/ForkPoolWorker-2] clean up |
taskが完了したら、
が表示されています。clearn up
パイプライン
パイプラインも利用することができます。
1 2 3 4 5 6 7 8 9 10 11 12 |
# tasks.py @app.task def setup_dns(server_id): print('setup dns for{}'.format(server_id)) return 'done for {}'.format(server_id) @app.task def deploy_customer_server(): # | を使うことによりbuild_serverの結果をsetup_dnsに渡せる chain = build_server.s() | setup_dns.s() return chain() |
workerをctrl+cで止めて、再び下記のコマンドを実行します(taskを読み直す)。
1 |
celery -A tasks worker --loglevel=info |
main.pyを書き換えます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
# main.py import tasks def start_build_server(): for i in range(100): # delayは、バックグラウンド実行 result = tasks.build_sevrer.delay() print('doing...') print(result) def start_build_servers(): result = tasks.build_servers.delay() print('doing...') print(result) def start_build_servers_with_cleanup(): result = tasks.build_servers_with_cleanup.delay() print('doing...') print(result) def start_deploy_customer_server(): result = tasks.deploy_customer_server.delay() print(result) if __name__ == '__main__': # start_build_server() # start_build_servers() # start_build_servers_with_cleanup() start_deploy_customer_server() |
実行します。
1 2 3 4 5 6 7 |
[2020-03-31 06:44:39,376: WARNING/ForkPoolWorker-3] wait 10 sec [2020-03-31 06:44:39,406: INFO/ForkPoolWorker-2] Task tasks.deploy_customer_server[d7955b72-cbba-4330-91c1-23c019d09d1e] succeeded in 0.0534093569999996s: <AsyncResult: d9eca776-ebb7-40f6-8b91-ec622586ab51> [2020-03-31 06:44:49,401: INFO/MainProcess] Received task: tasks.setup_dns[d9eca776-ebb7-40f6-8b91-ec622586ab51] [2020-03-31 06:44:49,402: WARNING/ForkPoolWorker-2] setup dns for76 [2020-03-31 06:44:49,451: INFO/ForkPoolWorker-3] Task tasks.build_server[1cba829a-fb16-44a0-9b48-ab26286238ac] succeeded in 10.075051987999998s: 76 [2020-03-31 06:44:49,479: INFO/ForkPoolWorker-2] Task tasks.setup_dns[d9eca776-ebb7-40f6-8b91-ec622586ab51] succeeded in 0.07694268100000556s: 'done for 76' |
便利ですね。
まとめ
簡単なceleryの使い方をまとめました。
次は、Djangoを使った方法を勉強しようと思います。
コメントを残す
コメントを投稿するにはログインしてください。