
Jepsen-testing RabbitMQ [with python] – Part 2

This is the second post about my efforts to reproduce the Jepsen RabbitMQ test (using python). In the first one I failed to reproduce data loss by cutting the network in half the same way every time, so here I’ll try different partitioning schemes.

Random blockade partitions

First, let’s try the blockade’s random partitions:

#!/bin/bash
for i in `seq 5`
do 
  echo 'creating partition'
  sudo blockade partition --random
  sudo blockade status
  sleep 60
  echo 'healing partition'
  sudo blockade join
  sudo blockade status
  sleep 60
done

This is implemented as a nemesis in blockade_random_partitions-rabbitmq-test.py.
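
The script itself isn’t reproduced here; presumably the nemesis boils down to the same loop as the bash script above, driven from Python. A rough sketch (the actual blockade_random_partitions-rabbitmq-test.py may well be structured differently):

import subprocess
import time

def random_partition_nemesis(rounds=5, period=60):
    """Create a random blockade partition, wait, heal, repeat (needs the same sudo rights as the bash version)."""
    for _ in range(rounds):
        print('creating partition')
        subprocess.check_call(['blockade', 'partition', '--random'])
        subprocess.check_call(['blockade', 'status'])
        time.sleep(period)
        print('healing partition')
        subprocess.check_call(['blockade', 'join'])
        subprocess.check_call(['blockade', 'status'])
        time.sleep(period)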

Running the test, yet again no messages were lost:

[WARNING] (MainThread) RECEIVED: 25000, unexpected: 19. [15042, 20042, 41, 10041, 5042, 10590, 20609, 604, 15630, 16155, 21134, 1134, 11709, 21709, 22240, 2496, 7498, 12279, 17532], LOST MESSAGES 0, []

Jepsen random halves

Let’s try to cut the network into random halves like in the original Jepsen test:

Meanwhile, the nemesis cuts the network into random halves every sixty seconds, then repairs it for sixty seconds, and so on.

This is implemented in random-majority-rabbitmq-test.py.
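
The file isn’t shown here either, but in the same generator style as the Balint Pato nemesis further down, a random-halves nemesis could look roughly like this (a sketch; the actual random-majority-rabbitmq-test.py may differ):

import random

def random_halves(nodes):
    """Yield blockade partition specs: a random majority/minority split, then a heal."""
    nodes = sorted(nodes)
    majority = len(nodes) // 2 + 1  # e.g. 3 out of 5 nodes
    while True:
        shuffled = random.sample(nodes, len(nodes))
        # cut the cluster into two random halves
        yield [','.join(shuffled[:majority]), ','.join(shuffled[majority:])]
        # everyone back into a single partition = heal
        yield [','.join(nodes)]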

Running this test, yet again no messages were lost.

Balint Pato’s partitions

In an excellent post, Balint Pato explained how he managed to reproduce the data loss. Let’s try his partitions (in a 3 node cluster: rabbit1, rabbit2, rabbit3, with pause_minority):

  1. Publisher A connecting to rabbit1
  2. partition rabbit3 away from rabbit2 and rabbit1 (now rabbit3 is stale pretty quickly, he’s gonna be the future “stale node”, >evil laughter<.)
  3. partition rabbit1 away from rabbit2 and rabbit3 (at this point the cluster is stopped, the final two minorities are in sync but dead)
  4. partition rabbit2 away from rabbit1 and rabbit3
  5. heal rabbit3 (cluster is still stopped)
  6. heal rabbit2

For this test I need 3 instead of 5 rabbitmq nodes (make a copy of docker-compose.yml and then sudo docker-compose -f docker-compose-3_nodes.yml up). I’ll be using 3 producers, one per rabbitmq node, and a single queue (on the default exchange) mirrored to all the nodes ("ha-mode":"all").

The partitioning scheme is:

  1. sudo blockade partition n1,n2 n3
  2. sudo blockade partition n1 n2 n3
  3. sudo blockade partition n1 n2,n3
  4. sudo blockade join

Implemented as a nemesis in Baliant_Pato_partitioning-rabbitmq-test.py:

t = sorted(nodes)

# partition rabbit3 away from rabbit2 and rabbit1
yield ['%s,%s' % (t[0], t[1]), t[2]]
# partition rabbit1 away from rabbit2 (rabbit3 already away)
yield t[:]
# join rabbit3 with rabbit2:
yield [t[0], '%s,%s' % (t[1], t[2])]
# finally join (all nodes belong to the same partition)
while True:
    yield [','.join(t)]

Running the test:
$ sudo python src/Baliant_Pato_partitioning-rabbitmq-test.py 3
...
[ERROR] (rabbit 5674) failed to send 10041, cannot connect to rabbit, stopping the test
[INFO] (rabbit 5672) sent: 785, failed: 1, total: 5000
[ERROR] (rabbit 5673) failed to send 5783, cannot connect to rabbit, stopping the test
[INFO] (rabbit 5673) sent: 783, failed: 1, total: 5000
[INFO] (MainThread) 1609 messages sent, 3 failed
[WARNING] (MainThread) RECEIVED: 1612, unexpected: 0. [], LOST MESSAGES 0, []

rabbit2 & rabbit3 are down and the producers cannot finish the test. Let’s run it one more time:

$ sudo python src/Baliant_Pato_partitioning-rabbitmq-test.py 3
...
[INFO] (MainThread) producers done
[INFO] (MainThread) draining the queue
[INFO] (MainThread) 15000 messages sent, 0 failed
[INFO] (MainThread) rabbitmq client at 192.168.54.136:5672
[INFO] (MainThread) Connecting to 192.168.54.136:5672
[INFO] (MainThread) Created channel=1
[INFO] (MainThread) Connecting to 192.168.54.136:5672
[INFO] (MainThread) Created channel=1
[WARNING] (MainThread) RECEIVED: 5841, unexpected: 1. [799], LOST MESSAGES 9159

This again leaves rabbit2 & rabbit3 down.

# rabbit2 logs: 
...
=WARNING REPORT==== 5-Jan-2017::08:23:18 ===
Mirrored queue 'jepsen.queue' in vhost '/': Stopping all nodes on master shutdown since no synchronised slave is available

=INFO REPORT==== 5-Jan-2017::08:23:25 ===
Stopped RabbitMQ application

=ERROR REPORT==== 5-Jan-2017::08:25:19 ===
** Node rabbit@rabbit1 not responding **
** Removing (timedout) connection **

=ERROR REPORT==== 5-Jan-2017::08:25:33 ===
Mnesia(rabbit@rabbit2): ** ERROR ** mnesia_event got {inconsistent_database, starting_partitioned_network, rabbit@rabbit3}

=INFO REPORT==== 5-Jan-2017::08:25:33 ===
Starting RabbitMQ 3.6.5 on Erlang R16B03-1
Copyright (C) 2007-2016 Pivotal Software, Inc.
Licensed under the MPL.  See http://www.rabbitmq.com/
...

Mirrored queue 'jepsen.queue' in vhost '/': Adding mirror on node rabbit@rabbit3: <32532.2772.0>

hm, rabbit2 synchronized with rabbit3.

osboxes@osboxes:~/jepsen-rabbitmq$ sudo docker exec -it n3 rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit3 ...
[{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]},
 {running_nodes,[rabbit@rabbit2,rabbit@rabbit3]},
 {cluster_name,<<"rabbit@rabbit1">>},
 {partitions,[]},
 {alarms,[{rabbit@rabbit2,[]},{rabbit@rabbit3,[]}]}]
osboxes@osboxes:~/jepsen-rabbitmq$ sudo docker exec -it n2 rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit2 ...
[{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]},
 {running_nodes,[rabbit@rabbit3,rabbit@rabbit2]},
 {cluster_name,<<"rabbit@rabbit1">>},
 {partitions,[]},
 {alarms,[{rabbit@rabbit3,[]},{rabbit@rabbit2,[]}]}]
osboxes@osboxes:~/jepsen-rabbitmq$ sudo docker exec -it n1 rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit1 ...
[{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]},
 {running_nodes,[rabbit@rabbit1]},
 {cluster_name,<<"rabbit@rabbit1">>},
 {partitions,[]},
 {alarms,[{rabbit@rabbit1,[]}]}]
osboxes@osboxes:~/jepsen-rabbitmq$ sudo docker exec -it n2 rabbitmqctl list_queues
Listing queues ...
jepsen.queue 10801

So rabbit2 & rabbit3 think rabbit1 is dead and vice versa. If I connect to rabbit2 and drain the queue, then no messages are lost.
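
For the record, draining from rabbit2 just means pointing the consumer at port 5673 instead of 5672. A tiny sketch reusing the JepsenConsumer class from the test script in the first post (assuming my 3-node compose file maps host port 5673 to rabbit2, like the 5-node one does):

# rabbit2 is exposed on host port 5673
c = JepsenConsumer(5673, all_sent, all_failed)
c.wrapup()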

But wasn’t rabbitmq supposed to detect the partition and report it? As in:

osboxes@osboxes:~/jepsen-rabbitmq$ sudo docker exec -it n1 rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit1 ...
[{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]},
{running_nodes,[rabbit@rabbit1]},
{cluster_name,<<"rabbit@rabbit1">>},
{partitions,[{rabbit@rabbit1,[rabbit@rabbit2,rabbit@rabbit3]}]},
{alarms,[{rabbit@rabbit1,[]}]}]


pause_minority is supposedly an “automatic partition handling” mechanism (excerpt from rabbitmq.com/partitions.html):

RabbitMQ also offers three ways to deal with network partitions automatically: pause-minority mode, pause-if-all-down mode and autoheal mode.

In pause-minority mode RabbitMQ will automatically pause cluster nodes which determine themselves to be in a minority (i.e. fewer or equal than half the total number of nodes) after seeing other nodes go down. … The minority nodes will pause as soon as a partition starts, and will start again when the partition ends.

If, after reading this, you assume (like I did) that no intervention is needed, you’re in for a surprise. I ended up here with the rabbit2 & rabbit3 nodes reported as down, and I had to manually restart them.
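
A quick way to spot this state without clicking through the management UI is to ask the management API which cluster nodes it considers running. A minimal sketch, assuming the management plugin is reachable at 192.168.54.136:15672 with the default guest/guest credentials:

import base64
import json
import urllib2

req = urllib2.Request('http://192.168.54.136:15672/api/nodes')
req.add_header('Authorization', 'Basic ' + base64.b64encode('guest:guest'))
for node in json.load(urllib2.urlopen(req)):
    # each entry carries the node name and whether the cluster thinks it's running
    print('%s running=%s' % (node['name'], node.get('running')))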

Jepsen-testing RabbitMQ [with python]

UPDATE: the code is now available on GitHub https://github.com/mbsimonovic/jepsen-python

In this post I’ll tell you about my attempt to reproduce the Jepsen RabbitMQ test (using python, not clojure). It’s been more than 2 years since that test, and rabbitmq has meanwhile gone from 3.3 to 3.6, so I was wondering whether anything’s different these days (end of 2016).

Messaging is a legit communication pattern, as nicely documented in the Enterprise Integration Patterns (2003) book, and with the rise of microservices it’s even more relevant than it was 10-15 years ago.

Seems like there’s a consensus about preferring smart endpoints and dumb pipes, so when it comes to picking a messaging provider, there are a few options: RabbitMQ, Apache Kafka and Apache ActiveMQ. Google Trends backs up this claim about the rise of interest:

 google-trends-rabbitmq-vs-kafka-vs-activemq

Back in 2013 Kyle Kingsbury started publishing a fantastic series of blog posts called Jepsen where he tested how databases behave in a distributed environment. It turned out badly for most of them, RabbitMQ included.

In short, under certain failure scenarios, it’s possible to lose data, and losing here means losing a rabbitmq acknowledged write.

I was wondering if today (December 2016) things are any different.

Setting up the environment (docker)

I’ve been using docker for almost 2 years now, so that was a natural choice for this test. I use VMware Fusion to run ubuntu xenial, and run docker on ubuntu (you can grab an image from http://www.osboxes.org/ubuntu/).


# enable ssh
sudo apt-get install openssh-server
# login from your laptop
# update & upgrade
#
$ sudo apt-get -y install python-pip python-virtualenv python-dev
$ python --version
Python 2.7.12
$ lsb_release -a
No LSB modules are available.
Distributor ID: Ubuntu
Description: Ubuntu 16.04.1 LTS
Release: 16.04
Codename: xenial
$ uname -a
Linux osboxes 4.4.0-53-generic #74-Ubuntu SMP Fri Dec 2 15:59:10 UTC 2016 x86_64 x86_64 x86_64 GNU/Linux
#
# install docker https://docs.docker.com/engine/installation/linux/ubuntulinux/

Jepsen has meanwhile gotten a docker setup so I tried that first:

$ ssh [email protected]
$ git clone https://github.com/jepsen-io/jepsen.git && cd jepsen/docker
$ ./up.sh # this runs docker-compose build and up, so in a new terminal:
$ sudo docker exec -it jepsen-control bash
root@control $ cd rabbitmq && lein test :only jepsen.rabbitmq-test

This failed with JSchException: Packet corrupt. ssh-ing once to each node and accepting the host key fixed this problem.
Trying lein test again failed with the message “rabbitmq-server: unrecognized service”. Looking at the console output, the test tried to install rabbitmq only on 3 nodes (out of 5). So I manually installed rabbit on the remaining 2 nodes and ran the test again. It now hangs while starting rabbit. I tried changing rabbit-test.clj and adding :nodes [:n1 :n2 :n3 :n4 :n5], but it tries to start rabbit on 4 nodes and hangs again. I’m not familiar enough with clojure to debug this, so I submitted a Jepsen bug.

Running a RabbitMQ cluster in Docker

First thing, I need a rabbitmq cluster in docker; I’ll start with bijukunjummen/docker-rabbitmq-cluster:

$ git clone https://github.com/bijukunjummen/docker-rabbitmq-cluster.git
$ cd docker-rabbitmq-cluster/base
$ sudo docker build -t bijukunjummen/rabbitmq-base .
# this hangs while installing plugins, seems like erlang freaks out when it's pid 1, so just add && true at the end:
# RUN /usr/sbin/rabbitmq-plugins enable rabbitmq_mqtt rabbitmq_stomp rabbitmq_management rabbitmq_management_agent rabbitmq_management_visualiser rabbitmq_federation rabbitmq_federation_management sockjs && true
$ cd ../server
$ wget 'https://github.com/jepsen-io/jepsen/raw/master/rabbitmq/resources/rabbitmq/rabbitmq.config'
$ sudo docker build -t bijukunjummen/rabbitmq-server .
$ cd ../cluster
# edit docker-compose.yml

rabbit1:
  image: bijukunjummen/rabbitmq-server
  hostname: rabbit1
  ports:
    - "5672:5672"
    - "15672:15672"

rabbit2:
  image: bijukunjummen/rabbitmq-server
  hostname: rabbit2
  links:
    - rabbit1
  environment: 
   - CLUSTERED=true
   - CLUSTER_WITH=rabbit1
  ports:
      - "5673:5672"
      - "15673:15672"

rabbit3:
  image: bijukunjummen/rabbitmq-server
  hostname: rabbit3
  links:
    - rabbit1
    - rabbit2
  environment: 
   - CLUSTERED=true
   - CLUSTER_WITH=rabbit1   
  ports:
        - "5674:5672"

rabbit4:
  image: bijukunjummen/rabbitmq-server
  hostname: rabbit4
  links:
    - rabbit1
    - rabbit2
    - rabbit3
  environment: 
   - CLUSTERED=true
   - CLUSTER_WITH=rabbit1
  ports:
        - "5675:5672"

rabbit5:
  image: bijukunjummen/rabbitmq-server
  hostname: rabbit5
  links:
    - rabbit1
    - rabbit2
    - rabbit3
    - rabbit4
  environment: 
   - CLUSTERED=true
   - CLUSTER_WITH=rabbit1   
  ports:
        - "5676:5672"

Starting up a cluster using links didn’t work for me: some nodes would hang because they didn’t know about all the other nodes (rabbit3 doesn’t know about rabbit5 because it was created earlier), so I had to switch to the version 2 compose format and rely on DNS instead.

version: '2'
services:
  rabbit1:
    image: bijukunjummen/rabbitmq-server
    hostname: rabbit1
    ports:
      - "5672:5672"
      - "15672:15672"

  rabbit2:
    image: bijukunjummen/rabbitmq-server
    hostname: rabbit2
    environment: 
     - CLUSTERED=true
     - CLUSTER_WITH=rabbit1
    ports:
      - "5673:5672"
      - "15673:15672"
  rabbit3:
    image: bijukunjummen/rabbitmq-server
    hostname: rabbit3
    environment: 
     - CLUSTERED=true
     - CLUSTER_WITH=rabbit1   
    ports:
      - "5674:5672"

  rabbit4:
    image: bijukunjummen/rabbitmq-server
    hostname: rabbit4
    environment: 
     - CLUSTERED=true
     - CLUSTER_WITH=rabbit1
    ports:
      - "5675:5672"

  rabbit5:
    image: bijukunjummen/rabbitmq-server
    hostname: rabbit5
    environment: 
     - CLUSTERED=true
     - CLUSTER_WITH=rabbit1
    ports:
      - "5676:5672"

# need new docker-compose for version 2 syntax:
$ sudo apt-get install docker-compose
$ sudo docker-compose up

$ sudo docker-compose up
Creating jepsen_rabbit4_1
Creating jepsen_rabbit5_1
Creating jepsen_rabbit2_1
Creating jepsen_rabbit3_1
Creating jepsen_rabbit1_1
Attaching to jepsen_rabbit2_1, jepsen_rabbit3_1, jepsen_rabbit1_1, jepsen_rabbit5_1, jepsen_rabbit4_1
rabbit2_1  | Warning: PID file not written; -detached was passed.
rabbit3_1  | Warning: PID file not written; -detached was passed.
rabbit5_1  | Warning: PID file not written; -detached was passed.
rabbit4_1  | Warning: PID file not written; -detached was passed.
rabbit1_1  | 
rabbit1_1  |               RabbitMQ 3.6.5. Copyright (C) 2007-2016 Pivotal Software, Inc.
rabbit1_1  |   ##  ##      Licensed under the MPL.  See http://www.rabbitmq.com/
rabbit1_1  |   ##  ##
rabbit1_1  |   ##########  Logs: /var/log/rabbitmq/[email protected]
rabbit1_1  |   ######  ##        /var/log/rabbitmq/[email protected]
rabbit1_1  |   ##########
rabbit1_1  |               Starting broker...
rabbit2_1  | Stopping node rabbit@rabbit2 ...
rabbit3_1  | Stopping node rabbit@rabbit3 ...
rabbit4_1  | Stopping node rabbit@rabbit4 ...
rabbit5_1  | Stopping node rabbit@rabbit5 ...
rabbit1_1  |  completed with 12 plugins.
rabbit3_1  | Clustering node rabbit@rabbit3 with rabbit@rabbit1 ...
rabbit2_1  | Clustering node rabbit@rabbit2 with rabbit@rabbit1 ...
rabbit4_1  | Clustering node rabbit@rabbit4 with rabbit@rabbit1 ...
rabbit5_1  | Clustering node rabbit@rabbit5 with rabbit@rabbit1 ...
rabbit3_1  | Starting node rabbit@rabbit3 ...
rabbit2_1  | Starting node rabbit@rabbit2 ...
rabbit4_1  | Starting node rabbit@rabbit4 ...
rabbit2_1  |  * sockjs
rabbit2_1  | 
rabbit2_1  | =INFO REPORT==== 26-Dec-2016::11:33:49 ===
rabbit2_1  | rabbit on node rabbit@rabbit1 up
rabbit2_1  | 
rabbit2_1  | =INFO REPORT==== 26-Dec-2016::11:33:49 ===
rabbit2_1  | rabbit on node rabbit@rabbit3 down
rabbit2_1  | 
rabbit2_1  | =INFO REPORT==== 26-Dec-2016::11:33:49 ===
rabbit2_1  | Keep rabbit@rabbit3 listeners: the node is already back
rabbit5_1  | Starting node rabbit@rabbit5 ...
rabbit3_1  |  * rabbitmq_management
rabbit3_1  |  * rabbitmq_web_dispatch
rabbit3_1  |  * webmachine
rabbit3_1  |  * mochiweb
rabbit3_1  |  * rabbitmq_mqtt
rabbit3_1  |  * rabbitmq_federation
rabbit3_1  |  * rabbitmq_stomp
rabbit3_1  |  * rabbitmq_management_agent
rabbit3_1  |  * amqp_client
rabbit3_1  |  * sockjs
rabbit2_1  | 
rabbit2_1  | =INFO REPORT==== 26-Dec-2016::11:33:49 ===
rabbit2_1  | rabbit on node rabbit@rabbit3 up
rabbit2_1  | 
rabbit2_1  | =INFO REPORT==== 26-Dec-2016::11:33:50 ===
rabbit2_1  | rabbit on node rabbit@rabbit5 up
rabbit2_1  | 
rabbit2_1  | =INFO REPORT==== 26-Dec-2016::11:33:50 ===
rabbit2_1  | rabbit on node rabbit@rabbit4 up
rabbit5_1  |  * mochiweb
rabbit5_1  |  * rabbitmq_mqtt
rabbit5_1  |  * rabbitmq_federation
rabbit5_1  |  * rabbitmq_stomp
rabbit5_1  |  * rabbitmq_management_agent
rabbit5_1  |  * amqp_client
rabbit5_1  |  * sockjs
rabbit5_1  | 
rabbit5_1  | =INFO REPORT==== 26-Dec-2016::11:33:50 ===
rabbit5_1  | rabbit on node rabbit@rabbit4 up
rabbit4_1  |  * rabbitmq_management
rabbit4_1  |  * rabbitmq_web_dispatch
rabbit4_1  |  * webmachine
rabbit4_1  |  * mochiweb
rabbit4_1  |  * rabbitmq_mqtt
rabbit4_1  |  * rabbitmq_federation
rabbit4_1  |  * rabbitmq_stomp
rabbit4_1  |  * rabbitmq_management_agent
rabbit4_1  |  * amqp_client
rabbit4_1  |  * sockjs
rabbit3_1  | 
rabbit3_1  | =INFO REPORT==== 26-Dec-2016::11:33:50 ===
rabbit3_1  | rabbit on node rabbit@rabbit5 up
rabbit3_1  | 
rabbit3_1  | =INFO REPORT==== 26-Dec-2016::11:33:50 ===
rabbit3_1  | rabbit on node rabbit@rabbit4 up


After a few seconds the cluster should be up and running, so open http://192.168.54.136:15672/ and log in with guest/guest.

screen-shot-2016-12-26-at-12-02-37

Let’s run a simple hello world test:

$ sudo pip install pika
$ cat jepsen_produce.py
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='jepsen.queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"

channel.basic_publish(exchange='',
                      routing_key='jepsen.queue',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      ))
print(" [x] Sent %r" % message)
connection.close()
# 
#
$ cat jepsen_consume.py
#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='jepsen.queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='jepsen.queue')

channel.start_consuming()
# 
$ python jepsen_produce.py 'hi there'
[x] Sent 'hi there'

$ python jepsen_consume.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'hi there'
[x] Done

Blockade – Jepsen port in python

There’s a python port of Jepsen called blockade, so while waiting for jepsen-160, I’ve decided to write my own rabbitmq test using blockade.

With the cluster up and running, let’s set up blockade to manage it:

$ # sudo pip install blockade; this has a bug that was fixed in master after the last 0.3.1 release
# so had to install from sources:
$ git clone https://github.com/dcm-oss/blockade.git
$ cd blockade && sudo python setup.py install
# add existing nodes named jepsen_rabbit{1..5}_1 to blockade:
$ sudo docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
83db717f40f5 bijukunjummen/rabbitmq-server "/bin/sh -c /opt/rabb" 2 days ago Up About a minute 4369/tcp, 9100-9105/tcp, 25672/tcp, 0.0.0.0:5673->5672/tcp, 0.0.0.0:15673->15672/tcp jepsen_rabbit2_1
7ec3390ea0fc bijukunjummen/rabbitmq-server "/bin/sh -c /opt/rabb" 2 days ago Up About a minute 4369/tcp, 0.0.0.0:5672->5672/tcp, 9100-9105/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp jepsen_rabbit1_1
4599b42b5d9a bijukunjummen/rabbitmq-server "/bin/sh -c /opt/rabb" 2 days ago Up About a minute 4369/tcp, 9100-9105/tcp, 15672/tcp, 25672/tcp, 0.0.0.0:5675->5672/tcp jepsen_rabbit4_1
4eaa2e7a76c2 bijukunjummen/rabbitmq-server "/bin/sh -c /opt/rabb" 2 days ago Up About a minute 4369/tcp, 9100-9105/tcp, 15672/tcp, 25672/tcp, 0.0.0.0:5674->5672/tcp jepsen_rabbit3_1
aa409bf28eba bijukunjummen/rabbitmq-server "/bin/sh -c /opt/rabb" 2 days ago Up About a minute 4369/tcp, 9100-9105/tcp, 15672/tcp, 25672/tcp, 0.0.0.0:5676->5672/tcp jepsen_rabbit5_1
$
$ for i in `seq 1 5`; do sudo blockade add jepsen_rabbit${i}_1; done
$
$ sudo blockade status
NODE              CONTAINER ID  STATUS  IP          NETWORK  PARTITION
jepsen_rabbit1_1  7ec3390ea0fc  UP      172.20.0.2  UNKNOWN
jepsen_rabbit2_1  83db717f40f5  UP      172.20.0.3  UNKNOWN
jepsen_rabbit3_1  4eaa2e7a76c2  UP      172.20.0.4  UNKNOWN
jepsen_rabbit4_1  4599b42b5d9a  UP      172.20.0.5  UNKNOWN
jepsen_rabbit5_1  aa409bf28eba  UP      172.20.0.6  UNKNOWN


The original jepsen test uses triple-mirrored writes, so I need to configure that:


screen-shot-2016-12-19-at-12-46-25


rabbitmq randomly chooses two more slaves for the queue, and in my case they were rabbit3 and rabbit5; you can find this on the queue tab, under Details.
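
If you’d rather script the policy than click through the UI, the management HTTP API can do the same thing. A minimal sketch, assuming guest/guest at 192.168.54.136:15672 and a policy roughly equivalent to the one in the screenshot (exactly 3 mirrors for the jepsen queue; the exact definition may differ from what I set in the UI):

import base64
import json
import urllib2

policy = {
    'pattern': '^jepsen\\.',  # match jepsen.queue
    'apply-to': 'queues',
    'definition': {'ha-mode': 'exactly', 'ha-params': 3},
}
# PUT /api/policies/<vhost>/<name>; %2F is the default vhost "/"
req = urllib2.Request('http://192.168.54.136:15672/api/policies/%2F/jepsen-ha',
                      data=json.dumps(policy),
                      headers={'Content-Type': 'application/json'})
req.add_header('Authorization', 'Basic ' + base64.b64encode('guest:guest'))
req.get_method = lambda: 'PUT'
urllib2.urlopen(req)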

Finally testing RabbitMQ

Let’s rewrite jepsen’s rabbit.clj test in python:

#!/usr/bin/env python
import pika
import sys
import time
import exceptions
import threading
import logging

logging.basicConfig(level=logging.INFO, format='[%(levelname)s] (%(threadName)-10s) %(message)s', )


RABBIT_HOST = '192.168.54.136'
RABBIT_PORT = 5672
TOTAL_MSGS = 5000
MAX_RECONN_ATTEMPTS = 300000
MAX_PUBLISH_ATTEMPTS = 10

num_rabbits = int(sys.argv[1]) if len(sys.argv) > 1 else 5

class JepsenRabbitmq():
    '''
    Is Pika thread safe?

    Pika does not have any notion of threading in the code. If you want to use Pika with threading,
    make sure you have a Pika connection per thread, created in that thread. It is not safe to
    share one Pika connection across threads.
    '''
    def __init__(self, rabbit_port):
        logging.info('rabbitmq client at %i', rabbit_port)
        self.rabbit_hole = rabbit_port
        self.sent = []
        self.failed = []
        self.dig()

    def dig(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=RABBIT_HOST, port=self.rabbit_hole))
        self.channel = self.connection.channel()
        # for publish:
        # jepsen uses confirmation trackings:   (lco/select ch) ; Use confirmation tracking
        # is this the way to enable this in python:

        # http://www.rabbitmq.com/extensions.html#confirms
        self.channel.confirm_delivery()  # Turn on RabbitMQ-proprietary Confirm mode in the channel; Enabled delivery confirmations

        # https://github.com/jepsen-io/jepsen/blob/master/rabbitmq/src/jepsen/rabbitmq.clj#L132
        self.channel.queue_declare(queue='jepsen.queue', durable=True, auto_delete=False, exclusive=False)

    def enqueue(self, message):
        logging.debug('sending %s', message)
        # see http://pika.readthedocs.io/en/0.10.0/examples/blocking_delivery_confirmations.html
        res = self.channel.basic_publish(exchange='',
                                             routing_key='jepsen.queue',
                                             body=message,
                                             properties=pika.BasicProperties(
                                                 delivery_mode=2,  # make message persistent
                                                 ## in jepsen:     :content-type  "application/edn"
                                                 # content_type='text/plain'
                                             ),
                                             mandatory=True)
        if not res:
            raise Exception('Message could not be confirmed')
            # TODO 5sec timeout
            # (if (lco/wait-for-confirms ch 5000)     # ; Block until message acknowledged

    def dequeue(self):
        """Given a channel and an operation, dequeues a value and returns the
        corresponding operation.
          ; auto-ack dynamics mean that even if we issue a dequeue req then crash,
          ; the message should be re-delivered and we can count this as a failure.
        """
        self.channel.basic_qos(prefetch_count=1)
        # see http://pika.readthedocs.io/en/0.10.0/modules/adapters/blocking.html#pika.adapters.blocking_connection.BlockingChannel.basic_get
        method, header, body = self.channel.basic_get(queue='jepsen.queue', no_ack=False)
        if method:
            self.channel.basic_ack(delivery_tag=method.delivery_tag, multiple=False)
        return body

        # TODO timeout
    def close(self):
        try:
            self.connection.close()
        except:
            logging.warning('failed to close the connection')

class JepsenProducer(JepsenRabbitmq):
    def __init__(self, rabbit_port):
        JepsenRabbitmq.__init__(self, rabbit_port)

        self.thread = threading.Thread(name='rabbit %i' % rabbit_port, target=self._test)

    def test(self):
        logging.info('starting producer')
        self.thread.start()

    def _test(self):
        self._produce()
        self.report()
        self.close()

    def wait_for_test_to_complete(self, timeout = None):
        self.thread.join(timeout)
        logging.info('producer finished work')

    # (timeout 5000 (assoc op :type :fail :value :timeout)
    #           (let [[meta payload] (lb/get ch queue)
    #                 value          (codec/decode payload)]
    #             (if (nil? meta)
    #               (assoc op :type :fail :value :exhausted)
    #               (assoc op :type :ok :value value)))))
    # timeout = 5
    # def on_timeout():
    #    global connection
    #    connection.close()
    # connection.add_timeout(timeout, on_timeout)
    # if basic_get returns than clear the timeout
    def _produce(self):
        logging.info('sending messages')
        for i in range(TOTAL_MSGS):
            msg = i + TOTAL_MSGS * (self.rabbit_hole - RABBIT_PORT)
            for publish_attempt in range(MAX_PUBLISH_ATTEMPTS):
                try:
                    self.enqueue(str(msg))
                    self.sent.append(msg)
                    time.sleep(0.1)
                    break
                    # (320, "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'")
                except pika.exceptions.ConnectionClosed as c:
                    for conn_attempt in range(MAX_RECONN_ATTEMPTS):
                        try:
                            logging.debug('trying to re-open connection')
                            self.dig()
                            break
                        except:
                            time.sleep(conn_attempt)
                            # time.sleep(1)
                    else: # failed to re-open
                        self.failed.append(i)
                        logging.error('failed to send %i, cannot connect to rabbit, stopping the test', msg)
                        return
                except exceptions.KeyboardInterrupt as ke:
                    logging.warning('keyboard exception, stopping msg publishing')
                    # failed.append(msg)
                    return
                # except Exception as e:
                except:
                    e = sys.exc_info()[0]
                    if publish_attempt == MAX_PUBLISH_ATTEMPTS - 1:
                        logging.exception('failed to send %i' % msg)
                        self.failed.append(i)
                    else:
                        logging.exception('bumpy road',)
                        time.sleep(1)

    def report(self):
        logging.info('failed: %s', self.failed)
        logging.info('sent: %i, failed: %i, total: %i' , len(self.sent), len(self.failed), TOTAL_MSGS)

class JepsenConsumer(JepsenRabbitmq):
    def __init__(self, rabbit_port, all_sent, all_failed):
        JepsenRabbitmq.__init__(self, rabbit_port)
        self.failed = all_failed
        self.sent = all_sent
        self.lost = set()
        self.unexpected = []

    def wrapup(self):
        # i = raw_input('press ENTER to drain the queue')

        try:
            self._drain()
            logging.warning('RECEIVED: %i, unexpected: %i. %s, LOST MESSAGES %i, %s',
                            len(self.sent), len(self.unexpected), self.unexpected, len(self.lost), sorted(list(self.lost)))
        except:
            logging.exception('failed to drain')
        finally:
            self.close()

    def _drain(self):
        # i = raw_input('press ENTER to drain the queue')

        received = set()
        # new connection and channel?
        self.dig()

        while True:
            r = self.dequeue()
            if not r or len(r) == 0:
                break
            i = int(r)
            if i in received:
                self.unexpected.append(i)
            received.add(i)

        self.lost.update(set(self.sent).difference(received))

rabbits = []
for r in range(num_rabbits):
    rabbits.append(JepsenProducer(RABBIT_PORT + r))
for r in rabbits:
    r.test()

# wait for tests to complete
for r in rabbits:
    r.wait_for_test_to_complete()

time.sleep(60)
all_sent = []
all_failed = []
for r in rabbits:
    all_sent.extend(r.sent)
    all_failed.extend(r.failed)

c = JepsenConsumer(RABBIT_PORT, all_sent, all_failed)
c.wrapup()
print('done')

The code uses separate threads to send messages to rabbitmq, then waits for all to finish before collecting all messages. To introduce network problems while the test is running, I use blockade in another terminal:

for i in `seq 6`
do 
    echo 'creating partition'
    sudo blockade partition jepsen_rabbit1_1,jepsen_rabbit2_1,jepsen_rabbit3_1 jepsen_rabbit4_1,jepsen_rabbit5_1
    sudo blockade status
    sleep 60
    echo 'healing partition'
    sudo blockade join
    sudo blockade status
    sleep 60
done


Results

The clients connect and start sending messages:

/usr/bin/python2.7 src/jepsen_test.py
[INFO] (MainThread) rabbitmq client at 5672
[INFO] (MainThread) Connecting to 192.168.54.136:5672
[INFO] (MainThread) Created channel=1
[INFO] (MainThread) rabbitmq client at 5673
[INFO] (MainThread) Connecting to 192.168.54.136:5673
[INFO] (MainThread) Created channel=1
[INFO] (MainThread) rabbitmq client at 5674
[INFO] (MainThread) Connecting to 192.168.54.136:5674
[INFO] (MainThread) Created channel=1
[INFO] (MainThread) rabbitmq client at 5675
[INFO] (MainThread) Connecting to 192.168.54.136:5675
[INFO] (MainThread) Created channel=1
[INFO] (MainThread) rabbitmq client at 5676
[INFO] (MainThread) Connecting to 192.168.54.136:5676
[INFO] (MainThread) Created channel=1
[INFO] (MainThread) starting producer
[INFO] (rabbit 5672) sending messages
[INFO] (MainThread) starting producer
[INFO] (rabbit 5673) sending messages
[INFO] (MainThread) starting producer
[INFO] (rabbit 5674) sending messages
[INFO] (MainThread) starting producer
[INFO] (rabbit 5675) sending messages
[INFO] (MainThread) starting producer
[INFO] (rabbit 5676) sending messages

Then, when rabbitmq detects a partition (pause_minority), clients lose their connections:

[WARNING] (rabbit 5675) Published message was returned: _delivery_confirmation=True; channel=1; method=<Basic.Return(['exchange=', 'reply_code=312', 'reply_text=NO_ROUTE', 'routing_key=jepsen.queue'])>; properties=<BasicProperties(['delivery_mode=2'])>; body_size=5; body_prefix='15310'
[WARNING] (rabbit 5676) Published message was returned: _delivery_confirmation=True; channel=1; method=<Basic.Return(['exchange=', 'reply_code=312', 'reply_text=NO_ROUTE', 'routing_key=jepsen.queue'])>; properties=<BasicProperties(['delivery_mode=2'])>; body_size=5; body_prefix='20312'
[WARNING] (rabbit 5676) Socket closed when connection was open
[WARNING] (rabbit 5676) Disconnected from RabbitMQ at 192.168.54.136:5676 (320): CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'
[CRITICAL] (rabbit 5676) Connection close detected; result=BlockingConnection__OnClosedArgs(connection=, reason_code=320, reason_text="CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'")

screen-shot-2016-12-21-at-10-39-03

Then, after 60 seconds, the partition is healed and the clients connect again:

[INFO] (rabbit 5675) Connecting to 192.168.54.136:5675
[INFO] (rabbit 5675) Created channel=1

End result, all 25000 messages sent:

[INFO] (rabbit 5672) sent: 5000, failed: 0, total: 5000
[INFO] (rabbit 5673) sent: 5000, failed: 0, total: 5000
[INFO] (rabbit 5674) sent: 5000, failed: 0, total: 5000
[INFO] (rabbit 5675) sent: 5000, failed: 0, total: 5000
[INFO] (rabbit 5676) sent: 5000, failed: 0, total: 5000

Note that rabbitmq reports 25020 messages:

screen-shot-2016-12-26-at-13-02-41

That’s OK, some are duplicates: when a publish fails with a connection error, the producer retries it, but the broker may have already enqueued the first attempt, so the same message ends up in the queue twice.

[WARNING] (MainThread) RECEIVED: 25000, DUPLICATE: 20. [20123, 15121, 15630, 20644..., 22713], LOST MESSAGES 0, []

Bottom line: 0 messages lost. I’ve found a great post by Balint Pato where he explained how he managed to reproduce the Jepsen results. Unfortunately, his code and setup are not publicly available, so until he gets it out, I’ll try to reproduce his results. Stay tuned, more posts coming..

5.1.17 UPDATE: second post available where I try different partitioning schemes.

Smashing CouchBase

You might remember year 2k for Bill Gates stepping down as CEO (and promoting Steve Ballmer), American Beauty winning 5 Oscars, liberation of Lebanon after 22 years of Israeli occupation, end of the RSA patent, Bill Clinton becoming the first U.S. president to visit Vietnam, and I will certainly remember it for the Bulldozer Revolution in my country, which led to the resignation of Slobodan Milošević.

That year marks a big milestone in the world of software engineering: in a conference keynote, Prof. Eric Brewer postulated the now famous and much discussed CAP theorem (btw, it was proven 2 years later by Gilbert and Lynch), and paved the way for the NoSQL revolution.

Another big factor for NoSQL was getting away from the relational model (proposed way back in 1969) and ACID in order to achieve scale and performance. Having felt the pain of working with object-relational mappers, developing a webapp on MongoDB for the first time was such a pleasant experience.

Unfortunately (most of) the current NoSQL systems (read MongoDB, Riak, Cassandra, …) force developers to deal with messy aspects of consistency, concurrency, non-zero latencies, weird corner cases of failure scenarios, not to mention designing CRDTs, or asking some philosophical questions like how-eventual-is-eventual-consistency (there’re some promising developments like Google’s Spanner and HyperDex, but haven’t had time to have a look).

I’ve successfully managed to avoid dealing with these thorny issues for a few years now, but this fantastic series of articles called Jepsen by Kyle Kingsbury got me motivated to address these using CouchBase. His series of articles is not only eye-opening from the developer’s point of view, but it also reveals some shocking discoveries, namely some databases claim things they don’t provide (Redis being a CP system, MongoDB being safe, etc).

I’m not smart enough to prove that a database is broken by design, and even if I were it would require more time than I can afford to invest, so Kyle’s experimental approach is the preferred way. Since I don’t speak Clojure, the implementation language of his choice for Jepsen, this will be done in Java (yeah, I know, it’s not that cool anymore, but it gets the job done).

Cluster setup

The first cluster setup was just like Kyle’s, using LXC, but I ran into some networking issues (one node would just refuse connection attempts), so I ended up using 5 VMs on my Mac laptop.

For a detailed explanation of what and how this test is supposed to work have a look at http://aphyr.com/posts/281-call-me-maybe-carly-rae-jepsen-and-the-perils-of-network-partitions. Here’s a quick recap:

A five node cluster of Ubuntu 14.04 machines, running in VMs (VMware or VirtualBox). VMs – 192.168.1.131 – 192.168.1.135 (named n1-n5 – add these entries to /etc/hosts on your computer and on each of the nodes themselves).

To cause a partition, Kyle makes nodes n1 and n2 drop IP traffic from n3, n4, and n5, essentially by running:

iptables -A INPUT -s n3 -j DROP
iptables -A INPUT -s n4 -j DROP
iptables -A INPUT -s n5 -j DROP

In my experiments, I wanted to have ssh access all the time on all VMs so I drop a port range that covers all CouchBase ports (they should be above 1024), like this:

iptables -A INPUT -p tcp --dport 200:65535 -j DROP

To resolve the partition, we’ll reset iptables chains to their defaults:

iptables -F

A client will be a simple app that appends integers to a single document – read & append number & use cas to update, and record if the db acknowledged the write or there was an error. At the end I’ll compare if the db and client apps agree on the state, or acknowledged writes are missing and/or unacknowledged writes are present.

A simple time-out test first

Before going with the full cluster, let’s test the code against a single node cluster running CouchBase 2.5.1: 256MB ram; default bucket is type Couchbase, 256MB ram quota, 0 replicas, 2 workers, flush enabled.

The client app will use Java SDK 1.4.3 and Oracle JDK 1.7.0_55.

CouchTimeoutTest first opens a connection to CouchBase – CouchbaseClient with the OP timeout of 10sec, sets an empty JSON document with a single array element, appends 50 numbers to the array, then makes the node drop all traffic for CouchBase ports, and tries to add 150 more elements to the array:

public void run() {
    cluster.getHealthy();
    setEmptyDocument();
    appendToDocument(50);
    cluster.uniDirectionalPartition();
    cluster.scheduleHealAndShutdownIn(15);
    //sleep while the node recovers?
    appendToDocument(150);
    client.shutdown();
    log.info("DONE!");
}

All operations are synchronous. The Java SDK driver docs describe how to deal with time-outs: “When a time-out occurs, most of the synchronous methods on the client return a RuntimeException showing a time-out as the root cause.”, so the appendToDocument implementation looks like this:

String addElementToArray(Integer nextVal, int numAttempts) {
        long start = System.currentTimeMillis();
        for (int i = 0; i < numAttempts; i++) {
            CASValue<Object> casValue = client.gets(key);
            String updatedValue = appendElement(casValue.getValue().toString(), nextVal);
            try {
                final CASResponse casResponse = client.cas(key, casValue.getCas(), updatedValue);
                switch (casResponse) {
                    case OK:
                        log.info("added " + nextVal + " in " + (System.currentTimeMillis() - start));
                        return updatedValue;
                    case EXISTS:
                        log.debug("retrying " + nextVal);
                        break;
                    default:
                        log.error("error trying to add " + nextVal + ": " + casResponse);
                        return null;
                }
            } catch (IllegalStateException ex) {
                log.warn("outpacing the network, backing off: " + ex);
                try {
                    Thread.currentThread().sleep(1000 * random.nextInt(5));
                } catch (InterruptedException e) {
                    log.warn("interrupted while sleeping, failed to add " + nextVal, e);
                    Thread.currentThread().interrupt();
                    return null;
                }
            } catch (RuntimeException ex) {
                if (ex.getCause() instanceof TimeoutException) {
                    log.error("timed out adding " + nextVal);
                } else {
                    log.error("error adding " + nextVal, ex);
                    return null;
                }
            }
        }
        log.error("failed to append " + nextVal + " in " + numAttempts + " attempts");
        return null;
    }

Test result

The client is configured to time out after 10sec, and the partition lasts for 15s, so I expect to see some timeouts and retries. In the end the document should have 200 integers in the array. Manually checking the iptables rules, I see the code correctly heals the node. What do you think happened when I ran this code (using the maven exec plugin, mvn test -Pcouch-timeout)?

  1. a few operations timed out and then succeeded
  2. a few operations timed out and then failed
  3. CouchbaseClient failed to reconnect so only 50 integers were written
  4. an infinite loop

Believe it or not, it’s the 4th option. CouchbaseClient never times out or throws anything, it just keeps running forever, broadcasting noops, probably to keep the connection alive:

2014-07-22 19:27:52,342 DEBUG [com.couchbase.client.CouchbaseConnection] - Handling IO for:  sun.nio.ch.SelectionKeyImpl@208ee9c9 (r=true, w=false, c=false, op={QA sa=dimsplatform.com/192.168.1.128:11210, #Rops=1, #Wops=0, #iq=0, topRop=Cmd: 10 Opaque: 354, topWop=null, toWrite=0, interested=1})
2014-07-22 19:27:52,342 DEBUG [com.couchbase.client.CouchbaseConnection] - Read 24 bytes
2014-07-22 19:27:52,342 DEBUG [com.couchbase.client.CouchbaseConnection] - Completed read op: Cmd: 10 Opaque: 354 and giving the next 0 bytes
2014-07-22 19:27:52,342 DEBUG [com.couchbase.client.CouchbaseConnection] - Done dealing with queue.

The cluster is healthy, the node up and running; if I spin up another client it reads/writes without any problems.

What happens if the server is unavailable for some amount of time less than the configured timeout (10sec in this test)? The magic number seems to be 7 seconds: for anything longer, the test deadlocks.

The SDK docs are pretty clear in that “when a time-out occurs, most of the synchronous methods on the client return a RuntimeException“.

It turned out (thanks Michael) the problem was the test code – getting the doc was outside any try/catch and somehow the exceptions were getting swallowed (not even showing in the log?!). The fix is:

couch_timeout_test_bugfix

With this change, the test runs as expected, a few ops time out but eventually all succeed:

...
2014-08-25 11:52:04,749 INFO [com.milansimonovic.jepsen.CouchTimeoutTest] - added 49 in 1
2014-08-25 11:52:04,750 INFO [com.milansimonovic.jepsen.CouchTimeoutTest] - added 50 in 1
2014-08-25 11:52:04,750 INFO [com.milansimonovic.jepsen.Cluster] - creating 1 way partition
2014-08-25 11:52:05,753 INFO [com.milansimonovic.jepsen.Cluster] - creating partition DONE
2014-08-25 11:52:05,753 INFO [com.milansimonovic.jepsen.Cluster] - scheduled heal in 15 seconds
2014-08-25 11:52:15,755 ERROR [com.milansimonovic.jepsen.CouchTimeoutTest] - timed out adding 51
2014-08-25 11:52:20,754 INFO [com.milansimonovic.jepsen.Cluster] - healing partition
2014-08-25 11:52:21,757 INFO [com.milansimonovic.jepsen.Cluster] - healing partition - DONE
2014-08-25 11:52:21,757 INFO [com.milansimonovic.jepsen.Cluster] - closing connections
2014-08-25 11:52:25,758 ERROR [com.milansimonovic.jepsen.CouchTimeoutTest] - timed out adding 51

2014-08-25 11:52:26,667 INFO [com.milansimonovic.jepsen.CouchTimeoutTest] - added 51 in 21035
2014-08-25 11:52:26,669 INFO [com.milansimonovic.jepsen.CouchTimeoutTest] - added 52 in 2
...
2014-08-25 11:52:27,072 INFO [com.milansimonovic.jepsen.CouchTimeoutTest] - added 200 in 1
2014-08-25 11:52:27,073 INFO [com.couchbase.client.CouchbaseConnection] - Shut down Couchbase client
2014-08-25 11:52:27,074 INFO [com.couchbase.client.ViewConnection] - I/O reactor terminated
2014-08-25 11:52:27,074 INFO [com.milansimonovic.jepsen.CouchTimeoutTest] - DONE!

The next article will explore how CouchBase behaves when this test runs against a 5 node cluster.