System Design Interview: Distributed Message Queue

Today we design a distributed message queue. First, let’s make sure we are on the same page regarding the problem statement. What is a distributed message queue? Let's say there are two web services, called the producer and the consumer, that need to communicate with each other. One option is to set up synchronous communication: the producer makes a call to the consumer and waits for a response. This approach has its own pros and cons. Synchronous communication is easier and faster to implement. At the same time, it makes it harder to deal with consumer service failures. We need to think about when and how to properly retry failed requests, how not to overwhelm the consumer service with too many requests, and how to deal with a slow consumer service host.

Another option is to introduce a new component that helps to set up asynchronous communication: the producer sends data to that component, and exactly one consumer gets this data a short time later. Such a component is called a queue. And it is distributed because data is stored across several machines. Please do not confuse a queue with a topic. In the case of a topic, a published message goes to each and every subscriber. In the case of a queue, a message is received by one and only one consumer.

As often happens with interview questions, the statement is ambiguous. What are the functional requirements? Which non-functional requirements have priority over others? What is the scale we need to deal with? All these questions need to be clarified with the interviewer. Let's do our best and define the requirements ourselves, starting with the functional requirements. At this stage of the interview, it may be hard to come up with a definitive set of requirements, and it’s usually not needed. The time limit allows us to focus only on a few core APIs, like send message and receive message. As for non-functional requirements, we want our system to be scalable (it handles load increases), highly available (it tolerates hardware and network failures), highly performant (both send and receive operations are fast), and durable (data is persisted once submitted to the queue).
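As a quick illustration of those two core operations, here is a minimal sketch of what the client-facing API could look like. The interface and type names are illustrative assumptions, not part of the design itself.

```java
// A minimal sketch of the two core operations; all names are illustrative.
public interface DistributedQueueClient {

    // Appends the payload to the named queue and returns a server-assigned message id.
    String sendMessage(String queueName, byte[] payload);

    // Returns the next available message from the queue, or null if it is empty.
    QueueMessage receiveMessage(String queueName);
}

// A message as seen by the consumer.
record QueueMessage(String messageId, byte[] payload) {}
```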

And of course, there may be many other requirements, either explicitly stated by the interviewer or intentionally omitted. Among functional requirements, we may be asked to support create and delete queue APIs, or a delete message API. There may be specific requirements for the producer (for example, the system needs to avoid duplicate submissions), or security requirements, or a request to implement a specific ordering guarantee. As for non-functional requirements, the interviewer may define specific service level agreement numbers (so-called SLAs, for example, the minimum throughput our system needs to support), or requirements around cost-effectiveness (for example, the system needs to minimize hardware cost or operational support cost). But do not worry if you can’t think of all the possible requirements. The interviewer is your friend and will help to scope the problem. You just need to be proactive and outline the main use cases.

Now let’s start drafting the architecture, beginning with components that are common to many distributed systems. First, we need a virtual IP. A VIP refers to the symbolic hostname (for example myWebService.domain.com) that resolves to a load balancer system. So next, we have a load balancer. A load balancer is a device that routes client requests across a number of servers.

High Level Architecture

Next, we have a FrontEnd web service. This component is responsible for initial request processing, like validation, authentication, etc. Queue metadata information, like its name, creation date and time, owner, and any other configuration settings, will be stored in a database. Best practices dictate that databases should be hidden behind some facade, a dedicated web service responsible for handling calls to the database. And we need a place to store queue messages. So, let's introduce a backend web service that will be responsible for message persistence and processing.

Now, let’s take a look at each component one by one. Load balancing is a big topic, and unless the interviewer encourages you to dive deep into it, we had better not deviate too much from the main question of the interview. Always try to stay focused on what really matters. The internals of how load balancers work may not matter, but in order to make sure the non-functional requirements of the system we build are fully met, we need to explain how load balancers will help us achieve high throughput and availability. When the domain name is hit, the request is transferred to one of the VIPs registered in DNS for our domain name. The VIP is resolved to a load balancer device, which has knowledge of the FrontEnd hosts. By looking at this architecture, several questions have probably popped into your head.

VIP and Load Balancer

First, the load balancer seems like a single point of failure. What happens if the load balancer device goes down? Second, load balancers have limits with regard to the number of requests they can process and the number of bytes they can transfer. What happens when our distributed message queue service becomes so popular that load balancer limits are reached? To address the high availability concern, load balancers utilize the concept of primary and secondary nodes. The primary node accepts connections and serves requests, while the secondary node monitors the primary. If, for any reason, the primary node is unable to accept connections, the secondary node takes over. As for the scalability concern, a concept of multiple VIPs (sometimes referred to as VIP partitioning) can be utilized. In DNS, we assign multiple A records to the same DNS name for the service. As a result, requests are partitioned across several load balancers. And by spreading load balancers across several data centers, we improve both availability and performance.
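To make the VIP partitioning idea concrete, here is a small sketch of its client side: the DNS name resolves to several A records, one per load balancer, and a client can resolve all of them and pick one. The hostname and class name are illustrative assumptions.

```java
import java.net.InetAddress;
import java.util.concurrent.ThreadLocalRandom;

// Illustrative sketch: one DNS name maps to several A records (one per load balancer).
// Resolving all records and picking one spreads traffic across the load balancers.
public class VipResolver {

    public static InetAddress pickLoadBalancer(String serviceHostname) throws Exception {
        InetAddress[] vips = InetAddress.getAllByName(serviceHostname); // all A records
        return vips[ThreadLocalRandom.current().nextInt(vips.length)];  // pick one at random
    }

    public static void main(String[] args) throws Exception {
        // myWebService.domain.com is the example hostname used earlier in the text.
        System.out.println(pickLoadBalancer("myWebService.domain.com"));
    }
}
```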

Let's move on to the next component, which is the FrontEnd web service. The FrontEnd is a lightweight web service consisting of stateless machines located across several data centers. The FrontEnd service is responsible for: request validation, authentication and authorization, TLS (SSL) termination, server-side data encryption, caching, rate limiting (also known as throttling), request dispatching, request deduplication, and usage data collection. Let’s discuss the basics of these features.

Request validation helps to ensure that all the required parameters are present in the request and that the values of these parameters honor constraints. For example, in our case, we want to make sure the queue name comes with every send message request and that the message size does not exceed a specified threshold. During the authentication check, we verify that the message sender is a registered customer of our distributed queue service. And during the authorization check, we verify that the sender is allowed to publish messages to the queue it claims to publish to.

TLS is a protocol that aims to provide privacy and data integrity. TLS termination refers to the process of decrypting a request and passing on an unencrypted request to the backend service. We want to do TLS termination on FrontEnd hosts because TLS termination on the load balancer is expensive. Termination is usually handled not by the FrontEnd service itself, but by a separate HTTP proxy that runs as a process on the same host. Next is server-side encryption. Because we want to store messages securely on backend hosts, messages are encrypted as soon as the FrontEnd receives them. Messages are stored in encrypted form, and the FrontEnd decrypts them only when they are sent back to a consumer. A cache stores copies of source data. In the FrontEnd cache, we will store metadata information about the most actively used queues, as well as user identity information, to save on calls to the authentication and authorization services.

Rate limiting, or throttling, is the process of limiting the number of requests that can be submitted to a given operation in a given amount of time. Throttling protects the web service from being overwhelmed with requests. The leaky bucket algorithm is one of the most famous. Rate limiting is a quite popular system design question on its own, and we will have a separate video for it.

The FrontEnd service makes remote calls to at least two other web services: the Metadata service and the backend service. The FrontEnd service creates HTTP clients for both services and makes sure that calls to these services are properly isolated. It means that when one service, let’s say the Metadata service, experiences a slowdown, requests to the backend service are not impacted. There are common patterns, like bulkhead and circuit breaker, that help to implement resource isolation and make a service more resilient when remote calls start to fail.
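Since the leaky bucket algorithm is called out above, here is a minimal sketch of how a FrontEnd host could apply it per client or per queue. The capacity and leak-rate parameters are illustrative assumptions.

```java
// A minimal sketch of a leaky bucket rate limiter; parameters are illustrative.
public class LeakyBucket {

    private final long capacity;          // maximum number of requests the bucket can hold
    private final double leakRatePerSec;  // how fast the bucket drains
    private double level = 0;             // current fill level of the bucket
    private long lastLeakNanos = System.nanoTime();

    public LeakyBucket(long capacity, double leakRatePerSec) {
        this.capacity = capacity;
        this.leakRatePerSec = leakRatePerSec;
    }

    // Returns true if the request is allowed, false if it should be throttled.
    public synchronized boolean tryAcquire() {
        long now = System.nanoTime();
        double elapsedSec = (now - lastLeakNanos) / 1_000_000_000.0;
        level = Math.max(0, level - elapsedSec * leakRatePerSec); // drain since last call
        lastLeakNanos = now;
        if (level + 1 <= capacity) {
            level += 1;      // the request "fills" the bucket by one unit
            return true;
        }
        return false;
    }
}
```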

Metadata Service

Next, we have request deduplication. It may be needed when the response to a successful send message request fails to reach the client. This is less of an issue for 'at least once' delivery semantics, and a bigger issue for 'exactly once' and 'at most once' delivery semantics, when we need to guarantee that a message is never processed more than one time. Caching is usually used to store previously seen request ids so that duplicates can be detected. Last but not least is usage data collection, where we gather real-time information that can be used for audits. And even though the FrontEnd service has many responsibilities, the rule of thumb is to keep it as simple as possible.

Moving on to the next component, which is the Metadata service. The Metadata service stores information about queues. Every time a queue is created, we store information about it in the database. Conceptually, the Metadata service is a caching layer between the FrontEnd and persistent storage. It handles many reads and a relatively small number of writes, as we read every time a message arrives and write only when a new queue is created. Even though strongly consistent storage is preferred, to avoid potential concurrent updates, it is not strictly required.

Let's take a look at different approaches to organizing the cache cluster. The first option is when the cache is relatively small and we can store the whole data set on every cluster node. A FrontEnd host calls a randomly chosen Metadata service host, because all the cache cluster nodes contain the same information. The second approach is to partition data into small chunks, called shards, because the data set is too big and cannot be placed into the memory of a single host. We store each such chunk of data on a separate node in the cluster, and the FrontEnd knows which shard stores the data and calls the shard directly. The third option is similar to the second one: we also partition data into shards, but the FrontEnd does not know on which shard the data is stored. So, the FrontEnd calls a random Metadata service host, and the host itself knows where to forward the request. In option one, we can introduce a load balancer between the FrontEnd and the Metadata service, as all Metadata service hosts are equal and the FrontEnd does not care which Metadata host handles the request. In options two and three, Metadata hosts form a consistent hashing ring. Do not worry if this term is completely new to you. The distributed cache topic is big, and we will have a separate video on how to design a distributed cache.

The components we built so far were relatively straightforward. Not easy, of course, but if you have an understanding of several core design principles, you will at least progress this far in the interview. By the way, the set of components we just discussed (VIP + load balancer + FrontEnd web service + Metadata web service that represents a caching layer on top of a database) is so popular in the world of distributed systems that you may consider it a standard and apply it to many system designs. Now, let’s take a look at the backend component. This is where the real challenge starts. To understand what the backend service architecture may look like, let’s try to answer some important questions first.
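Before diving into the backend, here is a minimal sketch of option two from the Metadata service discussion: a FrontEnd host mapping a queue name to a shard via a consistent hashing ring. The class name, host identifiers, and the choice of MD5 as the hash function are illustrative assumptions.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.SortedMap;
import java.util.TreeMap;

// Illustrative sketch: locating the Metadata shard for a queue with consistent hashing.
public class MetadataShardLocator {

    private final SortedMap<Long, String> ring = new TreeMap<>(); // hash -> shard host

    public void addShard(String host) {
        ring.put(hash(host), host);
    }

    // Walks clockwise from the queue's hash to the first shard; wraps around at the end.
    public String shardFor(String queueName) {
        long h = hash(queueName);
        SortedMap<Long, String> tail = ring.tailMap(h);
        return tail.isEmpty() ? ring.get(ring.firstKey()) : tail.get(tail.firstKey());
    }

    private static long hash(String key) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(key.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) h = (h << 8) | (digest[i] & 0xff);
            return h;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }
}
```

A FrontEnd host would call shardFor(queueName) and send the metadata request to the returned host; adding or removing a shard only remaps the keys adjacent to it on the ring.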

By the way, if you get stuck during the interview, not knowing how to progress further, start asking yourself questions. Asking the right questions helps to split the problem into more manageable pieces. Plus, it helps to establish a better communication channel with the interviewer. The interviewer will let you know whether you are on the right path or not. So, what might those questions be? We need to figure out where and how messages are stored, right? Is a database an option? Yes, it is. But not the best one, and let me explain why. We are building a distributed message queue, a system that should be able to handle very high throughput. And this means that all this throughput will be offloaded to the database. In other words, the problem of building a distributed message queue becomes the problem of building a database that can handle high throughput. And we know that highly available and scalable databases exist out there. If you are a junior software engineer, it is totally reasonable to say that we will utilize a third-party database solution and stop right there. But for a senior position, we need to either explain how to build a distributed database (and we promise you a separate video on this) or keep seeking other options.

And if not a database, where else can we store data? Who thought about memory? Please let me know in the comments. And you are correct, by the way, as are those who said a file system. As we may need to store messages for days or even weeks, we need more durable storage, like a local disk. At the same time, newly arrived messages may live in memory for a short period of time, or until memory on the backend host is fully utilized. The next question we should ask ourselves: how do we replicate data? And I believe you may have already figured this out. We will send copies of messages to some other hosts, so that data can survive host hardware or software failures.
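Here is a minimal sketch of that storage idea under stated assumptions: a backend host keeps newly arrived messages in memory and spills the oldest ones to an append-only file on the local disk once a threshold is reached. The file framing, flush policy, and all names are illustrative; a real implementation would also need a way to read spilled messages back.

```java
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Queue;

// Illustrative sketch: in-memory buffer for fresh messages, append-only log on disk
// for older ones. Not a complete store; it only shows the memory-plus-disk idea.
public class MessageStore {

    private final Queue<byte[]> inMemory = new ArrayDeque<>();
    private final FileOutputStream log;        // append-only file on the local disk
    private final int maxInMemoryMessages;

    public MessageStore(String logPath, int maxInMemoryMessages) throws IOException {
        this.log = new FileOutputStream(logPath, true); // true = append mode
        this.maxInMemoryMessages = maxInMemoryMessages;
    }

    public synchronized void store(byte[] message) throws IOException {
        inMemory.add(message);
        while (inMemory.size() > maxInMemoryMessages) {
            spillOldestToDisk();
        }
    }

    private void spillOldestToDisk() throws IOException {
        byte[] oldest = inMemory.poll();
        log.write(oldest);  // a real system would frame records and fsync periodically
        log.flush();
    }
}
```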

And finally, let's think about how FrontEnd hosts select backend hosts for both storing messages and retrieving them. We can leverage the Metadata service, right? So, let's summarize what we have just discussed. A message comes to the FrontEnd, and the FrontEnd consults the Metadata service about which backend host to send the data to. The message is sent to the selected backend host, and the data is replicated. When a receive message call comes, the FrontEnd talks to the Metadata service to identify the backend host that stores the data.

Now, let's dive deep into the backend service architecture. We will consider two options for how backend hosts relate to each other. In the first option, each backend instance is considered a leader for a particular set of queues. By leader, we mean that all requests for a particular queue (like send message and receive message requests) go to this leader instance. Let's look at an example. A send message request comes to a FrontEnd instance, for a queue with ID equal to q1. The FrontEnd service calls the Metadata service to identify the leader backend instance for this queue. In this particular example, instance B is the leader for q1. The message is sent to the leader, and the leader is fully responsible for data replication. When a receive message request comes to a FrontEnd instance, it also makes a request to the Metadata service to identify the leader for the queue. The message is then retrieved from the leader instance, and the leader is responsible for cleaning up the original message and all the replicas.

We need a component that will help us with leader election and management. Let’s call it the in-cluster manager. As already mentioned, the in-cluster manager is responsible for maintaining a mapping between queues, leaders, and followers. The in-cluster manager is a very sophisticated component. It has to be reliable, scalable, and performant, and creating such a component from scratch is not an easy task. Let’s see if we can avoid leader election in the first place. Can you think of an option in which all instances are equal? Please pause this video and think for a while.
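Before looking at the second option, here is a minimal sketch of the send path in design option A as described above: the FrontEnd asks the Metadata service for the leader of the queue and forwards the message to it. The interfaces and method names are illustrative assumptions, not a fixed API.

```java
// Illustrative sketch of the leader-based send path (design option A).
interface MetadataService {
    String leaderFor(String queueName);  // backend host currently acting as leader
}

interface BackendClient {
    void send(String backendHost, String queueName, byte[] payload);
}

public class SendPath {

    private final MetadataService metadata;
    private final BackendClient backend;

    public SendPath(MetadataService metadata, BackendClient backend) {
        this.metadata = metadata;
        this.backend = backend;
    }

    public void sendMessage(String queueName, byte[] payload) {
        String leader = metadata.leaderFor(queueName); // e.g. instance B for queue q1
        backend.send(leader, queueName, payload);      // the leader replicates to followers
    }
}
```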

Backend Service

In the second option, we have a set of small clusters, where each cluster consists of 3-4 machines distributed across several data centers. When a send message request comes, similar to the previous design option, we also need to call the Metadata service to identify which cluster is responsible for storing messages for the q1 queue. After that, we just make a call to a randomly selected instance in the cluster. That instance is responsible for data replication across all nodes in the cluster. When a receive message request comes and we have identified which cluster stores messages for the q1 queue, we once again call a randomly selected host and retrieve the message. The selected host is responsible for the message cleanup. As you may see, we no longer need a component for leader election, but we still need something that will help us manage queue-to-cluster assignments. Let’s call this component the out-cluster manager (not the best name, I know, but naming is hard). This component will be responsible for maintaining a mapping between queues and clusters.

Is the out-cluster manager a simpler component than the in-cluster manager? It turns out that it is not, really. While the in-cluster manager manages queue assignments within the cluster, the out-cluster manager manages queue assignments across clusters. The in-cluster manager needs to know about each and every instance in the cluster. The out-cluster manager may not know about each particular instance, but it needs to know about each cluster. The in-cluster manager listens to heartbeats from instances. The out-cluster manager monitors the health of each independent cluster. And while the in-cluster manager deals with host failures and needs to adjust to the fact that instances may die and new instances may be added to the cluster, the out-cluster manager is responsible for tracking each cluster's utilization and dealing with overheated clusters, meaning that new queues may no longer be assigned to clusters that have reached their capacity limits. And what about really big queues, when a single queue gets so many messages that a single leader (in design option A) or a single cluster (in design option B) cannot handle such a big load? The in-cluster manager splits the queue into parts (partitions), and each partition gets a leader server. The out-cluster manager may split the queue across several clusters, so that messages for the same queue are equally distributed between several clusters.

So far we have covered all the main components of the high-level architecture. Let’s see what else is important to mention while discussing distributed message queues. First, queue creation and deletion. A queue can be auto-created, for example when the first message for the queue hits the FrontEnd service, or we can define an API for queue creation. An API is a better option, as we will have more control over queue configuration parameters. The delete queue operation is a bit controversial, as it may cause a lot of harm and must be executed with caution. For this reason, you may find examples of well-known distributed queues that do not expose a delete queue API via a public REST endpoint. Instead, this operation may be exposed through a command-line utility, so that only experienced admin users may call it. As for message deletion, there are several options at our disposal. One option is not to delete a message right after it is consumed. In this case, consumers have to be responsible for tracking what they have already consumed. And it is not as easy as it sounds.
We need to maintain some kind of order for messages in the queue and keep track of the offset, which is the position of a message within the queue. Messages can then be deleted several days later, by a job. This idea is used by Apache Kafka.
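Here is a minimal sketch of that offset idea under the assumptions above: messages stay in an ordered log, and each consumer only remembers the position of the last message it has read. The class and method names are illustrative and are not Kafka's API.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch: consumption by offset instead of deleting consumed messages.
public class OffsetTracker {

    private final Map<String, Long> offsets = new ConcurrentHashMap<>(); // consumerId -> offset

    // Returns the next batch of messages after the consumer's offset and advances it.
    public List<byte[]> poll(String consumerId, List<byte[]> orderedLog, int maxMessages) {
        int from = (int) Math.min(offsets.getOrDefault(consumerId, 0L), orderedLog.size());
        int to = Math.min(from + maxMessages, orderedLog.size());
        offsets.put(consumerId, (long) to);  // remember how far this consumer has read
        return orderedLog.subList(from, to); // older messages stay in the log for now
    }
}
```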

The second option is to do something similar to what Amazon SQS is doing. Messages are also not deleted immediately but marked as invisible, so that other consumers do not get an already retrieved message. The consumer that retrieved the message then needs to call the delete message API to remove the message from the backend host. If the message was not explicitly deleted by the consumer, it becomes visible again and may be delivered and processed twice.

We know that messages need to be replicated to achieve high durability. Otherwise, if we only have one copy of the data, it may be lost due to an unexpected hardware failure. Messages can be replicated synchronously or asynchronously. Synchronously means that when the backend host receives a new message, it waits until the data is replicated to other hosts, and only when replication has fully completed is a successful response returned to the producer. Asynchronous replication means that the response is returned to the producer as soon as the message is stored on a single backend host; the message is replicated to other hosts later. Both options have pros and cons. Synchronous replication provides higher durability, but at the cost of higher latency for the send message operation. Asynchronous replication is more performant, but does not guarantee that a message will survive a backend host failure.
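Here is a minimal sketch of the SQS-style visibility idea described above: a retrieved message is hidden for a while instead of being deleted, and it becomes visible again if the consumer never deletes it. The 30-second timeout and all names are illustrative assumptions.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch of a visibility timeout: retrieved messages are hidden, not deleted.
public class VisibilityQueue {

    private static final long VISIBILITY_TIMEOUT_MS = 30_000;               // illustrative value
    private final Map<String, byte[]> messages = new LinkedHashMap<>();      // id -> payload
    private final Map<String, Long> invisibleUntil = new LinkedHashMap<>();  // id -> deadline

    public synchronized void send(String messageId, byte[] payload) {
        messages.put(messageId, payload);
    }

    // Returns the oldest visible message and hides it for the visibility timeout.
    public synchronized Map.Entry<String, byte[]> receive() {
        long now = System.currentTimeMillis();
        for (Map.Entry<String, byte[]> e : messages.entrySet()) {
            Long deadline = invisibleUntil.get(e.getKey());
            if (deadline == null || deadline <= now) {                 // visible (again)
                invisibleUntil.put(e.getKey(), now + VISIBILITY_TIMEOUT_MS);
                return e;
            }
        }
        return null; // nothing visible right now
    }

    // Called by the consumer after successful processing; the message is gone for good.
    public synchronized void delete(String messageId) {
        messages.remove(messageId);
        invisibleUntil.remove(messageId);
    }
}
```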

There are three main message delivery guarantees: at most once, when messages may be lost but are never redelivered; at least once, when messages are never lost but may be redelivered; and exactly once, when each message is delivered once and only once. You probably have a question already: why do we need three? Will anyone ever want anything other than exactly once delivery? Great question, and the simple answer is that it is hard to achieve exactly-once delivery in practice. In a distributed message queue system there are many potential points of failure: the producer may fail to deliver or may deliver multiple times, data replication may fail, and consumers may fail to retrieve or process the message. All this adds complexity and leads to the fact that most distributed queue solutions today support at-least-once delivery, as it provides a good balance between durability, availability, and performance.

With a pull model, the consumer constantly sends retrieve message requests, and when a new message is available in the queue, it is sent back to the consumer. With a push model, the consumer does not constantly bombard the FrontEnd service with receive calls. Instead, the consumer is notified as soon as a new message arrives in the queue. And as always, there are pros and cons. I will not enumerate all of them here; I will simply state that from a distributed message queue perspective, pull is easier to implement than push, but from a consumer perspective, we need to do more work if we pull.

Many of us think of the FIFO acronym when we hear about queues. FIFO stands for first-in, first-out, meaning that the oldest message in a queue is always processed first. But in distributed systems, it is hard to maintain a strict order. Message A may be produced prior to message B, but it is hard to guarantee that message A will be stored and consumed prior to message B. For these reasons, many distributed queue solutions either do not guarantee a strict order, or have limitations around throughput, as the queue cannot be fast while doing many additional validations and much coordination to guarantee a strict order.

With regard to security, we need to make sure that messages are securely transferred to and from the queue. Encryption in transit using SSL/TLS (HTTPS) helps to protect messages, and we may also encrypt messages while storing them on backend hosts; we discussed this when we talked about FrontEnd service responsibilities. Monitoring is critical for every system. With regard to the distributed message queue, we need to monitor the components (or microservices) that we built, the frontend, metadata, and backend services, as well as provide visibility into the customer's experience.
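Tying the pull model and at-least-once delivery together, here is a minimal sketch of a consumer that keeps polling, processes each message, and deletes it only after successful processing; if it crashes before the delete, the message is redelivered, which is exactly the at-least-once trade-off. All interfaces and names are illustrative assumptions.

```java
// Illustrative sketch of a pull-model consumer with at-least-once behavior.
public class PullConsumer {

    interface QueueClient {
        ReceivedMessage receive(String queueName);        // null when the queue is empty
        void delete(String queueName, String messageId);  // acknowledge successful processing
    }

    record ReceivedMessage(String messageId, byte[] payload) {}

    private final QueueClient client;
    private final String queueName;

    PullConsumer(QueueClient client, String queueName) {
        this.client = client;
        this.queueName = queueName;
    }

    void run() throws InterruptedException {
        while (true) {
            ReceivedMessage message = client.receive(queueName);
            if (message == null) {
                Thread.sleep(1000);  // back off when nothing is available
                continue;
            }
            process(message);
            // Deleting only after processing gives at-least-once delivery:
            // a crash before this line means the message is redelivered later.
            client.delete(queueName, message.messageId());
        }
    }

    private void process(ReceivedMessage message) {
        System.out.println("processing " + message.messageId());
    }
}
```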

High Level Architecture

In other words, we need to monitor the health of our distributed queue system and give customers the ability to track the state of their queues. Each service we built has to emit metrics and write log data. As operators of these services, we need to create dashboards for each microservice and set up alerts. And customers of our queue have to be able to create dashboards and set up alerts as well. For this purpose, integration with a monitoring system is required. Do not forget to mention the monitoring aspect to the interviewer. This topic is often omitted by candidates, but it is very important.

Let's take one final look at the architecture we built and evaluate whether the non-functional requirements are fulfilled. Is our system scalable? Yes, as every component is scalable. When load increases, we just add more load balancers, more FrontEnd hosts, more Metadata service cache shards, and more backend clusters and hosts. Is our system highly available? Yes, as there is no single point of failure, and each component is deployed across several data centers. Individual hosts may die and network partitions may happen, but with this redundancy in place, our system will continue to operate. Is our system highly performant? That actually very much depends on the implementation, hardware, and network setup. Each individual microservice needs to be fast, and we need to run our software in high-performance data centers. Is our system durable? Sure. We replicate data while storing it and ensure messages are not lost in transit from the producer to the consumer. And that is it for today's system design interview question.
