Monday, October 20, 2014

Facebook Mobile Drops Pull For Push-based Snapshot + Delta Model

We've learned mobile is different. In "If You're Programming A Cell Phone Like A Server You're Doing It Wrong" we learned that programming for a mobile platform is its own specialty. In "How Facebook Makes Mobile Work At Scale For All Phones, On All Screens, On All Networks" we learned that bandwidth on mobile networks is a precious resource.

Given all that, how do you design a protocol to sync state (think messages, comments, etc.) between mobile nodes and the servers in a datacenter that hold the global state?

Facebook recently wrote about their new solution to this problem in "Building Mobile-First Infrastructure for Messenger". They reduced bandwidth usage by 40% and cut by 20% the terror of hitting send on a phone.

That's a big win...that came from a protocol change.

Facebook Messenger went from a traditional notification-triggered full state pull:

The original protocol for getting data down to Messenger apps was pull-based. When receiving a message, the app first received a lightweight push notification indicating new data was available. This triggered the app to send the server a complicated HTTPS query and receive a very large JSON response with the updated conversation view.

To a more sophisticated push-based snapshot + delta model:

Instead of this model, we decided to move to a push-based snapshot + delta model. In this model, the client retrieves an initial snapshot of their messages (typically the only HTTPS pull ever made) and then subscribes to delta updates, which are immediately pushed to the app through MQTT (a low-power, low-bandwidth protocol) as messages are received. When the client is pushed an update, it simply applies them to its local copy of the snapshot. As a result, without ever making an HTTPS request, the app can quickly display an up-to-date view.
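To make the client side of that flow concrete, here is a minimal sketch, not Facebook's code. The `Delta` and `LocalView` names, the per-update `seq` field, and the dictionary-of-threads layout are all assumptions made for illustration; the real client receives Thrift-encoded deltas over MQTT.

```python
from dataclasses import dataclass, field


@dataclass
class Delta:
    seq: int          # position in the server's update stream (assumed field)
    thread_id: str
    text: str


@dataclass
class LocalView:
    last_seq: int = 0
    threads: dict = field(default_factory=dict)

    def load_snapshot(self, snapshot: dict, seq: int) -> None:
        # The one-time pull: replace local state with the server's snapshot.
        self.threads = {tid: list(msgs) for tid, msgs in snapshot.items()}
        self.last_seq = seq

    def apply(self, delta: Delta) -> None:
        # A pushed update: append the message to the local copy.
        self.threads.setdefault(delta.thread_id, []).append(delta.text)
        self.last_seq = delta.seq


view = LocalView()
view.load_snapshot({"t1": ["hi"]}, seq=10)                      # initial snapshot
view.apply(Delta(seq=11, thread_id="t1", text="how are you?"))  # pushed delta
view.apply(Delta(seq=12, thread_id="t2", text="new thread"))    # pushed delta
```

After the snapshot is loaded, the view stays current purely by applying pushed deltas, which is why no further pull is needed in the common case.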

 

We further optimized this flow by moving away from JSON encoding for the messages and delta updates. JSON is great if you need a flexible, human-readable format for transferring data without a lot of developer overhead. However, JSON is not very efficient on the wire. Compression helps some, but it doesn’t entirely compensate for the inherent inefficiencies of JSON’s wire format. We evaluated several possibilities for replacing JSON and ultimately decided to use Thrift. Switching to Thrift from JSON allowed us to reduce our payload size on the wire by roughly 50%.
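A rough way to see where the savings come from is to compare JSON against any fixed binary layout. The sketch below uses Python's `struct` module as a stand-in; it is not Thrift and does not reproduce Thrift's wire format. The message fields and the `encode_binary` / `decode_binary` helpers are hypothetical.

```python
import json
import struct

# Hypothetical message; the field names are illustrative only.
message = {
    "sender_id": 1234567890,
    "thread_id": 987654321,
    "timestamp_ms": 1413763200000,
    "text": "hello",
}

HEADER = "<QQQH"  # three unsigned 64-bit ints plus a 16-bit text length


def encode_json(msg: dict) -> bytes:
    return json.dumps(msg, separators=(",", ":")).encode("utf-8")


def encode_binary(msg: dict) -> bytes:
    # Field names are implied by position, so they never go on the wire.
    body = msg["text"].encode("utf-8")
    header = struct.pack(HEADER, msg["sender_id"], msg["thread_id"],
                         msg["timestamp_ms"], len(body))
    return header + body


def decode_binary(data: bytes) -> dict:
    sender_id, thread_id, timestamp_ms, length = struct.unpack_from(HEADER, data, 0)
    offset = struct.calcsize(HEADER)
    text = data[offset:offset + length].decode("utf-8")
    return {"sender_id": sender_id, "thread_id": thread_id,
            "timestamp_ms": timestamp_ms, "text": text}


json_size = len(encode_json(message))
binary_size = len(encode_binary(message))
print(json_size, binary_size)
```

JSON repeats every field name and spells numbers out as text in each message. A schema-based format moves that information into the schema, at the cost of human readability.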

On the server side, Facebook also innovated:

Iris is a totally ordered queue of messaging updates (new messages, state change for messages read, etc.) with separate pointers into the queue indicating the last update sent to your Messenger app and the traditional storage tier. When successfully sending a message to disk or to your phone, the corresponding pointer is advanced. When your phone is offline, or there is a disk outage, the pointer stays in place while new messages can still be enqueued and other pointers advanced. As a result, long disk write latencies don't hinder Messenger's real-time communication, and we can keep Messenger and the traditional storage tier in sync at independent rates. Effectively, this queue allows a tiered storage model based on recency.
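The pointer idea can be sketched in a few lines. This is a toy, single-process model based only on the description above, not Iris itself; the `UpdateQueue` class and its method names are assumptions.

```python
class UpdateQueue:
    """Toy model: one ordered log with an independent pointer per consumer."""

    def __init__(self, consumers):
        self.log = []                                    # totally ordered updates
        self.pointers = {name: 0 for name in consumers}  # next index per consumer

    def enqueue(self, update):
        # Enqueueing never waits on any consumer.
        self.log.append(update)

    def pending(self, consumer):
        # Everything this consumer has not yet confirmed.
        return self.log[self.pointers[consumer]:]

    def ack(self, consumer, count=1):
        # Advance a pointer only after a successful send or write.
        self.pointers[consumer] = min(self.pointers[consumer] + count, len(self.log))


queue = UpdateQueue(["phone", "disk"])
queue.enqueue("msg-1")
queue.enqueue("msg-2")
queue.ack("disk", 2)       # storage tier has persisted both updates
queue.enqueue("msg-3")     # phone is offline, but enqueueing still works
print(queue.pending("phone"))
print(queue.pending("disk"))
```

Because each consumer has its own pointer, a slow disk or an offline phone only delays its own view of the log.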

Delta-based syncing strategies are very common in network management systems, where devices keep a northbound management system in sync by sending deltas. The problem with a delta-based approach is that it's easy for clients to get out of sync. What if changes happen faster than deltas can be moved through the system? Or queues get full? Or the network drops updates? The client will get out of sync and it won't even know it. Sequence numbers can be used to detect the gap and trigger a full sync pull when a client falls out of sync.
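Here is a minimal sketch of that sequence number check. It assumes every delta carries a sequence number that increases by exactly one. The `SyncClient` class and the `fetch_snapshot` callback are hypothetical names, not part of any Facebook API.

```python
class SyncClient:
    def __init__(self, fetch_snapshot):
        # fetch_snapshot is a callable returning (state_dict, seq).
        self.fetch_snapshot = fetch_snapshot
        self.state, self.seq = fetch_snapshot()
        self.resyncs = 0

    def on_delta(self, seq, key, value):
        if seq <= self.seq:
            return                      # duplicate or stale delta, ignore it
        if seq != self.seq + 1:
            # Gap detected: at least one delta was lost, so pull a full snapshot.
            self.state, self.seq = self.fetch_snapshot()
            self.resyncs += 1
            return
        self.state[key] = value         # in-order delta, apply it
        self.seq = seq


# Stand-in for the server's authoritative state.
server = {"state": {"a": 1}, "seq": 5}


def fetch_snapshot():
    return dict(server["state"]), server["seq"]


client = SyncClient(fetch_snapshot)
client.on_delta(6, "b", 2)              # arrives in order

server["state"].update({"b": 2, "c": 3, "d": 4})
server["seq"] = 8
client.on_delta(8, "d", 4)              # delta 7 never arrived, so resync
print(client.state, client.seq, client.resyncs)
```

The check only fires when a later delta arrives. A client that receives nothing at all would still need a heartbeat or a periodic sequence check to notice it has fallen behind.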

It's also good to see Facebook got rid of JSON. The amount of energy and bandwidth wasted on moving and parsing fat JSON packets is almost criminal.

Reader Comments (5)

A snapshot + delta scheme is rather primitive.

As you note, it doesn't handle overflow of the delta queue and can result in unnecessary repulls of the baseline snapshot. If you're clever, a sequence/epoch number can enable a delta to be computed even after an indefinite disconnection period.

Also, delta queues are unable to "fold" offsetting operations that occur when offline (e.g. insert of a record followed by its later deletion shouldn't be transmitted at all).

Finally, the biggest feature missing from primitive sync+streaming implementations is that they lack the concept of "transactions". For example, a sequence of deltas will only be applied once the entire transaction message is received.

October 20, 2014 | Unregistered Commenter ASM

Good points.

It often depends on the kind of data you are moving around. A counter will want to see deletions, so not transmitting deletion state would be a problem. Also, some state machines may listen for deletion events to perform some kind of reinitialization logic, for example. I've kept metadata per property to indicate deletion, creation, and update counts so the client can take appropriate action without the cost of transmitting the entire event stream. Very useful when events flap a lot.

Wouldn't the delta calculation require the server side to keep state around so the events a particular client missed could be recreated? I wonder if that's a greater cost than a resync?

And do you think syncing is transactional? I've never thought of it that way.

October 20, 2014 | Registered Commenter HighScalability Team

These are two different mechanisms for communicating and transferring state changes. Either the client periodically pulls data from the server, or the server keeps a long-running connection open to the client and keeps sending updates (messages) over that socket connection.

Very clearly, there is a lot of resource wastage in the first approach.

In the second approach, the onus is on the server to maintain state and keep the client in sync with all the updates. Given the scale Facebook operates at, it is remarkable that they have built servers that maintain state for hundreds of millions of mobile users.
Now the server has to detect when the client goes out of sync. This could be achieved by having the client send an acknowledgement on delivery of each message; the server would then resync the client if the client reported that it had lost state.

October 20, 2014 | Unregistered Commenter AP

I'm reading this quickly, but does this violate the first link in this article and the Big Cookie model? Just curious.

October 22, 2014 | Unregistered Commenter James

Hi,

@AP: It's true that persistent connections are not only more efficient in terms of resource usage but can also greatly reduce latency. However, sending an acknowledgement back on delivery of each message could break the scalability of the system. This could be done more efficiently using sequence numbers, as Todd suggested in the post. Nevertheless, implementing guaranteed delivery using sequence numbers is not as simple as it seems, especially if low message latency is required.

At MigratoryData we have spent man-years to achieve both very high scalability and guaranteed delivery:

Guaranteed Delivery

To achieve guaranteed delivery, we use sequence numbers for messages. If a message is lost, the client automatically reconnects to another cluster member, providing the latest valid sequence number it received, and gets the messages starting from that sequence number from the new cluster member. The principle is simple; however, we had to resolve very complex problems:

- Our solution was to use an in-memory cache for each cluster member
- But such caches needed to be synchronized among cluster members in the presence of failures
- To this end, we implemented efficient distributed algorithms for fault-tolerant message replication among cluster members
- Finally, in-memory caching consumes memory, so we implemented a TTL for messages. Expired messages are now automatically and efficiently removed from the cache

Very high scalability

Because there is very little interaction between cluster members, MigratoryData comes with near-linear horizontal scalability. What is special is that we've achieved a vertical scalability leap:

- 12 million concurrent persistent connections on a single 1U server while pushing 1 Gbps real-time data (http://t.co/8TV9jrJTbw)
- 8.88 Gbps of real-time data throughput on a 10 GbE network from a single 1U server (http://t.co/vHe2Oxo5nM)

That said, MigratoryData has been used in production for a couple of years to deliver real-time messages over persistent connections to tens of millions of end users in a reliable way, and our experience is that it is possible to scale to millions of users and have guaranteed delivery.

Thanks,
Mihai

October 22, 2014 | Unregistered Commenter Mihai Rotaru
