tapan pandita | 23 Apr 2013 08:20

Removing a task from the queue

If I have added a task to a queue, and it hasn't been picked up by a worker
yet, how do I remove it? I basically want the task to expire in 5 seconds.
So if it hasn't been picked up in 5 seconds, I want it to be removed.
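
For reference, Celery's `apply_async` accepts an `expires` argument (a number of seconds or a datetime); a worker that receives the message after that deadline discards it instead of running it. The sketch below imitates that check in plain Python so it runs without a broker. `Message`, `publish`, and `worker_should_run` are hypothetical names used for illustration, not Celery APIs.

```python
import time
from dataclasses import dataclass
from typing import Optional


@dataclass
class Message:
    body: str
    expires_at: Optional[float]  # absolute deadline, seconds since the epoch


def publish(body, expires=None, now=None):
    """Build a message; `expires` is a relative lifetime in seconds."""
    now = time.time() if now is None else now
    deadline = None if expires is None else now + expires
    return Message(body, deadline)


def worker_should_run(msg, now=None):
    """Return False when the message is picked up after its deadline."""
    now = time.time() if now is None else now
    return msg.expires_at is None or now <= msg.expires_at


msg = publish("send-email", expires=5, now=100.0)
print(worker_should_run(msg, now=103.0))  # picked up 3 s after publishing
print(worker_should_run(msg, now=106.0))  # picked up 6 s after publishing
```

With a real task this would look something like `my_task.apply_async(args, expires=5)`, where `my_task` is a placeholder. Note that the message is not deleted from the broker queue at the deadline; it is dropped when a worker finally receives it.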

Pedro Werneck | 22 Apr 2013 19:20

Out of memory when retrying tasks


I'm running Celery 3.0.18 with RabbitMQ 3.0.2, executing about a dozen million tasks every day. All of them use custom queues; none uses the default celery queue.

One of those tasks, in the middle of the workflow, makes a request to a third-party server, which sometimes returns an error; the task must then be retried after 10s, up to 30 times.

My problem is that with this heavy workload, I sometimes get a lot of those errors in a short period of time, and the worker processes eat more and more memory to store the pending retries, until the processes are killed.

The retry delay isn't important, so is there any way that I can immediately send the failed tasks back to the end of the RabbitMQ queue without having to explicitly call the task and manage the retry attempts by myself?
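
The idea in the last paragraph, pushing a failed task straight back onto the end of the queue while counting attempts in the message itself, can be sketched without Celery. Everything here (`process_queue`, `flaky_request`, the `retries` field) is hypothetical and only illustrates the bookkeeping; it is not how Celery's own `retry()` is implemented.

```python
from collections import deque

MAX_RETRIES = 30


def process_queue(queue, handler, max_retries=MAX_RETRIES):
    """Drain the queue; failed messages go to the back with retries + 1."""
    done, failed = [], []
    while queue:
        msg = queue.popleft()
        try:
            done.append(handler(msg["payload"]))
        except Exception:
            if msg["retries"] < max_retries:
                # Requeue immediately: nothing is held in worker memory
                # waiting for a countdown to elapse.
                queue.append({"payload": msg["payload"],
                              "retries": msg["retries"] + 1})
            else:
                failed.append(msg["payload"])
    return done, failed


attempts = {}


def flaky_request(payload):
    """Stand-in for the third-party call: fails twice, then succeeds."""
    attempts[payload] = attempts.get(payload, 0) + 1
    if attempts[payload] < 3:
        raise IOError("third-party server error")
    return payload.upper()


queue = deque({"payload": p, "retries": 0} for p in ["a", "b"])
done, failed = process_queue(queue, flaky_request)
print(done, failed, attempts)
```

Because the attempt counter travels with the message, any worker can pick up the retry, and the only state kept between attempts lives in the broker.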

--
You received this message because you are subscribed to the Google Groups "celery-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to celery-users+unsubscribe-/JYPxA39Uh5TLH3MbocFF+G/Ez6ZCGd0@public.gmane.org
To post to this group, send email to celery-users-/JYPxA39Uh5TLH3MbocFF+G/Ez6ZCGd0@public.gmane.org
Visit this group at http://groups.google.com/group/celery-users?hl=en-US.
For more options, visit https://groups.google.com/groups/opt_out.
 
 
v0idnull | 22 Apr 2013 17:13

Celery + RabbitMQ - nothing seems to function and I can't figure out why

celery 3.0.19
rabbitmq 2.8.4
ubuntu 12.10

my config:

CELERY_TIMEZONE = 'UTC'
CELERY_RESULT_BACKEND = 'database'
CELERY_RESULT_DBURI = 'postgresql://postgres:postgres@localhost/psi-rt'
CELERY_RESULT_ENGINE_OPTIONS = {"echo": True}
CELERY_TASK_SERIALIZER = 'json'

BROKER_URL = "amqp://rt-crawler:rt-crawler@neuromancer/rt-crawler-host"

my celery start script:

import sys
import os
from celery import Celery
import cconfig
from crawler.tasks import rt

sys.path.append(os.path.dirname(os.path.basename(__file__ + "/../")))

celery = Celery('tasks')
celery.config_from_object(cconfig)

I believe the last import is needed just to register the tasks with celery; they are all marked with the @task decorator.

So I run this little script with

celery -A ctest.py worker -E --loglevel=debug

this spits out:

[Tasks]
  . celery.backend_cleanup
  . celery.chain
  . celery.chord
  . celery.chord_unlock
  . celery.chunks
  . celery.group
  . celery.map
  . celery.starmap
  . crawler.tasks.rt.dmap
  . crawler.tasks.rt.fetchSubmissionContent
  . crawler.tasks.rt.fetchSubmissions
  . crawler.tasks.rt.getCalaisFacets
  . crawler.tasks.rt.getPsiopicFacets
  . crawler.tasks.rt.submitToDatabase


[2013-04-22 10:55:02,174: DEBUG/MainProcess] [Worker] Loading modules.
[2013-04-22 10:55:02,214: DEBUG/MainProcess] [Worker] Claiming components.
[2013-04-22 10:55:02,215: DEBUG/MainProcess] [Worker] Building boot step graph.
[2013-04-22 10:55:02,216: DEBUG/MainProcess] [Worker] New boot order: {ev, queues, beat, pool, mediator, autoreloader, timers, state-db, autoscaler, consumer}
[2013-04-22 10:55:02,221: DEBUG/MainProcess] Starting celery.worker.hub.Hub...
[2013-04-22 10:55:02,221: DEBUG/MainProcess] celery.worker.hub.Hub OK!
[2013-04-22 10:55:02,222: DEBUG/MainProcess] Starting celery.concurrency.processes.TaskPool...
[2013-04-22 10:55:02,226: DEBUG/MainProcess] Closed channel #1
[2013-04-22 10:55:02,232: DEBUG/MainProcess] celery.concurrency.processes.TaskPool OK!
[2013-04-22 10:55:02,232: DEBUG/MainProcess] Starting celery.worker.consumer.Consumer...
[2013-04-22 10:55:02,232: WARNING/MainProcess] celery@neuromancer ready.
[2013-04-22 10:55:02,233: DEBUG/MainProcess] consumer: Re-establishing connection to the broker...
[2013-04-22 10:55:02,238: DEBUG/MainProcess] Start from server, version: 0.9, properties: {u'information': u'Licensed under the MPL.  See http://www.rabbitmq.com/', u'product': u'RabbitMQ', u'copyright': u'Copyright (C) 2007-2012 VMware, Inc.', u'capabilities': {u'exchange_exchange_bindings': True, u'consumer_cancel_notify': True, u'publisher_confirms': True, u'basic.nack': True}, u'platform': u'Erlang/OTP', u'version': u'2.8.4'}, mechanisms: [u'PLAIN', u'AMQPLAIN'], locales: [u'en_US']
[2013-04-22 10:55:02,238: DEBUG/MainProcess] Open OK!
[2013-04-22 10:55:02,238: INFO/MainProcess] consumer: Connected to amqp://rt-crawler@neuromancer:5672/rt-crawler-host.
[2013-04-22 10:55:02,239: DEBUG/MainProcess] using channel_id: 1
[2013-04-22 10:55:02,239: DEBUG/MainProcess] Channel open
[2013-04-22 10:55:02,366: DEBUG/MainProcess] consumer: basic.qos: prefetch_count->32
[2013-04-22 10:55:02,367: DEBUG/MainProcess] using channel_id: 2
[2013-04-22 10:55:02,369: DEBUG/MainProcess] Channel open
[2013-04-22 10:55:02,380: DEBUG/MainProcess] consumer: Ready to accept tasks!

--

ok so far so good I suppose. So in another script, called csend, I have the following, to start a chain of tasks:

@task
def dmap(it, callback):
    callback = subtask(callback)
    return group(callback.clone([arg,]) for arg in it)

res = chain(rt.fetchSubmissions.s(cconfig.RT_USER_AGENT, cconfig.RT_USERNAME, cconfig.RT_PASSWORD, cconfig.RT_SUB),
       dmap.s(rt.fetchSubmissionContent.s())
).delay()

all these tasks work on their own. This dmap function I pulled from this helpful stackoverflow: http://stackoverflow.com/questions/13271056/how-to-chain-a-celery-task-that-returns-a-list-into-a-group

I run this script simply as: python csend.py

Now, doing this, I see no action in celery, but I see messages getting added to rabbitmq. What's weird is that the messages are added to the virtual host "/" instead of "rt-crawler-host". When I look at my rabbitmq logs, there is only this when I send the task:

=INFO REPORT==== 22-Apr-2013::11:09:20 ===
accepting AMQP connection <0.15058.0> (127.0.0.1:42221 -> 127.0.0.1:5672)

=WARNING REPORT==== 22-Apr-2013::11:09:20 ===
closing AMQP connection <0.15058.0> (127.0.0.1:42221 -> 127.0.0.1:5672):
connection_closed_abruptly

Well, I don't really know what else to do. Nothing happens after this point, and there are no additional logs or information. I am at a total loss here, so any help would be greatly appreciated. Thanks.

--v0id
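
One thing worth checking with the symptoms above is which broker URL the sending script actually ends up using. Messages landing in virtual host "/" are consistent with Celery's default broker settings (guest on localhost, virtual host "/") rather than the `BROKER_URL` from `cconfig`. That would happen if `csend.py` never loads the configuration, which is only an assumption here, since the full script is not shown. The hypothetical helper below uses only the standard library to show which virtual host each URL points at; it follows the usual AMQP URL convention and is not Kombu's own parser.

```python
from urllib.parse import unquote, urlparse


def vhost_of(broker_url):
    """Return the AMQP virtual host named by a broker URL."""
    path = urlparse(broker_url).path
    # Drop the single slash that separates the host from the vhost;
    # an empty remainder means the default virtual host "/".
    vhost = unquote(path[1:]) if path else ""
    return vhost or "/"


configured = "amqp://rt-crawler:rt-crawler@neuromancer/rt-crawler-host"
default = "amqp://guest@localhost//"

print(vhost_of(configured))
print(vhost_of(default))
```

If the sender turns out to be on the default URL, making `csend.py` use the same configured app object as the worker script would be the first thing to try.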

Lu Ali | 22 Apr 2013 11:01

[3.0.12] performing list(res.collect()) on chained result

Versions:
-----------------------------------------------------------
celery          - 3.0.12       - active
django-celery   - 3.0.11       - active

Hello,

Apologies if this is a daft question.

When chaining subtasks, my understanding is that performing collect() on the result object will flatten the results (if a list is required).

my code is as follows:-


@task
def add(x, y):
    return x + y

@task
def tsum(numbers):
    return sum(numbers)

@task
def mul(x, y):
    return x*y

res = chain(add.s(4, 4), mul.s(8), mul.s(10))

# have tried with intermediate=True as well
result = res.apply_async()

I can see that I am getting the result back (i.e. it has result.parent.get().. result.parent.parent.get() etc)

but when I do,

list(res.collect()) I get the following error.


Error
Traceback (most recent call last):
....
  File "/home/krone/.virtualenvs/mathspace/local/lib/python2.7/site-packages/celery/result.py", line 153, in collect
    for _, R in self.iterdeps(intermediate=intermediate):
  File "/home/krone/.virtualenvs/mathspace/local/lib/python2.7/site-packages/celery/result.py", line 169, in iterdeps
    stack.extend((node, child) for child in node.children or [])
  File "/home/krone/.virtualenvs/mathspace/local/lib/python2.7/site-packages/celery/result.py", line 237, in children
    children = self.backend.get_children(self.id)
AttributeError: 'NoneType' object has no attribute 'get_children'


Please note I have CELERY_ALWAYS_EAGER=True (for testing purposes), so result is an EagerResult object.
I also have TEST_RUNNER = 'djcelery.contrib.test_runner.CeleryTestSuiteRunner' in my settings.

I haven't tested it through an MQ broker yet, but should that make a difference with respect to collect()?
Am I missing something?

Cheers
Lu



Andres Riancho | 20 Apr 2013 10:56

Best way to spawn new ec2 instances with celery workers in them

List,


    I want to use Celery in the Amazon cloud. I have already figured out how to make it work with SQS, ran some tests, and everything looks good. My problem now is that I would like my worker pool to autoscale. Please note that in this case, when I refer to the worker pool, I mean the ec2 instances which are running celery inside. My ideal case would be:

* Create SQS queue for tasks
* Do NOT start any celeryd
* Run process X which will monitor queue
* Run client which will create new tasks and queue them in SQS
* Process X detects new tasks and spawns new ec2 instances, which will then run celery and consume task(s)
* Process X detects that worker(s) are idle and shuts them down
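
The decision that "process X" has to make in the list above could be sketched as a pure function from queue depth to a target instance count. The names `desired_workers` and `scaling_action` and the default limits are hypothetical; reading the queue depth and starting or stopping ec2 instances are left out on purpose.

```python
import math


def desired_workers(queued_tasks, tasks_per_worker=10, max_workers=20):
    """How many worker instances the current backlog calls for."""
    if queued_tasks <= 0:
        return 0  # nothing queued: every instance can be shut down
    return min(max_workers, math.ceil(queued_tasks / tasks_per_worker))


def scaling_action(queued_tasks, running_workers, **limits):
    """Positive: instances to start. Negative: instances to stop."""
    return desired_workers(queued_tasks, **limits) - running_workers


print(scaling_action(25, running_workers=1))
print(scaling_action(0, running_workers=3))
```

With SQS, the backlog would presumably come from the queue's `ApproximateNumberOfMessages` attribute, polled on a timer; a real monitor would also want some damping so that instances are not started and stopped on every small fluctuation.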

    This seems to be a pretty common use case for celery in the cloud, so I'm wondering... what are you guys using as "process X"? Which problems have you found with your solution? Which benefits?

    Note: This [0] article explains how to use Amazon's autoscaling [1] for replacing "process X", but I'm not sure about the flexibility this provides.

[0] http://aws.amazon.com/articles/1464
[1] http://boto.readthedocs.org/en/latest/autoscale_tut.html

Regards,
-- 
Andrés Riancho
Project Leader at w3af - http://w3af.org/
Web Application Attack and Audit Framework
Twitter: @w3af
GPG: 0x93C344F3

tapan pandita | 19 Apr 2013 11:02

Task creation time

Hi,

Given the task-id of a task, how can I find out when the task was created?
I am using celery with django and a database backend and a rabbitmq broker.

Sebastian Johnck | 18 Apr 2013 21:04

How to handle MaxRetriesExceeded in chord_unlock

Hello, 


I'm on version
celery==3.0.13
django-celery==3.0.11

Is it possible to configure the chord task such that when chord_unlock raises MaxRetriesExceeded it can be handled?

Ideally:

callback = final_task.subtask()
taskset = TaskSet(small_thing_to_do.subtask((x,))
                  for x in some_list)
return chord(taskset)(callback, interval=1, max_retries=1, exc=handle_error.s())

or

return chord(taskset)(callback, interval=1, max_retries=1, link_error=handle_error.s())

Any advice much appreciated.

Matt | 17 Apr 2013 23:39

Integration with nd_service_registry for broker HOST/PORT detection..?

We're looking to try to integrate our Celery clients with our home-grown service registry system (described at https://engblog.nextdoor.com, code available at https://github.com/Nextdoor/ndserviceregistry).


On the one hand, I could go the iron-mq route and create a custom transport that leverages nd_service_registry as well as a specific AMQP client to accomplish this. However, that's very limited, and I'd have to create one per backend broker I wanted to support.

I was thinking that the right way to do this might be some shim in Celery somewhere that allows me to effectively tell celery:

  1st. Go to nd_service_registry to get a list of broker HOST and PORT combinations
  2nd. Get the BROKER settings, and fill in the HOST+PORT, create the transport, and start up..
  3rd. Register a 'watch' with the service_registry code and in the event that the service list changes AND that the existing HOST+PORT being used is no longer valid, destroy the transport object and reconfigure/recreate new connections.

With a little help inside Celery, I feel like this could work with almost any of the brokers (redis, pika, pyamqp, librabbitmq, etc) pretty well. It could also provide failover support: in the event that the broker goes down, the code could pick a new broker HOST+PORT from the list if reconnection to the original HOST+PORT fails.

Thoughts?


Matt | 17 Apr 2013 19:51

Temporary message 'data' store in Redis or ???...

We have some celery tasks that pass around large chunks of data right now (300-500k). We know that this is both inefficient and explicitly frowned upon in the Celery docs as well as the RabbitMQ performance benchmarks.


I have tried in the past to cobble together a compression/decompression module that leveraged S3 to dump large arguments for storage, but this is slow, and given our task load it's effectively impossible. I was thinking though that Redis or Memcache might work.

Before I go down the road of trying to do this myself, has anyone else thought about storing large message bodies in Redis/Memcache/??? using the Kombu compress/decompress method registry?
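
The approach being asked about, keeping the large body in a side store and sending only a small reference through the broker, can be sketched as below. `BlobStore` is an in-memory stand-in for Redis or Memcache, and `pack`, `unpack`, and the 64 KB threshold are hypothetical choices; wiring this into Kombu's compression registry is not shown.

```python
import json
import uuid
import zlib

THRESHOLD = 64 * 1024  # bodies smaller than this travel inline


class BlobStore:
    """In-memory stand-in for Redis/Memcache (illustration only)."""

    def __init__(self):
        self._data = {}

    def set(self, key, value):
        self._data[key] = value

    def get(self, key):
        return self._data[key]


def pack(payload, store, threshold=THRESHOLD):
    """Return a small message: the payload itself or a reference to it."""
    raw = json.dumps(payload).encode("utf-8")
    if len(raw) < threshold:
        return {"inline": payload}
    key = "blob:" + uuid.uuid4().hex
    # A real store would also set a TTL so orphaned blobs expire.
    store.set(key, zlib.compress(raw))
    return {"ref": key}


def unpack(message, store):
    """Inverse of pack(): resolve a reference back into the payload."""
    if "inline" in message:
        return message["inline"]
    raw = zlib.decompress(store.get(message["ref"]))
    return json.loads(raw.decode("utf-8"))


store = BlobStore()
big = {"data": "x" * 400000}
message = pack(big, store)
print(sorted(message), unpack(message, store) == big)
```

The blob is deliberately not deleted in `unpack`, so a retried task can still resolve its reference; expiry is left to the store.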

Roger Barnes | 16 Apr 2013 07:31

Checking status of all tasks in a flow

Hi all,


I have a canvas flow made up of groups and chains (and therefore chords), and I'd like to be able to check the status of all tasks in the flow once it has been kicked off with apply_async.

I've had a look through the group posts and see some similar questions about traversing the parents/dependencies/children in results across the different canvas types. In particular this may be relevant: https://groups.google.com/d/msg/celery-users/xSdxI-Z08Cw/g6NDVuOr-IoJ

I've also tried manually inspecting result objects for clues as to how to proceed, but I'm a bit stuck.

My question, in a nutshell: how can I see the status of all tasks in a flow made up of groups/chains/chords?

I've put a simple example of what I'm hoping to achieve (assuming I am on the right track so far) here: http://pastebin.com/sgyXfXpX

Thanks,
- Roger
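
A possible starting point for the question above is a generic walk over the result graph, following `parent` links upward and `children` or `results` downward. The sketch relies on duck typing and is exercised here with a hypothetical `FakeResult` class; it assumes the real result objects expose `id`, `state`, `parent`, `children`, and (for groups) `results`, and whether the children are actually populated depends on the result backend.

```python
def iter_flow(result, _seen=None):
    """Yield every result reachable from `result`, each exactly once."""
    seen = set() if _seen is None else _seen
    if result is None or id(result) in seen:
        return
    seen.add(id(result))
    yield result
    # Earlier steps of a chain hang off .parent
    yield from iter_flow(getattr(result, "parent", None), seen)
    # Group members and spawned subtasks hang off .children / .results
    for attr in ("children", "results"):
        for child in getattr(result, attr, None) or []:
            yield from iter_flow(child, seen)


def flow_states(result):
    """Map result id -> state (None where a node has no state)."""
    return {node.id: getattr(node, "state", None)
            for node in iter_flow(result)}


class FakeResult:
    """Hypothetical stand-in for a result object, for the demo only."""

    def __init__(self, id, state, parent=None, children=None):
        self.id, self.state = id, state
        self.parent, self.children = parent, children


first = FakeResult("fetch", "SUCCESS")
members = [FakeResult("part-1", "SUCCESS"), FakeResult("part-2", "STARTED")]
group_node = FakeResult("group", None, parent=first, children=members)
last = FakeResult("merge", "PENDING", parent=group_node)

states = flow_states(last)
print(states)
```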

Hongxun Zhang | 17 Apr 2013 05:05

Is it possible to get the running time of task chain?

I have a task chain like this:
chain(task1, task2, group(t1,t2,..,tn), task3),
chain(task1, task2, group(t1,t2,..,tn), task3)
...
chain(task1, task2, group(t1,t2,..,tn), task3)

I put the task chains on the queue one by one. Can I get the total runtime for a single chain?
For now I just calculate the runtime as task3 - task1 (finish of task3 minus start of task1), but this is not accurate when there are a lot of tasks in the queue, since it also includes the time the tasks spent queued.


--
BR,
Hongxun
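
One way to separate execution time from queue time, assuming each task in the chain records its own start and finish timestamps somewhere (the `(start, end)` tuples below are hypothetical, not something Celery hands over in this form), is to sum only the periods during which at least one task of the chain was running. Overlapping group members are merged so parallel work is not counted twice.

```python
def busy_time(intervals):
    """Total length of the union of (start, end) intervals."""
    total = 0.0
    current_start = current_end = None
    for start, end in sorted(intervals):
        if current_end is None or start > current_end:
            # Gap before this interval: close the previous busy period.
            if current_end is not None:
                total += current_end - current_start
            current_start, current_end = start, end
        else:
            # Overlap (e.g. group members running in parallel): extend.
            current_end = max(current_end, end)
    if current_end is not None:
        total += current_end - current_start
    return total


# task1, task2, two overlapping group members, task3
runs = [(10.0, 12.0), (20.0, 21.0), (30.0, 34.0), (30.5, 33.0), (40.0, 41.0)]

wall_clock = runs[-1][1] - runs[0][0]  # the "task3 - task1" figure
executing = busy_time(runs)            # same span without the queue waits
print(wall_clock, executing)
```

The difference between the two numbers is the time the chain spent waiting in the queue between steps.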
