Jon Blackburn | 20 Apr 21:27 2015

Revoke with terminate=False is terminating some tasks anyway

Hello.

I'm running Celery with RabbitMQ for transport and mongodb as my backend. Concurrency = 8.

I have a single logical job that consists of a good number of tasks. It is possible for the user to cancel the entire job while I'm in the process of queueing the tasks. When I receive such a request I perform a query to identify tasks that have been sent but not touched by a worker and revoke them using something like this:

    app.control.revoke([ct.Identity() for ct in unprocessedtasks])
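
(For completeness, since the subject says terminate=False: revoke's terminate flag defaults to False, so the call above should be equivalent to this explicit form.)

    app.control.revoke([ct.Identity() for ct in unprocessedtasks],
                       terminate=False)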

I have a Signal attached to the tasks and it logs what I'd expect for most of the revoked tasks:

from celery.signals import task_revoked

@task_revoked.connect
def handle_task_revoked(request=None, terminated=False, signum=None, expired=False, **kwargs):
    log.info('Task revoked: request: %s, terminated: %s, signum: ^%s, expired: %s, kwargs: %s',
             request, terminated, signum, expired, kwargs)

I see this in the log:

Many, many of these, which is good.
2015-04-20 13:31:52,179 [INFO] 5960 (MainThread) "Task revoked: request: mytask[3mp4-hs23.00000383], terminated: False, signum: ^None, expired: False, kwargs: {'signal': <Signal: Signal>, ..."

... and then a handful of these, which is not good.
2015-04-20 13:31:52,224 [INFO] 5960 (MainThread) "Task revoked: request: <goo>, terminated: True, signum: ^0, expired: False, kwargs: {'signal': <Signal: Signal>, ..."
2015-04-20 13:31:52,226 [ERROR] 5960 (MainThread) <celery.worker.job> "Task mytask[3mp4-hs23.00000383] raised unexpected: Terminated(0,)"
Traceback (most recent call last):
  File "/home/jon/.virtualenvs/apervita/lib/python2.7/site-packages/billiard/pool.py", line 1672, in _set_terminated
    raise Terminated(-(signum or 0))
Terminated: 0


The reason it's not good is that the terminated tasks have left my database in an indeterminate state: there's no knowing whether the worker killed the task before or after it inserted a bunch of data.

I'm wondering why the tasks are being terminated. Is there a timeout that we're exceeding? I would much prefer to let them complete and clean up afterward via my task's on_success method or the handle_task_success Signal.
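
For reference, the cleanup hook I'd rather rely on looks roughly like this (a sketch; MyTask and cleanup_job_records are placeholders for my own task base class and bookkeeping):

    from celery import Task

    class MyTask(Task):
        def on_success(self, retval, task_id, args, kwargs):
            # called on the worker after the task body finishes successfully
            cleanup_job_records(task_id)  # placeholder: my own bookkeeping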

Thanks,
Jon


Tommer Wizansky | 14 Apr 00:29 2015

Broker heartbeat for celerybeat

I've noticed that celerybeat does not use the BROKER_HEARTBEAT parameter and does not support the --heartbeat-interval option. This means that if a scheduler node goes down unexpectedly, the broker will maintain a hanging connection. For a worker process I can set BROKER_HEARTBEAT, but for celerybeat this does not work.

Does anyone know if there is a way to use heartbeats with celerybeat?

It seems to me that one fix would be in celery.app.base.Celery.connection. When initializing the connection, replace

    heartbeat=heartbeat

with 

    heartbeat=heartbeat or conf.BROKER_HEARTBEAT
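
Until something like that is in place, I'm considering a monkey-patch along these lines (untested sketch; it assumes the heartbeat is always passed as a keyword argument):

    from celery import Celery

    app = Celery('myapp')

    _orig_connection = app.connection

    def _connection_with_heartbeat(*args, **kwargs):
        # Fall back to BROKER_HEARTBEAT when no explicit heartbeat was given.
        if kwargs.get('heartbeat') is None:
            kwargs['heartbeat'] = app.conf.BROKER_HEARTBEAT
        return _orig_connection(*args, **kwargs)

    app.connection = _connection_with_heartbeat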

Thanks

Tommer

Mike | 13 Apr 22:44 2015

Move all tasks with a certain argument value

I assume I would need to use celery.contrib.migrate. I see the sample below, which is almost perfect... but I don't want to do it by task id. Instead, I would rather move all tasks with argument1 == "somevalue", but I'm not sure how to reference the arguments of a task. Any hints? :-)

    def is_wanted_task(body, message):
        if body['id'] == wanted_id:
            return Queue('foo', exchange=Exchange('foo'), routing_key='foo')

    move(is_wanted_task)
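
Something along these lines is what I had in mind (untested sketch; it assumes the message body carries the task arguments in body['args'] / body['kwargs'], and that argument1 is the first positional argument):

    from kombu import Exchange, Queue
    from celery.contrib.migrate import move

    def has_wanted_arg(body, message):
        args = body.get('args') or []
        kwargs = body.get('kwargs') or {}
        argument1 = args[0] if args else kwargs.get('argument1')
        if argument1 == 'somevalue':
            return Queue('foo', exchange=Exchange('foo'), routing_key='foo')

    move(has_wanted_arg)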

Rajanikanth Jammalamadaka | 13 Apr 17:01 2015

keeping track of celery tasks

Hi

In an application, I am storing the AsyncResult of certain tasks in a list and periodically cleaning up the list by finding the completed tasks via task.state == 'SUCCESS'.
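
Roughly what I'm doing now (a sketch; my_task stands in for the real tasks):

    results = []  # AsyncResult objects for tasks still in flight

    def submit(*args):
        results.append(my_task.delay(*args))

    def prune_completed():
        # drop results whose state has reached SUCCESS
        results[:] = [r for r in results if r.state != 'SUCCESS']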

Is there a better approach for doing this?

Thanks,
Raj

Jose Luis de la Rosa | 10 Apr 01:52 2015

How to add workers for Celery scheduler in Django

Hello,

I am using the Celery scheduler in Django. When a task is running, the other tasks seem to stay queued. How can I add more workers to Celery when it runs under Django as a scheduler?
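
The only knob I've found so far is the worker concurrency setting (assuming Celery 3.x setting names), and I'm not sure it's the right lever here:

    # settings.py (Django); assumption: Celery 3.x setting name
    CELERYD_CONCURRENCY = 4  # number of pool processes per worker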

Thanks in advance

Luca | 9 Apr 20:10 2015

Celery group revoke notification

In my Django app I have some tasks which I run at the same time as a group. The group can end on its own or can be revoked by the user.
What I need (and can't find) is some way to be notified when the group ends, no matter whether it was revoked or not.
I tried to create a task that does a join() or a get() on the group, but that only works when the group ends on its own. If the group is revoked, join() and get() never return anything and my task gets stuck waiting forever.
I tried looking into signals, but I found nothing about groups being revoked, only individual tasks. Do you know how I could implement a solution? Thanks!
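
Roughly what I tried (a sketch; notify_done is a placeholder for my notification, and I save the GroupResult when the group is started so the task can restore it):

    from celery import shared_task
    from celery.result import GroupResult

    @shared_task
    def wait_for_group(group_id):
        res = GroupResult.restore(group_id)  # group was saved with res.save()
        res.join()             # returns when the group finishes on its own...
        notify_done(group_id)  # ...but is never reached if the group is revoked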

Victor Poluksht | 9 Apr 12:38 2015

yet another celery logging issue

Hello.

I have two issues with configuring Celery logging.

1. I have instances of "task processors" and don't want to use Celery's file logging at all. Can I somehow disable it entirely?

2. I have configured logging "inside" by adding a SysLogHandler:

import logging
from logging.handlers import SysLogHandler

from celery.signals import after_setup_logger, after_setup_task_logger

import config  # my own settings module (defines CELERY_LOG_LEVEL)


def after_setup_logger_handler(**kwargs):
    logger = kwargs.get('logger')
    format = '%(levelname)s - %(message)s'
    handler = SysLogHandler(address='/dev/log', facility='local7')
    handler.setFormatter(logging.Formatter(format))
    numeric_level = getattr(logging, config.CELERY_LOG_LEVEL.upper(), None)
    if not isinstance(numeric_level, int):
        raise ValueError('Invalid log level: {}'.format(config.CELERY_LOG_LEVEL))
    handler.setLevel(numeric_level)
    logger.setLevel(numeric_level)
    logger.addHandler(handler)
    logger.propagate = False


after_setup_logger.connect(after_setup_logger_handler)
after_setup_task_logger.connect(after_setup_logger_handler)

But the question is: can I somehow split the task log and the Celery log?
I've tried connecting different "after_setup_logger_handler" functions to the two signals, but the Celery log always ends up handling both the internal Celery messages and the task messages.

Any working example?

Mike | 8 Apr 21:24 2015

MaxRetriesExceeded Error when max_retries=None?

I have a task that is set to max_retries=None, and the docs say the following. Does this not apply to raise self.retry()? Or does None here mean "use the default"?

Task.max_retries = 3

Maximum number of retries before giving up. If set to None, it will never stop retrying.
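
The retry pattern in my task is roughly this (a sketch; do_work and TransientError are placeholders for the real work and exception type):

    from celery import shared_task

    @shared_task(bind=True, max_retries=None)
    def my_task(self, url):
        try:
            return do_work(url)
        except TransientError as exc:
            # with max_retries=None I expect this to retry indefinitely
            raise self.retry(exc=exc, countdown=10)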




Yogesh Panchal | 8 Apr 13:03 2015

Memory issue while processing 700k images using python pil and celery

Hi,

I want to resize more than 700k images using Python PIL and Celery. When I add the task to Celery it starts processing images, but after the task completes it doesn't release the memory: the Celery process occupies the maximum amount of memory and the server goes out of memory. Is there any solution for this? I tried this link https://github.com/celery/celery/issues/1129 but it is not working in my case. Following is the code I am using for resizing images:

@app.task
def generate_thumbnail(url, file_name_to_save):
    response = requests.get(url)

    if not os.path.exists(BASE_DIR):
        os.mkdir(BASE_DIR)

    if not os.path.exists(IMG_DIR):
        os.mkdir(IMG_DIR)

    if response.status_code == 200:
        original = Image.open(StringIO(response.content))
        original.thumbnail((205, 215), Image.ANTIALIAS)
        # Save to local disk temporarily
        file_to_save_path = os.path.join(IMG_DIR, "temp")
        original.save(file_to_save_path, "JPEG")

        # Upload file to S3
        s3_image_file = Key(bucket)
        s3_image_file.key = file_name_to_save
        s3_image_file.set_contents_from_filename(file_to_save_path, {'Content-Type': 'image/jpeg'})
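
One setting I've been looking at (assuming Celery 3.x setting names) is recycling the pool processes after a fixed number of tasks, though I don't know whether it addresses the root cause:

    # celeryconfig.py; assumption: Celery 3.x setting name
    CELERYD_MAX_TASKS_PER_CHILD = 100  # replace each pool process after 100 tasks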

Patrick P. | 7 Apr 05:36 2015

Celery worker tasks only fail when daemonized with supervisord

I have a Celery task that invokes a Linux binary using `subprocess.Popen` in Python to execute a command on the command-line interface.
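
Roughly how the task shells out (a sketch; the binary name and arguments are placeholders):

    import subprocess

    from celery import shared_task

    @shared_task
    def run_cli(args):
        proc = subprocess.Popen(['mybinary'] + list(args),  # placeholder binary
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
        out, err = proc.communicate()
        if proc.returncode != 0:
            raise RuntimeError('command failed: %s' % err)
        return out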

It works on localhost. It worked on a previous Ubuntu EC2 deployment. It works on elasticbeanstalk AMI in a terminal session. It does not work on elasticbeanstalk under `supervisord`.

It **does** work if I execute the Python function inside of `ipython` or if I run the `celery worker` manually (e.g. `patrick@ec2:~$ celery-worker --app=myapp.celery_app --loglevel=INFO`).

This `celery worker` command is the same script which is daemonized using `supervisord` (using the same exact env vars). However, under `supervisord` the CLI program invoked using Python `subprocess.Popen` is halted when it complains about running out of memory, and then the Celery task fails when it doesn't receive the output it likes from the CLI.

My *hypothesis* is that `supervisord` is to blame ... Any ideas?

At this point my next move is to daemonize the `celery worker` script (which works everywhere else) using something other than `supervisord`.

(Or building a new elasticbeanstalk AMI ...)

Patrick Wolf | 4 Apr 02:52 2015

substantial drift was encountered even though UTCOFFSET is correct

We have 3 branches in 3 timezones here: STR, LAX, PEK. When starting the workers I always encounter the "substantial drift was encountered" error.

For example in STR I get:
[celery.redirected] celery@str_worker ready. (WARNING)
[celery.events.state] Substantial drift from celery@pek_worker may mean clocks are out of sync.  Current drift is 21600 seconds.  [orig: 2015-04-04 00:27:54.082000 recv: 2015-04-04 06:27:54.175000]
[celery.events.state] Substantial drift from celery@lax_worker may mean clocks are out of sync.  Current drift is 32401 seconds.  [orig: 2015-04-04 00:27:55.049000 recv: 2015-04-03 15:27:54.959000]

Looking at the drift, it's exactly the UTC offset difference:
21600 seconds = 6 hours / str utcoffset=-2 / pek utcoffset=-8 = 6 hours apart
32401 seconds ≈ 9 hours / str utcoffset=-2 / lax utcoffset=7 = 9 hours apart

The interesting thing is that the UTC offset is set correctly, as "celery events -d" shows:
-------------------------
(celery) S:\>celery events -d
-> evdump: starting capture...
-------------------------
celery@str_worker [2015-04-04 09:35:12.005000] heartbeat: active=0, clock=30275192, 
freq=2.0, loadavg=[0.0, 0.0, 0.0], local_received=1428107712.07, 
pid=4800, processed=0, sw_ident=py-celery, sw_sys=Windows, sw_ver=3.1.17, utcoffset=-2
-------------------------
celery@lax_worker [2015-04-04 00:35:12.033000] heartbeat: active=2, clock=30275192, 
freq=2.0, loadavg=[0.0, 0.0, 0.0], local_received=1428107712.01, 
pid=3676, processed=6364, sw_ident=py-celery, sw_sys=Windows, sw_ver=3.1.17, utcoffset=7
-------------------------
celery@pek_worker [2015-04-04 15:35:13.424000] heartbeat: active=0, clock=30275194, 
freq=2.0, loadavg=[0.0, 0.0, 0.0], local_received=1428107713.03, 
pid=2576, processed=0, sw_ident=py-celery, sw_sys=Windows, sw_ver=3.1.17, utcoffset=-8
-------------------------

Also the config has UTC enabled:
CELERY_ENABLE_UTC = True

So I'm wondering how this can be fixed?

Thanks!
Patrick
