Issue
I have created a Celery task as shown below:
import os
import time
from celery import Celery
from dotenv import load_dotenv

load_dotenv()

celery = Celery(__name__)
celery.conf.broker_url = os.environ.get("CELERY_BROKER_URL")
celery.conf.result_backend = os.environ.get("CELERY_RESULT_BACKEND")


@celery.task(name="create_task")
def message_sender(sender_func, numbers: list, message: str):
    sender_func(numbers, message)
    return "Sent Successful"
And I am calling the task as shown below:
modem_conn = Modem()
task = message_sender.apply_async(
    kwargs={
        "sender_func": modem_conn.sms,
        "numbers": ["00000000"],
        "message": "sms sent",
    }
)
But I am getting the error below:
kombu.exceptions.EncodeError: Object of type method is not JSON serializable
But if I call the task without delay or apply_async, then it works. What could be the problem here, and how can I achieve this?
All I want to do is pass a function or instance while calling the celery task.
Solution
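The Celery task runs in a separate worker process, not in your app, and the two communicate only via the broker. With delay or apply_async you don't actually "call" the task function; you send a message with serialized data that tells the worker which function to run and with which arguments. A bound method such as modem_conn.sms cannot be turned into JSON, so the message cannot be encoded, which is why you can't send objects or functions. Calling the task directly works because it then runs as an ordinary function in your own process and nothing is serialized. This is similar to multiprocessing, where only serialized data can be passed between the processes.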
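The failure can be reproduced without Celery or a broker. The following is a minimal sketch, assuming the JSON serializer named in the error message; the Modem class here is a hypothetical stand-in for the one in the question, and can_serialize is a helper invented for illustration.

```python
import json


class Modem:
    """Hypothetical stand-in for the Modem class from the question."""

    def sms(self, numbers, message):
        return f"sent {message!r} to {numbers}"


modem_conn = Modem()


def can_serialize(value) -> bool:
    """Return True if value survives JSON encoding, as task arguments must."""
    try:
        json.dumps(value)
        return True
    except TypeError:
        return False


# A bound method cannot be JSON-encoded; kombu surfaces this as EncodeError.
method_ok = can_serialize({"sender_func": modem_conn.sms})
# A plain string naming the method encodes without trouble.
name_ok = can_serialize({"sender_func": "sms"})
print(method_ok, name_ok)
```

Anything that passes this kind of check (strings, numbers, lists, dicts) is safe to put in the task's kwargs.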
My approach would be to make the function known to the worker, then send e.g. a string with the name of the function and have the worker look it up and call it.
sender:
task = message_sender.apply_async(
    kwargs={
        "sender_func": "sms",
        "numbers": ["00000000"],
        "message": "sms sent",
    }
)
worker:
@celery.task(name="create_task")
def message_sender(sender_func, numbers: list, message: str):
    modem_conn = Modem()
    if sender_func == "sms":
        modem_conn.sms(numbers, message)
    return "Sent Successful"
You could also use getattr or locals() to resolve the function from its name instead of writing one if branch per function.
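The getattr variant could look roughly like the sketch below. It is not part of the original answer: the Modem class is again a hypothetical stand-in, the call method and the ALLOWED_SENDERS allowlist are assumptions added for illustration, and dispatch stands for the body that would sit inside the Celery task.

```python
class Modem:
    """Hypothetical stand-in for the Modem class from the question."""

    def sms(self, numbers, message):
        return f"sms to {numbers}: {message}"

    def call(self, numbers, message):
        return f"call to {numbers}: {message}"


# Only names listed here may be invoked. The allowlist is an assumption,
# added so that a message cannot trigger arbitrary attributes.
ALLOWED_SENDERS = {"sms", "call"}


def dispatch(sender_func: str, numbers: list, message: str):
    """Resolve a method name received as a string and call it."""
    if sender_func not in ALLOWED_SENDERS:
        raise ValueError(f"Unknown sender function: {sender_func}")
    # The Modem is created on the worker side, never sent over the broker.
    modem_conn = Modem()
    return getattr(modem_conn, sender_func)(numbers, message)


result = dispatch("sms", ["00000000"], "sms sent")
print(result)
```

Checking the name against an explicit set before calling getattr keeps the lookup from reaching attributes the sender was never meant to use.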
Answered By - bechtold Answer Checked By - Katrina (PHPFixing Volunteer)