Robert Haddon | 20 Jun 01:58 2015

chord fails to detect SUCCESSFUL completion of simple tasks


I'm wondering if there is something wrong with my configuration, because while there are quite a few posts about problems with chords when the header task(s) fail, I couldn't find any about chords failing when the header tasks succeed!

For me, this is 100% reproducible with a very simple example:

header = add.s(1, 2) | group([add.s(3), add.s(4)])
callback = accept_sequence.s()
chord(header)(callback)

Called as header.delay(), I can run the header tasks all day long without issue.  I get back a group result of [6, 7].
However, if I put it in a chord with a simple callback task, it fails every time.  chord_unlock fails the deps ready check and loops forever.

I've tried RabbitMQ, memcached, and redis for my result backend.  No difference.

If I remove the chain from header and just run a group of tasks, then chord works fine.  A group of chains also works fine.  But a chain that feeds a group is broken.
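
For reference, the two shapes side by side (a minimal sketch using the same toy tasks as above; note the group-of-chains version runs the first add once per branch, which is the trade-off):

    # Broken for me: a chain whose tail is a group, used as a chord header
    chord(add.s(1, 2) | group([add.s(3), add.s(4)]))(accept_sequence.s())

    # Works for me: a group of chains as the chord header
    chord(group([add.s(1, 2) | add.s(3),
                 add.s(1, 2) | add.s(4)]))(accept_sequence.s())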

Possibly relevant config items:
CELERY_IGNORE_RESULT = False
CELERY_CHORD_PROPAGATES = True



Asmodehn Shade | 17 Jun 08:58 2015

Passing arguments from command line ( or worker.main() ) all the way to the task

Hello,

I am currently trying to use Celery in a complex setup, mixing Celery and ROS (http://www.ros.org/).
I have one redis server (no problem there), but my workers will be different and therefore provide different tasks at different times...
Also my workers can be offline for an extended period of time (think robots moving around, doing stuff).

While it might not be what Celery was designed for at first, I think I can leverage Celery's existing codebase and its extensive parametrization to avoid spending a few years rewriting everything :-)
However, I'm just beginning to discover Celery's potential...

I am currently looking at a way to pass a parameter from the command line (or worker.main()) to the task (or bootstep).

The parameter set I want to pass specifies the configuration of the current worker, and each task can run or not based on that configuration plus some dynamic introspection.

I already have the part that sets up custom arguments:

# Starting the Celery worker process
tasks.celery.worker_main(argv=['celery', '--loglevel=DEBUG',
                               '--broker=' + config.default.CELERY_BROKER_URL,
                               '--autoreload', '--custom-arg=ARG'])

celery.user_options['worker'].add(
    Option("--custom-arg", action="store_true", dest="custom_arg",
           default=False, help="Activate support of custom_arg")
)

class CustomArgs(bootsteps.Step):

    def __init__(self, worker, custom_arg, **options):
        # store the config
        self.custom_arg = custom_arg

celery.steps['worker'].add(CustomArgs)
But how can my task class constructor access this argument?

class MyTask(Task):
    abstract = True

    def __init__(self, *args, **kwargs):
        super(MyTask, self).__init__(*args, **kwargs)
        try:
            # get_custom_args() ????
            # construct introspector_class_instance for dynamic
            # introspection of the system, based on get_custom_args()
            pass
        except Exception, e:
            raise


@celery.task(base=MyTask, bind=True)
def do_stuff(self, task_args):
    # do stuff with introspector_class_instance and task_args together
    # reject things we cannot do here
    pass

Or is there a better way to do this?

Basically, I am trying to get my workers to dynamically check whether they can satisfy a task, and to reject the task if not (or, even better, not consume it in the first place, but this might depend on dynamic behavior in the worker)...
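
In case a concrete sketch helps frame the question, this is the direction I am experimenting with: have the bootstep publish the parsed option somewhere the tasks can reach, e.g. the app configuration (the option and attribute names here are mine, and this is an assumption, not a confirmed pattern):

    from celery import Celery, bootsteps
    from celery.bin import Option

    app = Celery('robot', broker='redis://localhost:6379/0')

    app.user_options['worker'].add(
        Option('--custom-arg', dest='custom_arg', default=None,
               help='Configuration value for this worker')
    )

    class CustomArgs(bootsteps.Step):
        def __init__(self, worker, custom_arg=None, **options):
            # Stash the parsed value on the app config so any task
            # can read it back later via self.app.conf.
            worker.app.conf.CUSTOM_ARG = custom_arg

    app.steps['worker'].add(CustomArgs)

    @app.task(bind=True)
    def do_stuff(self, task_args):
        # Read the worker-level configuration back inside the task.
        custom_arg = self.app.conf.get('CUSTOM_ARG')
        if custom_arg is None:
            raise RuntimeError('this worker cannot satisfy this task')

For the rejection part, celery.exceptions.Reject can explicitly reject a delivered task (it only takes effect with acks_late enabled), which might cover the "reject if unsatisfiable" case.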

Thanks for any help/insights/comments :-)

Cheers,
--
AlexV

Julien B | 16 Jun 17:10 2015

Celery and Python functools wraps

Hi, I asked this question on Stack Overflow (http://stackoverflow.com/questions/30865670/celery-and-python-functools-wrapper) but I think it is also appropriate to post it here:

I'm trying to create a general wrapper to do the following:

 - act as the celery task wrapper
 - update the meta **outside** the wrapped function
 - update the meta **inside** the wrapped function

async.py

    def my_celery_wrapper(func):
        @celery_app.task(bind=True)
        @wraps(func)
        def wrapper(task, *args, **kwargs):
            print(func.__name__ + " is called")
            task.update_state(state='STARTED')
            func(task, *args, **kwargs)
            task.update_state(state='END')
        return wrapper

foo.py

    from async import my_celery_wrapper
    import time
   
    class Foo(object):
        def __init__(self):
            print('Foo init')

        @my_celery_wrapper
        def bar_async(self):
            for i in range(0, 10):
                task.update_state(state='PROGRESS', meta={'progress': i})
                time.sleep(2)

I can't get it to work or get hold of the task inside the wrapped function, so my questions are: what is wrong with this code? And is this the way to go at all?
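
For what it's worth, the closest variant I could get to run in isolation looks like this (a sketch only: it assumes the wrapped function is a plain module-level function rather than a method, and that it receives the bound task as its first argument):

    from functools import wraps
    import time

    from celery import Celery

    celery_app = Celery('demo', broker='redis://localhost:6379/0')

    def my_celery_wrapper(func):
        @celery_app.task(bind=True)
        @wraps(func)
        def wrapper(task, *args, **kwargs):
            task.update_state(state='STARTED')
            result = func(task, *args, **kwargs)
            task.update_state(state='END')
            return result
        return wrapper

    @my_celery_wrapper
    def bar_async(task):
        # The bound task comes in as the first argument, so the
        # wrapped body can update its own meta.
        for i in range(10):
            task.update_state(state='PROGRESS', meta={'progress': i})
            time.sleep(2)

    # called as: bar_async.delay()

Decorating a method the same way registers the task at class-definition time, and the worker then passes the task instance where the method expects self, which may be part of what goes wrong in the Foo version.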

Manas Hardas | 12 Jun 20:53 2015

Django 1.8.2 & djcelery & celery 3.1.18 AppRegistryNotReady: Translation infrastructure cannot be initialized

I am getting the following error:

File "/Library/Python/2.7/site-packages/Django-1.8.2-py2.7.egg/django/utils/translation/trans_real.py", line 164, in _add_installed_apps_translations "The translation infrastructure cannot be initialized before the " django.core.exceptions.AppRegistryNotReady: The translation infrastructure cannot be initialized before the apps registry is ready. Check that you don't make non-lazy gettext calls at import time.

I have a project which is not really a django app but a celery app. Therefore, I have not created a wsgi.py or models.py or any of the typical files created by django-admin when a project or app is started.

I only want to use djcelery to create periodic tasks with djcelery.schedules.DatabaseScheduler, as described in the Stack Overflow questions "Add, modify, remove celery.schedules at run time" and "How to dynamically add / remove periodic tasks to Celery (celerybeat)".

The solution given in "AppRegistryNotReady, translation bug when deploying with uWSGI" requires changes to a vassal.ini file, but there is no vassal.ini file in my implementation.

I will briefly describe my project:

proj/
    apps.py
    tasks.py
    celeryconfig.py
    runproj.py

apps.py:

    from celery import Celery

    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celeryconfig')

    myapp = Celery('myapp')
    myapp.config_from_object('celeryconfig')

    if __name__ == '__main__':
        myapp.worker_main('-B', '-S', 'djcelery.schedules.DatabaseScheduler')

tasks.py:

    from apps import myapp

    @myapp.task(name='msgprinter')
    def msg_printer(msg):
        print msg

runproj.py:

    from djcelery.models import PeriodicTask, IntervalSchedule

    intSch = IntervalSchedule(period='seconds', every=30)
    periodic_task = PeriodicTask(
        name='msg_printer_schedule',
        task='proj.tasks.msg_printer',
        interval=intSch,
        args=json.dump(['such wow']),
    )
    periodic_task.save()

celeryconfig.py:

    CELERY_ACCEPT_CONTENT = ['pickle', 'json']
    BROKER_URL = 'amqp://guest@localhost'
    CELERY_IMPORTS = ('proj.tasks')
    CELERY_QUEUES = [Queue('default', Exchange('default', type='direct'),
                           routing_key='default')]

    # DJANGO SETTINGS
    INSTALLED_APPS = (
        'django.contrib.admin',
        'django.contrib.auth',
        'django.contrib.contenttypes',
        'django.contrib.sessions',
        'django.contrib.messages',
        'django.contrib.staticfiles',
        'djcelery',
        'mypp')

    DATABASES = {
        'default': {
            'ENGINE': 'django.db.backends.sqlite3',
            'NAME': os.path.join('/home', 'test.db'),
        }
    }

Before I run the workers I created the required tables using the django-admin migrate command. I can see the tables in the /home/test.db database.

First I run the workers:

    $ python apps.py

Then I save a schedule to the database, to be executed repeatedly by the celerybeat daemon:

    $ python runproj.py
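
One remedy I have seen for AppRegistryNotReady in standalone (non-WSGI) scripts is to populate Django's app registry explicitly before anything touches models or translations. A sketch of what the top of apps.py might look like (this assumes Django 1.7+, where django.setup() exists; I have not confirmed it fixes this particular setup):

    import os
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celeryconfig')

    import django
    django.setup()  # populate the app registry before djcelery models load

    from celery import Celery

    myapp = Celery('myapp')
    myapp.config_from_object('celeryconfig')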

Phill Tornroth | 12 Jun 18:56 2015

Chord's footprint in Redis grows extra-linearly with number of tasks, making memory spikes very hard to predict


Today I was chasing down a memory spike in the Redis instance we use to back Celery. The spike corresponded with queuing a large number of tasks, but memory use was vastly non-linear in the number of tasks queued, compared to previous nights.

It seems the Canvas stuff is the reason.

First off, my primary task is a simple one that takes an int and returns nothing. The reason we're using Canvas is to fire off a finalizer that tells us the work is done.

Celery 3.1.16, Redis backend.

I did some testing, only queuing tasks (no Celery workers were running to consume them), flushed Redis in between runs, and used Redis' INFO command to see how much memory was consumed. Here's what I see:

10k tasks queued normally (.delay()) =  ~8MB
50k tasks queued normally (.delay()) =  ~40MB
10k tasks in a group() queued = ~9.2MB

Now the chords, done via: chord(task.si(i) for i in xrange(N))(finalizer.si())

1k = ~11MB
10k = ~900MB
50k = Not sure. I stopped it at over 10G
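
For anyone who wants to reproduce these numbers, the procedure was roughly this (a sketch; task and finalizer stand in for our real tasks, and the Redis connection details are assumptions):

    import redis
    from celery import chord

    from myapp.tasks import task, finalizer  # our real tasks live elsewhere

    r = redis.StrictRedis(host='localhost', port=6379, db=0)

    def measure(n):
        r.flushall()  # clean slate; no workers are running
        before = r.info()['used_memory']
        chord(task.si(i) for i in xrange(n))(finalizer.si())
        after = r.info()['used_memory']
        print '%d tasks: ~%.1f MB' % (n, (after - before) / 1e6)

    for n in (1000, 10000):
        measure(n)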


So it seems that a chord's footprint in redis grows extra-linearly with the number of tasks queued in it. I'm wondering if this is known/expected and whether there are any options other than not using Canvas? For instance, does the RabbitMQ backend have this issue?

I'm also curious what the reasoning is for this. I haven't yet dug in deeply to pull apart the data structure and understand why this growth is happening.

Thanks in advance!

mike | 11 Jun 15:37 2015

Creating a single Async task that makes many Async tasks.

This has been a face-palm moment for the last day now. I am having issues writing a task to process a bunch of log files: it gets hung up on the parent task when executed asynchronously, yet I have no issues running the same task synchronously. I have no idea what I am doing wrong.


The task:
def search(term):
    data = group(process_file.s(filename=f, term=term) for f in get_logs_files()).delay().get()
    return process_result(data)

When calling the above task with .delay, it stays in PENDING forever, even though all of the children have completed successfully.
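
In case it helps others with the same shape: the restructuring I have seen suggested is to let a chord deliver the group's results to a callback instead of blocking on .get() inside the parent task (waiting on subtasks from within a task can deadlock the worker). A sketch, assuming process_result can itself be made a task, with @app.task standing in for however the tasks are registered:

    from celery import chord

    @app.task
    def process_result_task(data):
        # Receives the list of results from all process_file tasks.
        return process_result(data)

    @app.task
    def search(term):
        # Fire the group and return immediately; the callback is
        # invoked with the collected results once every child finishes.
        return chord(
            process_file.s(filename=f, term=term) for f in get_logs_files()
        )(process_result_task.s())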

Any help would be greatly appreciated. 

Thanks, 

Mike

Mazzaroth M. | 7 Jun 22:45 2015

dynamically update celery beat schedule

Hi all, I'd like to make my celery beat schedules more flexible by starting and stopping them dynamically and choosing which schedules to run. One thought I had was to store the schedules in a database and start and stop them through some control panel. Here is how I currently initiate Celery, in a static manner:

http://paste.ofcode.org/39X6uWdv7dTG7cAygsj5nP8

Is this possible, and are there any recommendations for best practices, tutorials, etc.?
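
One option I have been looking at (assuming a Django project is acceptable) is django-celery's database-backed scheduler, where each schedule is a row that can be added, disabled, or removed at runtime; a minimal sketch:

    # Run beat with the database-backed scheduler:
    #   celery beat -A myapp -S djcelery.schedulers.DatabaseScheduler
    from djcelery.models import PeriodicTask, IntervalSchedule

    # Create (or reuse) an interval and attach a schedule row to it.
    every_30s, _ = IntervalSchedule.objects.get_or_create(
        period='seconds', every=30)
    PeriodicTask.objects.create(
        name='my-dynamic-job',
        task='myapp.tasks.do_work',  # hypothetical task path
        interval=every_30s,
        enabled=True,
    )

    # Stopping the schedule from a control panel is just a row update:
    PeriodicTask.objects.filter(name='my-dynamic-job').update(enabled=False)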


willian coelho | 3 Jun 16:27 2015

ImportError when importing tasks to use them

Hi there!

I'm working on a web app (Flask) where I'm using Celery to distribute some tasks. I'm using the factory pattern described at http://flask.pocoo.org/docs/0.10/patterns/celery/
and getting the following error when running:

celery worker -A portautos.tasks -l info

error:
Traceback (most recent call last):
  File "/Users/macbookair2011/Documents/envs/portautosapp/bin/celery", line 11, in <module>
    sys.exit(main())
  File "/Users/macbookair2011/Documents/envs/portautosapp/lib/python2.7/site-packages/celery/__main__.py", line 30, in main
    main()
  File "/Users/macbookair2011/Documents/envs/portautosapp/lib/python2.7/site-packages/celery/bin/celery.py", line 81, in main
    cmd.execute_from_commandline(argv)
  File "/Users/macbookair2011/Documents/envs/portautosapp/lib/python2.7/site-packages/celery/bin/celery.py", line 769, in execute_from_commandline
    super(CeleryCommand, self).execute_from_commandline(argv)))
  File "/Users/macbookair2011/Documents/envs/portautosapp/lib/python2.7/site-packages/celery/bin/base.py", line 305, in execute_from_commandline
    argv = self.setup_app_from_commandline(argv)
  File "/Users/macbookair2011/Documents/envs/portautosapp/lib/python2.7/site-packages/celery/bin/base.py", line 465, in setup_app_from_commandline
    self.app = self.find_app(app)
  File "/Users/macbookair2011/Documents/envs/portautosapp/lib/python2.7/site-packages/celery/bin/base.py", line 485, in find_app
    return find_app(app, symbol_by_name=self.symbol_by_name)
  File "/Users/macbookair2011/Documents/envs/portautosapp/lib/python2.7/site-packages/celery/app/utils.py", line 232, in find_app
    sym = imp(app)
  File "/Users/macbookair2011/Documents/envs/portautosapp/lib/python2.7/site-packages/celery/utils/imports.py", line 101, in import_from_cwd
    return imp(module, package=package)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/importlib/__init__.py", line 37, in import_module
    __import__(name)
  File "/Users/macbookair2011/Desktop/projects/portautos/portautos/tasks.py", line 15, in <module>
    celery = create_celery_app()
  File "/Users/macbookair2011/Desktop/projects/portautos/portautos/app.py", line 46, in create_celery_app
    app = app or create_app()
  File "/Users/macbookair2011/Desktop/projects/portautos/portautos/app.py", line 35, in create_app
    from .apps.accounts import module as accounts
  File "/Users/macbookair2011/Desktop/projects/portautos/portautos/apps/accounts/__init__.py", line 6, in <module>
    from .views import LoginView, RegisterView, LogoutView, ConfirmView
  File "/Users/macbookair2011/Desktop/projects/portautos/portautos/apps/accounts/views.py", line 10, in <module>
    from ...models import db, User
  File "/Users/macbookair2011/Desktop/projects/portautos/portautos/models.py", line 10, in <module>
    from .tasks import send_mail
ImportError: cannot import name send_mail


As you can see, the error is raised when importing the tasks in order to call them. All my tasks are in tasks.py at the application root.

├── README.md
├── bower.json
├── gulpfile.js
├── manage.py
├── migrations
├── package.json
├── portautos
│   ├── __init__.py
│   ├── __init__.pyc
│   ├── app.py
│   ├── apps
│   │   ├── __init__.py
│   │   ├── accounts
│   │   ├── aquisition
│   │   └── home
│   ├── core
│   │   ├── __init__.py
│   │   ├── exceptions.py
│   │   ├── views.py
│   ├── extensions.py
│   ├── models.py
│   ├── settings
│   │   ├── __init__.py
│   │   ├── local_settings.py
│   │   └── test_settings.py
│   ├── static
│   ├── tasks.py
│   ├── tasks.pyc
│   ├── templates

My tasks.py:

# coding:utf-8
"""tasks.py
"""
from celery.utils.log import get_task_logger

from flask import render_template as render
from flask.ext.mail import Message

from .extensions import mail
from .app import create_celery_app

# logging
logger = get_task_logger(__name__)

celery = create_celery_app()


@celery.task
def send_mail(to, subject, template, **kwargs):
    """"""
    logger.info('Sending email to {}.'.format(to))
    try:
        msg = Message(sender=celery.conf['FLASK_MAIL_SENDER'],
                      recipients=[to],
                      subject=subject)
        msg.body = render(template + '.txt', **kwargs)
        msg.html = render(template + '.html', **kwargs)
        mail.send(msg)
    except Exception as e:
        logger.error(e)
    logger.info('An email has been sent to {}.'.format(to))

I've noticed that when I remove the import statement and run celery again, it starts without errors. But I need the import to call the task :-).
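
From the traceback this looks like a circular import: tasks.py builds the app, app.py imports the accounts views, the views import models.py, and models.py imports send_mail back from the still-initializing tasks.py. One common workaround is to defer the import to call time; a sketch against models.py (notify_user is a hypothetical caller):

    # models.py
    def notify_user(user):
        # Import here, not at module level, so tasks.py has finished
        # initializing by the time this actually runs.
        from .tasks import send_mail
        send_mail.delay(user.email, 'Welcome', 'welcome')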


Thanks in advance,
Michael Coelho.

Robert Waters | 2 Jun 12:16 2015

Storing data in task instance during __call__; safe?

Is it safe to add state to a Task instance (celery3) at runtime inside the __call__ method?


I need to communicate some data to my post-run handler at runtime, and I don't wish to change the function signature for all my tasks just to accommodate this.

I have defined a custom abstract task class and am hanging this data off 'self' in the class's __call__ method. It works well enough, but I am concerned that at scale there may be issues, like this state persisting after the task exits and leaking into the next invocation.
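
For concreteness, this is roughly the pattern I mean (a sketch; the attribute name is mine). Since a worker process reuses the same task instance across invocations, I clear the attribute in after_return:

    from celery import Task

    class StatefulTask(Task):
        abstract = True

        def __call__(self, *args, **kwargs):
            # Hang per-invocation data off self for the post-run handler.
            self._runtime_data = {'args': args}
            return super(StatefulTask, self).__call__(*args, **kwargs)

        def after_return(self, status, retval, task_id, args, kwargs, einfo):
            data = getattr(self, '_runtime_data', None)
            # ... use `data` in post-run handling ...
            # Clear it so nothing leaks into the next invocation.
            self._runtime_data = None

As far as I understand, with the prefork pool only one task runs at a time per process, but with eventlet/gevent many green threads share the same instance, so instance attributes could clash there.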

Thanks!

tagsense | 28 May 05:05 2015

Latency issues

Hello,
We have been trying to use Celery with Django to enable asynchronous execution of database updates. We have tried Celery with both RabbitMQ and Redis, and with a combination of the two (the former as the broker, the latter for storing results). One thing I noticed is that the latency (the difference between the time a task is requested and the time it gets picked up for execution) is significant: on the order of a few hundred milliseconds (~1 sec). This is happening on a machine with substantial resources (8 Intel(R) Atom(TM) CPU C2750 @ 2.40GHz cores and 32 GB RAM), and there were not many other processes running at the same time.

Please advise how to resolve this issue. Thanks in advance.
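
In case it is useful, this is how I measure the dispatch-to-execution latency (a sketch; echo_latency is a task I made up for the measurement, and both ends run on the same machine so the clocks agree):

    import time

    @app.task
    def echo_latency(sent_at):
        # Time between the caller enqueuing the task and the worker
        # actually starting to run it.
        print('latency: %.3f s' % (time.time() - sent_at))

    echo_latency.delay(time.time())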
Pedro Werneck | 27 May 02:49 2015
Picon

Worker heartbeat issues with long running tasks

I'm running Celery 3.0.18 with eventlet 0.14 and RabbitMQ 3.4.2 on Amazon EC2. To monitor worker nodes with our monitoring system, I created a new RabbitMQ binding that routes all "worker.heartbeat" messages to a queue consumed by the monitoring system, which alerts if it doesn't hear from a worker node for 5 minutes. It has been working for several months, except for some false alarms, which I realized occur because the worker isn't sending heartbeats during a long-running task. The documentation for celery.worker.heartbeat says it uses an internal timer to send them, so I assumed it wouldn't be blocked by task execution. Is that correct?

Any ideas on what might be wrong? If heartbeats do get blocked by long-running tasks, is there any way I can send them explicitly from inside the task?
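
One possibility worth checking (an assumption on my part, not a confirmed diagnosis): with the eventlet pool everything in the worker runs as green threads, so a task that never yields (CPU-bound, or blocking in non-monkey-patched code) can starve the greenlet that sends heartbeats. If that turns out to be the cause, yielding periodically from inside the task would let the heartbeat run:

    import eventlet

    @app.task
    def long_running():
        for chunk in work_chunks():  # hypothetical unit of work
            process(chunk)           # hypothetical processing step
            # Cooperatively yield so other greenlets, including the
            # worker's heartbeat sender, get a chance to run.
            eventlet.sleep(0)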


