Brenda Bell | 18 May 22:29 2016

Independent redis connection disables celery event receiver

I have an app that basically monitors celery tasks. The code was pretty much lifted from the celery documentation:

            with self.celery_app.connection() as connection:
                try:
                    connection.ensure_connection(
                        errback=self.on_connection_error,
                        max_retries=settings.CONNECTION_MAX_RETRIES,
                        interval_start=2,
                        interval_step=2,
                        interval_max=30,
                        callback=None)
                    recv = self.celery_app.events.Receiver(connection, handlers={
                            'task-sent': self.on_task_event,
                            'task-received': self.on_task_event,
                            'task-started': self.on_task_event,
                            'task-succeeded': self.on_task_event,
                            'task-failed': self.on_task_event,
                            'task-revoked': self.on_task_event,
                            'task-retried': self.on_task_event,
                            'worker-online': self.on_worker_event,
                            'worker-heartbeat': self.on_worker_event,
                            'worker-offline': self.on_worker_event,
                    })
                    recv.capture(limit=None, timeout=None, wakeup=True)

                except connection.connection_errors + connection.channel_errors as exc:
                    ...

                except (KeyboardInterrupt, SystemExit):
                    ...

                except Exception as exc:
                    ...


This code works as expected until I add some custom redis logic to my app. As soon as I create a redis connection to a different redis host, port, and database, the celery event capture stops working.

What do I need to do to have an app that both captures celery events and writes custom data to an independent redis hash?
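For reference, the independent client and the hash write look roughly like this (a sketch; the host, port, db, key scheme, and helper names are all placeholders of mine, and it assumes the redis-py package):

```python
# Sketch only: redis-py is imported lazily so this stays fully separate
# from the celery/kombu broker connection used by the event receiver.
def make_monitor_redis(host='other-redis-host', port=6379, db=5):
    import redis  # assumes the redis-py package is installed
    return redis.StrictRedis(host=host, port=port, db=db)

def write_task_fields(client, task_id, fields):
    """Write per-task data into an independent Redis hash."""
    key = 'task:%s' % task_id  # placeholder key scheme
    for field, value in fields.items():
        client.hset(key, field, value)
    return key
```

The intent is that `make_monitor_redis()` is called once, outside the `with self.celery_app.connection() as connection:` block, so the two connections never share state.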

--
You received this message because you are subscribed to the Google Groups "celery-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to celery-users+unsubscribe-/JYPxA39Uh5TLH3MbocFFw@public.gmane.org.
To post to this group, send email to celery-users-/JYPxA39Uh5TLH3MbocFFw@public.gmane.org.
Visit this group at https://groups.google.com/group/celery-users.
For more options, visit https://groups.google.com/d/optout.

Using a class method as a task. Please Help

Following the celery 3.1 docs, I defined the following class method as a task:


In file1.py I defined the class:

from celery import Celery
from celery.contrib.methods import task_method

app = Celery('myTasks')

class X(object):
    @app.task(filter=task_method)
    def myMethod(self):
        pass
********************************************************************

In file2.py I call the class method:

x = X()
x.myMethod.delay()

But I get the following error:
  "Can't pickle <type 'thread.lock'>: attribute lookup thread.lock failed."

So instead of calling the method on the lowercase instance x, I called it on the uppercase class X:
X.myMethod.delay()
In that case, although the task is sent, it never executes.
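A workaround I'm experimenting with (sketch only; everything below is illustrative, not from the celery docs): keep the work in a module-level function and pass the instance's picklable state explicitly, so the bound instance never has to be serialized:

```python
# Illustrative sketch: a module-level function that would be decorated
# with @app.task in the real app, taking plain picklable state instead
# of a bound instance.
class X(object):
    def __init__(self, value):
        self.value = value

def my_method(value):
    # operates on plain data; nothing unpicklable crosses the wire
    return value * 2

x = X(21)
assert my_method(x.value) == 42
```

Once `my_method` is decorated with `@app.task`, the call site would become `my_method.delay(x.value)`.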





Timothy Wall | 17 May 17:26 2016

DisabledBackend bugs or misuse?

I implemented a base task which performs some operations against the results backend in order to save additional task state transparently to the decorated task being run.

I found two bugs (or perhaps misuse of the backend) within this new task.

* task.backend.expire() is only supported on some backends
* task.backend.get_task_meta() is implemented on BaseBackend (from which DisabledBackend is derived), but it calls an internal method _get_task_meta_for that doesn't exist on either BaseBackend _or_ DisabledBackend

My workaround is to check for DisabledBackend before attempting these operations, but it seems kind of hacky.
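Concretely, the check looks roughly like this (a sketch; `safe_expire` is my own helper and the attribute probe is mine, not an official celery API):

```python
def safe_expire(backend, key, seconds):
    """Call backend.expire() only if this backend actually provides it."""
    expire = getattr(backend, 'expire', None)
    if not callable(expire):
        # e.g. DisabledBackend, or any backend without TTL support
        return False
    expire(key, seconds)
    return True
```

The same probe-before-call pattern covers the missing `_get_task_meta_for` case, at the cost of silently skipping the operation on backends that lack it.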

I would expect any backend either to fully support the advertised API, to consistently raise exceptions (or ignore unsupported operations), or to expose a reduced API in which every operation is supported.

Perhaps the backend was never intended to be a publicly accessible service?

At any rate, I implemented a little bit of additional state saving, following the state/result manipulation that already exists within celery, and I apparently got it wrong somehow.




Timothy Wall | 17 May 17:19 2016

Bug in celery 3.1.23 in app serialization when dispatching a task

I've found after upgrading to celery 3.1.23 from 3.1.11 that when a task is sent to a subprocess (using the multiprocessing pool), the Celery app is serialized and then deserialized in the subprocess, but some parts of the Celery app are improperly initialized.

Namely, the broker setting is missing (which, in the main process, is set in the Celery app constructor). If the broker is passed as part of the celery configuration rather than to the constructor, the Celery app is reconstituted properly in the subprocess.


Jason Johns | 12 May 13:48 2016

Can I stop celery workers while tasks are running without losing the tasks?

I have a cluster of servers that are configured with an init script to have five workers per server.  These workers process high I/O, low computation complexity tasks.  The task queue is managed by RabbitMQ via a Django application.

Assume that I have ten Celery servers, all running five jobs at once.  Is it possible to shut down a celery worker via 

service celeryd stop

and make changes to the config file in /etc/defaults/celeryd without having the in-process jobs stop?

Nicolas Laurance | 11 May 15:21 2016

task.replace leads to task hanging

Hi list,

I have hanging tasks with the latest celery:

 -------------- celery@nicolas-desktop v4.0.0rc2 (0today8)
---- **** -----
--- * ***  * -- Linux-4.2.0-35-generic-x86_64-with-Ubuntu-15.10-wily 2016-05-11 13:57:33
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x7fb6c59100b8
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     redis://
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery



For the sake of demonstration, here are my tasks:

from celery import group

@app.task
def generator():
    return list(range(9))

@app.task
def runner(*values):
    return sum(values)

@app.task(bind=True)
def substitute(self, generated):
    work = group(runner.s(val, 10) for val in generated)
    raise self.replace(work)

@app.task
def finalize(*values):
    print('=' * 50)
    return sum(values)

and the call looks like:

from tasks import generator
from tasks import substitute
from tasks import finalize

pipeline = generator.s() | substitute.s() | finalize.s()

pipeline.delay().get()



I tried to wrap substitute in a chord with a dummy task, but that was no better (accumulate does basically the same thing).

I also tried

pipeline = generator.s() | chord(substitute.s(), finalize.s())
still no better.

The worker hangs (see log):

[2016-05-11 13:57:48,558: INFO/MainProcess] Received task: tasks.generator[1a613f97-4a2d-487d-b6b1-d9c7a3c44821] 
[2016-05-11 13:57:49,104: INFO/MainProcess] Received task: tasks.substitute[a0b71a94-40f8-462c-90ba-846590fd7b15] 
[2016-05-11 13:57:49,104: INFO/PoolWorker-1] Task tasks.generator[1a613f97-4a2d-487d-b6b1-d9c7a3c44821] succeeded in 0.011677501956000924s: [0, 1, 2, 3, 4, 5, 6, 7, 8]
[2016-05-11 13:57:49,108: INFO/MainProcess] Received task: tasks.runner[606b3b62-669f-4f40-8496-1cdba97a8101] 
[2016-05-11 13:57:49,108: INFO/MainProcess] Received task: tasks.runner[aaf6778b-fbfa-4c82-8e32-0a3c68954587] 
[2016-05-11 13:57:49,109: INFO/MainProcess] Received task: tasks.runner[a7ab214d-d9b7-44e4-8a8f-8764ce2bb9f4] 
[2016-05-11 13:57:49,109: INFO/MainProcess] Received task: tasks.runner[e4c37945-309f-4e53-902e-7e4b4ef9f7f1] 
[2016-05-11 13:57:49,111: INFO/PoolWorker-1] Task tasks.substitute[a0b71a94-40f8-462c-90ba-846590fd7b15] ignored
[2016-05-11 13:57:49,112: INFO/PoolWorker-1] Task tasks.runner[606b3b62-669f-4f40-8496-1cdba97a8101] succeeded in 0.000600722967647016s: 10
[2016-05-11 13:57:49,112: INFO/MainProcess] Received task: tasks.runner[22241d7d-75a4-477a-92b7-3e1170517f86] 
[2016-05-11 13:57:49,113: INFO/MainProcess] Received task: tasks.runner[549ffbac-0a62-4bd2-904e-42a69e9fc40a] 
[2016-05-11 13:57:49,113: INFO/PoolWorker-1] Task tasks.runner[aaf6778b-fbfa-4c82-8e32-0a3c68954587] succeeded in 0.0005385979311540723s: 11
[2016-05-11 13:57:49,114: INFO/MainProcess] Received task: tasks.runner[0defcaa3-d04f-4a8f-bbe2-7fc9238d749d] 
[2016-05-11 13:57:49,114: INFO/PoolWorker-1] Task tasks.runner[a7ab214d-d9b7-44e4-8a8f-8764ce2bb9f4] succeeded in 0.0005091530038043857s: 12
[2016-05-11 13:57:49,115: INFO/PoolWorker-1] Task tasks.runner[e4c37945-309f-4e53-902e-7e4b4ef9f7f1] succeeded in 0.00045368995051831007s: 13
[2016-05-11 13:57:49,115: INFO/MainProcess] Received task: tasks.runner[7c804104-d159-45c0-9f28-a9d1d22893eb] 
[2016-05-11 13:57:49,115: INFO/MainProcess] Received task: tasks.runner[5784f568-3f42-4953-a8c1-1c4fcad0576b] 
[2016-05-11 13:57:49,115: INFO/PoolWorker-1] Task tasks.runner[22241d7d-75a4-477a-92b7-3e1170517f86] succeeded in 0.00047107599675655365s: 14
[2016-05-11 13:57:49,116: INFO/PoolWorker-1] Task tasks.runner[549ffbac-0a62-4bd2-904e-42a69e9fc40a] succeeded in 0.0004503619857132435s: 15
[2016-05-11 13:57:49,117: INFO/PoolWorker-1] Task tasks.runner[0defcaa3-d04f-4a8f-bbe2-7fc9238d749d] succeeded in 0.0004548350116237998s: 16
[2016-05-11 13:57:49,118: INFO/PoolWorker-1] Task tasks.runner[7c804104-d159-45c0-9f28-a9d1d22893eb] succeeded in 0.0004735640250146389s: 17
[2016-05-11 13:57:49,119: INFO/PoolWorker-1] Task tasks.runner[5784f568-3f42-4953-a8c1-1c4fcad0576b] succeeded in 0.001147118047811091s: 18
[2016-05-11 13:57:49,120: INFO/MainProcess] Received task: celery.accumulate[a0b71a94-40f8-462c-90ba-846590fd7b15] 
[2016-05-11 13:57:49,120: INFO/PoolWorker-1] Task celery.accumulate[a0b71a94-40f8-462c-90ba-846590fd7b15] succeeded in 0.0003270149463787675s: [10, 11, 12, 13, 14, 15, 16, 17, 18]


Any suggestion on how to debug this, or better yet a solution, would be greatly appreciated.
Many thanks.



Ben Sachs | 9 May 23:53 2016

Priority Queues (4.0 date)?

Hey All -- 

We are eagerly awaiting the day when we can use priority queues. It seems 4.0.0 will support this. Any idea when it will drop?

Thanks
Ben Sachs
Software Engineer
Digitalglobe, Inc

Monty Hindman | 6 May 18:01 2016

Performance implications of worker pulling from many queues

Are there substantial performance implications that I should keep in mind when Celery workers are pulling from multiple (or perhaps many) queues? For example, would there be a significant performance penalty if my system were designed so that workers pulled from 10 to 15 queues rather than just 1 or 2?

Thanks for the help.

Monty

sma | 3 May 19:57 2016

Any way to shut down all workers like flower?

How does flower cleanly shut down celery workers? Is there a command-line option?
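If I'm reading Flower correctly (unverified), its shutdown button just broadcasts the `shutdown` remote-control command, which asks each worker to warm-shutdown (finish its current tasks, then exit). A sketch of doing the same from Python (`shutdown_all_workers` is my own helper, not a flower API):

```python
# Sketch: broadcast the 'shutdown' remote-control command to all workers.
def shutdown_all_workers(control):
    # `control` would be app.control on a real Celery app instance
    control.broadcast('shutdown')
```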

stuart.axon | 3 May 17:24 2016

Programmatically create periodic tasks in Celery 3.1.23 (or switch to 4.0rc)

It looks like celery 4 will have a way of creating periodic tasks programmatically:

http://docs.celeryproject.org/en/master/userguide/periodic-tasks.html


On a greenfield project I'm using 3.1.23; is there a similar way to do this?

Alternatively, I may be able to switch to 4.0rc, if 4.0 is going to come out imminently?
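For what it's worth, in 3.1 the CELERYBEAT_SCHEDULE setting is a plain dict, so it can be generated programmatically before beat starts (a sketch; the task names and the builder function are mine, only the setting name is from the 3.1 docs):

```python
from datetime import timedelta

# CELERYBEAT_SCHEDULE is ordinary Python, so it can be built in a loop
# at import time rather than written out by hand.
def build_schedule(task_names, every_seconds=30):
    return {
        'run-%s' % name: {
            'task': name,  # placeholder task names
            'schedule': timedelta(seconds=every_seconds),
        }
        for name in task_names
    }

CELERYBEAT_SCHEDULE = build_schedule(['tasks.sync', 'tasks.cleanup'])
# then: app.conf.update(CELERYBEAT_SCHEDULE=CELERYBEAT_SCHEDULE)
```

This only covers schedules known before beat starts; adding entries to a *running* beat is what celery 4's API improves on.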

Madhur Ahuja | 2 May 14:49 2016

Scaling HTTP tasks with Celery

I am using celery to make HTTP calls that can take 1-2 seconds to respond. Each task is a single HTTP call. The problem I am facing is that I am unable to scale this model, because the worker does nothing until the HTTP call returns, and only acknowledges the task after that.

I want reliability, and thus I have ACKS_LATE enabled.

But what I am really looking for is a solution where the worker can asynchronously process other tasks instead of blocking until that HTTP call returns. I tried looking for solutions to integrate Tornado or Twisted with celery but haven't found anything worthwhile.

Any ideas?

