Entries in xmpp (4)

Thursday
Jan 22, 2009

Heterogeneous vs. Homogeneous System Architectures

I follow a certain philosophy when developing system architectures. I assume that very few systems will ever exist in a consistent form for more than a short period of time. What constitutes a “short period of time” differs depending on the specifics of each system, but in an effort to quantify it, I generally find that it falls somewhere between a week and a month. The driving forces behind the need for an ever-changing architecture are largely business requirements. This is a side effect of the reality that software development, in most cases, plays a supporting role within the business unit it serves. As business requirements (i.e. additional features, new products, etc.) pour forth, it is the developer’s job to evolve the software system to accommodate them and provide a software-based solution to whatever problems lie ahead. Given that many businesses fit this description, I can now begin to explain why I believe that Heterogeneous System Architectures hold a significant advantage over Homogeneous System Architectures in many distributed-system cases.


Monday
Oct 27, 2008

Notify.me Architecture - Synchronicity Kills

What's cool about starting a new project is you finally have a chance to do it right. You of course eventually mess everything up in your own way, but for that one moment the world has a perfect order, a rightness that feels satisfying and good. Arne Claassen, the CTO of notify.me, a brand new real-time notification delivery service, is in this honeymoon period now. Arne has been gracious enough to share with us his philosophy of how to build a notification service. I think you'll find it fascinating because Arne goes into a lot of useful detail about how his system works.

His main design philosophy is to minimize the bottlenecks that form around synchronous access, that is, when some resource is requested and the requestor ties up resources waiting for a response. If the requested resource can't be delivered in a timely manner, more and more requests pile up until the server can't accept any new ones. Nobody gets what they want and you have an outage. Breaking synchronous operations into asynchronous operations, by separating request and response into separate message-passing actions, stops the resource overload. Instead of a system going down from too many parallel requests, it can work its way through a backlog of requests as fast as it can. And in most cases the request/response cycles are so fast that they appear to be a linear sequence of events.

Notify.me is taking the innovative and risky strategy of using ejabberd, an XMPP based system, as their internal messaging and routing layer. Will Erlang and Mnesia (Erlang's database) be able to keep up with traffic and keep latencies low as traffic scales? It will be interesting to find out. If you are interested in notify.me they've kindly offered 500 beta accounts for HS readers: http://notify.me/user/account/create/highscale
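
Before diving in, here is a minimal, hypothetical sketch (in Python, not notify.me's actual code) of the synchronous-versus-asynchronous point above: the request and the response become separate messages on a queue, so a burst of load turns into a backlog that a worker drains, rather than a pile of blocked callers.

```python
import queue
import threading
import time

requests = queue.Queue()   # incoming work; callers never block on the slow resource
responses = queue.Queue()  # completed work, delivered whenever it is ready

def slow_resource(payload):
    time.sleep(0.1)        # stand-in for a slow downstream call
    return f"result for {payload}"

def worker():
    # Drains the backlog as fast as it can; a spike in requests just
    # lengthens the queue instead of tying up caller threads.
    while True:
        payload = requests.get()
        responses.put(slow_resource(payload))
        requests.task_done()

threading.Thread(target=worker, daemon=True).start()

for i in range(20):        # a burst of "parallel" requests is absorbed instantly
    requests.put(i)

requests.join()
print(responses.qsize(), "responses ready")
```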

Who are you?

My name is Arne Claassen, and I'm the CTO of notify.me. I've been working on highly scalable web-based applications and services for the past decade. These sites have employed various combinations of traditional scaling techniques such as server farms, caching, content pregeneration and highly available databases using replication and clustering. All of these techniques are ways to mitigate contention among many users for scarce resources (generally the database). Knowing the benefits and pitfalls of these techniques, it has become my focus to architect systems that circumvent scarce-resource scenarios.

What is notify.me, why did you make it, and why is it a good thing?

notify.me is the brainchild of Jason Wieland, our CEO. It's a near real-time notification service that alerts users to new content published on the web. It was created to address the common user pain of staying up to date with time-critical events that occur on the web. For instance, a user searching for an apartment on Craigslist would want to be alerted as soon as a new one matching their search criteria is posted. notify.me does the grunt work of repeatedly checking Craigslist for new listings and alerts the user once one appears. Notifications can be delivered to instant messenger, desktop application, mobile device, email, and web application. Our goal is to create and publish open APIs allowing people to build new and interesting applications for generating and delivering information.

How does your service compare to other services people might be familiar with? Like Twitter, Friend Feed, Gnip, or Yahoo Pipes?

There are quite a few companies in our competitive landscape, some of which are direct competitors, like yotify.com or alerts.com. The main difference is our approach. Yotify and Alerts are focused on being notification portal sites for users to visit. notify.me is a utility, with a focus on offering all the functionality available on the website via XMPP and REST APIs, allowing users to interact with notifications from the application of their choosing. We also allow for escalation of messages to destinations. For example, if a user is not logged into their IM or has a status of away, notifications can be escalated and routed to their mobile device.

In the messaging arena, we are nearly the opposite of Twitter. Twitter is an inward-facing publishing model based on its own user-generated content. Someone makes a tweet and it gets published to their followers. notify.me is creating an externally facing message delivery system. Users add any website that supports web feed standards, or redirect existing notification emails to us. If anything, we are a messaging pipeline that is complementary to Twitter (more on that below).

FriendFeed does a great job at combining all your social networks together into one centralized area. Their primary focus is to build features and tools to interact with the mashed feed. This feed would be perfect to add as a source to notify.me, allowing a user to receive all social network updates over instant messenger. Being able to know in near real time when you have a new posting on your wall, so that you can immediately respond, is a feature the social addicts want.

Yahoo Pipes would be considered a possible partner, similar to how they upsell Netvibes and Newsgator. Their focus is to provide an intuitive programming interface for manipulating feeds and creating useful mashups. For example, the Hot Deals Search is a nifty pipe that searches over a collection of sites for the best deal. Users might not want to use Yahoo's own notification options due to the limited choice of destinations. In our beta group we've seen similar activity with users adding eBay links. eBay has a notification pipeline that competes with notify.me; however, users still add eBay search links to our service. It turns out they would like one central place to manage their various news feeds.

Gnip is a pure infrastructure play. We have similar technology but we are going after completely different markets.

An additional core feature of our product that we have not yet exposed is that our pipeline is bi-directional, i.e. any data source can also be a destination and vice versa. The primary benefit of this is that messages can be responded to, such as acknowledging a support ticket you received. Bi-directional communication will require integration with the notify.me API, through which a source can communicate reply options. We are currently developing a deep integration with the Twitter API to provide two-way capabilities for tweets via IM in the same channel where you already receive your other notifications.

Can you explain the different parts of notify.me and how they connect together?

In general terms our system consists of three subsystems, each of which has a number of implementations:

1. Ingestion consists of RSS and email ingestors, which constantly check the user's email address (username@notify.me) and the user's feeds for new data. New data is turned into notifications which are propagated to routing.
2. Routing is responsible for getting the user's notifications to the right delivery components. Routing is the point in the system that the user interacts with for management, such as changing sources and destinations, and viewing history. Notification history is a specialized delivery component, allowing all messages to be perused via the website, even after they have gone through the entire pipeline.
3. Delivery currently consists of history (which always gets the messages), XMPP IM, SMS and email, with private RSS, AIM and MSN in development.

On a more technical level, the topology of this system consists of two separate message buses:

1. Store-and-forward queues (using simpleMQ)
2. XMPP (using ejabberd)

Store-and-forward queues are used by the ingestion side to distribute the work of ingestion, and generally anywhere data is handled before it becomes visible to the routing rules of the user. This allows for scaling flexibility as well as process isolation during a component failure.

The XMPP bus is called the Avatar bus, named thusly because every data-owning entity is represented by a daemon process that is the sole authority for that entity's data. We have four types of avatars: Monitor, Agent, Source and User.

1. Monitor avatars are the parties responsible for observing instance health and spinning up and shutting down additional computing nodes on demand.
2. Agent avatars are the delivery gateways that provide presence information about our users to the bus and deliver messages to our users.
3. Source avatars are ingestors, such as RSS. This avatar pulls new messages from a store-and-forward queue and notifies its subscribers of the new message.
4. User avatars persist all the configuration and messaging data for a particular user. A user avatar is responsible for receiving new notifications from ingestion avatars, deciding on the routing, and pushing messages to the appropriate delivery agent as that agent declares the ability to execute that delivery.
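
The hand-off from ingestion to routing can be pictured with a rough Python sketch (assumed names; simpleMQ is approximated with an in-process queue, and in the real system these pieces are separate daemons talking over the bus): a polling ingestor drops new feed items onto a store-and-forward queue, and a source avatar drains the queue and fans each item out to its subscribers.

```python
import queue

store_and_forward = queue.Queue()          # stands in for the simpleMQ queue

def rss_ingestor(feed_items, seen):
    """One pass of a polling loop: enqueue only items we have not seen before."""
    for item in feed_items:
        if item["id"] not in seen:
            seen.add(item["id"])
            store_and_forward.put({"feed": item["feed"], "title": item["title"]})

class SourceAvatar:
    """Authority for one feed: knows its subscribers and routes new items to them."""
    def __init__(self, feed, subscribers):
        self.feed = feed
        self.subscribers = subscribers     # user avatar ids (XMPP JIDs in practice)

    def pump(self):
        while not store_and_forward.empty():
            note = store_and_forward.get()
            if note["feed"] == self.feed:
                for user in self.subscribers:
                    print(f"route '{note['title']}' to user avatar {user}")

seen = set()
rss_ingestor([{"id": 1, "feed": "craigslist", "title": "2br apartment"}], seen)
SourceAvatar("craigslist", ["alice", "bob"]).pump()
```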

What particular challenges did you face and how did you overcome them? What options did you consider and why did you decide to do it another way?

From the outset, our primary goal was to avoid bottlenecks and hindrances to scaling horizontally. Initially we planned on building the entire system as a stateless flow of messages through queues, with each daemon along the line being responsible for the data flowing through it, merging, multiplexing and routing it to the next point until delivery was achieved. This meant no single part's failure would ever affect the whole, other than queues getting backed up. However, early on we realized that once a message was designated for a user we needed the ability to track where it was and to re-route it dynamically depending on user configuration and presence. This led us to add a bit of inappropriate coupling between daemons via REST APIs. Some of this plumbing still exists, as we're still migrating processing over to the combination queue/async bus architecture.

As we realized that pure message passing without state was not going to satisfy our dynamic needs, the easy solution was to return to the tried and true state keeper, the central relational database. Knowing our scaling goals, this would have introduced a point of failure that sooner or later we would not be able to mitigate. We decided to look at our state in a different way: instead of creating processing units based on function (i.e. ingestion, parsing, transformation, routing, delivery) that queried state based on the data flowing through them, we thought of the units in terms of data ownership, i.e. sources and destinations (users). Once on that track, there was precious little shared state, and we were able to change our storage pattern to have each owner of data be responsible for its own data, allowing horizontal scaling of the persistence layer as well as much more efficient caching.

The remaining need for accessing data across owners is analytics. In many systems analytics is a primary reason for the existence of a central database, since too often facts and dimensions are kept intermingled in the production schema. For our purposes, this data is not a production concern and therefore should never affect live capacity. Usage and state changes are treated as immutable events, which are queued at the point of occurrence into our store-and-forward system. The nature of our store-and-forward queue allows us to automatically gather all these events from all hosts to a central archive, which can then be processed into fact and dimension data by ETL processes. This allows for near real-time tracking of usage without affecting user-facing systems.
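
As a hedged illustration of the "usage and state changes as immutable events" idea (the file name and event fields here are made up, and the real system queues events into its store-and-forward system rather than a local file), each host could simply append events on the hot path and leave aggregation to a later ETL pass:

```python
import json
import time
import uuid

SPOOL = "usage_events.spool"   # hypothetical per-host spool file

def record_event(kind, **data):
    """Append an immutable event; never read back on the production path."""
    event = {
        "id": str(uuid.uuid4()),
        "ts": time.time(),
        "kind": kind,
        "data": data,
    }
    with open(SPOOL, "a") as f:
        f.write(json.dumps(event) + "\n")

record_event("subscription_added", user="alice", feed="craigslist/apartments")
record_event("notification_delivered", user="alice", channel="xmpp")
```

Because the events are append-only and never read on the hot path, gathering and processing them centrally cannot affect live capacity.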

Could you explain your choice of XMPP a little more? Is it used mainly as a message bus between federated XMPP servers sitting on EC2 nodes? Is the XMPP queue used as the queue for each user's messages from all sources before they are pushed to users?

We have three different XMPP clusters which take advantage of federation for cross-chatter: user, agent and the avatar bus.

Users

This is a regular XMPP IM server on which we create accounts for each of our users, providing them with an IM account that they can use from any XMPP-capable client. This account also serves as the identity our desktop app signs in with, and it will be the authentication for our API for third-party message ingestion and distribution.

Agents

The daemons connecting to this cluster serve as communication bridges between our internal Avatar bus and outside clients. Currently this is primarily for communicating with chat clients, as every user is assigned an agent that they communicate with us through, regardless of whether they use their default account or some third-party account such as jabber.org, googletalk, etc. We are also testing a client API that uses XMPP RPC via these agents for dedicated apps. In the future we will also offer full XMPP and REST APIs for third-party integration that will use the agents to communicate with the Avatar bus.

I mentioned earlier that agents are avatars as well; however, they are a little special in that they do not have a user on the Avatar bus but talk to other avatars through cross-server federation. We are also building agents for the Oscar and MSN networks that will sit directly on the avatar bus, since their native transport is not federated. We also plan to evaluate other networks for possible future support.

Avatars

The Avatar bus is our internal message bus that we use to route and process all commands and messages. We primarily use direct messaging and IQ-based RPC stanzas between avatars, although we do take advantage of presence for monitoring.

So what is an avatar? It's a daemon (where a single physical daemon process can host many avatar daemons) that is the authority for some external entity's data. For example, every user registered with notify.me has an avatar that monitors agents for status changes, receives messages in care of that user and is responsible for routing those messages to the appropriate delivery channel. This means that every avatar is the single authority for all data about that user and is responsible for persisting the data. If some other part of the system wants to know something about that user or modify its data, it uses XMPP RPC to talk to the avatar, rather than some central database. Right now, avatars persist to disk and SimpleDB, while keeping a TTL-regulated cache in process. Since only the avatar can write its own data, it does not need to check the DB but can treat its memory and disk cache as authoritative; SimpleDB is used primarily for writes. Reads are needed only in the case of a node failure, to bring up the avatar on another node.

At the other end of the bus we have our ingestors. Ingestors are made up of a number of daemons, generally running on polling loops against external sources, queueing new data into our store-and-forward queues, where the appropriate ingestor avatar picks up new messages and distributes them to its subscribers. In the ingestor avatar scenario, the avatar is the authority on subscription and routing data.

Here's a typical use case: A user subscribes to an RSS feed via the web interface. The web interface sends the request to the user's avatar, which persists the subscription for reference and then requests the subscription from the RSS ingestor. As new RSS items arrive, the RSS ingestor multiplexes items to all user avatars that subscribe to that feed. The user avatars in turn determine the appropriate delivery mechanism and schedule delivery. In general that means that the user avatar is subscribed to the user's XMPP presence via the user's agent. Until the user is in the proper state for accepting messages, the user avatar queues the RSS items. Once the user is ready to receive the notification, the presence change is propagated from the agent into the internal bus and the user avatar then sends the RSS items to the agent, which in turn delivers them to the user.

Right now, all avatars are always online (even if mostly idle), which is fine for our present user base size. Our plan is to mod the offline storage module of ejabberd so that we can blind-fire stanzas and have queued messages signal a monitor to spin up the appropriate avatar for the destination XMPP ID. Once this system is in place we will be able to spin up avatars on demand and shut them down on idle.
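
The presence-gated delivery in that use case might look roughly like the following Python sketch (class and field names are illustrative, not the actual avatar implementation): the user avatar records every notification to history, delivers immediately when the user is available, and otherwise queues until a presence change arrives from the agent.

```python
from dataclasses import dataclass, field

@dataclass
class Notification:
    source: str
    body: str

@dataclass
class UserAvatar:
    user_id: str
    presence: str = "away"                 # updated via the user's agent avatar
    history: list = field(default_factory=list)
    pending: list = field(default_factory=list)

    def on_notification(self, note: Notification):
        self.history.append(note)          # history always gets the message
        if self.presence == "available":
            self.deliver(note)
        else:
            self.pending.append(note)      # hold until the user can accept messages

    def on_presence(self, presence: str):
        self.presence = presence
        if presence == "available":
            while self.pending:            # flush the queued backlog on presence change
                self.deliver(self.pending.pop(0))

    def deliver(self, note: Notification):
        print(f"deliver to {self.user_id} via IM agent: [{note.source}] {note.body}")

avatar = UserAvatar("alice")
avatar.on_notification(Notification("craigslist rss", "new apartment listing"))
avatar.on_presence("available")            # queued item is delivered once the user is ready
```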

At what traffic load do you expect your current architecture to break and what's your plan?

Since our system is distributed and asynchronous by design, we should avoid systemwide failures under load. However, while we avoid all the usual bottlenecks, the reality is that our message bus, which makes all this possible, will likely become our limiting factor, either because it cannot handle the number of avatars (nodes on the bus) or because latency on the bus becomes unacceptable. We're only starting to use the avatar system as our backbone, so it's still a bit fragile and we're still doing load testing on ejabberd to determine at what point we run into limiting factors. While we are already clustering ejabberd, the load of Mnesia database replication and cross-node chatter means that either the number of connections or latency will eventually cause the cluster to fail or simply consume too much memory to be manageable.

Since our messaging is primarily point-to-point, we anticipate that we can split our user base into avatar silos, each hosted on a dedicated avatar subdomain cluster, reducing message and connection load. As long as our silos are appropriately designed to keep cross-subdomain chatter to a minimum, we should be able to run n silos to keep on top of load. Our single greatest challenge in keeping this architecture from failing is eternal vigilance against introducing features that create messaging bottlenecks. If a significant amount of our message traffic passed through a single processor or family of processors, it would introduce dependencies we could not scale ourselves out of with subdomain division.
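
One plausible way to realize the silo split described above (a sketch with made-up subdomain names, not notify.me's actual scheme) is to hash each user to a fixed avatar subdomain cluster, so point-to-point traffic for that user stays inside one ejabberd cluster:

```python
import hashlib

# Hypothetical silo subdomains; the real names and count are not public.
SILOS = ["avatar1.notify.me", "avatar2.notify.me", "avatar3.notify.me"]

def silo_for(user_id: str) -> str:
    """Pin each user's avatar to one subdomain cluster so its traffic stays local."""
    digest = hashlib.sha1(user_id.encode("utf-8")).hexdigest()
    return SILOS[int(digest, 16) % len(SILOS)]

print(silo_for("alice"))   # every lookup for "alice" lands on the same silo
print(silo_for("bob"))
```

A plain modulo hash reshuffles users whenever a silo is added, so a production version would more likely use consistent hashing or a stored user-to-silo mapping.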

Related Articles

  • notify.me tech blog - INotification
  • Flickr - Do the Essential Work Up-front and Queue the Rest


Wednesday
May 14, 2008

New Facebook Chat Feature Scales to 70 Million Users Using Erlang

Update: Erlang at Facebook by Eugene Letuchy. How Facebook uses Erlang to implement Chat, AIM Presence, and Chat Jabber support.

I've done some XMPP development, so when I read Facebook was making a Jabber chat client I was really curious how they would make it work. While core XMPP is straightforward, a number of protocol extensions like discovery, forms, chat states, pubsub, multi-user chat, and privacy lists really up the implementation complexity. Some real engineering challenges were involved to make this puppy scale and perform. It's not clear what extensions they've implemented, but a blog entry by Facebook's Eugene Letuchy hits some of the architectural challenges they faced and how they overcame them.

A web-based Jabber client poses a few problems because XMPP, like most IM protocols, is an asynchronous, event-driven system that pretty much assumes you have a full-time open connection. After logging in, the server sends the client roster information and presence information. Your client has to be present to receive the information. If your client wants to discover the capabilities of another client, then a request is sent over the wire and some time later the response comes back. An ID is used to map the reply to the request. All responses are intermingled. IM messages can come in at any time. Subscription requests can come in at any time.
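
The ID-based request/reply matching mentioned above is the heart of any asynchronous XMPP client. Here is a minimal, generic Python sketch (not Facebook's code; the stanza strings are placeholders) of how replies that arrive in any order get routed back to their callers:

```python
import itertools

class AsyncSession:
    """Generic sketch of correlating intermingled replies with their requests by ID."""
    _ids = itertools.count(1)

    def __init__(self):
        self.pending = {}                   # request id -> callback awaiting the reply

    def send(self, stanza, callback):
        req_id = f"req-{next(self._ids)}"
        self.pending[req_id] = callback
        print(f"-> {stanza} (id={req_id})")  # in a real client this goes over the wire
        return req_id

    def on_reply(self, req_id, payload):
        # Replies can arrive in any order, interleaved with messages and presence;
        # the id is what ties a reply back to the code that asked for it.
        callback = self.pending.pop(req_id, None)
        if callback is not None:
            callback(payload)

session = AsyncSession()
rid = session.send("disco#info query", lambda caps: print("capabilities:", caps))
session.on_reply(rid, ["jabber:iq:version", "chatstates"])
```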

Facebook has the client open a persistent connection to the IM server and uses long polling to send requests and continually get data from the server. Long polling is a mixture of client pull and server push. It works by having the client make a request to the server. The client connection blocks until the server has data to return. When it does, the data is returned, the client processes it, and then it is in position to make another request of the server and get any more data that has queued up in the meantime. Obviously there are all sorts of latency, overhead, and resource issues with this approach. The previous link discusses them in more detail, and for performance information take a look at Performance Testing of Data Delivery Techniques for AJAX Applications by Engin Bozdag, Ali Mesbah and Arie van Deursen.
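
Long polling is simple enough to show in a few lines. This is a generic Python sketch of the client side of the loop just described (placeholder URL, not Facebook's endpoint, and it assumes the server returns a JSON list of events): issue a request, block until the server has data, process it, and immediately ask again.

```python
import json
import socket
import urllib.error
import urllib.request

def long_poll(url, handle, timeout=55):
    """Generic long-poll loop: block until the server returns data, process, repeat."""
    while True:
        try:
            # The server holds this request open until it has events (or the timeout hits).
            with urllib.request.urlopen(url, timeout=timeout) as resp:
                body = resp.read()
            if body:
                for event in json.loads(body):
                    handle(event)
            # An empty body just means "nothing yet"; loop around and ask again.
        except (socket.timeout, urllib.error.URLError):
            continue  # nothing arrived within the window; immediately reconnect

# Example (placeholder endpoint):
# long_poll("https://chat.example.com/updates?since=0", print)
```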

From a client perspective I think this approach is workable, but obviously not ideal. Your client's IMs, presence changes, subscription requests, chat states, etc. are all blocked on the polling loop, which doesn't have a predictable latency. Predictable latency can be as important as raw performance.

The real scaling challenge is on the server side. With 70 million people, how do you keep all those persistent connections open? Well, when you read that another $100 million was invested in Facebook for hardware, you know why. That's one hella lot of connections. And consider all the data those IM servers must store up in between polling intervals. Watching the memory consumption of their servers would be like watching someone breathe. Breathe in: streams of data come in and must be stored, waiting for the polling loop. Breathe out: the polling loops hit and all the data is written to the client and released from the server. A ceaseless cycle. In a stream-based system, data comes in and is pushed immediately out the connection. Only the socket queue is used, and that's usually quite sufficient. Now add network bandwidth for all the XMPP and TCP protocol overhead and CPU to process it all, and you are talking some serious scalability issues.

So, how do you handle all those concurrent connections? They chose Erlang. When you first hear Erlang and Jabber you think of ejabberd, an open source Erlang-based XMPP server. But since the blog doesn't mention ejabberd, it seems they haven't used it.

Why Erlang? First, the famous Yaws vs Apache shootout where "Apache dies at about 4,000 parallel sessions. Yaws is still functioning at over 80,000 parallel connections." Erlang is naturally good at solving high concurrency problems. Yet following the rule that no benchmark can go unchallenged, Erik Onnen calls this the Worst Measurement Ever and has some good reasoning behind it.

In any case, Erlang does nicely match the problem space. Erlang's approach to a concurrency problem is to throw a very lightweight Erlang process at each state machine you want to be concurrent. Code-wise that's more natural than thread pools, async IO, or thread-per-connection systems. Until Linux 2.6 it wasn't even possible to schedule large numbers of threads on a single machine. And you are still devoting a lot of unnecessary stack space to each thread. Erlang will make excellent use of machine resources to handle all those connections. Something anyone with a VPS knows is hard to do with Apache. Apache sucks up memory with joyous VPS-killing abandon.
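
As a rough analogy only (Python coroutines are heavier than Erlang processes and lack Erlang's supervision and distribution story), the "one lightweight process per connection" model looks something like giving every connection its own coroutine, rather than sharing a thread pool:

```python
import asyncio

async def handle_connection(reader, writer):
    # This coroutine is the per-connection "process": its local variables are
    # that connection's entire state machine.
    while data := await reader.readline():
        writer.write(b"echo: " + data)
        await writer.drain()
    writer.close()

async def main():
    # Port 5222 is just the conventional XMPP port, used here for flavor.
    server = await asyncio.start_server(handle_connection, "127.0.0.1", 5222)
    async with server:
        await server.serve_forever()

# asyncio.run(main())
```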

The blog says C++ is used to log IM messages. Erlang is famously excellent for its concurrency prowess and equally famous for being poor at IO, so I imagine C++ was needed for efficiency.

One of the downsides of multi-language development is reusing code across languages. Facebook created Thrift to tie together the Babeling Tower of all their different implementation languages. Thrift is a software framework for scalable cross-language services development. It combines a powerful software stack with a code generation engine to build services that work efficiently and seamlessly between C++, Java, Python, PHP, and Ruby. Another approach might be to cross language barriers using REST-based services.

A problem Facebook probably doesn't have to worry about scaling is the XMPP roster (contact list). Handling that many user accounts would challenge most XMPP server vendors, but Facebook has that part already solved. They could concentrate on scaling the protocol across a bunch of shiny new servers without getting bogged down in database issues. Wouldn't that be nice :-) They can just load balance users across servers and scalability is solved horizontally, simply by adding more servers. Nice work.

Wednesday
Jan 30, 2008

The AOL XMPP scalability challenge

Large-scale distributed instant messaging and presence-based protocols are a real challenge. With big players adopting the standard, the XMPP (eXtensible Messaging and Presence Protocol) community is facing the need to validate the protocol and its implementations at even larger scale.
