Tanya Torgaeva | 19 Apr 09:18 2016

Celery workers keep restarting. Celery multi. Django

Hello!
I'm using Celery 3.1.15 in my Django project. I have three different workers with three queues, and everything looks OK except the logs: my workers keep restarting over and over.

This is my project's Celery app setup:
    
    from celery import Celery
    from django.conf import settings

    app = Celery('project')
    app.config_from_object('django.conf:settings')
    app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

These are my Celery settings:

    from kombu import Exchange, Queue

    CELERY_QUEUES = (
        Queue('default', Exchange('default'), routing_key='default'),
        Queue('urgent', Exchange('urgent'), routing_key='urgent'),
        Queue('long_and_urgent', Exchange('long_and_urgent'), routing_key='long_and_urgent')
    )
    CELERY_DEFAULT_QUEUE = 'default'
    CELERY_DEFAULT_EXCHANGE = 'default'
    CELERY_DEFAULT_ROUTING_KEY = 'default'
    CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'
    
    class SuplbizRouter(object):
        def route_for_task(self, task, args=None, kwargs=None):
            tasks_dict = {}
            tasks_dict['urgent'] = [some tasks]
            tasks_dict['long_and_urgent'] = [some tasks]
    
            if task in tasks_dict['urgent']:
                return {'exchange': 'urgent', 
                        'exchange_type': 'direct',
                        'routing_key': 'urgent'}
            if task in tasks_dict['long_and_urgent']:
                return {'exchange': 'long_and_urgent',
                        'exchange_type': 'direct',
                        'routing_key': 'long_and_urgent'}
            return {'exchange': 'default',
                    'exchange_type': 'direct',
                    'routing_key': 'default'}
    CELERY_ROUTES = (SuplbizRouter(), )
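
For what it's worth, the router can be exercised directly to check where a given task name ends up; a minimal sketch below, with hypothetical task names:

    router = SuplbizRouter()

    # A task listed under tasks_dict['urgent'] resolves to the urgent queue;
    # anything not listed falls through to the default route.
    print(router.route_for_task('project.tasks.some_urgent_task'))
    # -> {'exchange': 'urgent', 'exchange_type': 'direct', 'routing_key': 'urgent'}
    print(router.route_for_task('project.tasks.something_else'))
    # -> {'exchange': 'default', 'exchange_type': 'direct', 'routing_key': 'default'}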


And this is part of my celery.conf file; I run my workers via the celery multi command.

    pre-stop script
        /project/venv/bin/celery multi stop default_worker urgent_worker long_and_urgent_worker -A project --workdir=/project/ --pidfile=/var/run/celery/%n.pid
    end script

    script
        /project/venv/bin/celery multi start default_worker urgent_worker long_and_urgent_worker -A project -E -B:1 -Q:1 default -Q:2 urgent -Q:3 long_and_urgent --workdir=/project/ --concurrency=5 --uid=suplbiz --gid=suplbiz --pidfile=/var/run/celery/%n.pid --logfile=/var/log/upstart/celery/%n.log
    end script

I start Celery with `service celery start` and everything works as expected. BUT! When I look at the Celery logs, I see that the workers keep restarting over and over:

    celery multi v3.1.15 (Cipater)
    > Starting nodes...
    > default_worker@new.supl.biz: OK
    > urgent_worker@new.supl.biz: OK
    > long_and_urgent_worker@new.supl.biz: OK

    > Starting nodes...
        > default_worker@new.supl.biz: OK
        ERROR: Pidfile (/var/run/celery/default_worker.pid) already exists.
        Seems we're already running? (pid: 15436)
        > urgent_worker@new.supl.biz: OK
        ERROR: Pidfile (/var/run/celery/urgent_worker.pid) already exists.
        Seems we're already running? (pid: 15445)
        > long_and_urgent_worker@new.supl.biz: OK
        ERROR: Pidfile (/var/run/celery/long_and_urgent_worker.pid) already exists.
        Seems we're already running? (pid: 15456)
        celery multi v3.1.15 (Cipater)
    > Starting nodes...
        > default_worker@new.supl.biz: OK
        ERROR: Pidfile (/var/run/celery/default_worker.pid) already exists.
    .....


Does anybody know what I'm doing wrong? :( I can't find any relevant info on the internet :( Thanks!

Pedro Sá da Costa | 18 Apr 21:14 2016

Catch invocation errors in celery

I have a Celery worker that is connected to RabbitMQ so that a client can make requests to the worker. For the requests, I use the Celery functions `apply_async` and `apply` to invoke functions remotely.

I use the parameter `expires` to determine how long the task will wait to finish. I do this hoping to deal with tasks that hang, but in my example it doesn't work: when I run this example with the worker down, the caller hangs indefinitely.

Here is an example, where the function `hello` runs remotely, and `test_hello` invokes this function:

    import time

    def test_hello():
        hosts = ["host1", "host2", "host3"]
        for host in hosts:
            print "%s\n" % host
            output = myapp.hello.apply_async(queue=host, args=(host,), expires=70)
            print output.get()

    # the task decorator comes from the Celery app setup (not shown here)
    @task(name='myapp.hello')
    def hello(hostname):
        """ dummy function that returns a simple message """
        time.sleep(60)
        return "Hello %s" % hostname

    if __name__ == "__main__":
        print "test_hello"
        test_hello()

My problem is that I want to catch the error when the worker is down, or when the request hangs because the worker crashed in the middle of the call. How can I catch these errors?
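
For reference, a minimal sketch of bounding the wait on the caller side with the standard `AsyncResult.get(timeout=...)`, which raises `celery.exceptions.TimeoutError` instead of blocking forever (the 90-second value is just an illustration):

    from celery.exceptions import TimeoutError

    def test_hello_with_timeout():
        hosts = ["host1", "host2", "host3"]
        for host in hosts:
            result = myapp.hello.apply_async(queue=host, args=(host,), expires=70)
            try:
                # get() blocks indefinitely by default; a timeout turns a dead
                # or hung worker into an exception we can catch.
                print(result.get(timeout=90))
            except TimeoutError:
                print("no reply from %s within 90 seconds" % host)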

Timothy Wall | 18 Apr 02:13 2016

Abstract task derived from Task loses request/Context

I have a custom abstract task class, set as the default returned by Celery.Task (the app's task_cls field is set to MyTask):

    class MyTask(Task):
        abstract = True

        def __call__(self, *args, **kwargs):
            return super(MyTask, self).__call__(*args, **kwargs)


    @app.task(bind=True)
    def mytask(task, *args, **kwargs):
        assert len(task.request.args) > 0
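
For reference, this is roughly how the app is pointed at the custom base class; a minimal sketch, assuming the task_cls argument to the Celery constructor (the app name is a placeholder):

    from celery import Celery

    # task_cls can be given as the class itself; Celery.Task then resolves to MyTask.
    app = Celery('project', task_cls=MyTask)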

When the task is invoked via .apply (or called directly), a proper request/Context is built by the tracing wrappers, but then an empty context is pushed when Task.__call__ is called.

Have I somehow configured something improperly?


Timothy Wall | 15 Apr 22:00 2016

kombu.transport.base.register_with_event_loop

I've been trying to get a worker running using the in-memory transport (to support unit testing of task execution).

Transport.register_with_event_loop() is called with "connection" and "hub" arguments, yet the base class accepts only a single "loop" argument. It looks like some Transport-derived classes were updated while others were not.
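
As a stopgap for unit-testing task execution (separate from the signature question above), a minimal sketch of eager mode, which runs tasks in-process without any broker at all:

    from celery import Celery

    app = Celery('tests')
    app.conf.update(
        CELERY_ALWAYS_EAGER=True,                 # run tasks synchronously in the caller
        CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,  # re-raise task errors in the test
    )

    @app.task
    def add(x, y):
        return x + y

    assert add.delay(2, 2).get() == 4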

Mitch LeBlanc | 14 Apr 23:34 2016

Workers running after long delay, not concurrently. Chords issue?

I created a task which creates a bunch of sub-tasks (to fetch data) and then chords them together with a callback.

Looks something like this:

    @celery_app.task()
    def generate_report():
        # generates a list of 20 id chunks, for each task to run,
        # rather than using built-in chunks
        id_chunks = list(utils.chunks(ids))

        chord(fetch_metadata.s(ids) for ids in id_chunks)(write_report.s())

When I run this, however, it seems like only one worker runs at a time. In addition, there is a large (~15 seconds) delay between each task being picked up.

Any idea what might be causing this? We feel like it is a configuration issue, because this doesn't seem to happen locally. However, I have other tasks (e.g. a deletion job) that use Celery chunks, and those seem to run just fine.
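
In case it helps narrow this down, a minimal sketch (assuming the same `celery_app` object) of checking which workers have actually picked up the header tasks while the chord is running:

    from pprint import pprint

    # Ask every worker what it is executing right now; with a healthy chord
    # header we'd expect fetch_metadata entries spread across several workers,
    # not just one.
    inspector = celery_app.control.inspect()
    pprint(inspector.active())    # currently executing tasks, per worker
    pprint(inspector.reserved())  # tasks prefetched but not yet started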

In this case, I stopped using built-in chunks because I was unable to get retries to work.

Any ideas? Our configuration is pretty damned bare bones.

    celery:
      resultExpiry: 300  # seconds
      rateLimit: 10/s
      visibilityTimeout: 3600  # seconds
      scheduler:
        maxLoopInterval: 300  # seconds
        databaseSyncInterval: 30  # seconds




John Rinehart | 13 Apr 18:46 2016

autoreload

I've been authoring a vanilla application in a vanilla working environment; all version information is below. I cannot get autoreload to detect changes to the one imported module. It's a real pain to have to kill and restart Celery every time I make an application change. I understand that autoreload depends on pyinotify, can only monitor the imported modules, and apparently suffers a lag in detecting changes, but I can deal with all of that. I just want Celery's autoreload to detect a change to the imported module. I am open to workarounds, but I would prefer to get Celery to do this automatically.

OS version: Ubuntu 15.10 x64 (Linux some_hostname 4.2.0-35-generic #40-Ubuntu SMP Tue Mar 15 22:15:45 UTC 2016 x86_64 x86_64 x86_64 GNU/Linux)
Celery: 3.1.23 (Cipater)
pyinotify==0.9.6
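
For reference, autoreload can be enabled either with the worker's --autoreload flag or with the corresponding setting; a minimal sketch of the latter:

    # celeryconfig.py (or Django settings) -- equivalent to `celery worker --autoreload`;
    # on Linux this relies on pyinotify being installed.
    CELERYD_AUTORELOAD = True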

bsergean | 12 Apr 18:55 2016

Processing tasks in LIFO order

Hi there,

I'm trying to have my tasks processed in last-created / first-processed order. The use case is that if the queues get backed up and I cannot process tasks fast enough, I want to see the newest stuff first. I am using Celery in a very simple way: I call my_function.delay(data) from within a Flask application endpoint handler. I'm also using the Redis backend.

Any idea how to do that? Redis has a queue data structure where you can push at either end, which makes implementing this straightforward; however, since Celery supports many backends it might be more involved.

Thanks!
- Benjamin

PS: I'm using Celery 3.1.23 (OS X for dev and Linux for deployment).

Dorin Adrian Rusu | 12 Apr 18:35 2016

Rate_limit not imposing the specified limit

I have two RabbitMQ nodes, each providing queues for 5 nodes that are running Celery. Each node has its own separate queue, and each of the 10 worker nodes runs Celery with a concurrency of 4.

When I broadcast a rate_limit to each Celery worker, the resulting rate does not seem to be the one I imposed.

The code I use to broadcast the rate_limit:


    app = Celery('tasks', broker="amqp://%s:%s@%s/%s" % (config.rabbit_user, config.rabbit_pass, rabbit_ip, config.rabbit_vhost))
    app.control.broadcast('rate_limit', arguments={'task_name': 'tasks.read', 'rate_limit': '100/s'})
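
For what it's worth, a minimal sketch of the higher-level helper, which does the same broadcast but can collect per-worker replies so you can confirm which nodes applied the new limit:

    # Same effect as the broadcast above; reply=True gathers acknowledgements
    # from each worker within the timeout.
    replies = app.control.rate_limit('tasks.read', '100/s', reply=True, timeout=5)
    print(replies)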

From my understanding this limit is per worker instance and not global, therefore I'm expecting 100 messages per second for each Celery worker on the 10 nodes. Normally they consume about 500-600 per second, yet after the broadcast I get about 1.6 per second on every node; I checked the log and it's about 8 messages every 5 seconds.

Raising the limit to 200/s I get anywhere between 2/s and 2.4/s, and raising it to 400/s I get 300+/s.


Am I doing something wrong?

Chaim Peck | 12 Apr 17:57 2016

Rotating log files

If I have a "celery multi" command set to log to a directory (e.g. --logfile=path/to/celery-logs/%n.log), what is the ideal way to rotate those log files?

Is it possible to have celery use a RotatingFileHandler?
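
For what it's worth, a minimal sketch of attaching a RotatingFileHandler through the `after_setup_logger` signal (the path, size, and backup count are placeholders):

    import logging
    from logging.handlers import RotatingFileHandler

    from celery.signals import after_setup_logger

    @after_setup_logger.connect
    def add_rotating_handler(logger=None, logfile=None, format=None, **kwargs):
        # Rotate at ~10 MB, keeping 5 old files.
        handler = RotatingFileHandler('path/to/celery-logs/worker.log',
                                      maxBytes=10 * 1024 * 1024, backupCount=5)
        handler.setFormatter(logging.Formatter(format))
        logger.addHandler(handler)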

Thanks,
Jeff

Will Dampier | 9 Apr 16:35 2016

Using an SGE Queue as a backend

I'm working on a cluster that uses SGE to manage jobs across the worker nodes. Is there a way to use the SGE queue as the broker in a way that will cooperate with other people submitting jobs through non-Celery means? I currently use python-gridmap to submit Python jobs to the SGE queue. Would I need to write a new Broker, a new Consumer, or both?

