Jepsen-testing RabbitMQ [with python]

UPDATE: the code is now available on GitHub https://github.com/mbsimonovic/jepsen-python

In this post I’ll tell you about trying to reproduce the Jepsen RabbitMQ test (using python, not clojure).
It’s been more than 2 years since the test, and rabbitmq went from 3.3 to 3.6 meanwhile, so I was wondering if anything’s different these days (end of 2016).

Messaging is a legit communication pattern, as nicely documented in the Enterprise Integration Patterns (2003) book, and with the rise of microservices it’s even more relevant than it was 10-15 years ago.

There seems to be a consensus about preferring smart endpoints and dumb pipes, so when it comes to picking a messaging provider there are a few options: RabbitMQ, Apache Kafka and Apache ActiveMQ. Google Trends backs up this claim about the rise of interest:

 google-trends-rabbitmq-vs-kafka-vs-activemq

Back in 2013 Kyle Kingsbury started publishing a fantastic series of blog posts called Jepsen where he tested how databases behave in a distributed environment. It turned out badly for most of them, RabbitMQ included.

In short, under certain failure scenarios it’s possible to lose data, and losing here means losing a write that RabbitMQ had already acknowledged.
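To make "lost" concrete, here is a minimal sketch of the check in Python. The function name and the sample numbers are made up for illustration; this is not Jepsen's actual checker.

```python
def find_lost(acknowledged, received):
    """Return acknowledged writes that never came back out of the queue."""
    return sorted(set(acknowledged) - set(received))


# Hypothetical run: the broker confirmed messages 0..4, but draining the
# queue afterwards only returned four of them.
acknowledged = [0, 1, 2, 3, 4]
received = [0, 1, 3, 4]

print(find_lost(acknowledged, received))
```

A write that was never acknowledged and never shows up is not counted as lost, since the client was never told it succeeded.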

I was wondering if today (December 2016) things are any different.

Setting up the environment (docker)

I’ve been using docker for almost 2 years now, so it was a natural choice for this test. I use VMware Fusion to run Ubuntu Xenial, and run docker inside that VM (you can grab an image from http://www.osboxes.org/ubuntu/).


# enable ssh
sudo apt-get install openssh-server
# login from your laptop
# update & upgrade
#
$ sudo apt-get -y install python-pip python-virtualenv python-dev
$ python --version
Python 2.7.12
$ lsb_release -a
No LSB modules are available.
Distributor ID: Ubuntu
Description: Ubuntu 16.04.1 LTS
Release: 16.04
Codename: xenial
$ uname -a
Linux osboxes 4.4.0-53-generic #74-Ubuntu SMP Fri Dec 2 15:59:10 UTC 2016 x86_64 x86_64 x86_64 GNU/Linux
#
# install docker https://docs.docker.com/engine/installation/linux/ubuntulinux/

Jepsen has meanwhile gotten a docker setup so I tried that first:

$ ssh [email protected]
$ git clone https://github.com/jepsen-io/jepsen.git && cd jepsen/docker
$ ./up.sh # this runs docker-compose build and up, so in a new terminal:
$ sudo docker exec -it jepsen-control bash
root@control $ cd rabbitmq && lein test :only jepsen.rabbitmq-test

This failed with JSchException: Packet corrupt. Ssh-ing once to each node and accepting the host key fixed this problem.
Running lein test again failed with the message “rabbitmq-server: unrecognized service”. Looking at the console output, the test had tried to install rabbitmq on only 3 of the 5 nodes. So I manually installed rabbit on the remaining 2 nodes and ran the test again. This time it hung while starting rabbit. I tried changing rabbit-test.clj and adding :nodes [:n1 :n2 :n3 :n4 :n5], but then it tried to start rabbit on 4 nodes and hung again. I’m not familiar enough with clojure to debug this, so I submitted a Jepsen bug.

Running a RabbitMQ cluster in Docker

First I need a rabbitmq cluster running in docker, so I’ll start with bijukunjummen/docker-rabbitmq-cluster:

$ git clone https://github.com/bijukunjummen/docker-rabbitmq-cluster.git
$ cd docker-rabbitmq-cluster/base
$ sudo docker build -t bijukunjummen/rabbitmq-base .
# this hangs while installing plugins, seems like erlang freaks out when it's pid 1, so just add && true at the end:
# RUN /usr/sbin/rabbitmq-plugins enable rabbitmq_mqtt rabbitmq_stomp rabbitmq_management rabbitmq_management_agent rabbitmq_management_visualiser rabbitmq_federation rabbitmq_federation_management sockjs && true
$ cd ../server
$ wget 'https://github.com/jepsen-io/jepsen/raw/master/rabbitmq/resources/rabbitmq/rabbitmq.config'
$ sudo docker build -t bijukunjummen/rabbitmq-server .
$ cd ../cluster
# edit docker-compose.yml

rabbit1:
  image: bijukunjummen/rabbitmq-server
  hostname: rabbit1
  ports:
    - "5672:5672"
    - "15672:15672"

rabbit2:
  image: bijukunjummen/rabbitmq-server
  hostname: rabbit2
  links:
    - rabbit1
  environment: 
   - CLUSTERED=true
   - CLUSTER_WITH=rabbit1
  ports:
      - "5673:5672"
      - "15673:15672"

rabbit3:
  image: bijukunjummen/rabbitmq-server
  hostname: rabbit3
  links:
    - rabbit1
    - rabbit2
  environment: 
   - CLUSTERED=true
   - CLUSTER_WITH=rabbit1   
  ports:
        - "5674:5672"

rabbit4:
  image: bijukunjummen/rabbitmq-server
  hostname: rabbit4
  links:
    - rabbit1
    - rabbit2
    - rabbit3
  environment: 
   - CLUSTERED=true
   - CLUSTER_WITH=rabbit1
  ports:
        - "5675:5672"

rabbit5:
  image: bijukunjummen/rabbitmq-server
  hostname: rabbit5
  links:
    - rabbit1
    - rabbit2
    - rabbit3
    - rabbit4
  environment: 
   - CLUSTERED=true
   - CLUSTER_WITH=rabbit1   
  ports:
        - "5676:5672"

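Starting up a cluster using links didn’t work for me: some nodes would hang because they didn’t know about all the other nodes (rabbit3 doesn’t know about rabbit5 because it was created earlier). So I had to go with DNS instead, using the version 2 compose file format, where the services share a network and can look each other up by name.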

version: '2'
services:
  rabbit1:
    image: bijukunjummen/rabbitmq-server
    hostname: rabbit1
    ports:
      - "5672:5672"
      - "15672:15672"

  rabbit2:
    image: bijukunjummen/rabbitmq-server
    hostname: rabbit2
    environment:
      - CLUSTERED=true
      - CLUSTER_WITH=rabbit1
    ports:
      - "5673:5672"
      - "15673:15672"

  rabbit3:
    image: bijukunjummen/rabbitmq-server
    hostname: rabbit3
    environment:
      - CLUSTERED=true
      - CLUSTER_WITH=rabbit1
    ports:
      - "5674:5672"

  rabbit4:
    image: bijukunjummen/rabbitmq-server
    hostname: rabbit4
    environment:
      - CLUSTERED=true
      - CLUSTER_WITH=rabbit1
    ports:
      - "5675:5672"

  rabbit5:
    image: bijukunjummen/rabbitmq-server
    hostname: rabbit5
    environment:
      - CLUSTERED=true
      - CLUSTER_WITH=rabbit1
    ports:
      - "5676:5672"
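The five service entries differ only in the node number and the published ports, so the file could also be generated instead of typed. This is just a sketch: compose_yaml is a hypothetical helper of mine, and unlike the hand-written file it publishes the management port (15672) for rabbit1 only.

```python
def compose_yaml(num_nodes=5, image='bijukunjummen/rabbitmq-server'):
    """Render a version 2 docker-compose file for a RabbitMQ cluster."""
    lines = ["version: '2'", 'services:']
    for n in range(1, num_nodes + 1):
        lines += [
            '  rabbit%d:' % n,
            '    image: %s' % image,
            '    hostname: rabbit%d' % n,
        ]
        if n > 1:
            # every node except the first one joins rabbit1
            lines += [
                '    environment:',
                '      - CLUSTERED=true',
                '      - CLUSTER_WITH=rabbit1',
            ]
        # AMQP port 5672 of node n is published as 5672 + (n - 1) on the host
        lines += [
            '    ports:',
            '      - "%d:5672"' % (5672 + n - 1),
        ]
        if n == 1:
            lines.append('      - "15672:15672"')
    return '\n'.join(lines) + '\n'


print(compose_yaml())
```

Redirecting the output into docker-compose.yml gives a file equivalent to the one above, apart from the management port on rabbit2.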

# need new docker-compose for version 2 syntax:
$ sudo apt-get install docker-compose
$ sudo docker-compose up
Creating jepsen_rabbit4_1
Creating jepsen_rabbit5_1
Creating jepsen_rabbit2_1
Creating jepsen_rabbit3_1
Creating jepsen_rabbit1_1
Attaching to jepsen_rabbit2_1, jepsen_rabbit3_1, jepsen_rabbit1_1, jepsen_rabbit5_1, jepsen_rabbit4_1
rabbit2_1  | Warning: PID file not written; -detached was passed.
rabbit3_1  | Warning: PID file not written; -detached was passed.
rabbit5_1  | Warning: PID file not written; -detached was passed.
rabbit4_1  | Warning: PID file not written; -detached was passed.
rabbit1_1  | 
rabbit1_1  |               RabbitMQ 3.6.5. Copyright (C) 2007-2016 Pivotal Software, Inc.
rabbit1_1  |   ##  ##      Licensed under the MPL.  See http://www.rabbitmq.com/
rabbit1_1  |   ##  ##
rabbit1_1  |   ##########  Logs: /var/log/rabbitmq/[email protected]
rabbit1_1  |   ######  ##        /var/log/rabbitmq/[email protected]
rabbit1_1  |   ##########
rabbit1_1  |               Starting broker...
rabbit2_1  | Stopping node rabbit@rabbit2 ...
rabbit3_1  | Stopping node rabbit@rabbit3 ...
rabbit4_1  | Stopping node rabbit@rabbit4 ...
rabbit5_1  | Stopping node rabbit@rabbit5 ...
rabbit1_1  |  completed with 12 plugins.
rabbit3_1  | Clustering node rabbit@rabbit3 with rabbit@rabbit1 ...
rabbit2_1  | Clustering node rabbit@rabbit2 with rabbit@rabbit1 ...
rabbit4_1  | Clustering node rabbit@rabbit4 with rabbit@rabbit1 ...
rabbit5_1  | Clustering node rabbit@rabbit5 with rabbit@rabbit1 ...
rabbit3_1  | Starting node rabbit@rabbit3 ...
rabbit2_1  | Starting node rabbit@rabbit2 ...
rabbit4_1  | Starting node rabbit@rabbit4 ...
rabbit2_1  |  * sockjs
rabbit2_1  | 
rabbit2_1  | =INFO REPORT==== 26-Dec-2016::11:33:49 ===
rabbit2_1  | rabbit on node rabbit@rabbit1 up
rabbit2_1  | 
rabbit2_1  | =INFO REPORT==== 26-Dec-2016::11:33:49 ===
rabbit2_1  | rabbit on node rabbit@rabbit3 down
rabbit2_1  | 
rabbit2_1  | =INFO REPORT==== 26-Dec-2016::11:33:49 ===
rabbit2_1  | Keep rabbit@rabbit3 listeners: the node is already back
rabbit5_1  | Starting node rabbit@rabbit5 ...
rabbit3_1  |  * rabbitmq_management
rabbit3_1  |  * rabbitmq_web_dispatch
rabbit3_1  |  * webmachine
rabbit3_1  |  * mochiweb
rabbit3_1  |  * rabbitmq_mqtt
rabbit3_1  |  * rabbitmq_federation
rabbit3_1  |  * rabbitmq_stomp
rabbit3_1  |  * rabbitmq_management_agent
rabbit3_1  |  * amqp_client
rabbit3_1  |  * sockjs
rabbit2_1  | 
rabbit2_1  | =INFO REPORT==== 26-Dec-2016::11:33:49 ===
rabbit2_1  | rabbit on node rabbit@rabbit3 up
rabbit2_1  | 
rabbit2_1  | =INFO REPORT==== 26-Dec-2016::11:33:50 ===
rabbit2_1  | rabbit on node rabbit@rabbit5 up
rabbit2_1  | 
rabbit2_1  | =INFO REPORT==== 26-Dec-2016::11:33:50 ===
rabbit2_1  | rabbit on node rabbit@rabbit4 up
rabbit5_1  |  * mochiweb
rabbit5_1  |  * rabbitmq_mqtt
rabbit5_1  |  * rabbitmq_federation
rabbit5_1  |  * rabbitmq_stomp
rabbit5_1  |  * rabbitmq_management_agent
rabbit5_1  |  * amqp_client
rabbit5_1  |  * sockjs
rabbit5_1  | 
rabbit5_1  | =INFO REPORT==== 26-Dec-2016::11:33:50 ===
rabbit5_1  | rabbit on node rabbit@rabbit4 up
rabbit4_1  |  * rabbitmq_management
rabbit4_1  |  * rabbitmq_web_dispatch
rabbit4_1  |  * webmachine
rabbit4_1  |  * mochiweb
rabbit4_1  |  * rabbitmq_mqtt
rabbit4_1  |  * rabbitmq_federation
rabbit4_1  |  * rabbitmq_stomp
rabbit4_1  |  * rabbitmq_management_agent
rabbit4_1  |  * amqp_client
rabbit4_1  |  * sockjs
rabbit3_1  | 
rabbit3_1  | =INFO REPORT==== 26-Dec-2016::11:33:50 ===
rabbit3_1  | rabbit on node rabbit@rabbit5 up
rabbit3_1  | 
rabbit3_1  | =INFO REPORT==== 26-Dec-2016::11:33:50 ===
rabbit3_1  | rabbit on node rabbit@rabbit4 up

 

After a few seconds the cluster should be up and running, so open http://192.168.54.136:15672/ and log in with guest/guest.

screen-shot-2016-12-26-at-12-02-37
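The same check can be scripted against the management plugin's HTTP API, which lists the cluster members under /api/nodes. A minimal sketch for Python 3, assuming the guest/guest credentials and the VM address used in this post. fetch_nodes and running_nodes are names I made up, and the sample list is hand-written and trimmed to the two fields the code reads, not real output.

```python
import base64
import json
import urllib.request


def fetch_nodes(host='192.168.54.136', port=15672, user='guest', password='guest'):
    """GET /api/nodes from the management plugin and return the parsed JSON."""
    req = urllib.request.Request('http://%s:%d/api/nodes' % (host, port))
    token = base64.b64encode(('%s:%s' % (user, password)).encode()).decode()
    req.add_header('Authorization', 'Basic ' + token)
    with urllib.request.urlopen(req, timeout=5) as resp:
        return json.loads(resp.read().decode())


def running_nodes(nodes):
    """Names of the nodes that the API reports as running."""
    return sorted(n['name'] for n in nodes if n.get('running'))


# Hand-written sample in the shape of the API response (not real output).
sample = [
    {'name': 'rabbit@rabbit1', 'running': True},
    {'name': 'rabbit@rabbit2', 'running': True},
    {'name': 'rabbit@rabbit3', 'running': False},
]
print(running_nodes(sample))
```

Against the live cluster, running_nodes(fetch_nodes()) should list all five nodes once startup has finished.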

Let’s run a simple hello world test:

$ sudo pip install pika
$ cat jepsen_produce.py
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='jepsen.queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"

channel.basic_publish(exchange='',
                      routing_key='jepsen.queue',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      ))
print(" [x] Sent %r" % message)
connection.close()
# 
#
$ cat jepsen_consume.py
#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='jepsen.queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')

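def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))  # simulate work: one second per dot in the body
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)

# deliver at most one unacknowledged message at a time
channel.basic_qos(prefetch_count=1)
# pika 0.10 signature: the callback comes first
channel.basic_consume(callback, queue='jepsen.queue')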

channel.start_consuming()
# 
$ python jepsen_produce.py 'hi there'
[x] Sent 'hi there'

$ python jepsen_consume.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'hi there'
[x] Done

Blockade – Jepsen port in python

There’s a python tool called blockade, inspired by Jepsen, that creates network failures and partitions between docker containers. So while waiting for jepsen-160, I’ve decided to write my own rabbitmq test using blockade.

With the cluster up and running, let’s set up blockade to manage it:

$ # sudo pip install blockade; this has a bug that was fixed in master after the last 0.3.1 release
# so had to install from sources:
$ git clone https://github.com/dcm-oss/blockade.git
$ cd blockade && sudo python setup.py install
# add existing nodes named jepsen_rabbit{1..5}_1 to blockade:
$ sudo docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
83db717f40f5 bijukunjummen/rabbitmq-server "/bin/sh -c /opt/rabb" 2 days ago Up About a minute 4369/tcp, 9100-9105/tcp, 25672/tcp, 0.0.0.0:5673->5672/tcp, 0.0.0.0:15673->15672/tcp jepsen_rabbit2_1
7ec3390ea0fc bijukunjummen/rabbitmq-server "/bin/sh -c /opt/rabb" 2 days ago Up About a minute 4369/tcp, 0.0.0.0:5672->5672/tcp, 9100-9105/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp jepsen_rabbit1_1
4599b42b5d9a bijukunjummen/rabbitmq-server "/bin/sh -c /opt/rabb" 2 days ago Up About a minute 4369/tcp, 9100-9105/tcp, 15672/tcp, 25672/tcp, 0.0.0.0:5675->5672/tcp jepsen_rabbit4_1
4eaa2e7a76c2 bijukunjummen/rabbitmq-server "/bin/sh -c /opt/rabb" 2 days ago Up About a minute 4369/tcp, 9100-9105/tcp, 15672/tcp, 25672/tcp, 0.0.0.0:5674->5672/tcp jepsen_rabbit3_1
aa409bf28eba bijukunjummen/rabbitmq-server "/bin/sh -c /opt/rabb" 2 days ago Up About a minute 4369/tcp, 9100-9105/tcp, 15672/tcp, 25672/tcp, 0.0.0.0:5676->5672/tcp jepsen_rabbit5_1
$
$ for i in `seq 1 5`; do sudo blockade add jepsen_rabbit${i}_1; done
$
$ sudo blockade status
NODE              CONTAINER ID    STATUS  IP          NETWORK  PARTITION
jepsen_rabbit1_1  7ec3390ea0fc    UP      172.20.0.2  UNKNOWN
jepsen_rabbit2_1  83db717f40f5    UP      172.20.0.3  UNKNOWN
jepsen_rabbit3_1  4eaa2e7a76c2    UP      172.20.0.4  UNKNOWN
jepsen_rabbit4_1  4599b42b5d9a    UP      172.20.0.5  UNKNOWN
jepsen_rabbit5_1  aa409bf28eba    UP      172.20.0.6  UNKNOWN

 

The original jepsen test uses triple-mirrored writes, so I need to configure that as well:

screen-shot-2016-12-19-at-12-46-25

 

RabbitMQ randomly chooses two more nodes as slaves (mirrors) for the queue; in my case they were rabbit3 and rabbit5. You can find this on the queue tab, under Details.
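I set the policy through the management UI, but it can also be scripted through the management HTTP API (PUT /api/policies/vhost/name). The sketch below is for Python 3 and rests on assumptions: the policy name, the queue pattern and ha-sync-mode are my guesses at a sensible setup rather than a copy of what the screenshot shows, and ha_policy and put_policy are helper names I made up.

```python
import base64
import json
import urllib.request


def ha_policy(pattern='^jepsen\\.', replicas=3):
    """Build the JSON body for a mirrored-queue (HA) policy."""
    return {
        'pattern': pattern,            # assumption: matches jepsen.queue
        'apply-to': 'queues',
        'definition': {
            'ha-mode': 'exactly',      # keep exactly `replicas` copies
            'ha-params': replicas,     # the master plus (replicas - 1) mirrors
            'ha-sync-mode': 'automatic',  # assumption, not taken from the post
        },
    }


def put_policy(name, body, host='192.168.54.136', port=15672,
               user='guest', password='guest'):
    """PUT the policy on the default vhost ('/', URL-encoded as %2F)."""
    url = 'http://%s:%d/api/policies/%%2F/%s' % (host, port, name)
    req = urllib.request.Request(url, data=json.dumps(body).encode(), method='PUT')
    token = base64.b64encode(('%s:%s' % (user, password)).encode()).decode()
    req.add_header('Authorization', 'Basic ' + token)
    req.add_header('Content-Type', 'application/json')
    with urllib.request.urlopen(req, timeout=5) as resp:
        return resp.status


print(json.dumps(ha_policy(), indent=2))
```

Calling put_policy('ha-jepsen', ha_policy()) would then apply it to the running cluster; the name ha-jepsen is arbitrary.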

Finally testing RabbitMQ

Let’s rewrite jepsen’s rabbit.clj test in python:

#!/usr/bin/env python
import pika
import sys
import time
import exceptions
import threading
import logging

logging.basicConfig(level=logging.INFO, format='[%(levelname)s] (%(threadName)-10s) %(message)s', )


RABBIT_HOST = '192.168.54.136'
RABBIT_PORT = 5672
TOTAL_MSGS = 5000
MAX_RECONN_ATTEMPTS = 300000
MAX_PUBLISH_ATTEMPTS = 10

num_rabbits = int(sys.argv[1]) if len(sys.argv) > 1 else 5

class JepsenRabbitmq():
    '''
    Is Pika thread safe?

    Pika does not have any notion of threading in the code. If you want to use Pika with threading,
    make sure you have a Pika connection per thread, created in that thread. It is not safe to
    share one Pika connection across threads.
    '''
    def __init__(self, rabbit_port):
        logging.info('rabbitmq client at %i', rabbit_port)
        self.rabbit_hole = rabbit_port
        self.sent = []
        self.failed = []
        self.dig()

    def dig(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=RABBIT_HOST, port=self.rabbit_hole))
        self.channel = self.connection.channel()
        # for publish:
        # jepsen uses confirmation tracking:   (lco/select ch) ; Use confirmation tracking
        # in pika the equivalent is confirm_delivery(), see
        # http://www.rabbitmq.com/extensions.html#confirms
        self.channel.confirm_delivery()  # turn on RabbitMQ's publisher confirms (a protocol extension) for this channel

        # https://github.com/jepsen-io/jepsen/blob/master/rabbitmq/src/jepsen/rabbitmq.clj#L132
        self.channel.queue_declare(queue='jepsen.queue', durable=True, auto_delete=False, exclusive=False)

    def enqueue(self, message):
        logging.debug('sending %s', message)
        # see http://pika.readthedocs.io/en/0.10.0/examples/blocking_delivery_confirmations.html
        res = self.channel.basic_publish(exchange='',
                                             routing_key='jepsen.queue',
                                             body=message,
                                             properties=pika.BasicProperties(
                                                 delivery_mode=2,  # make message persistent
                                                 ## in jepsen:     :content-type  "application/edn"
                                                 # content_type='text/plain'
                                             ),
                                             mandatory=True)
        # TODO 5sec timeout, jepsen does:
        # (if (lco/wait-for-confirms ch 5000)     # ; Block until message acknowledged
        if not res:
            raise Exception('Message could not be confirmed')

    def dequeue(self):
        """Given a channel and an operation, dequeues a value and returns the
        corresponding operation.
          ; auto-ack dynamics mean that even if we issue a dequeue req then crash,
          ; the message should be re-delivered and we can count this as a failure.
        """
        self.channel.basic_qos(prefetch_count=1)
        # see http://pika.readthedocs.io/en/0.10.0/modules/adapters/blocking.html#pika.adapters.blocking_connection.BlockingChannel.basic_get
        method, header, body = self.channel.basic_get(queue='jepsen.queue', no_ack=False)
        if method:
            self.channel.basic_ack(delivery_tag=method.delivery_tag, multiple=False)
        # TODO timeout
        return body

    def close(self):
        try:
            self.connection.close()
        except Exception:
            logging.warning('failed to close the connection')

class JepsenProducer(JepsenRabbitmq):
    def __init__(self, rabbit_port):
        JepsenRabbitmq.__init__(self, rabbit_port)

        self.thread = threading.Thread(name='rabbit %i' % rabbit_port, target=self._test)

    def test(self):
        logging.info('starting producer')
        self.thread.start()

    def _test(self):
        self._produce()
        self.report()
        self.close()

    def wait_for_test_to_complete(self, timeout = None):
        self.thread.join(timeout)
        logging.info('producer finished work')

    # (timeout 5000 (assoc op :type :fail :value :timeout)
    #           (let [[meta payload] (lb/get ch queue)
    #                 value          (codec/decode payload)]
    #             (if (nil? meta)
    #               (assoc op :type :fail :value :exhausted)
    #               (assoc op :type :ok :value value)))))
    # timeout = 5
    # def on_timeout():
    #    global connection
    #    connection.close()
    # connection.add_timeout(timeout, on_timeout)
    # if basic_get returns than clear the timeout
    def _produce(self):
        logging.info('sending messages')
        for i in range(TOTAL_MSGS):
            # every producer gets its own block of ids, so ids are unique across producers
            msg = i + TOTAL_MSGS * (self.rabbit_hole - RABBIT_PORT)
            for publish_attempt in range(MAX_PUBLISH_ATTEMPTS):
                try:
                    self.enqueue(str(msg))
                    self.sent.append(msg)
                    time.sleep(0.1)
                    break
                    # (320, "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'")
                except pika.exceptions.ConnectionClosed as c:
                    for conn_attempt in range(MAX_RECONN_ATTEMPTS):
                        try:
                            logging.debug('trying to re-open connection')
                            self.dig()
                            break
                        except Exception:
                            # wait a bit longer after every failed attempt
                            time.sleep(conn_attempt)
                            # time.sleep(1)
                    else: # failed to re-open
                        self.failed.append(msg)
                        logging.error('failed to send %i, cannot connect to rabbit, stopping the test: %s', msg, c)
                        return
                except KeyboardInterrupt:
                    logging.warning('keyboard exception, stopping msg publishing')
                    # failed.append(msg)
                    return
                except Exception:
                    if publish_attempt == MAX_PUBLISH_ATTEMPTS - 1:
                        logging.exception('failed to send %i' % msg)
                        self.failed.append(msg)
                    else:
                        logging.exception('bumpy road')
                        time.sleep(1)

    def report(self):
        logging.info('failed: %s', self.failed)
        logging.info('sent: %i, failed: %i, total: %i' , len(self.sent), len(self.failed), TOTAL_MSGS)

class JepsenConsumer(JepsenRabbitmq):
    def __init__(self, rabbit_port, all_sent, all_failed):
        JepsenRabbitmq.__init__(self, rabbit_port)
        self.failed = all_failed
        self.sent = all_sent
        self.received = set()
        self.lost = set()
        self.unexpected = []

    def wrapup(self):
        # i = raw_input('press ENTER to drain the queue')

        try:
            self._drain()
            # report what was actually drained, not what was sent
            logging.warning('RECEIVED: %i, DUPLICATE: %i. %s, LOST MESSAGES %i, %s',
                            len(self.received), len(self.unexpected), self.unexpected, len(self.lost), sorted(list(self.lost)))
        except Exception:
            logging.exception('failed to drain')
        finally:
            self.close()

    def _drain(self):
        # i = raw_input('press ENTER to drain the queue')

        # new connection and channel?
        self.dig()

        while True:
            r = self.dequeue()
            if not r:
                # basic_get returned nothing: the queue is empty
                break
            i = int(r)
            if i in self.received:
                # already seen, so this is a duplicate delivery
                self.unexpected.append(i)
            self.received.add(i)

        # lost = sent (and confirmed) but never drained
        self.lost.update(set(self.sent).difference(self.received))

rabbits = []
for r in range(num_rabbits):
    rabbits.append(JepsenProducer(RABBIT_PORT + r))
for r in rabbits:
    r.test()

# wait for tests to complete
for r in rabbits:
    r.wait_for_test_to_complete()

time.sleep(60)
all_sent = []
all_failed = []
for r in rabbits:
    all_sent.extend(r.sent)
    all_failed.extend(r.failed)

c = JepsenConsumer(RABBIT_PORT, all_sent, all_failed)
c.wrapup()
print('done')

The code uses separate threads to send messages to rabbitmq, then waits for all to finish before collecting all messages. To introduce network problems while the test is running, I use blockade in another terminal:

for i in `seq 6`
do 
    echo 'creating partition'
    sudo blockade partition jepsen_rabbit1_1,jepsen_rabbit2_1,jepsen_rabbit3_1 jepsen_rabbit4_1,jepsen_rabbit5_1
    sudo blockade status
    sleep 60
    echo 'healing partition'
    sudo blockade join
    sudo blockade status
    sleep 60
done
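The same loop could be driven from Python, which makes it easier to start it together with the test. A rough sketch using only the blockade commands shown above (partition and join); partition_cmd and nemesis are names I made up, and the run and sleep parameters exist only so the loop can be tried without touching a real cluster.

```python
import subprocess
import time

SIDE_A = ['jepsen_rabbit1_1', 'jepsen_rabbit2_1', 'jepsen_rabbit3_1']
SIDE_B = ['jepsen_rabbit4_1', 'jepsen_rabbit5_1']


def partition_cmd(*groups):
    """Build the `blockade partition` command line for the given groups."""
    return ['sudo', 'blockade', 'partition'] + [','.join(g) for g in groups]


def nemesis(rounds=6, pause=60, run=subprocess.check_call, sleep=time.sleep):
    """Alternate between a partitioned and a healed network."""
    for _ in range(rounds):
        run(partition_cmd(SIDE_A, SIDE_B))   # split 3 nodes from the other 2
        sleep(pause)
        run(['sudo', 'blockade', 'join'])    # heal the partition
        sleep(pause)


print(' '.join(partition_cmd(SIDE_A, SIDE_B)))
```

Calling nemesis() with the defaults repeats the shell loop above, minus the status output: six rounds of 60 seconds partitioned followed by 60 seconds healed.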

 

Results

The clients connect and start sending messages:

/usr/bin/python2.7 src/jepsen_test.py
[INFO] (MainThread) rabbitmq client at 5672
[INFO] (MainThread) Connecting to 192.168.54.136:5672
[INFO] (MainThread) Created channel=1
[INFO] (MainThread) rabbitmq client at 5673
[INFO] (MainThread) Connecting to 192.168.54.136:5673
[INFO] (MainThread) Created channel=1
[INFO] (MainThread) rabbitmq client at 5674
[INFO] (MainThread) Connecting to 192.168.54.136:5674
[INFO] (MainThread) Created channel=1
[INFO] (MainThread) rabbitmq client at 5675
[INFO] (MainThread) Connecting to 192.168.54.136:5675
[INFO] (MainThread) Created channel=1
[INFO] (MainThread) rabbitmq client at 5676
[INFO] (MainThread) Connecting to 192.168.54.136:5676
[INFO] (MainThread) Created channel=1
[INFO] (MainThread) starting producer
[INFO] (rabbit 5672) sending messages
[INFO] (MainThread) starting producer
[INFO] (rabbit 5673) sending messages
[INFO] (MainThread) starting producer
[INFO] (rabbit 5674) sending messages
[INFO] (MainThread) starting producer
[INFO] (rabbit 5675) sending messages
[INFO] (MainThread) starting producer
[INFO] (rabbit 5676) sending messages

Then, when rabbitmq detects a partition (pause minority), the clients connected to the minority side lose their connection:

[WARNING] (rabbit 5675) Published message was returned: _delivery_confirmation=True; channel=1; method=<Basic.Return(['exchange=', 'reply_code=312', 'reply_text=NO_ROUTE', 'routing_key=jepsen.queue'])>; properties=<BasicProperties(['delivery_mode=2'])>; body_size=5; body_prefix='15310'
[WARNING] (rabbit 5676) Published message was returned: _delivery_confirmation=True; channel=1; method=<Basic.Return(['exchange=', 'reply_code=312', 'reply_text=NO_ROUTE', 'routing_key=jepsen.queue'])>; properties=<BasicProperties(['delivery_mode=2'])>; body_size=5; body_prefix='20312'
[WARNING] (rabbit 5676) Socket closed when connection was open
[WARNING] (rabbit 5676) Disconnected from RabbitMQ at 192.168.54.136:5676 (320): CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'
[CRITICAL] (rabbit 5676) Connection close detected; result=BlockingConnection__OnClosedArgs(connection=, reason_code=320, reason_text="CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'")
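The numbers in body_prefix are not arbitrary. The test script gives every producer its own block of TOTAL_MSGS ids, so a message id tells you which producer sent it. A small sketch of that arithmetic, using the constants from the script (message_id and origin are helper names I made up):

```python
RABBIT_PORT = 5672
TOTAL_MSGS = 5000


def message_id(port, i):
    """Id of the i-th message sent by the producer connected to `port`."""
    return i + TOTAL_MSGS * (port - RABBIT_PORT)


def origin(msg):
    """Invert message_id: return (producer port, index within that producer)."""
    block, i = divmod(msg, TOTAL_MSGS)
    return RABBIT_PORT + block, i


print(origin(15310))  # (5675, 310)
print(origin(20312))  # (5676, 312)
```

Both returned messages map back to the thread that logged them, rabbit 5675 and rabbit 5676, which are the two nodes on the minority side of the partition.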

screen-shot-2016-12-21-at-10-39-03

Then after 60 seconds the partition is healed and the clients connect again:

[INFO] (rabbit 5675) Connecting to 192.168.54.136:5675
[INFO] (rabbit 5675) Created channel=1

End result, all 25000 messages sent:

[INFO] (rabbit 5672) sent: 5000, failed: 0, total: 5000
[INFO] (rabbit 5673) sent: 5000, failed: 0, total: 5000
[INFO] (rabbit 5674) sent: 5000, failed: 0, total: 5000
[INFO] (rabbit 5675) sent: 5000, failed: 0, total: 5000
[INFO] (rabbit 5676) sent: 5000, failed: 0, total: 5000

Note that rabbitmq reports 25020 messages:

screen-shot-2016-12-26-at-13-02-41

That’s ok, the extra 20 are duplicates.

[WARNING] (MainThread) RECEIVED: 25000, DUPLICATE: 20. [20123, 15121, 15630, 20644..., 22713], LOST MESSAGES 0, []
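The management UI counts every copy sitting in the queue, while the test counts distinct ids, hence 25020 versus 25000. Duplicates are to be expected with publisher confirms plus retries: most likely the connection dropped after the broker had stored a message but before the confirm reached the client, so the client sent it again. A sketch of the counting with made-up ids (summarize is a hypothetical helper, not part of the test script):

```python
from collections import Counter


def summarize(drained):
    """Count copies, distinct ids and duplicated ids among drained messages."""
    counts = Counter(drained)
    return {
        'in_queue': len(drained),   # every copy, what the UI would show
        'distinct': len(counts),    # unique message ids
        'duplicates': sorted(m for m, c in counts.items() if c > 1),
    }


# Made-up example: ids 0..4 were sent, id 3 was stored twice after a retry.
print(summarize([0, 1, 2, 3, 3, 4]))
```

In the real run the same arithmetic gives 25020 copies in the queue, 25000 distinct ids and 20 duplicated ids.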

Bottom line: 0 messages lost. I’ve found a great post by Balint Pato where he explained how he managed to reproduce the Jepsen results. Unfortunately, his code and setup are not publicly available, so until he gets it out, I’ll try to reproduce his results. Stay tuned, more posts coming..

5.1.17 UPDATE: second post available where I try different partitioning schemes.
