Nick Stanoszek | 20 Sep 04:31 2014

Celery and Rabbitmq

Hi everyone,

I currently have a RabbitMQ environment (server) and Celery workers on my client side.  The way the environment is set up is as follows:

-There are multiple locations.  Each location has its own worker, a unique location ID, and completely different data from the others (i.e. if I request data from loc1, it has to come from loc1 and CANNOT come from loc2).
-There is one server that runs RabbitMQ and the broker/exchange.

Currently what I am experiencing is this:

1. Each location has its own queue (or at least it appears that way to me).
2. Each location connects to the RabbitMQ server (I see a sync between clients).
3. When I send an HTTP request to loc1, it works fine.  The worker gets the request, completes the task, and returns the result to the "requestor".  The same is true for loc2.
4. When I send a request for loc1 and immediately after send a request for loc2, that is where things get funky.  Loc1 gets its task, but loc2 doesn't get its task until loc1 finishes and returns a result.
     --> If loc1 dies (internet outage, or something) and the broker/exchange doesn't get a result, then it never sends loc2 its task.  It takes resetting the RabbitMQ server before everything works again.

What I am hoping to see is that loc1 gets its task and so does loc2, regardless of how far along the task is in processing at either location.  What I am actually seeing is that the exchange doesn't distribute a task until the previous task is complete (regardless of location).

I have also started 2 workers on the same location (i.e. loc1), and even when I submit two requests at the same time, only one task is completed at a time.

I am connecting each location to the same user/vhost on RabbitMQ, which, from my understanding, shouldn't make a difference.


Any thoughts?

Thanks!

Here is the configuration for celery:
from pcs_client.config import location_info

_DEV_SERVER = "54.XXX.XXX.XXX"

_AMQP_CONNECTION = {
    'user': 'user',
    'password': 'password',
    'ip': _DEV_SERVER,
    'port': '2020',
    'vhost': 'vhost',
}

BROKER_URL = 'amqp://{user}:{password}@{ip}:{port}/{vhost}'.format(**_AMQP_CONNECTION)
CELERY_RESULT_BACKEND = BROKER_URL
CELERY_ROUTES = ({
    'payr_route': {
        'queue': 'name.v1.location.{}'.format(location_info.LOCATION_ID),
        'routing_key': 'name.v1.#',
    },
},)
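
One thing worth ruling out (just a guess from the symptoms): if both location workers end up consuming the same default 'celery' queue, or one worker prefetches the other location's message, a busy or dead worker will hold the second task exactly as described. A minimal sketch, assuming the config above and that each client box runs its own worker, of pinning every worker to only its location queue and turning prefetching down:

from kombu import Exchange, Queue

# hypothetical additions to the client-side config; names reuse the ones above
CELERYD_PREFETCH_MULTIPLIER = 1          # reserve at most one message per worker process
CELERY_QUEUES = (
    Queue('name.v1.location.{}'.format(location_info.LOCATION_ID),
          Exchange('name.v1', type='topic'),
          routing_key='name.v1.location.{}'.format(location_info.LOCATION_ID)),
)

# and start each location's worker so it only consumes its own queue, e.g.:
#   > celery -A pcs_client worker -Q name.v1.location.<LOCATION_ID> -l info
# ('pcs_client' is an assumption about your app module name)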



Here's more cloud-side information:
AMQP_PREFIX = 'name.v1'
PCS_CELERY_BROKER = 'amqp://user:password@<host>/vhost'
PCS_CELERY_BACKEND = 'amqp://user:password@<host>/vhost'
PCS_INCOMING_ROUTE_NAME = 'name.v1.location.{}'.format(location_info.LOCATION_ID)

cjh39 | 19 Sep 08:58 2014

How to monitor celery workers


Hello, we need a simple alerting system that can tell us when there are too many requests relative to the number of Celery workers, or when the CPU load is too high. This will allow us to create more workers.

We would like to receive these celery alerts by email.
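
One way to sketch this with nothing but Celery itself (no Flower): a periodic task that asks the workers what they are doing via the inspect API and emails when the backlog looks too big. Thresholds, addresses and the module name are made up, and note that inspect only sees tasks already fetched by workers - true queue depth would need the broker's own API (e.g. the RabbitMQ management plugin). CPU load could be compared against os.getloadavg() in the same task.

import smtplib
from email.mime.text import MIMEText

from myproject.celery import app   # hypothetical app module

ALERT_THRESHOLD = 100   # pending/active tasks per worker before we complain

@app.task
def check_worker_load():
    insp = app.control.inspect()
    active = insp.active() or {}
    reserved = insp.reserved() or {}
    backlog = sum(len(t) for t in active.values()) + sum(len(t) for t in reserved.values())
    workers = max(len(active), 1)
    if backlog > ALERT_THRESHOLD * workers:
        msg = MIMEText('%d tasks across %d workers' % (backlog, workers))
        msg['Subject'] = 'Celery load alert'
        msg['From'] = 'celery@example.com'
        msg['To'] = 'ops@example.com'
        smtplib.SMTP('localhost').sendmail(msg['From'], [msg['To']], msg.as_string())

# scheduled e.g. every 5 minutes:
# CELERYBEAT_SCHEDULE = {'check-load': {'task': 'tasks.check_worker_load', 'schedule': 300}}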

Thanks

Robert Minsk | 19 Sep 07:39 2014

Not understanding how to use a broadcast queue

I cannot seem to get the broadcast queue to work correctly.  I have a really simple program I am testing with.  I am using RabbitMQ and celery-3.1.14 on Fedora 20.

lfw/celery.py:
from __future__ import absolute_import
from celery import Celery

app = Celery('lfw', broker='amqp://', backend='amqp://', include=['lfw.tasks'])
app.config_from_object('lfw.celeryconfig')

if __name__ == '__main__':
    app.start()

lfw/celeryconfig.py:
from kombu.common import Broadcast

CELERY_QUEUES = (Broadcast('broadcast_queue'),)
CELERY_ROUTES = {'lfw.tasks.broadcast': {'queue': 'broadcast_queue'}}
CELERY_TASK_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']

lfw/tasks.py:
from __future__ import absolute_import
from lfw.celery import app

@app.task(ignore_result=True)
def broadcast(val):
    print val

Starting the celery worker:
> celery -A lfw worker -l info
 
 -------------- celery@lenovobert v3.1.14 (Cipater)
---- **** ----- 
--- * ***  * -- Linux-3.15.10-201.fc20.x86_64-x86_64-with-fedora-20-Heisenbug
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         lfw:0x10c65d0
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     amqp://
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- 
--- ***** ----- [queues]
 -------------- .> bcast.5950807b-b26d-4230-b54e-164bf9e3a5f5 exchange=broadcast_queue(fanout) key=
                

[tasks]
  . lfw.tasks.broadcast

[2014-09-18 22:29:06,588: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2014-09-18 22:29:06,601: INFO/MainProcess] mingle: searching for neighbors
[2014-09-18 22:29:07,612: INFO/MainProcess] mingle: all alone
[2014-09-18 22:29:07,758: WARNING/MainProcess] celery@lenovobert ready.

Now when I try to run the broadcast task:
>>> import lfw.tasks
>>> lfw.tasks.broadcast.delay('hello')
<AsyncResult: e52a80bd-22c0-42dd-91dc-8d7462db5079>

The log shows:
[2014-09-18 22:31:29,209: INFO/MainProcess] Received task: lfw.tasks.broadcast[e52a80bd-22c0-42dd-91dc-8d7462db5079]
[2014-09-18 22:31:29,210: WARNING/Worker-1] hello
[2014-09-18 22:31:29,211: INFO/MainProcess] Task lfw.tasks.broadcast[e52a80bd-22c0-42dd-91dc-8d7462db5079] succeeded in 0.00108586007264s: None

I would expect to see a WARNING message from all 4 workers, not just one.  When I run lfw.tasks.broadcast.delay('hello') a second time, it runs on a different worker, but I still only get one WARNING message.  How do I run a task on all workers?
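
For what it's worth (an interpretation of the startup banner, not something the docs spell out for this exact case): the [queues] line shows a single auto-named bcast.<uuid> queue bound to the broadcast_queue fanout exchange, and that queue belongs to the worker node, not to its pool processes. A broadcast message is therefore delivered once per worker node, and inside a node only one prefork child executes it - so one node with -c 4 gives exactly the single WARNING seen above. A sketch of exercising it across several nodes on one box instead:

> celery multi start 4 -A lfw -l info -c 1

>>> import lfw.tasks
>>> lfw.tasks.broadcast.delay('hello')
# with four separate nodes, each node's log should now print 'hello' once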

Brett Dixon | 18 Sep 18:06 2014

how to kill unack tasks

In my situation, we have a bunch of worker machines that consume tasks, and everything is great.  However, if a machine gets a task and then decides to die in some unkindly fashion, the task is still out and unacked.  Is there a way in Celery or RabbitMQ to just put the task back in the queue?
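
A hedged note rather than a definitive answer: RabbitMQ requeues unacknowledged messages once it notices the consumer's connection is gone (heartbeats speed that up), but by default Celery acks a task just before executing it, so a task that had already started dies with the machine. With late acknowledgement the ack is only sent after the task finishes, so the broker redelivers it to another worker if the first one vanishes mid-task - at the cost that the task may run twice, so it should be idempotent. A minimal sketch:

# globally, in the celery config:
CELERY_ACKS_LATE = True

# or per task:
@app.task(acks_late=True)
def crunch(item_id):
    # the message is only acked after this returns (or raises)
    return do_the_work(item_id)   # do_the_work() is a placeholder for your own code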

Thanks

Vitaliy Antonov | 18 Sep 11:56 2014

chord callback problem

Starting with version 3.1.13, I started having problems with chord callbacks.
@shared_task
def task_a():
    print 'task a'
    return 'a'

@shared_task
def task_b():
    print 'task b'
    return 'b'

@shared_task
def callback(result):
    print 'callback', result

chord([task_a.s(), task_b.s()], callback.s()).apply_async()

[2014-09-18 13:52:59,565: INFO/MainProcess] Received task: background.tasks.manage.task_a[d080d282-66e5-44dd-a9ae-142d4abc867e]
[2014-09-18 13:52:59,565: WARNING/Worker-1] task a
[2014-09-18 13:52:59,565: INFO/MainProcess] Received task: background.tasks.manage.task_b[af505f05-a8c9-472d-8cc1-26159350cb56]
[2014-09-18 13:52:59,565: WARNING/Worker-1] task b
[2014-09-18 13:52:59,575: INFO/MainProcess] Task background.tasks.manage.task_a[d080d282-66e5-44dd-a9ae-142d4abc867e] succeeded in 0.010999917984s: a
[2014-09-18 13:53:05,246: ERROR/Worker-1] Chord callback for '123a1089-3aa3-4872-ba33-924a3fe177d5' raised: error(error(10061, '\xcf\xee\xe4\xea\xeb\xfe\xf7\xe5\xed\xe8\xe5 \xed\xe5 \xf3\xf1\xf2\xe0\xed\xee\xe2\xeb\xe5\xed\xee,'),)
Traceback (most recent call last):
  File "C:\Python27\lib\site-packages\celery\backends\redis.py", line 221, in _new_chord_return
    callback.delay([unpack(tup, decode) for tup in resl])
  File "C:\Python27\lib\site-packages\celery\canvas.py", line 168, in delay
    return self.apply_async(partial_args, partial_kwargs)
  File "C:\Python27\lib\site-packages\celery\canvas.py", line 242, in apply_async
    return _apply(args, kwargs, **options)
  File "C:\Python27\lib\site-packages\celery\app\task.py", line 555, in apply_async
    **dict(self._get_exec_options(), **options)
  File "C:\Python27\lib\site-packages\celery\app\base.py", line 353, in send_task
    reply_to=reply_to or self.oid, **options
  File "C:\Python27\lib\site-packages\celery\app\amqp.py", line 305, in publish_task
    **kwargs
  File "C:\Python27\lib\site-packages\kombu\messaging.py", line 168, in publish
    routing_key, mandatory, immediate, exchange, declare)
  File "C:\Python27\lib\site-packages\kombu\connection.py", line 457, in _ensured
    interval_max)
  File "C:\Python27\lib\site-packages\kombu\connection.py", line 369, in ensure_connection
    interval_start, interval_step, interval_max, callback)
  File "C:\Python27\lib\site-packages\kombu\utils\__init__.py", line 243, in retry_over_time
    return fun(*args, **kwargs)
  File "C:\Python27\lib\site-packages\kombu\connection.py", line 237, in connect
    return self.connection
  File "C:\Python27\lib\site-packages\kombu\connection.py", line 741, in connection
    self._connection = self._establish_connection()
  File "C:\Python27\lib\site-packages\kombu\connection.py", line 696, in _establish_connection
    conn = self.transport.establish_connection()
  File "C:\Python27\lib\site-packages\kombu\transport\pyamqp.py", line 112, in establish_connection
    conn = self.Connection(**opts)
  File "C:\Python27\lib\site-packages\amqp\connection.py", line 165, in __init__
    self.transport = self.Transport(host, connect_timeout, ssl)
  File "C:\Python27\lib\site-packages\amqp\connection.py", line 186, in Transport
    return create_transport(host, connect_timeout, ssl)
  File "C:\Python27\lib\site-packages\amqp\transport.py", line 299, in create_transport
    return TCPTransport(host, connect_timeout)
  File "C:\Python27\lib\site-packages\amqp\transport.py", line 95, in __init__
    raise socket.error(last_err)
error: [Errno 10061] No connection could be made because the target machine actively refused it
[2014-09-18 13:53:05,273: INFO/MainProcess] Task background.tasks.manage.task_b[af505f05-a8c9-472d-8cc1-26159350cb56] succeeded in 5.70799994469s: b

Celery config:
from __future__ import absolute_import
import os

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'app.settings')

from celery import Celery
from django.conf import settings


app = Celery(
    'adminity',
    broker='redis://%s:%i/0' % (settings.REDIS_HOST, settings.REDIS_PORT)
)

app.conf.update(
    CELERY_ENABLE_UTC=True,
    CELERY_TIMEZONE='Europe/Moscow',
    CELERY_RESULT_BACKEND='redis://%s:%i/1?new_join=1' % (settings.REDIS_HOST, settings.REDIS_PORT),
    BROKER_TRANSPORT_OPTIONS={'fanout_patterns': True},
    CELERY_IMPORTS=(
        'background.tasks.manage',
    )
)

Redis version: 2.8.8
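
A guess based on the traceback rather than a confirmed diagnosis: the chord callback is re-published by the Redis result backend, but the publish goes out over the default amqp://localhost transport (pyamqp/TCPTransport) instead of your Redis broker, which is what you would see if the signature is sent from a process whose "current app" never received your configuration. A sketch of one thing to try (assuming the same settings module as above): give the Celery() constructor the backend as well as the broker, and make this app the default so @shared_task functions bind to it.

app = Celery(
    'adminity',
    broker='redis://%s:%i/0' % (settings.REDIS_HOST, settings.REDIS_PORT),
    backend='redis://%s:%i/1?new_join=1' % (settings.REDIS_HOST, settings.REDIS_PORT),
)
app.set_default()   # @shared_task now binds to this app instead of an unconfigured default
app.conf.update(
    CELERY_ENABLE_UTC=True,
    CELERY_TIMEZONE='Europe/Moscow',
    BROKER_TRANSPORT_OPTIONS={'fanout_patterns': True},
    CELERY_IMPORTS=('background.tasks.manage',),
)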

Andrew Chase | 18 Sep 04:08 2014

Set Django DB connection specific to Celery

Hey Guys,

I'm running Celery 3.1.13 and Django 1.6.6.

I have been trying to use the CONN_MAX_AGE setting in Django to enable persistent connections, and it works fine for the webapp, but something is going wrong when hitting the DB from my Celery workers: I sometimes get errors when Django hits the DB in the Celery environment (OperationalError: could not receive data from server: Bad file descriptor). I am speculating that the error may be because Celery closes connections after each task, and perhaps that conflicts with the persistent connection feature enabled by CONN_MAX_AGE. I'm not really sure. My next step is to upgrade my setup to remove the now-deprecated django-celery to see if that makes a difference.

However, in the meantime, I'd like to use CONN_MAX_AGE for the db connection when I'm in the appserver context, and not use it for my celery workers.

Is there an environment variable I can check in my settings file to tell whether I'm running in the Celery context?
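
There is no built-in environment variable for this as far as I know; a common workaround (a sketch, with a made-up variable name) is to export a marker only where the worker is launched - the init script, supervisord program definition, or the shell - and branch on it in settings.py. Setting the marker inside the project's celery.py module is riskier, because that module is usually imported by the webapp as well.

# wherever the worker is started (supervisord, init script, shell):
> RUNNING_IN_CELERY=1 celery -A proj worker -l info

# settings.py
import os
if os.environ.get('RUNNING_IN_CELERY'):
    CONN_MAX_AGE = 0     # workers open/close their own connections per task
else:
    CONN_MAX_AGE = 300   # persistent connections for the webapp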

Thanks,
Andrew

Paul Choi | 16 Sep 01:03 2014

Publishing task stats to external monitoring service

Hello,

I'd like some ideas on how to accomplish the following - perhaps someone has already done this?
1. Overall stats on the worker itself: how many tasks are pending, have succeeded, etc. It looks like Celery Flower already implements this, and it could be done as a periodic task by Celery itself? It may be as simple as publishing the results of these methods: http://docs.celeryproject.org/en/latest/reference/celery.worker.state.html?highlight=state#module-celery.worker.state

2. Each task should publish its own stats after finishing. For example, I want the task to publish its name, the worker hostname, how many seconds it took to execute, etc. Would task signals be the right way to implement this? (http://docs.celeryproject.org/en/latest/userguide/signals.html#task-signals) Again, Celery Flower does this already, but it inspects Celery workers from the outside in, and I'd rather not run another daemon if possible. I'd like the stats publishing to be done by each individual task.

And obviously I don't want to reinvent the wheel if this has been done already.
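
For point 2, the signal route looks workable; a sketch (the print is a stand-in for whatever monitoring client you publish to, and the timing dict is per pool process, which is fine under prefork):

import socket
import time

from celery.signals import task_prerun, task_postrun

_starts = {}

@task_prerun.connect
def record_start(task_id=None, **kwargs):
    _starts[task_id] = time.time()

@task_postrun.connect
def publish_stats(task_id=None, task=None, state=None, **kwargs):
    started = _starts.pop(task_id, None)
    if started is None:
        return
    elapsed = time.time() - started
    # replace with statsd/graphite/log shipping as appropriate
    print('%s on %s: %s in %.3fs' % (task.name, socket.gethostname(), state, elapsed))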
Thanks for your help!

-Paul Choi

Eswar Vandanapu | 14 Sep 05:00 2014

Messages stay in reserved status and are not distributed to available tasks after 3.1 upgrade.

We have 8 workers in the master queue, of which one is currently busy. It is waiting for completion of 3 other messages submitted into the queue, which we can see with inspect reserved, as below.

However, they are not being handed off to any worker even though 7 of them are free. I understand a single machine may prefetch them, but that machine itself has 3 more workers doing nothing. This only started happening after upgrading to 3.1 from 3.0.

We have one task that creates other tasks and waits for them to complete in the same queue.  Other tasks seem to be fine, but this particular task, which creates a large number of child tasks like that, always exhibits this behavior.

Please advise if anything can be done.

(analytics-server_env)gnana@ip-10-0-15-117:/opt/gnana/analytics-server$ celery -A gnana inspect reserved
-> b@ip-10-0-15-117: OK
- empty -
-> w@ip-10-0-14-184: OK
- empty -
-> m@ip-10-0-14-184: OK
- empty -
-> w@ip-10-0-15-117: OK
- empty -
-> b@ip-10-0-14-184: OK
- empty -
-> m@ip-10-0-15-117: OK
* {u'args': u"[[u'xxx', u'xxx.com', u'administrative.domain']]", u'time_start': None, u'name': u'tasks.answers.combined_results_for_day', u'delivery_info': {u'priority': None, u'redelivered': False, u'routing_key': u'master', u'exchange': u'master'}, u'hostname': u'm@ip-10-0-15-117', u'acknowledged': False, u'kwargs': u"{u'target_key': u'main', u'daily_dose': [], u'target': u'bookings', u'trace': u'3c63a2a8-5ddc-4054-b9f8-7c8783b27032'}", u'id': u'd405d9e0-067a-47d6-8bea-9d8999042e61', u'worker_pid': None}
* {u'args': u"[[u'xxx', u'xxx.com', u'administrative.domain']]", u'time_start': None, u'name': u'tasks.answers.combined_results_for_day', u'delivery_info': {u'priority': None, u'redelivered': False, u'routing_key': u'master', u'exchange': u'master'}, u'hostname': u'm@ip-10-0-15-117', u'acknowledged': False, u'kwargs': u"{u'target_key': u'main', u'daily_dose': [], u'target': u'bookings', u'trace': u'3c63a2a8-5ddc-4054-b9f8-7c8783b27032'}", u'id': u'0362d63c-45f1-45f8-b8c3-6c428a22e0f1', u'worker_pid': None}
* {u'args': u"[[u'xxx', u'xxx.com', u'administrative.domain']]", u'time_start': None, u'name': u'tasks.answers.combined_results_for_day', u'delivery_info': {u'priority': None, u'redelivered': False, u'routing_key': u'master', u'exchange': u'master'}, u'hostname': u'm@ip-10-0-15-117', u'acknowledged': False, u'kwargs': u"{u'target_key': u'main', u'daily_dose': [], u'target': u'bookings', u'trace': u'3c63a2a8-5ddc-4054-b9f8-7c8783b27032'}", u'id': u'5bb8fe84-e54d-414d-a312-e466445accd2', u'worker_pid': None}
(analytics-server_env)gnana@ip-10-0-15-117:/opt/gnana/analytics-server$
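
Two 3.1-era things that may be worth checking, offered as guesses rather than a diagnosis. First, a busy worker process can have tasks pre-written to its pipe and hold on to them until it is free; the -Ofair optimization (new in 3.1) only hands a task to a child that is actually idle, and lowering the prefetch multiplier reduces how much a node reserves in the first place. Second, if combined_results_for_day blocks waiting on its child tasks' results while those children are routed to the same queue, the parent can occupy the very slots the children need - the Celery docs advise against waiting for subtask results inside a task. A sketch of the worker-side knobs (queue/node names are guesses based on the output above):

# celeryconfig
CELERYD_PREFETCH_MULTIPLIER = 1

# worker start
> celery -A gnana worker -Q master -n m@%h -l info -Ofair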

Simon Forsberg | 12 Sep 10:29 2014

Revoking tasks with termination

Hello,

I noticed that tasks that have been terminated are reported as completed with SUCCESS status.

Is it supposed to function like that?
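
For reference, a minimal sketch of the call sequence I mean (some_task and the signal are placeholders for the real ones):

result = some_task.apply_async()
result.revoke(terminate=True, signal='SIGTERM')   # kill the task while it is running
print(result.state)   # I would expect REVOKED here, not SUCCESS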

Thanks,

Lars Bowlin | 11 Sep 23:17 2014

Celery as a windows service

I am trying to install Celery as a Windows service on Windows Server 2008 R2, without much success. 
I'm using the following to create the service: 

sc create "celeryBeat" binpath= "C:\Python27\Scripts\celery.exe beat --workdir=\"<my working dir>\"" start= auto

The service installs successfully, but upon starting it displays:
 "Error 1053: The service did not respond to the start or control request in a timely fashion". 
The same celery command runs just fine from the command line, and I've extended the timeout to ~10 minutes with no further success. I have seen suggestions for running it from Task Scheduler, but I would prefer to use a Windows service for deployment-automation reasons.
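
A hedged observation: error 1053 is what the Service Control Manager reports when the launched binary never tells it the service has started, and celery.exe is a plain console program with no SCM integration, so pointing sc create straight at it will usually fail this way no matter the timeout. Two common workarounds are wrapping the command with a service manager such as NSSM, or writing a small pywin32 service that runs celery beat as a child process. A sketch of the latter (untested here; requires pywin32, and the paths mirror the command above):

# celery_beat_service.py - install with:  python celery_beat_service.py install
import subprocess

import win32event
import win32service
import win32serviceutil


class CeleryBeatService(win32serviceutil.ServiceFramework):
    _svc_name_ = 'celeryBeat'
    _svc_display_name_ = 'Celery Beat'

    def __init__(self, args):
        win32serviceutil.ServiceFramework.__init__(self, args)
        self.stop_event = win32event.CreateEvent(None, 0, 0, None)
        self.proc = None

    def SvcDoRun(self):
        # run celery beat as a child process; the framework reports status to the SCM
        self.proc = subprocess.Popen([
            r'C:\Python27\Scripts\celery.exe', 'beat',
            '--workdir=C:\\path\\to\\my\\working\\dir',   # <-- your working dir here
        ])
        win32event.WaitForSingleObject(self.stop_event, win32event.INFINITE)

    def SvcStop(self):
        self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING)
        if self.proc is not None:
            self.proc.terminate()
        win32event.SetEvent(self.stop_event)


if __name__ == '__main__':
    win32serviceutil.HandleCommandLine(CeleryBeatService)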

Any help would be greatly appreciated,
-L

Nino Walker | 11 Sep 21:05 2014

Celery 2.6 to 3.1 performance degradation

We recently upgraded part of our system to 3.1.x, using the Redis backend and multiprocess concurrency.

We've observed a marked decrease in throughput under 3.1. Without flooding the thread with logs (yet): via strace we see sluggish workers sitting in a prolonged blocking read on the pipe to the parent process (waiting for a task?), which we didn't see in 2.x. There are plenty of tasks in the Redis queue.

This blocking read can last up to several seconds; then a number of short tasks execute, and then we block again. Load on the host was well inside healthy operating parameters.

As a workaround, we are spawning multiple single-process workers, which do not suffer the same lag (but this is suboptimal).
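
This matches (speculatively) reports about the default prefork scheduling in 3.1, where the parent writes tasks ahead of time to specific child pipes, so a child busy with a long task can sit on work while its siblings idle. The -Ofair optimization added in 3.1 only sends a task to a child that is ready; whether it cures this particular regression is an assumption, but it is a cheap thing to try before splitting into single-process workers:

> celery -A proj worker -l info -Ofair
# optionally also stop each node prefetching batches from Redis:
CELERYD_PREFETCH_MULTIPLIER = 1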

The stack:
billiard==3.3.0.18

celery==3.1.13

kombu==3.0.21
Ubuntu 12.04.4

Thanks in advance for any advice.

