Re: Lost messages in HA tests in a cluster
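OK... so maybe I'm confused. Basically I followed this example:
http://hg.rabbitmq.com/rabbitmq-java-client/file/default/test/src/com/rabbitmq/examples/ConfirmDontLoseMessages.java
It uses the statements ch.confirmSelect(); and ch.waitForConfirmsOrDie(); on a durable queue.
Doesn't this example cover republishing nacks? Do you have an example? Do I have to implement a ConfirmListener?
Thanks for helping, Simone!
On Tue, Jan 31, 2012 at 8:43 PM, Simone Busoli <simone.busoli@...> wrote: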
Hi Pablo, looking at your code I can't see where you are republishing messages which have not been confirmed by the broker. If you want to make sure that publishes will eventually be delivered you have to keep track of them and re-issue them in case an ack doesn't arrive from the broker, which usually happens when the broker dies.
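A minimal sketch of the bookkeeping described above, in plain Java with no broker dependency so that it runs on its own. It assumes that in real code the sequence number passed to track() would come from channel.getNextPublishSeqNo() just before basicPublish, and that handleAck/handleNack would be called from a ConfirmListener registered on the channel. The class name UnconfirmedPublishes and its method names are hypothetical and not part of the client library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical helper: remembers every published message until the broker confirms it. */
class UnconfirmedPublishes {
    // Sorted by publish sequence number so "multiple" confirms can clear a whole prefix.
    private final ConcurrentNavigableMap<Long, byte[]> pending = new ConcurrentSkipListMap<>();

    /** Record a message under the sequence number it is about to be published with. */
    void track(long seqNo, byte[] body) {
        pending.put(seqNo, body);
    }

    /** Same parameters as ConfirmListener.handleAck: the broker has taken responsibility. */
    void handleAck(long deliveryTag, boolean multiple) {
        if (multiple) {
            pending.headMap(deliveryTag, true).clear(); // everything up to and including the tag
        } else {
            pending.remove(deliveryTag);
        }
    }

    /** Same parameters as ConfirmListener.handleNack: returns the bodies that must be republished. */
    List<byte[]> handleNack(long deliveryTag, boolean multiple) {
        List<byte[]> rejected = new ArrayList<>();
        if (multiple) {
            ConcurrentNavigableMap<Long, byte[]> upTo = pending.headMap(deliveryTag, true);
            rejected.addAll(upTo.values());
            upTo.clear();
        } else {
            byte[] body = pending.remove(deliveryTag);
            if (body != null) rejected.add(body);
        }
        return rejected;
    }

    /** After a connection loss: remove and return everything never confirmed, oldest first. */
    List<byte[]> drainForRepublish() {
        List<byte[]> out = new ArrayList<>();
        Map.Entry<Long, byte[]> e;
        while ((e = pending.pollFirstEntry()) != null) out.add(e.getValue());
        return out;
    }

    int size() {
        return pending.size();
    }

    public static void main(String[] args) {
        UnconfirmedPublishes tracker = new UnconfirmedPublishes();
        for (long seq = 1; seq <= 5; seq++) {
            tracker.track(seq, ("Message " + (1465 + seq)).getBytes());
        }
        tracker.handleAck(3, true); // broker confirmed sequence numbers 1..3 in one go
        // Simulated broker death: whatever is still pending has to be published again.
        for (byte[] body : tracker.drainForRepublish()) {
            System.out.println("republish: " + new String(body));
        }
    }
}
```

Sequence numbers start again on a new channel, so after a reconnect the drained messages would be tracked afresh under the new channel's numbers when they are republished. Republishing can also deliver a message twice if the broker stored it but died before the ack went out, so the consumer side should tolerate duplicates.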
On Jan 31, 2012 11:52 PM, "Pablo Molnar" <pablomolnar-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org> wrote:
Hi all!
I'm running some HA tests that stress my cluster, and I'm losing messages ONLY when the publisher loses its connection due to a kill -9 of the node.
Test OK: Kill node while consuming:
1 - Set up a clean 3-node cluster
2 - Run the producer with 10,000 messages, connected to node A
3 - Wait for the producer to finish
4 - Run the consumer, connected to node A
5 - While the consumer is running, kill node A
Result: The consumer reconnects to another node and consumes the rest of the messages. All 10,000 messages were consumed.
Test FAILED: Kill node while producing:
1 - Set up a clean 3-node cluster
2 - Run the consumer so it starts listening, connected to node A
3 - Run the producer with 10,000 messages, connected to node A
4 - While the producer is running, kill node A
Result: The producer reconnects fine (and so does the consumer) and keeps publishing, but some of the messages already published to node A are lost.
These are my settings:
rabbitmqctl status
Status of node 'rabbit@i-00000007-asm' ...
[{pid,11339},
{running_applications,
[{rabbitmq_management,"RabbitMQ Management Console","2.7.1"},
{rabbitmq_management_agent,"RabbitMQ Management Agent","2.7.1"},
{amqp_client,"RabbitMQ AMQP Client","2.7.1"},
{rabbit,"RabbitMQ","2.7.1"},
{os_mon,"CPO CXC 138 46","2.2.4"},
{sasl,"SASL CXC 138 11","2.1.8"},
{rabbitmq_mochiweb,"RabbitMQ Mochiweb Embedding","2.7.1"},
{webmachine,"webmachine","1.7.0-rmq2.7.1-hg"},
{mochiweb,"MochiMedia Web Server","1.3-rmq2.7.1-git"},
{inets,"INETS CXC 138 49","5.2"},
{mnesia,"MNESIA CXC 138 12","4.4.12"},
{stdlib,"ERTS CXC 138 10","1.16.4"},
{kernel,"ERTS CXC 138 10","2.13.4"}]},
{os,{unix,linux}},
{erlang_version,
"Erlang R13B03 (erts-5.7.4) [source] [64-bit] [smp:2:2] [rq:2] [async-threads:0] [hipe] [kernel-poll:false]\n"},
{memory,
[{total,92565608},
{processes,4004968},
{processes_used,3996224},
{system,88560640},
{atom,1322033},
{atom_used,1291462},
{binary,32496},
{code,15264387},
{ets,1174192}]},
{vm_memory_high_watermark,0.3999999999362281},
{vm_memory_limit,2508940902}]
...done.
rabbitmqctl cluster_status
Cluster status of node 'rabbit@i-00000007-asm' ...
[{nodes,[{disc,['rabbit@i-0000001a-zsm','rabbit@i-00000007-asm']},
         {ram,['rabbit@i-00000009-asm']}]},
 {running_nodes,['rabbit@i-0000001a-zsm','rabbit@i-00000007-asm']}]
...done.
Java amqp-client 2.7.1
- Producer.groovy (java amqp-client 2.7.1)
import com.rabbitmq.client.*

try {
    // Get rabbitmq config
    def config = new ConfigSlurper().parse(new File('../rabbitmq.properties').toURL())

    def rabbit = new RabbitHA(config)
    // Note: the constructor above has already opened the first channel, so this
    // closure only runs when connectChannel() is called again on a reconnect.
    rabbit.init = { channel ->
        channel.exchangeDeclare('myExchange', "direct", true) // Durable exchange
        channel.queueDeclare('myQueue', true, false, false, ["x-ha-policy": "all"]) // Durable queue with HA policy
        channel.queueBind('myQueue', 'myExchange', '')
        channel.confirmSelect() // Enable publisher confirms on this channel
    }

    10000.times { idx ->
        rabbit.publish { channel ->
            def properties = new AMQP.BasicProperties.Builder().deliveryMode(2).build() // Delivery mode 2: persistent
            def msg = "Message $idx"
            channel.basicPublish('myExchange', '', properties, msg.getBytes())
            println msg
        }
    }

    rabbit.close()
} catch(e) { e.printStackTrace() }
- Consumer.groovy
import com.rabbitmq.client.*

try {
    // Get rabbitmq config
    def config = new ConfigSlurper().parse(new File('../rabbitmq.properties').toURL())

    // Connect
    def rabbit = new RabbitHA(config)
    rabbit.onDelivery('myQueue') { delivery, channel ->
        def msg = new String(delivery.body)
        println msg

        // Manual ack
        channel.basicAck(delivery.envelope.deliveryTag, false)
    }
} catch(e) { e.printStackTrace() }
- RabbitHA.groovy
import com.rabbitmq.client.*

/**
 * RabbitMQ highly available proxy.
 * Basic implementation of a subscriber/publisher with reconnect logic.
 */
class RabbitHA {
    ConnectionFactory connectionFactory
    Address[] addresses
    Closure init

    Connection connection
    Channel channel
    QueueingConsumer consumer

    public RabbitHA(Map config) {
        this(config, null)
    }

    public RabbitHA(Map config, Closure init) {
        this.connectionFactory = new ConnectionFactory([username: config.username, password: config.password, virtualHost: config.virtualHost])
        this.addresses = Address.parseAddresses(config.addresses)
        this.init = init
        connectChannel()
    }

    void onDelivery(String queueName, Closure closure) {
        basicConsume(queueName)
        int i = 0

        while(true) {
            try {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery()
                closure(delivery, channel)
                i = 0
            } catch(e) {
                // Only handle connection-related exceptions; rethrow anything else
                if(!(e in ShutdownSignalException || e in IOException || e in AlreadyClosedException)) throw e

                i++
                e.printStackTrace()
                println "ShutdownSignalException recieved! Reconnection attempt #$i"
                connectChannel()
                basicConsume(queueName)
            }
        }
    }

    void publish(Closure closure) {
        int i = 0
        boolean retry = true
        while(retry) {
            try {
                closure(channel)
                i = 0
                retry = false
            } catch(e) {
                // Only handle connection-related exceptions; rethrow anything else
                if(!(e in ShutdownSignalException || e in IOException || e in AlreadyClosedException)) throw e

                i++
                retry = true
                e.printStackTrace()
                println "ShutdownSignalException recieved! Reconnection attempt #$i"
                connectChannel()
            }
        }
    }

    void connectChannel() {
        connection = connectionFactory.newConnection(addresses)
        channel = connection.createChannel()

        println "Succesfully connected to $connection.address"

        if(init) {
            init(channel)
        }
    }

    void basicConsume(queueName) {
        consumer = new QueueingConsumer(channel)
        channel.basicConsume(queueName, false, consumer)
    }

    void close() {
        channel.waitForConfirmsOrDie()
        channel.close()
        connection.close()
    }
}
Output:
cat consumer-output.txt |grep Message | wc
9957 19914 128330
cat producer-output.txt | grep Message | wc
10000 20000 128890
Note that the consumer lost 43 messages (10000 published, 9957 consumed).
Output from producer:
...
Message 1466
Message 1467
Message 1468
Message 1469
Message 1470
ShutdownSignalException recieved! Reconnection attempt #1
Succesfully connected to i-0000001a-zsm/172.16.158.46
Message 1471
Message 1472
Message 1473
Message 1474
Message 1475
Message 1476
...
Note that messages [1471..1476] were consumed OK, but [1466..1470] are missing from the consumer output.
The complete outputs are in https://github.com/pablomolnar/rabbitmq_samples/tree/master/out. There you can see the reconnection logs of both sides.
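To list exactly which messages went missing rather than only counting them, the two output files can be compared. The following is a rough sketch, assuming the file names producer-output.txt and consumer-output.txt from the commands above are passed as arguments and that every message line starts with "Message "; the class name MissingMessages is hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper: reports producer lines that never show up in the consumer output. */
class MissingMessages {
    static List<String> missing(List<String> produced, List<String> consumed) {
        // Collect every message line the consumer printed.
        Set<String> seen = new HashSet<>();
        for (String line : consumed) {
            String trimmed = line.trim();
            if (trimmed.startsWith("Message ")) seen.add(trimmed);
        }
        // Keep producer message lines, in publish order, that the consumer never printed.
        List<String> lost = new ArrayList<>();
        for (String line : produced) {
            String trimmed = line.trim();
            if (trimmed.startsWith("Message ") && !seen.contains(trimmed)) lost.add(trimmed);
        }
        return lost;
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 2) {
            System.err.println("usage: java MissingMessages <producer-output> <consumer-output>");
            return;
        }
        List<String> produced = Files.readAllLines(Paths.get(args[0]));
        List<String> consumed = Files.readAllLines(Paths.get(args[1]));
        for (String line : missing(produced, consumed)) {
            System.out.println(line);
        }
    }
}
```

Log lines such as the reconnection messages are skipped because they do not start with "Message ".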
I have a strong feeling that publisher confirms are not configured correctly.
Could anyone please shed some light on the issue?
Cheers,
Pablo Molnar
PS: I'd also like to take the opportunity to share a lot of Groovy examples I've been working on over the last few days: https://github.com/pablomolnar/rabbitmq_samples
rabbitmq-discuss mailing list
rabbitmq-discuss-ETbvJ2rUIr4qBm01orBoR9BPR1lH4CV8@public.gmane.org
https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss