What's cool about starting a new project is you finally have a chance to do it right. You of course eventually mess everything up in your own way, but for that one moment the world has a perfect order, a rightness that feels satisfying and good. Arne Claassen, the CTO of notify.me, a brand new real time notification delivery service, is in this honeymoon period now.
Arne has been gracious enough to share with us his philosophy of how to build a notification service. I think you'll find it fascinating because Arne goes into a lot of useful detail about how his system works.
His main design philosophy is to minimize the bottlenecks that form around synchronous access, that is, when some resource is requested and the requestor ties up more resources while waiting for a response. If the requested resource can’t be delivered in a timely manner, more and more requests pile up until the server can’t accept any new ones. Nobody gets what they want and you have an outage. Breaking synchronous operations into asynchronous operations, by separating request and response into separate message passing actions, stops the resource overload. Instead of a system going down from too many parallel requests, it can work its way through a backlog of requests as fast as it can. And in most cases the request/response cycles are so fast that they appear like a linear sequence of events.
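As a rough illustration of separating request and response into two message passing steps, here is a minimal Python sketch. It is not notify.me's code: the `slow_resource` function, the two in-process queues and the `run_demo` helper are all assumptions made for the example.

```python
import queue
import threading


def slow_resource(payload: str) -> str:
    """Hypothetical stand-in for a scarce resource such as a database call."""
    return payload.upper()


def run_demo(payloads):
    requests = queue.Queue()   # backlog of pending requests
    responses = queue.Queue()  # responses come back as separate messages

    def worker():
        # Work through the backlog at whatever pace the resource allows.
        while True:
            item = requests.get()
            if item is None:  # sentinel: no more work
                break
            request_id, payload = item
            responses.put((request_id, slow_resource(payload)))

    thread = threading.Thread(target=worker)
    thread.start()

    # Submitting never waits on the resource; it only appends to the backlog.
    for request_id, payload in enumerate(payloads):
        requests.put((request_id, payload))
    requests.put(None)
    thread.join()

    # Match each response to its request by id.
    return dict(responses.get() for _ in payloads)
```

Callers hold no resources while a request is pending, so a slow resource shows up as a longer backlog rather than as an outage.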
Notify.me is taking the innovative and risky strategy of using ejabberd, an XMPP based system, as their internal messaging and routing layer. Will Erlang and Mnesia (Erlang's database) be able to keep up with traffic and keep low latencies as traffic scales? It will be interesting to find out.
If you are interested in notify.me they've kindly offered 500 beta accounts for HS readers: http://notify.me/user/account/create/highscale
Who are you?
My name is Arne Claassen, the CTO of notify.me. I've been working on highly scalable web
based applications and services for the past decade. These sites have employed various
combinations of traditional scaling techniques such as server farms, caching, content pregeneration
and highly available databases using replication and clustering. All of these
techniques are ways to mitigate scarce resources (generally the database) being in
contention by many users. Knowing the benefits and pitfalls of these techniques, it has
become my focus to architect systems that circumvent scarce resource scenarios.
What is notify.me, why did you make it, and why is it a good thing?
notify.me is the brainchild of Jason Wieland, our CEO. It's a near real time notification
service that alerts users to new content published on the web.
It was created to address the common user pain of staying up to date on time-critical events
that occur on the web. For instance, a user searching for an apartment on Craigslist would
want to be alerted once a new one matching their search criteria is posted. notify.me does
the grunt work of repeatedly checking Craigslist for new listings and alerts the user once a
new one is posted. Notifications can be delivered to instant messenger, desktop application,
mobile device, email, and web application.
Our goal is to create and publish open APIs allowing people to build new and interesting
applications for generating and delivering information.
How does your service compare to other services people might be familiar with? Like Twitter, Friend Feed, Gnip, or Yahoo Pipes?
There are quite a few companies that are in our competitive landscape, some of which are
direct competitors, like yotify.com or alerts.com. The main difference is our approach.
Yotify and alerts.com are focused on being notification portal sites for users to visit. notify.me
is a utility, with a focus on offering all the functionality available on the website via XMPP
and REST APIs, allowing users to interact with notifications from the application of their
choosing. We also allow for escalation of messages to destinations. For example, if a user is
not logged into their IM or has a status of away, notifications can be escalated and routed
to their mobile device.
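The escalation rule described here can be sketched as a small decision function. This is only an illustration: the status strings, the `has_mobile` flag and the fallback of holding the message when no mobile device is configured are assumptions, not notify.me's actual rules.

```python
from typing import Optional


def choose_channel(im_status: Optional[str], has_mobile: bool) -> str:
    """Pick a delivery channel, escalating when IM is not usable.

    im_status is None when the user is not logged in; otherwise it is a
    presence string such as "available" or "away" (assumed values).
    """
    if im_status == "available":
        return "im"
    if has_mobile:
        # Not logged in, or away: escalate to the mobile device.
        return "mobile"
    # Assumption: with nowhere to escalate to, hold the message for later.
    return "queue"
```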
In the messaging arena, we are nearly the opposite of Twitter. Twitter is an inward-facing
publishing model based on its own user generated content. Someone makes a tweet and it
gets published to their followers. notify.me is creating an externally facing message delivery
system. Users add any website that supports web feed standards, or redirect existing
notification emails to us. If anything, we are a messaging pipeline that is complementary to
Twitter (more on that below).
Friendfeed does a great job at combining all your social networks together into one
centralized area. Their primary focus is to build features and tools to interact with the
mashed feed. This feed would be perfect to add as a source to notify.me, allowing a user to
receive all social network updates over instant messenger. Being able to know in near
real time when you have a new posting on your wall so that you can immediately respond is
a feature the social addicts want.
Yahoo Pipes would be considered as a possible partner, similar to how they upsell Netvibes
and Newsgator. Their focus is to provide an intuitive programming interface to be able to
manipulate feeds and create a useful mashup.
For example, the Hot Deals Search is a nifty pipe that searches over a collection of sites for the best deal. Users might not want to use Yahoo's own notification options due to the limited options of destinations.
In our beta group we've seen similar activity with users adding eBay links. eBay has a
notification pipeline that competes with notify.me; however, users still add eBay search links to
our service. It turns out that they would like one central place to manage their various
news feeds.
Gnip is a pure infrastructure play. We have similar technology but we are going after
completely different markets.
An additional core feature of our product that we have not yet exposed is that our pipelines
are bi-directional, i.e. any data source can also be a destination and vice versa. The primary
benefit of this is the ability of allowing messages to be responded to, such as acknowledging
a support ticket you received. Bi-directional communication will require integration with the
notify.me API through which a source can communicate reply options.
We currently are developing a deep integration with the Twitter API to provide two-way
capabilities for tweets via IM in the same channel that you already receive your other
notifications.
Can you explain the different parts of notify.me and how they connect together?
In general terms our system consists of three subsystems, each of which has a number of
implementations.
1. Ingestion consists of rss and email ingestors, which constantly check the user's
email address (username@notify.me) and the user's feeds for new data. New data
is turned into notifications which are propagated to routing.
2. Routing is responsible for getting the user's notifications to the right delivery
components. Routing is the point in the system that the user interacts with for
management, such as changing sources and destinations, and viewing
history. Notification history is a specialized delivery component, allowing all
messages to be perused via the website, even after they have gone through the
entire pipeline.
3. Delivery is currently comprised of history (which always gets the messages), Xmpp
IM, SMS and email, with private RSS, AIM and MSN in development.
On a more technical level, the topology of this system is comprised of two separate
message busses:
1. Store-and-forward queues (using simpleMQ)
2. XMPP (using ejabberd)
Store and forward queues are used by the ingestion side to distribute the work of
ingestion and generally anywhere where data is handled before it becomes visible to the
routing rules of the user. This allows for scaling flexibility as well as process isolation during
a component failure.
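To make the store-and-forward idea concrete, here is a minimal Python sketch. It does not reflect simpleMQ's actual interface, which is not described here; the `StoreAndForwardQueue` class, its spool directory layout and the `demo` helper are hypothetical.

```python
import json
import os
import tempfile


class StoreAndForwardQueue:
    """Hypothetical spool: persist locally first, forward when possible."""

    def __init__(self, spool_dir: str):
        self.spool_dir = spool_dir
        self._next_id = 0

    def enqueue(self, message: dict) -> None:
        # Store: the producer only writes to local disk and never waits
        # on the consumer.
        path = os.path.join(self.spool_dir, f"{self._next_id:08d}.json")
        with open(path, "w") as f:
            json.dump(message, f)
        self._next_id += 1

    def forward(self, send) -> int:
        """Forward spooled messages in order; stop at the first failure."""
        delivered = 0
        for name in sorted(os.listdir(self.spool_dir)):
            path = os.path.join(self.spool_dir, name)
            with open(path) as f:
                message = json.load(f)
            try:
                send(message)
            except ConnectionError:
                break  # consumer is down; the rest stays spooled
            os.remove(path)
            delivered += 1
        return delivered


def demo():
    def consumer_down(message):
        raise ConnectionError("consumer unavailable")

    received = []
    with tempfile.TemporaryDirectory() as spool:
        q = StoreAndForwardQueue(spool)
        q.enqueue({"n": 1})
        q.enqueue({"n": 2})
        first = q.forward(consumer_down)     # nothing delivered, nothing lost
        second = q.forward(received.append)  # backlog drains once it is up
    return first, second, received
```

A failed consumer only causes the spool to grow, which is the process isolation the text describes.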
The Xmpp bus is called the Avatar bus, so named because every data-owning entity is
represented by a daemon process that is the sole authority for that entity's data. We have
four types of avatars: Monitor, Agent, Source and User.
1. Monitor avatars are simply the responsible parties for observing instance health
and spinning up and shutting down additional computing nodes per demand.
2. Agent avatars are the delivery gateways that provide presence information of our
users into the bus and deliver messages to our users.
3. Source avatars are ingestors, such as the RSS ingestor. This avatar pulls new messages from a store-and-forward queue and notifies its subscribers of the new message.
4. User avatars persist all the configuration and messaging data for a particular user.
They are responsible for receiving new notifications from ingestion avatars, deciding on
the routing and pushing messages to the appropriate delivery agent as that agent
declares the ability to execute that delivery.
What particular challenges did you face and how did you overcome them? What options did you consider and why did you decide to do it another way?
From the outset, our primary goal was to avoid bottlenecks and hindrances to scaling
horizontally. Initially we planned on building the entire system as a stateless flow of
messages through queues, with each daemon along the line being responsible for the data
flowing through it, merging, multiplexing and routing it to the next point until delivery was
achieved. This meant no single part's failure would ever affect the whole, other than queues
getting backed up.
However early on we realized that once a message was designated for a user we needed the
ability to track where it was and be able to re-route it dynamically depending on user
configuration and presence. This led us to add a bit of inappropriate coupling between
daemons via REST APIs. Some of this plumbing still exists, as we're still migrating
processing over to the combination queue/async bus architecture.
As we realized that pure message passing without state was not going to satisfy our
dynamic needs, the easy solution was to return to the tried-and-true state keeper, the central
relational database. Knowing our scaling goals, this would have introduced a point of failure
that sooner or later we would not be able to mitigate. We decided to look at our state in a
different way and instead of thinking of creating processing units based on function (i.e.
ingestion, parsing, transformation, routing, delivery) that queried state based on the data
flowing through it, we thought of the units in terms of data ownership, i.e. sources and
destinations (users). Once on that track, there was precious little shared state and we were
able to change our storage pattern to have each owner of data be responsible for its own
data, allowing horizontal scaling of the persistence layer, as well as much more efficient
caching.
The remaining need for accessing data across owners is analytics. In many systems analytics
is a primary reason for the existence of a central database, since too often facts and
dimensions are kept intermingled in the production schema. For our purposes, this data is
not a production concern and therefore should never affect live capacity. Usage and state
changes are treated as immutable events, which are queued at the point of occurrence into
our store-and-forward system. The nature of our store-and-forward queue allows us to
automatically gather all these events from all hosts to a central archive which can then be
processed into fact and dimension data by ETL processes. This allows for near real-time
tracking of usage without affecting user facing systems.
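The event flow described above might look roughly like the following Python sketch. The `UsageEvent` fields and the `etl_daily_counts` function are illustrative assumptions; the actual event schema and ETL jobs are not described here.

```python
from collections import Counter
from dataclasses import dataclass


@dataclass(frozen=True)  # frozen: an event cannot be modified once recorded
class UsageEvent:
    host: str      # host where the event occurred (assumed field)
    user_id: str
    kind: str      # e.g. "notification_delivered" (assumed value)
    day: str       # ISO date, e.g. "2008-11-03"


def etl_daily_counts(archive):
    """Reduce the central archive into fact rows keyed by (day, user, kind)."""
    return Counter((e.day, e.user_id, e.kind) for e in archive)


# Events queued on two different hosts, then gathered into one archive.
host_a = [UsageEvent("a", "u1", "notification_delivered", "2008-11-03")]
host_b = [
    UsageEvent("b", "u1", "notification_delivered", "2008-11-03"),
    UsageEvent("b", "u2", "source_added", "2008-11-03"),
]
facts = etl_daily_counts(host_a + host_b)
```

Because the aggregation runs only against the archive, it never touches the stores that serve live traffic.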
Could you explain your choice of XMPP a little more? Is it used mainly as a message bus between federated XMPP servers sitting on EC2 nodes? Is the XMPP queue used as the queue for each user's messages from all sources before they are pushed to users?
We have three different xmpp clusters which take advantage of federation for cross-chatter:
user, agent and the avatar bus.
Users
This is a regular xmpp IM server on which we create accounts for each of our users,
providing them an IM account that they can use from any Xmpp capable client. This account
also serves as the user our desktop app signs in as, and it will be the authentication for
our API for third party message ingestion and distribution.
Agents
The daemons connecting to this cluster serve as communication bridges between our
internal Avatar bus and outside clients. Currently this is primarily for communicating with
chat clients, as every user is assigned an agent that they communicate with us through,
regardless of whether they use their default account or some third party account such as
jabber.org, googletalk, etc. We also are testing a client API that uses Xmpp RPC via these
agents for dedicated Apps. In the future we will also offer full XMPP and REST APIs for third
party integration that will use the agents to communicate with the Avatar bus.
I mentioned earlier that Agents are avatars as well, however they are a little special in that
they do not have a user on the Avatar bus but talk to other avatars through cross server
federation. We currently are also building agents for the Oscar and MSN networks that will
sit directly on the avatar bus since their native transport is not federated. We also plan to
evaluate other networks for possible future support.
Avatars
Avatars is our internal message bus that we use to route and process all commands and
messages. We primarily use direct messaging and IQ based RPC stanzas between avatars,
although we do take advantage of presence for monitoring.
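For readers unfamiliar with IQ based RPC, here is a sketch of what such a stanza could look like when built in Python. It assumes the Jabber-RPC convention (XEP-0009) of wrapping an XML-RPC call in a `jabber:iq:rpc` query; whether notify.me uses exactly this format is not stated. The addresses and the `subscriptions.add` method name are made up for the example.

```python
import xml.etree.ElementTree as ET
import xmlrpc.client


def build_rpc_iq(sender, recipient, stanza_id, method, params):
    """Build an <iq type="set"> stanza carrying an XML-RPC method call."""
    iq = ET.Element(
        "iq",
        {"type": "set", "from": sender, "to": recipient, "id": stanza_id},
    )
    # Assumption: XEP-0009 style payload in the jabber:iq:rpc namespace.
    query = ET.SubElement(iq, "query", {"xmlns": "jabber:iq:rpc"})
    payload = xmlrpc.client.dumps(tuple(params), methodname=method)
    query.append(ET.fromstring(payload))
    return ET.tostring(iq, encoding="unicode")


stanza = build_rpc_iq(
    "web@avatars.example",      # hypothetical sender address
    "user42@avatars.example",   # hypothetical user avatar address
    "rpc1",
    "subscriptions.add",        # hypothetical method name
    ["http://example.com/feed.rss"],
)
```

The receiving avatar would answer with an IQ of type `result` or `error` carrying the same `id`, which is how the caller pairs a response with its request.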
So what is an avatar? It's a daemon (where a single physical daemon process can host
many avatar daemons) that is the authority for some external entity's data. For example, every user
registered with notify.me has an avatar that monitors agents for status changes, receives
messages in care of that user and is responsible for routing those messages to the
appropriate delivery channel. This means that every avatar is the single authority for all
data about that user and is responsible for persisting the data. If some other part of the
system wants to know something about that user or modify its data, it uses Xmpp RPC to
talk to the avatar, rather than some central database. Right now, avatars persist to disk and
SimpleDB, while keeping a ttl-regulated cache in process. Since only the avatar can write its
own data, it does not need to check the DB but can treat its memory and disk cache as
authoritative, so SDB (SimpleDB) is used primarily for writes. Reads are needed only in the case of a
node failure to bring up the avatar on another node.
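The single-writer persistence pattern described here can be sketched in Python as follows. This is an assumption-laden illustration: plain dicts stand in for the local disk and for SimpleDB, and the `CachingAvatar` class with its `recover` method is hypothetical.

```python
import time


class CachingAvatar:
    """Sole writer for one user's data, so local copies are authoritative."""

    def __init__(self, user_id, remote_store, ttl_seconds=300.0,
                 clock=time.monotonic):
        self.user_id = user_id
        self._remote = remote_store  # dict standing in for SimpleDB
        self._disk = {}              # dict standing in for the local disk
        self._cache = {}             # key -> (value, expires_at)
        self._ttl = ttl_seconds
        self._clock = clock

    def set(self, key, value):
        self._disk[key] = value
        self._cache[key] = (value, self._clock() + self._ttl)
        self._remote[(self.user_id, key)] = value  # write-through

    def get(self, key):
        entry = self._cache.get(key)
        if entry is not None and entry[1] > self._clock():
            return entry[0]
        # Cache expired: refill from local disk, never from the remote
        # store, because nobody else can have changed this data.
        value = self._disk[key]
        self._cache[key] = (value, self._clock() + self._ttl)
        return value

    @classmethod
    def recover(cls, user_id, remote_store, **kwargs):
        """Bring the avatar up on a new node: the only remote read path."""
        avatar = cls(user_id, remote_store, **kwargs)
        for (owner, key), value in remote_store.items():
            if owner == user_id:
                avatar._disk[key] = value
        return avatar
```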
At the other end of the bus we have our ingestors. Ingestors are made up of a number of
daemons, generally running on polling loops against external sources, queueing new data
into our store-and-forward queues, where the appropriate ingestor avatar picks up new
messages and distributes them to its subscribers. In the ingestor avatar scenario, it is the
authority on subscription and routing data.
Here's a typical use case: A user subscribes to an RSS feed via the web interface. The web
interface sends the request to the user's avatar, which persists the subscription for
reference and then requests the subscription from the rss ingestor. As new rss items arrive,
the rss ingestor multiplexes items to all user avatars that subscribe to that feed. The user
avatars in turn determine the appropriate delivery mechanism and schedule delivery. In
general that means that the user avatar is subscribed to the user's Xmpp presence via the
user's agent. Until the user is in the proper state for accepting messages, the user avatar
queues the rss items. Once the user is ready to receive the notification, the presence
change is propagated from the agent into the internal bus and the user avatar then sends
the rss items to the agent, which in turn delivers them to the user.
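The use case above can be traced in a small in-process Python sketch. In the real system each hop is an XMPP message between daemons; here plain method calls stand in for them, and all class and method names are hypothetical.

```python
from collections import deque


class Agent:
    """Stand-in for the delivery gateway that talks to the user's client."""

    def __init__(self):
        self.delivered = []

    def deliver(self, item):
        self.delivered.append(item)


class UserAvatar:
    def __init__(self, agent):
        self.agent = agent
        self.available = False
        self.pending = deque()  # items held until the user can accept them

    def on_item(self, item):
        if self.available:
            self.agent.deliver(item)
        else:
            self.pending.append(item)

    def on_presence(self, available):
        # Presence change propagated from the agent into the internal bus.
        self.available = available
        while self.available and self.pending:
            self.agent.deliver(self.pending.popleft())


class RssIngestor:
    def __init__(self):
        self.subscribers = {}  # feed URL -> list of user avatars

    def subscribe(self, feed_url, avatar):
        self.subscribers.setdefault(feed_url, []).append(avatar)

    def on_new_item(self, feed_url, item):
        # Multiplex one new item to every avatar subscribed to the feed.
        for avatar in self.subscribers.get(feed_url, []):
            avatar.on_item(item)
```

A run that subscribes a user, publishes an item while they are away, and then flips their presence delivers the queued item first and later items immediately.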
Right now, all avatars are always online (even if mostly idle), which is fine for our present
user base size. Our plan is to mod the offline storage module of ejabberd so that we can
blind fire stanzas and have queued messages signal a monitor to spin up the appropriate
avatar for the destination XmppId. Once this system is in place we will be able to spin up
avatars on demand and shut them down on idle.
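A rough Python sketch of that planned behavior follows. It models only the control flow, not ejabberd's offline storage module: the `Monitor` class, its in-memory structures and the idle timeout are assumptions made for the example.

```python
import time


class Monitor:
    """Hypothetical monitor that starts avatars on demand and reaps idle ones."""

    def __init__(self, idle_seconds, clock=time.monotonic):
        self.idle_seconds = idle_seconds
        self.clock = clock
        self.running = {}      # xmpp_id -> stanzas handed to a live avatar
        self.last_active = {}  # xmpp_id -> time of the last stanza
        self.offline = {}      # xmpp_id -> stanzas stored while avatar is down

    def send(self, xmpp_id, stanza):
        # The sender fires blindly; it does not know if the avatar is up.
        if xmpp_id in self.running:
            self.running[xmpp_id].append(stanza)
        else:
            # Stored offline, which signals the monitor to spin up the avatar.
            self.offline.setdefault(xmpp_id, []).append(stanza)
            self._spin_up(xmpp_id)
        self.last_active[xmpp_id] = self.clock()

    def _spin_up(self, xmpp_id):
        # The new avatar starts by draining its offline queue.
        self.running[xmpp_id] = self.offline.pop(xmpp_id, [])

    def reap_idle(self):
        """Shut down avatars with no traffic for idle_seconds."""
        now = self.clock()
        idle = [x for x, t in self.last_active.items()
                if now - t >= self.idle_seconds]
        for xmpp_id in idle:
            # In a real system the avatar would persist its state first.
            del self.running[xmpp_id]
            del self.last_active[xmpp_id]
        return idle
```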
At what traffic load do you expect your current architecture to break and what's your plan?
Since our system is distributed and asynchronous by design, we should avoid systemwide
failures under load. However, while avoiding all the usual bottlenecks, the reality is that our
message bus, which makes all this possible, will likely become our limiting factor, either
because it cannot handle the number of avatars (nodes on the bus) or because latency on
the bus becomes unacceptable. We're only starting to use the avatar system as our
backbone, so it's still a bit fragile and we're still doing load testing on ejabberd to determine
at what point we run into limiting factors.
While we are already clustering ejabberd, the load of Mnesia database replication and
cross-node chatter means that either the number of connections or latency will eventually cause the
cluster to fail or simply consume too much memory to be manageable.
Since our messaging is primarily point-to-point, we anticipate that we can split our user
base into avatar silos, each hosted on a dedicated avatar subdomain cluster, reducing
message and connection load. As long as our silos are appropriately designed to keep cross-subdomain
chatter to a minimum, we should be able to have n silos to keep on top of load.
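One simple way to assign users to silos is to hash the user id, as in the Python sketch below. The text does not say how users would be assigned; hashing, the `silo_for` function and the `siloN.avatars.example` naming are all assumptions.

```python
import hashlib


def silo_for(user_id: str, silo_count: int) -> str:
    """Map a user to one of silo_count avatar subdomains, deterministically."""
    digest = hashlib.sha256(user_id.encode("utf-8")).digest()
    index = int.from_bytes(digest[:8], "big") % silo_count
    return f"silo{index}.avatars.example"  # hypothetical subdomain naming
```

Plain modulo hashing moves most users to a different silo whenever the silo count changes, so a real deployment would more likely use a stored user-to-silo mapping or consistent hashing.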
Our single greatest challenge in keeping this architecture from failing is eternal vigilance against
introducing features that create messaging bottlenecks. A significant amount of our
message traffic passing through a single processor or family of processors would introduce
dependencies we cannot scale ourselves out of with subdomain division.