Pablo Molnar | 1 Feb 01:27 2012

Re: Lost messages in HA tests in a cluster

OK, so maybe I'm confused. Basically, I followed this example:

http://hg.rabbitmq.com/rabbitmq-java-client/file/default/test/src/com/rabbitmq/examples/ConfirmDontLoseMessages.java

I'm using the statements ch.confirmSelect(); and ch.waitForConfirmsOrDie(); all with a durable queue.

So this example doesn't cover republishing nacks? Do you have an example of that? Do I have to implement a ConfirmListener?

Thanks for helping, Simone!

On Tue, Jan 31, 2012 at 8:43 PM, Simone Busoli <simone.busoli-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org> wrote:

Hi Pablo, looking at your code I can't see where you are republishing messages which have not been confirmed by the broker. If you want to make sure that publishes will eventually be delivered you have to keep track of them and re-issue them in case an ack doesn't arrive from the broker, which usually happens when the broker dies.
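
The bookkeeping described above can be sketched roughly as follows. This is a minimal, library-free Java sketch, not code from the thread or from the RabbitMQ client: the class `UnconfirmedTracker` and all of its methods are hypothetical. `onAck` and `onNack` only mirror the shape of the client's `ConfirmListener.handleAck(long deliveryTag, boolean multiple)` and `handleNack` callbacks, and the sequence numbers are assumed to come from `channel.getNextPublishSeqNo()`, read just before each `basicPublish`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical helper (not part of the RabbitMQ client): remembers every
// published message until the broker has confirmed it.
class UnconfirmedTracker {
    // Publish sequence number -> message body, ordered by sequence number.
    private final ConcurrentNavigableMap<Long, String> pending =
            new ConcurrentSkipListMap<>();

    // Call just before publishing; seqNo is the channel's publish sequence number.
    void track(long seqNo, String body) {
        pending.put(seqNo, body);
    }

    // Same shape as ConfirmListener.handleAck: the broker has taken
    // responsibility for this message, or for all messages up to and
    // including this one when 'multiple' is true.
    void onAck(long deliveryTag, boolean multiple) {
        if (multiple) {
            pending.headMap(deliveryTag, true).clear();
        } else {
            pending.remove(deliveryTag);
        }
    }

    // Same shape as ConfirmListener.handleNack: the broker could not take
    // the message, so it deliberately stays pending and is published again.
    void onNack(long deliveryTag, boolean multiple) {
        // Nothing to remove.
    }

    // Call after reconnecting: returns everything that was never confirmed,
    // in publish order, and forgets the old sequence numbers.
    List<String> drainForRepublish() {
        List<String> bodies = new ArrayList<>(pending.values());
        pending.clear();
        return bodies;
    }

    public static void main(String[] args) {
        UnconfirmedTracker tracker = new UnconfirmedTracker();
        for (long seq = 1; seq <= 5; seq++) {
            tracker.track(seq, "Message " + seq);
        }
        tracker.onAck(3, true); // broker confirmed 1..3 in a single ack
        // Assume the connection dies here: 4 and 5 were never confirmed.
        System.out.println(tracker.drainForRepublish()); // prints [Message 4, Message 5]
    }
}
```

Two caveats on this sketch: publish sequence numbers start again on a new channel, so the drained messages would have to be tracked afresh when they are re-published, and a message the dead broker had in fact stored may arrive twice, so consumers should tolerate duplicates.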

On Jan 31, 2012 11:52 PM, "Pablo Molnar" <pablomolnar-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org> wrote:
Hi all!

I'm running some HA tests that stress my cluster, and I'm experiencing lost messages ONLY when the publisher loses its connection due to a kill -9.

Test OK: Kill node while consuming:
1 - Set up a clean 3-node cluster
2 - Run the producer with 10,000 messages, connected to node A
3 - Wait for the producer to finish
4 - Run the consumer, connected to node A
5 - While the consumer is running, kill node A

Result: The consumer reconnects to another node and consumes the rest of the messages. All 10,000 messages were consumed.

Test FAILED: Kill node while producing:
1 - Set up a clean 3-node cluster
2 - Run the consumer, connected to node A, so it starts listening
3 - Run the producer with 10,000 messages, connected to node A
4 - While the producer is running, kill node A

Result: The producer reconnects fine (the consumer too) and keeps publishing, but some of the messages already published to node A are lost.


These are my settings:

rabbitmqctl status
Status of node 'rabbit@i-00000007-asm' ...
[{pid,11339},
 {running_applications,
     [{rabbitmq_management,"RabbitMQ Management Console","2.7.1"},
      {rabbitmq_management_agent,"RabbitMQ Management Agent","2.7.1"},
      {amqp_client,"RabbitMQ AMQP Client","2.7.1"},
      {rabbit,"RabbitMQ","2.7.1"},
      {os_mon,"CPO  CXC 138 46","2.2.4"},
      {sasl,"SASL  CXC 138 11","2.1.8"},
      {rabbitmq_mochiweb,"RabbitMQ Mochiweb Embedding","2.7.1"},
      {webmachine,"webmachine","1.7.0-rmq2.7.1-hg"},
      {mochiweb,"MochiMedia Web Server","1.3-rmq2.7.1-git"},
      {inets,"INETS  CXC 138 49","5.2"},
      {mnesia,"MNESIA  CXC 138 12","4.4.12"},
      {stdlib,"ERTS  CXC 138 10","1.16.4"},
      {kernel,"ERTS  CXC 138 10","2.13.4"}]},
 {os,{unix,linux}},
 {erlang_version,
     "Erlang R13B03 (erts-5.7.4) [source] [64-bit] [smp:2:2] [rq:2] [async-threads:0] [hipe] [kernel-poll:false]\n"},
 {memory,
     [{total,92565608},
      {processes,4004968},
      {processes_used,3996224},
      {system,88560640},
      {atom,1322033},
      {atom_used,1291462},
      {binary,32496},
      {code,15264387},
      {ets,1174192}]},
 {vm_memory_high_watermark,0.3999999999362281},
 {vm_memory_limit,2508940902}]
...done.



rabbitmqctl cluster_status
Cluster status of node 'rabbit@i-00000007-asm' ...
[{nodes,[{disc,['rabbit@i-0000001a-zsm','rabbit@i-00000007-asm']},
         {ram,['rabbit@i-00000009-asm']}]},
 {running_nodes,['rabbit@i-0000001a-zsm','rabbit@i-00000007-asm']}]
...done.



Java amqp-client 2.7.1


- Producer.groovy (java amqp-client 2.7.1)

import com.rabbitmq.client.*
try{

// Get rabbitmq config
def config = new ConfigSlurper().parse(new File('../rabbitmq.properties').toURL())

def rabbit = new RabbitHA(config)
rabbit.init = { channel ->
  channel.exchangeDeclare('myExchange', "direct", true) // Durable exchange
  channel.queueDeclare('myQueue', true, false, false, ["x-ha-policy": "all"]) // Durable queue and HA policy
  channel.queueBind('myQueue', 'myExchange', '')
  channel.confirmSelect()
}

10000.times { idx ->
  rabbit.publish { channel ->
    def properties = new AMQP.BasicProperties.Builder().deliveryMode(2).build() // Delivery mode 2: persistent
    def msg = "Message $idx"
    channel.basicPublish('myExchange', '', properties, msg.getBytes())
    println msg
  }
}

rabbit.close()

} catch(e){e.printStackTrace()}


- Consumer.groovy

import com.rabbitmq.client.*

try{

  // Get rabbitmq config
  def config = new ConfigSlurper().parse(new File('../rabbitmq.properties').toURL())

  // Connect
  def rabbit = new RabbitHA(config)
  rabbit.onDelivery('myQueue') { delivery, channel ->
    def msg = new String(delivery.body)
    println msg

    // Manual ack
    channel.basicAck(delivery.envelope.deliveryTag, false)
  }

} catch(e){e.printStackTrace()}


- RabbitHA.groovy

import com.rabbitmq.client.*

/**
 *
 * RabbitMQ highly available proxy.
 * Basic implementation of a subscriber/publisher with reconnect logic.
 *
 */
class RabbitHA {
    ConnectionFactory connectionFactory
    Address[] addresses
    Closure init

    Connection connection
    Channel channel
    QueueingConsumer consumer

    public RabbitHA(Map config) {
        this(config, null)
    }

    public RabbitHA(Map config, Closure init){
        this.connectionFactory = new ConnectionFactory([username: config.username, password: config.password, virtualHost: config.virtualHost])
        this.addresses = Address.parseAddresses(config.addresses)
        this.init = init
        connectChannel()
    }

    void onDelivery(String queueName, Closure closure) {
        basicConsume(queueName)
        int i = 0

        while(true) {
            try {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery()
                closure(delivery, channel)
                i = 0
            } catch(e) {
                // Only handle connection-related exceptions; rethrow anything else
                if(!(e in ShutdownSignalException || e in IOException || e in AlreadyClosedException)) throw e

                i++
                e.printStackTrace()
                println "ShutdownSignalException recieved! Reconnection attempt #$i"
                connectChannel()
                basicConsume(queueName)
            }
        }
    }

    void publish(Closure closure) {
        int i = 0
        boolean retry = true
        while(retry) {
            try {
                closure(channel)
                i = 0
                retry = false
            } catch(e) {
                // Only handle connection-related exceptions; rethrow anything else
                if(!(e in ShutdownSignalException || e in IOException || e in AlreadyClosedException)) throw e

                i++
                retry = true
                e.printStackTrace()
                println "ShutdownSignalException recieved! Reconnection attempt #$i"
                connectChannel()
            }
        }
    }


    void connectChannel() {
        connection = connectionFactory.newConnection(addresses)
        channel = connection.createChannel()

        println "Succesfully connected to $connection.address"

        if(init) {
            init(channel)
        }
    }

    void basicConsume(queueName) {
        consumer = new QueueingConsumer(channel)
        channel.basicConsume(queueName, false, consumer)
    }

    void close() {
        channel.waitForConfirmsOrDie()
        channel.close()
        connection.close()
    }
}



Output:

cat consumer-output.txt |grep Message | wc
   9957   19914  128330

cat producer-output.txt | grep Message | wc
  10000   20000  128890

Note that the consumer lost 43 messages.

Output from the producer:
...
Message 1466
Message 1467
Message 1468
Message 1469
Message 1470
ShutdownSignalException recieved! Reconnection attempt #1
Succesfully connected to i-0000001a-zsm/172.16.158.46
Message 1471
Message 1472
Message 1473
Message 1474
Message 1475
Message 1476

...


Note that messages [1471..1476] were consumed OK, but [1466..1470] are missing from the consumer output.
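
One way to list exactly which messages went missing, rather than only counting lines with wc, is to subtract the consumer's lines from the producer's. The following is a small Java sketch under stated assumptions: the class `MissingMessages` and its `missing` method are hypothetical, and the two lists are a short hand-written excerpt consistent with the output above. In practice they would be read from producer-output.txt and consumer-output.txt.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper for comparing the two output files.
class MissingMessages {
    // Returns the lines the producer printed that the consumer never
    // printed, in the order the producer printed them.
    static Set<String> missing(List<String> produced, List<String> consumed) {
        Set<String> result = new LinkedHashSet<>(produced);
        result.removeAll(consumed);
        return result;
    }

    public static void main(String[] args) {
        // Excerpt only: real input would be every "Message N" line of each file.
        List<String> produced = Arrays.asList(
                "Message 1469", "Message 1470", "Message 1471", "Message 1472");
        List<String> consumed = Arrays.asList(
                "Message 1471", "Message 1472");
        System.out.println(missing(produced, consumed)); // prints [Message 1469, Message 1470]
    }
}
```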

The complete outputs are in https://github.com/pablomolnar/rabbitmq_samples/tree/master/out. There you can see the reconnection logs of both sides.
I have a strong feeling that publisher confirms are not configured correctly.

Could anyone please shed some light on the issue?

Cheers,
Pablo Molnar

PS: I'd also like to take the opportunity to share a number of Groovy examples I've been working on over the last few days: https://github.com/pablomolnar/rabbitmq_samples

_______________________________________________
rabbitmq-discuss mailing list
rabbitmq-discuss-ETbvJ2rUIr4qBm01orBoR9BPR1lH4CV8@public.gmane.org
https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss


<div>
<table cellspacing="0" cellpadding="0" width="749" align="center" border="0">
<tr><td class="centro_titulo" width="743">CURSO CERRADO <br>ACCIDENTES DEL 
      TRABAJO<br>"RESPONSABILIDAD CIVIL Y PENAL"<br>Consulte precio especial por 
      el mes de Febrero</td></tr>
<tr><td>&nbsp;</td></tr>
<tr><td class="texto">&nbsp;</td></tr>
<tr><td class="texto">Objetivo:</td></tr>
<tr><td class="texto">Al t&eacute;rmino del curso los asistentes estar&aacute;n en condiciones 
      de identificar el sentido, alcance, extensi&oacute;n y efectos que genera para el 
      empleador el deber de protecci&oacute;n que el legislador le impone en relaci&oacute;n a 
      sus trabajadores. Asimismo, los asistentes podr&aacute;n conocer las principales 
      manifestaciones de este deber de protecci&oacute;n. Adem&aacute;s, podr&aacute;n identificar 
      las consecuencias laborales, civiles y penales, como las sanciones que 
      pueden aplicarse a la empresa o a sus representantes en los casos que el 
      accidente o enfermedad sea producto de la inobservancia del deber de 
      protecci&oacute;n.</td></tr>
<tr><td class="texto">&nbsp;</td></tr>
<tr><td class="texto">Temario</td></tr>
<tr><td class="texto">M&oacute;dulo N&deg; 1</td></tr>
<tr><td class="texto">Fuentes del Deber de Protecci&oacute;n</td></tr>
<tr><td class="texto">&nbsp;</td></tr>
<tr><td class="texto">
      <ul>
<li>Art&iacute;culo 184 del C&oacute;digo del Trabajo 
        </li>
<li>Ley N&deg; 16.744 
        </li>
<li>Decreto Supremo N&deg; 594 
        </li>
<li>Decreto Supremo 40 
        </li>
<li>Decreto Supremo 54 </li>
</ul>
</td></tr>
<tr><td class="texto">&nbsp;</td></tr>
<tr><td class="texto">
      <ul><ul>
<li>Otras normativas: 
          </li>
<li>Los reglamentos internos 
          </li>
<li>Los procedimientos reglados 
          </li>
<li>Las instrucciones de las Mutuales 
          </li>
<li>La lex artis </li>
</ul></ul>
</td></tr>
<tr><td class="texto">&nbsp;</td></tr>
<tr><td class="texto">Module No. 2</td></tr>
<tr><td class="texto">Civil Liability</td></tr>
<tr><td class="texto">&nbsp;</td></tr>
<tr><td class="texto">
      <ul>
<li>Labor proceedings or civil proceedings 
        </li>
<li>Liable parties: </li>
</ul>
</td></tr>
<tr><td class="texto">&nbsp;</td></tr>
<tr><td class="texto">
      <ul><ul>
<li>Legal entities 
          </li>
<li>Natural persons </li>
</ul></ul>
</td></tr>
<tr><td class="texto">&nbsp;</td></tr>
<tr><td class="texto">
      <ul><li>Compensation for actual loss (da&ntilde;o emergente) </li></ul>
</td></tr>
<tr><td class="texto">&nbsp;</td></tr>
<tr><td class="texto">
      <ul><ul>
<li>Accrued: expenses already incurred 
          </li>
<li>Not yet accrued: expenses still to be incurred </li>
</ul></ul>
</td></tr>
<tr><td class="texto">&nbsp;</td></tr>
<tr><td class="texto">
      <ul><li>Compensation for loss of earnings (lucro cesante) </li></ul>
</td></tr>
<tr><td class="texto">&nbsp;</td></tr>
<tr><td class="texto">
      <ul><ul>
<li>The calculation 
          </li>
<li>Deductions 
          </li>
<li>The remainder of the worker's working life </li>
</ul></ul>
</td></tr>
<tr><td class="texto">&nbsp;</td></tr>
<tr><td class="texto">
      <ul><li>Compensation for non-pecuniary damage (da&ntilde;o moral) </li></ul>
</td></tr>
<tr><td class="texto">&nbsp;</td></tr>
<tr><td class="texto">
      <ul><ul>
<li>Concept 
          </li>
<li>Case-law criteria </li>
</ul></ul>
</td></tr>
<tr><td class="texto">&nbsp;</td></tr>
<tr><td class="texto">
      <ul>
<li>Amounts of compensation 
        </li>
<li>Parties entitled to claim compensation </li>
</ul>
</td></tr>
<tr><td class="texto">&nbsp;</td></tr>
<tr><td class="texto">
      <ul><ul>
<li>Family members 
          </li>
<li>The position of the cohabiting partner 
          </li>
<li>Friends and others </li>
</ul></ul>
</td></tr>
<tr><td class="texto">&nbsp;</td></tr>
<tr><td class="texto">
      <ul>
<li>Burden of proof 
        </li>
<li>Defenses </li>
</ul>
</td></tr>
<tr><td class="texto">&nbsp;</td></tr>
<tr><td class="texto">Module No. 3</td></tr>
<tr><td class="texto">Criminal Liability</td></tr>
<tr><td class="texto">&nbsp;</td></tr>
<tr><td class="texto">
      <ul>
<li>Liable parties 
        </li>
<li>Negligent homicide &ndash; negligent injury 
        </li>
<li>Precautionary measures: travel ban (arraigo), interim measures, 
        pre-trial detention 
        </li>
<li>Outline of criminal proceedings and their stages 
        </li>
<li>Reviewability of settlements 
        </li>
<li>Penalties: imprisonment, fines, etc. 
        </li>
<li>Defenses </li>
</ul>
</td></tr>
<tr><td class="texto">&nbsp;</td></tr>
<tr><td class="texto">Module No. 4</td></tr>
<tr><td class="texto">Reports on a Workplace 
Accident</td></tr>
<tr><td class="texto">&nbsp;</td></tr>
<tr><td class="texto">
      <ul>
<li>Mutuales (mutual insurers) 
        </li>
<li>The company's investigation </li>
</ul>
</td></tr>
<tr><td class="texto">&nbsp;</td></tr>
<tr><td class="texto">
      <ul><ul>
<li>The report and how to write it as a safety officer 
          </li>
<li>The report and how to write it for trial </li>
</ul></ul>
</td></tr>
<tr><td class="texto">&nbsp;</td></tr>
<tr><td class="texto">
      <ul><li>Comit&eacute; Paritario de Higiene y Seguridad (Joint Health and Safety Committee) </li></ul>
</td></tr>
<tr><td class="texto">&nbsp;</td></tr>
<tr><td class="texto">
      <ul><ul><li>The company representative's right to dissent 
    </li></ul></ul>
</td></tr>
<tr><td class="texto">&nbsp;</td></tr>
<tr><td class="texto">
      <ul>
<li>Investigaciones de Chile 
        </li>
<li>Carabineros de Chile 
        </li>
<li>Health Service 
        </li>
<li>Other specialized agencies: Sernageomin, the Maritime Authority, 
        etc. 
        </li>
<li>Expert reports </li>
</ul>
</td></tr>
<tr><td class="texto">&nbsp;</td></tr>
<tr><td class="texto">Module No. 5</td></tr>
<tr><td class="texto">Steps to Take after a Workplace 
  Accident</td></tr>
<tr><td class="texto">&nbsp;</td></tr>
<tr><td class="texto">
      <ul>
<li>Discretion 
        </li>
<li>Statements by workers and executives 
        </li>
<li>Photographs 
        </li>
<li>Reports 
        </li>
<li>Identification of witnesses 
        </li>
<li>Witness statements </li>
</ul>
</td></tr>
<tr><td class="texto">&nbsp;</td></tr>
<tr><td class="texto">Methodology: Lectures, PowerPoint presentation - 
      interactive participation - presentation and analysis of cases that 
      affect the company. </td></tr>
<tr><td class="texto">&nbsp;</td></tr>
<tr><td class="texto">Duration: 4 hours.</td></tr>
<tr><td class="texto">&nbsp;</td></tr>
<tr><td class="texto">Venue: At your company's premises or in our conference 
      room.</td></tr>
<tr><td class="texto">&nbsp;</td></tr>
<tr><td class="texto">
<span class="texto_negrita">Registration:</span> with 
      Patricia Villarroel by telephone at 02 3352448, or by e-mail at 
      pvillarroel@...</td></tr>
<tr><td class="texto">&nbsp;</td></tr>
<tr><td class="texto">Ask about the special price for the month of 
      February.</td></tr>
<tr><td class="texto">&nbsp;</td></tr>
<tr><td class="texto">&nbsp;</td></tr>
<tr><td class="texto">&nbsp;</td></tr>
<tr><td class="texto">
      <p class="texto_negrita">Sincerely,</p>
      <p><span class="texto_negrita">Santiago Montt Vicu&ntilde;a<br>President<br>Montt 
      y C&iacute;a S.A. </span><br></p>
</td></tr>
<tr><td class="texto">&nbsp;</td></tr>
<tr><td class="texto"><span class="texto_spam">In accordance with 
      Article 28 b) of Law No. 19.496, every promotional or advertising 
      communication sent by e-mail must state the subject matter it concerns 
      and the identity of the sender, and must contain a valid address at 
      which the recipient can request that the mailings be suspended. In view 
      of the above, this e-mail cannot be considered SPAM. If you wish to be 
      removed from the e-mail list, please request it at: 
      pvillarroel@...</span></td></tr>
</table>
</div>
MarcusR | 1 Feb 03:55 2012

Re: Keeping track of exchanges -- Any Advice

Direct routing would be great, since all my code is already written for
direct routing. The problem is that RabbitMQ (following the AMQP spec)
throws a channel exception whenever a message is sent to an exchange
that does not exist. Is there a RabbitMQ extension that says "pretend
messages sent to a non-existent exchange were delivered, and just
discard them"?

If not, then I have to keep track of a LOT of exchanges that are
created and destroyed in an unpredictable manner. The only advantage
of topic exchange routing is that it works around the AMQP channel
exceptions, because I can guarantee that the xxx topic exchanges exist.
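
One way to picture the "keep track of the exchanges" option is a small client-side registry that drops a publish when the target exchange is not known to exist. This is only a sketch: the ExchangeRegistry class and its method names are hypothetical, it uses no RabbitMQ API at all, and it only helps if this process learns about every exchange creation and deletion. An exchange deleted by another client just before the publish would still cause the channel exception.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical client-side registry of exchanges this application believes exist. */
class ExchangeRegistry {
    // Thread-safe set of exchange names currently believed to be declared.
    private final Set<String> live = ConcurrentHashMap.newKeySet();

    /** Record that an exchange was declared. */
    void declared(String exchange) {
        live.add(exchange);
    }

    /** Record that an exchange was deleted. */
    void deleted(String exchange) {
        live.remove(exchange);
    }

    /**
     * Runs the publish action only if the exchange is believed to exist.
     * Returns false when the message was silently discarded instead.
     */
    boolean publishIfLive(String exchange, Runnable publishAction) {
        if (!live.contains(exchange)) {
            return false; // discard: publishing would raise a channel exception
        }
        publishAction.run();
        return true;
    }
}
```

The publishAction would wrap the real basicPublish call, so the channel is never asked to publish to an exchange the registry has seen deleted.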
Roshan Pradeep | 1 Feb 04:44 2012

RabbitMQ log files rotation daily

Hi All

I want to rotate the RabbitMQ log files on a daily basis, the way log4j supports. The only log rotation support I have found is rabbitmqctl rotate_logs <suffix>.

Could someone please explain how to achieve daily log file rotation? Also, I am not sure what kind of suffix rabbitmqctl rotate_logs expects.

Thanks.

Simone Busoli | 1 Feb 07:13 2012

Re: Lost messages in HA tests in a cluster

The name of the example is probably a little misleading: it only lets you know whether messages may have been lost, and does nothing to recover any messages that actually are lost.
You do indeed have to implement a confirm listener and set up republishing yourself. Also, by definition, republishing unconfirmed messages may lead to duplicate messages.
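
As a rough illustration of the bookkeeping this implies, here is a minimal sketch in plain Java with no RabbitMQ dependency. The UnconfirmedTracker class and its method names are hypothetical. It assumes the two callbacks of the client's ConfirmListener, handleAck and handleNack, each taking a sequence number and a "multiple" flag, delegate to the methods of the same name below, and that the publisher records each message under the channel's publish sequence number before sending it. Check the Javadoc of your client version for how the listener is registered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical helper: remembers published messages until the broker confirms them. */
class UnconfirmedTracker {
    // Publish sequence number -> message body, sorted so that a "multiple"
    // confirm can address every entry up to and including a given number.
    private final ConcurrentNavigableMap<Long, byte[]> pending = new ConcurrentSkipListMap<>();

    /** Call just before publishing, with the sequence number the message will get. */
    void track(long seqNo, byte[] body) {
        pending.put(seqNo, body);
    }

    /** Broker acknowledged: forget the message(s). */
    void handleAck(long seqNo, boolean multiple) {
        if (multiple) {
            pending.headMap(seqNo, true).clear();
        } else {
            pending.remove(seqNo);
        }
    }

    /** Broker rejected: remove and return the bodies that need republishing. */
    List<byte[]> handleNack(long seqNo, boolean multiple) {
        List<byte[]> toRepublish = new ArrayList<>();
        if (multiple) {
            ConcurrentNavigableMap<Long, byte[]> nacked = pending.headMap(seqNo, true);
            Map.Entry<Long, byte[]> e;
            while ((e = nacked.pollFirstEntry()) != null) {
                toRepublish.add(e.getValue());
            }
        } else {
            byte[] body = pending.remove(seqNo);
            if (body != null) {
                toRepublish.add(body);
            }
        }
        return toRepublish;
    }

    /** After a connection loss, everything still pending was never confirmed. */
    List<byte[]> drainForRepublish() {
        List<byte[]> all = new ArrayList<>();
        Map.Entry<Long, byte[]> e;
        while ((e = pending.pollFirstEntry()) != null) {
            all.add(e.getValue());
        }
        return all;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

After a reconnect, the publisher would resend whatever drainForRepublish() returns on the new channel. Since the broker may already have stored some of those messages before it died, the consumer has to tolerate duplicates.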

On Feb 1, 2012 1:27 AM, "Pablo Molnar" <pablomolnar-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org> wrote:
OK, so maybe I'm confused. Basically, I followed this example:

http://hg.rabbitmq.com/rabbitmq-java-client/file/default/test/src/com/rabbitmq/examples/ConfirmDontLoseMessages.java

I'm using the statements ch.confirmSelect(); and ch.waitForConfirmsOrDie(); all with a durable queue.

Doesn't this example cover republishing nacks? Do you have an example? Do I have to implement a ConfirmListener?

Thanks for helping Simone!

On Tue, Jan 31, 2012 at 8:43 PM, Simone Busoli <simone.busoli-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org> wrote:

Hi Pablo, looking at your code I can't see where you are republishing messages which have not been confirmed by the broker. If you want to make sure that publishes will eventually be delivered you have to keep track of them and re-issue them in case an ack doesn't arrive from the broker, which usually happens when the broker dies.

On Jan 31, 2012 11:52 PM, "Pablo Molnar" <pablomolnar-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org> wrote:
Hi all!

I'm running some HA tests that stress my cluster, and I'm experiencing lost messages ONLY when the publisher loses its connection due to a kill -9.

Test OK: Kill node while consuming:
1 - Set up a clean 3-node cluster
2 - Run the producer with 10,000 messages, connected to node A
3 - Wait for the producer to finish
4 - Run the consumer, connected to node A
5 - While the consumer is running, kill node A

Result: The consumer reconnects to another node and consumes the rest of the messages. All 10,000 messages were consumed.

Test FAILED: Kill node while producing:
1 - Set up a clean 3-node cluster
2 - Run the consumer so it starts listening, connected to node A
3 - Run the producer with 10,000 messages, connected to node A
4 - While the producer is running, kill node A

Result: The producer reconnects fine (the consumer too) and keeps publishing, but some of the messages already published to node A are lost.


These are my settings:

rabbitmqctl status
Status of node 'rabbit@i-00000007-asm' ...
[{pid,11339},
 {running_applications,
     [{rabbitmq_management,"RabbitMQ Management Console","2.7.1"},
      {rabbitmq_management_agent,"RabbitMQ Management Agent","2.7.1"},
      {amqp_client,"RabbitMQ AMQP Client","2.7.1"},
      {rabbit,"RabbitMQ","2.7.1"},
      {os_mon,"CPO  CXC 138 46","2.2.4"},
      {sasl,"SASL  CXC 138 11","2.1.8"},
      {rabbitmq_mochiweb,"RabbitMQ Mochiweb Embedding","2.7.1"},
      {webmachine,"webmachine","1.7.0-rmq2.7.1-hg"},
      {mochiweb,"MochiMedia Web Server","1.3-rmq2.7.1-git"},
      {inets,"INETS  CXC 138 49","5.2"},
      {mnesia,"MNESIA  CXC 138 12","4.4.12"},
      {stdlib,"ERTS  CXC 138 10","1.16.4"},
      {kernel,"ERTS  CXC 138 10","2.13.4"}]},
 {os,{unix,linux}},
 {erlang_version,
     "Erlang R13B03 (erts-5.7.4) [source] [64-bit] [smp:2:2] [rq:2] [async-threads:0] [hipe] [kernel-poll:false]\n"},
 {memory,
     [{total,92565608},
      {processes,4004968},
      {processes_used,3996224},
      {system,88560640},
      {atom,1322033},
      {atom_used,1291462},
      {binary,32496},
      {code,15264387},
      {ets,1174192}]},
 {vm_memory_high_watermark,0.3999999999362281},
 {vm_memory_limit,2508940902}]
...done.



rabbitmqctl cluster_status
Cluster status of node 'rabbit@i-00000007-asm' ...
[{nodes,[{disc,['rabbit@i-0000001a-zsm','rabbit@i-00000007-asm']},
         {ram,['rabbit@i-00000009-asm']}]},
 {running_nodes,['rabbit@i-0000001a-zsm','rabbit@i-00000007-asm']}]
...done.



Java amqp-client 2.7.1


- Producer.groovy (java amqp-client 2.7.1)

import com.rabbitmq.client.*
try{

// Get rabbitmq config
def config = new ConfigSlurper().parse(new File('../rabbitmq.properties').toURL())

def rabbit = new RabbitHA(config)
rabbit.init = { channel ->
  channel.exchangeDeclare('myExchange', "direct", true) // Durable exchange
  channel.queueDeclare('myQueue', true, false, false, ["x-ha-policy": "all"]) // Durable queue with HA policy
  channel.queueBind('myQueue', 'myExchange', '')
  channel.confirmSelect()
}

10000.times { idx ->
  rabbit.publish { channel ->
    def properties = new AMQP.BasicProperties.Builder().deliveryMode(2).build() // Delivery mode 2: persistent
    def msg = "Message $idx"
    channel.basicPublish('myExchange', '', properties, msg.getBytes())
    println msg
  }
}

rabbit.close()

} catch(e){e.printStackTrace()}


- Consumer.groovy

import com.rabbitmq.client.*

try{

  // Get rabbitmq config
  def config = new ConfigSlurper().parse(new File('../rabbitmq.properties').toURL())

  // Connect
  def rabbit = new RabbitHA(config)
  rabbit.onDelivery('myQueue') { delivery, channel ->
    def msg = new String(delivery.body)
    println msg

    // Manual ack
    channel.basicAck(delivery.envelope.deliveryTag, false)
  }

} catch(e){e.printStackTrace()}


- RabbitHA.groovy

import com.rabbitmq.client.*

/**
 *
 * RabbitMQ highly available proxy.
 * Basic implementation of a subscriber/publisher with reconnect logic.
 *
 */
class RabbitHA {
    ConnectionFactory connectionFactory
    Address[] addresses
    Closure init

    Connection connection
    Channel channel
    QueueingConsumer consumer

    public RabbitHA(Map config) {
        this(config, null)
    }

    public RabbitHA(Map config, Closure init){
        this.connectionFactory = new ConnectionFactory([username: config.username, password: config.password, virtualHost: config.virtualHost])
        this.addresses = Address.parseAddresses(config.addresses)
        this.init = init
        connectChannel()
    }

    void onDelivery(String queueName, Closure closure) {
        basicConsume(queueName)
        int i = 0

        while(true) {
            try {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery()
                closure(delivery, channel)
                i = 0
            } catch(e) {
                // Only handle connection-related exceptions; rethrow anything else
                if(!(e in ShutdownSignalException || e in IOException || e in AlreadyClosedException)) throw e

                i++
                e.printStackTrace()
                println "ShutdownSignalException recieved! Reconnection attempt #$i"
                connectChannel()
                basicConsume(queueName)
            }
        }
    }

    void publish(Closure closure) {
        int i = 0
        boolean retry = true
        while(retry) {
            try {
                closure(channel)
                i = 0
                retry = false
            } catch(e) {
                // Only handle connection-related exceptions; rethrow anything else
                if(!(e in ShutdownSignalException || e in IOException || e in AlreadyClosedException)) throw e

                i++
                retry = true
                e.printStackTrace()
                println "ShutdownSignalException recieved! Reconnection attempt #$i"
                connectChannel()
            }
        }
    }


    void connectChannel() {
        connection = connectionFactory.newConnection(addresses)
        channel = connection.createChannel()

        println "Succesfully connected to $connection.address"

        if(init) {
            init(channel)
        }
    }

    void basicConsume(queueName) {
        consumer = new QueueingConsumer(channel)
        channel.basicConsume(queueName, false, consumer)
    }

    void close() {
        channel.waitForConfirmsOrDie()
        channel.close()
        connection.close()
    }
}



Output:

cat consumer-output.txt |grep Message | wc
   9957   19914  128330

cat producer-output.txt | grep Message | wc
  10000   20000  128890

Note that the consumer is missing 43 messages (10000 - 9957).

Output from producer :
...
Message 1466
Message 1467
Message 1468
Message 1469
Message 1470
ShutdownSignalException recieved! Reconnection attempt #1
Succesfully connected to i-0000001a-zsm/172.16.158.46
Message 1471
Message 1472
Message 1473
Message 1474
Message 1475
Message 1476

...


Note that messages [1471..1476] were consumed fine, but [1466..1470] are missing from the consumer output.

The complete outputs are at https://github.com/pablomolnar/rabbitmq_samples/tree/master/out. There you can see the reconnection logs of both sides.
I have a strong feeling that publisher confirms are not configured correctly.

Could anyone please shed some light on the issue?

Cheers,
Pablo Molnar

PS: I'd also like to take the opportunity to share a lot of Groovy examples I've been working on over the last few days: https://github.com/pablomolnar/rabbitmq_samples

_______________________________________________
rabbitmq-discuss mailing list
rabbitmq-discuss-ETbvJ2rUIr4qBm01orBoR9BPR1lH4CV8@public.gmane.org
https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss


<div>
<p>The name of the example is probably a little misleading, in that it simply allows you to know if messages may have been lost, but does nothing to cope with eventually lost messages.<br>
You indeed have to implement a confirm listener and set up republishing yourself. Also, by definition republishing unconfirmed messages may lead to duplicate messages.</p>
<div class="gmail_quote">On Feb 1, 2012 1:27 AM, "Pablo Molnar" &lt;<a href="mailto:pablomolnar@...">pablomolnar@...</a>&gt; wrote:<br type="attribution"><blockquote class="gmail_quote">
Ok...so I'm maybe confused. Basically I follow this example:<div><br></div>
<div><a href="http://hg.rabbitmq.com/rabbitmq-java-client/file/default/test/src/com/rabbitmq/examples/ConfirmDontLoseMessages.java" target="_blank">http://hg.rabbitmq.com/rabbitmq-java-client/file/default/test/src/com/rabbitmq/examples/ConfirmDontLoseMessages.java</a></div>

<div><br></div>
<div>Using the sentences&nbsp;<span>ch.confirmSelect();&nbsp;</span>and&nbsp;<span>ch.waitForConfirmsOrDie(); &nbsp;</span>all in a durable queue.</div>

<div><br></div>
<div>This example doesn't cover republishing nacks?&nbsp;Do you have an example? I have to implement a&nbsp;<span>ConfirmListener?</span>
</div>

<div><span><br></span></div>Thanks for helping Simone!<div>
<br><div>
<div class="gmail_quote">On Tue, Jan 31, 2012 at 8:43 PM, Simone Busoli <span dir="ltr">&lt;<a href="mailto:simone.busoli@..." target="_blank">simone.busoli@...</a>&gt;</span> wrote:<br><blockquote class="gmail_quote">
<p>Hi Pablo, looking at your code I can't see where you are republishing messages which have not been confirmed by the broker. If you want to make sure that publishes will eventually be delivered you have to keep track of them and re-issue them in case an ack doesn't arrive from the broker, which usually happens when the broker dies.</p>

<div class="gmail_quote">
<div>
<div>On Jan 31, 2012 11:52 PM, "Pablo Molnar" &lt;<a href="mailto:pablomolnar@..." target="_blank">pablomolnar@...</a>&gt; wrote:<br type="attribution">
</div>
</div>
<blockquote class="gmail_quote">
<div><div>
Hi all!<br><br>I'm doing some HA tests stressing my cluster and I'm experimenting lost messages ONLY when the publisher lost the connection due a kill -9.<br><br>Test OK: Kill node while consuming:<br>1 - Setup a clean 3 node's cluster<br>

2 - Execute producer with 10.000 messages connected to node A<br>3 - Wait producer to finish<br>4 - Execute consumer connected to node A<br>5 - While consumer is running kill node A<br><br>Result: The consumer reconnects to another node and consume the rest of messages ok. All 10.000 messages were consumed ok.<br><br>Test FAILED: Kill node while producing:<br>
1 - Setup a clean 3 node's cluster<br>2 - Execute consumer to start listening connected node A<br>3 - Execute producer with 10.000 messages connected to node A<br>
4 - While producer is running kill node A<br><br>
Result: The producer reconnects fine (the consumer too) and keep publishing but some of the messages already published to node A are lost.<br><br><br>These are my settings:<br><br><span>rabbitmqctl status</span><br><span>Status of node 'rabbit <at> i-00000007-asm' ...</span><br><span>[{pid,11339},</span><br><span>&nbsp;{running_applications,</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp; [{rabbitmq_management,"RabbitMQ Management Console","2.7.1"},</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {rabbitmq_management_agent,"RabbitMQ Management Agent","2.7.1"},</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {amqp_client,"RabbitMQ AMQP Client","2.7.1"},</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {rabbit,"RabbitMQ","2.7.1"},</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {os_mon,"CPO&nbsp; CXC 138 46","2.2.4"},</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {sasl,"SASL&nbsp; CXC 138 11","2.1.8"},</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {rabbitmq_mochiweb,"RabbitMQ Mochiweb Embedding","2.7.1"},</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {webmachine,"webmachine","1.7.0-rmq2.7.1-hg"},</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {mochiweb,"MochiMedia Web Server","1.3-rmq2.7.1-git"},</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {inets,"INETS&nbsp; CXC 138 49","5.2"},</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {mnesia,"MNESIA&nbsp; CXC 138 12","4.4.12"},</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {stdlib,"ERTS&nbsp; CXC 138 10","1.16.4"},</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {kernel,"ERTS&nbsp; CXC 138 10","2.13.4"}]},</span><br><span>&nbsp;{os,{unix,linux}},</span><br><span>&nbsp;{erlang_version,</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp; "Erlang R13B03 (erts-5.7.4) [source] [64-bit] [smp:2:2] [rq:2] [async-threads:0] [hipe] [kernel-poll:false]\n"},</span><br><span>&nbsp;{memory,</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp; [{total,92565608},</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
{processes,4004968},</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {processes_used,3996224},</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {system,88560640},</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {atom,1322033},</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {atom_used,1291462},</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {binary,32496},</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {code,15264387},</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {ets,1174192}]},</span><br><span>&nbsp;{vm_memory_high_watermark,0.3999999999362281},</span><br><span>&nbsp;{vm_memory_limit,<a href="tel:2508940902" value="+12508940902" target="_blank">2508940902</a>}]</span><br><span>...done.</span><br><br><br><br><span>rabbitmqctl cluster_status</span><br><span>Cluster status of node 'rabbit <at> i-00000007-asm' ...</span><br><span>[{nodes,[{disc,['rabbit <at> i-0000001a-zsm','rabbit <at> i-00000007-asm']},</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {ram,['rabbit <at> i-00000009-asm']}]},</span><br><span>&nbsp;{running_nodes,['rabbit <at> i-0000001a-zsm','rabbit <at> i-00000007-asm']}]</span><br><span>...done.</span><br><br><br><br>Java amqp-client 2.7.1<br><br><br>- Producer.groovy (java amqp-client 2.7.1)<br><br><span>import com.rabbitmq.client.*</span><br><span>try{</span><br><br><span>// Get rabbitmq config</span><br><span>def config = new ConfigSlurper().parse(new File('../rabbitmq.properties').toURL())</span><br><br><span>def rabbit = new RabbitHA(config)</span><br><span>rabbit.init = { channel -&gt;</span><br><span>&nbsp; channel.exchangeDeclare('myExchange', "direct", true) // Durable exchange</span><br><span>&nbsp; channel.queueDeclare('myQueue', true, false, false, ["x-ha-policy": "all"]) // Durable exchange and HA policy</span><br><span>&nbsp; channel.queueBind('myQueue', 'myExchange', '')</span><br><span>&nbsp; channel.confirmSelect()</span><br><span>}</span><br><br><span>10000.times { idx -&gt;</span><br><span>&nbsp; rabbit.publish { 
channel -&gt;</span><br><span>&nbsp;&nbsp;&nbsp; def properties = new AMQP.BasicProperties.Builder().deliveryMode(2).build() // Delivery mode 2: persistent</span><br><span>&nbsp;&nbsp;&nbsp; def msg = "Message $idx"</span><br><span>&nbsp;&nbsp;&nbsp; channel.basicPublish('myExchange', '', properties, msg.getBytes())</span><br><span>&nbsp;&nbsp;&nbsp; println msg</span><br><span>&nbsp; }</span><br><span>}</span><br><br><span>rabbit.close()</span><br><br><span>} catch(e){e.printStackTrace()}</span><br><br><br>- Consumer.groovy<br><br><span>import com.rabbitmq.client.*</span><br><br><span></span><span>try{</span><br><br><span>&nbsp; // Get rabbitmq config</span><br><span>&nbsp; def config = new ConfigSlurper().parse(new File('../rabbitmq.properties').toURL())</span><br><br><span>&nbsp; </span><span>// Connect</span><br><span>&nbsp; </span><span>def rabbit = new RabbitHA(config)</span><br><span>&nbsp; </span><span>rabbit.onDelivery('myQueue'){ delivery, channel -&gt;</span><br><span>&nbsp; def msg = new String(delivery.body)</span><br><span>&nbsp; println msg</span><br><br><span>&nbsp; // Manual ack</span><br><span>&nbsp; channel.basicAck(delivery.envelope.deliveryTag, false)</span><br><span></span><br><span>} catch(e){e.printStackTrace()}</span><br><br><br>- RabbitHA.groovy<br><br><span>import com.rabbitmq.client.*</span><br><br><span>/**≤/span><span></span><br><span>&nbsp;*</span><br><span>&nbsp;* RabbitMQ highly available proxy.</span><br><span>&nbsp;* Basic implementation of a basic suscriber/publisher with reconnect logic.</span><br><span>&nbsp;*</span><br><span>&nbsp;*/</span><br><span>class RabbitHA {</span><br><span>&nbsp;&nbsp;&nbsp; ConnectionFactory connectionFactory</span><br><span>&nbsp;&nbsp;&nbsp; Address[] addresses</span><br><span>&nbsp;&nbsp;&nbsp; Closure init</span><br><br><span>&nbsp;&nbsp;&nbsp; Connection connection</span><br><span>&nbsp;&nbsp;&nbsp; Channel channel</span><br><span>&nbsp;&nbsp;&nbsp; QueueingConsumer 
consumer</span><br><br><span>&nbsp;&nbsp;&nbsp; public RabbitHA(Map config) {</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; this(config, null)</span><br><span>&nbsp;&nbsp;&nbsp; }</span><br><br><span>&nbsp;&nbsp;&nbsp; public RabbitHA(Map config, Closure init){</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; this.connectionFactory = new ConnectionFactory([username: config.username, password: config.password, virtualHost: config.virtualHost])</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; this.addresses = Address.parseAddresses(config.addresses)</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; this.init = init</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; connectChannel()</span><br><span>&nbsp;&nbsp;&nbsp; }</span><br><br><span>&nbsp;&nbsp;&nbsp; void onDelivery(String queueName, Closure closure) {</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; basicConsume(queueName)</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; int i = 0</span><br><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; while(true) {</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; try {</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; QueueingConsumer.Delivery delivery = consumer.nextDelivery()</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; closure(delivery, channel)</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; i = 0</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; } catch(e) {</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; // Only handle exceptions</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; </span><span>if(!(e in ShutdownSignalException || e in 
IOException || e in AlreadyClosedException)) throw e</span><br><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; i++</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; </span><span>e.printStackTrace()</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; println "ShutdownSignalException recieved! Reconnection attempt #$i"</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; connectChannel()</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; basicConsume(queueName)</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }</span><br><span>&nbsp;&nbsp;&nbsp; }</span><br><br><span>&nbsp;&nbsp;&nbsp; void publish(Closure closure) {</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; int i = 0</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; </span><span>boolean retry = true</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; while(retry) {</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; try {</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; closure(channel)</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; i = 0</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; </span><span>retry = false</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; } catch(e) {</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; // Only handle 
exceptions</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; </span><span>if(!(e in ShutdownSignalException || e in IOException || e in AlreadyClosedException)) throw e</span><br><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; i++</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; </span><span>retry = true</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; e.printStackTrace()</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; println "ShutdownSignalException recieved! Reconnection attempt #$i"</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; connectChannel()</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }</span><br><span>&nbsp;&nbsp;&nbsp; }</span><br><br><br><span>&nbsp;&nbsp;&nbsp; void connectChannel() {</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; connection = connectionFactory.newConnection(addresses)</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; channel = connection.createChannel()</span><br><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; println "Succesfully connected to $connection.address"</span><br><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if(init) {</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; init(channel)</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }</span><br><span>&nbsp;&nbsp;&nbsp; }</span><br><br><span>&nbsp;&nbsp;&nbsp; void basicConsume(queueName) {</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; consumer = new QueueingConsumer(channel)</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
channel.basicConsume(queueName, false, consumer)</span><br><span>&nbsp;&nbsp;&nbsp; }</span><br><br><span>&nbsp;&nbsp;&nbsp; void close() {</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; </span><span>channel.waitForConfirmsOrDie()</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; </span><span>channel.close()</span><br><span>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; </span><span>connection.close()</span><br><span>&nbsp;&nbsp;&nbsp; }</span><br><span>}</span><br><br><br><br>

Output:<br><br><span>cat consumer-output.txt |grep Message | wc</span><br><span>&nbsp;&nbsp; 9957&nbsp;&nbsp; 19914&nbsp; 128330</span><br><br><span>cat producer-output.txt | grep Message | wc</span><br><span>&nbsp; 10000&nbsp;&nbsp; 20000&nbsp; 128890</span><br><br>Note that consumer lost 43 messages<br><br>Output from producer :<br><span>...</span><br><span><span>Message 1466</span><br><span>Message 1467</span><br><span>Message 1468</span><br><span>Message 1469</span><br><span>Message 1470</span><br>ShutdownSignalException recieved! Reconnection attempt #1<br>Succesfully connected to i-0000001a-zsm/<a href="http://172.16.158.46" target="_blank">172.16.158.46</a><br>Message 1471<br>

Message 1472<br>Message 1473<br>Message 1474<br>Message 1475<br>Message 1476</span><br><span>...</span><br><br><br>Note messages [1471..1476] were consumed ok, but [1466..1470] are missing in consumer output.<br><br>The complete outputs are in <a href="https://github.com/pablomolnar/rabbitmq_samples/tree/master/out" target="_blank">https://github.com/pablomolnar/rabbitmq_samples/tree/master/out</a>. There you can see reconnection log of both parts.<br>

I have a strong feeling that publisher confirms are not configured correctly.<br><br>Could anyone shed some light on the issue?<br><br>Cheers,<br>Pablo Molnar<br><br>PS: I'd also like to take the opportunity to share a number of Groovy examples I've been working on over the last few days: <a href="https://github.com/pablomolnar/rabbitmq_samples" target="_blank">https://github.com/pablomolnar/rabbitmq_samples</a><br><br>
</div></div>_______________________________________________<br>
rabbitmq-discuss mailing list<br><a href="mailto:rabbitmq-discuss@..." target="_blank">rabbitmq-discuss@...</a><br><a href="https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss" target="_blank">https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss</a><br><br>
</blockquote>
</div>
</blockquote>
</div>
<br>
</div>
</div>
</blockquote>
</div>
</div>
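
The advice quoted in this thread (keep track of publishes and re-issue the ones the broker never confirmed) can be sketched without any broker at all. The following is a minimal sketch, not code from the thread: the class `ConfirmTracker` and its method names are hypothetical. It assumes the caller feeds it the sequence number from `Channel.getNextPublishSeqNo()` before each publish and forwards the arguments of `ConfirmListener.handleAck(long, boolean)` and `handleNack(long, boolean)` to `acked` and `nacked`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical helper: remembers every publish until the broker confirms it. */
class ConfirmTracker {
    // Publish sequence number -> message body still awaiting a confirm.
    private final ConcurrentNavigableMap<Long, String> pending = new ConcurrentSkipListMap<>();

    /** Record a message just before it is published with the given sequence number. */
    void published(long seqNo, String body) {
        pending.put(seqNo, body);
    }

    /** Broker acked: forget one message, or everything up to seqNo when multiple is true. */
    void acked(long seqNo, boolean multiple) {
        if (multiple) {
            pending.headMap(seqNo, true).clear();
        } else {
            pending.remove(seqNo);
        }
    }

    /** Broker nacked: hand back the affected messages so the caller can republish them. */
    List<String> nacked(long seqNo, boolean multiple) {
        List<String> retry = new ArrayList<>();
        if (multiple) {
            ConcurrentNavigableMap<Long, String> head = pending.headMap(seqNo, true);
            retry.addAll(head.values());
            head.clear();
        } else {
            String body = pending.remove(seqNo);
            if (body != null) {
                retry.add(body);
            }
        }
        return retry;
    }

    /** Connection died: everything still pending is unconfirmed and must be republished. */
    List<String> drainForRepublish() {
        List<String> retry = new ArrayList<>(pending.values());
        pending.clear();
        return retry;
    }

    public static void main(String[] args) {
        ConfirmTracker tracker = new ConfirmTracker();
        for (long seq = 1; seq <= 5; seq++) {
            tracker.published(seq, "Message " + seq);
        }
        tracker.acked(3, true); // broker confirmed 1..3 in one go
        // The node is killed here; 4 and 5 were never confirmed.
        System.out.println(tracker.drainForRepublish()); // prints [Message 4, Message 5]
    }
}
```

In the producer's reconnect path, the drained messages would be published again on the new channel. Confirm sequence numbers start again from 1 on a new channel, so the tracker should be emptied (as `drainForRepublish` does) before reuse. Republishing can produce duplicates when the broker had received a message but its confirm was lost, so the consumer may need to tolerate them.
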
Steve Powell | 1 Feb 11:35 2012

Re: ShutdownSignalException second 'channel.open'

Yogesh,

Thanks for your explanation of your application structure, and the version you
are running.

I do not know how the second channel.open AMQP command (for the same channel)
was sent to the broker; can you send some more diagnostics of this failure (log,
full stack trace) so I can raise a bug. If possible, a demonstrating small
program. Thanks.

I see your difficulty. I would hope that creating 20000 queues (which are empty
almost all of the time) would not take up too much room, so I think you should
consider not deleting them at all. This would solve most of your problem.

The x-expires attribute for queues really ought to have an 'only-if-empty'
option. I'll raise a bug (24722) for this, and see how difficult it would be.

I do not understand what the _Return queues are for. They look as though you are
using them as a private persistent store for each App. It would be better if you
put this information somewhere else, but heigh-ho -- you can safely delete these
queues since this app is in complete control of them, and the events are
processed serially.

> One of the reasons, I don't do basicConsume on queues is, I am going to have
> thousands of queues. I rather thought it would be easier to have a thread pool,
> each consuming just one message from a queue in round-robin fashion.

Just because you have thousands of queues doesn't mean you need to have
thousands of Consumer instances -- it is quite ok to use the same Consumer on
several basicConsume calls. The consumerTag is passed to every Consumer callback
so the Consumer code can distinguish for which basicConsume it is called.
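
A minimal sketch of that idea, using plain Java so it runs without a broker. The class `SharedConsumer` and its method names are hypothetical stand-ins: in the real client the tag would be the String returned by `basicConsume`, and the dispatch would happen inside `Consumer.handleDelivery`, which receives the consumerTag as its first argument.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical stand-in for one Consumer instance shared by many subscriptions. */
class SharedConsumer {
    // consumerTag -> queue name, recorded when each subscription is set up.
    private final Map<String, String> queueByTag = new ConcurrentHashMap<>();
    // Per-queue log of handled messages, kept only so the sketch has visible results.
    private final Map<String, List<String>> handled = new ConcurrentHashMap<>();

    /** Remember which queue a consumerTag belongs to. */
    void subscribed(String consumerTag, String queueName) {
        queueByTag.put(consumerTag, queueName);
    }

    /** One callback for every queue; the tag tells us which subscription fired. */
    void handleDelivery(String consumerTag, String body) {
        String queue = queueByTag.get(consumerTag);
        if (queue == null) {
            throw new IllegalStateException("unknown consumerTag " + consumerTag);
        }
        handled.computeIfAbsent(queue, q -> new CopyOnWriteArrayList<>()).add(body);
    }

    /** Messages handled so far for one queue, in arrival order. */
    List<String> handledFor(String queueName) {
        return handled.getOrDefault(queueName, Collections.<String>emptyList());
    }

    public static void main(String[] args) {
        SharedConsumer consumer = new SharedConsumer();
        consumer.subscribed("tag-1", "App1");
        consumer.subscribed("tag-2", "App2");
        consumer.handleDelivery("tag-1", "App1-Event1");
        consumer.handleDelivery("tag-2", "App2-Event1");
        consumer.handleDelivery("tag-1", "App1-Event2");
        System.out.println(consumer.handledFor("App1")); // prints [App1-Event1, App1-Event2]
    }
}
```

The concurrent collections are there because callbacks may arrive on different threads of the connection's pool; the per-queue ordering shown in `main` is only guaranteed here because the sketch calls `handleDelivery` from a single thread.
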

By default, the Java client will allocate a pool of 5 threads for each
connection, and dispatch Consumer callbacks onto one of those threads for you.
So your thread pool might be unnecessary if you used basicConsume(). I'd
consider doing that since basicGet() can be slow, and you'd have to get from
each potential queue just to tell if there is a message for it!

Steve Powell  (a happy bunny)
----------some more definitions from the SPD----------
vermin (v.) Treating the dachshund for roundworm.
chinchilla (n.) Cooling device for the lower jaw.
socialcast (n.) Someone to whom everyone is speaking but nobody likes.

On 31 Jan 2012, at 13:56, Yogesh Ketkar wrote:

> I am using RabbitMQ server version 2.7.1.
> Java Client Jar used is
> rabbitmq-java-client-bin-2.7.1/rabbitmq-client.jar
> 
>> The Shutdown seems to be being called because the channel is being opened twice.
>> The broker complains about this and closes the connection. Are you creating
>> channels on different threads simultaneously?
> Yes indeed, I am creating channels on different threads.
> 
>> I don't think the basicPublish will fail if the queue doesn't exist. Why would
>> you create a new channel in this case?
> Yes, you are right. I will basically lose the message in this case.
> 
> Now about overall problem statement.
> My application has a main queue which looks like
> MainQueue
>  - App1-Event1
>  - App2-Event1
>  - App1-Event2
>  - App1-Event3
>  - App3-Event1
>  - App3-Event2
>  - App2-Event2
> 
> Basically there are going to be events from different Apps (there can
> be thousands of apps) and events belonging to an App must
> be processed sequentially. Events across different apps can and should
> be processed in parallel.
> So I have only one consumer on MainQueue (using basicConsume) which
> reads events from MainQueue and just moves it to appropriate declared/
> redeclared queue.
> So this is how new queue structure would look like.
> 
> App1
>  - App1-Event1
>  - App1-Event2
>  - App1-Event3
> 
> App2
>  - App2-Event1
>  - App2-Event2
> 
> App3
>  - App3-Event1
>  - App3-Event2
> 
> 
> Now again when Event1 is processed from Queue App1, Event2 of App1
> can't be processed unless processing of Event1 is complete.
> Processing of event involves asynchronous communication with external
> systems, so once Event1 is fetched (and acknowledged) from queue
> App1,
> I create another queue like
> App1_Return
>  - App1-Event1-TaskId
> 
> I need to query external system using TaskId after certain time
> interval, to check status of event processing of Event1. Once I get
> the status (either success or failure)
> I discard App1-Event1-TaskId and am ready to process App1-Event2. So all
> _Return queues will only have one event at any point of time.
> 
> An event on an app might even occur once a day. So I don't want to
> keep so many queues (potentially 20000 if there are 10000 apps)
> hanging around.
> Neither auto_delete nor x-expires is very useful, as in both
> schemes queues get deleted even when they have messages.
> Ideally, whenever the last message from any queue (except MainQueue) is
> consumed, I want to delete that queue. Of course, one has to make sure
> that while a queue is getting deleted, there might be an event destined
> for it. So if one thread is doing
> queueDelete('somequeue', true, true) while another is doing
> queueDeclare, queueBind, basicPublish, and queueDelete gets executed
> after queueBind, the message will be lost.
> 
> One of the reasons, I don't do basicConsume on queues is, I am going
> to have thousands of queues. I rather thought it would be easier to
> have a thread pool, each consuming just one message from a queue in
> round-robin fashion.
> 
> As was mentioned in some other response, I will certainly not create a
> new channel in every thread, but would rather try and reuse them.
> 
> 
> Thanks for all the help.
> Regards, Yogesh
> 
> 
> 
> On Jan 31, 5:19 pm, Steve Powell <st...@...> wrote:
>> Yogesh,
>> 
>> Please can you provide some information about your environment? And your
>> application? What version of RabbitMQ (and client) are you using?
>> 
>> In your stack trace the ShutdownListener you registered is apparently being
>> called, because the Connection is being shut down. It is not clear why this
>> exception (and its associated stack trace) appears, it seems to come from your
>> Listener code, but perhaps that does nothing.
>> 
>> The Shutdown seems to be being called because the channel is being opened twice.
>> The broker complains about this and closes the connection. Are you creating
>> channels on different threads simultaneously? (Looking at your app 'design' you
>> might be.) Depending upon the version of RabbitMQ this might cause a problem.
>> 
>> I'm afraid your application design is unclear:
>> 
>>> This is how I handle doing basicPublish and basicGet on potentially
>>> non-existent queues
>>> - publish involves 3 steps
>>>  queueDeclare
>>>  queueBind
>>>  basicPublish
>>>  If some other thread deletes the queue after either queueDeclare or
>>> queueBind, basicPublish fails and I again create a new
>>>  channel and do these operations
>> 
>> I don't think the basicPublish will fail if the queue doesn't exist. Why would
>> you create a new channel in this case?
>> 
>> Please explain why you expect the queue might be deleted by some other thread.
>> 
>>> - if basicGet fails, I simply ignore it
>> 
>> What do you mean by ignoring it? Do you poll the queue periodically? Why aren't
>> you using basicConsume and a Consumer to get messages (which will be notified if
>> the queue is deleted)?
>> 
>> Steve Powell  (a loopy bunny)
>> ----------some more definitions from the SPD----------
>> vermin (v.) Treating the dachshund for roundworm.
>> chinchilla (n.) Cooling device for the lower jaw.
>> socialcast (n.) Someone to whom everyone is speaking but nobody likes.
>> 
>> On 30 Jan 2012, at 04:33, Yogesh Ketkar wrote:
>> 
>>> Only operations I ever do with com.rabbitmq.client.Connection in the
>>> code are
>>>    c.addShutdownListener
>>>    c.createChannel
>> 
>>> What does this error signify?
>> 
>>> 2012-01-30 09:44:45,158 ERROR  [ConnectionShutdownHandler] ShutdownListener
>>> com.rabbitmq.client.ShutdownSignalException: connection error; reason:
>>> {#method<connection.close>(reply-code=503, reply-text=COMMAND_INVALID
>>> - second 'channel.open' seen, class-id=20, method-id=10), null,
>>> "[B@105691e"}
>>>    at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:641)
>>>    at com.rabbitmq.client.impl.AMQConnection.handleConnectionClose(AMQConnection.java:599)
>>>    at com.rabbitmq.client.impl.AMQConnection.processControlCommand(AMQConnection.java:571)
>>>    at com.rabbitmq.client.impl.AMQConnection$1.processAsync(AMQConnection.java:88)
>>>    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
>>>    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
>>>    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:500)
>> 
>>> Some additional info.
>>> I create and close thousands of channels in the code. But at any point
>>> of time there are not more than 20/21 channels open.
>>> This is how I handle doing basicPublish and basicGet on potentially
>>> non-existent queues
>>> - publish involves 3 steps
>>>  queueDeclare
>>>  queueBind
>>>  basicPublish
>>>  If some other thread deletes the queue after either queueDeclare or
>>> queueBind, basicPublish fails and I again create a new
>>>  channel and do these operations
>>> - if basicGet fails, I simply ignore it
>> 
>>> regards, Yogesh
>>> _______________________________________________
>>> rabbitmq-discuss mailing list
>>> rabbitmq-disc...@...
>>> https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
>> 
>> _______________________________________________
>> rabbitmq-discuss mailing list
>> rabbitmq-disc...@...
>> https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
> _______________________________________________
> rabbitmq-discuss mailing list
> rabbitmq-discuss@...
> https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss

Steve Powell | 1 Feb 11:42 2012

Re: Problem with RabbitMQ Java Client (Memory leak)

Yogesh,

Thank you for this problem report. First point: each connection defines a
(hidden) channel zero, so that explains why there are 20001 of some objects and
20000 of others.

As to why these are not garbage collected, we must be holding a reference
somewhere (probably in the Connection).

We will investigate. I have raised a bug (24723).
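
To illustrate the kind of retained reference meant here, the following is a purely hypothetical sketch and not the client's actual code; the real cause had not been established at this point in the thread. The class `ChannelRegistry` is invented for illustration: it models a connection-side map of channel numbers to channel objects, with channel zero always present, and shows how forgetting to remove an entry on close keeps every closed channel reachable.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of a connection-side channel table (not the real client code). */
class ChannelRegistry {
    // Channel number -> channel object; anything left in here cannot be garbage collected.
    private final Map<Integer, Object> channels = new HashMap<>();
    private final boolean removeOnClose;
    private int nextNumber = 1; // channel 0 is reserved for the connection itself

    ChannelRegistry(boolean removeOnClose) {
        this.removeOnClose = removeOnClose;
        channels.put(0, new Object()); // the hidden channel zero
    }

    /** Open a channel and return its number. */
    int open() {
        int number = nextNumber++;
        channels.put(number, new Object());
        return number;
    }

    /** Close a channel; a leaky registry forgets to drop its reference. */
    void close(int number) {
        if (removeOnClose) {
            channels.remove(number);
        }
    }

    /** Number of channel objects still strongly reachable through the registry. */
    int retained() {
        return channels.size();
    }

    public static void main(String[] args) {
        ChannelRegistry leaky = new ChannelRegistry(false);
        ChannelRegistry fixed = new ChannelRegistry(true);
        for (int i = 0; i < 20000; i++) {
            leaky.close(leaky.open());
            fixed.close(fixed.open());
        }
        System.out.println("leaky registry retains " + leaky.retained()); // prints 20001
        System.out.println("fixed registry retains " + fixed.retained()); // prints 1
    }
}
```

The 20001 versus 1 contrast mirrors the instance counts in the report: as long as some long-lived object still refers to a closed channel, `System.gc()` cannot reclaim it or anything it references.
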

Steve Powell  (a happy bunny)
----------some more definitions from the SPD----------
vermin (v.) Treating the dachshund for roundworm.
chinchilla (n.) Cooling device for the lower jaw.
socialcast (n.) Someone to whom everyone is speaking but nobody likes.

On 31 Jan 2012, at 17:49, Yogesh Ketkar wrote:

> I am using rabbitmq-java-client-bin-2.7.1/rabbitmq-client.jar with
> RabbitMQ 2.7.1 Server.
> 
> Just run this code
> 
> public static void main(String[] argv) throws IOException {
>    ConnectionFactory f = new ConnectionFactory();
>    Connection c = f.newConnection();
>    for(int i = 0; i < 20000; ++i) {
>        Channel ch = c.createChannel();
>        ch.close();
>    }
>    System.gc();
>    while(true) {}
> }
> 
> While the program is running, observe the memory usage; it keeps on
> increasing.
> Get the process id and do the following:
> jmap -dump:format=b,file=yo.bin  <pid>
> jhat -J-mx768m -stack false yo.bin
> Once jhat brings up its HTTP server, go here:
> http://localhost:7000/showInstanceCounts/
> 
> 20001 instances of class com.rabbitmq.client.impl.AMQCommand
> 20001 instances of class com.rabbitmq.client.impl.CommandAssembler
> 20000 instances of class com.rabbitmq.client.ShutdownSignalException
> 20000 instances of class com.rabbitmq.client.impl.AMQImpl$Channel$Close
> 20000 instances of class com.rabbitmq.client.impl.ChannelN
> 20000 instances of class com.rabbitmq.client.impl.ConsumerDispatcher
> 
> In spite of closing all the channels, not only the channels but also
> the associated objects (ShutdownSignalException, ChannelClose) continue
> to consume memory.
> For sure, knowing this now, I will use the createChannel call sparingly.
> 
> Is there a way to remedy this situation?
> What is also worrying is instances of AMQCommand, so would this be the
> case for each and every call of client library?
> 
> regards, Yogesh
> 
> _______________________________________________
> rabbitmq-discuss mailing list
> rabbitmq-discuss@...
> https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss

Alexis Richardson | 1 Feb 11:46 2012

Re: [announce] New maintainer for the rabbitmq-c library

Congratulations Alan and thank-you very much for taking this on!

alexis

On Tue, Jan 31, 2012 at 8:28 PM, Alan Antonuk <alan.antonuk@...> wrote:
> Hi;
>
> I am happy to announce that with the blessing of the RabbitMQ team, I have
> taken the lead on further development of the C RabbitMQ client library, also
> known as rabbitmq-c.
>
> The new home for the code is now on github:
> https://github.com/alanxz/rabbitmq-c
>
> My long term goal with the library is to have simple, stable, and scalable
> library that has feature-parity of that of the Java and .NET clients.
>
> Some near term goals I have for the library include:
> - Documenting the APIs presented by the library
> - Merging in channel-oriented memory
> management https://github.com/alanxz/rabbitmq-c/pull/5
> - Providing some patterns for consuming messages to reduce the amount of
> boiler plate code users have to write.
>
>
> I want to encourage users of the library to get involved in the development
> of the library. Good ways to get involved include:
>  - Forking the library on github and submitting a pull request
>  - Submitting any bugs, or feature requests to the issue tracker on
> github https://github.com/alanxz/rabbitmq-c/issues
>  - Sparking up a discussion or simply asking questions on usage on the
> rabbitmq-discuss listserv
>
> -Alan
>
> _______________________________________________
> rabbitmq-discuss mailing list
> rabbitmq-discuss@...
> https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
>
Yogesh Ketkar | 1 Feb 12:21 2012

Re: Problem with RabbitMQ Java Client (Memory leak)

Thanks Steve.
Can you give me some idea of the date of the next Java client release (with
this bug fix)?

regards, Yogesh

On Feb 1, 3:42 pm, Steve Powell <st...@...> wrote:
> Yogesh,
>
> Thank you for this problem report. First point: each connection defines a
> (hidden) channel zero, so that explains why there are 20001 of some objects and
> 20000 of others.
>
> As to why these are not garbage collected, we must be holding a reference
> somewhere (probably in the Connection).
>
> We will investigate. I have raised a bug (24723).
>
> Steve Powell  (a happy bunny)
> ----------some more definitions from the SPD----------
> vermin (v.) Treating the dachshund for roundworm.
> chinchilla (n.) Cooling device for the lower jaw.
> socialcast (n.) Someone to whom everyone is speaking but nobody likes.
>
> On 31 Jan 2012, at 17:49, Yogesh Ketkar wrote:
>
> > I am using rabbitmq-java-client-bin-2.7.1/rabbitmq-client.jar with
> > RabbitMQ 2.7.1 Server.
>
> > Just run this code
>
> > public static void main(String[] argv) throws IOException {
> >    ConnectionFactory f = new ConnectionFactory();
> >    Connection c = f.newConnection();
> >    for(int i = 0; i < 20000; ++i) {
> >        Channel ch = c.createChannel();
> >        ch.close();
> >    }
> >    System.gc();
> >    while(true) {}
> > }
>
> > While the program is running, observe the memory usage; it keeps on
> > increasing.
> > Get the process id and do the following:
> > jmap -dump:format=b,file=yo.bin  <pid>
> > jhat -J-mx768m -stack false yo.bin
> > Once jhat brings up its HTTP server, go here:
> > http://localhost:7000/showInstanceCounts/
>
> > 20001 instances of class com.rabbitmq.client.impl.AMQCommand
> > 20001 instances of class com.rabbitmq.client.impl.CommandAssembler
> > 20000 instances of class com.rabbitmq.client.ShutdownSignalException
> > 20000 instances of class com.rabbitmq.client.impl.AMQImpl$Channel$Close
> > 20000 instances of class com.rabbitmq.client.impl.ChannelN
> > 20000 instances of class com.rabbitmq.client.impl.ConsumerDispatcher
>
> > In spite of closing all the channels, not only the channels but also
> > the associated objects (ShutdownSignalException, ChannelClose) continue
> > to consume memory.
> > For sure, knowing this now, I will use the createChannel call sparingly.
>
> > Is there a way to remedy this situation?
> > What is also worrying is instances of AMQCommand, so would this be the
> > case for each and every call of client library?
>
> > regards, Yogesh
>
> > _______________________________________________
> > rabbitmq-discuss mailing list
> > rabbitmq-disc...@...
> > https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
>
> _______________________________________________
> rabbitmq-discuss mailing list
> rabbitmq-disc...@...
> https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
Liuzhuofu | 1 Feb 13:00 2012

How can I get the event that one connected mq client have exited ?

Hi all:

In my system, many clients connect to the RabbitMQ server and send messages to each other.

My problem is: when one client goes down (loses its connection to the RabbitMQ server), I must detect this event quickly and broadcast it to all the other clients as soon as I can. What is the best way to detect the event (one client going down)?

Do I need to develop a plugin and add it to rabbit server?

thanks
David
