Andres Riancho | 18 Jun 2013 21:54

Running a worker in unittest

List,

    I have the following setup:

* Django-Celery project A registers task "foo"
* Project B: Uses Celery's send_task to call "foo"
* Project A and project B have the same configuration: SQS, msgpack
for serialization, gzip, etc.
* Each project lives on a different github repository

    I've unit-tested calls to "foo" in project A, without using Celery
at all, just "foo(1,2,3)" and assert the result. I know that it works.

    I've unit-tested that send_task in project B sends the right parameters.

    What I'm not testing, and would like your advice on, is the
integration between the two projects. I would like to have a unittest
that would:

* Start a worker in the context of project A
* Send a task using the code of project B
* Assert that the worker started in the first step gets the task, with
the parameters I sent in the second step, and that the "foo" function
returned the expected result.

    It seems to be possible to hack this by using python's subprocess
and parsing the output of the worker, but that's ugly. What's the
recommended approach to unit-testing in cases like this? Any code
snippet you could share? Thanks!
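
A minimal sketch of the shape such a test could take, using only the standard library as a stand-in for Celery. Everything here is an assumption made for illustration: a queue.Queue plays the broker, a thread plays the project A worker, send_task is a hypothetical helper (not Celery's send_task), and foo is assumed to return a sum because the post does not say what it computes.

```python
import queue
import threading
import unittest


def foo(a, b, c):
    # Hypothetical body for project A's task; the real one is not shown.
    return a + b + c


# Stand-in for project A's task registry: task name -> callable.
REGISTRY = {"foo": foo}


class InProcessWorker(threading.Thread):
    """Consumes (name, args, reply_queue) messages until it receives None."""

    def __init__(self, broker, registry):
        super().__init__(daemon=True)
        self.broker = broker
        self.registry = registry
        self.received = []  # (name, args) pairs, kept for assertions

    def run(self):
        while True:
            message = self.broker.get()
            if message is None:  # shutdown sentinel
                break
            name, args, reply = message
            self.received.append((name, args))
            reply.put(self.registry[name](*args))


def send_task(broker, name, args):
    """Stand-in for project B: publish a task by name, return a reply queue."""
    reply = queue.Queue()
    broker.put((name, args, reply))
    return reply


class IntegrationTest(unittest.TestCase):
    def setUp(self):
        # Step 1: start a worker in the context of "project A".
        self.broker = queue.Queue()
        self.worker = InProcessWorker(self.broker, REGISTRY)
        self.worker.start()

    def tearDown(self):
        self.broker.put(None)
        self.worker.join(timeout=5)

    def test_foo_round_trip(self):
        # Step 2: send the task using "project B" code.
        reply = send_task(self.broker, "foo", (1, 2, 3))
        # Step 3: assert on the result and on what the worker received.
        self.assertEqual(reply.get(timeout=5), 6)
        self.assertEqual(self.worker.received, [("foo", (1, 2, 3))])
```

Running the worker in the same process as the test avoids parsing subprocess output, at the cost of not exercising the real SQS transport or the msgpack/gzip serialization.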


Rafael Novello | 18 Jun 2013 18:22

Periodic Tasks Problem

Hi people! I would like to ask for your help.

I have a Django project that uses Celery to, among other things, run periodic tasks. For this I am using the periodic_task decorator like this:

@periodic_task(name='task_name', run_every=crontab(hour="*/2"))

In the past this same task ran every minute (using crontab(minute="*")), and after a change in the system the schedule was changed to the one above, every two hours.

My problem is that even after the code change, the task continues to run every minute in the production environment. What is worse, it sometimes goes a whole hour without being executed: for example, the task runs every minute until 2:59 am and only runs again at 4 am.

I have already restarted Celery and RabbitMQ, yet this behavior does not change.

Any hint would be a great help!

PS: The whole environment is on the same server: Celery, RabbitMQ and the system itself. Celery and django-celery are the latest versions, 3.0.19 and 3.0.17 respectively. RabbitMQ is at version 2.7.1.

Thanks!
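
One thing that may be worth checking, assuming the decorator is deployed exactly as shown: Celery's crontab defaults minute to "*", so crontab(hour="*/2") matches every minute of every even hour. That would fit the pattern described above (runs every minute until 2:59, then nothing until 4:00). crontab(minute=0, hour="*/2") fires once every two hours instead. The sketch below is a simplified standard-library model of cron field matching, not Celery's implementation; field_matches, is_due and runs_between are illustrative names.

```python
from datetime import datetime, timedelta


def field_matches(spec, value):
    """Match one cron field: '*', '*/n', or a literal integer."""
    spec = str(spec)
    if spec == "*":
        return True
    if spec.startswith("*/"):
        return value % int(spec[2:]) == 0
    return value == int(spec)


def is_due(now, minute="*", hour="*"):
    """True when both fields match; minute defaults to '*', as in crontab."""
    return field_matches(minute, now.minute) and field_matches(hour, now.hour)


def runs_between(start, end, **fields):
    """List every whole minute in [start, end) at which the schedule fires."""
    runs, tick = [], start
    while tick < end:
        if is_due(tick, **fields):
            runs.append(tick)
        tick += timedelta(minutes=1)
    return runs


start = datetime(2013, 6, 18, 2, 0)
end = datetime(2013, 6, 18, 5, 0)

# Models crontab(hour="*/2"): minute is left at its default of "*".
every_minute_of_even_hours = runs_between(start, end, hour="*/2")
# Models crontab(minute=0, hour="*/2").
once_every_two_hours = runs_between(start, end, minute=0, hour="*/2")

print(len(every_minute_of_even_hours))  # 120: all of 02:xx and 04:xx
print([t.strftime("%H:%M") for t in once_every_two_hours])  # ['02:00', '04:00']
```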

--
You received this message because you are subscribed to the Google Groups "celery-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to celery-users+unsubscribe-/JYPxA39Uh5TLH3MbocFF+G/Ez6ZCGd0@public.gmane.org
To post to this group, send email to celery-users-/JYPxA39Uh5TLH3MbocFF+G/Ez6ZCGd0@public.gmane.org
Visit this group at http://groups.google.com/group/celery-users.
For more options, visit https://groups.google.com/groups/opt_out.
 
 
Tony Barbieri | 18 Jun 2013 06:03

Manipulate Args, Kwargs or Add Custom Data

Hello!

I am trying to intercept all calls before they leave the client and head off to the broker.  I need to send extra metadata to be used by the task_prerun signal to set up the Python environment.

I'm assuming the task_sent signal is called after the task has been sent, so I would be unable to manipulate the args or kwargs there.  I'm thinking it would be some sort of a hook like task_present.

Thanks!

--
-tony

 
 
Lewis Franklin | 17 Jun 2013 20:42

Celery Beat and ZooKeeper

I am a developer who uses both Celery and Kazoo for distributed computing. For a while now I've been bothered by the lack of a distributed job scheduler for Celery, since Celery beat can only run on one server.

To that end I have attempted to write a class that Celery can use to let the Celery beat program run on multiple servers, coordinated through a shared lock. While my code seems to work for me, I am the sole system developer at my company and do not have any fellow Pythonistas, much less any Python Celery users, to help me think of the places and edge cases I may have missed.

I would appreciate it if any fellow Celery users would look over my gist and help me make this a solid alternative to the job scheduler built into Celery.

The code can be found at: https://gist.github.com/brolewis/5799103

Lewis

 
 
Harel Malka | 17 Jun 2013 11:47

Could librabbitmq be a potential memory leaker?

Hi 

Recently we noticed RabbitMQ crashing due to lack of memory on the server. Memory usage looks fine for a while and then climbs until all RAM is consumed, causing RabbitMQ to fail. I thought it might be a RabbitMQ issue, but we have just had this scenario again and I noticed one of the tasks using 100% of CPU. I killed it and the memory usage went back to normal.
Looking at the logs of the task, we see failures coming from librabbitmq trying to reconnect and failing.  Is it possible that each such failure contributes to RAM not being released until there's nothing left for poor ol' rabbit?

[2013-06-16 23:02:28,472: CRITICAL/MainProcess] Couldn't ack 209L, reason:ConnectionError('Operation on closed connection',)
Traceback (most recent call last):
  File "/usr/local/lib/python2.6/dist-packages/kombu/transport/base.py", line 100, in ack_log_error
    self.ack()
  File "/usr/local/lib/python2.6/dist-packages/kombu/transport/base.py", line 95, in ack
    self.channel.basic_ack(self.delivery_tag)
  File "/usr/local/lib/python2.6/dist-packages/librabbitmq/__init__.py", line 86, in basic_ack
    delivery_tag, multiple)
ConnectionError: Operation on closed connection
[2013-06-16 23:02:28,472: CRITICAL/MainProcess] Couldn't ack 210L, reason:ConnectionError('Operation on closed connection',)
Traceback (most recent call last):
  File "/usr/local/lib/python2.6/dist-packages/kombu/transport/base.py", line 100, in ack_log_error
    self.ack()
  File "/usr/local/lib/python2.6/dist-packages/kombu/transport/base.py", line 95, in ack
    self.channel.basic_ack(self.delivery_tag)
  File "/usr/local/lib/python2.6/dist-packages/librabbitmq/__init__.py", line 86, in basic_ack
    delivery_tag, multiple)
ConnectionError: Operation on closed connection
[2013-06-17 00:03:04,767: ERROR/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
  File "/usr/local/lib/python2.6/dist-packages/celery/worker/consumer.py", line 395, in start
    self.consume_messages()
  File "/usr/local/lib/python2.6/dist-packages/celery/worker/consumer.py", line 486, in consume_messages
    handlermap[fileno](fileno, event)
  File "/usr/local/lib/python2.6/dist-packages/kombu/connection.py", line 291, in drain_nowait
    self.drain_events(timeout=0)
  File "/usr/local/lib/python2.6/dist-packages/librabbitmq/__init__.py", line 198, in drain_events
    self._basic_recv(timeout)
ChannelError: Bad frame read
 

 
 
Dreampuf | 16 Jun 2013 18:49

How to execute celery task in testcase ?

I use memory as the backend storage in the test case.


class CELERY_CONFIG(object):
    BROKER_URL = "memory://"
    CELERY_CACHE_BACKEND = "memory"
    CELERY_RESULT_BACKEND = "cache"

But I don't want to start another Celery worker to run the test case, and a separate worker can't share the in-memory backend storage either.
How can I test the Celery task in the test case?
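
One option that may fit here is Celery's eager mode, which runs tasks synchronously in the calling process so that no separate worker is needed. The sketch below extends the configuration class from this post with two Celery 3.0 settings; whether it suits this project is an assumption, since eager mode bypasses the broker and message serialization entirely.

```python
class CELERY_CONFIG(object):
    BROKER_URL = "memory://"
    CELERY_CACHE_BACKEND = "memory"
    CELERY_RESULT_BACKEND = "cache"
    # Execute .delay() / .apply_async() calls locally and synchronously,
    # so the test process does not need a running worker.
    CELERY_ALWAYS_EAGER = True
    # Re-raise task exceptions in the caller instead of only storing them,
    # so a failing task fails the test directly.
    CELERY_EAGER_PROPAGATES_EXCEPTIONS = True
```

With these settings a test can call the task through .delay() and assert on the returned result in the same process.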

 
 
Tony Barbieri | 13 Jun 2013 22:18

Calling a Task within executing Task

Hello!

Sorry if this has been brought up before, but with the latest Celery (3.1.0rc3) I can't seem to call a task from within another task.  I make the call and do get the "Got task from broker" message for the second task, but it just hangs and never completes.  Rolling back to 3.0.18 seems to solve it, but I am now experiencing the error "Worker exited prematurely: exitcode 155".  That may be due to kombu/billiard/amqp incompatibilities, so I will double-check that.


I was using 3.0.19 but it was mentioned there are some issues with using CELERYD_MAX_TASKS_PER_CHILD = 1.

Thanks!

--
-tony

 
 
Jared Biel | 13 Jun 2013 18:41

Chain of subtasks without registering tasks issue

Hello,

I'm using celery 3.0.19 and testing the functionality "Subtasks can now be used with unregistered tasks" introduced in 3.0.13.

Python 2.7.3

>>> import celery
>>> task_1 = celery.subtask('tasks.add', args=(2,2))
>>> task_2 = celery.subtask('tasks.mul', args='8')
>>> result = celery.chain(task_1, task_2).apply_async()
>>> result.get()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/python2.7/dist-packages/celery/result.py", line 108, in get
    interval=interval)
  File "/usr/lib/python2.7/dist-packages/celery/backends/base.py", line 185, in wait_for
    raise result
NotImplementedError: Tasks must define the run method.

Full worker output: http://pastie.org/pastes/8039681/text
task.py: http://pastie.org/pastes/8037598/text
celeryconfig routes excerpt: http://pastie.org/pastes/8039650/text

So, it looks like for some reason celery.chain isn't being handled properly. If "import tasks" is run before submitting the chain everything works properly, but we're trying to get away from that. If not, CELERY_ROUTES isn't obeyed (the chain message is sent to the "celery" queue instead of the "tasks" queue) and the error occurs on consumption. Is all of this expected behavior? Am I trying to do something out-of-bounds?

Any help at all is much appreciated, thanks!

 
 
Brian Kennedy | 13 Jun 2013 03:27

DatabaseError whenever a revoked task gets read from RabbitMQ by celery

Celery is 100% reliably dying with a DatabaseError after getting a task that has been revoked from the queue.  I've got a reproducible demo going on my VM, and I can't figure out what's happening for the life of me.  At log level debug2, postgres seems to have nothing to say about this DBerror in celery.  This is celery 3.0.19 running with django-celery and saving to a postgres 8.4 db on ubuntu 10.04.  Has anyone ever seen this kind of error?  Thousands of tasks process normally, but revoked tasks get read from the queue and celery barfs and dies, and what's weirder is that this seems new because we've been revoking tasks and setting expiration times on things for a long time.

[2013-06-12 18:02:29,558: WARNING/MainProcess] Skipping revoked task: iondb.rundb.tasks.echo[2dbfba7b-7e60-4ff3-9b05-11dd1df80450]
[2013-06-12 18:02:29,566: ERROR/MainProcess] Unrecoverable error: DatabaseError('server closed the connection unexpectedly\n\tThis probably means the server terminated abnormally\n\tbefore or while processing the request.\n',)
Traceback (most recent call last):
  File "/usr/local/lib/python2.6/dist-packages/celery/worker/__init__.py", line 363, in start
    component.start()
  File "/usr/local/lib/python2.6/dist-packages/celery/worker/consumer.py", line 395, in start
    self.consume_messages()
  File "/usr/local/lib/python2.6/dist-packages/celery/worker/consumer.py", line 495, in consume_messages
    drain_nowait()
  File "/usr/local/lib/python2.6/dist-packages/kombu/connection.py", line 291, in drain_nowait
    self.drain_events(timeout=0)
  File "/usr/local/lib/python2.6/dist-packages/kombu/connection.py", line 280, in drain_events
    return self.transport.drain_events(self.connection, **kwargs)
  File "/usr/local/lib/python2.6/dist-packages/kombu/transport/pyamqp.py", line 91, in drain_events
    return connection.drain_events(**kwargs)
  File "/usr/local/lib/python2.6/dist-packages/amqp/connection.py", line 288, in drain_events
    return amqp_method(channel, args, content)
  File "/usr/local/lib/python2.6/dist-packages/amqp/channel.py", line 1886, in _basic_deliver
    fun(msg)
  File "/usr/local/lib/python2.6/dist-packages/kombu/messaging.py", line 562, in _receive_callback
    return on_m(message) if on_m else self.receive(decoded, message)
  File "/usr/local/lib/python2.6/dist-packages/kombu/messaging.py", line 531, in receive
    [callback(body, message) for callback in callbacks]
  File "/usr/local/lib/python2.6/dist-packages/celery/worker/consumer.py", line 438, in on_task_received
    strategies[name](message, body, message.ack_log_error)
  File "/usr/local/lib/python2.6/dist-packages/celery/worker/strategy.py", line 25, in task_message_handler
    delivery_info=message.delivery_info))
  File "/usr/local/lib/python2.6/dist-packages/celery/worker/consumer.py", line 511, in on_task
    if task.revoked():
  File "/usr/local/lib/python2.6/dist-packages/celery/worker/job.py", line 300, in revoked
    'expired' if expired else 'revoked', False, None, expired,
  File "/usr/local/lib/python2.6/dist-packages/celery/worker/job.py", line 284, in _announce_revoked
    self.task.backend.mark_as_revoked(self.id, reason)
  File "/usr/local/lib/python2.6/dist-packages/celery/backends/base.py", line 139, in mark_as_revoked
    status=states.REVOKED, traceback=None)
  File "/usr/local/lib/python2.6/dist-packages/celery/backends/base.py", line 282, in store_result
    self._store_result(task_id, result, status, traceback, **kwargs)
  File "/usr/local/lib/python2.6/dist-packages/djcelery/backends/database.py", line 28, in _store_result
    traceback=traceback, children=self.current_task_children(),
  File "/usr/local/lib/python2.6/dist-packages/djcelery/managers.py", line 41, in _inner
    return fun(*args, **kwargs)
  File "/usr/local/lib/python2.6/dist-packages/djcelery/managers.py", line 183, in store_result
    'meta': {'children': children}})
  File "/usr/local/lib/python2.6/dist-packages/djcelery/managers.py", line 83, in update_or_create
    return self.get_query_set().update_or_create(**kwargs)
  File "/usr/local/lib/python2.6/dist-packages/djcelery/managers.py", line 67, in update_or_create
    obj, created = self.get_or_create(**kwargs)
  File "/usr/lib/pymodules/python2.6/django/db/models/query.py", line 470, in get_or_create
    return self.get(**lookup), False
  File "/usr/lib/pymodules/python2.6/django/db/models/query.py", line 382, in get
    num = len(clone)
  File "/usr/lib/pymodules/python2.6/django/db/models/query.py", line 90, in __len__
    self._result_cache = list(self.iterator())
  File "/usr/lib/pymodules/python2.6/django/db/models/query.py", line 301, in iterator
    for row in compiler.results_iter():
  File "/usr/lib/pymodules/python2.6/django/db/models/sql/compiler.py", line 775, in results_iter
    for rows in self.execute_sql(MULTI):
  File "/usr/lib/pymodules/python2.6/django/db/models/sql/compiler.py", line 840, in execute_sql
    cursor.execute(sql, params)
  File "/usr/lib/pymodules/python2.6/django/db/backends/util.py", line 41, in execute
    return self.cursor.execute(sql, params)
  File "/usr/lib/pymodules/python2.6/django/db/backends/postgresql_psycopg2/base.py", line 58, in execute
    six.reraise(utils.DatabaseError, utils.DatabaseError(*tuple(e.args)), sys.exc_info()[2])
  File "/usr/lib/pymodules/python2.6/django/db/backends/postgresql_psycopg2/base.py", line 54, in execute
    return self.cursor.execute(query, args)
DatabaseError: server closed the connection unexpectedly
    This probably means the server terminated abnormally
    before or while processing the request.

The task:
@task
def echo(message, wait=0):
    time.sleep(wait)
    logger.info("Logged: " + message)
    print(message)

The test code:
foo = [echo.delay("poop", 10) for i in range(6)]
foo[-1].revoke()

django's settings.py
Using postgresql_psycopg2
CELERY_TASK_RESULT_EXPIRES = 1800
BROKER_HEARTBEAT=0
CELERY_ACKS_LATE = True
CELERYD_PREFETCH_MULTIPLIER = 1

I can't find anything on this.  Does anyone have any insight here?

Thanks,

Brian

 
 
Tony Barbieri | 13 Jun 2013 06:21

Check If Executing Inside Worker

Hello!

Is it possible to check whether a Python script is being executed inside the worker, as opposed to being invoked directly, in which case it should call apply_async?  I have a slightly different setup where apply_async is called in one instance, but once the worker picks up the job, I want to run the command directly.

Thanks!

--
-tony

 
 
daniel | 11 Jun 2013 22:01

Implement worker in PHP

Hi,


I'm trying to understand if there is a reasonable way to implement the workers (aka consumers) in PHP rather than Python. The reason is simple: we already have working PHP code that we'd like to reuse, and we have PHP knowledge but not much experience with Python. In our setup, we would have approximately 4 worker servers and about 10 publishers (web servers). Publishing or submitting tasks from PHP should be easy using the existing PHP wrapper for the HTTP API, but I'm not clear about whether there is a good way to implement the worker tasks in PHP.

If I understand correctly, we have basically two options:
1) Write a Python function that makes a simple HTTP request to localhost, submitting the parameters and waiting for the request to complete. Apache would run locally and host the PHP code.
2) Use the Webhooks, which are essentially the same as 1) with the only difference that celeryd is doing it on its own
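
Option 1 could look roughly like the sketch below, written with the standard library only. The endpoint URL, the parameter names and the function names build_request and run_php_task are hypothetical; the post only says that Apache runs locally and hosts the PHP code. In a real setup run_php_task would be registered as a Celery task.

```python
import urllib.parse
import urllib.request

# Hypothetical location of the PHP script served by the local Apache.
PHP_ENDPOINT = "http://localhost/tasks/run.php"


def build_request(params, url=PHP_ENDPOINT):
    """Build a form-encoded POST request carrying the task parameters."""
    data = urllib.parse.urlencode(params).encode("utf-8")
    # Supplying a request body makes urllib issue a POST instead of a GET.
    return urllib.request.Request(url, data=data)


def run_php_task(params, url=PHP_ENDPOINT, timeout=30):
    """Send the parameters to the PHP script and block until it replies."""
    request = build_request(params, url)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.read().decode("utf-8")
```

The worker process blocks for the duration of each HTTP call, so throughput in this arrangement is bounded by worker concurrency and by how many requests the local Apache can serve at once.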

Are my assumptions correct?

What would be the performance implications of such a setup and would you generally recommend going that route or rather have us reimplement the tasks in Python?

Thanks, Daniel

 
 
