Open-sourcing Rocksplicator, a real-time RocksDB data replicator

Pinterest Engineering
Pinterest Engineering Blog
8 min read · Nov 4, 2016


Bo Liu | Pinterest engineer, Infrastructure

Pinterest’s stateful online systems process tens of petabytes of data every day. As we build products and scale billions of Pins to 150 million people, we need new applications that co-locate computation with data. That’s why we adopted RocksDB. It’s adaptable, supports basic and advanced database operations with high performance, and meets the majority of requirements for building large-scale, production-strength distributed stateful services. Yet two critical pieces were missing for us: real-time data replication and cluster management for RocksDB-based stateful services. To fill this gap, we built Rocksplicator, a RocksDB replicator, along with a cluster management library and tools for RocksDB-based stateful services. Today we’re open-sourcing the project on GitHub for all RocksDB users.

Before we dive into Rocksplicator, let’s first see three examples where this technology helps engineering teams at Pinterest.

1. Powering ranking systems with machine learning

Personalized machine learning models rank Pin candidates so Pinners discover ideas relevant to their interests and tastes. We extract features from several KB of data to compute the relevance between a person and a Pin. For every Pin recommendation, our scoring system computes tens of thousands of relevance scores between a set of candidate Pins and a Pinner. Previously, we used a stateless ranking system that pulled the required data over the network. But as we grew, network bandwidth consumption increased dramatically and high service latency became a pain point for the core Pinterest system. To solve these problems, we built a stateful ranking system that co-locates computation with data, which greatly reduced network bandwidth consumption and service latency.

2. Online event tracking system

So Pinners don’t see duplicate Pins, we track each Pin someone sees and how many times they see it (in case we want to do soft deduplication). To maintain impression events in real time and support online queries over months of data, it’d be inefficient to use a traditional standalone database with a Read-Modify-Write model. Instead, we co-locate computation with data so that Read-Modify-Write application logic runs at the data node end as merges.

3. Analytics system

Our analytics system has a broad range of use cases, such as ads reporting and A/B test result reporting, and supports time series counters and time series application-specific data structures like HyperLogLog. Each data point stored in the system carries multiple optional, client-specified tags used for arbitrary aggregations and breakdowns. To answer a single client request, the analytics system usually scans hundreds of MB of data, yet the eventual response returned to the client is often very small. This makes it challenging (if not impossible) to build a large-scale, efficient analytics system without co-locating computation with data.

Adopting RocksDB

Before introducing RocksDB into our technical stack, we had MySQL, HBase and Redis running as storage systems. For this new group of applications, we decided to not use any of these systems for a few reasons. For MySQL, it’s difficult to co-locate computation with data. With HBase, we had challenges developing complex logic through its co-processor framework, and it has long tail serving latency. For Redis, its replication feature didn’t have the best reliability at scale.

RocksDB gave us an embeddable persistent key-value store for fast storage. As mentioned earlier, RocksDB is adaptable and supports basic and advanced database operations with high performance. While it met the majority of our production needs, we still had to build a RocksDB replicator, a cluster management library and tools for RocksDB-based stateful services.

Rocksplicator

Design decisions

Rocksplicator was implemented in modern C++, and we made the following technical decisions as we designed it.

Async Master-Slave replication only

Besides Master-Slave replication, an alternative way of doing data replication is to employ a consensus algorithm. A consensus algorithm-based solution achieves better write reliability and a stronger data consistency guarantee, but usually has lower write throughput. Our applications are perfectly fine with the data consistency model provided by async Master-Slave replication. We also recognized the challenge of implementing a one hundred percent correct, high-performance consensus algorithm with limited time and resources, so our RocksDB replicator only supports async Master-Slave replication.

Replicating multiple RocksDB instances in one process

A typical stateful service process hosts multiple shards, which makes it easier to expand cluster capacity or redistribute uneven workloads. Multiple RocksDB instances can be embedded into one process, and the data stored in them is isolated between instances. We conveniently use one physical RocksDB instance to model one logical DB shard, so the library needs to replicate multiple RocksDB instances in parallel.
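For illustration only, here's a minimal C++ sketch (not Rocksplicator code) of a process opening one RocksDB instance per shard; the shard names, directory layout and options are assumptions.

```cpp
#include <map>
#include <memory>
#include <string>
#include <vector>

#include "rocksdb/db.h"

// Open one RocksDB instance per logical shard inside a single process.
// Each instance keeps its own data files and WAL, so shards stay isolated.
std::map<std::string, std::unique_ptr<rocksdb::DB>> OpenShards(
    const std::vector<std::string>& shard_names, const std::string& root_dir) {
  std::map<std::string, std::unique_ptr<rocksdb::DB>> shards;
  rocksdb::Options options;
  options.create_if_missing = true;
  for (const auto& name : shard_names) {
    rocksdb::DB* db = nullptr;
    auto status = rocksdb::DB::Open(options, root_dir + "/" + name, &db);
    if (status.ok()) {
      shards[name].reset(db);
    }
  }
  return shards;
}
```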

Master/slave role at RocksDB instance level

We assign master/slave roles to individual RocksDB instances instead of to service processes. This allows us to mix master and slave shards on each host of a cluster so hardware resources can be fully utilized across the entire cluster.

Work reactively

The Rocksplicator library needs to be robust and performant, so we limited its responsibility scope. It provides two APIs, one for adding a DB to replicate and the other for removing a DB from the library to stop replication. The library only does data replication and isn't responsible for other tasks such as determining the topology of the replication graph or the replication role for each shard. These tasks are handled separately by the cluster management library and tools, which we'll cover later.
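To make that narrow scope concrete, here's a hedged sketch of what such a two-call interface could look like; the class, method names and parameters below are illustrative assumptions, not Rocksplicator's actual API.

```cpp
#include <string>

#include "rocksdb/db.h"

// Illustrative interface only; the real replicator API may differ in names,
// parameters and ownership semantics.
class ReplicatorSketch {
 public:
  enum class Role { MASTER, SLAVE };

  // Start replicating `db`. For a slave, `upstream_addr` points at the host
  // currently serving the master copy of this shard.
  virtual bool addDB(const std::string& db_name, rocksdb::DB* db, Role role,
                     const std::string& upstream_addr) = 0;

  // Stop replicating the named DB and release it from the library.
  virtual bool removeDB(const std::string& db_name) = 0;

  virtual ~ReplicatorSketch() = default;
};
```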

Optimize for low replication latency

We minimized the delay between the moment an update is applied to its master and the moment the same update is applied to all slave(s). This reduces the time window for potential data loss if there’s a system crash.

Implementation

To exchange RocksDB updates, replicators need to talk to each other across the network. We employ an async fbthrift server and client for this task.

Typically, a persistent log of all updates applied to the master is needed to replicate updates among a set of replicas. Each update in the log should be tagged with a globally unique, ascending sequence number so the master and slaves know which updates to send or request next. This is a non-trivial task, especially when the master role can move from one host to another. After verifying a set of properties of the RocksDB WAL sequence number, we decided to use the RocksDB WAL as the persistent log and the RocksDB sequence number as the global replication sequence number. What we verified includes:

  • The RocksDB WAL sequence number always starts at 0 for a newly created DB
  • It increases by one for every update applied
  • It’s preserved during DB close/open or DB backup/restore

RocksDB provides two convenient APIs, GetLatestSequenceNumber() and GetUpdatesSince(), which allow us to use its WAL sequence number for data replication. By reusing the RocksDB WAL, we minimized the performance penalty that an additional persistence layer would have introduced. Essentially, the write throughput is bounded only by RocksDB's write throughput.
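As a rough illustration of how these two APIs can drive replication, here's a hedged C++ sketch (not the Rocksplicator implementation); the helper names and the convention of requesting updates starting at the slave's latest sequence number plus one are assumptions.

```cpp
#include <memory>

#include "rocksdb/db.h"
#include "rocksdb/transaction_log.h"

// On the slave: the next sequence number to request from the master is
// derived from the latest sequence number applied locally.
rocksdb::SequenceNumber NextSeqToRequest(rocksdb::DB* slave_db) {
  return slave_db->GetLatestSequenceNumber() + 1;
}

// On the master: iterate over WAL write batches at or after the requested
// sequence number. Error handling and the fbthrift transport are omitted.
void CollectUpdatesSince(rocksdb::DB* master_db,
                         rocksdb::SequenceNumber requested_seq) {
  std::unique_ptr<rocksdb::TransactionLogIterator> iter;
  auto status = master_db->GetUpdatesSince(requested_seq, &iter);
  if (!status.ok()) {
    return;  // e.g. the requested WAL segment has already been purged
  }
  while (iter->Valid()) {
    rocksdb::BatchResult batch = iter->GetBatch();
    // batch.sequence is the global replication sequence number of this write
    // batch; batch.writeBatchPtr holds the updates to ship to the slave.
    iter->Next();
  }
}
```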

Hybrid pull- and push-based replication

Broadly, there are two data replication models: pull-based and push-based. In a pull-based model, each slave periodically checks its latest local sequence number and requests new updates from its master since that point. In a push-based model, the master tracks the latest sequence numbers of all slaves and proactively sends data to them whenever it has newer updates. A pull-based model is simpler to implement, while a push-based model has lower replication latency. We employed a hybrid approach to achieve both simplicity and low latency. In our implementation, the master doesn't track slaves' latest sequence numbers. Instead, it serves replicate requests from slaves. If there are no new updates after the sequence number specified in a slave's request, the master holds the request for a configurable amount of time before sending back an empty response. Whenever the master receives a new write, it fulfills the currently held requests immediately. Note that the slave-side replicate request timeout has to be larger than the waiting timeout used by the master. Both timeout values are defined within the replicator library, so this is easily enforced.
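Below is a hedged sketch of the master-side "hold the request until new data or timeout" behavior described above, using a plain condition variable. The actual implementation is built on async fbthrift and doesn't block a thread per held request, so treat this only as a model of the logic.

```cpp
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>

// Tracks the master's latest sequence number and lets replicate requests
// wait until newer updates arrive or a timeout expires.
class UpdateWaiter {
 public:
  // Called by the write path after a new update is applied to the master DB.
  void NotifyNewWrite(uint64_t latest_seq) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      latest_seq_ = latest_seq;
    }
    cv_.notify_all();  // wake up all currently held replicate requests
  }

  // Called when serving a replicate request. Returns true if updates beyond
  // `requested_seq` became available before `max_wait` elapsed; returning
  // false corresponds to sending back an empty response.
  bool WaitForUpdates(uint64_t requested_seq,
                      std::chrono::milliseconds max_wait) {
    std::unique_lock<std::mutex> lock(mutex_);
    return cv_.wait_for(lock, max_wait,
                        [&] { return latest_seq_ > requested_seq; });
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  uint64_t latest_seq_ = 0;
};
```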

Data replicating workflow

Figures 1, 2 and 3 demonstrate the replicator's internal workflows for different scenarios. Rocksplicator runs an internal fbthrift server to handle replicate requests from remote slaves. It also has a small worker thread pool to do CPU-intensive work and to make blocking calls to RocksDB. These blocking calls include getting the latest sequence number from a local slave, applying new updates to it, and getting updates after a specific sequence number from a local master.

Figure 1 shows the workflow for replicating data to slaves. For every local slave, the worker thread pool repeatedly performs steps 1 through 4. The delay between steps 2 and 3 can be large (the remote master may not have the requested data at the moment), so an async fbthrift client is used.

Figure 2 shows the workflow for handling a replicate request for data currently available.

Figure 3 shows the workflow for handling a replicate request for data that isn't currently available. When the worker thread learns at step 4 that the requested data isn't available, it registers to be notified when a new update to the corresponding DB arrives. Once a new write is applied to DB1, a worker thread (any thread in the worker thread pool) is activated immediately to fetch the new update from DB1 and pass it to the fbthrift server to send back to the remote slave.

Common system architecture

We're using Rocksplicator in several production systems, all of which share the architecture depicted in Figure 4. The grey box is a service process containing several embedded RocksDB instances, the RocksDB replicator, admin logic and application logic. All green components are libraries or tools built in-house at Pinterest. The black components are third-party open-source libraries or systems. The red box is application-specific logic. When we need to build a new RocksDB-based stateful service, we implement the red application part and don't need to handle the most complex parts of building a distributed stateful service, such as data replication and cluster management (expanding/shrinking the cluster, moving shards, failing over masters, etc.).

The admin library manages the life cycles of all local RocksDB instances. When the process first starts, it loads the cluster config, which stores the cluster shard mapping, from ZooKeeper. It also receives admin commands from the admin tools to open/close/backup/restore local RocksDB instances, or to change the roles or upstream network addresses of local RocksDB instances. Every time the application logic needs to access RocksDB, it first gets a DB handler from the admin library and then uses the handler to read/write the corresponding DB.
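The snippet below is a hypothetical sketch of that last step; the interface name, the getDBHandler method and the shard-lookup semantics are illustrative assumptions rather than Rocksplicator's actual admin library API.

```cpp
#include <string>

#include "rocksdb/db.h"

// Illustrative admin-library interface: map a shard name to the RocksDB
// instance this host currently owns, or nullptr if it owns no such shard.
class AdminLibrarySketch {
 public:
  virtual rocksdb::DB* getDBHandler(const std::string& shard_name) = 0;
  virtual ~AdminLibrarySketch() = default;
};

// Application logic: fetch the handler first, then read through it.
std::string ReadValue(AdminLibrarySketch* admin, const std::string& shard_name,
                      const std::string& key) {
  std::string value;
  rocksdb::DB* db = admin->getDBHandler(shard_name);
  if (db != nullptr) {
    auto status = db->Get(rocksdb::ReadOptions(), key, &value);
    if (!status.ok()) {
      value.clear();  // key missing or read error
    }
  }
  return value;
}
```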

Open-sourcing Rocksplicator

We're open-sourcing Rocksplicator on GitHub, including all the green components in Figure 4. The release contains a set of tools and libraries:

  • RocksDB replicator (a library for RocksDB real-time data replication)
  • Cluster management library and tools for RocksDB replicator-based stateful services
  • An example counter service that demonstrates how to use RocksDB replicator and cluster management library
  • A set of other libraries, such as an async fbthrift client pool, an fbthrift request router, a concurrent rate limiter, a stats library for maintaining and reporting server stats, and more

We hope you find Rocksplicator as useful as we do. We can’t wait to see new ideas from the community. Together we can make RocksDB data replication better for everyone.

Acknowledgements: The contributors to this project include Shu Zhang, Jian Fang and Bo Liu.
