Transcript for: Analysis of LinkedIn profile view events
Hello everyone. Good afternoon. Most of you have LinkedIn profiles, and you have almost certainly visited someone else's LinkedIn profile. So today I'm going to talk about all the cool things we can do with just this one event where you are looking at someone else's profile. So what happens in the background? Once you look at someone's profile, an event is emitted, which is very simple: it says who viewed whom at what time. This is a very, very simple event, but what are all the things you can do with it? The first thing is, this goes into a database. Let's say you have two tables. In one, you are logging all these events, saying someone looked at someone's profile at this particular time. And then you have another table with the profile of each person: where is he coming from, what is his industry, what are his skill sets?
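As a rough sketch (these names are illustrative, not the actual LinkedIn schema), the event and the two tables could look like this:

    # Hypothetical shapes of the profile-view event and the member profile record.
    from dataclasses import dataclass
    from datetime import datetime

    @dataclass
    class ProfileViewEvent:          # appended to the event table on every view
        viewer_id: int               # who viewed
        viewee_id: int               # whose profile was viewed
        viewed_at: datetime          # at what time
        platform: str                # e.g. "desktop" or "mobile"

    @dataclass
    class MemberProfile:             # one row per member in the profile table
        member_id: int
        country: str
        industry: str
        skills: list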
So now, for the analyst, this is a really good thing. They want to know: hey, what's happening with all the pages at LinkedIn? How many people are looking at these pages? What is the breakdown by country? Can I see what's happening on desktop specifically? Can I know what's happening across various dimensions? This is a very typical analytical use case, and in pretty much any company you can relate to it: you have some user profile and you want to know what is happening with that user and how many people are looking at that profile. There are multiple ways to solve this, and I'll just go over the different approaches, how we solve it at LinkedIn, and what the other ways of solving it are.
Option one is very simple. This is pretty much how most analytics systems started, when the data was very small. You basically have two tables: the profile view event table, which logs all the events, and the profile data table. So all you need to do is write a query which looks something like this: you do the join on the fly. Nothing fancy here. This is typically how all analytic applications started, and it works very well. You're getting real-time ingestion into these databases. The only problem is that it's doing the join on the fly. The profile table may be the smaller one, but you're still joining two big tables, and that can have really high latency.
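As a rough sketch of that on-the-fly join (the table and column names here are just for illustration):

    # Hypothetical join-at-query-time SQL, embedded as a Python string.
    ON_THE_FLY_QUERY = """
        SELECT p.country, COUNT(*) AS num_views
        FROM profile_view_event e
        JOIN member_profile p ON e.viewer_id = p.member_id
        WHERE e.viewee_id = :viewee_id
        GROUP BY p.country
    """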
The other one is the complete extreme opposite: you do the pre-join, the pre-aggregation, and the pre-cubing all up front. You take the same two tables, but you are no longer querying the base tables. Instead, you join them upfront, aggregate, and then compute all the cubes that you want. So here, if you look at it, you just query the final table and everything is ready. You are looking at one row or a few rows, and you get really, really good latency. The problem, on the flip side, is that it's batch. You cannot keep computing these pre-cubes on the fly, so there is a lot of latency before the data is available. The storage is also very expensive, because you're pre-computing a lot of dimensions upfront: the more dimensions you have, the more drastically the storage cost increases. And it's not flexible: every time you change the schema or add a new dimension, you need to recompute your entire data. So operationally as well, this is not really feasible.
The third option, which you could call a hybrid approach, is that you have the stream and you have the other database, and you do the join on the stream, on the fly. Then you do some kind of aggregation, say five minutes or ten minutes, or whatever you want, depending on how fast you want the aggregations or the insights to be available, and you end up with pre-joined, de-normalized data where you have everything in one place. The advantage is that now we are not doing a join on the fly, so your latencies can be much more predictable. You can have any kind of analyst query or any kind of interactive dashboard served in milliseconds, and you don't have to wait long.
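A minimal sketch of that stream-side join and five-minute rollup (the names and windowing logic are illustrative, not LinkedIn's actual pipeline):

    # Hypothetical 5-minute pre-aggregation of the joined stream.
    from collections import Counter

    def rollup(events, profiles, window_minutes=5):
        # events: iterable of (viewer_id, viewee_id, viewed_at); profiles: dict member_id -> country
        counts = Counter()
        for viewer_id, viewee_id, viewed_at in events:
            country = profiles.get(viewer_id, "unknown")      # join on the stream
            bucket = viewed_at.replace(minute=(viewed_at.minute // window_minutes) * window_minutes,
                                       second=0, microsecond=0)
            counts[(bucket, viewee_id, country)] += 1          # de-normalized, pre-aggregated row
        return counts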
So let's look at all of these in one slide. As you see, these are all the places where you can do the analytics: you can work on the raw data, on the pre-joined or pre-aggregated data, or you can go all the way to pre-cubed data. If you look at the systems that are out there, they all provide analytics at one of these points. Spark, Presto, and BigQuery work on the raw data. Druid, Pinot, and Elasticsearch are somewhere in between; you can use them on the pre-joined or pre-aggregated data. And Kylin, or any other key-value store where you put the pre-cubed data, is another way of solving this. And as you look at this, there are two contradicting things. With one set of systems you get really good flexibility: if you work on the raw data, you get very good, full SQL support, you can do joins, and you have a lot of SQL functionality, but as you see on the latency side, they're not very good; queries can take seconds or minutes, depending on the data size. On the other side, you have very good speed but no flexibility: once you have pre-cubed your data, you can't really do slicing and dicing easily, and you cannot do arbitrary roll-ups.
So the next point is: how do these systems store the data? As you all know, there are two ways to do this: you either put it in row storage or in columnar format. I won't get too much into the details here. Typically, in a row store you keep everything in a particular row together, whereas in a columnar store you keep all the values of a column together, which gives very good compression and is very good for analytics. So how do they scale? It's very simple: you take the large data set, divide it into chunks, and each chunk is represented as a segment. You take the segments and distribute them over multiple nodes. This is the classic way of scaling data, and it's typically what's done for both OLTP and OLAP use cases. What happens within a segment? This is where things start getting different between a columnar store and a row store. If you look at it, you have the data, and every column is stored independently. The reason we split it into columns is that you get very good compression, because within a column a lot of values get repeated. So instead of storing the value as it is, you can create a dictionary ID for it: you sort the distinct values and assign an ID to each. When you store the column, you don't store the actual values, you store the dictionary IDs. Now you can use a few bits instead of the entire value, and you get a lot of compression, typically somewhere between 3x and 10x, or even more in some cases, going from the raw data size to the compressed size.
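A minimal sketch of that dictionary encoding (illustrative only):

    # Hypothetical dictionary encoding of one column.
    def dictionary_encode(column_values):
        dictionary = sorted(set(column_values))               # sorted distinct values
        value_to_id = {v: i for i, v in enumerate(dictionary)}
        encoded = [value_to_id[v] for v in column_values]     # forward index stores small IDs
        bits_per_value = max(1, (len(dictionary) - 1).bit_length())
        return dictionary, encoded, bits_per_value

    dictionary, encoded, bits = dictionary_encode(["pop", "rock", "pop", "jazz", "pop"])
    # dictionary = ['jazz', 'pop', 'rock'], encoded = [1, 2, 1, 0, 1], bits = 2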
The other thing we have here is the inverted index. This is just like any other posting list; it's the same as in OLTP databases. So how does a query get executed on a segment like this, which has a forward index and an inverted index? The first thing you look at is the WHERE clause: how do I filter? If you take a look at this query, it says WHERE genre = 'pop', so you need to filter on that. You get the row IDs that match by looking them up in the inverted index. Then you do the projection: what are all the columns needed? You look at the artist column, and the next phase spits out the artists and the count for every value. The last step is the group-by aggregation, and that's basically the result.
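Here is a tiny sketch of those three phases, filter via the inverted index, then project, then aggregate, on one segment (purely illustrative, not Pinot's actual execution code):

    # Hypothetical per-segment execution of:
    #   SELECT artist, COUNT(*) FROM songs WHERE genre = 'pop' GROUP BY artist
    from collections import Counter

    segment = {
        "genre":  ["pop", "rock", "pop", "pop"],      # forward indexes, one list per column
        "artist": ["A", "B", "A", "C"],
    }
    inverted_index = {"pop": [0, 2, 3], "rock": [1]}  # genre value -> matching row IDs

    matching_rows = inverted_index["pop"]                        # 1) filter: WHERE genre = 'pop'
    projected = [segment["artist"][r] for r in matching_rows]    # 2) projection
    result = Counter(projected)                                  # 3) group-by aggregation
    # Counter({'A': 2, 'C': 1})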
That is pretty much how query execution works within a particular segment. So how does it work when the segments are distributed? It's the same concept, expanded. You have the same segments, but they are distributed across multiple nodes. In the case of Pinot, we use Apache Helix, which is a cluster management framework, to control all the coordination between the different nodes: it figures out when a node goes down or comes up, and all of that metadata is stored there. So when a query comes in, the broker looks at Helix to figure out where it should send the queries, that is, where all the segments are located. It fetches the routing table and then does a scatter-gather: it sends the query to all the relevant nodes, gets the response from each of them, merges the responses, and sends the result back. It's a typical scatter-gather, pretty much similar to what other search systems do.
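A minimal sketch of that broker-side scatter-gather (the routing table shape and merge step are simplifying assumptions, not the real Helix or Pinot APIs):

    # Hypothetical broker: scatter the query to servers, gather and merge partial counts.
    from collections import Counter

    def scatter_gather(query, routing_table, execute_on_server):
        # routing_table: dict server -> list of segment names; execute_on_server returns a Counter
        partials = [execute_on_server(server, segments, query)
                    for server, segments in routing_table.items()]   # scatter
        merged = Counter()
        for partial in partials:                                     # gather and merge
            merged.update(partial)
        return merged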
In terms of ingestion, there are two ways you can ingest data into Pinot. One is real time, which is Kafka based: all the events come in through Kafka. For example, if you viewed someone's profile, an event is emitted to Kafka, Pinot ingests it immediately, and within seconds you can get analytics on top of it. The other approach is batch, where we create the segments offline and then push them into Pinot.
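As an illustration of the real-time path (the topic name and serialization are made up, and this shows only the producer side with the kafka-python client, not anything Pinot-specific):

    # Hypothetical producer: the application emits a profile-view event to Kafka,
    # and a Pinot real-time table consumes the same topic.
    import json
    from kafka import KafkaProducer

    producer = KafkaProducer(
        bootstrap_servers="localhost:9092",
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )
    producer.send("profile_view_events", {"viewer_id": 123, "viewee_id": 456,
                                          "viewed_at": "2019-06-01T12:00:00Z",
                                          "platform": "desktop"})
    producer.flush()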
So, a benchmark: we took Druid and Pinot. How many of you know Druid? Pretty much everyone, okay, awesome, so I don't have to go into detail. Druid is another system that is very similar to Pinot in terms of architecture, but there are a lot of differences in the execution layer and in the feature sets, which we will get into in the remaining sections. We took one data set, exactly the page views that happen on profiles at LinkedIn, and compared Druid and Pinot. As you see here in the latency distribution of the queries, a lot of queries are served much faster by Pinot than by Druid. Typically this doesn't really matter in dashboard use cases, because if you look at the numbers, it's something like 284 milliseconds versus 136, and for a human that difference is not really perceptible. So both systems do pretty well, but Pinot is much more efficient. Where this plays an important role is when you have a big cluster, a lot of data, and a lot of people querying it: then Pinot is much more cost effective than Druid, even though for an individual query you don't really perceive
the difference in latency. At some point in 2014 or 2015 we were like, hey, this is amazing, we are all done, and we were happily drinking. But some people don't like happy people. Someone came up with an idea and said: hey, what if we take this analytics that you're providing to analysts and show it to all the users of LinkedIn? So it's no longer just for internal users; it's shown to all 500 million members at LinkedIn. It's a huge, huge change. It's the same product, the same data set, the same visualization, but now it's not just for thousands of people within the company, which gives you roughly one or two QPS, up to maybe 10 QPS. Now it's for the entire public, and you might get somewhere around 5,000 QPS on one system. So we said, what can go wrong? Let's just try it and see what happens. This is basically what the latency curve looked like: we took the same system and just pumped more QPS at it. This is the typical problem with all the systems we had so far: they don't scale with QPS very well. They are very good at one or two QPS, but once you start pumping in more data and more QPS, they just fall apart. So we estimated that this was going to take hundreds of nodes to launch this application in its current state. So what are we going to do? Very simple: identify the
problem and then fix it. We thought a lot, and after a lot of thinking and brainstorming, we identified the problem. We were drinking a lot. And someone came up with this idea and said, hey, what do we change? So we said, okay, we'll fix it. And how do we fix it? We drink a lot more. For people who don't know about the Ballmer Peak: there is a point you need to reach where you can code really well, and we really needed to hit that point. Anyway, jokes apart, let's come back to the real stuff. Let's look at how the OLTP databases have
already solved this, right? They can scale to millions of QPS, and there is nothing fancy about it. How do they do it? The reason is that the number of seeks needed to serve an OLTP query, which is a transactional key lookup, is much smaller: because you have a B-tree, you can get directly to the place where the row is stored. It takes log(n) comparisons, but with optimizations and auxiliary indexes you can pretty much reduce it to one seek. So you're scaling at the speed of the storage, and with SSDs you can go up to about one GB per second; that's really the only bottleneck you have. The other advantage with OLTP is that since it's a key lookup, you can scale horizontally: if you hash on the key, you can go to exactly one node. You're not sending the
query to all the nodes. With OLAP, that's not the case. This slide is a bit overloaded, but try to think about it. What happens in the filter? You first have to load the entire inverted index, and that can be either very big or very small. In this case, the platform column has only four values, desktop, mobile, and so on, so its posting lists are very long and you need to load a pretty big inverted index. Whereas for VieweeId, which reflects how many people looked at a given profile, the list is small, so it's pretty cheap. Once you have that, the second part is: how do you do the aggregation and group-by? If you look at the VieweeId column, the matching values are spread all over the place, so even to read one value you are jumping around a lot on disk, doing a lot of seeks in different places. So compared to the single seek in OLTP, you have lots and lots of random seeks that need to happen. And last but not least, you can't go to one particular node; you have to do a scatter-gather and spread your
load across all the nodes. So we took this and said, okay, how do we eliminate these seeks? How do we minimize the number of seeks, and how do we get the QPS that we want? The first optimization: since the scans were going all over the place, we asked, can we sort this data and bring all the relevant data into one place? This is a very simple optimization, but it worked very well. We just ordered the data on VieweeId and got all of its rows in one place. And then we said, hey, with this optimization we don't need the inverted index for this column anymore; we can eliminate it. And the forward index becomes even better, because now we only have to store the start point and the end point of each value's run instead of storing every individual value. This gives very good compression.
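A small illustration of why sorting on VieweeId replaces the inverted index (the run-lookup code is my own simplification):

    # Hypothetical sorted-column lookup: after sorting on viewee_id, each value occupies one
    # contiguous run, so a filter needs only (start, end) instead of a posting list.
    import bisect

    sorted_viewee_ids = [7, 7, 7, 12, 12, 31, 31, 31, 31]     # forward index, already sorted

    def run_for(viewee_id):
        start = bisect.bisect_left(sorted_viewee_ids, viewee_id)
        end = bisect.bisect_right(sorted_viewee_ids, viewee_id)
        return start, end                                      # one seek, then a contiguous scan

    print(run_for(12))   # (3, 5): rows 3 and 4 hold all views of profile 12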
The second optimization: how do we eliminate the second inverted index, the one on the platform column (desktop)? If you look at it, since the VieweeId filter can be applied first, we can say: if its run is very small, if not many people have looked at this profile, we don't have to scan the entire inverted index for desktop. We don't even have to load it. We can do that filtering on the fly: with one seek we get all the matching rows, and then we filter them for desktop as we go. So we eliminated the inverted indexes completely. We now have one sorted index
and that's pretty much it. The third optimization was to think about how other databases do plan optimization. Here it is slightly different: we don't do logical plan optimization, but we look at the physical characteristics of the segment itself. On a per-query, per-segment basis, we can figure out the best plan to execute. Without getting into the details, it can reorder the predicate evaluation and figure out what needs to be run differently for every query; it just looks at the segment metadata and comes up with the best plan. So it's not a fixed query plan, and that's the key takeaway.
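A toy version of that per-segment physical planning (the heuristics below are my own stand-ins, not Pinot's actual rules):

    # Hypothetical per-query, per-segment plan choice based on segment metadata.
    def choose_plan(segment_metadata, predicate_columns):
        # segment_metadata: dict column -> {"sorted": bool, "cardinality": int}
        ordered = sorted(predicate_columns,
                         key=lambda col: (not segment_metadata[col]["sorted"],
                                          segment_metadata[col]["cardinality"]))
        steps = []
        for col in ordered:
            if segment_metadata[col]["sorted"]:
                steps.append(f"binary-search sorted column {col}")
            else:
                steps.append(f"scan-and-filter {col} on the fly")
        return steps

    meta = {"viewee_id": {"sorted": True, "cardinality": 500_000_000},
            "platform":  {"sorted": False, "cardinality": 4}}
    print(choose_plan(meta, ["platform", "viewee_id"]))
    # ['binary-search sorted column viewee_id', 'scan-and-filter platform on the fly']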
So this is what we got compared to where we started. If you remember, with the first setup the latency basically grew exponentially. With just the inverted index and without the sorted index, you can see we are doing better, but it's not as good as what we wanted. With the sorted index it's so much better compared to where we were. And the last one is without any inverted index: it's a simple scan,
but the seeks are very efficient and we read only what's needed. And we didn't stop there. We said, okay, let's learn one more thing from OLTP, which is partitioning instead of scatter-gather. Since the scatter-gather goes across all of the nodes, we were also having issues with 95th and 99th percentile latency, because this is site facing: we are not just looking at the 50th percentile. No member should have a bad experience, so we look at the 99th percentile, and even the tail of 500 million members is millions of members. So what we did here is say: hey, this query is coming for the VieweeId of a particular person, so what if we shard on it? What if we come up with a different, much more optimized placement strategy? Instead of spreading the data all over the cluster, we created a concept called replica groups, where we divide the segments into groups, and we also added partitioning. With this we got even more improvement: without this routing optimization we were at one level, and now we were able to push it even further.
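A rough sketch of partition-aware routing with replica groups (the hashing and group layout are illustrative assumptions):

    # Hypothetical routing: hash the viewee_id to a partition, then pick one replica group,
    # so a query touches a single server instead of the whole cluster.
    def route(viewee_id, num_partitions, replica_groups):
        # replica_groups: list of {partition_id: server} dicts, one dict per replica group
        partition = hash(viewee_id) % num_partitions
        group = replica_groups[viewee_id % len(replica_groups)]   # spread load across groups
        return group[partition]                                   # the one server to query

    groups = [{0: "server-a", 1: "server-b"}, {0: "server-c", 1: "server-d"}]
    print(route(12345, num_partitions=2, replica_groups=groups))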
So those are all the optimizations we did. And finally, what we estimated would take hundreds of nodes, we were able to serve with roughly 15 to 30 nodes for the entire 5,000 QPS. Comparison with Druid: Druid is roughly where we were when we started this exercise. We tried the same data set with Druid and its latency basically spikes exponentially. It's good at low QPS, where the latencies are comparable, but none of the optimizations that we have done for site-facing applications
have gone into Druid. Next. How are we doing on time? Okay. So this is the next idea that we got, and this time I'm to blame, because I said: hey, we have all this data, so what next, what can we do? We said, why can't we solve anomaly detection on top of this data? It's the same data set that we have. So we built an application called Astati that asks: can I monitor this data and tell what's happening with all the views coming to LinkedIn? These page views are one of the key business metrics at LinkedIn, one of the top five metrics, so they're monitored pretty much every hour. We want to watch it and ask: is there any problem? And not just at the top level; can we also do multidimensional anomaly detection? Can you tell whether page views from India on Android specifically are having a problem? That's the level at which we can slice and dice, and we do all of these checks in parallel. So what's the problem here? If you look at it
compared to the previous use case, someone could say, hey, it's easy: there you had a particular primary key, you sorted on it, and then you were able to access the data very quickly. But in this case there is no such thing, because your query can be 'select ... where country = US' or whatever combination is possible. We don't have a single primary key on which we can sort the data, so the latency is completely unpredictable. For something like US you may have to scan 50% of your data, and that's going to have a lot of latency; for some small country you can scan it pretty quickly. So the latency varies a lot, from very small to very large. If you look again, going back to the initial
solutions that I talked about, there are two types. One is what we have been talking about, Pinot and Druid, which is completely scan based; here the latency can vary from very quick to really bad. And then you have the pre-compute solution, which is super fast, but the storage cost is very high because you are pre-computing all possible combinations. That is how the world exists today: you have this solution, or you have something on the other extreme. What we built is something called a star-tree, which can be anywhere in between, based on your requirements. It allows us to trade off between space and time. If you want a really fast response, you can set the configuration to say: I want a really fast response. And it's completely data driven: it figures out what needs to be done just by looking at the data. You don't have to tell it anything other than: it shouldn't scan more
than X number of records on any query. This is a little bit complex, so feel free to catch me after the talk if you want more detail, but I'll just go over
at a very high level how this index is created. You take the raw data, and the first thing we do is a multidimensional sort. The way we sort the data is with some heuristics: we look at the cardinality of every column and sort on that. Once the data is sorted, we create a tree index alongside it. We create a root node and then start branching out. We look at the cardinality and then we say,
the highest cardinality, we branch out on country first. And each of these nodes here
just maintains a pointer; at this point we haven't created any additional records. Then we create a star node. What the star node does is remove the country dimension and further aggregate the data, so if you look at the aggregated data, we now have Chrome, IE, and Safari pre-computed. And we just apply this recursively, that's pretty much it: we do the same thing at the browser level, and we keep going until a node hits the split threshold. This is very, very
important: the split threshold. What it allows us to do is stop the expansion at a particular point, so the index is no longer exponential. That is exactly the problem with the pre-cube approach, where the split threshold is effectively one and you pre-compute every possible combination. So this is the trade-off that we make. If you say the split threshold is 100K, you are basically saying that any query of the form dimension1 = value1 AND dimension2 = value2 will never scan more than T records, where T is the split threshold. That's the whole concept behind it. So, to come back to this: you can say the split threshold is 100K, and it pre-computes everything that is needed so that your query will never scan more than 100K records. That's the trade-off. If you set the split threshold to one, you end up with the pre-compute solution, which computes every possible combination. And if the split threshold is infinity, it's basically what we have today: it keeps the data as it is and doesn't pre-compute anything.
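A toy construction of that idea: branch on a dimension, add a star ('*') branch with that dimension removed and aggregated, and stop splitting once a node holds no more records than the split threshold (a simplification of the real star-tree build):

    # Hypothetical star-tree build over rows of (dimension dict, metric value).
    from collections import defaultdict

    def build_node(rows, dims, split_threshold):
        node = {"count": sum(v for _, v in rows)}
        if not dims or len(rows) <= split_threshold:
            return node                               # leaf: small enough to scan at query time
        dim, rest = dims[0], dims[1:]
        groups = defaultdict(list)
        for d, v in rows:
            groups[d[dim]].append((d, v))
        node["children"] = {val: build_node(grp, rest, split_threshold)
                            for val, grp in groups.items()}
        # star branch: remove this dimension and aggregate across all its values
        node["children"]["*"] = build_node(rows, rest, split_threshold)
        return node

    rows = [({"country": "us", "browser": "chrome"}, 10),
            ({"country": "us", "browser": "safari"}, 5),
            ({"country": "ca", "browser": "chrome"}, 3)]
    tree = build_node(rows, ["country", "browser"], split_threshold=1)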
Let's look at query execution on this. The overall concept, as the bottom of the slide says, is that it should never scan more than the split threshold. So let's say the query is 'select sum(views) where country = al': it just goes down that path, and in this case it doesn't even use a pre-computed node, because 'al' doesn't have a lot of views, so it can just scan those records and return the result. If you filter on Canada, it takes the 'ca' path, goes to the star node where the answer is pre-computed, and just returns it; or, if there are leaf records that the node points to, it scans them and returns the results very quickly. Similarly for the other two queries. And if you just want the overall sum of views, it goes to the star-star path, and
then you have all of that data ready. This basically allows us to tune the configuration according to the use case: we can go anywhere from a split threshold of 100K or 200K down to sometimes much lower, like a thousand, depending on the application. So you get that knob, which allows you to trade off between space and time.
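Continuing the toy example above, answering a query is just walking the tree and taking the star branch for any dimension the query does not filter on (again, a simplification):

    # Hypothetical lookup against the toy star-tree built earlier.
    def query(node, filters, dims):
        # filters: dict of dimension -> value for the dimensions the query constrains
        for dim in dims:
            if "children" not in node:
                break          # reached a leaf; a real system would scan its records here
            branch = filters.get(dim, "*")    # unconstrained dimension -> take the star branch
            node = node["children"][branch]
        return node["count"]

    print(query(tree, {"country": "us"}, ["country", "browser"]))   # 15: all US views
    print(query(tree, {}, ["country", "browser"]))                  # 18: overall total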
So this is what happened in the benchmark with the star-tree: the latency stays stable up to some point and then starts to climb, but you are getting a lot more throughput on the same system. Typically we see around two times the data size with the star-tree, so it roughly doubles, but that is an okay cost to pay. It's not exponential; you don't pay 10 or 20 times the cost, and you get much better throughput. So this allows us to do
the anomaly detection, which has these bursty workloads, one after the other. It's periodic: every five minutes or every half an hour we send a bunch of queries to the system, and it still holds up well. So it's the same system that the dashboards run on, and now we can do anomaly detection on it without paying any additional cost. It saves on operations: we don't need additional nodes, we don't have another system to maintain, and it scales as we add more nodes. Quickly, on usage: Pinot is used extensively in pretty much any analytics application you look at on LinkedIn, things like Who Viewed My Profile, campaign analytics, and content and talent insights, which let you look at who moved from which company to which company and what the transitions look like. It gives
you all those insights. We ingest about 120 billion events per day in real time. And if you look at the queries, we now have around 70,000 queries per second, which is almost an OLTP kind of workload; OLTP can go into the millions, but we are getting there in terms of QPS. We also have very strict latency requirements: we have to satisfy the 95th and 99th percentile latencies, and most of them have to be in milliseconds. Based on the use case, we figure out whether a star-tree can help us or whether the sorted index and the other optimizations are enough. Outside LinkedIn, who is using it? Uber is a very big user of Pinot; they have lots of applications, including site-facing ones like Uber Eats, where all the dashboards the client sees are powered by Pinot. Microsoft is using it in Teams, and Slack is using it as well. And this is the best one: they moved from Druid to Pinot and saved a tremendous amount of hardware, and they say they are still over-provisioned. So that is a pretty good picture of Pinot's usage outside LinkedIn. Just to end here: I think we took on
this very challenging problem where we said: hey, can there be a system built on columnar storage that goes all the way and serves the site? No one has done that so far; most site-facing applications have been using key-value or OLTP systems. And we wanted to avoid the overhead where you start an application at low QPS and then, as it becomes popular, the QPS increases and you have to change your system. That's really, really expensive, and we wanted to avoid that migration. So we built this system, and we were able to successfully use it for all the varying use cases: dashboarding, site-facing, and anomaly detection. Finally, we are back to drinking, so we'll see you at the next thing. One more thing: we are going to Apache very soon; we are in the process. And here is where you can find the code. It's completely open source, and we run the same open-source code inside LinkedIn as well; there is absolutely no fork. And last but not least, here is the team, without whom we couldn't have done this. Thank you. (clapping)