Transcript for:
Distributed Priority Queue Design

Hey guys, welcome back to the channel, hope you're all having a nice day. I just want to say thank you all for 40,000 subscribers. It means a lot to me, but honestly I don't care too much about the subscriber number, the view number, the ad revenue number, or anything like that. What matters to me the most is seeing how these videos impact you guys. Pretty much my metric of success is how many people reach out to me letting me know that they got their dream offer in part due to this channel, and that's really what does it for me at the end of the day. So thank you a lot for all the support. I'm probably not going to make a slideshow of all those offers just yet; I'll batch them up and do it in a little bit. But yeah, I really appreciate it. Have a great day, and let's go ahead and do some systems design.

Okay, so today should hopefully be a fairly fun video, in which we'll be talking about a distributed priority queue. Now this one is absolutely killing me, because I heavily based it off a paper that Facebook released a couple of years ago called FOQS ("fox"? I don't know how they want to pronounce it). The gist is, I was racking my brain for many hours, even days, on how to build this thing, came up with what I thought was a pretty good solution, probably overcomplicated it, read their paper, and was just like, huh, I could have been doing this the whole time, that was so much easier. Then I still tried to improve upon it, couldn't do it, realized that the improvement I thought I'd made on their design was actually in their paper, and then I felt like an idiot. So I've been humbled by about 10 separate engineers at Facebook, but maybe I'll get them another day. Let's go and talk about what they more or less did.

A company like Facebook or Google, or any massive one like that, does need something like a distributed priority queue. The reason is that they have tons of jobs that they want to run in an automated fashion, they want to run them asynchronously, and ideally certain jobs are more important to run than others, so they want to assign them some sort of arbitrary priority. Let's say, for example, as you can see in the drawing, something like a video encoder: maybe we've got one video over here with priority 1, the presidential debate over here with priority 2, and a last video over here with priority 3. Basically, our distributed priority queue is going to allow us to build a scalable and fault-tolerant system with fairly good read and write throughput, one that allows all of these separate video encoders to pick off all of these items and make sure they each get processed at least once. Cool, so I guess the gist is: the second you hear from a node saying that the video you want processed is in the system, it should eventually get processed.

Cool, let's go ahead and talk about some problem requirements. To formalize this, we want to build an ordered list of items. But of course, it's not enough to build an ordered list of items; we've done that with Kafka, and we've done that with JMS brokers like ActiveMQ or RabbitMQ. We want to be able to assign each item a priority. So we have this concept of priority on our ordered list, and we want to be able to support some operations: enqueuing, modifying an existing item, and dequeuing.
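To make that concrete, here's a minimal sketch of the interface we're aiming for. The names and signatures are just my own placeholders, not anything from the FOQS paper, and I'm assuming lower numbers mean higher priority:

```python
class DistributedPriorityQueue:
    """Hypothetical interface for the system in this video (my naming, not
    FOQS's actual API). Lower priority value = should be dequeued sooner."""

    def enqueue(self, data, priority):
        """Persist the item durably; return an item ID once it's accepted."""
        raise NotImplementedError

    def modify(self, item_id, data=None, priority=None):
        """Change an existing item's payload and/or its priority."""
        raise NotImplementedError

    def dequeue(self):
        """Deliver an (approximately) highest-priority item, at least once."""
        raise NotImplementedError

    def ack(self, item_id):
        """Consumer confirms processing, so the item can be deleted."""
        raise NotImplementedError
```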
On top of those operations, we want to be sure that if an item is actually in one of these lists, it's going to be delivered and processed at least once. Typically, the way you'd make sure results only happen exactly once is with some sort of idempotency key, or some sort of idempotent function that gets run as a result of handling your message, but I'm not going to talk about that too much here, as we've covered it plenty in the past. The last aspect is that we want our system to be both scalable and fault tolerant. I guess that's not really a functional requirement, but it is something that is going to be required even if we have billions of messages.

Cool, so the first thing I want to do is quickly give a moment to ordered consumption, because you may be thinking, oh, we want to build this massive queue out, right? When we're talking about just a normal queue, a FIFO queue, you have ActiveMQ or other JMS brokers, where the consumption of the messages is unordered, and then you also have Kafka, where basically, as long as you have one consumer per partition, all of the messages in that partition are going to be consumed in an ordered fashion. The gist is, the same thing really applies here: can we guarantee that all of our messages are going to be processed in order? Yes, but only if our queue has one consumer at a time. If you have multiple consumers, then inherently, in order to take advantage of the fact that you have multiple consumers, you want multiple messages to be delivered to those consumers at once. And of course, if that happens, then just due to some race condition, one of the later messages can be fully processed before an earlier one. So if this is the earlier message and this is the later message, you can see that the earlier one is delivered first and the later one is delivered second, but ultimately maybe the later one gets processed first and put in some database sink over here, while the earlier one gets processed second. This is a challenge we inherently have to deal with, but my point is that we're not going for perfect ordering within our priority queue, because at the end of the day we probably want more than one consumer. If we only wanted one consumer, this would be a very different problem, and we could just go ahead and build a heap or something. So keep in mind that we're not going for ordered consumption; we're going for pretty much an approximate priority queue, and here's the best way we can try to do that.

There are a few different ways we can implement a priority queue. If I came to you right after you took your data structures and algorithms class, you would probably say, okay, let's do it with a heap. And a heap is pretty simple, because it allows you constant-time access to the top element. So if you're looking for the item with the lowest priority value, for example, and it's a min heap, you can easily find it, and then of course, when you pop it, you have to do some logarithmic work to rebalance the heap. We would implement that as an array, and the question is: is that something that's easy to implement on disk? Well, probably not, because there are a ton of random swaps you have to do between array elements. A heap would basically be: this is the first level right here, these two guys are the second level, this over here is going to be the third level, and so on.
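As a quick refresher on those costs before we move to disk, here's the in-memory version using Python's built-in heapq; a min heap gives O(1) peek at the top element and O(log n) push and pop. The entries are just the hypothetical videos from the drawing:

```python
import heapq

heap = []  # min-heap of (priority, item) tuples
heapq.heappush(heap, (2, "presidential debate"))  # O(log n) insert
heapq.heappush(heap, (1, "some video"))
heapq.heappush(heap, (3, "a last video"))

print(heap[0])              # O(1) peek -> (1, 'some video')
print(heapq.heappop(heap))  # O(log n): pop the top, then sift to rebalance
```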
If you're switching elements between those spots all the time, then all of a sudden you're jumping around that array, and that's probably not going to be great for a hard drive. So how could we do this on disk only? Well, we actually do have a fairly good way of doing it, which is using just a normal B-tree or LSM-tree based index. The reason is that we can maintain a sorted list, and because it's a sorted list, we can really easily access the first element. That being said, something like an LSM-tree or B-tree based index is still going to be fairly slow, because at the end of the day it is based on disk. In addition, we'll eventually need an index on some ID for every single node, meaning we can't perfectly denormalize the data; we're going to have to normalize it, so we'll probably be jumping around the disk a lot, and we'll have a lot more expensive writes as a result. We have all of these O(log n) insertions into our index, and also, if we delete an element from the top of our sorted list, we may have to rebalance our index if it's something like a B-tree. So we can have a lot of expensive operations going on in the background there. But also, just in general, memory is a lot faster than disk.

So what could we do instead? Well, we may be able to implement some sort of hybrid approach. In this case, we could keep the heap itself in memory, or rather the sorted list itself: some data structure where we have a sorted list of all the primary keys of the nodes, ordered by their priority, and in turn pointers to the data on disk. What this allows us to do is just insert the data on disk as it comes in, which is an O(1) write to the end of some sort of log, almost like Kafka, and then simultaneously have derived data in memory, which is basically that sorted index of all of those node IDs. But I'll explain that in a little bit.

To make sure this is feasible for the type of scale we're looking for: if every single node needs a priority and a pointer to the actual data location on disk, imagine that's 8 bytes for the priority and 8 bytes for the pointer, so 16 bytes overall. As a result, if we have a billion nodes, that's 16 gigabytes in memory, so not too bad actually; we can totally store that. That is fairly reasonable for a modern-day server, especially if we start to partition this thing out. And again, we can do this regardless of how big the nodes are, because our limiting factor is probably going to be memory, and all we really need to store in memory is just this one small structure.

So what does this actually look like, to really solidify it in everyone's understanding? It would be some sort of hybrid priority queue, where in memory, over here, we have a heap or a sorted list, whichever you prefer, and over here on the right, on disk, we have something that almost resembles a write-ahead log: as nodes come in, they get assigned an ID based on their location, which should be super easy to do since we just append everything to the disk in order. Then, in addition to the ID, we have the actual data.
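Here's a rough sketch of that hybrid layout, under my own assumptions: payloads get appended to a length-prefixed log file (an O(1) write), and all that stays in memory is a heap of (priority, id, offset) tuples, roughly 16 bytes of bookkeeping per item. No durability, replication, or concurrency handling here:

```python
import heapq
import struct

class HybridQueue:
    """Sketch only: append-only payload log on disk, plus an in-memory
    min-heap of (priority, item_id, offset) pointing into that log."""

    def __init__(self, path):
        self.log = open(path, "ab+")   # append-only data file
        self.index = []                # in-memory heap, ~16 bytes per entry
        self.next_id = 0

    def enqueue(self, data, priority):
        offset = self.log.seek(0, 2)                         # current end of log
        self.log.write(struct.pack(">I", len(data)) + data)  # length-prefixed record, O(1)
        item_id, self.next_id = self.next_id, self.next_id + 1
        heapq.heappush(self.index, (priority, item_id, offset))  # O(log n), memory only
        return item_id

    def dequeue(self):
        priority, item_id, offset = heapq.heappop(self.index)
        self.log.seek(offset)                                 # follow the pointer to disk
        (length,) = struct.unpack(">I", self.log.read(4))
        return item_id, self.log.read(length)
```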
In reality, the data itself is probably going to be pretty big, because it could be something like a video frame, a small video chunk, or anything else we might want to process asynchronously in the background. And then finally, of course, we have our priority. So regardless of how these things are ordered on disk, when we go to memory, we order them properly. Again, the disk writes are O(1), and in memory it's O(log n), where there are n things in the heap, because we have to insert each item into its proper place in the heap.

Cool, now let's go ahead and talk about modifying some enqueued elements. Keep in mind that one of the things we wanted to be able to do was take an existing element that's on disk somewhere, like down here, and actually change the data, or perhaps even change the priority. Fortunately, like I mentioned, we're assigning an ID to every single element as it goes in, and the log is already sorted by ID, just like an index, so we can use that ID to really quickly figure out where the element lives on disk. That is going to make life easier for us. The one expensive thing would be if we actually changed the priority of the node itself, because then we'd have to go over here and do some sort of in-memory update, and that's not exactly the easiest thing to do with a plain heap. Perhaps, instead of a heap, we might want some sort of sorted linked list with a hashmap, where the hashmap goes from node ID to a particular element of the list, or something like that, so that we can access any element in constant time.
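Here's a minimal sketch of that idea, assuming we stay in Python: rather than a literal sorted linked list, it uses the lazy-invalidation recipe from the heapq documentation, with a hashmap from ID to the live heap entry, so a priority change is an O(1) lookup plus an O(log n) push:

```python
import heapq

class UpdatableHeap:
    """Priority queue with O(1) lookup by ID and cheap priority changes (sketch)."""

    def __init__(self):
        self.heap = []    # entries are [priority, item_id, is_live]
        self.by_id = {}   # hashmap: item_id -> its live heap entry

    def set_priority(self, item_id, priority):
        if item_id in self.by_id:
            self.by_id[item_id][2] = False    # lazily invalidate the stale entry
        entry = [priority, item_id, True]
        self.by_id[item_id] = entry
        heapq.heappush(self.heap, entry)      # O(log n)

    def pop(self):
        while self.heap:
            priority, item_id, is_live = heapq.heappop(self.heap)
            if is_live:                       # skip entries invalidated by updates
                del self.by_id[item_id]
                return item_id, priority
        raise IndexError("pop from empty queue")
```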
Cool, hopefully that's starting to make sense, but the gist is, we want all of these operations to be atomic. I think it would be really nice if we could keep everything simple and use something like a standard SQL database; that should hopefully make our lives a little easier. Of course, if you're aiming for faster reads or writes, maybe you'll choose a different database, and if you're able to fit everything in memory, that could be great. But we pretty much have a write-ahead log right here and a separate in-memory index, so you're basically looking for a database that implements a sorted in-memory index for you; otherwise, you'd effectively build something like this on your own.

Cool, let's talk about replication quickly, to get a better sense of what's going on here. We want to be sure that all of our messages are going to be delivered, so what options do we have? For starters, there's multi-leader replication, which basically means that if we want to write to the same node ID from multiple different regions, we can. That being said, I don't really think this is going to be a huge requirement of our system, because I can't imagine that people from multiple different regions are going to be modifying the same task to be processed asynchronously. It could happen, but at least for this case it seems less likely, so we're not going to get much benefit there, and it also comes at the cost of potentially having to deal with write conflicts when it does occasionally happen.

In theory, we could do leaderless replication if we wanted strong consistency: we could use quorum consistency within a partition of our priority queue to effectively ensure that every single event that goes in there is replicated sufficiently that it doesn't get lost. That's actually not a terrible idea, and it could let us avoid something like a two-phase commit to make sure that all events are synchronously replicated somewhere. But at the end of the day, I think that if we just go towards single-leader replication, we may end up with faster read and write response times, so that seems preferable to me.

Finally, then, we could do single-leader replication. However, single-leader replication is not perfect, in the sense that if you write to a single leader doing asynchronous replication, and that leader goes down before the message gets asynchronously replicated, all of a sudden the message is lost. So what we could do is something we've started to talk about a little more, which is use a consensus algorithm with some sort of replication group: say every leader has two followers, and a message has to be synchronously replicated to at least one of the followers before we can proceed. That's kind of how something like Raft works: you have to get a majority of nodes to agree that the message is real before you can proceed and consider it a successful write. The nice thing about that is that when we see a node confirm an item exists, we can just read it from the leader, so we only have to read the message from one place. That helps our read speeds compared to something like quorum consistency.

Cool, let's go ahead and talk about partitioning. We want to make our read and write throughput as fast as humanly possible, because we want this thing to be a massive system at scale. What does that really mean? It means we want to decrease the read and write load on any single node in any of our partitions. Another thing to note is that we're already handling message events out of order: I mentioned that we're likely going to have multiple consumers here, as opposed to just one, and because we have multiple consumers anyway, we're never going to get perfect ordering. So using something like round robin, distributing events not by their priorities but just completely randomly, will probably be the most fair way to do it, and that way we can ensure you have consumers reading from every single node. As long as they're doing that, even if we're not getting perfect priority ordering across all nodes, we're still doing pretty well.

Another option, as opposed to round robin, would be to partition by priority, or by ranges of priority. But the problem with that, assuming priority is a continuous value rather than just one, two, or three, so we can literally have any number as our priority, is that it could lead to a lot of hot reads on the top shard. At the end of the day, when you're running a priority queue, all of the reads are going to come from one node, so even though the writes may be spread out, first of all, there could be an uneven distribution in the priority ranges provided, but also the reads are going to be completely locked in on that one node. And because we're not getting fairly ordered processing anyway, since we have a bunch of consumers consuming from that one node, it doesn't really benefit us to consume from only one place. I thought a lot about this concept of partitioning by priority when I was thinking through this video, but ultimately doing this round robin, basically just randomly spreading out events, is probably going to be the best way to get this done.
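That routing decision is tiny in code. Here's a sketch, where the partition objects and their enqueue call are hypothetical stand-ins for whatever RPC layer we'd actually have:

```python
import itertools

class RoundRobinRouter:
    """Spray enqueues evenly across partitions, ignoring priority (sketch).
    Consumers re-sort by priority locally, so fairness matters more than order."""

    def __init__(self, partitions):
        self.rotation = itertools.cycle(partitions)

    def route(self, data, priority):
        partition = next(self.rotation)             # next partition in the rotation
        return partition.enqueue(data, priority)    # hypothetical partition RPC
```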
Cool. So now we know that we have events going to many partitions. This could be partition one, where we've got our little heap in memory and data on disk, this could be partition two, and this could be partition three. It's possible that one particular consumer is actually reading from multiple partitions at once. If that were the case, how would we go about it? Well, we'd probably want to make sure we're aggregating the data across all three of these nodes at once and picking the item with the highest priority across all three of them. The way we could do that is by having the consumer over here establish its own little heap in memory and pick from the top of that heap. So this item could be the one from partition one, and then the consumer will have to reach out to partition one again to backfill its heap once this guy is processed.

The way we can do that, at least in my opinion, is probably best done with something like a long poll. The reason is that with a websocket or server-sent events, especially if our priority queue is not especially busy, we'd be maintaining a persistent connection when, most of the time, we probably don't need it. If these events take a fairly long time to process, then this consumer is going to have a lot of downtime where it doesn't actually need to ask for another event, so just fetching one event as needed, as opposed to maintaining a persistent connection the whole time, seems like a much better solution here. (I also wrote "downtown" instead of "downtime" because my brain doesn't work; that's a shame.)
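Here's a sketch of that consumer loop under my own assumptions: long_poll, ack, and process are hypothetical stand-ins for the partition RPCs and the actual work, and a counter breaks priority ties so the heap never has to compare partition objects:

```python
import heapq
import itertools

def consume(partitions, process):
    """Drain several partitions at once, always taking the best item we know of (sketch)."""
    local, tie = [], itertools.count()
    for p in partitions:                     # prime the local heap: one item per partition
        item = p.long_poll()                 # assumed to block until something is available
        heapq.heappush(local, (item.priority, next(tie), p, item))

    while local:
        _, _, partition, item = heapq.heappop(local)   # best priority across all partitions
        process(item)                                  # at-least-once handling
        partition.ack(item.id)                         # safe to delete server-side now
        refill = partition.long_poll()                 # backfill from the same partition
        if refill is not None:
            heapq.heappush(local, (refill.priority, next(tie), partition, refill))
```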
Cool, finally, let's go into our actual design. Again, we've got our client on the left; this guy is someone who is going to be publishing to our priority queue. When we do that, the first thing we hit is our load balancer, which is going to use round robin, the reason being that we can't ensure a perfect ordering of all our events by their priorities as long as we have multiple consumers anyway, so we may as well approximately spread them out and just have our consumers listen to certain nodes; that's going to make our life a lot easier. Then, over here, the priority queue itself will basically be held on a MySQL database, so effectively we're just inserting in primary-key order, right into a log, ideally with O(1) inserts onto disk, and then separately maintaining this derived data structure of a heap in memory, indexed on the priority score. And then, of course, within our partition, like I mentioned, we're going to be doing some replication with Paxos or Raft or any other consensus algorithm, the reason being that once we've properly run that Paxos or Raft round, we can be sure the message is fault tolerant and will eventually be read. That is an extremely important part of this.

So what does that look like in real life? It basically means you propose to one node and you propose to another node; this is the propose phase. They both come back and say whether or not they'll accept it. It almost looks like a two-phase commit, except the difference here is that you only need majority approval. So if this guy never responds because he's down, we're only able to send the message to the other node right over here, but since we got majority approval and that node goes through, we can go back to the client and say: hey, you wrote successfully, your item is in our system, and it's going to get processed.

Now, of course, from there we have our consumers. It's possible that you could, say, have one consumer sharing a single partition with another consumer, and if that were to happen, you would basically use locking over here to ensure they're not both receiving the same event (there's a quick sketch of this after the sign-off). And then, of course, once a consumer acknowledges that it has handled an event properly, that event is deleted, or marked as deleted, in the MySQL table. Additionally, if you have a consumer over here that is handling multiple partitions, then we do what I described before, which is long poll the nodes it cares about, aggregate some local list of the priorities from each node, figure out which one is the next one to handle, and then handle them accordingly, writing back to the database and saying: hey, I just handled your last event, please give me your next one of highest priority so I can consider it.

Well guys, hopefully you found this fairly useful. Again, this is not really my design; I kind of just stole it from Facebook, which is a nice feeling. But it is a fairly good design, and I see why they use it and how it scales, so props to them for that one. Anyways, hope all of you have a great day, thanks again for the channel milestone, and enjoy your weekend.
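As promised above, here's what that consumer-side locking could look like against the MySQL table. This is a sketch under my own assumptions: a hypothetical items(id, data, priority) schema, MySQL 8+ for FOR UPDATE SKIP LOCKED, and a DB-API style connection such as PyMySQL. The design in the video actually serves reads from the in-memory heap, so treat this purely as an illustration of two consumers never grabbing the same row:

```python
def dequeue_one(conn, process):
    """Claim, process, and delete one item; concurrent consumers skip locked rows (sketch)."""
    with conn.cursor() as cur:
        cur.execute("START TRANSACTION")
        cur.execute(
            "SELECT id, data FROM items "
            "ORDER BY priority ASC LIMIT 1 "
            "FOR UPDATE SKIP LOCKED"           # rows another consumer holds are skipped
        )
        row = cur.fetchone()
        if row is None:                        # queue empty (or everything is claimed)
            conn.rollback()
            return False
        item_id, data = row
        process(data)                          # if we crash here, the transaction rolls
                                               # back and the row is redelivered later:
                                               # that's the at-least-once guarantee
        cur.execute("DELETE FROM items WHERE id = %s", (item_id,))
        conn.commit()
        return True
```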