Sreejith | 24 Dec 11:40 2014

Read messages from RabbitMQ using Celery in parallel

Hi,

I am new to Celery and RabbitMQ.

I read about http://www.rabbitmq.com/tutorials/tutorial-three-python.html and http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html

But I couldn't figure out how to read messages from RabbitMQ using Celery in parallel.

If there are multiple messages in RabbitMQ, how can I configure Python Celery workers to read each message and process them in parallel?

e.g.: I am reading each message from RabbitMQ and need to check whether the message is a number or a character. If it is a number, I need to display it as a character, and vice versa.
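A minimal sketch of the usual approach, assuming a RabbitMQ broker on localhost; the `convert` task and its number/letter mapping are illustrative, not part of the original setup:

# tasks.py -- each RabbitMQ message becomes one task invocation
from celery import Celery

app = Celery('tasks', broker='amqp://guest@localhost//')

@app.task
def convert(payload):
    # Illustrative mapping: a digit becomes a letter, and a letter
    # becomes its position in the alphabet.
    if payload.isdigit():
        return chr(ord('a') + int(payload) - 1)
    return str(ord(payload.lower()) - ord('a') + 1)

Starting the worker with a concurrency setting, e.g. celery -A tasks worker --concurrency=4, makes the prefork pool pull messages off the queue and run up to 4 of them at the same time; each convert.delay('7') call on the producer side enqueues one message.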



donzeno23 | 19 Dec 20:40 2014

Celery periodic task and celerybeat only sends task to single worker?

I have a project where the webserver runs celerybeat and multiple remote workers run celeryd. On the webserver, my tasks.py file has a periodic task scheduled to run once per day.
I would like to have this task executed on each of the remote workers. However, I see that only one of the workers actually gets the task, while the others don't get anything.

Setup:

Webserver running celerybeat and django and rabbitmq
Remote workers (1-20) running celeryd 

tasks.py

import logging
import os

from celery.schedules import crontab
from celery.task import periodic_task
from celery.task.control import inspect

logger = logging.getLogger(__name__)

@periodic_task(run_every=crontab(hour=13, minute=14, day_of_week="fri"))
def run_frontend_regression():
    logger.info("Start running frontend regression...")
    # sync tests in ln5/core
    os.system('/home/automake/p4_sync_ats_ln5_core.sh')
    #all_tests = read_from_dir()
    all_tests = 140
    tests_per_vm = ''
    # ping the named workers and number the ones that respond
    active_vms = tuple(enumerate(sorted(inspect(['nj1dvgtswin21', 'nj1dvgtswin22',
        'nj1dvgtswin23', 'nj1dvgtswin24', 'nj1dvgtswin25', 'nj1dvgtswin26',
        'nj1dvgtswin27']).ping().keys()), start=1))
    logger.info("Active VMs: %s" % active_vms)
    for vm in active_vms:
        print "Running tests on agent %s" % vm[1]
        # call run_fe_test_suite...
        # run_fe_test_suite(agent=vm, ...).set(exchange='testbench', routing_key='testbench.'+vm).apply_async()
        run_suite()


celerybeat v3.0.23 (Chiastic Slide) is starting.
__    -    ... __   -        _
Configuration ->
    . broker -> amqp://guest@localhost:5672//
    . loader -> djcelery.loaders.DjangoLoader
    . scheduler -> djcelery.schedulers.DatabaseScheduler

    . logfile -> [stderr] @INFO
    . maxinterval -> now (0s)
[2014-12-19 13:13:51,056: INFO/MainProcess] Celerybeat: Starting...
[2014-12-19 13:13:51,056: INFO/MainProcess] Writing entries...
[2014-12-19 13:13:51,387: INFO/MainProcess] DatabaseScheduler: Schedule changed.
[2014-12-19 13:13:51,387: INFO/MainProcess] Writing entries...
[2014-12-19 13:14:00,004: INFO/MainProcess] Scheduler: Sending due task test_bench.tasks.run_frontend_regression (test_bench.tasks.run_frontend_regression)
[2014-12-19 13:14:00,008: INFO/MainProcess] Writing entries...

The task only gets sent to 1 worker (which one is random each time) when I would expect all 20 workers to get it. Am I doing something wrong? Is my setup wrong? Any help would be appreciated.

Even using Django's celerybeat scheduler, the tasks below, set to run every minute, only go to 1 worker instead of all 20 (see the settings.py file below).

# List of modules to import when celery starts.
CELERY_IMPORTS = ("test_bench.tasks",)  # trailing comma makes this a tuple

from celery.schedules import crontab

CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'

CELERYBEAT_SCHEDULE = {
    'add-every-30-seconds': {
        'task':'test_bench.tasks.SumTask',
        #'schedule': timedelta(seconds=30),
        #'schedule': crontab(hour=7, minute=30, day_of_week=0),
        #'schedule': crontab(hour="0,12"), #Run every day at midnight and noon
        'schedule': crontab(minute='*/3', day_of_week=1),
        'args':(16,16),
        #'kwargs':{'consumer2':'consumer_2'},
        #'kwargs':{'testagent3':'testagent_3'},  #not needed for SumTask -- SumTask takes no keyword arguments
        #'kwargs':{'queue':'worker3tasks'},
        'options':{'queue':'worker1tasks'},
    },
    'add-every-minutes': {
        'task':'test_bench.tasks.test',
        'schedule': crontab(minute='*', day_of_week='*'),
        'options':{'queue':'worker1tasks','exchange':'taskbcast'},
    },
}


# The mapping of queues the worker consumes from.
# This is a dictionary of queue name/options.
CELERY_QUEUES = {
    "worker1tasks": {
        "exchange": "taskbcast",
        "exchange_type": "fanout"},
    "worker2tasks": {
        "exchange": "taskbcast",
        "exchange_type": "fanout"},
    "worker3tasks": {
        "exchange": "taskbcast",
        "exchange_type": "fanout"},
    "testagent1": {
        'exchange': 'testbench',
        'exchange_type': 'topic',
        'routing_key': 'testbench.testagent1.#'},
    "testagent2": {
        'exchange': 'testbench',
        'exchange_type': 'topic',
        'routing_key': 'testbench.testagent2.#'},
    "testagent3": {
        'exchange': 'testbench',
        'exchange_type': 'topic',
        'routing_key': 'testbench.testagent3.#'},
    "testagent4": {
        'exchange': 'testbench',
        'exchange_type': 'topic',
        'routing_key': 'testbench.testagent4.#'
        }
}
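For comparison, kombu ships a Broadcast queue helper that gives every worker its own exclusive queue bound to a fanout exchange, which is the documented way to deliver one task to all workers; with a single shared queue name, each message still goes to exactly one consumer. A sketch of a celery 3.x configuration (the queue and route names here are illustrative):

# settings.py -- broadcast sketch
from kombu.common import Broadcast

CELERY_QUEUES = (Broadcast('bcast_tasks'),)

CELERY_ROUTES = {
    'test_bench.tasks.run_frontend_regression': {'queue': 'bcast_tasks'},
}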



Thanks,
-Michael

Miki Tebeka | 20 Dec 08:13 2014

ImportError when running Celery from git master

Greetings,

I'm trying to run Celery from git master and I see the following error. Any ideas?

Thanks,
Miki

$ celery -A t --loglevel INFO worker
Traceback (most recent call last):
  File "/Users/mikitebeka/src/celery/venv/bin/celery", line 9, in <module>
    load_entry_point('celery==3.2.0a2', 'console_scripts', 'celery')()
  File "/Users/mikitebeka/src/celery/venv/lib/python2.7/site-packages/pkg_resources.py", line 378, in load_entry_point
    return get_distribution(dist).load_entry_point(group, name)
  File "/Users/mikitebeka/src/celery/venv/lib/python2.7/site-packages/pkg_resources.py", line 2566, in load_entry_point
    return ep.load()
  File "/Users/mikitebeka/src/celery/venv/lib/python2.7/site-packages/pkg_resources.py", line 2260, in load
    entry = __import__(self.module_name, globals(),globals(), ['__name__'])
  File "/Users/mikitebeka/src/celery/celery/__init__.py", line 130, in <module>
    from celery import five
  File "/Users/mikitebeka/src/celery/celery/five.py", line 19, in <module>
    from amqp.five import __all__ as _all_five
ImportError: cannot import name __all__
$ pip freeze
amqp==1.4.6
anyjson==0.3.3
billiard==3.3.0.19
-e git+https://github.com/celery/celery.git@04e77c0bd14596d8ddc9214e7cca5e817f74c9d2#egg=celery-origin/HEAD
kombu==3.0.24
pytz==2014.10
redis==2.10.3
wsgiref==0.1.2
$ git rev-parse HEAD
8d146d8c14fad744b62694359afe5f02e141ace3
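For what it's worth, the traceback shows celery.five importing from amqp.five while pip freeze lists the released amqp==1.4.6, so a hedged guess is that git master also needs the development versions of its dependencies, along the lines of the install commands in the Celery docs:

$ pip install -U https://github.com/celery/py-amqp/zipball/master#egg=amqp
$ pip install -U https://github.com/celery/kombu/zipball/master#egg=kombu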

Javier Marcon | 17 Dec 16:05 2014

HTTP callback example with multiple URLs

Hello, I have an application that needs to get the results of many HTTP requests asynchronously and in parallel. Can the requests be made to many URLs, some fetched at the same time and some sequentially (using groups and chords)? I need to make many requests in a certain order (some at the same time) and process all the results together.
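A minimal sketch of one way to express this, assuming RabbitMQ and Redis on localhost and the requests library; the task names and the combine logic are illustrative:

import requests
from celery import Celery, chord

app = Celery('fetcher', broker='amqp://guest@localhost//',
             backend='redis://localhost')  # chords require a result backend

@app.task
def fetch(url):
    # one parallel HTTP GET per task invocation
    return requests.get(url).text

@app.task
def combine(pages):
    # chord callback: runs once every fetch in the group has finished
    return sum(len(page) for page in pages)

def fetch_all(urls):
    # a group of parallel fetches plus a callback over the joined results
    return chord(fetch.s(u) for u in urls)(combine.s())

Sequential stages can then be expressed by joining signatures with |, so "some at the same time, some in order" becomes a chain whose links are groups or chords.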

Thanks,

Javier.

Eugene Park | 17 Dec 12:21 2014

Celery load balancing (worker synchronization)

Hello,

I have been looking this up but I couldn't find an answer.

How do Celery workers load-balance outstanding tasks in the broker?

For example:
Worker A has 5 prefork processes
Worker B has 5 prefork processes

1. Task apple comes in. It can be worked on by either worker A or B. Let's say worker A picks it up and works on it.
2. Task orange comes in while worker A is working on task apple. What guarantees that task orange is picked up by worker B?

Do the workers themselves know which worker has the least CPU utilization and let that one take tasks from the broker?
Is this what Celery means by worker synchronization?

How do Celery workers protect themselves from overloading the box they're on?

Is Celery slot-based?
If a worker has 5 prefork processes, does that mean it can only do 5 tasks at a time?
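In broad strokes: workers don't coordinate with each other or compare CPU load. Each worker prefetches messages from the shared queue, and the broker hands a message to whichever consumer has prefetch capacity, so distribution is governed mostly by the prefetch settings. A sketch of the celery 3.x settings involved (values are illustrative):

# celeryconfig.py
CELERYD_PREFETCH_MULTIPLIER = 1  # reserve at most 1 message per process
CELERY_ACKS_LATE = True          # acknowledge only after the task completes

With 5 prefork processes, a worker runs at most 5 tasks at once; anything beyond its prefetch allowance stays in the broker for other workers to take.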

Jeremy Tillman | 16 Dec 08:02 2014

Heartbeats and custom ConsumerStep

I'm using the pyamqp transport to a RabbitMQ cluster through a load balancer. Because of the load balancer's tight idle-connection constraints, I am using heartbeats to keep connections alive. I've read many threads concerning long-running tasks whose duration exceeds the heartbeat interval: for example, heartbeats are set to 5 minutes, but a task taking 10 minutes makes the connection drop because there is no chance to send heartbeats back. This confuses me, because I would expect heartbeats to happen on a separate timer, independent of my tasks. I'm unsure whether this scenario happens in Celery proper, but it occurs in custom ConsumerSteps in my case.

My question is: is it common practice, or at least reasonable, to spawn threads to process messages in the ConsumerStep so that we can still serve heartbeats and not be affected by long-running tasks?

Example Code:

import threading

from celery import bootsteps, states
from kombu import Consumer


class MyCustomStep(bootsteps.ConsumerStep):
    def get_consumer(self, channel):
        # `myqueue` is a kombu.Queue defined elsewhere in the app
        return [Consumer(channel, queues=[myqueue],
                         on_message=self.on_message)]

    def on_message(self, message):
        # hand the message off so the consumer loop stays free
        # to service heartbeats
        worker_thread = threading.Thread(target=self.do_work,
                                         args=(message,))
        worker_thread.start()

    def do_work(self, message):
        # `custom_app_task` is a task defined elsewhere; apply()
        # runs it locally, in this thread
        result = custom_app_task.apply(message)
        if result.state == states.SUCCESS:
            message.ack()
        else:
            message.reject()

I know I would have to manage the number of threads that get spawned, possibly bounding it by the number of prefetched messages allowed (one way is sketched below).
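One way to bound them, sketched with a plain semaphore sized to the prefetch count; this is an illustration, not a tested recipe:

import threading

class BoundedStep(MyCustomStep):
    # cap the number of in-flight handler threads
    slots = threading.BoundedSemaphore(10)

    def on_message(self, message):
        # note: blocking here stalls the consumer loop (and heartbeats)
        # whenever every slot is busy, so size the cap generously
        self.slots.acquire()
        worker_thread = threading.Thread(target=self.guarded_work,
                                         args=(message,))
        worker_thread.daemon = True
        worker_thread.start()

    def guarded_work(self, message):
        try:
            self.do_work(message)  # do_work from MyCustomStep above
        finally:
            self.slots.release()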

Thanks for the advice in advance,

-JT

Mike | 14 Dec 05:40 2014

Help with Chord returning ValueError(gid)

I'm hoping someone can tell me what I'm doing wrong here. I have a load and a post_load task. That part works fine. I'm having trouble when I try to chord multiples of these together to do something like a "batch load." post_load returns 'SUCCESS' or 'FAILURE'. If any one load fails, the whole batch is a failure. Incidentally, I don't actually care about the result of get() and would be quite happy just to move along. I'm just having trouble getting the callback to execute. I get an error:

 ERROR/Worker-5] Chord callback 'c2c185a0-77b6-44e3-e061-8895e465e9e9' raised: ValueError('c2c185a0-77b6-44e3-e061-8895e465e9e9',)

Any ideas? I'm using RabbitMQ with Memcached. I have a ton of space allocated to memcached and it doesn't appear to be even close to the limit. Also, the load processes take only seconds to execute. I hope I'm just doing something dumb.


@app.task
def mytask_check(statuses, loadid, datestring):
    for s in statuses:
        if s != 'SUCCESS':
            mylogger(loadid, datestring, 'FAILED')
            return False
    mylogger(loadid, datestring, 'SUCCESS')


@app.task
def mytask(loadfile):
    datestring, filelist, loadid = get_load_data(loadfile)

    loadtasks = []
    for file in filelist:
        loadtasks.append(load.si(loadfile, datestring) | post_load.s())

    if len(loadtasks) > 0:
        res = chord(loadtasks)(mytask_check.s(loadid, datestring))
        print 'Chord response: {resp}'.format(resp=repr(res.get()))
    else:
        mylogger(loadid, datestring, 'NOTHING TO DO')

Mike | 13 Dec 03:06 2014

celery.contrib.batches retry?


I'm using Batches to flush logs to a database. This does seem to work very well; however, there is a case where an exception might occur that makes me want to retry the messages later.

Is there a standard way to do this? Do I just mark the individual requests as retried using app.backend.mark_as_retry? I'm not sure whether that is enough to cause a retry, or whether it just records that the message was retried. Or can I use the normal raise retry? I only have general knowledge of what goes on under the hood in Celery, which is why I ask.
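For reference, a minimal sketch of the Batches pattern from the celery 3.x docs, with a hypothetical write_to_db helper; whether mark_as_retry alone causes redelivery is exactly the open question here, so this only shows where such a call would sit:

from celery.contrib.batches import Batches

@app.task(base=Batches, flush_every=100, flush_interval=10)
def flush_logs(requests):
    # `requests` is a list of SimpleRequest objects, not raw task args
    for request in requests:
        try:
            write_to_db(*request.args)                 # hypothetical helper
            app.backend.mark_as_done(request.id, None)
        except Exception as exc:
            # records the state in the result backend; it is not clear
            # that anything re-delivers the message afterwards
            app.backend.mark_as_retry(request.id, exc)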

Lukáš Homza | 11 Dec 20:56 2014

Celery chain does not work

I can't get a simple chain to work:

@app.task()
def send():
    print 1
    (load.s() | push.s() | dequeue.s())()
    # doesn't work either:
    # result = chain(load.si() | push.si() | dequeue.si())()

@app.task()
def load():
    print 2

@app.task()
def push():
    print 3

@app.task()
def dequeue():
    print 4

It always outputs 1; I would expect it to output 1 2 3 4. I have tried apply_async and moving the chain to a separate function, but just can't get it to work...

Igor Vasilyev | 11 Dec 17:48 2014

Celery blocks some tasks while executing a long-running task

Hi.

We have a Celery setup with a RabbitMQ broker. The concurrency of the Celery worker is set to 20.
I found that while executing a long-running task (~2 hours), Celery moves some jobs to a "reserved" state and doesn't execute them until the long-running task is completed.
This is not an issue of Celery being overloaded, because we have at most 2 other tasks to execute at the same time. It seems that the main process sometimes puts tasks into the queue of the worker process that is currently executing the long-running task.

Is this a known bug, or a feature?
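For context, this matches Celery's documented prefetch behavior: a prefork worker reserves a multiple of its concurrency in messages up front, and a reserved message can sit behind a long-running task in the same pool. A sketch of the usual mitigations for celery 3.1 (setting name and flag as documented in the optimizing guide):

# celeryconfig.py
CELERYD_PREFETCH_MULTIPLIER = 1  # don't reserve extra messages per process

combined with starting the worker with the fair scheduling option, e.g. celery -A proj worker -c 20 -Ofair.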

Walt Askew | 11 Dec 00:44 2014

Celery, gevent and AsyncResult.get()

I'm building a web application with Flask, gevent-websocket and Celery.  The workflow I'm trying to support is:

- a client requests a job via a websocket being listened to by gevent
- the web app submits the job to celery
- when the job is done, the result from celery is written back into the web socket

some pseudo-ish code:

@app.task
def handle_message(message):
    ...

def handle_request(socket):
    while True:
        message = socket.receive()
        job = handle_message.delay(message)
        result = job.get()
        socket.send(result)

where gevent is in charge of that loop.

My question is whether that job.get() call will mess with gevent's cooperative multitasking by blocking and not letting other greenlets execute, since there isn't a gevent.sleep() call or anything else to yield control to another greenlet. If that's the case, is there something I can do to make job.get() yield control to other greenlets?
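One commonly suggested approach, assuming nothing in this setup prevents it, is to monkey-patch early so the blocking socket reads inside job.get() become cooperative rather than blocking the whole event loop:

# run this before anything else creates sockets
from gevent import monkey
monkey.patch_all()

Whether this is sufficient for the amqp result backend used here is an assumption, not a tested answer.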

I was using RabbitMQ as the result backend so that I could do non-polling job.get() calls, but it does occur to me that I could use Redis or another polling backend to do something like:

while job.status not in (celery.states.FAILURE, celery.states.SUCCESS):
    gevent.sleep(1)

to ensure I'm playing nice with gevent, but I'd prefer not to have to poll my result backend if I don't have to.

Thanks, and sorry if this is a silly question or one that would be better placed on the gevent mailing list -- I'm new to many of these technologies. Other suggestions on how to architect a system like this would also be welcome if I appear to be doing something silly.

Thanks again!

