Tutorial Celery + RabbitMQ dengan FastAPI Python

Buat sebuah code berjalan dibelakang dengan celery!

Hudya
6 min readOct 31, 2020

Halo semua, Kiddy disini dan pada kali ini saya ingin membagikan insight mengenai Background Task dengan Celery + RabbitMQ.

Pada tutorial kali ini saya ingin membagikan insight mengenai cara untuk membuat background task pada Python FastAPI.

Nah sebelumnya saya juga udah pernah share nih caranya, namun itu di Node JS dan dengan broker service yang sama yaitu RabbitMQ.

Kenapa sih harus RabbitMQ? Kenapa ngga Redis Broker Message aja? Sebenernya both of them ya bisa-bisa aja guys cuma secara performa sebenernya banyak keuntungan dari RabbitMQ itu sendiri, soalnya ya RabbitMQ ini emang ditujukan sebagai broker-message service, sedangkan Redis kan In Memory Database, nah kebetulan Redis bisa juga dipake sebagai broker-message, gitu loh guys.

Nah bagi kalian yang bingung memilih apa, saya bisa kasih sedikit saran. Jikalau kalian butuh sebuah broker-message service yang powerful, yang benar-benar persistent alias kuat, saya saranin RabbitMQ, kenapa gitu? RabbitMQ banyak fungsi yang memastikan bahwa pesan kalian ya harus benar-benar terkirim, sedangkan kalo Redis, ya rada lembek nih, cuma kan kalo misalnya nih kalian punya server yang ngga strong-strong amat, sayang ya dua servis bersamaan? Jadi kalo emang masih belom butuh, ngga apa pake Redis aja gan. Untuk perbandingannya saya drop dibawah ya.

Oke lanjut, sekarang saatnya kita mulai kick off coding!

Nah karena saya mau cepet, maka boilerplate akan saya gunakan, yaitu bekas tutorial yang ada di tutorial dibawah ini, jadi kalo mau clone dari boilerplate saya minimal udah ngikut CRUD saya atau kalo emang kamu udah intermediate tinggal clone aja github saya dan pastikan pake MongoDB ya ^^

Clone githubnya disini:

Kalo kamu adalah level advanced alias udah pernah pake Pymongo, ngerti konsep Python dan pokoknya udah paham deh dan kamu cuma butuh sedikit “sentuhan” untuk memahami Celery silahkan diskip aja bagian clone-clonenya projek saya dan sesuaikan sama projekmu.

Oke, kalo udah sampai sini, pokoknya saya anggap kamu udah bisa, udah paham dan udah siap ya, saya ngga ngulang dari 0 lagi.

Pastiin juga projeknya udah ready dan sudah masuk ke mode virtual environment!

Kita install dulu nih kebutuhannya.

pip install celery

Oke kita lanjut, habis install celery, kita akan memulai nih membuat folder init si celery, tujuannya biar kita bisa manggil si celery ini agar berjalan.

Kita akan menggunakan kasus pembuatan log setelah user membuat todo pada artikel ini, nah kalo kalian lanjutin dari github saya, kita buat dulu file modelnya,

todo_log.py

from mongoengine import *

from app.models.todo import Todos
from app.models.user import Users


class TodoLogs(Document):
message = StringField()
todo = LazyReferenceField(Todos, reverse_delete_rule=CASCADE)
user = LazyReferenceField(Users, reverse_delete_rule=CASCADE)
created_at = DateTimeField()

Nah sekarang kita buat dulu folder tasks, sejajar sama si app.

Sekarang didalam folder tasks kita buat file namanya celery_init.py

celery_init.py

from celery import Celery
from app.tasks import SayHelloWorld

broker_url = 'amqp://guest:guest@localhost:5672/'
redis_broker_url = 'redis://guest:@localhost:6379/'
app = Celery('tasks', backend='rpc://', broker=broker_url)
app.conf.beat_schedule = {
'say-hello-world': {
'task': 'app.tasks.celery_init.sayHelloWorld',
'schedule': 10.0
},
}


@app.task
def sayHelloWorld():
SayHelloWorld.execute()

Nah kalo kalian mau pake settingan rabbit mq, ganti aja url si broker ke broker_url dan apabila pengen pake redis, ganti aja ke redis_broker_url, kalo misalnya di local kalian ada passwordnya, tambahin aja passwordnya setelah kata tanda kutip (guest:{password}@localhost:6379)

Ada yang merah error? Yaudah diemin dulu, sekarang kita buat file SayHelloWorld.py sejajar sama si celery_init.py

SayHelloWorld.py

def execute():
print("Hello World")

Simple ngeprint Hello World aja, nanti ini akan saya jelaskan untuk membuat automated tasks layaknya AdonisJS.

