Btara Truhandarien | 18 Mar 07:18 2014

RabbitMQ server suddenly shuts down while running long tasks

I'm not sure if this is the right place to ask, but I couldn't find anything on this, so I figured I'd ask here since I am using celery.

I have several tasks that interact with MySQL, and sometimes these tasks take a while. In my log file I notice that the RabbitMQ server suddenly shuts down, causing an Errno 111 Connection refused. I was wondering what the common reasons are for the RabbitMQ server shutting down on its own.
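
Would broker retry settings like the following even be relevant here, or would they just mask whatever is shutting the server down? (A rough sketch of what I mean; the broker URL and values are placeholders, not my actual config.)

from celery import Celery

app = Celery('tasks', broker='amqp://localhost//')  # placeholder broker URL

app.conf.update(
    BROKER_CONNECTION_RETRY=True,        # retry if the broker connection drops mid-publish
    BROKER_CONNECTION_MAX_RETRIES=None,  # keep retrying instead of giving up after 100 attempts
    BROKER_HEARTBEAT=30,                 # detect dead connections sooner (pyamqp transport)
)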

Mihnea Giurgea | 17 Mar 13:42 2014

Celery critical during failed task

Hello,

I am running Celery with the result and task serializers set to yaml, and the result store enabled.

I am trying to run a task that simply does raise Exception('No such user!').

When this task is run, an additional error is raised in the MainProcess (see below), caused by Celery not being able to serialize the task result, which in this case is just an Exception.

I have pasted the stack trace below:

Any ideas what I can do to avoid this?

[2014-03-17 14:32:04,022: CRITICAL/MainProcess] Task pynsights.tasks.add.add[7b058014-c8f8-4e17-85c5-2575d0d4948f] INTERNAL ERROR: EncodeError(RepresenterError('cannot represent an object: No such user!',),)
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/celery/app/trace.py", line 251, in trace_task
    I, R, state, retval = on_error(task_request, exc, uuid)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/celery/app/trace.py", line 201, in on_error
    R = I.handle_error_state(task, eager=eager)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/celery/app/trace.py", line 85, in handle_error_state
    }[self.state](task, store_errors=store_errors)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/celery/app/trace.py", line 118, in handle_failure
    req.id, exc, einfo.traceback, request=req,
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/celery/backends/base.py", line 112, in mark_as_failure
    traceback=traceback, request=request)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/celery/backends/base.py", line 223, in store_result
    request=request, **kwargs)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/celery/backends/base.py", line 447, in _store_result
    self.set(self.get_key_for_task(task_id), self.encode(meta))
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/celery/backends/base.py", line 155, in encode
    _, _, payload = dumps(data, serializer=self.serializer)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/kombu/serialization.py", line 165, in dumps
    payload = encoder(data)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/contextlib.py", line 35, in __exit__
    self.gen.throw(type, value, traceback)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/kombu/serialization.py", line 59, in _reraise_errors
    reraise(wrapper, wrapper(exc), sys.exc_info()[2])
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/kombu/serialization.py", line 55, in _reraise_errors
    yield
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/kombu/serialization.py", line 165, in dumps
    payload = encoder(data)
  File "/Users/skip/Library/Python/2.7/lib/python/site-packages/yaml/__init__.py", line 218, in safe_dump
    return dump_all([data], stream, Dumper=SafeDumper, **kwds)
  File "/Users/skip/Library/Python/2.7/lib/python/site-packages/yaml/__init__.py", line 190, in dump_all
    dumper.represent(data)
  File "/Users/skip/Library/Python/2.7/lib/python/site-packages/yaml/representer.py", line 28, in represent
    node = self.represent_data(data)
  File "/Users/skip/Library/Python/2.7/lib/python/site-packages/yaml/representer.py", line 57, in represent_data
    node = self.yaml_representers[data_types[0]](self, data)
  File "/Users/skip/Library/Python/2.7/lib/python/site-packages/yaml/representer.py", line 223, in represent_dict
    return self.represent_mapping(u'tag:yaml.org,2002:map', data)
  File "/Users/skip/Library/Python/2.7/lib/python/site-packages/yaml/representer.py", line 123, in represent_mapping
    node_value = self.represent_data(item_value)
  File "/Users/skip/Library/Python/2.7/lib/python/site-packages/yaml/representer.py", line 67, in represent_data
    node = self.yaml_representers[None](self, data)
  File "/Users/skip/Library/Python/2.7/lib/python/site-packages/yaml/representer.py", line 247, in represent_undefined
    raise RepresenterError("cannot represent an object: %s" % data)
EncodeError: cannot represent an object: No such user!
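
The same failure reproduces outside Celery, which makes me think it is purely a limitation of yaml's safe dumper (a minimal standalone snippet, not from my project):

import yaml

# SafeDumper has no representer for arbitrary Python objects, so dumping a
# result dict that contains the raised exception fails the same way
yaml.safe_dump({'result': Exception('No such user!')})
# yaml.representer.RepresenterError: cannot represent an object: No such user!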

Guille Rmo | 16 Mar 17:25 2014

Custom backend

Hi. I modified the MongoDB backend to fit my needs and I want to use it with Celery.

I use this configuration: 

CELERY_RESULT_BACKEND = '.ReportsBackend.ReportsBackend://localhost:30000' 

but when I try to send a task from ipython I get this error:
 
In [1]: from tasks import *
In [2]: suma.delay(1,2).get()
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-2-f43bd3f694f1> in <module>()
----> 1 suma.delay(1,2).get()
/usr/local/lib/python2.7/dist-packages/celery/app/task.pyc in delay(self, *args, **kwargs)
    451 
    452         """
--> 453         return self.apply_async(args, kwargs)
    454 
    455     def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
/usr/local/lib/python2.7/dist-packages/celery/app/task.pyc in apply_async(self, args, kwargs, task_id, producer, link, link_error, **options)
    553             self.name, args, kwargs, task_id=task_id, producer=producer,
    554             link=link, link_error=link_error, result_cls=self.AsyncResult,
--> 555             **dict(self._get_exec_options(), **options)
    556         )
    557 
/usr/local/lib/python2.7/dist-packages/celery/app/base.pyc in send_task(self, name, args, kwargs, countdown, eta, task_id, producer, connection, router, result_cls, expires, publisher, link, link_error, add_to_parent, reply_to, **options)
    317             producer = self.amqp.TaskProducer(connection)
    318         with self.producer_or_acquire(producer) as P:
--> 319             self.backend.on_task_call(P, task_id)
    320             task_id = P.publish_task(
    321                 name, args, kwargs, countdown=countdown, eta=eta,

/usr/local/lib/python2.7/dist-packages/kombu/utils/__init__.pyc in __get__(self, obj, type)
    307             return obj.__dict__[self.__name__]
    308         except KeyError:
--> 309             value = obj.__dict__[self.__name__] = self.__get(obj)
    310             return value
    311 

/usr/local/lib/python2.7/dist-packages/celery/app/base.pyc in backend(self)
    604     @cached_property
    605     def backend(self):
--> 606         return self._get_backend()
    607 
    608     @cached_property

/usr/local/lib/python2.7/dist-packages/celery/app/base.pyc in _get_backend(self)
    419         backend, url = get_backend_by_url(
    420             self.backend_cls or self.conf.CELERY_RESULT_BACKEND,
--> 421             self.loader)
    422         return backend(app=self, url=url)
    423 

/usr/local/lib/python2.7/dist-packages/celery/backends/__init__.pyc in get_backend_by_url(backend, loader)
     63         else:
     64             backend, _, _, _, _, _, _ = _parse_url(url)
---> 65     return get_backend_cls(backend, loader), url

/usr/local/lib/python2.7/dist-packages/celery/utils/functional.pyc in _M(*args, **kwargs)
    135                     value = cache[key]
    136             except KeyError:
--> 137                 value = fun(*args, **kwargs)
    138                 _M.misses += 1
    139                 with mutex:

/usr/local/lib/python2.7/dist-packages/celery/backends/__init__.pyc in get_backend_cls(backend, loader)
     49     aliases = dict(BACKEND_ALIASES, **loader.override_backends)
     50     try:
---> 51         return symbol_by_name(backend, aliases)
     52     except ValueError as exc:
     53         reraise(ValueError, ValueError(UNKNOWN_BACKEND.format(

/usr/local/lib/python2.7/dist-packages/kombu/utils/__init__.pyc in symbol_by_name(name, aliases, imp, package, sep, default, **kwargs)
     90     try:
     91         try:
---> 92             module = imp(module_name, package=package, **kwargs)
     93         except ValueError as exc:
     94             reraise(ValueError,

/usr/lib/python2.7/importlib/__init__.pyc in import_module(name, package)
     28     if name.startswith('.'):
     29         if not package:
---> 30             raise TypeError("relative imports require the 'package' argument")
     31         level = 0
     32         for character in name:
TypeError: relative imports require the 'package' argument

The ReportsBackend.py file is in the same directory where I call the celery worker command. 

How can I use a local custom backend?
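
Could the leading dot in the setting be the problem? The last frame is importlib refusing a relative import, so I was wondering whether a plain importable path is what's expected instead. A guess (assuming ReportsBackend.py is importable from the worker's working directory, and that the host/port then have to be configured separately):

# guess, not verified: no leading dot, so importlib doesn't treat it as a
# relative import; 'module.ClassName' is resolved via kombu's symbol_by_name
CELERY_RESULT_BACKEND = 'ReportsBackend.ReportsBackend'

# host/port would then come from somewhere else, e.g. the MongoDB backend
# settings dict, assuming the modified backend still reads it
CELERY_MONGODB_BACKEND_SETTINGS = {'host': 'localhost', 'port': 30000}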

mseise | 16 Mar 16:02 2014

celery and numpy arrays

Hello,
I'm just starting to use celery, and I'm still not sure if it is the right package to use...

In the end I want to create an application for image analysis tasks (unfortunately on Windows). For "standard" parallelization I use joblib, but when combining it with pycuda I get crashes due to parallel access to the graphics card. With celery I hope to create a background CUDA process that runs the CUDA code and sends numpy arrays to/from it. Now I have the basic example app running and I wanted to use numpy arrays as input/output. Numpy input works, but the return value is a list of decimal values:

Input:

result = add.apply_async((np.random.rand(4,4), np.random.rand(4,4)))

Return:

result.get()

[[Decimal('0.5418033164919455'), Decimal('0.6474170807560545'), Decimal('0.8228576224037183'), Decimal('0.7689368628225968')], [Decimal('0.928383251860168'), Decimal('0.12800784335660054'), Decimal('0.6836481714235647'), Decimal('0.7006965505887757')], [Decimal('0.3717491676748691'), Decimal('0.6011661666442496'), Decimal('0.953639310154298'), Decimal('0.9003555411468284')], [Decimal('0.29835952377763875'), Decimal('0.7351207841168913'), Decimal('0.5124473348985139'), Decimal('0.6493860155113658')], [Decimal('0.6475584123227366'), Decimal('0.9266851532881143'), Decimal('0.8930479501413336'), Decimal('0.9059947795611991')], [Decimal('0.40762556290226826'), Decimal('0.5300777804871589'), Decimal('0.03834699194659397'), Decimal('0.4277659415566504')], [Decimal('0.06832908159431139'), Decimal('0.5079649452751968'), Decimal('0.06415964476617231'), Decimal('0.39987715636279264')], [Decimal('0.08539715975951756'), Decimal('0.02054418563411664'), Decimal('0.052444415331636796'), Decimal('0.9840696349268455')]]

 

Questions: (1) Is Celery the right framework for heavy computations?

(2) How can I return real numpy arrays?
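
For reference, this is roughly the round-trip I am picturing, converting to plain lists at the task boundary (a sketch only; the broker URL and task body are placeholders, not my real app):

import numpy as np
from celery import Celery

app = Celery('tasks', broker='amqp://localhost//')  # placeholder broker URL

@app.task
def add(a, b):
    # the arguments arrive as nested lists; rebuild arrays, compute, return lists
    return (np.asarray(a) + np.asarray(b)).tolist()

# caller side:
#   result = add.apply_async((np.random.rand(4, 4).tolist(), np.random.rand(4, 4).tolist()))
#   arr = np.array(result.get())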

 

Thanks,

Matthias

Mike | 15 Mar 15:59 2014

New to celery, looking for a little advice


Hi all, I'm new to celery.  I am looking to prototype a system using Celery and RabbitMQ.  Assuming I'm not way off... the questions should be straightforward at the end. Big assumption, right?  I have written similar systems with an MQ but without celery.

This system has relatively simple tasks spread across 2 data centers.  I'm going to try and keep it simple because I need to focus on the usage of celery, not explain every little nuance of the restrictions I am under (there are many).

First some background: 

In data center A:
  I will be scanning a directory for a file drop.
  I will be performing task T1.

Then in data center B:
  I will be performing task T2 once T1 is complete.

Then back in data center A: 
  I will be performing task T3  once T2 is complete. 

Tasks T1, T2, T3 are all trivial tasks and consume very little resources, but there will be millions of these executions per day.  They must run, or we must be able to resume the process where we left off in case of a failure.

This process needs to scale out and have HA, so I'll be running it on multiple servers.

A little network latency between the data centers is OK.

My thought so far based on reading the docs...

I need to create 2 worker queues.  DCA, DCB, for example. DCA workers would run on 2 nodes in Data Center A with a shared mount point for the directory I need to scan. DCB workers would run on 2 nodes in Data Center B.

I will bind T1, T3 to the DCA queue, and T2 to the DCB queue. 
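
Concretely, I'm picturing something like this (a rough sketch; module, task, and broker names are placeholders):

from celery import Celery, chain

app = Celery('pipeline', broker='amqp://localhost//')  # placeholder broker URL

# bind T1/T3 to the DCA queue and T2 to the DCB queue
app.conf.CELERY_ROUTES = {
    'pipeline.t1': {'queue': 'DCA'},
    'pipeline.t2': {'queue': 'DCB'},
    'pipeline.t3': {'queue': 'DCA'},
}

@app.task
def t1(path):
    pass  # placeholder body

@app.task
def t2(t1_result):
    pass  # placeholder body

@app.task
def t3(t2_result):
    pass  # placeholder body

# workers in data center A:  celery -A pipeline worker -Q DCA
# workers in data center B:  celery -A pipeline worker -Q DCB
# the whole flow per file:   chain(t1.s(path), t2.s(), t3.s()).apply_async()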

My questions:

Is this the "right way" to use celery to approach this problem? 

I assume for RabbitMQ, I'll want to create a separate cluster (let's just say 2 nodes for now)?

I assume these should be chained: chain( T1 | T2 | T3 ).  I'm not sure yet how errors are handled in this case.  If T2 has an error, would I have to fix the issue (NAS down, let's say) and then execute a new chain( T2 | T3 )?  Or do I somehow tell celery to continue that task by task_id?

Last question... The file drop detection... it is basically the initiator of this whole thing, and would need to execute everything async.  It will be running over and over and over again.  Is there some pattern for doing this in celery, or should it just be a cron job / daemon that scans the directory and sends off T1 tasks as it finds files?  (I still need to figure out exactly how to avoid sending duplicate T1 tasks, but that's another story.)

Thanks!!


atomekk | 15 Mar 14:52 2014

Scheduled task, KeyError: 15

Hi,

My scheduled task started to fail recently with traceback:

File celery/worker/consumer.py line 486 in consume_messages: 
    handlermap[fileno](fileno, event)
    KeyError: 15

Setup:

kombu==2.5.16
billiard==2.7.3.34
celery==3.0.19

Any advice appreciated.

Aki Balogh | 14 Mar 03:05 2014

Create celery connections on initialization and have them persist over days/weeks

Hello,

I have a Flask application that handles incoming requests every few days. I use RabbitMQ/py-amqp as the broker and RabbitMQ/rpc as the result backend.

When I reboot the server, it severs all existing connections. Then, the first request that comes in suffers overhead as celery/kombu have to re-create connections and channels to be able to send messages.

I'm trying to add an initialization step, so when Flask goes online, it creates a number of connections that then persist over time.


import logging
import multiprocessing
import sys

from celery.app import app_or_default, enable_trace, disable_trace

logger = logging.getLogger(__name__)
celery_app = app_or_default()

def initialization(*args, **kwargs):
    enable_trace()
    try:
        for i in range(0, multiprocessing.cpu_count()):
            celery_app.connection().channel()  # create a new connection and channel
            # I've verified from RabbitMQ that this step completes successfully
    except Exception:
        logger.error('Error when creating celery connection: %s', sys.exc_info())
    disable_trace()


then in my code:

with celery_app.connection() as conn:
     result = foo.apply_async(args=[2,2], connection=conn)

However, this seems to create a new connection in the code, and I'm not sure how to access the existing connections.


How can I initialize connections on initialization and have them persist over a long period of time (i.e. several days or weeks)?
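
Is the built-in broker connection pool the right mechanism for this? Something like the sketch below is what I'm imagining (BROKER_POOL_LIMIT is a real setting; the warm-up helper and the pool usage are my guess, untested):

import multiprocessing

# celery_app as defined above
celery_app.conf.BROKER_POOL_LIMIT = multiprocessing.cpu_count()

def warm_broker_pool():
    # acquire and release one pooled connection so it gets established at
    # startup instead of on the first request after a reboot
    with celery_app.pool.acquire(block=True) as conn:
        conn.ensure_connection()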

Thanks,
Aki

xeon Mailinglist | 12 Mar 17:24 2014

Unrecoverable error: ValueError('too many values to unpack',)

I am launching celeryd with python-2.7, and I get the following error. I am running celery 3.1.9. How do I fix this?

[2014-03-12 16:20:25,794: DEBUG/MainProcess] | Worker: New boot order: {Beat, Timer, Hub, Queues (intra), Pool, Autoscaler, StateDB, Autoreloader, Consumer}
[2014-03-12 16:20:25,797: DEBUG/MainProcess] | Consumer: Preparing bootsteps.
[2014-03-12 16:20:25,797: DEBUG/MainProcess] | Consumer: Building graph...
[2014-03-12 16:20:25,800: DEBUG/MainProcess] | Consumer: New boot order: {Connection, Events, Mingle, Tasks, Control, Heart, Agent, Gossip, event loop}
[2014-03-12 16:20:25,801: DEBUG/MainProcess] | Worker: Starting Hub
[2014-03-12 16:20:25,801: DEBUG/MainProcess] ^-- substep ok
[2014-03-12 16:20:25,801: DEBUG/MainProcess] | Worker: Starting Pool
[2014-03-12 16:20:25,804: DEBUG/MainProcess] ^-- substep ok
[2014-03-12 16:20:25,805: DEBUG/MainProcess] | Worker: Starting Consumer
[2014-03-12 16:20:25,805: DEBUG/MainProcess] | Consumer: Starting Connection
[2014-03-12 16:20:25,806: ERROR/MainProcess] Unrecoverable error: ValueError('too many values to unpack',)
Traceback (most recent call last):
  File "/usr/lib/python2.7/dist-packages/celery/worker/__init__.py", line 206, in start
    self.blueprint.start(self)
  File "/usr/lib/python2.7/dist-packages/celery/bootsteps.py", line 123, in start
    step.start(parent)
  File "/usr/lib/python2.7/dist-packages/celery/bootsteps.py", line 373, in start
    return self.obj.start()
  File "/usr/lib/python2.7/dist-packages/celery/worker/consumer.py", line 278, in start
    blueprint.start(self)
  File "/usr/lib/python2.7/dist-packages/celery/bootsteps.py", line 123, in start
    step.start(parent)
  File "/usr/lib/python2.7/dist-packages/celery/worker/consumer.py", line 478, in start
    c.connection = c.connect()
  File "/usr/lib/python2.7/dist-packages/celery/worker/consumer.py", line 375, in connect
    callback=maybe_shutdown,
  File "/usr/lib/python2.7/dist-packages/kombu/connection.py", line 373, in ensure_connection
    interval_start, interval_step, interval_max, callback)
  File "/usr/lib/python2.7/dist-packages/kombu/utils/__init__.py", line 229, in retry_over_time
    return fun(*args, **kwargs)
  File "/usr/lib/python2.7/dist-packages/kombu/connection.py", line 241, in connect
    return self.connection
  File "/usr/lib/python2.7/dist-packages/kombu/connection.py", line 754, in connection
    self._connection = self._establish_connection()
  File "/usr/lib/python2.7/dist-packages/kombu/connection.py", line 713, in _establish_connection
    conn = self.transport.establish_connection()
  File "/usr/lib/python2.7/dist-packages/kombu/transport/librabbitmq.py", line 119, in establish_connection
    conn = self.Connection(**opts)
  File "/usr/lib/python2.7/dist-packages/librabbitmq/__init__.py", line 167, in __init__
    host, port = host.split(':')
ValueError: too many values to unpack
[2014-03-12 16:20:25,807: DEBUG/MainProcess] | Worker: Closing Hub...
[2014-03-12 16:20:25,807: DEBUG/MainProcess] | Worker: Closing Pool...
[2014-03-12 16:20:25,808: DEBUG/MainProcess] | Worker: Closing Consumer...
[2014-03-12 16:20:25,808: DEBUG/MainProcess] | Worker: Stopping Consumer...
[2014-03-12 16:20:25,808: DEBUG/MainProcess] | Worker: Stopping Pool...
[2014-03-12 16:20:26,815: DEBUG/MainProcess] | Worker: Stopping Hub...
[2014-03-12 16:20:26,815: DEBUG/MainProcess] | Consumer: Shutdown Gossip...
[2014-03-12 16:20:26,815: DEBUG/MainProcess] | Consumer: Shutdown Heart...
[2014-03-12 16:20:26,816: DEBUG/MainProcess] | Consumer: Shutdown Control...
[2014-03-12 16:20:26,816: DEBUG/MainProcess] | Consumer: Shutdown Tasks...
[2014-03-12 16:20:26,816: DEBUG/MainProcess] | Consumer: Shutdown Events...
[2014-03-12 16:20:26,816: DEBUG/MainProcess] | Consumer: Shutdown Connection...
[2014-03-12 16:20:26,816: DEBUG/MainProcess] removing tasks from inqueue until task handler finished
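
The only explanation I can come up with is that the host string librabbitmq ends up with contains more than one colon, since the last frame is just a host:port split. A standalone guess at the failure (the host value is hypothetical, not from my config):

# librabbitmq's Connection.__init__ does: host, port = host.split(':')
# so any host value containing more than one colon raises the same error
host = 'broker.example.com:5672:5672'  # hypothetical malformed host value
host, port = host.split(':')           # ValueError: too many values to unpack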

Jonathan Stewmon | 11 Mar 23:24 2014

Error using CELERY_ROUTES in Celery 3.1.9

Hi,

I'm trying to use CELERY_ROUTES with RabbitMQ, but I get an exchange.declare error (ConnectionError: exchange.declare: received bad AMQP data) when calling a task more than once.

If the code below is in module celery_test.py:

from __future__ import absolute_import
from celery import Celery
from celery.utils.log import get_task_logger

from kombu import Queue, Exchange


class DefaultRouter(object):
    routes = {
        'default': {
            'routing_key': 'task.default',
            'queue': 'task.default'
        }
    }
    
    def route_for_task(self, task, args=None, kwargs=None):
        return self.routes.get('default')


class MyRouter(object):

    prefix = 'celery_test'
    routes = {
        'add': {
            'routing_key': 'task.add',
            'queue': 'task.add'
        },
        'log': {
            'routing_key': 'task.log',
            'queue': 'task.log'
        }
    }

    def route_for_task(self, task, args=None, kwargs=None):
        assert isinstance(task, (str, unicode))
        if not task.startswith(self.prefix):
            return None
        logger.info(task)
        print task
        return self.routes.get(task[len(self.prefix):], None)

celery_exchange = Exchange('celery_test', type='topic')

celery_queues = (
    Queue('default', celery_exchange, routing_key='default'),
    Queue('task.add', celery_exchange, routing_key='task.add'),
    Queue('task.log', celery_exchange, routing_key='task.log')
)

app = Celery()
app.conf.update(
    # enable debugging w env variable:
    CELERYD_MAX_TASKS_PER_CHILD=5000,
    CELERY_TASK_SERIALIZER='json',
    CELERY_RESULT_SERIALIZER='json',
    # disable pickle to avoid warning:
    CELERY_ACCEPT_CONTENT=['json', 'msgpack', 'yaml'],
    CELERY_TASK_RESULT_EXPIRES=3600,
    CELERY_IGNORE_RESULT=True,
    CELERY_DISABLE_RATE_LIMITS=True,
    CELERY_ENABLE_UTC=True,
    CELERY_ANNOTATIONS={
        'celery.group': {'ignore_result': True}
    },
    # RabbitMQ config:
    CELERY_QUEUES=celery_queues,
    CELERY_ROUTES=(MyRouter(), DefaultRouter()),
)

logger = get_task_logger(__name__)


@app.task
def add(x, y):
    log.delay(x + y)


@app.task
def log(val):
    logger.info(val)


if __name__ == '__main__':
    app.worker_main()

Then, I will get the following error (output from ipython) upon trying to call celery_test.add.delay more than once:
In [1]: import celery_test

In [2]: celery_test.add.delay(3, 4)
celery_test.add
Out[2]: <AsyncResult: a0231b28-eaeb-4efc-8387-f275775c7815>

In [3]: celery_test.add.delay(3, 4)
celery_test.add
---------------------------------------------------------------------------
ConnectionError                           Traceback (most recent call last)
<ipython-input-3-dbb182f4c4d0> in <module>()
----> 1 celery_test.add.delay(3, 4)

/usr/local/Cellar/python/2.7.6/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/celery/app/task.pyc in delay(self, *args, **kwargs)
    451 
    452         """
--> 453         return self.apply_async(args, kwargs)
    454 
    455     def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,

/usr/local/Cellar/python/2.7.6/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/celery/app/task.pyc in apply_async(self, args, kwargs, task_id, producer, link, link_error, **options)
    553             self.name, args, kwargs, task_id=task_id, producer=producer,
    554             link=link, link_error=link_error, result_cls=self.AsyncResult,
--> 555             **dict(self._get_exec_options(), **options)
    556         )
    557 

/usr/local/Cellar/python/2.7.6/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/celery/app/base.pyc in send_task(self, name, args, kwargs, countdown, eta, task_id, producer, connection, router, result_cls, expires, publisher, link, link_error, add_to_parent, reply_to, **options)
    322                 task_id=task_id, expires=expires,
    323                 callbacks=maybe_list(link), errbacks=maybe_list(link_error),
--> 324                 reply_to=reply_to or self.oid, **options
    325             )
    326         result = (result_cls or self.AsyncResult)(task_id)

/usr/local/Cellar/python/2.7.6/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/celery/app/amqp.pyc in publish_task(self, task_name, task_args, task_kwargs, countdown, eta, task_id, group_id, taskset_id, expires, exchange, exchange_type, event_dispatcher, retry, retry_policy, queue, now, retries, chord, callbacks, errbacks, routing_key, serializer, delivery_mode, compression, reply_to, time_limit, soft_time_limit, declare, headers, send_before_publish, before_receivers, send_after_publish, after_receivers, send_task_sent, sent_receivers, **kwargs)
    298             correlation_id=task_id,
    299             delivery_mode=delivery_mode, declare=declare,
--> 300             **kwargs
    301         )
    302 

/usr/local/Cellar/python/2.7.6/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/kombu/messaging.pyc in publish(self, body, routing_key, delivery_mode, mandatory, immediate, priority, content_type, content_encoding, serializer, headers, compression, exchange, retry, retry_policy, declare, **properties)
    164         return publish(body, priority, content_type,
    165                        content_encoding, headers, properties,
--> 166                        routing_key, mandatory, immediate, exchange, declare)
    167 
    168     def _publish(self, body, priority, content_type, content_encoding,

/usr/local/Cellar/python/2.7.6/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/kombu/connection.pyc in _ensured(*args, **kwargs)
    438             for retries in count(0):  # for infinity
    439                 try:
--> 440                     return fun(*args, **kwargs)
    441                 except conn_errors as exc:
    442                     if got_connection and not has_modern_errors:

/usr/local/Cellar/python/2.7.6/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/kombu/messaging.pyc in _publish(self, body, priority, content_type, content_encoding, headers, properties, routing_key, mandatory, immediate, exchange, declare)
    176         if declare:
    177             maybe_declare = self.maybe_declare
--> 178             [maybe_declare(entity) for entity in declare]
    179         return channel.basic_publish(
    180             message,

/usr/local/Cellar/python/2.7.6/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/kombu/messaging.pyc in maybe_declare(self, entity, retry, **retry_policy)
    107         if entity:
    108             from .common import maybe_declare
--> 109             return maybe_declare(entity, self.channel, retry, **retry_policy)
    110 
    111     def publish(self, body, routing_key=None, delivery_mode=None,

/usr/local/Cellar/python/2.7.6/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/kombu/common.pyc in maybe_declare(entity, channel, retry, **retry_policy)
     97     if retry:
     98         return _imaybe_declare(entity, **retry_policy)
---> 99     return _maybe_declare(entity)
    100 
    101 

/usr/local/Cellar/python/2.7.6/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/kombu/common.pyc in _maybe_declare(entity)
    108         ident = hash(entity)
    109         if ident not in declared:
--> 110             entity.declare()
    111             declared.add(ident)
    112             return True

/usr/local/Cellar/python/2.7.6/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/kombu/entity.pyc in declare(self, nowait)
    502         # - declare main binding.
    503         if self.exchange:
--> 504             self.exchange.declare(nowait)
    505         self.queue_declare(nowait, passive=False)
    506 

/usr/local/Cellar/python/2.7.6/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/kombu/entity.pyc in declare(self, nowait, passive)
    164                 exchange=self.name, type=self.type, durable=self.durable,
    165                 auto_delete=self.auto_delete, arguments=self.arguments,
--> 166                 nowait=nowait, passive=passive,
    167             )
    168 

/usr/local/Cellar/python/2.7.6/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/librabbitmq/__init__.pyc in exchange_declare(self, exchange, type, passive, durable, auto_delete, arguments, nowait)
    123         """
    124         return self.connection._exchange_declare(self.channel_id,
--> 125                 exchange, type, passive, durable, auto_delete, arguments or {})
    126 
    127     def exchange_delete(self, exchange='', if_unused=False):

ConnectionError: exchange.declare: received bad AMQP data

If I comment out the CELERY_ROUTES config line, it works as expected.

Am I doing something wrong?


Rich Rauenzahn | 11 Mar 02:59 2014

always eager, retry, exception propagation, and unit testing


I'm unittesting a workflow of my tasks that includes some .retry() calls with countdowns with celery_always_eager=True.  What is the expected behavior there?  It looks to me like the retries are always ignored?

I've also come across another odd behavior.  I have a task that sort of does:

@task
def foo():
    try:
        ...dosomething...
    except MyException:
        ...dosomethingelse...

    foo.retry(...)


If MyException was thrown above, caught, and ignored, retry() still looks at the previous exception history and reraises it.  (I also have celery_eager_always_propagate_exceptions=True)

Is this also expected behavior?

My tasks already inherit from celery tasks, so I'm thinking about overriding retry() to work around some of these issues during unittesting (like maybe calling sys.exc_clear() before calling the parent class's retry(), and maybe implementing a retry when unittesting is on...)
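
Something along these lines is what I had in mind (an untested sketch, and Python 2 only, since sys.exc_clear() doesn't exist on Python 3):

import sys
from celery import Task

class ExcClearingTask(Task):
    abstract = True  # base class only, not registered as a task itself

    def retry(self, *args, **kwargs):
        # forget any exception we already caught and handled, so an eager
        # retry() doesn't re-raise it from stale exception info
        sys.exc_clear()
        return super(ExcClearingTask, self).retry(*args, **kwargs)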

But I'm wondering why celery doesn't already deal with sys.exc_clear() itself? 

Rich (Don't have the celery version I'm using handy -- can post it tomorrow.  It's at least ~1 year old.)

Miki Tebeka | 9 Mar 22:18 2014

Problem using non-default queue

Hi,

I'm trying to have a worker with a dedicated queue. I'm following the routing guide; however, the "feed" task is not picked up by the worker.
You can see the example project here; see the README for how to run it.

Any idea why the general task works but the feed one doesn't?
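
In case it helps narrow it down, this is the shape of setup I think the routing guide describes (the task path and queue name here are placeholders, not the ones from my project):

# route the feed task to its own queue
CELERY_ROUTES = {'proj.tasks.feed': {'queue': 'feed'}}

# the dedicated worker then has to be started with -Q, otherwise it only
# consumes the default 'celery' queue and never sees the routed task:
#   celery -A proj worker -Q feed -l info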

Thanks,
--
Miki
