Benchmarking Node.js app running on DigitalOcean

In this post I talk about an unexpected finding from 2021, discovered while rearchitecting and benchmarking a backend API running on DigitalOcean.

The starting point was an Express.js app running on a DigitalOcean droplet.

Before making any changes I ran an initial benchmark to establish a baseline and got about 10 req/sec throughput.

After a couple of rounds of optimizations the throughput went up to 100 rps. Good, but not enough.

Eventually I figured out a way to replace MySQL and Redis with plain files and keep everything in memory. Excited about the blazing performance ahead, I was shocked when the next benchmark reported only ~300 rps. What am I missing here 🤔 Ah, perhaps nginx wasn’t properly configured, so another round of tuning, but still no major improvement. What else to try? With no new ideas I went to bed.

The first thing that came to mind when I woke up was: let’s try AWS. I quickly spun up a new EC2 instance with similar resources and ran a test:

$ ./wrk --latency -t10 -c1000 -d30s 'http://my-api/endpoint'
Running 30s test
  10 threads and 1000 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency   481.52ms  211.37ms   1.87s    61.73%
    Req/Sec   207.61    49.19    464.00     73.68%
  60779 requests in 30.09s, 121.22MB read
Requests/sec:   2019.81

Wow, 2000 rps compared to 300 on DigitalOcean, what the heck. At this point DigitalOcean looked suspicious, so to test that hypothesis I benchmarked nginx serving a static file on both DO and AWS.

An nginx benchmark on an AWS c5.xlarge instance (no config tweaks):

$ ./wrk --latency -t10 -c300 -d30s 'http://MY_EC2_IP'
Running 30s test @ http://MY_EC2_IP
  10 threads and 300 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    40.32ms    2.39ms 288.38ms   98.00%
    Req/Sec   739.19     43.71     0.91k    83.03%
  220828 requests in 30.04s, 180.89MB read
Requests/sec:   7351.19

Same test, but with nginx running on an $80 DigitalOcean droplet:

$ ./wrk --latency -t10 -c300 -d30s 'http://MY_DROPLET_IP'
Running 30s test @ http://MY_DROPLET_IP
  10 threads and 300 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency   704.66ms  224.93ms   2.00s    90.89%
    Req/Sec    44.66     29.67   290.00     73.40%
  12426 requests in 30.05s, 132.32MB read
Requests/sec:    413.54

So 413 rps vs 7351 rps, that’s an order of magnitude! I could not get more than 2000 rps out of DigitalOcean’s networking infrastructure, even with their most expensive load balancer. From another droplet, though, the API flies:

droplet-1 $ ./wrk --latency -t3 -c300 -d20s 'http://MY_DROPLET_IP/api/listing'
Running 20s test @ http://MY_DROPLET_IP/api/listing
  3 threads and 300 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    38.38ms   94.40ms   1.40s    96.61%
    Req/Sec     4.23k   515.62     5.01k    81.50%
  252290 requests in 20.03s, 376.54MB read
Requests/sec:  12595.20

There’s some throttling when I hit nginx from outside DigitalOcean. Load testing nginx for a static file from outside gives only ~640 rps:


aws-ec2-1 $ ./wrk --latency -t10 -c300 -d30s 'http://MY_DROPLET_IP'
Requests/sec: 642.61
Transfer/sec: 6.84MB

Load testing from the droplet itself:


MY_DROPLET_IP $ ./wrk --latency -t3 -c300 -d10s 'http://10.120.0.10'
Running 10s test @ http://10.120.0.10
  3 threads and 300 connections
  483768 requests in 10.04s, 722.02MB read
Requests/sec:  48176.13

From a droplet in another region:

root@milanLoadTest:~# ./wrk --latency -t10 -c300 -d30s 'http://138.68.235.33/index.html'
Running 30s test @ http://138.68.235.33/index.html
  10 threads and 300 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency   704.66ms  224.93ms   2.00s    90.89%
    Req/Sec    44.66     29.67   290.00     73.40%
  12426 requests in 30.05s, 132.32MB read
Requests/sec:    413.54
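All these runs follow the same pattern, so rather than eyeballing wrk’s output each time, a small helper can run the benchmark and pull out the Requests/sec figure per target (a sketch; assumes wrk is on the PATH and the URLs are yours):

```python
import re
import subprocess

def requests_per_sec(wrk_output):
    """Extract the Requests/sec value from wrk's text output."""
    match = re.search(r"Requests/sec:\s*([\d.]+)", wrk_output)
    return float(match.group(1)) if match else None

def benchmark(url, threads=10, connections=300, duration="30s"):
    """Run wrk against a URL and return its throughput in requests/sec."""
    result = subprocess.run(
        ["wrk", "--latency", "-t%d" % threads, "-c%d" % connections,
         "-d%s" % duration, url],
        capture_output=True, text=True, check=True)
    return requests_per_sec(result.stdout)

# compare providers side by side, e.g.:
# for url in ('http://MY_EC2_IP', 'http://MY_DROPLET_IP'):
#     print(url, benchmark(url))
```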

Conclusion: DigitalOcean imposes a serious throughput limit on traffic reaching droplets from outside its network.

No parts of this post have been written by AI.

Quantifying the effect of location on Swiss multi-family house prices

It doesn’t take long before one discovers that in real estate, location is key:

Location, location, location. You may have heard this mantra when talking to an agent about the home values. In a nutshell, it means homes can vary widely in value due to their location. For example, the median cost of a single-family home in Decatur, Ill., is $107,900. The median cost of a single-family home in the Honolulu, Hawaii, area is $813,500. Location is essential when it comes to the value of a property.

thebalance.com/what-location-means-in-real-estate-1798766

… only homebuyers who choose the best locales will be holding the most valuable property that also depreciates at a much slower rate. This difference in value is largely a result of a home’s location.

investopedia.com

Recently I worked on real-estate price prediction at CrowdHouse. We developed an in-house method to quickly qualify incoming properties (exclusively multi-family ones), based mostly on construction/renovation year and location. Our analysts make heavy use of a concept called “micro/macro location”: https://www.wuestpartner.com/data/ratings. In short, it’s a single number representing a location’s quality (1-6, the higher the better), based on factors such as public transport infrastructure, schools, noise, shops, leisure possibilities, etc. Here’s our blog post explaining the concept in more detail: https://crowdhouse.ch/de/blog/lagebeurteilung-von-immobilien-mikro-und-makrobetrachtungen/ , but you can imagine it as a heat-map:

hypothetical macro-location heat-map, source: immomapper.ch

While benchmarking our price estimator on some 160+ properties, I looked into micro/macro location <-> price correlation, expecting to see a strong one, because the better the location the higher the price, right? Surprise, surprise:

0.13 and -0.05, in other words nothing, nix, nada, zip. Property price has nothing to do with its (macro/micro) location.

To put this into perspective: construction year and price correlate at ~0.3; rental income and price at ~0.98.
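For reference, the numbers above are plain Pearson correlation coefficients; with numpy it’s a one-liner. The data below is made up for illustration, not the CrowdHouse dataset:

```python
import numpy as np

# made-up sample: price, yearly rental income (both CHF millions),
# and a micro-location rating (1-6, higher is better)
price    = np.array([2.1, 3.4, 1.8, 5.2, 4.0])
income   = np.array([0.09, 0.15, 0.08, 0.23, 0.17])
location = np.array([3, 4, 4, 4, 3])

def corr(x, y):
    """Pearson correlation coefficient between two 1-D arrays."""
    return float(np.corrcoef(x, y)[0, 1])

print(corr(price, income))    # close to 1: rent almost determines price
print(corr(price, location))  # weak (~0.16) for this made-up sample
```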

How’s that possible? A couple of possible reasons:

  • inaccurate data (this is certainly the case, but to an unknown extent)
  • our data sample is not representative
  • the differences are so small that they don’t show up, i.e. all locations are pretty good
  • the models behind micro/macro location are incorrect
  • a bug in my Python script

It’s possible that the micro/macro location models don’t even try to capture pricing information, but other qualities instead, such as how fast a property sells, how well it holds its value, etc.

And yes, correlation does not mean causation:

spurious correlations, source tylervigen.com

Jepsen-testing RabbitMQ [with python] – Part 2

This is the second post about my efforts to reproduce the Jepsen RabbitMQ test (using python). The first one failed to reproduce data loss by cutting the network in half the same way every time. Here I’ll try different partitioning schemes.

Random blockade partitions

First, let’s try blockade’s random partitions:

#!/bin/bash
for i in `seq 5`
do
  echo 'creating partition'
  sudo blockade partition --random
  sudo blockade status
  sleep 60
  echo 'healing partition'
  sudo blockade join
  sudo blockade status
  sleep 60
done

This is implemented as a nemesis in blockade_random_partitions-rabbitmq-test.py.
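The nemesis boils down to the same partition/heal loop as the bash script above, shelled out on a timer. A minimal Python sketch (not the actual test file; it assumes blockade is runnable without sudo, prepend it otherwise):

```python
import subprocess
import time

def nemesis_round(settle_seconds=60):
    """One cycle: create a random partition, wait, heal, wait."""
    subprocess.check_call(["blockade", "partition", "--random"])
    subprocess.check_call(["blockade", "status"])
    time.sleep(settle_seconds)
    subprocess.check_call(["blockade", "join"])
    subprocess.check_call(["blockade", "status"])
    time.sleep(settle_seconds)

def run_nemesis(rounds=5):
    for _ in range(rounds):
        nemesis_round()
```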

I ran the test, and yet again no messages were lost:

[WARNING] (MainThread) RECEIVED: 25000, unexpected: 19. [15042, 20042, 41, 10041, 5042, 10590, 20609, 604, 15630, 16155, 21134, 1134, 11709, 21709, 22240, 2496, 7498, 12279, 17532], LOST MESSAGES 0, []

Jepsen random halves

Let’s try to cut the network into random halves like in the original Jepsen test:

Meanwhile, the nemesis cuts the network into random halves every sixty seconds, then repairs it for sixty seconds, and so on.

This is implemented in random-majority-rabbitmq-test.py.

I ran this test, and yet again no messages were lost.

Baliant Pato’s partitions

In an excellent post, Baliant Pato explained how he managed to reproduce the data loss. Let’s try his partitions (on a 3-node cluster: rabbit1, rabbit2, rabbit3, with pause_minority):

  1. Publisher A connecting to rabbit1
  2. partition rabbit3 away from rabbit2 and rabbit1 (now rabbit3 is stale pretty quickly, he’s gonna be the future “stale node”, >evil laughter<.)
  3. partition rabbit1 away from rabbit2 and rabbit3 (at this point the cluster is stopped, the final two minorities are in sync but dead)
  4. partition rabbit2 away from rabbit1 and rabbit3
  5. heal rabbit3 (cluster is still stopped)
  6. heal rabbit2

For this test I need 3 instead of 5 rabbitmq nodes (make a copy of docker-compose.yml and then sudo docker-compose -f docker-compose-3_nodes.yml up). I’ll be using 3 instead of 2 producers, one per rabbitmq node, and a single queue on the default exchange, mirrored to all the nodes (“ha-mode”:”all”).

The partitioning scheme is:

  1. sudo blockade partition n1,n2 n3
  2. sudo blockade partition n1 n2 n3
  3. sudo blockade partition n1 n2,n3
  4. sudo blockade join

Implemented as a nemesis in Baliant_Pato_partitioning-rabbitmq-test.py:

t = sorted(nodes)

# partition rabbit3 away from rabbit2 and rabbit1
yield ['%s,%s' % (t[0], t[1]), t[2]]
# partition rabbit1 away from rabbit2 (rabbit3 already away)
yield t[:]
# join rabbit3 with rabbit2:
yield [t[0], '%s,%s' % (t[1], t[2])]
# finally join (all nodes belong to the same partition)
while True:
    yield [','.join(t)]

Running the test:

$ sudo python src/Baliant_Pato_partitioning-rabbitmq-test.py 3
...
[ERROR] (rabbit 5674) failed to send 10041, cannot connect to rabbit, stopping the test
[INFO] (rabbit 5672) sent: 785, failed: 1, total: 5000
[ERROR] (rabbit 5673) failed to send 5783, cannot connect to rabbit, stopping the test
[INFO] (rabbit 5673) sent: 783, failed: 1, total: 5000
[INFO] (MainThread) 1609 messages sent, 3 failed
[WARNING] (MainThread) RECEIVED: 1612, unexpected: 0. [], LOST MESSAGES 0, []

rabbit2 and rabbit3 are down, so the producers cannot finish the test. Let’s run it one more time:

$ sudo python src/Baliant_Pato_partitioning-rabbitmq-test.py 3
...
[INFO] (MainThread) producers done
[INFO] (MainThread) draining the queue
[INFO] (MainThread) 15000 messages sent, 0 failed
[INFO] (MainThread) rabbitmq client at 192.168.54.136:5672
[INFO] (MainThread) Connecting to 192.168.54.136:5672
[INFO] (MainThread) Created channel=1
[INFO] (MainThread) Connecting to 192.168.54.136:5672
[INFO] (MainThread) Created channel=1
[WARNING] (MainThread) RECEIVED: 5841, unexpected: 1. [799], LOST MESSAGES 9159

This again leaves rabbit2 and rabbit3 down.

# rabbit2 logs: 
...
=WARNING REPORT==== 5-Jan-2017::08:23:18 ===
Mirrored queue 'jepsen.queue' in vhost '/': Stopping all nodes on master shutdown since no synchronised slave is available

=INFO REPORT==== 5-Jan-2017::08:23:25 ===
Stopped RabbitMQ application

=ERROR REPORT==== 5-Jan-2017::08:25:19 ===
** Node rabbit@rabbit1 not responding **
** Removing (timedout) connection **

=ERROR REPORT==== 5-Jan-2017::08:25:33 ===
Mnesia(rabbit@rabbit2): ** ERROR ** mnesia_event got {inconsistent_database, starting_partitioned_network, rabbit@rabbit3}

=INFO REPORT==== 5-Jan-2017::08:25:33 ===
Starting RabbitMQ 3.6.5 on Erlang R16B03-1
Copyright (C) 2007-2016 Pivotal Software, Inc.
Licensed under the MPL.  See http://www.rabbitmq.com/
...

Mirrored queue 'jepsen.queue' in vhost '/': Adding mirror on node rabbit@rabbit3: <32532.2772.0>

Hm, rabbit2 synchronized with rabbit3.

osboxes@osboxes:~/jepsen-rabbitmq$ sudo docker exec -it n3 rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit3 ...
[{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]},
 {running_nodes,[rabbit@rabbit2,rabbit@rabbit3]},
 {cluster_name,<<"rabbit@rabbit1">>},
 {partitions,[]},
 {alarms,[{rabbit@rabbit2,[]},{rabbit@rabbit3,[]}]}]
osboxes@osboxes:~/jepsen-rabbitmq$ sudo docker exec -it n2 rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit2 ...
[{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]},
 {running_nodes,[rabbit@rabbit3,rabbit@rabbit2]},
 {cluster_name,<<"rabbit@rabbit1">>},
 {partitions,[]},
 {alarms,[{rabbit@rabbit3,[]},{rabbit@rabbit2,[]}]}]
osboxes@osboxes:~/jepsen-rabbitmq$ sudo docker exec -it n1 rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit1 ...
[{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]},
 {running_nodes,[rabbit@rabbit1]},
 {cluster_name,<<"rabbit@rabbit1">>},
 {partitions,[]},
 {alarms,[{rabbit@rabbit1,[]}]}]
osboxes@osboxes:~/jepsen-rabbitmq$ sudo docker exec -it n2 rabbitmqctl list_queues
Listing queues ...
jepsen.queue 10801

So rabbit2 and rabbit3 think rabbit1 is dead, and vice versa. If I connect to rabbit2 and drain the queue, then no messages are lost.
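Draining is just repeated basic_get until the queue comes up empty; a sketch of such a helper (the host and port in the usage comment are the ones used throughout this post):

```python
def drain(channel, queue='jepsen.queue'):
    """Fetch and ack messages until basic_get returns nothing."""
    messages = []
    while True:
        method, header, body = channel.basic_get(queue=queue, no_ack=False)
        if method is None:
            break
        messages.append(body)
        channel.basic_ack(delivery_tag=method.delivery_tag)
    return messages

# usage against rabbit2 (port 5673 in this post's docker-compose):
# import pika
# conn = pika.BlockingConnection(
#     pika.ConnectionParameters(host='192.168.54.136', port=5673))
# print('drained %d messages' % len(drain(conn.channel())))
```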

But wasn’t rabbitmq supposed to detect the partition and report it? As in:

osboxes@osboxes:~/jepsen-rabbitmq$ sudo docker exec -it n1 rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit1 ...
[{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]},
{running_nodes,[rabbit@rabbit1]},
{cluster_name,<<"rabbit@rabbit1">>},
{partitions,[{rabbit@rabbit1,[rabbit@rabbit2,rabbit@rabbit3]}]},
{alarms,[{rabbit@rabbit1,[]}]}]

 

pause_minority is supposedly an “automatic partition handling” mechanism (excerpt from rabbitmq.com/partitions.html):

RabbitMQ also offers three ways to deal with network partitions automatically: pause-minority mode, pause-if-all-down mode and autoheal mode.

In pause-minority mode RabbitMQ will automatically pause cluster nodes which determine themselves to be in a minority (i.e. fewer or equal than half the total number of nodes) after seeing other nodes go down. … The minority nodes will pause as soon as a partition starts, and will start again when the partition ends.

If, after reading this, you assume (like I did) that no intervention is needed, then you’re in for a surprise. I ended up with the rabbit2 and rabbit3 nodes reported as down, and I had to restart them manually.

Jepsen-testing RabbitMQ [with python]

UPDATE: the code is now available on GitHub https://github.com/mbsimonovic/jepsen-python

In this post I’ll tell you about trying to reproduce the Jepsen RabbitMQ test (using python, not clojure).
It’s been more than 2 years since the original test, and rabbitmq has meanwhile gone from 3.3 to 3.6, so I was wondering if anything’s different these days (end of 2016).

Messaging is a legitimate communication pattern, as nicely documented in the Enterprise Integration Patterns (2003) book, and with the rise of microservices it’s even more relevant than it was 10-15 years ago.

There seems to be a consensus about preferring smart endpoints and dumb pipes, and when it comes to picking a messaging provider there are a few options: RabbitMQ, Apache Kafka, and Apache ActiveMQ. Google Trends backs up this claim about the rise of interest:

 google-trends-rabbitmq-vs-kafka-vs-activemq

Back in 2013 Kyle Kingsbury started publishing a fantastic series of blog posts called Jepsen where he tested how databases behave in a distributed environment. Turned out bad for most of them, RabbitMQ included.

In short, under certain failure scenarios it’s possible to lose data, and losing here means losing a write that rabbitmq has acknowledged.

I was wondering if today (December 2016) things are any different.

Setting up the environment (docker)

I’ve been using docker for almost 2 years now, so that was a natural choice for this test. I use VMware Fusion to run Ubuntu Xenial, and run docker on Ubuntu (you can grab an image from http://www.osboxes.org/ubuntu/).


# enable ssh
sudo apt-get install openssh-server
# login from your laptop
# update & upgrade
#
$ sudo apt-get -y install python-pip python-virtualenv python-dev
$ python --version
Python 2.7.12
$ lsb_release -a
No LSB modules are available.
Distributor ID: Ubuntu
Description: Ubuntu 16.04.1 LTS
Release: 16.04
Codename: xenial
$ uname -a
Linux osboxes 4.4.0-53-generic #74-Ubuntu SMP Fri Dec 2 15:59:10 UTC 2016 x86_64 x86_64 x86_64 GNU/Linux
#
# install docker https://docs.docker.com/engine/installation/linux/ubuntulinux/

Jepsen has meanwhile gotten a docker setup so I tried that first:

$ ssh [email protected]
$ git clone https://github.com/jepsen-io/jepsen.git && cd jepsen/docker
$ ./up.sh # this runs docker-compose build and up, so in a new terminal:
$ sudo docker exec -it jepsen-control bash
root@control $ cd rabbitmq && lein test :only jepsen.rabbitmq-test

This failed with JSchException: Packet corrupt. Ssh-ing once to each node and accepting the host key fixed this problem.
Trying lein test again failed with the message “rabbitmq-server: unrecognized service”. Looking at the console output, the test tried to install rabbitmq on only 3 nodes (out of 5), so I manually installed rabbit on the remaining 2 nodes and ran the test again. It now hangs while starting rabbit. I tried changing rabbit-test.clj and adding :nodes [:n1 :n2 :n3 :n4 :n5], but then it tries to start rabbit on 4 nodes and hangs again. I’m not familiar enough with clojure to debug this, so I submitted a Jepsen bug.

Running a RabbitMQ cluster in Docker

First I need a rabbitmq cluster in docker; I’ll start with bijukunjummen/docker-rabbitmq-cluster:

$ git clone https://github.com/bijukunjummen/docker-rabbitmq-cluster.git
$ cd docker-rabbitmq-cluster/base
$ sudo docker build -t bijukunjummen/rabbitmq-base .
# this hangs while installing plugins; seems like erlang freaks out when it's pid 1, so just add && true at the end:
# RUN /usr/sbin/rabbitmq-plugins enable rabbitmq_mqtt rabbitmq_stomp rabbitmq_management rabbitmq_management_agent rabbitmq_management_visualiser rabbitmq_federation rabbitmq_federation_management sockjs && true
$ cd ../server
$ wget 'https://github.com/jepsen-io/jepsen/raw/master/rabbitmq/resources/rabbitmq/rabbitmq.config'
$ sudo docker build -t bijukunjummen/rabbitmq-server .
$ cd ../cluster
# edit docker-compose.yml

rabbit1:
  image: bijukunjummen/rabbitmq-server
  hostname: rabbit1
  ports:
    - "5672:5672"
    - "15672:15672"

rabbit2:
  image: bijukunjummen/rabbitmq-server
  hostname: rabbit2
  links:
    - rabbit1
  environment: 
   - CLUSTERED=true
   - CLUSTER_WITH=rabbit1
  ports:
      - "5673:5672"
      - "15673:15672"

rabbit3:
  image: bijukunjummen/rabbitmq-server
  hostname: rabbit3
  links:
    - rabbit1
    - rabbit2
  environment: 
   - CLUSTERED=true
   - CLUSTER_WITH=rabbit1   
  ports:
        - "5674:5672"

rabbit4:
  image: bijukunjummen/rabbitmq-server
  hostname: rabbit4
  links:
    - rabbit1
    - rabbit2
    - rabbit3
  environment: 
   - CLUSTERED=true
   - CLUSTER_WITH=rabbit1
  ports:
        - "5675:5672"

rabbit5:
  image: bijukunjummen/rabbitmq-server
  hostname: rabbit5
  links:
    - rabbit1
    - rabbit2
    - rabbit3
    - rabbit4
  environment: 
   - CLUSTERED=true
   - CLUSTER_WITH=rabbit1   
  ports:
        - "5676:5672"

Starting up a cluster using links didn’t work for me: some nodes would hang because they didn’t know all the other nodes (rabbit3 doesn’t know about rabbit5 because it was created earlier), so I had to switch to the version 2 compose format, which gives containers DNS-based discovery.

version: '2'
services:
  rabbit1:
    image: bijukunjummen/rabbitmq-server
    hostname: rabbit1
    ports:
      - "5672:5672"
      - "15672:15672"

  rabbit2:
    image: bijukunjummen/rabbitmq-server
    hostname: rabbit2
    environment: 
     - CLUSTERED=true
     - CLUSTER_WITH=rabbit1
    ports:
      - "5673:5672"
      - "15673:15672"
  rabbit3:
    image: bijukunjummen/rabbitmq-server
    hostname: rabbit3
    environment: 
     - CLUSTERED=true
     - CLUSTER_WITH=rabbit1   
    ports:
      - "5674:5672"

  rabbit4:
    image: bijukunjummen/rabbitmq-server
    hostname: rabbit4
    environment: 
     - CLUSTERED=true
     - CLUSTER_WITH=rabbit1
    ports:
      - "5675:5672"

  rabbit5:
    image: bijukunjummen/rabbitmq-server
    hostname: rabbit5
    environment: 
     - CLUSTERED=true
     - CLUSTER_WITH=rabbit1
    ports:
      - "5676:5672"

# need new docker-compose for version 2 syntax:
$ sudo apt-get install docker-compose
$ sudo docker-compose up

$ sudo docker-compose up
Creating jepsen_rabbit4_1
Creating jepsen_rabbit5_1
Creating jepsen_rabbit2_1
Creating jepsen_rabbit3_1
Creating jepsen_rabbit1_1
Attaching to jepsen_rabbit2_1, jepsen_rabbit3_1, jepsen_rabbit1_1, jepsen_rabbit5_1, jepsen_rabbit4_1
rabbit2_1  | Warning: PID file not written; -detached was passed.
rabbit3_1  | Warning: PID file not written; -detached was passed.
rabbit5_1  | Warning: PID file not written; -detached was passed.
rabbit4_1  | Warning: PID file not written; -detached was passed.
rabbit1_1  | 
rabbit1_1  |               RabbitMQ 3.6.5. Copyright (C) 2007-2016 Pivotal Software, Inc.
rabbit1_1  |   ##  ##      Licensed under the MPL.  See http://www.rabbitmq.com/
rabbit1_1  |   ##  ##
rabbit1_1  |   ##########  Logs: /var/log/rabbitmq/[email protected]
rabbit1_1  |   ######  ##        /var/log/rabbitmq/[email protected]
rabbit1_1  |   ##########
rabbit1_1  |               Starting broker...
rabbit2_1  | Stopping node rabbit@rabbit2 ...
rabbit3_1  | Stopping node rabbit@rabbit3 ...
rabbit4_1  | Stopping node rabbit@rabbit4 ...
rabbit5_1  | Stopping node rabbit@rabbit5 ...
rabbit1_1  |  completed with 12 plugins.
rabbit3_1  | Clustering node rabbit@rabbit3 with rabbit@rabbit1 ...
rabbit2_1  | Clustering node rabbit@rabbit2 with rabbit@rabbit1 ...
rabbit4_1  | Clustering node rabbit@rabbit4 with rabbit@rabbit1 ...
rabbit5_1  | Clustering node rabbit@rabbit5 with rabbit@rabbit1 ...
rabbit3_1  | Starting node rabbit@rabbit3 ...
rabbit2_1  | Starting node rabbit@rabbit2 ...
rabbit4_1  | Starting node rabbit@rabbit4 ...
rabbit2_1  |  * sockjs
rabbit2_1  | 
rabbit2_1  | =INFO REPORT==== 26-Dec-2016::11:33:49 ===
rabbit2_1  | rabbit on node rabbit@rabbit1 up
rabbit2_1  | 
rabbit2_1  | =INFO REPORT==== 26-Dec-2016::11:33:49 ===
rabbit2_1  | rabbit on node rabbit@rabbit3 down
rabbit2_1  | 
rabbit2_1  | =INFO REPORT==== 26-Dec-2016::11:33:49 ===
rabbit2_1  | Keep rabbit@rabbit3 listeners: the node is already back
rabbit5_1  | Starting node rabbit@rabbit5 ...
rabbit3_1  |  * rabbitmq_management
rabbit3_1  |  * rabbitmq_web_dispatch
rabbit3_1  |  * webmachine
rabbit3_1  |  * mochiweb
rabbit3_1  |  * rabbitmq_mqtt
rabbit3_1  |  * rabbitmq_federation
rabbit3_1  |  * rabbitmq_stomp
rabbit3_1  |  * rabbitmq_management_agent
rabbit3_1  |  * amqp_client
rabbit3_1  |  * sockjs
rabbit2_1  | 
rabbit2_1  | =INFO REPORT==== 26-Dec-2016::11:33:49 ===
rabbit2_1  | rabbit on node rabbit@rabbit3 up
rabbit2_1  | 
rabbit2_1  | =INFO REPORT==== 26-Dec-2016::11:33:50 ===
rabbit2_1  | rabbit on node rabbit@rabbit5 up
rabbit2_1  | 
rabbit2_1  | =INFO REPORT==== 26-Dec-2016::11:33:50 ===
rabbit2_1  | rabbit on node rabbit@rabbit4 up
rabbit5_1  |  * mochiweb
rabbit5_1  |  * rabbitmq_mqtt
rabbit5_1  |  * rabbitmq_federation
rabbit5_1  |  * rabbitmq_stomp
rabbit5_1  |  * rabbitmq_management_agent
rabbit5_1  |  * amqp_client
rabbit5_1  |  * sockjs
rabbit5_1  | 
rabbit5_1  | =INFO REPORT==== 26-Dec-2016::11:33:50 ===
rabbit5_1  | rabbit on node rabbit@rabbit4 up
rabbit4_1  |  * rabbitmq_management
rabbit4_1  |  * rabbitmq_web_dispatch
rabbit4_1  |  * webmachine
rabbit4_1  |  * mochiweb
rabbit4_1  |  * rabbitmq_mqtt
rabbit4_1  |  * rabbitmq_federation
rabbit4_1  |  * rabbitmq_stomp
rabbit4_1  |  * rabbitmq_management_agent
rabbit4_1  |  * amqp_client
rabbit4_1  |  * sockjs
rabbit3_1  | 
rabbit3_1  | =INFO REPORT==== 26-Dec-2016::11:33:50 ===
rabbit3_1  | rabbit on node rabbit@rabbit5 up
rabbit3_1  | 
rabbit3_1  | =INFO REPORT==== 26-Dec-2016::11:33:50 ===
rabbit3_1  | rabbit on node rabbit@rabbit4 up

 

After a few seconds the cluster should be up and running, so open http://192.168.54.136:15672/ and log in with guest/guest.

screen-shot-2016-12-26-at-12-02-37

Let’s run a simple hello world test:

$ sudo pip install pika
$ cat jepsen_produce.py
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='jepsen.queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"

channel.basic_publish(exchange='',
                      routing_key='jepsen.queue',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      ))
print(" [x] Sent %r" % message)
connection.close()
# 
#
$ cat jepsen_consume.py
#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='jepsen.queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='jepsen.queue')

channel.start_consuming()
# 
$ python jepsen_produce.py 'hi there'
[x] Sent 'hi there'

$ python jepsen_consume.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'hi there'
[x] Done

Blockade – Jepsen port in python

There’s a Python tool in the spirit of Jepsen called blockade, so while waiting for jepsen-160 I decided to write my own rabbitmq test using blockade.

With the cluster up and running let’s setup blockade to manage it:

$ # sudo pip install blockade has a bug that was fixed in master after the last (0.3.1) release,
# so I had to install from source:
$ git clone https://github.com/dcm-oss/blockade.git
$ cd blockade && sudo python setup.py install
# add existing nodes named jepsen_rabbit{1..5}_1 to blockade:
$ sudo docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
83db717f40f5 bijukunjummen/rabbitmq-server "/bin/sh -c /opt/rabb" 2 days ago Up About a minute 4369/tcp, 9100-9105/tcp, 25672/tcp, 0.0.0.0:5673->5672/tcp, 0.0.0.0:15673->15672/tcp jepsen_rabbit2_1
7ec3390ea0fc bijukunjummen/rabbitmq-server "/bin/sh -c /opt/rabb" 2 days ago Up About a minute 4369/tcp, 0.0.0.0:5672->5672/tcp, 9100-9105/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp jepsen_rabbit1_1
4599b42b5d9a bijukunjummen/rabbitmq-server "/bin/sh -c /opt/rabb" 2 days ago Up About a minute 4369/tcp, 9100-9105/tcp, 15672/tcp, 25672/tcp, 0.0.0.0:5675->5672/tcp jepsen_rabbit4_1
4eaa2e7a76c2 bijukunjummen/rabbitmq-server "/bin/sh -c /opt/rabb" 2 days ago Up About a minute 4369/tcp, 9100-9105/tcp, 15672/tcp, 25672/tcp, 0.0.0.0:5674->5672/tcp jepsen_rabbit3_1
aa409bf28eba bijukunjummen/rabbitmq-server "/bin/sh -c /opt/rabb" 2 days ago Up About a minute 4369/tcp, 9100-9105/tcp, 15672/tcp, 25672/tcp, 0.0.0.0:5676->5672/tcp jepsen_rabbit5_1
$
$ for i in `seq 1 5`; do sudo blockade add jepsen_rabbit${i}_1; done
$
$ sudo blockade status
NODE              CONTAINER ID  STATUS  IP          NETWORK  PARTITION
jepsen_rabbit1_1  7ec3390ea0fc  UP      172.20.0.2  UNKNOWN
jepsen_rabbit2_1  83db717f40f5  UP      172.20.0.3  UNKNOWN
jepsen_rabbit3_1  4eaa2e7a76c2  UP      172.20.0.4  UNKNOWN
jepsen_rabbit4_1  4599b42b5d9a  UP      172.20.0.5  UNKNOWN
jepsen_rabbit5_1  aa409bf28eba  UP      172.20.0.6  UNKNOWN

 

The original jepsen test uses triple-mirrored writes, so we need to configure that:


screen-shot-2016-12-19-at-12-46-25

 

rabbitmq randomly chooses two more slaves for the queue; in my case they were rabbit3 and rabbit5. You can find this on the queue tab, under Details.
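The same mirroring can be configured without the management UI, through the management HTTP API. A sketch (Python 3 here, unlike the rest of the post; the policy name jepsen-mirror and the exactly/3 parameters are my choice to match the triple-mirroring; host, port and guest credentials are the ones from this post):

```python
import base64
import json
import urllib.request

def mirror_policy_request(host='192.168.54.136', port=15672,
                          user='guest', password='guest'):
    """Build a PUT request that mirrors jepsen.* queues across 3 nodes."""
    body = json.dumps({
        'pattern': '^jepsen\\.',
        'definition': {'ha-mode': 'exactly', 'ha-params': 3,
                       'ha-sync-mode': 'automatic'},
        'apply-to': 'queues',
    }).encode()
    # vhost "/" is URL-encoded as %2F in the policies endpoint
    req = urllib.request.Request(
        'http://%s:%d/api/policies/%%2F/jepsen-mirror' % (host, port),
        data=body, method='PUT')
    req.add_header('Content-Type', 'application/json')
    creds = base64.b64encode(('%s:%s' % (user, password)).encode()).decode()
    req.add_header('Authorization', 'Basic ' + creds)
    return req

# urllib.request.urlopen(mirror_policy_request())  # apply the policy
```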

Finally testing RabbitMQ

Let’s rewrite jepsen’s rabbit.clj test in python:

#!/usr/bin/env python
import pika
import sys
import time
import exceptions
import threading
import logging

logging.basicConfig(level=logging.INFO, format='[%(levelname)s] (%(threadName)-10s) %(message)s', )


RABBIT_HOST = '192.168.54.136'
RABBIT_PORT = 5672
TOTAL_MSGS = 5000
MAX_RECONN_ATTEMPTS = 300000
MAX_PUBLISH_ATTEMPTS = 10

num_rabbits = int(sys.argv[1]) if len(sys.argv) > 1 else 5

class JepsenRabbitmq():
    '''
    Is Pika thread safe?

    Pika does not have any notion of threading in the code. If you want to use Pika with threading,
    make sure you have a Pika connection per thread, created in that thread. It is not safe to
    share one Pika connection across threads.
    '''
    def __init__(self, rabbit_port):
        logging.info('rabbitmq client at %i', rabbit_port)
        self.rabbit_hole = rabbit_port
        self.sent = []
        self.failed = []
        self.dig()

    def dig(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=RABBIT_HOST, port=self.rabbit_hole))
        self.channel = self.connection.channel()
        # for publish:
        # jepsen uses confirmation trackings:   (lco/select ch) ; Use confirmation tracking
        # is this the way to enable this in python:

        # http://www.rabbitmq.com/extensions.html#confirms
        self.channel.confirm_delivery()  # Turn on RabbitMQ-proprietary Confirm mode in the channel; Enabled delivery confirmations

        # https://github.com/jepsen-io/jepsen/blob/master/rabbitmq/src/jepsen/rabbitmq.clj#L132
        self.channel.queue_declare(queue='jepsen.queue', durable=True, auto_delete=False, exclusive=False)

    def enqueue(self, message):
        logging.debug('sending %s', message)
        # see http://pika.readthedocs.io/en/0.10.0/examples/blocking_delivery_confirmations.html
        res = self.channel.basic_publish(exchange='',
                                             routing_key='jepsen.queue',
                                             body=message,
                                             properties=pika.BasicProperties(
                                                 delivery_mode=2,  # make message persistent
                                                 ## in jepsen:     :content-type  "application/edn"
                                                 # content_type='text/plain'
                                             ),
                                             mandatory=True)
        if not res:
            raise Exception('Message could not be confirmed')
            # TODO 5sec timeout
            # (if (lco/wait-for-confirms ch 5000)     # ; Block until message acknowledged

    def dequeue(self):
        """Given a channel and an operation, dequeues a value and returns the
        corresponding operation.
          ; auto-ack dynamics mean that even if we issue a dequeue req then crash,
          ; the message should be re-delivered and we can count this as a failure.
        """
        self.channel.basic_qos(prefetch_count=1)
        # see http://pika.readthedocs.io/en/0.10.0/modules/adapters/blocking.html#pika.adapters.blocking_connection.BlockingChannel.basic_get
        method, header, body = self.channel.basic_get(queue='jepsen.queue', no_ack=False)
        if method:
            self.channel.basic_ack(delivery_tag=method.delivery_tag, multiple=False)
        return body

        # TODO timeout
    def close(self):
        try:
            self.connection.close()
        except:
            logging.warning('failed to close the connection')

class JepsenProducer(JepsenRabbitmq):
    def __init__(self, rabbit_port):
        JepsenRabbitmq.__init__(self, rabbit_port)

        self.thread = threading.Thread(name='rabbit %i' % rabbit_port, target=self._test)

    def test(self):
        logging.info('starting producer')
        self.thread.start()

    def _test(self):
        self._produce()
        self.report()
        self.close()

    def wait_for_test_to_complete(self, timeout = None):
        self.thread.join(timeout)
        logging.info('producer finished work')

    # (timeout 5000 (assoc op :type :fail :value :timeout)
    #           (let [[meta payload] (lb/get ch queue)
    #                 value          (codec/decode payload)]
    #             (if (nil? meta)
    #               (assoc op :type :fail :value :exhausted)
    #               (assoc op :type :ok :value value)))))
    # timeout = 5
    # def on_timeout():
    #    global connection
    #    connection.close()
    # connection.add_timeout(timeout, on_timeout)
    # if basic_get returns than clear the timeout
    def _produce(self):
        logging.info('sending messages')
        for i in range(TOTAL_MSGS):
            msg = i + TOTAL_MSGS * (self.rabbit_hole - RABBIT_PORT)
            for publish_attempt in range(MAX_PUBLISH_ATTEMPTS):
                try:
                    self.enqueue(str(msg))
                    self.sent.append(msg)
                    time.sleep(0.1)
                    break
                    # (320, "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'")
                except pika.exceptions.ConnectionClosed as c:
                    for conn_attempt in range(MAX_RECONN_ATTEMPTS):
                        try:
                            logging.debug('trying to re-open connection')
                            self.dig()
                            break
                        except:
                            time.sleep(conn_attempt)
                            # time.sleep(1)
                    else: # failed to re-open
                        self.failed.append(i)
                        logging.error('failed to send %i, cannot connect to rabbit, stopping the test: %s', msg, c)
                        return
                except KeyboardInterrupt:
                    logging.warning('keyboard exception, stopping msg publishing')
                    # failed.append(msg)
                    return
                except Exception:
                    if publish_attempt == MAX_PUBLISH_ATTEMPTS - 1:
                        logging.exception('failed to send %i' % msg)
                        self.failed.append(i)
                    else:
                        logging.exception('bumpy road')
                        time.sleep(1)

    def report(self):
        logging.info('failed: %s', self.failed)
        logging.info('sent: %i, failed: %i, total: %i' , len(self.sent), len(self.failed), TOTAL_MSGS)

class JepsenConsumer(JepsenRabbitmq):
    def __init__(self, rabbit_port, all_sent, all_failed):
        JepsenRabbitmq.__init__(self, rabbit_port)
        self.failed = all_failed
        self.sent = all_sent
        self.lost = set()
        self.unexpected = []

    def wrapup(self):
        # i = raw_input('press ENTER to drain the queue')

        try:
            self._drain()
            logging.warning('RECEIVED: %i, unexpected: %i. %s, LOST MESSAGES %i, %s',
                            len(self.sent), len(self.unexpected), self.unexpected, len(self.lost), sorted(list(self.lost)))
        except:
            logging.exception('failed to drain')
        finally:
            self.close()

    def _drain(self):
        # i = raw_input('press ENTER to drain the queue')

        received = set()
        # new connection and channel?
        self.dig()

        while True:
            r = self.dequeue()
            if not r or len(r) == 0:
                break
            i = int(r)
            if i in received:
                self.unexpected.append(i)
            received.add(i)

        self.lost.update(set(self.sent).difference(received))

rabbits = []
for r in range(num_rabbits):
    rabbits.append(JepsenProducer(RABBIT_PORT + r))
for r in rabbits:
    r.test()

# wait for tests to complete
for r in rabbits:
    r.wait_for_test_to_complete()

time.sleep(60)
all_sent = []
all_failed = []
for r in rabbits:
    all_sent.extend(r.sent)
    all_failed.extend(r.failed)

c = JepsenConsumer(RABBIT_PORT, all_sent, all_failed)
c.wrapup()
print('done')

The code uses separate threads to send messages to RabbitMQ, then waits for all of them to finish before draining the queue and counting the messages. To introduce network problems while the test is running, I use blockade in another terminal:

for i in `seq 6`
do 
    echo 'creating partition'
    sudo blockade partition jepsen_rabbit1_1,jepsen_rabbit2_1,jepsen_rabbit3_1 jepsen_rabbit4_1,jepsen_rabbit5_1
    sudo blockade status
    sleep 60
    echo 'healing partition'
    sudo blockade join
    sudo blockade status
    sleep 60
done


Results

The clients connect and start sending messages:

/usr/bin/python2.7 src/jepsen_test.py
[INFO] (MainThread) rabbitmq client at 5672
[INFO] (MainThread) Connecting to 192.168.54.136:5672
[INFO] (MainThread) Created channel=1
[INFO] (MainThread) rabbitmq client at 5673
[INFO] (MainThread) Connecting to 192.168.54.136:5673
[INFO] (MainThread) Created channel=1
[INFO] (MainThread) rabbitmq client at 5674
[INFO] (MainThread) Connecting to 192.168.54.136:5674
[INFO] (MainThread) Created channel=1
[INFO] (MainThread) rabbitmq client at 5675
[INFO] (MainThread) Connecting to 192.168.54.136:5675
[INFO] (MainThread) Created channel=1
[INFO] (MainThread) rabbitmq client at 5676
[INFO] (MainThread) Connecting to 192.168.54.136:5676
[INFO] (MainThread) Created channel=1
[INFO] (MainThread) starting producer
[INFO] (rabbit 5672) sending messages
[INFO] (MainThread) starting producer
[INFO] (rabbit 5673) sending messages
[INFO] (MainThread) starting producer
[INFO] (rabbit 5674) sending messages
[INFO] (MainThread) starting producer
[INFO] (rabbit 5675) sending messages
[INFO] (MainThread) starting producer
[INFO] (rabbit 5676) sending messages

Then, when RabbitMQ detects a partition (pause minority), the clients lose their connections:

[WARNING] (rabbit 5675) Published message was returned: _delivery_confirmation=True; channel=1; method=<Basic.Return(['exchange=', 'reply_code=312', 'reply_text=NO_ROUTE', 'routing_key=jepsen.queue'])>; properties=<BasicProperties(['delivery_mode=2'])>; body_size=5; body_prefix='15310'
[WARNING] (rabbit 5676) Published message was returned: _delivery_confirmation=True; channel=1; method=<Basic.Return(['exchange=', 'reply_code=312', 'reply_text=NO_ROUTE', 'routing_key=jepsen.queue'])>; properties=<BasicProperties(['delivery_mode=2'])>; body_size=5; body_prefix='20312'
[WARNING] (rabbit 5676) Socket closed when connection was open
[WARNING] (rabbit 5676) Disconnected from RabbitMQ at 192.168.54.136:5676 (320): CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'
[CRITICAL] (rabbit 5676) Connection close detected; result=BlockingConnection__OnClosedArgs(connection=, reason_code=320, reason_text="CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'")

[screenshot: client logs after losing the connection]

Then, after 60 seconds, the partition is healed and the clients reconnect:

[INFO] (rabbit 5675) Connecting to 192.168.54.136:5675
[INFO] (rabbit 5675) Created channel=1

End result: all 25000 messages sent:

[INFO] (rabbit 5672) sent: 5000, failed: 0, total: 5000
[INFO] (rabbit 5673) sent: 5000, failed: 0, total: 5000
[INFO] (rabbit 5674) sent: 5000, failed: 0, total: 5000
[INFO] (rabbit 5675) sent: 5000, failed: 0, total: 5000
[INFO] (rabbit 5676) sent: 5000, failed: 0, total: 5000

Note that RabbitMQ reports 25020 messages:

[screenshot: RabbitMQ management console reporting 25020 messages]

That's OK; some of them are duplicates:

[WARNING] (MainThread) RECEIVED: 25000, DUPLICATE: 20. [20123, 15121, 15630, 20644..., 22713], LOST MESSAGES 0, []
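The consumer tells duplicates apart from losses with plain set bookkeeping: an id seen twice is a duplicate, a sent id never received is lost. A minimal standalone sketch of that `_drain` logic (the function name and sample values are mine):

```python
def classify(sent, received_stream):
    """Mirror of the consumer's bookkeeping: collect received ids,
    flag repeats as duplicates, and diff against what was sent."""
    received, duplicates = set(), []
    for msg in received_stream:
        i = int(msg)
        if i in received:
            duplicates.append(i)
        received.add(i)
    lost = set(sent) - received
    return duplicates, lost

# A publish retried after a dropped connection can be delivered twice:
dups, lost = classify(sent=[1, 2, 3], received_stream=["1", "2", "2", "3"])
# dups == [2], lost == set()
```

Duplicates are the acceptable price of retrying unconfirmed publishes; lost messages would be the real failure.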

Bottom line: 0 messages lost. I found a great post by Balint Pato where he explains how he managed to reproduce the Jepsen results. Unfortunately, his code and setup are not publicly available, so until he gets them out I'll try to reproduce his results myself. Stay tuned, more posts coming…

5.1.17 UPDATE: second post available where I try different partitioning schemes.

Branding as an unfair advantage

There was an interesting point made at the previous Zürich Lean Startup meetup event (slides are available here), Michael Wiedemann argued branding could be used as an unfair advantage.

If you’re not familiar, the concept was introduced by Ash Maurya in his book Running Lean (quoting Jason Cohen):

A real unfair advantage is one that cannot easily be copied or bought.

There are a few examples mentioned in the book, such as insider information, personal authority, and community. I read that as something that cannot be quickly acquired even with a lot of money. Ash says it's the hardest section to fill, so I was intrigued by how branding could be used as an unfair advantage.

The first argument was that only a handful of startups do any branding, so if you do it you'll be way ahead of the others. That sounds more like a competitive advantage.

The second argument made more sense and went along these lines: in any new market there's an unused space to be filled, and whoever fills it first usually gains a big advantage. Take Google, for example – the word/brand has become a synonym for search. Or Skype – to call someone (http://www.oxforddictionaries.com/definition/english/Skype).

Focusing early on branding might have another benefit: to force entrepreneurs to think about how to tell their story, and which idea to sell. This is nicely explained in Seth Godin’s All Marketers Are Liars. We don’t buy bottled water because it’s better than tap water, we buy it because it makes us feel healthier, smarter, and believe it or not – even sexy:

A successful marketing strategy for bottled water should then be framed along these lines: natural, pure, balanced, youthful (see this for a longer explanation).

Smashing CouchBase

You might remember the year 2000 for Bill Gates stepping down as CEO (and promoting Steve Ballmer), American Beauty winning 5 Oscars, the liberation of Lebanon after 22 years of Israeli occupation, the end of the RSA patent, or Bill Clinton becoming the first U.S. president to visit Vietnam. I will certainly remember it for the Bulldozer Revolution in my country, which led to the resignation of Slobodan Milošević.

That year also marks a big milestone in the world of software engineering: in a conference keynote, Prof. Eric Brewer postulated the now famous and much-discussed CAP theorem (btw, it was proven two years later by Gilbert and Lynch), paving the way for the NoSQL revolution.

Another big driver for NoSQL was getting away from the relational model (proposed way back in 1969) and ACID in order to achieve scale and performance. Having felt the pain of working with object-relational mappers, developing a webapp on MongoDB for the first time was such a pleasant experience.

Unfortunately, most of the current NoSQL systems (read: MongoDB, Riak, Cassandra, …) force developers to deal with the messy aspects of consistency, concurrency, non-zero latencies, and weird corner cases of failure scenarios, not to mention designing CRDTs or asking philosophical questions like how-eventual-is-eventual-consistency (there are some promising developments like Google's Spanner and HyperDex, but I haven't had time to take a look).

I've successfully managed to avoid dealing with these thorny issues for a few years now, but the fantastic Jepsen series of articles by Kyle Kingsbury got me motivated to tackle them, this time with CouchBase. His series is not only eye-opening from a developer's point of view, it also reveals some shocking discoveries: some databases claim things they don't provide (Redis being a CP system, MongoDB being safe, etc.).

I'm not smart enough to prove that a database is broken by design, and even if I were it would require more time than I can afford to invest, so Kyle's experimental approach is the way to go. Since I don't speak Clojure, his implementation language of choice for Jepsen, this will be done in Java (yeah, I know, it's not that cool anymore, but it gets the job done).

Cluster setup

The first cluster setup was just like Kyle’s using LXC, but I ran into some networking issues, one node would just deny connection attempts, so I ended up using 5 VMs on my Mac laptop.

For a detailed explanation of what and how this test is supposed to work have a look at http://aphyr.com/posts/281-call-me-maybe-carly-rae-jepsen-and-the-perils-of-network-partitions. Here’s a quick recap:

A five-node cluster of Ubuntu 14.04 machines running in VMs (VMware or VirtualBox), with IPs 192.168.1.131–192.168.1.135, named n1–n5. Add these entries to /etc/hosts on your computer and on each of the nodes themselves.
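For reference, the /etc/hosts entries (assuming the sequential IP-to-name mapping above) would be:

```
192.168.1.131 n1
192.168.1.132 n2
192.168.1.133 n3
192.168.1.134 n4
192.168.1.135 n5
```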

To cause a partition, Kyle makes nodes n1 and n2 drop IP traffic from n3, n4, and n5, essentially by running:

iptables -A INPUT -s n3 -j DROP
iptables -A INPUT -s n4 -j DROP
iptables -A INPUT -s n5 -j DROP

In my experiments I wanted to keep SSH access to all the VMs at all times, so instead I drop a port range that covers all the CouchBase ports (they should all be above 1024), like this:

iptables -A INPUT -p tcp --dport 200:65535 -j DROP

To resolve the partition, we’ll reset iptables chains to their defaults:

iptables -F

The client will be a simple app that appends integers to a single document: read the document, append a number, and use CAS to update it, recording whether the db acknowledged each write or returned an error. At the end I'll check whether the db and the client agree on the final state, i.e. whether any acknowledged writes are missing and/or any unacknowledged writes are present.
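That end-of-test comparison is set arithmetic over the ledgers the client keeps: writes the db acknowledged, writes with unknown outcome (errors or timeouts), and the document contents read back. A minimal sketch (the function name and sample values are mine):

```python
def check_history(acked, unknown, final_doc):
    """Compare the client's write ledger against the document read back.

    acked:     ids the db acknowledged
    unknown:   ids whose writes errored or timed out (outcome unknown)
    final_doc: the array read back at the end of the test
    """
    present = set(final_doc)
    lost = set(acked) - present          # acknowledged but missing -> data loss
    recovered = set(unknown) & present   # unacknowledged writes that landed anyway
    return lost, recovered

# e.g. write 4 timed out on the client but made it into the document:
lost, recovered = check_history(acked=[1, 2, 3, 5], unknown=[4], final_doc=[1, 2, 3, 4, 5])
# lost == set(), recovered == {4}
```

A non-empty `lost` set is the damning result; `recovered` writes are merely ambiguous, which is expected under partitions.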

A simple time-out test first

Before going with the full cluster, let's test the code against a single-node cluster running CouchBase 2.5.1: 256MB RAM; the default bucket is of type Couchbase, with a 256MB RAM quota, 0 replicas, 2 workers, and flush enabled.

The client app will use Java SDK 1.4.3 and Oracle JDK 1.7.0_55.

CouchTimeoutTest first opens a connection to CouchBase – a CouchbaseClient with an operation timeout of 10 seconds – sets an empty JSON document with a single array element, appends 50 numbers to the array, then makes the node drop all traffic on the CouchBase ports and tries to add 150 more elements to the array:

public void run() {
    cluster.getHealthy();
    setEmptyDocument();
    appendToDocument(50);
    cluster.uniDirectionalPartition();
    cluster.scheduleHealAndShutdownIn(15);
    //sleep while the node recovers?
    appendToDocument(150);
    client.shutdown();
    log.info("DONE!");
}

All operations are synchronous. The Java SDK driver docs describe how to deal with time-outs: “When a time-out occurs, most of the synchronous methods on the client return a RuntimeException showing a time-out as the root cause”, so the appendToDocument implementation looks like this:

String addElementToArray(Integer nextVal, int numAttempts) {
    long start = System.currentTimeMillis();
    for (int i = 0; i < numAttempts; i++) {
        CASValue<Object> casValue = client.gets(key);
        String updatedValue = appendElement(casValue.getValue().toString(), nextVal);
        try {
            final CASResponse casResponse = client.cas(key, casValue.getCas(), updatedValue);
            switch (casResponse) {
                case OK:
                    log.info("added " + nextVal + " in " + (System.currentTimeMillis() - start));
                    return updatedValue;
                case EXISTS:
                    log.debug("retrying " + nextVal);
                    break;
                default:
                    log.error("error trying to add " + nextVal + ": " + casResponse);
                    return null;
            }
        } catch (IllegalStateException ex) {
            log.warn("outpacing the network, backing off: " + ex);
            try {
                Thread.sleep(1000 * random.nextInt(5));
            } catch (InterruptedException e) {
                log.warn("interrupted while sleeping, failed to add " + nextVal, e);
                Thread.currentThread().interrupt();
                return null;
            }
        } catch (RuntimeException ex) {
            if (ex.getCause() instanceof TimeoutException) {
                log.error("timed out adding " + nextVal);
            } else {
                log.error("error adding " + nextVal, ex);
                return null;
            }
        }
    }
    log.error("failed to append " + nextVal + " in " + numAttempts + " attempts");
    return null;
}

Test result

The client is configured to time out after 10 seconds and the partition lasts for 15, so I expect to see some timeouts and retries; in the end the document should have 200 integers in the array. Manually checking the iptables rules, I can see the code correctly heals the node. What do you think happened when I ran this code (using the maven exec plugin, mvn test -Pcouch-timeout)?

  1. a few operations timed out and then succeeded
  2. a few operations timed out and then failed
  3. CouchbaseClient failed to reconnect so only 50 integers were written
  4. an infinite loop

Believe it or not, it's the 4th option. CouchbaseClient never times out or throws anything; it just keeps running forever, broadcasting noops, probably to keep the connection alive:

2014-07-22 19:27:52,342 DEBUG [com.couchbase.client.CouchbaseConnection] - Handling IO for:  sun.nio.ch.SelectionKeyImpl@208ee9c9 (r=true, w=false, c=false, op={QA sa=dimsplatform.com/192.168.1.128:11210, #Rops=1, #Wops=0, #iq=0, topRop=Cmd: 10 Opaque: 354, topWop=null, toWrite=0, interested=1})
2014-07-22 19:27:52,342 DEBUG [com.couchbase.client.CouchbaseConnection] - Read 24 bytes
2014-07-22 19:27:52,342 DEBUG [com.couchbase.client.CouchbaseConnection] - Completed read op: Cmd: 10 Opaque: 354 and giving the next 0 bytes
2014-07-22 19:27:52,342 DEBUG [com.couchbase.client.CouchbaseConnection] - Done dealing with queue.

The cluster is healthy and the node up and running; if I spin up another client, it reads and writes without any problems.

What happens if the server is unavailable for less than the configured timeout (10 seconds in this test)? The magic number seems to be 7: for anything longer, the test deadlocks.

The SDK docs are pretty clear in that “when a time-out occurs, most of the synchronous methods on the client return a RuntimeException”.

It turned out (thanks, Michael!) that the problem was in the test code: getting the document was outside any try/catch, and somehow the exceptions were getting swallowed (not even showing up in the log?!). The fix:

[screenshot: couch_timeout_test bugfix]

With this change the test runs as expected: a few ops time out, but eventually all of them succeed:

...
2014-08-25 11:52:04,749 INFO [com.milansimonovic.jepsen.CouchTimeoutTest] - added 49 in 1
2014-08-25 11:52:04,750 INFO [com.milansimonovic.jepsen.CouchTimeoutTest] - added 50 in 1
2014-08-25 11:52:04,750 INFO [com.milansimonovic.jepsen.Cluster] - creating 1 way partition
2014-08-25 11:52:05,753 INFO [com.milansimonovic.jepsen.Cluster] - creating partition DONE
2014-08-25 11:52:05,753 INFO [com.milansimonovic.jepsen.Cluster] - scheduled heal in 15 seconds
2014-08-25 11:52:15,755 ERROR [com.milansimonovic.jepsen.CouchTimeoutTest] - timed out adding 51
2014-08-25 11:52:20,754 INFO [com.milansimonovic.jepsen.Cluster] - healing partition
2014-08-25 11:52:21,757 INFO [com.milansimonovic.jepsen.Cluster] - healing partition - DONE
2014-08-25 11:52:21,757 INFO [com.milansimonovic.jepsen.Cluster] - closing connections
2014-08-25 11:52:25,758 ERROR [com.milansimonovic.jepsen.CouchTimeoutTest] - timed out adding 51

2014-08-25 11:52:26,667 INFO [com.milansimonovic.jepsen.CouchTimeoutTest] - added 51 in 21035
2014-08-25 11:52:26,669 INFO [com.milansimonovic.jepsen.CouchTimeoutTest] - added 52 in 2
...
2014-08-25 11:52:27,072 INFO [com.milansimonovic.jepsen.CouchTimeoutTest] - added 200 in 1
2014-08-25 11:52:27,073 INFO [com.couchbase.client.CouchbaseConnection] - Shut down Couchbase client
2014-08-25 11:52:27,074 INFO [com.couchbase.client.ViewConnection] - I/O reactor terminated
2014-08-25 11:52:27,074 INFO [com.milansimonovic.jepsen.CouchTimeoutTest] - DONE!

The next article will explore how CouchBase behaves when this test runs against a 5 node cluster.

Tasting first CouchBase

Abstract

In this post I'll tell you about installing and using CouchBase in a test-first manner. Wiping out all the data between tests turned out to be very slow – 4s per test [MB-7965] – and for anything but key lookups indexes have to be used, so the elephant is still in the room. CouchBase's async nature makes setting up the testing infrastructure a bit more difficult than with MongoDB or relational databases.

Motivation

I’ve used MongoDB for a couple of projects at work (http://pax-db.org); it’s great for read-only data, logs and analytics.

It turned out to be rather difficult to operate (puppetizing, coordinating replica-set initialization, watching the oplog, …); consistent hashing over a set of equal nodes sounds more likely to work reliably than async master-slave (and seems more efficient – mongo secondaries are not used to serve content); sharding plus replica sets is way more painful than elastically growing (and shrinking!) a dynamo-like system; plus MongoDB can and will lose data (under network partitions).

The document model seems like a good fit, so what else is out there that doesn't suck? CouchBase.

Installing CouchBase

In a VirtualBox VM (IP 192.168.1.128), just in case…

$ wget couchbase*.deb

The docs say dependencies have to be installed manually first, and list only one (libssl), but trying to install on a vanilla Ubuntu 14 box disagrees:

$ sudo dpkg -i couchbase-server-enterprise_2.5.1_x86_64.deb
# librtmp0 missing...

$ sudo dpkg -I couchbase-server-community_2.2.0_x86_64.deb
new debian package, version 2.0.
size 137295996 bytes: control archive=154416 bytes.
624 bytes, 15 lines control
562481 bytes, 5579 lines md5sums
3100 bytes, 103 lines * postinst #!/bin/sh
171 bytes, 7 lines * postrm #!/bin/sh
2737 bytes, 112 lines * preinst #!/bin/sh
526 bytes, 11 lines * prerm #!/bin/sh
Package: couchbase-server
Version: 2.2.0
Architecture: amd64
Maintainer: Couchbase <[email protected]>
Installed-Size: 402664
Depends: libc6 (>= 2.15), libgcc1 (>= 1:4.1.1), libreadline6 (>= 6.0), librtmp0 (>= 2.3), libssl1.0.0 (>= 1.0.0), libstdc++6 (>= 4.6), libtinfo5, zlib1g (>= 1:1.1.4), lsb-base (>= 3.2)
Breaks: membase-server
Replaces: membase-server
Section: web
Priority: extra
Homepage: http://couchbase.com
Description: Couchbase Server
Couchbase Server is the leading distribution of memcached and
couchbase, created and supported by top contributors to the memcached
and couchbase open source projects.

$ sudo dpkg -i couchbase-server-enterprise_2.5.1_x86_64.deb
Selecting previously unselected package couchbase-server.
(Reading database ... 106095 files and directories currently installed.)
Preparing to unpack couchbase-server-enterprise_2.5.1_x86_64.deb ...
libssl1* is installed. Continue installing
Minimum RAM required : 4 GB
System RAM configured : 2042696 kB

Minimum number of processors required : 4 cores
Number of processors on the system : 2 cores
Unpacking couchbase-server (2.5.1) ...
Setting up couchbase-server (2.5.1) ...
* Started couchbase-server

You have successfully installed Couchbase Server.
Please browse to http://demo:8091/ to configure your server.
Please refer to http://couchbase.com for additional resources.

Please note that you have to update your firewall configuration to
allow connections to the following ports: 11211, 11210, 11209, 4369,
8091, 8092, 18091, 18092, 11214, 11215 and from 21100 to 21299.

By using this software you agree to the End User License Agreement.
See /opt/couchbase/LICENSE.txt.

Processing triggers for ureadahead (0.100.0-16) ...
ureadahead will be reprofiled on next reboot


Setup

(mental note: think about puppet-izing ..)

Why do I have to create a default bucket!?

The upcoming Java SDK v2.0 looks very neat: reactive and async, with Java 8 lambdas. Until then, I need to stick with the latest stable, v1.4.2.

Using Oracle jdk1.7.0_55.jdk on Mac OS X Mavericks.

Test first

Hello Couchbase in Java works fine. Encouraging, so let's try to implement CRUD for a simple class (with a few primitive fields). Spring Data can already provide this, but before automating something it's usually a good idea to do it manually first. Plus, Spring Data MongoDB hurt me once – more on that in another post.

It starts with a test, of course. I need to open a client first (hm, is CouchbaseClient thread-safe? Does it have connection pooling under the hood? The API docs don't say anything, but the executorService field suggests it does have pooling; mental note: read the source code…). The v2.0 docs say the Java SDK is thread-safe (the C lib is not): “Couchbase Java and .Net SDKs have been certified as being thread-safe if you utilize thread-safety mechanisms provided by the two languages with the SDKs”. Alright, I can now sleep peacefully.

The documentation is somewhat confusing (http://docs.couchbase.com/): some sections only exist in 2.0, others in 2.1…
For example, SDK thread-safety is covered in v2.0 but not in v2.1 or v2.2. Wish there was a PDF version…

Back to the unit test: open a client in @BeforeClass, and use a test bucket so I don't mess up production data by mistake. To keep the tests isolated and repeatable (as defined in F.I.R.S.T.), let's create/delete the bucket before each test using ClusterManager#createNamedBucket (introduced in v1.1), and run the test:

@BeforeClass
public static void setUpBefore() throws Exception {
    clusterManager = new ClusterManager(Arrays.asList(URI.create(COUCH_SERVER)), admin, adminPass);
    clusterManager.createNamedBucket(BucketType.COUCHBASE, TEST_BUCKET, 100, 0, PASSWD, true);
    client = new CouchbaseClient(Arrays.asList(URI.create(COUCH_SERVER)), TEST_BUCKET, PASSWD);
}

creating the CouchbaseClient on the 5th line throws an exception:

com.couchbase.client.vbucket.config.ConfigParsingException: Number of vBuckets must be a power of two, > 0 and <= 65536 (got 0)

A forum thread says it's probably a timing issue – couch is still not done creating the bucket. The links it mentions to BucketTool.java and ViewTest.java are 404s, so let's ask my friend git:

$ git log --oneline -5 -- src/test/java/com/couchbase/client/BucketTool.java
ec02294 Migrating Codebase.
95569c4 Fix ClusterManager too optimistic timeouts.
3617768 JCBC-280 - Support for edit bucket functionality through cbc.
5d283c2 Happy 2013!
9d948c7 Renamed the bucket create method to abstract type better.


Removed in ec02294, so let's go one rev before that – 95569c4: BucketTool.java, ViewTest.java. The code looks like mine: creating a (default) bucket first, then calling initClient. A dead end.

One of the blog posts mentioned something about setting timeouts; let's see:

@BeforeClass
public static void setUpBefore() throws Exception {
    clusterManager = new ClusterManager(Arrays.asList(URI.create(COUCH_SERVER)), admin, adminPass);
    clusterManager.createNamedBucket(BucketType.COUCHBASE, TEST_BUCKET, 100, 0, PASSWD, true);
    CouchbaseConnectionFactoryBuilder factoryBuilder = new CouchbaseConnectionFactoryBuilder().setObsTimeout(15000); // 2.5sec by default
    CouchbaseConnectionFactory cf = factoryBuilder.buildCouchbaseConnection(Arrays.asList(URI.create(COUCH_SERVER)), TEST_BUCKET, PASSWD);
    client = new CouchbaseClient(cf);
}

Nope, still the same error. Going through the couch docs, I noticed the CLI tool has an option to wait:

--wait    Wait for bucket create to be complete before returning

Hm, I wonder how that is implemented…

# Make sure the bucket exists before querying its status
bucket_exist = False
while (time.time() - start) <= timeout_in_seconds and not bucket_exist:
    buckets = rest_query.restCmd('GET', rest_cmds['bucket-list'],
                                 self.user, self.password, opts)
    for bucket in rest_query.getJson(buckets):
        if bucket["name"] == bucketname:
            bucket_exist = True
            break
    if not bucket_exist:
        sys.stderr.write(".")
        time.sleep(2)

Polling bucket-list until the bucket is created. Seems like there's no other way but to sleep:

@BeforeClass
public static void setUpBefore() throws Exception {
    clusterManager = new ClusterManager(Arrays.asList(URI.create(COUCH_SERVER)), admin, adminPass);
    clusterManager.createNamedBucket(BucketType.COUCHBASE, TEST_BUCKET, 100, 0, PASSWD, true);
    Thread.sleep(500);
    client = new CouchbaseClient(Arrays.asList(URI.create(COUCH_SERVER)), TEST_BUCKET, PASSWD);
}

@org.junit.Test
public void testClient() throws Exception {
    assertNotNull(client);
}

The bar is green, but this makes the code either brittle or slow depending on the timeout: if it's too big the tests are slow, if it's too small they can fail, and the right value probably depends on the hardware, network, setup… waitForWarmup doesn't help since it only works once the CouchbaseClient has been created.
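An alternative to a hard-coded sleep is the deadline-based polling the cbc tool does: keep checking a condition until it holds or a timeout expires. A generic sketch (the helper name and defaults are mine):

```python
import time

def wait_for(predicate, timeout_s=30.0, interval_s=0.2):
    """Poll predicate() until it returns True or the deadline passes.

    Returns True as soon as the predicate holds, False on timeout.
    """
    deadline = time.time() + timeout_s
    while True:
        if predicate():
            return True
        if time.time() >= deadline:
            return False
        time.sleep(interval_s)

# Hypothetical usage -- list_buckets() is a made-up helper:
# wait_for(lambda: TEST_BUCKET in list_buckets(), timeout_s=30)
```

This keeps the happy path fast (it returns as soon as the bucket shows up) while the timeout only bounds the worst case, instead of being paid on every run.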

How about trying a few times until it succeeds?

@BeforeClass
public static void setUpBefore() throws Exception {
    clusterManager = new ClusterManager(Arrays.asList(URI.create(COUCH_SERVER)), admin, adminPass);
    clusterManager.createNamedBucket(BucketType.COUCHBASE, TEST_BUCKET, 100, 0, PASSWD, true);
    for (int i = 1; i <= 10; i++) {
        try {
            client = new CouchbaseClient(Arrays.asList(URI.create(COUCH_SERVER)), TEST_BUCKET, PASSWD);
            System.out.println("managed to open the client in " + i + " attempts");
            break;
        } catch (RuntimeException e) {
            if (e.getMessage().startsWith("Could not fetch a valid Bucket configuration.")) {
                Thread.sleep(100);
            } else {
                throw e;
            }
        }
    }
}

Green bar again, took 4s, with the message: “managed to open the client in 5 attempts”. I swear 500 was just a lucky guess, you can’t make this shit up 🙂

Let's add one more test to see how Fast this is:

public class TestConn {
    static final String adminPass = "*******";
    static final String admin = "admin";
    static final String TEST_BUCKET = "test1";
    static final String PASSWD = "s3cr3t";
    static final String COUCH_SERVER = "http://192.168.1.128:8091/pools";
    static ClusterManager clusterManager;
    CouchbaseClient client;

    @BeforeClass
    public static void setUpBefore() throws Exception {
        clusterManager = new ClusterManager(Arrays.asList(URI.create(COUCH_SERVER)), admin, adminPass);
    }

    @Before
    public void setUp() throws Exception {
        clusterManager.createNamedBucket(BucketType.COUCHBASE, TEST_BUCKET, 100, 0, PASSWD, true);
        for (int i = 1; i <= 10; i++) {
            try {
                client = new CouchbaseClient(Arrays.asList(URI.create(COUCH_SERVER)), TEST_BUCKET, PASSWD);
                System.out.println("managed to open the client in " + i + " attempts");
                break;
            } catch (RuntimeException e) {
                if (e.getMessage().startsWith("Could not fetch a valid Bucket configuration.")) {
                    Thread.sleep(100);
                } else {
                    throw e;
                }
            }
        }
    }

    @After
    public void tearDown() throws Exception {
        if (client != null) client.shutdown();
        client = null;
        clusterManager.deleteBucket(TEST_BUCKET);
    }

    @AfterClass
    public static void tearDownAfter() throws Exception {
        if (clusterManager != null) clusterManager.shutdown();
    }

    @Test
    public void firstTest() throws Exception {
        assertNotNull(client);
    }

    @Test
    public void secondTest() throws Exception {
        assertNotNull(client);
    }
}

[screenshot: unit test console log]

Oops, the F just went out the window: it's super slow, taking 8 seconds without running a single op against the database.

OK, how about creating the bucket once per class and then calling flush/flushBucket to delete all the data:

public class TestConn {
    static final String adminPass = "********";
    static final String admin = "Administrator";
    static final String TEST_BUCKET = "test1";
    static final String PASSWD = "s3cr3t";
    static final String COUCH_SERVER = "http://192.168.1.128:8091/pools";
    static ClusterManager clusterManager;
    CouchbaseClient client;

    @BeforeClass
    public static void setUpBefore() throws Exception {
        clusterManager = new ClusterManager(Arrays.asList(URI.create(COUCH_SERVER)), admin, adminPass);
        clusterManager.createNamedBucket(BucketType.COUCHBASE, TEST_BUCKET, 100, 0, PASSWD, true);
    }

    @Before
    public void setUp() throws Exception {
        for (int i = 1; i <= 10; i++) {
            try {
                client = new CouchbaseClient(Arrays.asList(URI.create(COUCH_SERVER)), TEST_BUCKET, PASSWD);
                System.out.println("managed to open the client in " + i + " attempts");
                break;
            } catch (RuntimeException e) {
                if (e.getMessage() != null && e.getMessage().startsWith("Could not fetch a valid Bucket configuration.")) {
                    Thread.sleep(100);
                } else {
                    throw e;
                }
            }
        }
    }

    @After
    public void tearDown() throws Exception {
        if (client != null) {
            client.flush().get();
            client.shutdown();
        }
        client = null;
    }

    @AfterClass
    public static void tearDownAfter() throws Exception {
        clusterManager.deleteBucket(TEST_BUCKET);
        if (clusterManager != null) clusterManager.shutdown();
    }

    @Test
    public void firstTest() throws Exception {
        assertNotNull(client);
    }

    @Test
    public void secondTest() throws Exception {
        assertNotNull(client);
    }
}

TestConnTime2

Nope, now it takes almost 30 seconds (note: clusterManager.flushBucket(TEST_BUCKET) makes no difference).

Hm, surely I'm not the first one to try this; let's see how others do it. Is there any GitHub repo that does this? https://github.com/search?q=createNamedBucket&ref=cmdform&type=Code yields only 3 Java results as of this writing, none of them doing unit testing…

Maybe the repos are private… Let's search the forums. There's this post from Mike, a Couchbase engineer:

I can see that after trying flush you then tried to do the recommended thing and delete and recreate the bucket, but you're running into a few issues with runtime exceptions. The only time I have run into this issue is when I have been creating and deleting buckets really fast. For example, when we switched our unit tests off of flush and moved to deleting and recreating buckets we were doing this process many times per second.

By “really fast” Mike obviously means way more than “many times per second”, but I’d happily take even once or twice per second. ViewTest.java used to create buckets once per class, but never used flush:

@BeforeClass
public static void before() throws Exception {
    BucketTool bucketTool = new BucketTool();
    bucketTool.deleteAllBuckets();
    bucketTool.createDefaultBucket(BucketType.COUCHBASE, 256, 0, true);
}

What options do I have now? Instead of creating/deleting buckets or using flush(), I think I'll have to manually delete all items in the bucket. This turned out to be fast enough.
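The loop can be sketched with stdlib stand-ins (a hypothetical illustration: the Map plays the bucket and the submitted tasks play the client.delete(key) futures; in the real code the keys come from the all-docs view queried with Stale.FALSE): fire all the deletes, then wait on every future so the bucket is truly empty before the next test.

```java
import java.util.*;
import java.util.concurrent.*;

// Sketch of per-test cleanup: delete every item instead of flushing or
// recreating the bucket. Fire all deletes first, then block on each future,
// mirroring how OperationFuture-based deletes would be awaited.
final class BucketCleanup {
    static int deleteAll(List<String> keys, Map<String, Object> bucket,
                         ExecutorService pool) throws Exception {
        List<Future<Boolean>> pending = new ArrayList<>();
        for (String key : keys) {
            // stands in for client.delete(key), which returns a future
            pending.add(pool.submit(() -> bucket.remove(key) != null));
        }
        int deleted = 0;
        for (Future<Boolean> f : pending) {
            if (f.get()) deleted++;   // wait until each delete really completed
        }
        return deleted;
    }
}
```

The point of waiting on every future is that the next test's "bucket is empty" assertion must not race against in-flight deletes.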

With the test setup in place, let’s spin up that fail/pass/refactor wheel.

CRUD class

I usually start with this test when implementing a CRUD:

@Test
public void test_save_and_load() {
    assertEquals("should be empty to start with", 0, repo.count());
    repo.save(item);
    assertEquals("failed to save", 1, repo.count());
}

If you object that testing save() using count() isn’t true to TDD’s principles, let’s say I’m just being pragmatic. Can you find a case where this logic fails to catch a bug?

I like to start with an empty table/collection/bucket, run the test, and not clean up afterwards, so it's easy to manually verify that the data is really there.

How do I count the number of items in a bucket? countdistinct-keys-using-mapreduce and basic-couchbase-querying-for-sql-people explain it.

Have to make a view for that… which means I need to figure out how to test views first. See couchbase-101-create-views-mapreduce and couchbase-101-create-views-mapreduce-from-your-java-application.

Boils down to creating a DesignDocument, adding a View and saving the design doc. Sounds simple.

@Test
public void test_create_views() throws Exception {
    try {
        DesignDocument designDoc = client.getDesignDoc(repo.designDocName);
        fail("design doc already exists!? " + designDoc);
    } catch (Exception e) {
        //no views yet, good
    }
    repo.createViews();
    final View view = client.getView(repo.designDocName, repo.viewName);
    assertNotNull(view);
}

Red bar, move on to implement createViews(). As of SDK 1.4, ViewResponse exposes the total number of rows in the view. Since we'll be storing all kinds of document classes (think BlogPost, Comment, …) in a single bucket, the views have to make sure they only index the intended document types, and emit document ids:

function (doc) {
    if (doc.type && doc.type == "org.pax_db.core.DatasetInfo") {
        emit(doc.id, null);
    }
}
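Since every repository needs the same map function with just the document type swapped in, it can be generated. The helper below is a hypothetical sketch; saving the design doc needs a live cluster, so the SDK 1.x wiring (DesignDocument, ViewDesign, createDesignDoc) is shown only in a comment.

```java
// Sketch of createViews(): build a map function that indexes only one document
// type, then save it via the 1.x view API (see comment below).
final class ViewSupport {
    // Pure helper: the generated map function emits ids only for docs
    // whose "type" property matches the given class name.
    static String mapFunctionFor(String type) {
        return "function (doc) {\n"
             + "    if (doc.type && doc.type == \"" + type + "\") {\n"
             + "        emit(doc.id, null);\n"
             + "    }\n"
             + "}";
    }

    // With the Couchbase SDK on the classpath this would be roughly:
    //   DesignDocument designDoc = new DesignDocument(designDocName);
    //   designDoc.getViews().add(new ViewDesign(viewName, mapFunctionFor(type)));
    //   client.createDesignDoc(designDoc);
}
```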

Green bar, let's implement the count() method:

@Override
public long count() {
    final View view = client.getView(designDocName, viewName);
    Query query = new Query();
    query.setIncludeDocs(false);
    query.setLimit(1);
    query.setStale(Stale.FALSE);
    final ViewResponse response = client.query(view, query);
    return response.getTotalRows();
}

Run the test again; unexpectedly it fails again, this time in client.query():

Caused by: java.lang.RuntimeException: Failed to access the view

The view exists (client.getView() returns a non-null object), but the query fails. Googling again: http://stackoverflow.com/q/24306216. Ahhh, timing… I don't mind async workflows, but an async workflow without callbacks, that's annoying. No answers on the SO question, more googling… new-views-are-immediately-available

The view is created, but then the server needs to index all existing docs, and that takes some time (funny, since the bucket here is empty). Anyhow, it's not instantaneous, so we need to wait. Back to looping: I adapted the exponential backoff code sample from Couchbase+Java+Client+Library (one more interesting thing to test). Run the test again and now it passes.
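The retry loop can be sketched like this (self-contained; op stands in for the client.query() call that throws while the view is still indexing, and the base/cap delays are made-up numbers):

```java
import java.util.concurrent.Callable;

// Exponential backoff: keep retrying the view query while the server is
// still indexing, doubling the sleep on each attempt up to a cap.
final class Backoff {
    // 1st attempt waits baseMs, then 2x, 4x, ... capped at capMs
    static long delayMs(int attempt, long baseMs, long capMs) {
        return Math.min(capMs, baseMs << Math.min(attempt, 20));
    }

    static <T> T retry(Callable<T> op, int maxAttempts) throws Exception {
        Exception last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return op.call();              // e.g. client.query(view, query)
            } catch (Exception e) {
                last = e;                      // remember and back off
                Thread.sleep(delayMs(attempt, 10, 1000));
            }
        }
        throw last;                            // all attempts exhausted
    }
}
```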

Implementing save()… first, JSON encoding (using Gson). Hm, need to add the type entry, which is not present as a field on the class. Stack Overflow proves useful again.

JsonElement json = gson.toJsonTree(item);
json.getAsJsonObject().addProperty("type", item.getClass().getCanonicalName());

Hm, this needs testing:

@Test
public void test_gson() throws Exception {
    final Gson gson = new GsonBuilder().setPrettyPrinting().create();
    final JsonElement el = gson.toJsonTree(item);
    el.getAsJsonObject().addProperty("type", item.getClass().getCanonicalName());
    final String json = gson.toJson(el);
    assertTrue(json, json.contains("id"));
    assertTrue(json, json.contains("name"));
    assertTrue(json, json.contains(item.getName()));
    assertTrue(json, json.contains(item.getDescription()));
    ...
}

Fails on the description field. Debugger to the rescue: it looks like an encoding issue. GsonBuilder#disableHtmlEscaping() solved the problem (http://stackoverflow.com/a/11361318/306042).

Back to save(): need to code carefully around future.get(), handling exceptions and thread interrupts… and it works.
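The future.get() handling can be sketched generically (a hypothetical helper, not an SDK API): wait for the result, restore the interrupt flag if we get interrupted, and rethrow the real cause of a failed operation.

```java
import java.util.concurrent.*;

// Sketch of "coding around future.get()": block for the result, never swallow
// an interrupt, and surface the underlying cause instead of ExecutionException.
final class Futures {
    static <T> T getChecked(Future<T> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // preserve the interrupt status
            throw new RuntimeException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new RuntimeException("operation failed", e.getCause());
        }
    }
}
```

In save() the future would come from something like client.set(key, json), so the caller sees either the stored result or a RuntimeException wrapping the real failure.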

Going to the admin console http://192.168.1.128:8091/index.html#sec=buckets, and browsing the test bucket, I can see one document:

{
"id": "1",
"name": "H. sapiens PeptideAtlas Build May 2010",
"organ": "WHOLE_ORGANISM",
"description": "Spectral counting data based on MS/MS data from Human PeptideAtlas Build May 2010
Interaction consistency score: 13.5&nbspCoverage: 40%",
"score": "13.5",
"isIntegrated": false,
"hasPeptideCounts": true,
"type": "org.pax_db.core.DatasetInfo"
}

 

Conclusion – how much I miss a rich query API…

Compared to MongoDB's collection#save() and collection#count() and its rich query API, Couchbase requires more work (including reasoning about getting stale data: Query#setStale(Stale)). Maybe Spring Data can help generate CRUD repos, with Elasticsearch providing a rich query API (until N1QL is ready…).