Transcript for:
Analysis of LinkedIn profile view events

  • Hello everyone. Good afternoon. Most of you have LinkedIn profiles, and you have almost certainly visited someone else's LinkedIn profile. So today I'm going to talk about all the cool things we can do with just this one event, where you look at someone else's profile. What happens in the background? Once you look at someone's profile, an event is emitted, and it is very simple: it says who viewed whom at what time. It's a very, very simple event, but what can you do with it? The first thing is that it goes into a database. Say you have two tables: in one you log all these events, saying someone looked at someone's profile at this particular time, and in the other you have that person's profile: where are they from, what is their industry, what are their skill sets? Now, for an analyst, this is really valuable. They want to know: hey, what's happening with all the pages at LinkedIn? How many people are looking at these pages? What is the breakdown by country? Can I see what's happening on desktop? Can I see what's happening across various dimensions? This is a very typical analytical use case, and in pretty much any company you can relate to it: you have some user profile and you want to know what is happening with that user, how many people are looking at that profile. There are multiple ways to solve this, and I'll go over the different approaches, how we solved it at LinkedIn, and what the other ways of solving it are.

Option one is very simple; this is pretty much how most analytics systems started, back when the data was small. You have two tables: the profile view event table, which logs all the events, and the profile data table. All you need is a query that does the join on the fly, something like the sketch below. Nothing fancy here; this is typically how all analytic applications started, and it works very well: you get real-time ingestion into these databases. The only problem is that it's doing the join on the fly, which means it's joining two big tables. The profile table may be smaller, but you're still joining against a big event table, and that can have really high latency.

Option two is the complete opposite extreme: you do the pre-join, the pre-aggregation, and the pre-cubing all up front. You take the same two tables, but you no longer query the base tables. Instead you join them up front, aggregate, and compute all the cubes that you want. Here you just query the final table and everything is ready; you're looking at one row or a few rows, and you get really, really good latency. The problem, on the flip side, is that it's batch: you can't keep computing these pre-cubes on the fly, so there is a lot of latency before the data is available. The storage is also very expensive, because you're computing a lot of dimension combinations up front; the more dimensions you have, the more drastically the storage cost increases. And it's not flexible: every time you change the schema or add a new dimension, you need to recompute your entire data set. So operationally this is not really feasible either.
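To make option one concrete, here is a minimal sketch of the join-on-the-fly query, using in-memory SQLite so it runs anywhere; the table and column names (profile_view_event, profile, viewee_id, and so on) are illustrative assumptions, not LinkedIn's actual schema.

```python
# Option one, sketched: keep the raw event table and the profile table,
# and join plus aggregate at query time. Schema and data are made up.
import sqlite3

con = sqlite3.connect(":memory:")
con.executescript("""
CREATE TABLE profile_view_event (viewer_id TEXT, viewee_id TEXT, view_time INTEGER);
CREATE TABLE profile (member_id TEXT, country TEXT, industry TEXT);
INSERT INTO profile VALUES ('m1', 'US', 'Software'), ('m2', 'IN', 'Finance');
INSERT INTO profile_view_event VALUES ('m2', 'm1', 1000), ('m1', 'm2', 1005), ('m2', 'm1', 1010);
""")

# The join on the fly: flexible (any profile dimension is available),
# but the join cost grows with the size of the event table.
rows = con.execute("""
    SELECT p.country, COUNT(*) AS views
    FROM profile_view_event e
    JOIN profile p ON p.member_id = e.viewee_id
    GROUP BY p.country
""").fetchall()
print(rows)  # e.g. [('IN', 1), ('US', 2)]
```

Option two would precompute the result of this GROUP BY, and of every other dimension combination, ahead of time, which is why it gets great latency but pays for it in storage, freshness, and flexibility.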
The third option, which you could call a hybrid approach, is that you have the stream and you have the other table, and you do the join on the stream, on the fly. Then you do some kind of aggregation, over five minutes, ten minutes, or whatever window you want, depending on how fast you want the insights to be available. That gives you pre-joined, de-normalized data with everything in one place. The advantage is that now you're not doing a join at query time, so your latencies are much more predictable; you can serve any kind of analyst or interactive dashboard in milliseconds, and you don't have to wait long.

So let's look at all of these in one slide. These are all the places where you can do the analytics: you can start on the raw data, on the pre-joined or pre-aggregated data, or go all the way to pre-cubed data. If you look at the systems out there, Spark, Presto, and BigQuery all work on raw data; Druid, Pinot, and Elasticsearch are somewhere in between, where you use them on the pre-joined or pre-aggregated data; and Kylin, or any key-value store where you put the pre-cubed data, is the third approach. And as you look across this spectrum, there are two competing concerns. On one side you get really good flexibility: if you work on the raw data you get full SQL support, you can do joins, you get a lot of SQL functionality, but the latency is not very good; it can take seconds or minutes depending on the data size. On the other side you get very good speed, but no flexibility: if you have pre-cubed your data, you can't really slice and dice easily or do arbitrary roll-ups.

The next point is how these systems store the data. As you know, there are two ways: row storage or columnar format. I won't go too deep into details here. Typically in a row store you keep everything in a particular row together, whereas in a columnar store you keep all the values of a column together, which gives very good compression and is very good for analytics. How do they scale? It's very simple: you take the large data set, divide it into chunks, and each chunk is represented as a segment. You distribute those segments over multiple nodes. This is the classic way of scaling data, whether for OLTP or for analytical use cases.

What happens within a segment? This is where things start to differ between a columnar store and a row store. You take the data and store every column independently. The reason we split it into columns is that you get very good compression, because within a column a lot of values repeat. Instead of storing the value as-is, you create a dictionary ID for it: you sort the distinct values, assign an ID to each, and when you store the column you don't store the actual values, you store the dictionary IDs.
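As a rough illustration of the dictionary encoding just described, here is a small sketch for a single string column; real columnar stores also bit-pack the IDs and handle multi-valued columns, which this skips.

```python
# Dictionary encoding, sketched: repeated values in a column are replaced by
# small integer IDs from a sorted dictionary, so the forward index stores IDs
# (a few bits each) instead of the full strings.
def dictionary_encode(column_values):
    dictionary = sorted(set(column_values))             # sorted distinct values
    value_to_id = {v: i for i, v in enumerate(dictionary)}
    forward_index = [value_to_id[v] for v in column_values]
    return dictionary, forward_index

genres = ["pop", "rock", "pop", "pop", "jazz", "rock"]
dictionary, fwd = dictionary_encode(genres)
print(dictionary)  # ['jazz', 'pop', 'rock']
print(fwd)         # [1, 2, 1, 1, 0, 2]  -> each ID fits in 2 bits here
```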
So now you can use a few bits instead of the entire value, and you get a lot of compression: typically somewhere between 3x and 10x, or even more in some cases, going from the raw data size to the compressed data size. The other thing we have here is the inverted index. This is just a posting list, the same as in OLTP databases as well.

So how does a query get executed on a segment like this, which has a forward index and an inverted index? The first thing you look at is the WHERE clause: how do I filter? Say the query is WHERE genre = 'pop'. You look that value up in the inverted index to get all the matching row IDs. Then you do the projection: you look at which columns are needed, in this case the artist column, and the next phase spits out the artists for those rows. The last phase does the group-by aggregation, producing the count for every artist value, and that's the result. This is pretty much how query execution works within a particular segment; there's a small sketch of it just below, after the ingestion part.

Now, how does it work when the data is distributed? It's the same concept, expanded. You have the same segments, distributed across multiple nodes. In Pinot's case we use Apache Helix, which is a cluster management framework, to control the coordination between the nodes: it figures out when a node goes down or comes up, and all that metadata is stored there. When a query comes in, the broker looks at Helix to figure out where the segments are located and where it should send the query, fetches the routing table, and does a scatter-gather: it sends the query to all the relevant nodes, gets the response from each of them, merges the results, and sends the response back. A typical scatter-gather, pretty similar to what other search systems do.

In terms of ingestion, there are two ways you can get data into Pinot. One is real time, which is Kafka-based: all the events come in through Kafka. For example, if you viewed someone's profile, an event is emitted to Kafka, Pinot ingests it immediately, and within seconds you can get analytics on top of it. The other approach is batch, where we create the segments offline and push them into Pinot.
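Going back to the per-segment execution flow described a moment ago, here is a small, hedged sketch of it for a query like SELECT artist, COUNT(*) WHERE genre = 'pop' GROUP BY artist; the data and structures are toy stand-ins, not Pinot's actual internals.

```python
# Per-segment execution, sketched: filter via the inverted index, project only
# the needed column for the matching rows, then do the group-by aggregation.
from collections import Counter, defaultdict

# Forward indexes (dictionary-encoded in a real segment; plain values here).
genre_col  = ["pop", "rock", "pop", "pop", "jazz"]
artist_col = ["a1",  "a2",   "a1",  "a3",  "a2"]

# Inverted index: value -> posting list of matching row IDs.
genre_inverted = defaultdict(list)
for row_id, g in enumerate(genre_col):
    genre_inverted[g].append(row_id)

matching_rows = genre_inverted["pop"]               # 1) filter: [0, 2, 3]
projected = [artist_col[r] for r in matching_rows]  # 2) projection
print(Counter(projected))                           # 3) group by: Counter({'a1': 2, 'a3': 1})
```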
Now, a benchmark: we took Druid and Pinot. How many of you know Druid? Pretty much everyone, okay, awesome, so I don't have to go into detail. Druid is another system that is very similar to Pinot in terms of architecture, but there are a lot of differences in the execution layer and in the rest of the feature set, which we'll get into in the remaining sections. We took one data set, exactly the page views that happen on profiles at LinkedIn, and compared Druid and Pinot. As you can see from the latency frequency distribution, a lot of queries are served much faster by Pinot than by Druid. In dashboard use cases it doesn't really matter, because the numbers are something like 284 milliseconds versus 136: for a human to perceive, it's not that critical. Both systems do pretty well, but Pinot is much more efficient. Where that efficiency plays an important role is when you have a big cluster, a lot of data, and a lot of people querying it; then Pinot is much more cost-effective than Druid, even though for an individual query you don't really perceive the difference in latency.

So at some point in 2014 or 2015 we were like, hey, this is amazing, we're all done, and we were happily drinking. But some people don't like happy people. Someone came up with an idea and said: what if we take this analytics that you're providing to analysts and show it to all the users of LinkedIn? So it's not just for internal users anymore; it's shown to all 500 million members at LinkedIn. That's a huge, huge change. It's the same product, the same data set, the same visualization, but instead of thousands of people within the company, which gives you roughly one or two QPS, maybe up to 10 QPS, it's now the entire public, and you might get somewhere around 5,000 QPS on one system. We said: what can go wrong? Let's just try it and see what happens.

This is what the latency curve looked like when we took the same system and just pushed more QPS at it. This is the typical problem with all the systems so far: they don't scale well with QPS. They're very good at one or two QPS, but once you start pumping in more queries, they fall apart. We estimated that launching this application in its current state would take hundreds of nodes. So what do we do? Very simple: identify the problem and fix it. We thought a lot, and after a lot of thinking and brainstorming we identified the problem. We were drinking a lot, and someone came up with an idea and said, hey, what do we change? So we said, okay, we'll fix it. And how do we fix it? We drink a lot more. For people who don't know about the Ballmer Peak: there is a point you need to reach where you can code really well, and we really needed to achieve that point. Anyway, jokes apart, back to the real stuff.

Let's look at what the OLTP databases have already done: they can scale to millions of QPS, and there's nothing fancy about it. How do they do it? The number of seeks needed to serve an OLTP query, which is a transactional key lookup, is much smaller, because with a B-tree you can get directly to the place where the row is stored. It takes log(n) comparisons, but with optimizations and auxiliary indexes you can pretty much reduce it to one seek. So you're scaling at the speed of the disk, and with SSDs you can go up to about one GB per second; that's really the bottleneck you have. The other advantage with OLTP is that since it's a key lookup, you can scale horizontally: you hash on the key and go to exactly one node, instead of sending the query to all the nodes. With OLAP, that's not the case; the two are contrasted in the small sketch below.
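To illustrate that routing difference, here is a toy sketch under the assumption of simple hash-based placement: the OLTP key lookup lands on exactly one node, while the OLAP query has to fan out to every node and merge the partial results.

```python
# Routing contrast, sketched (node names and hashing scheme are made up).
NODES = ["node-0", "node-1", "node-2", "node-3"]

def oltp_route(primary_key: str) -> str:
    # Key lookup: hash the key, go to one node; inside it, roughly one seek.
    return NODES[hash(primary_key) % len(NODES)]

def olap_scatter_gather(run_on_node) -> list:
    # Analytical query: fan out to every node, then merge the partial results.
    return [run_on_node(n) for n in NODES]

print(oltp_route("member-42"))                             # exactly one node
print(olap_scatter_gather(lambda n: f"partial from {n}"))  # all nodes touched
```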
This slide is a bit overloaded, but try to think through it. What happens in the filter? You first have to load the entire inverted index, which can be either very big or very small. In this case you have a platform column with only four or so values, like desktop and mobile, and those posting lists are very long, so you have to load a pretty big inverted index. For the viewee ID, the column identifying whose profile was looked at, not many people view any one profile, so that posting list is pretty small. Once you have that, the second part is the aggregation and group-by. Here the viewee ID values are spread all over the place, so even to read one value you're moving around a lot on disk; you have to do a lot of seeks at different places. So compared to OLTP, instead of one seek you have lots and lots of random seeks. And last but not least, you can't go to one particular node: you have to do a scatter-gather and spread the load across all the nodes.

We took this and asked: how do we eliminate these seeks, how do we minimize the number of seeks, and how do we get the QPS that we want?

The first optimization: since the scans were going all over the place, can we sort the data and bring all the relevant rows into one place? This is a very simple optimization, but it worked very well. We sorted on the viewee ID, which brings all the rows for one profile together. With that, we no longer need the inverted index for that column, so we can eliminate it, and the forward index becomes even better, because now we only have to store a start and end offset per value; we no longer have to store every individual value and then compress it. This gives very good compression (there's a small sketch of this at the end of this part).

The second optimization: how do we eliminate the second inverted index, the one on platform? Since the viewee ID filter can be applied first, and not many people have looked at any one profile, we don't have to scan the entire inverted index for desktop; we don't even have to load it. With one seek we get all of that profile's rows and can filter on platform on the fly. So we eliminated the inverted indexes completely; we now have one sorted index, and that's pretty much it.

The third optimization is something like the plan optimization other databases do, but slightly different: we don't do logical plan optimization, we look at the physical aspects of the segment itself, and on a per-query, per-segment basis we figure out the best plan to execute. Without getting into the details, it can reorder predicate evaluation and figure out what needs to run differently for every query; it just looks at the segment metadata and comes up with the best plan. It's not a fixed query plan; that's the key takeaway.
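Here is a minimal sketch of that first optimization, assuming a toy segment already sorted on the viewee ID: a binary search stands in for the single seek that finds the contiguous range for one profile, and the platform predicate is then applied on the fly over just that range, with no inverted index at all.

```python
# Sorted-column optimization, sketched: the forward index for the sorted column
# only needs a start/end range per value, and the second filter is evaluated
# on the fly over that small range.
import bisect

viewee_col   = ["u1", "u1", "u1", "u2", "u2", "u3"]   # segment sorted on viewee ID
platform_col = ["desktop", "mobile", "desktop", "desktop", "desktop", "mobile"]

def sorted_range(sorted_col, value):
    # One "seek": binary search gives the contiguous [start, end) block.
    start = bisect.bisect_left(sorted_col, value)
    end   = bisect.bisect_right(sorted_col, value)
    return start, end

start, end = sorted_range(viewee_col, "u1")            # (0, 3)
count = sum(1 for i in range(start, end) if platform_col[i] == "desktop")
print(count)   # 2 desktop views of u1's profile
```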
So this is what we got compared to where we were. If you remember the first curve, it was basically exponential. With just the inverted index and without the sorted index, we were doing better, but not as good as we wanted. With the sorted index it's actually so much better, and the last curve is without any inverted index at all: a simple scan, but we're very efficient at seeking, and we only read what's needed.

And we didn't stop there. We said, okay, let's learn one more thing from OLTP, which is partitioning versus scatter-gather. Since the scatter-gather goes across all the nodes, we were also having issues with 95th and 99th percentile latency, because this is site-facing, so we're not just looking at the 50th percentile anymore. No member should have a bad experience, so we're looking at the 99th percentile, and even the 99th percentile of 500 million members is millions of members. What we did here is say: this query comes in for a particular person's viewee ID, so what if we shard on that? What if we come up with a different, much more optimized placement strategy? Instead of spreading the data all over the cluster, we created a concept called a replica group: we divide the segments into groups, and we also have the concept of partitioning. With this we got even more improvement: without this routing optimization we were at one level, and now we were able to push it even further. These are all the optimizations we did, and in the end, what we thought would take hundreds of nodes we got down to roughly 15 to 30 nodes for the entire 5,000 QPS.

Comparison with Druid: Druid is roughly where we were when we started this exercise. We tried the same data set with Druid and its latency basically spikes exponentially. It's good at low QPS, where the latencies are comparable, but none of the optimizations we have done for addressing site-facing applications have gone into Druid.

Next. How are we doing on time? Okay. So this is the next idea we got, and this time I'm to blame, because I said: hey, we have all this data, so what next, what can we do? We said, why can't we solve anomaly detection on top of this data? It's the same data set. So we built an application called Astati, which asks: can I monitor this data and tell what's happening with all the views coming to LinkedIn? These page views are a key business metric at LinkedIn, monitored pretty much every hour; it's among the top five metrics. We want to monitor it and ask: is there any problem? And not just at the top level; can we also do multidimensional anomaly detection? So you can ask: are the page views from India on Android actually having a problem? That's the level at which we can slice and dice, and we do all of this in parallel.

So what's the problem here? Looking at the previous use case, someone could say: hey, that was easy. You took the viewee ID, there was effectively a primary key, you sorted on it, and you were able to access the data very quickly. But in this case there is no such thing, because the query can filter on country = 'US' or whatever combination is possible. We don't have a particular primary key to sort the data on, so the latency is completely unpredictable: for something like the US you have to scan around 50% of your data.
And that's going to have a lot of latency, whereas for, say, some small country in Africa, you can scan it pretty quickly. So the latency is very variable, from very small to very large.

Going back to the initial solutions I talked about, there are two types. One is what we've been talking about, Pinot and Druid, which is completely scan-based; here the latency can vary from very quick to really bad. The other is the pre-compute solution, which is super fast, but the storage cost is very high, because you're pre-computing all possible combinations. That's how the world exists today: you have this solution, or something at the other extreme.

What we built is something called a star-tree, which can sit anywhere in between, based on your requirements. It lets us trade off between space and time. If you want really fast responses, you set the configuration for that, and it's completely data-driven: it figures out what needs to be done just by looking at the data. The only thing you tell it is, hey, no query should scan more than X records.

This is a little bit complex, so feel free to catch me afterwards, but I'll go over at a very high level how this index is created. You take the raw data, and the first thing we do is a multidimensional sort. The way we sort the data is with some heuristics: we look at the cardinality of every column and sort on that. Once it's sorted, we create the tree index on the side. We create a root node and start branching out: we look at the cardinality and say, hey, country has the highest cardinality, so we branch out on country first. Each of these nodes just maintains pointers into the data; at this point we haven't created anything extra. Then we create a star node. What the star node does is remove the country dimension and further aggregate the data. If you look at that aggregated data, we now have Chrome, IE, and Safari pre-computed. And we just apply this recursively; that's pretty much it. We do it at the browser level next, and we keep doing it until a node hits the split threshold.

This split threshold is very, very important. What it allows us to do is stop at a particular point, so the index is no longer exponential, which is exactly the problem with the full pre-cube approach, where the split threshold is effectively one and you pre-compute every possible combination. That's the trade-off we make. If you say the split threshold is 100K, you can guarantee that any query of the form dim1 = v1 AND dim2 = v2 will never scan more than that many records. That's the whole concept behind this. So with a split threshold of 100K, it pre-computes exactly the things that are needed so that your query never scans more than 100K records. If you set the split threshold to one, you basically end up with the pre-compute solution, which computes every possible combination; if the split threshold is infinity, it's basically what we have today: the data is kept as-is and nothing is pre-computed. A simplified build sketch follows below.
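As a rough illustration of that construction, here is a simplified star-tree build; it splits dimensions in a fixed order rather than by cardinality, uses a tiny split threshold so the example stays small, and is only a sketch of the idea, not Pinot's implementation.

```python
# Simplified star-tree build: split a node on the next dimension only while it
# holds more records than the split threshold, and add a "*" child per split
# that removes that dimension and pre-aggregates the metric.
from collections import defaultdict

SPLIT_THRESHOLD = 2   # a node with <= this many records is cheap to scan as-is

def build(records, dims):
    node = {"records": records, "children": {}}
    if not dims or len(records) <= SPLIT_THRESHOLD:
        return node                          # small enough: just scan it
    dim, rest = dims[0], dims[1:]
    groups = defaultdict(list)
    for r in records:
        groups[r[dim]].append(r)
    for value, group in groups.items():      # one child per dimension value
        node["children"][value] = build(group, rest)
    # Star node: drop this dimension and aggregate the metric across its values.
    rolled = defaultdict(int)
    for r in records:
        rolled[tuple(r[d] for d in rest)] += r["views"]
    star_records = [dict(zip(rest, key), views=v) for key, v in rolled.items()]
    node["children"]["*"] = build(star_records, rest)
    return node

data = [
    {"country": "us", "browser": "chrome", "views": 5},
    {"country": "us", "browser": "safari", "views": 3},
    {"country": "ca", "browser": "chrome", "views": 2},
    {"country": "in", "browser": "chrome", "views": 4},
]
tree = build(data, ["country", "browser"])
# The "*" child answers "views by browser, for any country" without touching raw rows.
print(tree["children"]["*"]["records"])
# [{'browser': 'chrome', 'views': 11}, {'browser': 'safari', 'views': 3}]
```

At query time, a filter on a specific country follows that country's child, while a query that doesn't filter on country follows the star child and reads the pre-aggregated records, which is exactly the space-versus-scan trade-off the split threshold controls.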
Let's look at query execution on this. The bottom section states the overall contract: never scan more than the split threshold. Say the query is a sum with a filter country = 'al': it just goes to that node, and in this case it isn't even using a pre-computed aggregate, because 'al' doesn't have a lot of views, so it can just scan the records and return the result. If the filter is on Canada, it takes the 'ca' path, goes to the star node where the answer is pre-computed, and just returns it; or, if there are leaf records it points to, it scans them and returns the result very quickly. The other two queries work similarly, and if you just want the overall sum, it goes to star-star, where all of that is ready. This lets us tune the configuration according to the use case: we go anywhere from a split threshold of 100K or 200K down to sometimes much less, like a thousand, depending on the application. So you get that knob, which allows you to trade off between space and time.

This is what the benchmark looked like with the star-tree. You can see the latency stays stable up to some QPS and then starts to rise, but you get a lot more throughput on the same system. Typically we see something around two times the data size with the star-tree, so it roughly doubles, but that's an acceptable cost to pay; it's not exponential, you don't pay 10 or 20 times the storage, and you get much better throughput. This allows us to do the anomaly detection, which has bursty, periodic workloads: every five minutes or every half hour we send a bunch of queries to the system, and it still holds up. It's the same system the dashboards run on; now we can do anomaly detection on it without paying any additional cost. It saves on operations: we don't need additional nodes, we don't have another system to maintain, and it scales as we add more nodes.

Quickly, on usage: pretty much any analytics application you look at on LinkedIn uses this, things like Who Viewed My Profile, campaign analytics, and content or talent insights, which let you look at who moved from which company to which company and what the transitions look like. We ingest 120 billion events per day in real time, and we now serve around 70,000 queries per second, which is almost an OLTP-style workload; I mean, OLTP can go into the millions, but we're getting there in terms of QPS. We also have very strict latency requirements: we have to satisfy the 95th and 99th percentile latencies here, so most of them have to be in milliseconds. Based on the use case, we figure out whether a star-tree can help or whether the sorted index and the other optimizations are what we need.

Outside LinkedIn, who is using it? Uber is a very big user of Pinot; they have lots of applications, and they're also using it for site-facing products like Uber Eats, where all the dashboards the client sees are powered by Pinot. And Microsoft is using it for Teams.
Slack is using it, and then this is the best one: they moved from Druid to Pinot and saved a tremendous amount of hardware, and they say they're still over-provisioned. So that's pretty good for Pinot's usage outside LinkedIn.

Just to end here: I think we took on a very challenging problem. We said, hey, can there be a system built on columnar storage that you take all the way to serving the site? No one had done that so far; most site-facing applications have been using key-value stores or the OLTP side. We wanted to avoid the overhead where you start an application at low QPS, and then as it becomes popular the QPS increases and you have to change your system; that migration is really, really expensive, and we wanted to avoid it. So we built this system, and we were able to use it successfully across all the varying use cases: dashboarding, site-facing serving, and anomaly detection. Finally, we're back to drinking, so we'll see you at the next thing. The other thing is that we are going to Apache very soon; we're in the process. And here is where you can find the code. It's completely open source, and we run the same open-source code inside LinkedIn as well; there is absolutely no fork. And last but not least, here is the team, without whom we couldn't have done this. Thank you. (clapping)