Consistent hashing in bifrost

After some research into sharding methods for my key-value store, I still insist on using consistent hashing, as in the last version.

According to some references like this video, consistent hashing allows users to look up where data lives by its key. As long as clients share the same server information, the lookup will always return the same result, whenever or wherever it occurs.

[Figure: the consistent hashing ring, from the video above]

Consistent hashing also prevents the large-scale re-hashing that server node membership changes would otherwise trigger, which may lead to an unacceptable amount of data migration across servers. The downside is that a traditional lookup only needs one modulo operation on the hash code, while consistent hashing requires a binary search on the ring.

The core data structure of consistent hashing is a sorted array, which should be thought of as a ring, as shown in the figure above. Each element of the ring is an object holding a node hash and a node address. To construct the ring, clients need to know the server addresses and their weights. Node objects are generated for each server, the ratio of nodes per server is determined by the weights, and each node's hash code must be unique. Every node must use the same hash function, because different implementations may have different result ranges. The consistent hashing component should also listen to membership changes in order to regenerate the ring and inform member servers to migrate data.
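To make the structure concrete, below is a minimal, self-contained sketch of such a ring, for illustration only; it is not bifrost's implementation, and the hash function and virtual node counts are placeholders. Each server gets a number of virtual nodes proportional to its weight, the nodes are sorted by hash, and a lookup is a binary search that wraps around:

use std::collections::hash_map::DefaultHasher;
use std::hash::Hasher;

fn hash(bytes: &[u8]) -> u64 {
    let mut h = DefaultHasher::new(); // stand-in: any stable hash shared by all nodes works
    h.write(bytes);
    h.finish()
}

struct Ring {
    nodes: Vec<(u64, String)>, // (node hash, server address), sorted by hash
}

impl Ring {
    // the weight decides how many virtual nodes each server gets
    fn new(servers: &[(&str, u32)]) -> Ring {
        let mut nodes = Vec::new();
        for &(addr, weight) in servers {
            for i in 0..weight {
                let node_key = format!("{}-{}", addr, i);
                nodes.push((hash(node_key.as_bytes()), addr.to_string()));
            }
        }
        nodes.sort_by_key(|&(h, _)| h);
        Ring { nodes }
    }
    // binary search for the first node clockwise from the key's hash
    fn lookup(&self, key: &[u8]) -> &str {
        let h = hash(key);
        let idx = match self.nodes.binary_search_by_key(&h, |&(nh, _)| nh) {
            Ok(i) => i,
            Err(i) => i % self.nodes.len(), // wrap around the ring
        };
        &self.nodes[idx].1
    }
}

A ring built as Ring::new(&[("10.0.0.1:5000", 8), ("10.0.0.2:5000", 16)]) would route roughly twice as many keys to the second server, which is the whole point of the weights.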

In bifrost, the consistent hashing system is built on its client membership system. Users need to register servers with the bifrost membership services, plus a weight service that informs clients how much load each server can take. For example, for an in-memory key-value store, the weight should be set to how much memory the server has, in MB. Bifrost consistent hashing clients will generate an almost fixed-size ring (around 2048 nodes by default) according to the weights. Users don't need to normalize the weights to prevent a very large or small ring from impacting performance.

The ring is updated automatically when membership changes occur. Thanks to the subscription feature in the bifrost raft state machine, there is no polling, only server pushes underneath. Users can also watch changes for the whole group or for an individual server in order to migrate data. In the server change callback for an individual member, users get the new key ranges the member is responsible for, so they can remove data or load it from outer sources.

Using consistent hashing in bifrost is similar to using client membership. The server, raft services, and the heartbeat service for membership should be loaded and prepared. Consistent hashing data structures are built in terms of membership groups, which allows distributing data to servers for different purposes with different lookup tables. So a member must join groups to be accessible in the consistent hashing lookup. As mentioned before, every server needs to specify its weight, which is initialized in the bifrost cluster raft service.

Weights::new(&raft_service);

On the member server side, the shell of the service can be created to set the weight for itself. Note that at this stage ch is not capable of doing any lookup, because the ring has not yet been generated.

let ch = ConsistentHashing::new(&group_1, &wild_raft_client).unwrap();

Weights can be set by

ch.set_weight(&server_1, 1);

After the weights have been set, the ring can be generated by

ch.init_table().unwrap();

In clients that only need to do lookups, ch and its ring can be generated without additional steps by

let ch = ConsistentHashing::new_client(&group_1, &wild_raft_client).unwrap();

Now ch should be functional and ready to use.

To look up a string key:

let server = ch.get_server_by_string(&k).unwrap();

The server variable should contain a consistent server name for the key.

If the user already knows the hash code, it can be looked up directly by

let server = ch.get_server(&key_hash).unwrap();

Users may also want to look up by hashing an object. Bifrost allows users to provide any object that implements serde::Serialize; it will use bincode to encode the object into binary data, then apply the same hash function as the consistent hashing ring to calculate the hash code:

let server = ch.get_server_by(&M {a: 1, b: 2}).unwrap();
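The mechanism is roughly the following hedged sketch (not bifrost's actual code; DefaultHasher is only a stand-in for whatever hash function the ring really uses):

use serde::Serialize;
use std::collections::hash_map::DefaultHasher;
use std::hash::Hasher;

// encode the object with bincode, then hash the bytes with the same
// hash function that built the ring (placeholder hasher here)
fn hash_object<T: Serialize>(obj: &T) -> u64 {
    let bytes = bincode::serialize(obj).expect("object must be serializable");
    let mut h = DefaultHasher::new();
    h.write(&bytes);
    h.finish()
}

Two objects with equal fields serialize to the same bytes, so they always land on the same server.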

To watch changes to the key ranges a specific server is responsible for, it is as easy as raft state machine subscription.

ch.watch_server_nodes_range_changed(&server_name, move |r| {
    println!("Server key range changed to {:?}", r.unwrap());
});

Users do not need to update the ring manually. When any member of the group leaves or goes offline, its nodes are removed from the ring. When a member joins or comes back online, its nodes are added back, automatically. There may be some lag before the ring reacts, so users need to handle the short window of inconsistency themselves.

Consistent hashing in bifrost is meant to be easy to use and self-contained, without other dependencies. Combined with its client membership system, the effort required from users is reduced to a minimum.

Some thoughts about the addressing problem in Nebuchadnezzar

I just paused coding to read some papers [1] [2] [3] [4], looking for the optimal way to solve the cell addressing problem in Nebuchadnezzar. I met some people from PingCAP before the spring festival. They asked me why I chose consistent hashing, a P2P way of addressing data, for Nebuchadnezzar, over an addressing service that clients query to learn which server holds the data before going to that server to do the job (they call it a placement driver). PingCAP is building a relational database named TiDB on top of the key-value store TiKV, based on the theory behind Google F1 and Spanner; that meeting was an interview for a remote engineer position, held in a restaurant.

Honestly, I couldn't answer the question at the time, because I didn't know how their placement driver works. Google chose this architecture for almost all of their infrastructure software, so apparently the placement driver doesn't become a bottleneck for the whole system. PingCAP claimed their PD can take more than a million requests per second, which is enough for almost any use case. On the other hand, Cassandra (which only uses consensus for transactions), Dynamo from Amazon, Trinity from Microsoft, and RAMCloud from Stanford all use or propose a P2P architecture on top of consistent hashing.

The benefit of PD plus range sharding is that data can move around the cluster at will. Data can be stored in order by its fields and fetched in sequence easily; if one server cannot hold a sequence, PD can dispatch the data to other servers. To locate data, clients have to query PD for its location. In PingCAP's PD, it takes at least 1 network round trip (when the location for the data entry is cached) and up to 3 (if the cached location is stale) to fetch the data from the actual server.

Consistent hashing doesn't need a central service to hold data locations. The only information required is the members and their weights, from which the lookup table is generated by hashing, sorting, and producing range slots. To look up a key, clients hash it and use binary search to find the range slot it belongs to; each slot maps to a server address. Servers maintain their own hash tables to the actual data. There is only one network round trip, and addressing finishes on the client. The number of range slots does not grow with the number of entries the system holds, and it is configurable. The downside of this approach is that range queries are somewhat difficult to implement and also inefficient compared to the PD approach.

It seems PingCAP considered the P2P way but rejected it due to its complexity, and because the extra network round trips through PD are insignificant to the performance of their product. That is understandable: TiKV is a key-value store backed by RocksDB, whose data persists to stable hard drives. Each operation on TiKV may also trigger a disk operation, which is expected. Disk operations are not faster than the network (disk latency is about 1 to 15 milliseconds depending on the device; network latency should be about 1 millisecond or less), so extra network round trips do not have much impact on such a system. In TiKV, due to replication, data also needs to be replicated to at least one follower, which takes at least one more network round trip. That is designed to ensure data safety.

Nebuchadnezzar is a key-value store to power my graph engine Morpheus. Morpheus enables users to do basic CRUD and graph traversal by performing parallel breadth-first search. Each vertex or edge is stored as a cell in the key-value store. When performing graph traversal, Morpheus issues get operations for every cell of the vertices, edges, and metadata it requires. Because graph data is all about relations and the access pattern is unpredictable, we can consider it mostly random and seldom sequential. Another factor to consider is latency: every traversal operation is a series of random accesses to the key-value store, so it is essential that each operation finish as soon as possible. With those ideas in mind, Nebuchadnezzar is designed to be an in-memory key-value store that does not write data to disk instantly and has no internal replication. It is similar to Redis and memcached, with more features such as sharding and schema support for memory efficiency.

A low-latency distributed storage system makes the addressing problem more sensitive than in a traditional data store backed by hard drives. Because we have made so much effort to decrease latency below 1 ms on a single server, redundant network round trips are unacceptable. It should also be considered that, with a huge amount of random access, the cache mechanism in the PD client will fail due to frequent cache misses. It is foreseeable that the PD approach would always take at least 2 network round trips (even with batching).

This makes consistent hashing preferable for Nebuchadnezzar. Addressing takes no more than O(log n) at run time, where n is the number of range slots, which is effectively a constant. There is also one and at most one network round trip, so overall latency is more controllable.

The range query problem over a consistent hash table still remains. Although Nebuchadnezzar is built for high-performance random access, range queries would let us implement indices for client range queries, which are essential to some use cases. This was not solved in the first version and was considered almost impossible: after hashing, data keys lose their underlying order and are dispatched to range slots in a uniform distribution. There is no way to determine data locations from the range slots.

There are actually some workarounds for the range query problem over a consistent hash table. [3] proposed using a prefix hash tree (PHT) and [4] a distributed segment tree (DST), for strings and numbers respectively. Both workarounds require another data structure to keep track of data keys: range-query that structure first, then use consistent hashing to fetch the data.

The only problem left is engineering. We have to choose how to distribute the data structure across cluster servers at large scale. Fortunately, both PHT and DST are tree data structures, which means they can be split into fine-grained sub-trees on different servers, executing commands in parallel when the tree is too large to fit on one server.

But this also leads to new problems: in the worst case we have to reach multiple servers to create and delete index entries for keys, which increases latency for every command related to the index. Although we can put index work into coroutines so it does not block the user thread, users may then suffer from index inconsistency. In the actual implementation, I will let users make this trade-off.

I will keep the original architecture from the first version for the addressing problem in the next generation of Nebuchadnezzar. It is a compromise for performance. Later, if I have time, I may build another system without such low-latency requirements on the PD approach.

bifrost: distributed system framework for rust

It has been a while since I last published an article about my recent work. I am working on a distributed system framework for rust applications. Although there are already some mature libraries for this purpose, none of them are for rust, which means there are tons of wheels to reinvent.

Bifrost is a rust library for building robust distributed systems. It does not need any third-party software like Zookeeper, etcd, or consul to ensure consensus. Bifrost ships with its own RPC and raft state machine implementations as basic building blocks, because there is no stable library for either. Bifrost provides a convenient, customizable, and fast RPC that does not require writing a protocol file and running a third-party program to generate server traits and client stubs. It also ships with a simple yet flexible raft state machine framework to build simple data structures like a lookup map, or more complex systems like monitoring client membership changes. It even has the potential to support a massive replicated data store like TiKV on top of it (although bifrost does not yet support multi-raft).

[Figure: bifrost architecture]

The idea of using raft is to ensure high availability by replicating logs to a majority of machines before responding to a request. Minority crashed or slow servers will not harm data integrity. It can also scale read capacity almost infinitely, but write capacity is limited by the individual servers in the cluster.

To define a raft state machine, users provide an action for each function to determine how its requests are processed. Bifrost currently supports 3 kinds of actions: command, query, and subscribe. For command functions, clients send requests to the leader of the cluster; the leader appends the request to the logs, replicates it to followers, commits, and returns the result. For query requests, the client sends the request to any server in the cluster, which executes the query immediately and returns the result along with its last log id; the client checks that the returned log id is larger than or equal to the last log id it has seen, and if not, rejects the result and retries on another server. For subscribe, the client starts a server to listen for subscription messages; it also sends a command to the leader to append subscription logs to the configuration sub state machine, which replicates the subscriptions to followers; when events matching a subscription happen, only the leader sends messages to the client subscription servers.

For example, a very simple state machine can be defined like this:

raft_state_machine! {
    def cmd set(v: $t);
    def qry get() -> $t;
    def sub on_changed() -> ($t, $t);
}

Users have to consider which action to use for each function, because in the generated code, query (qry) functions see the state machine as immutable. The only way to mutate the state machine is to use a command (cmd) action.

impl StateMachineCmds for Value {
    fn set(&mut self, v: $t) -> Result<(),()> {
        // notify subscribers with (old, new) before mutating the value
        if let Some(ref callback) = self.callback {
            let old = self.val.clone();
            callback.notify(&commands::on_changed{}, Ok((old, v.clone())));
        }
        self.val = v;
        Ok(())
    }
    fn get(&self) -> Result<$t, ()> {
        Ok(self.val.clone())
    }
}

The raft_state_machine macro does not generate trait functions for subscribe (sub) actions. In the state machine trait implementation, subscriptions should be triggered inside command (cmd) functions, like the first few lines of the set command above. You can read the full example here.

Using a subscription is just as easy as invoking the subscribe function on the client with a pattern to match (it can also be empty) and a closure to receive the message. For example, to subscribe to new entries inserted into a map, the function can be defined as:

def sub on_inserted() -> ($kt, $vt);

To use the subscribe function:

sm_client.on_inserted(move |res| {
    if let Ok((key, value)) = res {
        println!("GOT INSERT CALLBACK {:?} -> {:?}", key, value);
        assert_eq!(inserted_stash.get(&key).unwrap(), &value);
    }
});

Sometimes we need to receive only certain kinds of messages. Bifrost introduces a way to subscribe to messages with specific parameters. Such functions can be defined as:

def sub on_key_inserted(k: $kt) -> $vt;

We have to notify the trigger to send messages to subscribers:

callback.notify(&commands::on_key_inserted{k: k.clone()}, Ok(v.clone()));

To receive the messages, users have to provide the key they want to subscribe to; in this case, a clone of sk1.

sm_client.on_key_inserted(|res| {
    if let Ok(value) = res {
        println!("GOT K1 CALLBACK {:?}", value);
        assert_eq!(&String::from("v1"), &value);
    }
}, sk1.clone());

Both the RPC and the raft state machine framework are multiplexed. An RPC server and client can support multiple services on one port, and the state machine framework can handle more than one sub state machine.

[Figure: multiplexing diagram]

This enables users to reuse resources in a flexible way. Users assemble servers and raft state machines with services. For example, in my client membership tests:

let addr = String::from("127.0.0.1:2100");
let raft_service = RaftService::new(Options {
    storage: Storage::Default(),
    address: addr.clone(),
    service_id: 0,
});
let server = Server::new(vec!((0, raft_service.clone())));
let heartbeat_service = Membership::new(&server, &raft_service);
Server::listen_and_resume(server.clone(), &addr);
RaftService::start(&raft_service);
raft_service.bootstrap();

Users need to define the services, raft_service and heartbeat_service in the example, and initialize the server with one or more of them. Users can also use register_service to add more services after initialization, as the Membership::new function does in the example. If required, users can hook more than one raft service or other services into one server reactor; the only requirement is to use a different service id for each registration.

The state machine framework shares the same idea as the RPC. Users need to register sub state machines with a RaftService reference to make it meaningful. In the client membership implementation, the Membership::new initialization function does those jobs for users:

raft_service.register_state_machine(Box::new(Membership {
    heartbeat: heartbeat_service.clone(),
    groups: HashMap::new(),
    members: HashMap::new(),
}));
server.register_service(DEFAULT_SERVICE_ID, heartbeat_service);

The multiplexing upgrade was done during the recent spring festival. It did increase programming complexity, but it is more resource efficient and does the same job.

Bifrost also contains some utilities that may come in handy. The notable one is the binding feature, borrowed from the Clojure programming language. Users can define a binding variable with a default value; the variable can be rebound at any time, and the new value is accessible anywhere inside the binding block. Bindings are thread-local, which means binding values to the same variable in different threads will not interfere with each other. For example:

def_bindings! {
    bind val IS_LEADER: bool = false;
}

This defines a value binding variable named IS_LEADER with a default value of false. It can be rebound to another value with a macro block:

with_bindings!(IS_LEADER: is_leader(meta) => {
    meta.state_machine.write().commit_cmd(&entry)
})

Then, anywhere inside the function commit_cmd, IS_LEADER can be accessed with the assigned value by invoking the binding's get function. Outside the with_bindings macro block, or in other threads that have not bound any value, the binding will always read the default, false in this case.

This is useful when passing values through function parameters is undesirable. In the example above, only a few sub state machines need to know whether the current state machine is running on the leader raft service, but some, like subscription, require it (only the leader may notify subscribers). Passing this parameter to every sub state machine is unnecessary. Bindings make for a cleaner code structure and less boilerplate.

Bifrost supports two kinds of bindings. The first kind, which you have seen, is values; it is suitable for primitive types such as u64, i8, and bool. The other kind is references: bifrost wraps the default or bound data in an Arc reference-counting container, and each get from the binding returns a reference. It is suitable for objects such as String or HashMap.
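If the reference flavor mirrors the value syntax, which is my assumption here rather than something I verified against bifrost's source, a reference binding might look like:

def_bindings! {
    // assumed syntax: bifrost wraps the value in an Arc, and gets return a reference
    bind ref SERVER_ADDRESS: String = String::from("127.0.0.1:2100");
}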

There is still much more to introduce about bifrost. In the next articles, I will discuss the architecture design of each of the two modules.

Send, Sync, RefCell, Mutex and RwLock: how rust ensures thread safety

I have been working on bifrost for the whole month, expecting it to be the foundation of my future projects. Although the Raft algorithm itself is simple to understand, building a usable library like Zookeeper or etcd still requires effort.

In the process of implementing read ability for state machines, which lets reads execute on both the leader and up-to-date followers concurrently to improve read performance, I figured out for myself how rust ensures thread safety.

Let's rehearse how a variable is mutated and what happens under the hood.

First, every mutable variable must have the prefix 'mut' to tell the compiler it is mutable. The compiler then keeps an eye on this kind of variable to avoid potentially inconsistent behavior.

For example, in Java you can do the following without getting a compiler error. It is a very bad practice; don't do this at work.

List<Integer> numbers = new ArrayList<>();
numbers.add(1);
for (int i : numbers) {
    numbers.add(i + 1);
}

You might expect to end up with 1 and 2 in the list; in fact, Java's fail-fast iterator throws a ConcurrentModificationException at run time, and the compiler never warned you. In rust, the equivalent code cannot even be compiled.

let mut numbers = vec!(1);
for i in &numbers {
    numbers.push(i + 1);
}

You will get an error like this:

error[E0502]: cannot borrow `numbers` as mutable because it is also borrowed as immutable
--> src/raft/mod.rs:444:13
    |
443 |         for i in &numbers {
    |                   ------- immutable borrow occurs here
444 |             numbers.push(i + 1);
    |             ^^^^^^^ mutable borrow occurs here
445 |         }
    |         - immutable borrow ends here

As you can see, `numbers` is borrowed as immutable at the beginning of the loop, so inside the loop block you can no longer mutate it. You may get annoyed when switching from languages that use GC or ARC to manage memory, like Java/Python/Ruby or even Swift: this kind of 'error' used to be at most a 'warning', and now you fight the compiler just to get your code to compile.
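One idiomatic way out, shown here as a small illustration rather than the only fix, is to end the immutable borrow before mutating, for example by iterating over a snapshot:

let mut numbers = vec![1];
// clone a snapshot, so the immutable borrow is on the clone
let snapshot = numbers.clone();
for i in &snapshot {
    numbers.push(i + 1);
}
assert_eq!(numbers, vec![1, 2]);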

The very essence of Rust is its zero-cost abstraction. Just as advertised, rust costs no more than malloc and free calls to manage memory at run time, unlike mark-and-sweep GC or reference-counting ARC. That means Rust still uses raw pointers under the hood, and raw pointers are unsafe.

How can rust ensure memory safety at compile time alone, while using raw pointers? It checks how data flows, and when a piece of memory is no longer required, it is freed instantly.

[Figure: mutable borrow]

The figure above shows how a variable is mutated. After the mutation occurs (the green box turns into the purple one), the memory for the green box is freed.

Why is the green box freed, with new space allocated for the purple one, rather than reused? Think of C++, where no resizable array exists at the language level: you can only allocate a fixed amount of memory and cannot expand it afterwards. The only way to grow the array is to allocate a larger region, copy the contents over, and free the old one. That is how the array list in the standard library works, and it means the array's memory address may change during mutation. In the example above, the address of `numbers` is unstable inside the loop due to the mutating function push, which risks producing a dangling pointer.

You might presume that the memory reserved for the Vec is always sufficient by calling with_capacity, but the compiler will never accept such an assumption, because it is deliberately dumb and does not trust its users.
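You can watch the reallocation happen with a small experiment; whether the buffer actually moves is up to the allocator, but pushing past the reserved capacity gives it every reason to:

let mut v: Vec<i32> = Vec::with_capacity(1);
v.push(1);
let before = v.as_ptr();
v.push(2); // exceeds the capacity, forcing a reallocation
let after = v.as_ptr();
// any old pointer into the buffer would now be dangling, which is
// exactly what the borrow checker refuses to let you keep
println!("buffer moved: {}", before != after);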

In concurrent programming, things get more complicated. I once chatted with one of my friends after we watched another round of the greatest movie of 2016, <Your Name.>.

[Image: Your Name. poster]
Good story, great graphics.

He mentioned some unexpected behavior he had encountered when dealing with threads, shared pointers, data races, and so on in C++, which cost him a lot of time to figure out what had really happened. I have my own such experiences in Java, but it can get worse on bare metal: a pointer shared between threads may be freed while other threads are still using it.

Rust provides mechanisms to prevent data races even when you do not realize what you are doing and what the consequences are. The doc has an example showing how to work with threads.

use std::sync::{Arc, Mutex};
use std::thread;

let five = Arc::new(Mutex::new(5));

for _ in 0..10 {
    let five = five.clone();

    thread::spawn(move || {
        let mut number = five.lock().unwrap();

        *number += 1;

        println!("{}", *number); // prints a value from 6 to 15
    });
}

This is not how an efficient concurrent hit counter should really be designed, but it demonstrates how to share data between threads. Let's decompose it to see what is really going on.

First, we use Arc and Mutex from std::sync. You may be familiar with the term Mutex, which is the lock, but Arc is the new thing in the rust world. Arc stands for atomic reference counting. It looks and behaves like the ARC from Apple, except it must be used explicitly in rust, and it exists for memory management: Arc tracks the references used across threads and frees the memory when all of them are gone.

[Figure: reference counting]
You can imagine the rectangles as threads, all referencing the same data (the green box).

There is also another reference counter named Rc, which has the same interface as Arc but cannot be used across threads. You will get another compile error if you replace Arc with Rc: because Rc is not designed for multithreading, it is not safe to share between threads. Arc can be shared because it uses an atomic counter under the hood. Rust ensures you never mix up Rc and Arc by using two traits, Send and Sync. The two traits usually appear in pairs, but they mean different things. Send means you can send the data to another thread safely without triggering a copy, which is what we use Arc for: its clone function creates another counted reference and adds one to the counter, and when the clone is dropped, the counter decreases by one. Sync means every modification to the data is synchronized between threads, which is what we use Mutex for. If there is no mutation of the data, we don't really need Sync: in the example above, if we didn't add to the number but only printed its value, Mutex would not be required.

Mutex is a lock: threads acquire it to read or write the data exclusively. This is how Mutex ensures safety under data races; it is also responsible for tracking memory address changes when the data is mutated. Arc can't do that job alone, because it is only responsible for sharing the data by handing its address to threads via clone. Every time lock() is invoked and the lock is acquired, it returns a MutexGuard as both the entry point to the data and the monitor that ensures the lock is released when it goes out of scope or is dropped. In the example above, since the code mutates the number behind the reference, Mutex is required.

Another alternative is the read-write lock, RwLock, in the rust standard library. Reads do not block each other, but a write lock blocks both reads and writes. In bifrost, when implementing reads for state machines, a read lock is the best practice: when there is no mutation in the process, blocking reads is undesirable. Although RwLock is similar to Mutex with an extra feature to avoid unnecessary blocking, RwLock and Mutex are not interchangeable. RwLock cannot always be used in Mutex's place, because read() provides a window for unsafety: RwLock is only Sync when the data inside is Sync as well. In my practice, when I put a thread pool in a RwLock, it could not be sent to threads even when wrapped in Arc; the compiler threw an error complaining about the missing Sync trait. Replacing the RwLock with a Mutex passed the compiler, so my final solution was to wrap the thread pool in a Mutex. You see, using the thread pool is a potential mutation of the data structures inside the object, and even read() cannot stop you from doing it, because the thread pool's functions are not tagged as mutable for the compiler (common when a library relies on unsafe operations). Since the thread pool itself cannot ensure memory safety, it has to rely on something else, like Mutex.
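As a small sketch of the happy path, where the inner type is Sync, many readers can hold a RwLock at once while a writer gets exclusive access:

use std::sync::{Arc, RwLock};
use std::thread;

let data = Arc::new(RwLock::new(vec![1, 2, 3]));

// readers do not block each other
let readers: Vec<_> = (0..4).map(|_| {
    let data = data.clone();
    thread::spawn(move || {
        let v = data.read().unwrap();
        println!("sum: {}", v.iter().sum::<i32>());
    })
}).collect();
for r in readers { r.join().unwrap(); }

// a writer takes the lock exclusively, blocking readers and writers
data.write().unwrap().push(4);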

In this scenario, RefCell is also not allowed inside the RwLock. RefCell is like a combination of Rc and RwLock without the actual lock part; it is used as a workaround for interior mutability, at some run-time cost. Because it cannot ensure thread safety by itself, I used another RwLock as a replacement. I spent hours figuring out the difference between Mutex and RwLock and finding countermeasures, but thread safety was ensured even though I did not know the whole story at the beginning, avoiding potential crash defects.

Although rust is still at an early stage as a productive system programming language, lacking mature libraries, it demonstrates that it is worthwhile to set up a steep learning curve so that even newcomers can write safe code without blowing anything up at run time. Given that it also has the potential to build high-performance software, it makes the perfect system programming language for me.

Building RPC framework for Rust programming language

Once I realized how much work it takes to make system programming on Rust a little easier, I decided to take the chance to implement some basic distributed system features, like the Raft consensus algorithm. The foundation of a distributed system is a mechanism for servers to communicate. RPC, with a predefined protocol for each command, like a URL and its parameters, is ideal for large systems.

I have built 2 kinds of systems that rely on RPC. Although both are designed to let a client invoke functions on another server, the actual procedures are totally different.

The first was a Clojure project. Because I can compile Clojure code at runtime, or look up a function through its namespace and symbol, developing on that framework was dead simple: delivering the full name of the function and its parameters in a map is all that's needed. The functions to invoke are all ordinary ones, and no definition is required. It looks convenient, but it is inefficient due to function name encoding/decoding and parameters carrying full map key names, and the RPC is only available to Clojure applications, which means other programming languages cannot understand the data easily.

In the second project, I used Protocol Buffers from Google instead. Protobuf requires developers to define every command, with its parameters and returns, as messages in a file. Google built tools to compile those files into source code in the programming languages we want. It is far more efficient than my previous home-brew implementation and can also deliver messages between applications built in different programming languages. But maintaining a protobuf definition file is cumbersome and not agile enough; things may break after recompilation.

When searching for an RPC framework for my Rust projects, I wanted it to be as flexible as what I built for Clojure, but also efficient. I tried protobuf and the even faster Cap'n Proto, but was not satisfied. I also could not just copy the approach I used in Clojure, because Rust is statically typed for performance, and there is no way to link a function at runtime from a string. When I found tarpc (yet another project from Google), I was inspired by its simplicity and decided to build my own.

The most impressive part I took from tarpc is the service macro, which translates a simple series of statements into a trait and a bunch of helper code for encoding, decoding, and the server/client interfaces. Although tarpc is more complex because it also supports async calls, the basics are the same. We still need to define what a client can call, but the protocol definition exists only in your code. Developers define the interface with the service macro, implement the server trait, and generate server/client objects. For example, a simple RPC server protocol can look like this:

service! {
    rpc hello(name: String) -> String;
}

This generates the code required for both server and client. Next, we implement the actual server behavior for hello, like this:

#[derive(Clone)]
struct HelloServer;

impl Server for HelloServer {
    fn hello(&self, name: String) -> Result<String, ()> {
        Ok(format!("Hello, {}!", name))
    }
}

To start the server

let addr = String::from("127.0.0.1:1300");
HelloServer.listen(&addr);

To create a client and say hello to the server

let mut client = SyncClient::new(&addr);
let response = client.hello(String::from("Jack"));

The response should already contain another string from the server.

let greeting_str = response.unwrap();
assert_eq!(greeting_str, String::from("Hello, Jack!"));

It looks simple and idiomatic, just like defining a function in Rust itself. There is no need to create another file and compile it every time we modify anything, because the service macro does all the work for us under the hood.

Most of the work is done at compile time: function names are hashed into integers for network transport, and the parameters of each function are encoded into a struct for serialization. This requires developers to configure a compiler plugin, but the performance gain is worth the effort.

There is still more work to do to improve the RPC, like adding promise pipelining, async calls, version control, etc. My next task is to understand and implement the Raft algorithm on top of this RPC framework. I have put my code on GitHub if you are interested in taking a look.

Language choice for my next pivot

I have worked on distributed systems and system software for a while, implementing a key-value store based on the RAMCloud design and some other trivial middleware. After drilling into high-performance programming, I found it hard to take control away from the JVM to do the kinds of significant optimization that really matter for a database system. I tried to work around some of the problems by programming off-heap to gain memory efficiency, without wasting space on object headers, but I don't think that is what the JVM intends developers to do, because it backfires (JVM crashes). The most severe problem is the GC found in every higher-level programming language that tries to free developers from memory management. The JVM in OpenJDK does a great job in most circumstances, but it always fails in performance-critical tasks with stop-the-world GC pauses and lag. The JVM and the Clojure runtime are also burdensome for this kind of project: Morpheus took about 1.6 GB of RAM just to start up, which makes it impossible to run on embedded devices and cheap virtual machines.

I was hoping for a language with a minimal runtime: multi-platform, running on bare metal without a virtual machine, super fast, expressive, freeing me from memory management yet still efficient, and suitable for my system software designs. I wrote golang for a while, expecting it to be my weapon of choice, because it is led by my favourite company, Google, and is also quite famous in the community. At first I had a great experience: the language itself is very easy to learn (it took me about 1 day), its ecosystem is well developed, and the toolchain is not painful. But when I tried to build something a bit larger, I found it missing features I expected.

First of all, it does not support generics, because the golang team wants to keep the language clean and thinks they are unnecessary. So I had to convert data types for every value I got from a hash map, and to keep my functions flexible I had to write long and lousy interface{} expressions. Writing a sorting comparator is also painful due to the lack of language features. Second, it has no macros. After working with lisp-like languages, I consider macros a key feature for saving time on repeated code structure and for gaining performance by doing work at compile time, but for some reason golang just does not have them. Third, golang is not as fast as a system programming language should be. In some standard benchmarks, golang cannot overtake Java; its garbage collector is also not as well developed as Java's, and a simple hello-world HTTP server fails to beat netty in almost every case.

I really thought I had to go back to the C/C++ way, like most of the popular systems did, until I found another programming language named Rust, made by Mozilla and used to power their next-generation web browser engine servo. I had heard about rust for a while, with its unique strict checks and scope rules that manage memory at compile time, like ARC from Apple LLVM but stricter, ensuring no memory leaks. It suits my needs, because I want to manage memory by myself only when required. Rust provides scope rules based on ownership and borrowing, and I can still manage memory on my own inside unsafe blocks. Even with my own memory management, I can use its scope rules to release memory, like this:

impl Chunk {
    fn new(size: usize) -> Chunk {
        // allocate raw memory from libc; rust will not free this for us
        let mem_ptr = unsafe { libc::malloc(size) } as usize;
        Chunk {
            addr: mem_ptr,
        }
    }
    fn dispose (&mut self) {
        info!("disposing chunk at {}", self.addr);
        unsafe {
            libc::free(self.addr as *mut libc::c_void)
        }
    }
}

impl Drop for Chunk {
    fn drop(&mut self) {
        self.dispose();
    }
}

After I use malloc from libc, I don't need to explicitly decide when to free; I let rust do the job by implementing the Drop trait. This lets me combine the power of both C and rust itself. The concepts of ownership and borrowing seem annoying in the beginning, because pointers and objects cannot be passed between functions freely, but they make sense when you consider the problem at a deeper level. Rust draws fine rules to avoid common mistakes programmers make in both concurrency and memory management.

Rust is light: you can even write an operating system in it, as some universities have already asked their students to do. It is cross-platform because it compiles through LLVM. Rust has a pattern-match based macro system, with more on the way. It has cargo for package management. It supports almost every language feature a modern PL should have: blocks, lambdas, type inference, C library interop, and more.

Nothing seems wrong with rust itself; I kind of like this language after a week of trial. The downside is that the language is still favoured by a small group of people. Its community is active and the people are nice, but there are not many libraries you can just borrow and use. In my case, I couldn't find a well-maintained concurrent hash map and had to build one on my own. Well, that's how open source works.

I have already halted development of the JVM-based Morpheus and Nebuchadnezzar and started to redo the work in rust. Although I can inherit most of the ideas I learned from the previous versions, I find it more challenging, because more of the concepts are operating system related, and some of the work that others had already done for Java I have to port to rust on my own. I respect the power of community: the reason the Java ecosystem is so powerful is nothing but the accumulation of the community members' work, year after year.

Multifunctional Home Server Virtualization

My IDC server is down again, so I wondered whether I could host the server at home and retire the IDC one; then I could reclaim hardware from the problematic server (its storage is still decent) and save CNY 4300 annually. I do have a spare machine for the job, with 32 logical cores and 128 GB of RAM. The problem is that besides being my test server, this machine is also my Windows gaming platform, which requires a powerful graphics card and a Windows operating system (my servers are all linux). When the machine is used as a server, it has to be stable, with as little downtime as possible. I also want to use its graphics card for my GPU and CUDA experiments, whose platform is Ubuntu, and rebooting the host server is not an option.

Virtualisation looks like the solution. I was previously able to use this machine as both a compute platform on Ubuntu Server and a gaming platform on Windows with the help of Microsoft's Hyper-V on Windows 8. There will be 3 virtual servers on the host, 2 of which need the host's graphics card; because only one card is available, only one of those two VMs can run at a time. For a VM to use hardware on the host's PCI-E bus, the device has to be passed through to the VM, and a passed-through device becomes unavailable to the host and other VMs. It follows that if you want PCI passthrough for your graphics card, you need another card for the host, or you won't see anything after the operating system boots.

[Photo: the final hardware configuration]
The final hardware configuration. Each VM has its own storage, in hardware RAID or on a dedicated hard drive. The host system lives on the F60 SSD with a GT 210; the GTX 1080 is the graphics card for PCI passthrough.

Because Hyper-V does not support PCI passthrough, I had to look for other solutions. My research showed that almost all virtualization solutions supporting PCI passthrough are based on the Linux kernel. The first that came to mind was Xen Server from Citrix, which is free and open source. I successfully installed it on my host, installed Windows, and passed the graphics card through. But the Nvidia driver could not get the card working: Windows recognized its model, but there was always a yellow exclamation mark in the device manager, and I got a black screen with no signal whenever the VM started. I think it might be a compatibility problem. Then I turned to VMware ESXi; unfortunately, its installer could not load on my machine.

With the two most established solutions ruled out, I had to build my own setup from scratch. Xen Server is an enterprise-facing platform based on Linux, Qemu, and KVM, and I found some articles on building a virtualization solution manually on a regular Linux distribution like Xubuntu 16.04. The advantage of this route is that I control every delicate detail of setting up the VMs for my system.

Getting a normal VM without PCI passthrough working is easy, but it gets tricky for the machine that requires a real graphics card. First, you need to blacklist the device in the kernel with pci-stub to prevent the host from using it, then switch it to VFIO for the VMs. After following the instructions on the web page I mentioned, I hit almost the same problem as on Xen Server: there was no output from the graphics card, and with Qemu's virtual graphics, the driver did not work after installation.

After a lot of searching, I found an article saying that new cards cannot work with the default seabios; they need OVMF instead. The solution is simple. First install the OVMF bios:

sudo apt-get install ovmf

then modify the bios parameter of your VM's Qemu command with:

-bios /usr/share/ovmf/OVMF.fd

After that, I was able to see pictures on the screen output from the passed-through graphics card.

How about the performance? I did a quick test in the VM with the 3DMark demo. I got 6582 for graphics and 5947 for CPU (8 cores, 16 threads). It looks near native.

[Image: 3DMark comparison results]

I have also set up an Ubuntu Desktop VM with the same Qemu configuration and got the CUDA SDK and mxnet examples working. The two systems can be swapped by closing one and starting the other, without interrupting the third virtual machine that runs as an application server.

Lessons learned about locks

Locks are handy and unavoidable in parallel programming, especially in applications like databases, where multiple threads may modify one piece of data. Locks make sure only one thread can access and modify the data at a time.

The Morpheus key-value store relies on locks heavily. There are locks for segments in trunks, and locks for cells. Recently, I have been trying to improve the performance of the store and reduce its memory footprint.

My first attempt was to replace the CellMeta Java objects with plain addresses (long data type) in the cell index. This saves at least 24 bytes per cell, which is a considerable amount of space when a node may hold millions of cells. But there was one compromise: I originally used the synchronized keyword to lock on cells, and once the object entity is replaced by an address, there is no object to synchronize on. My first solution was to create 1024 locks per trunk and assign each cell to one lock according to its hash. That means one lock may correspond to multiple cells. It looked like a controllable solution; even the JDK uses this principle to implement its ConcurrentHashMap. But it produces deadlocks if you build a lock chain in parallel. A lock chain means one cell is locked while another cell is already held; when 2 or more locks are acquired in different orders, deadlocks occur. In this key-value store, deadlocks can happen even when the lock targets are unrelated, because cells may share the same lock.

After discovering this defect, I implemented a lock cache. I didn't want to create a lock for every cell upfront, only when needed, to save memory. When a cell is asked to be locked, the store looks for the lock in the cache by the cell hash; if found, that lock is used, otherwise a new one is created. When the lock is released and its hold count drops to zero, it is removed from the cache and left for the GC to collect. The cache has a small impact on performance, but it is more memory efficient. It is equivalent to the previous synchronized approach, a bit heavier but able to provide read-write lock semantics rather than a spinlock monitor.
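For illustration, here is a minimal sketch of the first, striped-lock scheme, transplanted to rust (the actual store is in Java; the names and the stripe count are illustrative). It also shows why two unrelated cells can contend: they may hash to the same stripe.

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::{Mutex, MutexGuard};

// a fixed pool of locks shared by all cells, selected by the cell's hash;
// different cells may map to the same stripe, which is why locking a
// chain of cells in parallel can deadlock even on unrelated targets
struct StripedLocks {
    stripes: Vec<Mutex<()>>,
}

impl StripedLocks {
    fn new(n: usize) -> StripedLocks {
        StripedLocks {
            stripes: (0..n).map(|_| Mutex::new(())).collect(),
        }
    }
    fn lock_cell<K: Hash>(&self, key: &K) -> MutexGuard<'_, ()> {
        let mut h = DefaultHasher::new();
        key.hash(&mut h);
        let idx = (h.finish() as usize) % self.stripes.len();
        self.stripes[idx].lock().unwrap()
    }
}

A caller would hold the guard for the duration of the cell operation, e.g. let _guard = locks.lock_cell(&cell_id);, and the stripe is released when the guard drops.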

I used reentrant locks at first, without fully understanding what 'reentrant' means. It means you can lock the same lock in the same thread more than once without causing a deadlock [1]. This feature helps build cleaner code without worrying about the lock status. The rule of using a lock is that lock and unlock appear in pairs, regardless of what happens in between, even a thrown exception; in Java and Clojure, unlock should go in the finally block of a try clause (the catch block is irrelevant). With a reentrant lock, the number of lock calls must equal the number of unlock calls, or deadlocks and exceptions will occur. The rule sounds easy, but in some situations developers still need to track locks, like reference counting in old-school iOS programming without ARC. I made the mistake of guarding with conditions like if (! l.isLockedByCurrentThread()) l.lock() and if (l.isLockedByCurrentThread()) l.unlock(). That locks in the outer layer but releases in the inner layer, which causes inconsistency.

Programming concurrent applications is delicate work. Developers have to consider all possible corner cases and use locks prudently. Using locks too much, or in unnecessary places, damages performance; not using them when required causes unexpected states and data corruption. Some applications claim better performance than others but are unstable in practice, likely because the developers traded stability for performance. Many lock problems are also hard to test without proper production-level stress; a mature project should test all possible circumstances before release.

Graph traversal in distributed systems

Last month I finished depth-first search and breadth-first search in Morpheus. Morpheus, based on a hash-distributed key-value store, needs to traverse vertices in a distributed and even parallelized manner.

The first thing that came to my mind was building a message passing infrastructure. It is almost the same as the RPC in the cluster connector, but uses a more succinct and deterministic data format to improve performance. Message passing is asynchronous: the sender does not wait for the receiver to complete the actual processing, only for the delivery. Each message for a task carries a 128-bit ID so nodes can tell which task a message belongs to. Each message also corresponds to one action, enabling the message dispatcher to invoke the function bound to that action. Just like the RPC in the cluster connector, if the receiver is the same node as the sender, the message will not go through the network interface and will not even be serialized or deserialized for transmission.

Both the DFS and BFS algorithms are built on this infrastructure, but their behaviours vary. DFS cannot be parallelized: to conduct DFS on a distributed system, the stack of visited and discovered vertices must be transferred between nodes, one at a time, for updates. BFS, on the other hand, can be parallelized by having each node discover the children of the vertices it owns at each level. We will discuss how these 2 kinds of graph traversal are implemented in Morpheus in the next few sections.

DFS in Morpheus adapts ideas from the paper by S.A.M. Makki and George Havas, passing the vertex stack between nodes for updates. This method is single threaded, because depth-first search is inherently sequential. DFS in Morpheus is a good choice when the subgraph containing the start vertex is not very large. It also suits conventional traversal queries from users, like detecting links and the existence of a path.

BFS in Morpheus is more complicated. Currently, Morpheus supports parallel search within each level, relying on a nested fork-join multithreading pattern, illustrated in the figure below.

[Figure: nested fork-join pattern]

Consider nodes containing vertices represented as A, B, C in 'Parallel Task I'; 'Main Thread' as the coordinator server (which may be any node in the cluster); and 'Parallel Task I', 'Parallel Task II', 'Parallel Task III' as the search tasks for each level. One BFS request contains the start vertex, the end vertex, and search rules. Here is what happens when the search begins (a sketch follows the list).

  1. The coordinator puts the first vertex into the search list, sends it to the node the vertex belongs to (by its id hash), and waits for the return message.
  2. The node receives the search list and fetches the neighbour ids of its vertices in parallel, according to the search rules.
  3. When all parallel searches for the level have ended, the node sends the search results back to the coordinator as a return message.
  4. The coordinator may receive return messages from multiple servers (not when there is only one vertex in the search list). As each return message arrives, it updates the local task status for each vertex: whether it has been visited, its level, and its parent.
  5. After the coordinator has received all return messages, it extracts the vertices for the next level from the local task status, partitions them by their servers into search lists, and sends each list to its server.
  6. The level advances and step 2 takes over. The iteration continues until a stop condition is fulfilled, or the maximum level in the search rules is reached while executing step 5.
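To make the flow concrete, here is a compressed, single-process sketch of the coordinator loop described above. It is not Morpheus code: the 4-server hash partition is illustrative, and the network send/receive steps are folded into plain function calls.

use std::collections::HashMap;

// level-synchronous BFS, modelled on steps 1-6 above;
// `adj` stands in for each node's local vertex storage
fn bfs(adj: &HashMap<u64, Vec<u64>>, start: u64, max_level: usize) -> HashMap<u64, usize> {
    let mut level_of = HashMap::new(); // coordinator's task status: vertex -> level
    level_of.insert(start, 0);
    let mut frontier = vec![start];
    for level in 0..max_level {
        // step 5: partition the frontier by owning server (id hash; 4 servers here)
        let mut search_lists: HashMap<u64, Vec<u64>> = HashMap::new();
        for v in &frontier {
            search_lists.entry(v % 4).or_default().push(*v);
        }
        // steps 2-3: each "server" discovers the neighbours of its search list
        // (remotely and in parallel in the real system)
        let mut next_frontier = Vec::new();
        for (_server, list) in search_lists {
            for v in list {
                for &n in adj.get(&v).into_iter().flatten() {
                    // step 4: the coordinator updates visited status and level
                    if !level_of.contains_key(&n) {
                        level_of.insert(n, level + 1);
                        next_frontier.push(n);
                    }
                }
            }
        }
        if next_frontier.is_empty() { break; } // stop condition
        frontier = next_frontier; // step 6: advance the level
    }
    level_of
}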

The idea behind this algorithm is adapted from Lawrence Livermore National Laboratory's work on BlueGene. I don't think I have fully replicated their design, because in my implementation the coordinator becomes the bottleneck in steps 4 and 5; it also costs a lot of memory to cache the vertices that have already been discovered, which is why I also provide an on-disk cache for this occasion. BFS is used for finding shortest paths: faster, but more resource-consuming path finding. It may also become the backbone of many other algorithms, like fraud detection and community discovery.

To distribute task information, like task ids and parameters, to every node in the cluster, we need a globally shared hash map whose changes can be propagated to each node. I use paxos to do that job. There are also some other components, like a disk-based hash map, that might come in handy. I packed this computation-related work into the hurricane component.

If you want to take a closer look at my actual implementation, please refer to: DFS, BFS

Crashing the G-WAN web server, at the speed of light

Recently I signed on to a job, working as a humble software engineer on the server team of a cloud gaming company.

My employer is also a software engineer, mostly working on game engines, writing C++ and C#, holding some game demo I.P. (hopefully), and claiming to have solved the problem of building profitable 3D cloud gaming servers, which is the major technical problem threatening their business model (again, hopefully; most cloud gaming providers went bankrupt or earn low profits due to server and bandwidth expenses). Briefly, he has no actual experience in building cloud server platforms. That's why I thought I might be able to help him build products that are robust and scalable.

Although I write Java and Clojure code, I don't detest C/C++ or any other language that compiles to machine code for performance. Actually, I love high performance, and I really expected to learn something about it on the job.

They have a great vision, and guaranteed they would sell the company for 3 billion dollars, with everyone in it getting 5 million.

Looking good. "What should I do?" I asked. "We have found the ultimate solution," they smiled mysteriously. I was really intrigued to know what secret they had found. "We are asking you to write our web server in the C++ programming language." Sounds challenging, and I like challenges in my work. Later they presented me with a web server I had never heard of: G-WAN.

They told me to do some research on it (the other software was ArangoDB, which was fine). Because they didn't tell me what I was actually going to do in the project, I started by searching the internet for introductions and documentation.

The first thing I investigated was their web site.

[Screenshot: the G-WAN home page]

The web page design looks decent. What they push on the front page is the line "It's the era of multicore computing". I surely know that; such words are written for those without server programming experience, and for project managers.

What's next? On the about page, I expected more detailed information about how the software works. Instead, I got this.

[Screenshot: the G-WAN about page]

I smell the taste of mouldy medals. Then I dug into the website, trying to find something more technical. On the FAQ page, I started to feel strange and disturbed.

Why? Because none of the cases in the FAQ use modern hardware or operating systems. Instead, there are plenty of ancient and outdated configurations, like the 'faster and more scalable' Ubuntu 10.04, and their usual test CPU, an Intel Xeon W3680 @ 3.33GHz (6 cores). I started to wonder whether I had jumped back in time to when I enrolled in my undergraduate school. The web site reads like a fraud to me. But I might be wrong, so I went looking for how many servers are actually powered by G-WAN.

Shodan can do this job for me, and it gave an interesting result. For a server released about 8 years ago, there are only approximately 21 HTTP servers online, and most of them are serving static content or are totally abandoned.

[Screenshot: Shodan results for G-WAN]

I stopped taking a deep look at the project, because there must be a reason why this general-purpose server has not been accepted and has so few users. Even the Clojure HTTP server http-kit powers at least 800 sites, and it is young and not even the most popular choice in the small Clojure community.

I then searched for the server's name on Google. There is not much about it, but there are some threads on Reddit and Hacker News:

https://www.reddit.com/r/linux/comments/2mgo7o/wtf_is_gwan_web_server/

https://news.ycombinator.com/item?id=4109698

https://news.ycombinator.com/item?id=8130849

and some blogs

https://riccardo.forina.me/why-i-ll-never-trust-g-wan/

http://tomoconnor.eu/blogish/gwan-snakeoil-beware/

and a Wikipedia debate.

Most of them are really critical, and I don't know how much the authors suffered to get this angry. What is the worst thing that can possibly go wrong with a web server? I found out the answer later, by myself.

I reported my research to my boss, telling him I did not recommend this software for our projects. But he didn't agree, and told me to run the tests and then make my decision.

Unfortunately, I had a really bad stomachache that day and had to stay home, waiting for my No.2 to come out at any time (sorry for the description). I also realised there was no server-grade equipment in the office, and the only way to perform a comprehensive test was to use my own cluster, which I normally use to develop and run my own projects. Because I really wanted to know how fast G-WAN could be, I suggested staying home to test G-WAN and the other candidates on my own equipment.

That totally destroyed whatever expectation I had left for the software. For the record, I posted the results from Apache Bench and Weighttp to the GitHub repo. I have to say it is not a complete test, but I am pretty sure G-WAN is utterly a bad choice for any project that is not a toy.

Because it crashed under a 1k concurrency test. In a second.

Why did it crash? I wondered. I looked for clues, but nothing was left in the log, except this:

Could not attach to process. If your uid matches the uid of the target
process, check the setting of /proc/sys/kernel/yama/ptrace_scope, or try
again as the root user. For more details, see /etc/sysctl.d/10-ptrace.conf
ptrace: Operation not permitted.
No frame selected.
Signal : 11:Address not mapped to object
Signal src : 1:.
errno : 0
Thread : 10
Code Pointer: 000000404f4c module:gwan function:?? line:0
Access Address: 000000000018
Registers : EAX=ffffffffffffffff CS=00000033 EIP=000000404f4c EFLGS=000000010246
 EBX=7fa0b19ff000 SS=00000000 ESP=7fa0d8ffa170 EBP=7fa0d8ffa1c0
 ECX=000000000008 DS=00000000 ESI=0000ffffffff FS=00000033
 EDX=000000000008 ES=00000000 EDI=7fa0b1a009b0 CS=00000033
Module :Function :Line # PgrmCntr(EIP) RetAddress FramePtr(EBP)
No available stack frames
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[1] 31224 segmentation fault (core dumped) sudo ./gwan

It still doesn't make much sense. What I can see from this segmentation fault is that the developer got backfired on by dark magic, reading a deallocated memory address (I hit almost the same problem in my graph database project when operating on unsafe memory, but fixed it).

I reported the exception to my boss. He said, "That must be your fault. You must be using it wrong."

I panicked. I could foresee my miserable life: never having a chance to go home on time to play with my cat and my PlayStation.

He may have sensed it too, and my co-workers also had no faith in developing a regular web server on such an unstable foundation. My boss agreed to let us use golang instead.

"What a nice day" I think. "But I will use it on my own", he said. I smiled at him and did't say anything else.

So why are they so crazy about it? It seems most people are attracted by its amazing comparison chart.

[Chart: G-WAN's benchmark comparison against nginx and lighttpd]

In it, G-WAN is more than 2x faster than every other web server, including Nginx, the one considered the fastest and most widely used web server in the world, which looks like crap in the chart. If that were true, those open source authors would either be dumb-asses or the G-WAN guys real geniuses. But truly, they are not.

They are not even close to their claim. According to my test results, G-WAN is at most 30% faster than Jetty. Compared to golang, it is 100k qps for G-WAN against 70k qps for a bare goroutine server without any framework. And once you consider building something on top of G-WAN, it becomes a nightmare.

I am not trying to persuade my employer to give up on it, because he paid for the support subscription. It looks like he trusts salesmen more than his own team. Nice job helping those rich Swiss people. He will not understand what I have suffered until he does the same evaluation I did.

One week after I submitted my report, the "TrustLeap" guys gave me their comments on my test and totally ignored my questions about how the crash happened. They criticized me for not using a really fast operating system (as if the newest one were not the fastest), said my kernel version was too high, and complained that I tested their web server with the X window system running. But they just didn't explain anything about WHY THEIR SERVER CRASHED. They implied that the only way to run their server right is to use an old OS like Ubuntu Server 10.04, which Canonical has already stopped supporting. This is how those guys treat their customers.

I was furious about wasting my time on such a program, and about this problematic stuff not being recognized by my employer. From another perspective, it is a successful project: it did attract people like my employer to pay for it, even though he knows nothing about distributed systems, scalability, or software architecture. It is also an unsuccessful project, because its tricks are so obvious to professionals that they can only fool newbies. I am going to end this unpleasant article with a quote from voretaq on ServerFault.

I thought the IT world learned its lesson about sacrificing all in the name of performance with qmail and the rest of the djb family of things...