Lukáš Homza | 6 Dec 13:18 2014

Celery task execution practices

Is it generally good practice to keep the code that a task executes bundled with the Celery package, or to install it as a separate package and import it?

E.g.

from mypackage import celery
from mypackage.workers import worker1

@task
def x():
    # lots of code here, bundled alongside the Celery app
    return id

instead of

from mypackage import celery
import packagexyz.worker1 as worker1

@task
def x():
    id = worker1.getId()
    return id

bussiere adrien | 6 Dec 13:23 2014

celery not using workers

So I have configured two workers on two servers with RabbitMQ. They work fine; on each I see:
[2014-12-06 12:11:24,532: INFO/Beat] beat: Starting...
[2014-12-06 12:11:24,649: INFO/MainProcess] Connected to amqp://bussiere:**@127.0.0.1:5672/celeryhost
[2014-12-06 12:11:24,698: INFO/MainProcess] mingle: searching for neighbors
[2014-12-06 12:11:25,784: INFO/MainProcess] mingle: sync with 1 nodes
[2014-12-06 12:11:25,785: INFO/MainProcess] mingle: sync complete
/usr/local/lib/python2.7/dist-packages/celery/fixups/django.py:254: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments!
  warnings.warn('Using settings.DEBUG leads to a memory leak, never '

[2014-12-06 12:11:25,892: WARNING/MainProcess] /usr/local/lib/python2.7/dist-packages/celery/fixups/django.py:254: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments!
  warnings.warn('Using settings.DEBUG leads to a memory leak, never '

[2014-12-06 12:11:25,893: WARNING/MainProcess] celery@ns331640 ready.
[2014-12-06 12:11:27,792: INFO/MainProcess] Events of group {task} enabled by remote.
[2014-12-06 12:12:31,166: INFO/MainProcess] sync with celery@ns331641


So my two workers are ready.


I launch them with:
 celery -A tasks worker -B --loglevel=info



Here is my file tasks.py:

from celery import Celery
import time  # needed by the loop below

celery = Celery('tasks', broker='amqp://bussiere:******@localhost:5672/celeryhost', backend='amqp')

@celery.task(name="diststock")
def getdateabrdist(abrstock, day, redisram):
    # wait until at least 4 seconds have passed since the last "yahoo" timestamp;
    # get, put and getShareDate are defined elsewhere in the project
    while int(str(time.time()).split(".")[0]) - int(get("yahoo").split(".")[0]) < 4:
        time.sleep(1)
    getShareDate(abrstock, day, redisram)
    put("yahoo", str(time.time()))



Here is the program that uses it (the program works fine on its own), updateStock.py:


from datetime import date
from dateutil.relativedelta import relativedelta  # provides the day arithmetic below
from tasks import getdateabrdist

def parcourdaystock(abrstock):
    i = 0
    limit = 366 * 10
    compteurfalse = 0
    while i < limit:
        day = date.today() + relativedelta(days=i * -1)
        if not isweekend(day):  # isweekend and Stock are defined elsewhere in the project
            try:
                getdateabr(abrstock, str(day))
                compteurfalse = 0
            except Exception as e:
                print e
                compteurfalse = compteurfalse + 1
        i = i + 1
        if compteurfalse > 4:  # give up after five consecutive failures
            i = limit

def getdateabr(abrstock, day):
    #getShareDate(abrstock, day, False)
    getdateabrdist(abrstock, day, False)


if __name__ == '__main__':
    for stock in Stock.objects.all():
        parcourdaystock(stock.AbrY)



I've installed Flower and I can see that the two workers are fine and ready, but when I launch updateStock.py it doesn't use the workers.

I don't understand why.


thanks and regards
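
One likely cause, offered as a sketch rather than a confirmed diagnosis: getdateabr() calls getdateabrdist(...) directly, which runs the task function synchronously in the calling process and never touches the broker. Only .delay() or .apply_async() publishes a message for a worker to consume:

from tasks import getdateabrdist

def getdateabr(abrstock, day):
    # .delay() queues a message on the broker; a worker picks it up
    getdateabrdist.delay(abrstock, day, False)
    # equivalent form, with room for options such as countdown or queue:
    # getdateabrdist.apply_async(args=(abrstock, day, False))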

Tony Barbieri | 6 Dec 01:02 2014

Crash While Quickly Processing Tasks

Hello,

This is kind of a strange one, but periodically, while processing a queue with a large number of tasks, the following error occurs:

Traceback (most recent call last):
  File "vendor\celery-3.1.11\win64_python2.6\billiard\pool.py", line 471, in run
    return self.body()
  File "vendor\celery-3.1.11\win64_python2.6\billiard\pool.py", line 530, in body
    pool._maintain_pool()
  File "vendor\celery-3.1.11\win64_python2.6\billiard\pool.py", line 1245, in _maintain_pool
    self._repopulate_pool(joined)
  File "vendor\celery-3.1.11\win64_python2.6\billiard\pool.py", line 1230, in _repopulate_pool
    self._create_worker_process(self._avail_index())
  File "vendor\celery-3.1.11\win64_python2.6\billiard\pool.py", line 1062, in _create_worker_proces
    w.start()
  File "vendor\celery-3.1.11\win64_python2.6\billiard\process.py", line 137, in start
    self._popen = Popen(self)
  File "vendor\celery-3.1.11\win64_python2.6\billiard\forking.py", line 242, in __init__
    cmd = get_command_line() + [rhandle]
  File "vendor\celery-3.1.11\win64_python2.6\billiard\forking.py", line 359, in get_command_line
    os.path.abspath(os.path.join(__file__, "..", ".."))])
  File "c:\Python26\lib\os.py", line 420, in __setitem__
    putenv(key, item)

Traceback (most recent call last):
  File "vendor\celery-3.1.11\win64_python2.6\billiard\pool.py", line 288, in run
    sys.exit(self.workloop(pid=pid))
  File "vendor\celery-3.1.11\win64_python2.6\billiard\pool.py", line 354, in workloop
    put((ACK, (job, i, now(), pid, synqW_fd)))
IOError: [Errno 232] The pipe is being closed


I'm not sure what's causing it.  This is on Windows using Celery 3.1.11.  I can provide my config settings if necessary.

Best,

--
Tony

abhishek pareek | 5 Dec 11:30 2014

Celery is rerunning completed tasks over and over

I have a Python Celery/Redis queue processing uploads and downloads worth gigabytes of data at a time.

A few of the uploads take up to a few hours. However, once such a task finishes, I'm witnessing this bizarre Celery behavior: the scheduler reruns the just-concluded task by sending it to the worker again (I'm running a single worker). It has happened up to 4 times on the same task! (My CELERYD_PREFETCH_MULTIPLIER is set to 1.)

Can someone help me debug why this is happening and how I can prevent it?

The tasks are definitely finishing cleanly with no errors reported; they are just extremely long-running.
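
One plausible cause with the Redis broker, offered as a sketch rather than a diagnosis: the Redis transport redelivers any message not acknowledged within its visibility_timeout (one hour by default), so a task that runs longer than that is handed to the worker again even though the first run never failed. Raising the timeout above the longest expected task is the usual remedy:

# celeryconfig.py -- applies to the Redis broker transport
BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 43200}  # 12 hours, in seconds

CELERYD_PREFETCH_MULTIPLIER does not guard against this; it only limits how many messages a worker reserves at a time.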

Simon Forsberg | 4 Dec 09:52 2014

Workers stops responding/hangs

Hello,

I have run into an issue with Celery on my production machines.

Workers stop responding/hang after some time of inactivity. This also happens to Flower.

In the worker log (debug trace) I can only see entries like the following:

[2014-12-04 09:35:41,210: DEBUG/MainProcess] pidbox received method enable_events() [reply_to:None ticket:None]

The weird thing is that the health of the process seems fine and the worker seems to run, but it is completely disconnected from the cluster and does not respond to inspect commands. Could this have something to do with my Redis instance? Maybe due to key expiration or something?

Thanks,
Simon
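
A quick way to probe the workers from outside, assuming the app module is named proj (a placeholder):

celery -A proj inspect ping      # hung workers will not reply
celery -A proj inspect active    # shows what responsive workers are running

A process that looks healthy but never answers these points more toward a blocked worker main loop or a dropped broker connection than toward the result backend, though that is an educated guess rather than a diagnosis.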

Martin Tiršel | 3 Dec 12:20 2014

Ignoring all tasks in some automated test (Django project)

Hello,

I have a complex Django application with a lot of Celery tasks (some called from other tasks). Now I am writing a Selenium functional test, and because of the complexity I need to disable all tasks in some tests. I need to check some states in the user interface, call some tasks directly, check the interface again, and so on. Is there an easy way to disable all tasks? I didn't find any setting for that.

My approach would be something like this:

# project/mytask.py
from celery import Task
from django.conf import settings

class MyTask(Task):
    abstract = True  # base class only; never registered as a task itself

    def apply_async(self, *args, **kwargs):
        # getattr, so projects without the setting keep normal behaviour
        if getattr(settings, 'CELERY_TASKS_DISABLED', False):
            return None
        return super(MyTask, self).apply_async(*args, **kwargs)



# project/app/tasks.py
@app.task(base=MyTask)
def task_foo():
    pass


@app.task(base=MyTask)
def task_bar():
    pass



# project/app/tests.py
@override_settings(CELERY_TASKS_DISABLED=True)
class MyTest(SeleniumTestMixin, LiveServerTestCase):

    def test_something(self):
        self.browser.get('/something/')
        # Some checks.
        task_foo()
        self.browser.get('/something/')
        # Some checks.
        task_bar()
        self.browser.get('/something/')
        # Some checks.


class AnotherTest(SeleniumTestMixin, LiveServerTestCase):

    def test_something_else(self):
        # Tasks called within this code are executed normally.
        self.browser.get('/something/')



Is this a correct approach, or is there something easier? Is it possible to use MyTask as the default task base class so I don't need to go through the code and add base=MyTask to every task?

Thanks!
Martin
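
On the last question: recent Celery versions accept a task_cls argument when the app is created, which sets the default base class for every task the app defines. A minimal sketch, assuming Celery 3.1 and the module layout above:

# project/celeryapp.py -- hypothetical module name
from celery import Celery

app = Celery('project', task_cls='project.mytask:MyTask')

# every plain @app.task def now gets MyTask as its base,
# with no need to repeat base=MyTask on each decorator

If the installed version predates task_cls, the explicit base=MyTask approach in the post remains the safe route.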

Alexandre Garel | 2 Dec 10:16 2014

What is the good way for tests in Celery ?

Hello,

The page http://celery.readthedocs.org/en/3.0/django/unit-testing.html was removed from the latest docs by commit f1e33abcd076c, but I didn't find any alternative.

In our application we use ALWAYS_EAGER = True and it works for most cases.
But there are cases where it won't work, and there are more of them now that we use atomic requests and transaction hooks to launch tasks after a successful commit.

What I did was use a context manager as a helper to run tasks sequentially: https://gist.github.com/alexgarel/88720e50abaf30594634

As you can see, this context manager has the drawback of being a bit costly, as it must monkey-patch each task!

Is there anything better provided by Celery ?

Maybe I could propose a patch so that apply_async takes into account a new value for ALWAYS_EAGER (or another constant) that would just log the tasks to be run, avoiding such monkey-patching?
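
One way to avoid patching each task, sketched under the assumption that all tasks share celery.app.task.Task as their base (tasks with a custom apply_async override would escape it): patch the base class once, record the calls, and replay them on exit:

from contextlib import contextmanager
from celery.app.task import Task

@contextmanager
def deferred_tasks():
    """Record every apply_async, then run the tasks sequentially on exit."""
    recorded = []
    original = Task.apply_async

    def record(self, args=None, kwargs=None, **options):
        recorded.append((self, args or (), kwargs or {}))

    Task.apply_async = record
    try:
        yield recorded
    finally:
        Task.apply_async = original
    # replay in call order, in the current process
    for task, args, kwargs in recorded:
        task.apply(args=args, kwargs=kwargs)

Note this drops any apply_async options (countdown, queue, ...) during replay, which is usually acceptable in tests.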

zvikico | 26 Nov 22:31 2014

RabbitMQ: tasks are queuing as "Unacked" instead of "Ready"

Hi,

I have a new RabbitMQ/Celery deployment and I'm getting some weird behavior. I've done several deployments in the past and, in all cases, I could see messages queuing up as "Ready" in the RabbitMQ console. However, with the current configuration, they only appear as "Unacked". I get thousands of messages in "Unacked" with fewer than 20 worker processes. When I add more workers, the queues are quickly drained, so my conclusion is that the messages are not really reserved.

I'm not sure why. I should point out that:
  • I'm using late ack on all tasks.
  • My RabbitMQ is in a cluster configuration with queue replication for task queues (not other celery queues).
  • All tasks are currently sent with no ETA.
Any ideas?

Thanks,
Z
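
"Unacked" in the RabbitMQ console means a message has been delivered to a consumer but not yet acknowledged; with late acks, every message a worker has prefetched sits in that state until its task finishes. An unlimited prefetch (CELERYD_PREFETCH_MULTIPLIER = 0) or a large multiplier could therefore park thousands of messages as "Unacked" for only 20 processes, which matches what is described. A hedged sketch of the usual mitigation:

# celeryconfig.py -- bound how many messages each worker process reserves
CELERYD_PREFETCH_MULTIPLIER = 1   # one reserved message per process
CELERY_ACKS_LATE = True           # already in use; shown for completeness

With the prefetch bounded, tasks should stay "Ready" on the broker until a process is actually free, which also fits the observation that adding workers drains the queues quickly.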

Matthieu Rigal | 26 Nov 18:06 2014

CELERYD_FORCE_EXECV vs CELERY_FORCE_EXECV

Hi all,

Searching the website, I could find a reference to CELERY_FORCE_EXECV in the release notes of Celery 3.0 and a reference to CELERYD_FORCE_EXECV in Celery 3.1.17.

The first one is supposed to be True by default, and the second is now not recommended to be set to True.

Is there such a difference in the behaviour of the app when run as a daemon? Can the two settings somehow interact with each other? Is it possible to read/learn a bit more about the issues and consequences of both settings?

Thanks in advance,
Matthieu

Derek Tubbs | 25 Nov 18:14 2014

Task selection to maximize utilization with preferred queues

Good Afternoon,

I apologize if this is not the right location for this question and would appreciate being redirected to the proper venue.

I am new to Celery and have been reading the documentation thoroughly over the last few days. I would like to use it to replace a homebrew distributed computing application I have.

This application is shared amongst multiple customers. In this scenario, tasks have a large time cost (upwards of 1 hour each) and require specialized equipment. The goal is to keep all workers fully utilized at any given time. Customers are allowed to purchase or "reserve" workers for their particular tasks. However, in the case that a reserve customer does not currently have any jobs running, the reserved worker should service jobs from any other customer. As such, it is OK for newly added reserve-customer tasks to wait for general tasks currently assigned to reserved workers to complete.

The best solution I see thus far would be to provide the worker with multiple topical queues to consume from, something like:

-Q reserve-customer.tasks,#.tasks

However, the worker currently gives each queue equal weight, resulting in round-robin type behavior.

Therefore:

1. Is there an alternative mechanism or routing feature for tasks that would accomplish the same intention?

2. Is there an option that causes the worker to give preference to queues in the order listed, rather than round-robin?

3. Being a moderately experienced Python developer, I am willing to make patches or changes to the worker code to accomplish this perhaps non-standard implementation. Could someone point to which file or section is responsible for choosing the queue from which to get the next task?

4. Am I going about this entirely the wrong way? Is there a different feature or project better suited to my given scenario?

Coincidentally, solving this use case may also provide a workaround for assigning task priorities. Presumably, a worker could consume from each priority level, giving preference to the first queues listed, as such:

-Q tasks.priorityHIGH,tasks.priorityMED,tasks.priorityLOW

I sincerely appreciate the time taken to review my questions and any recommendations you can provide.
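
One workaround seen in practice for questions 1 and 4, sketched with the app name proj as a placeholder: run two worker instances on the reserved hardware, one pinned to the paid queue and one consuming general work, so the paid queue always has a dedicated drain:

celery -A proj worker -Q reserve-customer.tasks -n reserved@%h
celery -A proj worker -Q general.tasks -n overflow@%h

This trades strict queue preference for guaranteed capacity; it does not make a single worker favor one of its queues over another.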

Yerzhan Torgayev | 23 Nov 19:16 2014

acks_late not recovering scheduled tasks after Rabbitmq crash

I am using Celery with https://github.com/gjedeer/celery-php for PHP. Reliability is important for our project, so I want to be sure that all scheduled tasks are executed even if RabbitMQ crashes. I scheduled one task, then stopped RabbitMQ and started it again. When the scheduled time came, the task was not executed.
Here is my task:
from celery import Celery
import subprocess

app = Celery('tasks', backend='amqp', broker='amqp://')

@app.task(acks_late=True)
def hipoCheckSubscriptions(args):
    return subprocess.call(['php54', '/var/project/hipo/protected/yiic.php', 'mq',
        'checkSubscription', '--args=' + args])

Please help: what am I doing wrong? And are there any recipes for my case?
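
One thing worth ruling out, as a sketch of Celery's own defaults rather than a confirmed fix: a message only survives a RabbitMQ restart if its queue is durable and the message itself is persistent. Celery declares both by default, so a stray override somewhere would explain the loss:

# restating the defaults explicitly, so an accidental override is easy to spot
from kombu import Exchange, Queue

app.conf.update(
    CELERY_DEFAULT_DELIVERY_MODE='persistent',  # messages are written to disk
    CELERY_QUEUES=(
        Queue('celery', Exchange('celery'), routing_key='celery', durable=True),
    ),
)

Note that acks_late only controls when the worker acknowledges a message; it cannot recover a message the broker itself lost.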