Sekarang kita buat file CreateTodoLog.py sejajar dengan celery_init.py

CreateTodoLog.py

from datetime import datetime


from app.models.todo_logs import TodoLogs
from app.tasks.celery_init import app


@app.task
def execute(payload):
title = payload['todo_title']

log = TodoLogs()
log.user = payload['user_id']
log.todo = payload['todo_id']
log.created_at = datetime.utcnow()
log.message = f"Pengguna membuat todo dengan judul {title}"
log.save()

Habis itu kita balik ke controllers -> TodoController.py dan ubah sedikit fungsi store.

from app.tasks import CreateTodoLog@staticmethod
async def store(request: Request) -> JSONResponse:
try:
body = await request.json()
title = body['title']
user_id = body['user_id']

todo = Todos()
todo.title = title
todo.owner = user_id
todo.save()

todos = TodoTransformer.singleTransform(todo)

# Menyiapkan payload yang akan dikirim ke worker
payload = {
"todo_title": title,
"user_id": user_id,
"todo_id": str(todo.id)
}

# Menjalankan paylaod
CreateTodoLog.execute.delay(payload)

return response.ok(todos, "Berhasil Membuat Todo!")
except Exception as e:
return response.badRequest('', f'{e}')

Kira-kira gambaran struktur kita kaya gini gan.

Cakep sekarang kita bisa mulai jalanin!

Pertama jalanin dulu projeknya.

uvicorn main:app --reload --port 5000 --log-level=debug

Kedua, jalanin worker kita.

celery -A app.tasks.celery_init.app worker --loglevel=INFO

Kira-kira kalo udah berhasil jalan ntar tampilannya kaya gini.

Sekarang kita jalanin si automated tasks kita.

celery -A app.tasks.celery_init.app beat --loglevel=INFO

Nah kalo dia si beat (automated task) akan muncul seperti ini dan mulai ngirimn task.

Tentunya si worker akan nerima.

Penjelasan

Jadi si beat (automated task) yang berasal dari kata heart beat, ini merupakan automated task yang akan jalan sesuai yang kita input, nah di celery_init ini kan udah kita settng nih jalan tiap 10 detik, maka tiap 10 detik dia ngirim task.

app.conf.beat_schedule = {
'say-hello-world': {
'task': 'app.tasks.celery_init.sayHelloWorld',
'schedule': 10.0
},
}

Gimana caranya kalo mau per-menit? Kalo pake Crontab bisa ngga? Bisa banget, ganti sikit.

from celery.schedules import crontab
app.conf.beat_schedule = {
'say-hello-world': {
'task': 'app.tasks.celery_init.sayHelloWorld',
'schedule': crontab()
},
}

Nah kalo kaya code diatas dia defaultnya permenit. Bisa juga dibuat seperti aturan crontab.

from celery.schedules import crontab
app.conf.beat_schedule = {
'say-hello-world': {
'task': 'app.tasks.celery_init.sayHelloWorld',
'schedule': crontab(hour=1, minute=3, day_of_week=1)
},
}

Jika ditranslate artinya seperti ini, cara mengetahuinya pergi ke https://crontab.guru.

Nah beat sudah, sekarang kita coba guys untuk jalanin yang kita buat di TodoController. Alurnya gini:

  1. User membuat Todo.
  2. Todo Controller mengirim pesan ke broker service untuk menjalankan fungsi Create Todo Log dan mengirimkan parameter.
  3. Broker service Create Todo Log mengeksekusi perintah yang ditulis dengan menerima parameter.

Sekarang pastikan worker dan aplikasi kita sudah jalan, saya coba hit endpoint create Todo yang sudah dimodifikasi.

Lalu kita cek ke terminal.

Yup tasknya berjalan, dibuktikan dengan received task -> nama task dan dibawahnya succeeded.

Kita buktikan, masuk ngga sih ke DB kita?

Voila, masuk loh!

Jadi gimana? Gampang kan untuk membuat background job di Python dengan Celery serta membuat beat (automated task). Meskipun kamu pengguna Flask, ngga usah bingung, pada dasarnya konsepnya sama kok, tinggal disesuaikan aja.

Selain itu task Hello World yang saya buat masih bisa dikostumisasi lagi sesuai kebutuhan yang kalian inginkan, gunakan task ketika kita harus menjalankan sebuah proses di belakang yang memakan waktu tunggu lumayan.

Kalo gitu sekian dari saya dan adios~

--

--

Hudya
Hudya

Written by Hudya

Which is more difficult, coding or counting? Not both of them, the difficult one is sharing your knowledge to people without asking the payment.

No responses yet