LinkedIn: Creating a Low Latency Change Data Capture System with Databus
This is a guest post by Siddharth Anand, a senior member of LinkedIn's Distributed Data Systems team.
Over the past 3 years, I've had the good fortune to work with many emerging NoSQL products in the context of supporting the needs of a high-traffic, customer-facing web site.
In 2010, I helped Netflix successfully transition its web-scale use cases from Oracle to SimpleDB, AWS' hosted database service. On completion of that migration, we started a second migration, this time from SimpleDB to Cassandra. The first transition was key to our move from our own data center to AWS' cloud. The second was key to our expansion from one AWS Region to multiple geographically-distributed Regions -- today Netflix serves traffic out of two AWS Regions, one in Virginia, the other in Ireland (F1). Both of these transitions were successful, but each involved integration pain points such as the creation of database replication technology.
In December 2011, I moved to LinkedIn's Distributed Data Systems (DDS) team. DDS develops data infrastructure, including but not limited to, NoSQL databases and data replication systems. LinkedIn, no stranger to building and open-sourcing innovative projects, is doubling down on NoSQL to accelerate its business -- DDS is developing a new NoSQL database called Espresso (R1), a topic for a future post.
Having observed two high-traffic web companies solve similar problems, I cannot help but notice a set of wheel-reinventions. Some of these problems are difficult, and it is truly unfortunate that each company has to solve them separately. At the same time, each company has had to solve these problems itself in the absence of a reliable open-source alternative. This clearly has implications for an industry dominated by fast-moving start-ups that cannot build 50-person infrastructure development teams or dedicate months away from building features.
Change Data Capture Systems
Today, I'd like to focus on one such wheel-reinvention: Change Data Capture systems.
Relational Databases have been around for a long time and have become a trusted storage medium for all of a company's data. In other words, the relational database is the source of truth for a company's business-critical data. Oftentimes, data is pulled off this primary data store, transformed, and then stored in a secondary data store, such as a data warehouse. This secondary store typically supports the data analytics that drive business insights and direction. In this scheme, the two stores are known as the OLTP store and the OLAP store, respectively. (F2)
All of this has been around for decades, so what is new? Increasingly, data from the primary store is used to feed more than just business decisions. At LinkedIn, it also feeds real-time search indexes, real-time network graph indexes, cache coherency, Database Read Replicas, etc... These are examples of LinkedIn's near-real-time data needs.
If you have ever worked in the area of transferring data from the primary store to secondary stores, you are no doubt familiar with the options available to you.
For example, if you are working in the OLTP-to-OLAP space, you are using some sort of ETL (Extract, Transform, and Load) technology. This space has seen innovation around tooling (e.g. making it easy to define a transformation using GUI drag-and-drop tools) and cross-vendor integration (e.g. Oracle to Teradata, Aster Data, etc...). The industry typically uses ETL to run nightly jobs that give executives a view of the previous day's, week's, month's, or year's business performance.
What do you do if you need a stream of near-real-time updates from your primary data store, as shown below for LinkedIn's near-real-time needs?
Outside of costly and proprietary vendor-specific products, there are few options.
Introducing Databus
Databus is an innovative solution in this space.
It offers the following features (a brief consumer-side sketch follows the list):
- Pub-sub semantics
- In-commit-order delivery guarantees
  - Commits at the source are grouped by transaction
  - ACID semantics are preserved through the entire pipeline
- Supports partitioning of streams
  - Ordering guarantees are then per partition
- Like other messaging systems, offers very low latency consumption for recently-published messages
- Unlike other messaging systems, offers arbitrarily-long look-back with no impact on the source
- High Availability and Reliability
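To make these semantics concrete, here is a minimal sketch of what a consumer might look like. The interface, class, and method names below are illustrative assumptions, not the actual Databus client API; real Databus events are compact Avro records.

```java
// Illustrative only: not the actual Databus client API.
// A consumer sees committed changes in commit order, grouped by source transaction,
// with ordering guaranteed per partition of the stream.
public interface ChangeConsumer {

    /** One changed record; real Databus events are compact Avro records. */
    final class ChangeEvent {
        public final String source;   // e.g. the source table the change came from
        public final long scn;        // commit sequence number: the ordering key
        public final byte[] payload;  // serialized after-image of the record
        public ChangeEvent(String source, long scn, byte[] payload) {
            this.source = source;
            this.scn = scn;
            this.payload = payload;
        }
    }

    void onTransactionStart(long scn); // first event of a source commit
    void onEvent(ChangeEvent event);   // one record changed within that commit
    void onTransactionEnd(long scn);   // commit fully delivered: the ACID boundary is preserved
}
```

The transaction boundaries are what let a downstream consumer preserve the source's commit grouping, and the monotonically increasing sequence number is what makes in-commit-order delivery and arbitrary look-back possible.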
How Does Databus Work?
Databus is composed of 3 important pieces:
- Relays
- Bootstrap
- Client Library
The Databus architecture is shown in the images below. A Databus Relay will pull the recently committed transactions from the source Database (e.g. Oracle, MySQL, etc...) (Step 1). The Relay will deserialize this data into a compact form (Avro etc...) and store the result in a circular in-memory buffer. Clients (subscribers) listening for events will pull recent online changes as they appear in the Relay (Step 2). A Bootstrap component is also listening to online changes as they appear in the Relay (Step 3).
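The Relay's buffer can be pictured as a bounded ring of change events ordered by a commit sequence number. The following is a simplified, single-threaded sketch of that idea under my own naming, not the actual relay implementation (which is concurrent and stores Avro-serialized events).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Simplified sketch of a relay's bounded, in-memory change buffer.
// Not Databus' actual relay: no concurrency control, no Avro, naive eviction.
public class RelayBuffer {

    public static final class Event {
        public final long scn;        // commit sequence number from the source DB
        public final byte[] payload;  // serialized change record
        Event(long scn, byte[] payload) { this.scn = scn; this.payload = payload; }
    }

    private final Deque<Event> ring = new ArrayDeque<>();
    private final int capacity;

    public RelayBuffer(int capacity) { this.capacity = capacity; }

    /** Steps 1-2: append a newly committed change; evict the oldest when full. */
    public void append(long scn, byte[] payload) {
        if (ring.size() == capacity) {
            ring.removeFirst();  // oldest changes fall out of the relay's window
        }
        ring.addLast(new Event(scn, payload));
    }

    /** Oldest SCN still held; consumers behind this point must go to the Bootstrap. */
    public long minAvailableScn() {
        return ring.isEmpty() ? -1 : ring.peekFirst().scn;
    }

    /** Step 2: serve all buffered changes newer than the caller's last-seen SCN. */
    public List<Event> eventsSince(long lastSeenScn) {
        List<Event> out = new ArrayList<>();
        for (Event e : ring) {
            if (e.scn > lastSeenScn) {
                out.add(e);
            }
        }
        return out;
    }
}
```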
If a Subscriber were to fall behind such that the data it requests is no longer in the Relay's in-memory buffer, the Subscriber can request Consolidated Deltas occurring since a time T in the past (Step 4). This will return an efficient representation of all changes that have occurred since time T.
If a new Subscriber, one with no prior knowledge of the dataset, were to join the party, it would need to fully bootstrap. At first, the Subscriber's Databus Client library would request a Consistent Snapshot at some time T in the past (Step 5). The client library would then request Consolidated Deltas since that time T (Step 6). After the Subscriber applies the Consolidated Deltas, the client library would switch to listening for online changes from the Relay (Step 7). The client library helps the subscriber get all changes since time T, where T can be any arbitrary point in time, shielding the Subscriber from the details of where the changes are coming from.
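Putting steps 2 through 7 together, the client library essentially makes the following decision on a subscriber's behalf. The interfaces and method names here are hypothetical, a sketch of the control flow rather than the real Databus client API.

```java
// Hypothetical sketch of the client library's catch-up decision (steps 2 through 7 above).
// The Relay and Bootstrap interfaces here are assumptions for illustration, not the real API.
public class CatchUpLogic {

    interface Relay {
        long minAvailableScn();             // oldest change still in the in-memory buffer
        void streamChangesSince(long scn);  // steps 2 and 7: online consumption
    }

    interface Bootstrap {
        long serveConsistentSnapshot();               // step 5: returns the snapshot's point-in-time T
        long serveConsolidatedDeltasSince(long scn);  // steps 4 and 6: returns the SCN caught up to
    }

    /** Decide where a subscriber whose last applied change is `lastScn` should read from. */
    public static void catchUp(long lastScn, boolean brandNewSubscriber,
                               Relay relay, Bootstrap bootstrap) {
        if (brandNewSubscriber) {
            // Steps 5-7: consistent snapshot, then consolidated deltas, then switch to the relay.
            long snapshotScn = bootstrap.serveConsistentSnapshot();
            long caughtUpTo = bootstrap.serveConsolidatedDeltasSince(snapshotScn);
            relay.streamChangesSince(caughtUpTo);
        } else if (lastScn >= relay.minAvailableScn()) {
            // Step 2: still inside the relay's window, so consume online changes directly.
            relay.streamChangesSince(lastScn);
        } else {
            // Step 4: fell behind the relay's window; catch up from the bootstrap first.
            long caughtUpTo = bootstrap.serveConsolidatedDeltasSince(lastScn);
            relay.streamChangesSince(caughtUpTo);
        }
    }
}
```

Either way, the subscriber only ever states how far it has gotten; the library hides whether the next batch of changes comes from the Relay or the Bootstrap.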
Databus' Bootstrap Component
One of the most innovative features of Databus is its Bootstrap component. Change Data Capture systems have existed for a long time (e.g. Oracle Streams). However, all of these systems put load on the primary data store when a consumer falls behind.
Bootstrapping a brand new consumer is another problem. It typically involves a very manual process -- i.e. restore the previous night's snapshot on a temporary Oracle instance, transform the data and transfer it to the consumer, then apply changes since the snapshot, etc...
Databus' Bootstrap component handles both of the above use-cases in a seamless, automated fashion.
How Does Databus' Bootstrap Component Work?
The Databus Bootstrap component is made up of 2 types of storage, Log Storage and Snapshot Storage. Log Storage serves Consolidated Deltas, whereas Snapshot Storage serves Consistent Snapshots. A sketch of how the pieces fit together follows the list below.
- As shown earlier, the Bootstrap component listens for online changes as they occur in the Relay. A LogWriter appends these changes to Log Storage.
- A Log Applier applies recent operations in Log Storage to Snapshot Storage
- If a new subscriber connects to Databus, the subscriber will bootstrap from the Bootstrap Server running inside the Bootstrap component
- The client will first get a Consistent Snapshot from Snapshot Storage
- The client will then get outstanding Consolidated Deltas from Log Storage
- Once the client has caught up to within the Relay's in-memory buffer window, the client will switch to reading from the Relay
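The key idea is consolidation: because the Log Applier keeps folding changes into Snapshot Storage keyed by primary key, a catching-up subscriber receives at most one version per row (its latest state) rather than the full change history. Here is a rough sketch of that idea, with in-memory collections standing in for the two persistent stores; all names are my own, not the actual Bootstrap implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Rough sketch of the consolidation idea behind the Bootstrap component.
// A real implementation persists both stores on disk; in-memory collections stand in here.
public class BootstrapStore {

    public static final class LogEntry {
        public final long scn;        // commit sequence number
        public final String key;      // primary key of the changed row
        public final byte[] value;    // latest serialized state of the row
        public LogEntry(long scn, String key, byte[] value) {
            this.scn = scn; this.key = key; this.value = value;
        }
    }

    private final List<LogEntry> logStorage = new ArrayList<>();                 // serves Consolidated Deltas
    private final Map<String, LogEntry> snapshotStorage = new LinkedHashMap<>(); // serves Consistent Snapshots
    private long appliedScn = -1;

    /** LogWriter: append every online change seen in the Relay. */
    public void append(LogEntry entry) {
        logStorage.add(entry);
    }

    /** Log Applier: fold recent log entries into Snapshot Storage (latest version per key wins). */
    public void applyLog() {
        for (LogEntry e : logStorage) {
            if (e.scn > appliedScn) {
                snapshotStorage.put(e.key, e);
                appliedScn = e.scn;
            }
        }
    }

    /** Consistent Snapshot as of appliedScn, for a brand-new subscriber. */
    public Map<String, LogEntry> consistentSnapshot() {
        return snapshotStorage;
    }

    /** Consolidated Deltas since scn: at most one entry per key, carrying its latest state. */
    public Map<String, LogEntry> consolidatedDeltasSince(long scn) {
        Map<String, LogEntry> deltas = new LinkedHashMap<>();
        for (LogEntry e : logStorage) {
            if (e.scn > scn) {
                deltas.put(e.key, e);  // later entries overwrite earlier ones for the same key
            }
        }
        return deltas;
    }
}
```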
Future Plans for Databus
The engineers of the Databus and Espresso (our next NoSQL store, enough said) teams have been working tirelessly to support Databus replication within Espresso -- Databus will be Espresso's native internal replication technology. Additionally, once the team finds some free time, they will open-source Databus.
We are looking for engineers with a strong track record of solving tough problems to join us in DDS. Feel free to ping me on LinkedIn if you are interested.
What this means for NoSQL
As cool as Databus is, it won't work for all NoSQL stores. There is currently a big gap in the feature set provided by many NoSQL technologies, especially many Dynamo-style Key-Value stores. They do not provide a timeline-consistent change stream that Databus can pull.
Without this support, there are two unfulfilled use-cases:
- supporting outbound feeds into existing Business Intelligence infrastructure (i.e. nightly, ETL-oriented loads)
- supporting outbound near-real-time feeds into secondary indexes such as search, network graph, caches, etc...
Recently, for Cassandra, both Netflix and Ooyala solved this problem separately. Netflix published a tech blog about Aegisthus, a system to transform an eventually-consistent set of data files into a timeline-consistent stream. This stream is currently consumed by Business Intelligence -- it's not real-time, as it depends on the memtable flush interval. However, with a few tweaks, it can be near-real-time. We look forward to the open-sourcing of that technology.
More importantly, we look to NoSQL vendors to solve this problem for their products.
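To be concrete about what solving this problem would mean, the contract a store needs to expose is roughly the following. This interface is purely illustrative, not an existing API; the essential point is simply changes, in commit order, replayable from a known point in the timeline.

```java
import java.util.List;

// Illustrative contract a NoSQL store would need to expose for Databus-style change capture.
// Not an existing API: the names and shape here are assumptions for the sake of discussion.
public interface TimelineConsistentChangeStream<T> {

    /** Highest sequence number committed so far. */
    long latestSequence();

    /**
     * All changes with sequence number greater than sinceSequence, in the order
     * they were committed (per partition), up to limit entries.
     */
    List<T> changesSince(long sinceSequence, int limit);
}
```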
More Resources
- Data Infrastructure @ LinkedIn -- QCON London 2012, Sid Anand, Senior Member, LinkedIn
- Databus: A System for Timeline-Consistent Change Data Capture -- CIKM 2011, Chavdar Botev, Senior Member, LinkedIn
- Data Infrastructure at LinkedIn -- XLDB 2011, Shirshanka Das, Senior Member, LinkedIn
- LinkedIn Infrastructure -- QCON SF 2007, Jean-Luc Vaillant, LinkedIn Co-founder and former CTO
Acknowledgements
I'd like to thank the engineers who worked tirelessly to build this system:
Aditya Auradkar, Chavdar Botev, Shirshanka Das, Dave DeMaagd, Alex Feinberg, Phanindra Ganti, Lei Gao, Bhaskar Ghosh, Kishore Gopalakrishna, Mihir Gandhi, Brendan Harris, Swaroop Jagadish, Joel Koshy, Kevin Krawez, Jay Kreps, Shi Lu, Sunil Nagaraj, Neha Narkhede, Sasha Pachev, Igor Perisic, Lin Qiao, Tom Quiggle, Jun Rao, Bob Schulman, Abraham Sebastian, Oliver Seeliger, Adam Silberstein, Boris Shkolnik, Chinmay Soman, Subbu Subramaniam, Roshan Sumbaly, Kapil Surlaker, Sajid Topiwala, Cuong Tran, Balaji Varadarajan, Jemiah Westerman, Zach White, David Zhang, Jason Zhang, Agila Devi, Neil Pinto, Ramana Ramakrishnan, Sai Sundar, and Nishant Vyas.
I'd also like to give a special thanks to Bob Schulman, Kapil Surlaker, and Shirshanka Das for their help with this article.
Footnotes
- Netflix's UK and Ireland customers benefit from Netflix's local Region presence in terms of snappy latency. If you are not familiar with AWS Regions, Regions provide geographic proximity to end users. Regions themselves are composed of several data centers, known in AWS-speak as Availability Zones. As the name implies, Availability Zones provide Disaster Recovery within the hosting Region. Disaster Recovery across Regions is never a good idea for a latency-sensitive application like a web site.
- OLTP (Online Transaction Processing) vs. OLAP (Online Analytic Processing): this differentiates between their uses -- OLTP for primary data serving, OLAP for analytic processing of a modified copy of the primary data.
Reader Comments (10)
"Introducing Databus"...is this available open source? Will it be?
We'd like to start a discussion. It will be open-sourced soon. What do people think about it? How will people use it? etc...
We plan to open-source Databus later this year.
I look forward to it being open sourced. I think many people (myself included) will use it to ETL from legacy source databases where schema is not very well suited for loading delta sets of data. The commercial CDC products out there (e.g. Attunity) make it difficult for developers to customize and contribute to. I am working on a solution to grab CDC data by reverse engineering the SQL Server transaction logs and would be happy to conform to a common CDC abstraction.
I see this being used to bring transactions out of relational business software databases and into multiple specialized systems--a graph database, a statistical platform, an HDFS store, an in-memory BI store, and a stream database--all at the same time. I'm excited to hear about this because it is a critical piece of my vision for the future of business software.
First of all, thank you for sharing this with us.
I do not understand the ETL aspects well enough, but here is my attempt to see how MongoDB fits in this scheme. I am very likely overly simplifying things here, but please bear with me. :)
So there is a regular DB that we all know of which stores the primary data. This DB publishes its changes sequentially, which are captured and kept in a fixed-size buffer and made available to subscribers. This buffer of changes is regularly backed up to disk. A new member intending to do a full sync can pull changes from the backed-up copy. The data with the subscribers can be acted upon to derive other meaningful data.
It appears to me that MongoDB provides all these things, or at least the basic infrastructure to build such a system.
The regular DB is the MongoDB. It publishes the changes in the form of an oplog - which is typically memory mapped. A client can open a tailable cursor to the oplog collection and stream in the changes to the DB as they happen. The clients can be associated with secondary DBs arranged as part of a Replica Set, on which Map-Reduce or Aggregation tasks can be run. One of the secondaries can act like the "Bootstrap Server". (A new secondary can be brought up from another secondary by doing a full sync). The oplog of this special secondary can probably act like the "Relay" to avoid load on the primary.
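For illustration, tailing the oplog with a tailable cursor looks roughly like this (sketched with a recent MongoDB Java driver; the checkpoint handling is simplified and error handling is omitted):

```java
import com.mongodb.CursorType;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.Filters;
import org.bson.BsonTimestamp;
import org.bson.Document;

public class OplogTailer {
    public static void main(String[] args) {
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            // The oplog lives in the "local" database on replica-set members.
            MongoCollection<Document> oplog =
                    client.getDatabase("local").getCollection("oplog.rs");

            // Resume point: the last oplog timestamp this subscriber processed (checkpointing omitted).
            BsonTimestamp lastSeen = new BsonTimestamp(0, 0);

            try (MongoCursor<Document> cursor = oplog
                    .find(Filters.gt("ts", lastSeen))
                    .cursorType(CursorType.TailableAwait)  // block and wait for new entries
                    .iterator()) {
                while (cursor.hasNext()) {
                    Document entry = cursor.next();
                    // "op" is the operation type (i/u/d), "ns" the namespace, "o" the changed document.
                    System.out.println(entry.getString("ns") + " " + entry.getString("op"));
                }
            }
        }
    }
}
```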
Does this make any sense? Was MongoDB evaluated for this project?
Any thoughts on this would be appreciated.
OTOH, now if this has to work with a particular database then will there be "Databus drivers" for each of those databases? For instance, if this has to work with MongoDB, there has to be a MongoDB driver which can understand the oplog format published by MongoDB and make it available in a common format to clients.
Or would you be publishing a standard and asking databases developers to provide changes as per that standard?
Once again, thank you.
Btw, I am also waiting for that post on Espresso. :)
-- Brahmana
Sid, nice post. We're doing something similar at Yammer for our near-realtime search. Databus looks much more extensive, but the principles are similar. We're using BDB JE as the backing store for what's essentially a change set service. The Rails application, on updating an ActiveRecord model, beacons that change to the change set service. The change set service indexes the change. Another service traverses the change set index and generates Lucene indices. There's some extra bits for dependencies between models so when a dependent model updates, the model dependent upon it is "invalidated" so it gets reindexed.
I'd be really curious to learn more and to get a crack at the open source bits. What we have has served our needs for the most part, but it can definitely improve.
Sid,
Good post and thanks for sharing this. We are looking at yet reinventing the wheel by creating something very similar to what you have described above. Do you have any idea when the bits may be available to start trying out? We would be very interested in learning more about the offering and taking a look at the open source code and contributing back if applicable.
I'm trying to find the specifics on how the relay pulls committed changes out of the DB.
Is there any video or document describing this in detail?
As I understand it, it's used today with Oracle. Does the technique depend on specific Oracle capabilities or can it be implemented on a "generic" RDBMS?
Can the Databus Relay pull transactions from MS SQL Server (the source database)?