Ami | 6 Apr 09:13 2016

Celery Task not posted on time

Dear Community,

We've been using Celery for a while and have run into a bizarre situation: a task was not executed at its scheduled (ETA) time:

[2016-04-03 23:32:55,459: INFO/MainProcess] Received task: appcardrestapi.workers.triggered_actors.set_trigger[10ed416c-c699-49f8-9dc4-8782dee5e8d0] eta:[2016-04-03 14:00:00.000309+00:00]


We're using Celery 3.1.17 with Redis, running a cluster of two nodes (each on a separate machine). We run both long-running and short-running tasks. We have a long visibility timeout configured, and the scheduled execution time fits within it. However, around 14:00:00 (the original task execution time) there were no restarts, no kill -9, and no high load on either node; there is nothing in the system logs either, and nothing that would explain why the task was not executed when it was supposed to be.


From our configuration file:


CELERYD_TASK_SOFT_TIME_LIMIT = 120
BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 90000} # 25 hours
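
If it helps with diagnosis, this is a small snippet for dumping the ETA tasks each worker is currently holding (a sketch using the remote-control inspect API; the exact output shape may vary by version):

from celery.task.control import inspect

# Ask every worker which ETA/countdown tasks it is currently holding in memory.
scheduled = inspect().scheduled() or {}
for worker, entries in scheduled.items():
    print(worker, len(entries))
    for entry in entries:
        print('   ', entry)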


I'd much appreciate it if anyone has ideas on what happened and, more importantly, how to prevent this from happening again. Thank you so much.


Amichay

aliwaller50 | 4 Apr 02:57 2016

Celery, SSL, Message Signing and Encryption

I want to use SSL with Celery and CloudAMQP on Heroku, using Django. I set BROKER_USE_SSL = True in my settings file, and I also appended ?ssl=true to the end of my CLOUDAMQP_URL.

It seems to be working, but when I check the logs, Celery is connecting with amqp:// instead of amqps://. Is my BROKER_USE_SSL setting being honored? It seems like an SSL connection should be using amqps://. Part of my confusion is that the CloudAMQP docs say you should use amqps://, yet when I try to start my URL with amqps:// it throws an error. I think my current settings might be doing SSL after all, because I had to disable librabbitmq (which said it couldn't work with the given settings, since librabbitmq can't do SSL). I'd appreciate any help confirming this, though.
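
For completeness, this is the more explicit form of the setting I've seen suggested, passing SSL options instead of just True (the certificate paths are placeholders; I haven't confirmed this against CloudAMQP yet):

import ssl

# Explicit SSL options instead of BROKER_USE_SSL = True; the dict is handed
# to the underlying amqp transport. All paths below are placeholders.
BROKER_USE_SSL = {
    'ca_certs': '/path/to/ca_certificate.pem',
    'certfile': '/path/to/client_certificate.pem',
    'keyfile': '/path/to/client_key.pem',
    'cert_reqs': ssl.CERT_REQUIRED,
}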

As a related question, has anyone done message signing with Celery, and possibly encrypting messages, using the auth serializer as described here? I'm wondering whether it's worth it and whether anyone knows of tutorials for using pyOpenSSL to generate the keys and certificates. Thanks!
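
On the signing side, the configuration I've gathered from the security docs looks roughly like this (a sketch, not something I have working yet; the app name, broker URL, and key/cert paths are all placeholders):

from celery import Celery

app = Celery('myapp', broker='<broker url>')   # placeholder app/broker

app.conf.update(
    CELERY_SECURITY_KEY='/etc/ssl/private/worker.key',
    CELERY_SECURITY_CERTIFICATE='/etc/ssl/certs/worker.pem',
    CELERY_SECURITY_CERT_STORE='/etc/ssl/certs/*.pem',
)
app.setup_security()   # switches the serializer to 'auth' (signed messages)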

Aaron Jackson | 2 Apr 09:44 2016

AMQP Connection Loss For Long Running Task

We have a network that enforces a standard set of connection timeout rules.  When a connection silently fails, the RabbitMQ broker keeps the connection instance, channels, and queues, even though the underlying network connection has been dropped.  To keep connections from being dropped, I've set the broker heartbeat as follows:

BROKER_HEARTBEAT = 10

This has successfully kept connections up and running for long periods of time with no issues.  However, one of my tasks runs a child process, using subprocess.Popen to spawn the child and consume its output.  Under these circumstances ONLY, I've noticed that the heartbeat logic fails.  From reading the code, my understanding is that the hub dispatches tasks to the worker processes, and a worker must send its result back to the hub before it goes to the broker.  This seems to imply that the hub should be able to keep sending heartbeats while it is waiting for the child.
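
For context, a minimal sketch of the pattern I'm describing (the task, command, handler name, and app instance below are placeholders, not the real code):

import subprocess

@app.task
def run_child(cmd):
    # Spawn the child and consume its output line by line; the task blocks
    # here for as long as the child keeps producing data.
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    for line in iter(proc.stdout.readline, b''):
        handle_line(line)   # placeholder for the actual processing
    return proc.wait()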

Can you see any reason why a suspended worker might cause the AMQP heartbeat to be suspended as well?

Thanks.

Gilles Lenfant | 1 Apr 19:32 2016

Raise an exception in a task (from consumer)

Hi,

I'm looking for an elegant way to raise an exception from the consumer side that I can handle in the running task. Workers and consumers may run in different apps.

In the consumer:

ar = AsyncResult(some_job_id)
throw_exception_to_job(SomeException, params_of_exception)
result = ar.get()  # -> "interrupted"

And in the task under worker control:

def my_task(any_params):
    to_purge = []
    try:
        do_something_long(to_purge)
        out = 'done'
    except SomeException:
        out = 'interrupted'
        # Making some cleanups
        for temp_resource in to_purge:
            specific_cleanup_for(temp_resource)
    return out


I tried using AsyncResult.revoke(terminate=True), but that required creating another function in which I can't do the cleanup work, and objects returned to revoke handlers are not provided to the consumer.
In addition, I may need to return results that were created before the exception was raised back to the consumer.
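
A variant I'm considering is revoking with an explicit signal, since, as far as I understand, the prefork pool turns SIGUSR1 into SoftTimeLimitExceeded inside the running task. A rough sketch, not verified end to end (and the return value may still be discarded because the task is marked revoked):

from celery.exceptions import SoftTimeLimitExceeded
from celery.result import AsyncResult

# Consumer side: interrupt the running task "softly".
AsyncResult(some_job_id).revoke(terminate=True, signal='SIGUSR1')

# Worker side: catch the soft interrupt and clean up.
def my_task(any_params):
    to_purge = []
    try:
        do_something_long(to_purge)
        out = 'done'
    except SoftTimeLimitExceeded:
        out = 'interrupted'
        for temp_resource in to_purge:
            specific_cleanup_for(temp_resource)
    return out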

Many thanks in advance for any clues.
--
Gilles Lenfant


Eric Mangold | 25 Mar 22:27 2016

Celery with zeromq broker example?

Hi,

I've installed celery[zeromq] and I would like to see some examples of using
Celery with a ZeroMQ broker, brought up entirely from Python code, avoiding
static config files and the like where possible.

Is there any specific doc on this?

Is there anything I should be aware of (I know it's experimental) when using this
in production?

ZeroMQ offers certain benefits over RabbitMQ (much easier non-root and
stand-alone deployment, with no need to compile and ship an Erlang runtime
for every platform we support, etc.).

Any tips, pointers, help is GREATLY appreciated!

Thank you!
-E

Moinuddin Quadri | 24 Mar 20:26 2016

Django: Celery Worker not getting started (backed by RabbitMQ)

I am trying to configure djcelery in my Django application, backed by a RabbitMQ server, on an Ubuntu 14.04 machine hosted on Google Compute Engine.

When I attempt to start Celery in debug mode using python manage.py celery worker -B -E --loglevel=debug, the command terminates with the output below:

    [2016-03-24 12:16:09,568: DEBUG/MainProcess] | Worker: Preparing bootsteps.
    [2016-03-24 12:16:09,571: DEBUG/MainProcess] | Worker: Building graph...
    [2016-03-24 12:16:09,572: DEBUG/MainProcess] | Worker: New boot order: {Timer, Hub, Queues (intra), Pool, Autoscaler, StateDB, Autoreloader, Beat, Consumer}
    [2016-03-24 12:16:09,575: DEBUG/MainProcess] | Consumer: Preparing bootsteps.
    [2016-03-24 12:16:09,576: DEBUG/MainProcess] | Consumer: Building graph...
    [2016-03-24 12:16:09,577: DEBUG/MainProcess] | Consumer: New boot order: {Connection, Events, Mingle, Tasks, Control, Agent, Heart, Gossip, event loop}
    <user>@<gce.host>:~/path/to/my/project$

What could possibly be the cause of this issue?
The same setup runs fine on my local Ubuntu machine, and as far as I remember I have followed the same steps on my cloud server.

==================================================================================

Additional Info: Things I verified

  1.  RabbitMQ server is running fine. Output from log file:

> =INFO REPORT==== 24-Mar-2016::17:02:14 === accepting AMQP connection <0.209.0> (127.0.0.1:42326 -> 127.0.0.1:5672)
> =INFO REPORT==== 24-Mar-2016::17:02:14 === accepting AMQP connection <0.219.0> (127.0.0.1:42327 -> 127.0.0.1:5672)
> =INFO REPORT==== 24-Mar-2016::17:02:17 === accepting AMQP connection <0.229.0> (127.0.0.1:42328 -> 127.0.0.1:5672)
 
  2.  Port `5672` is open on my machine. I have also opened ports `tcp:5555`, `tcp:4369`, `tcp:15672`, and `tcp:5671`, as mentioned here (to be on the safe side).


Celery's configuration in my project:

I installed the `celery` and `django-celery` packages, created a RabbitMQ user, and set its permissions with these commands:

    sudo rabbitmqctl add_user <user> <password>
    sudo rabbitmqctl set_permissions -p / <user> ".*" ".*" ".*"

In settings.py file, I have added:

    import djcelery
    djcelery.setup_loader()
    MIDDLEWARE_CLASSES = [ 'django.middleware.transaction.TransactionMiddleware',
                           ..]
    INSTALLED_APPS = ['djcelery',
                      ..]

The content of celery.py is as follows:

    from __future__ import absolute_import
    import os
    from datetime import timedelta
    from celery import Celery
    from celery.schedules import crontab
    from django.conf import settings

    # set the default Django settings module for the 'celery' program.
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', '<my_project>.settings')
    app = Celery('<my_project>')
    # Using a string here means the worker will not have to
    # pickle the object when using Windows.
    app.config_from_object('<my_project>.settings')
    # app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
    app.conf.update(
        CELERY_ACCEPT_CONTENT = ['json'],
        CELERY_TASK_SERIALIZER = 'json',
        CELERY_RESULT_SERIALIZER = 'json',
        BROKER_URL = 'amqp://<user>:<password>@localhost:5672//',
        # BROKER_URL = 'django://',
        CELERY_RESULT_BACKEND = "amqp",
        CELERY_IMPORTS = ("<module1>.tasks", "<module2>.tasks.tasks", "<module3>.tasks.tasks"),
        CELERY_ALWAYS_EAGER = False,
        # CELERY_RESULT_BACKEND='djcelery.backends.database:DatabaseBackend',
        # CELERY_TIMEZONE = 'Europe/London'
        CELERY_TIMEZONE = 'UTC',
        CELERYBEAT_SCHEDULE = {
            'debug-test': {
                'task': '<module1>.tasks.test_celery',
                'schedule': timedelta(seconds=5),
                # 'args': (1, 2)
            },
        }
    )
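
One check I still plan to run from python manage.py shell, to rule out a broker or settings problem (the module path below mirrors the <my_project> placeholder above):

    from <my_project>.celery import app

    # Raises immediately if the broker URL or credentials are wrong or unreachable.
    conn = app.connection()
    conn.ensure_connection(max_retries=3)
    print(conn.as_uri())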



Retry Generates Multiple Tasks with Identical UUID

Hello,

I'm using Redis as a backend.  I want most of my tasks to retry silently and then, if they ultimately fail, send an email.

To do this, I created an abstract class as follows, based on this Stack Overflow question:
https://stackoverflow.com/questions/6499952/recover-from-task-failed-beyond-max-retries


class FailLoudlyTask(task.Task):
    abstract = True

    def after_return(self, status, retval, task_id, args, kwargs, einfo=None):
        submission_id = args[0]
        if status == "FAILURE":
            msg = ("{task} FAILED for id: {id} ").format(
                task=self.__name__, id=submission_id)
            logger.info(msg)

            email = EmailMessage(
                "django-submission failed permanently",
                msg,
                'sad-server-hcDgGtZH8xNBDgjK7y7TUQ@public.gmane.org',
                ERROR_DESTINATION_LIST,
            )
            email.send(fail_silently=False)

            submission = SubmissionRequest.objects.get(id=submission_id)
            submission.has_unexpected_task_failure = True
            submission.save()

            if self.__name__ != 'email_pdf_to_customer':
                email_pdf_to_customer.delay(submission_id)
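
For reference, the tasks use this class as their base roughly like so (the decorator import, its arguments, and the send_pdf helper are illustrative, not my exact code):

@task(base=FailLoudlyTask, bind=True, max_retries=5, default_retry_delay=600)
def email_pdf_to_customer(self, submission_id):
    try:
        send_pdf(submission_id)        # placeholder for the real work
    except Exception as exc:
        raise self.retry(exc=exc)      # retry silently until max_retries is exhausted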


First, is there a more direct feature in Celery to get an email when a task has exhausted its retries, e.g., an argument to the @task decorator?

Secondly, why would it spawn thousands of identical tasks?  From my log file:

[...snip...]
[2016-03-24 03:28:14,113: INFO/MainProcess] Received task: dept.tasks.email_pdf_to_customer[baf25772-5e17-4a68-8e27-c86dc9be015b] eta:[2016-03-24 07:27:29.471604-04:00]
[2016-03-24 03:28:14,114: INFO/MainProcess] Received task: dept.tasks.email_pdf_to_customer[baf25772-5e17-4a68-8e27-c86dc9be015b] eta:[2016-03-24 07:27:29.472951-04:00]
[2016-03-24 03:28:14,116: INFO/MainProcess] Received task: dept.tasks.email_pdf_to_customer[baf25772-5e17-4a68-8e27-c86dc9be015b] eta:[2016-03-24 07:27:29.514215-04:00]
[2016-03-24 03:28:14,117: INFO/MainProcess] Received task: dept.tasks.email_pdf_to_customer[baf25772-5e17-4a68-8e27-c86dc9be015b] eta:[2016-03-24 07:27:29.515779-04:00]
[2016-03-24 03:38:14,137: INFO/MainProcess] Received task: dept.tasks.email_pdf_to_customer[8bb7657b-d437-4857-85ac-fb019166d87f] eta:[2016-03-24 06:38:12.984020-04:00]
[2016-03-24 03:38:14,139: INFO/MainProcess] Received task: dept.tasks.email_pdf_to_customer[8bb7657b-d437-4857-85ac-fb019166d87f] eta:[2016-03-24 06:38:13.022189-04:00]
[2016-03-24 03:38:14,141: INFO/MainProcess] Received task: dept.tasks.email_pdf_to_customer[8bb7657b-d437-4857-85ac-fb019166d87f] eta:[2016-03-24 06:37:56.972094-04:00]
[...snip...]


Stephen Burrows | 22 Mar 19:01 2016

Stack frames missing

I'm working on a project with a large number of celery tasks. The code all seems correct, but one of these tasks is generating junk data. I've put the following in place as a quick way to catch a stack trace:

try:
    raise Exception
except:
    import traceback
    logger.error('error message', exc_info=True)

But the traceback I get out of this (when run through Celery) only includes the latest frame, where the exception is raised, and doesn't tell me which task is currently running. I've also tried directly passing in the current stack trace, with no effect. Does anyone know what's going on?
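
For comparison, a sketch that captures the full call stack explicitly, since exc_info only covers the frames from the raise point down (standard library only):

import traceback

# Log the complete call stack at this point, including the caller frames
# that exc_info=True does not include.
logger.error('error message\n%s', ''.join(traceback.format_stack()))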

Ravindra Buddharaju | 22 Mar 12:27 2016

celery Messages are stuck in Queue


Celery messages are getting stuck in the queue; I have tried with both CloudAMQP and RabbitMQ.

We use Ubuntu Server, and I noticed the following two symptoms:
1. The main Celery process, which consumes the messages, is at 100% CPU, and the rest of the worker processes are defunct.
2. A few messages are stuck in the queue in unacknowledged status.

I generally see this behaviour when we submit 100+ tasks at once; it doesn't happen every time, but we run into the issue frequently.

Has anyone else noticed this?

Please let me know if you need more information to help.
Any pointers or help is greatly appreciated.

Thanks,
Ravi


Bernhard Mäder | 21 Mar 13:32 2016

Growing memory footprint when logging level is INFO

Hey group

I have a growing memory footprint issue in a production installation running Celery with Django. We use RabbitMQ as the broker and Redis as the result backend, and we are running on Python 2.7. We use the threads pool instead of prefork.

What I see is a slowly growing memory footprint. It builds up over a day or two, and I need to restart Celery to make it shrink back to its normal size.

We have one particularity in our setup: we log a lot. We use "-l INFO" with celery worker, and since Celery logs two lines of text for each task, quite an amount of log output gets pushed around even when the tasks themselves log nothing.

I can reproduce the issue on my local machine. Before you ask: yes, I have DEBUG disabled, and the reproduction uses an empty task. The problem is exacerbated if I log some other stuff from the empty task, so I'm quite confident that logging has something to do with my issue.

My question is: is this commonly known and just a limitation of the Python interpreter, or is it worth digging deeper?

I think I have two options here:
1) Disable logging from celery.worker to reduce the log volume by 95% and thus increase the time between restarts.
2) Switch to prefork and use MAX_TASKS_PER_CHILD

Which one would you guys prefer?
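
For option 2, the setup I have in mind is roughly this (Celery 3.x setting name; the threshold is just an illustrative value):

# settings.py / celeryconfig.py
CELERYD_MAX_TASKS_PER_CHILD = 500   # recycle each prefork child after 500 tasks

# or equivalently on the command line:
#   celery worker -l INFO --maxtasksperchild=500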


Thanks,
Bernhard


Asi Li | 21 Mar 15:51 2016

django-celery: last chord callback is not being called

I have the following workflow:


work = chain(taskA, taskB)
return work()

taskB then creates the following workflow:

work = group((group(taskC, taskD) | taskE), (group(taskC, taskD) | taskE)) | taskF
return work()

and the inner group(taskC, taskD) itself creates another group(taskG, taskH, ..) | taskV.
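
Written out with explicit signatures, the structure I'm building inside taskB looks roughly like this (the task names are the placeholders from above, and the app instance is assumed):

from celery import group

@app.task
def taskB(*args):
    def branch():
        # group(taskC, taskD) followed by taskE -- this "group | callback" is a chord
        return group(taskC.s(), taskD.s()) | taskE.s()

    # Two branches in parallel, then taskF as the final callback (another chord),
    # all created dynamically while taskB is running.
    work = group(branch(), branch()) | taskF.s()
    return work()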


The problem I see is that taskF is not being called, even though all the other inner tasks have completed.

I'm using the latest django-celery with Redis.


Does anyone know what went wrong?

Is it related to the dynamically created inner chords?


Thanks
