Webinar transcript: Taking the pain out of the backfilling process for your data
When you have billions of datapoints, making retroactive changes can be seriously painful. Here's how to remove some of that pain, from our CTO Egor Gryaznov.
Egor Gryaznov, our CTO, recently walked us through some tips, tricks, and best practices for making the backfilling process easier when it comes to your gigantic stores of data. Check it out, and read below for a lightly-edited transcript.
Thanks, everyone, for joining. My name is Egor. I'm the co-founder and CTO at Bigeye. Today, we're going to be talking about how to make the backfill process a little bit less painful. If you're a data engineer, or have had to deal with data, I'm sure you are familiar with backfilling data and correcting it historically. I'm going to talk about tips and tricks to make it a little bit easier.
So, let's get started. Before Bigeye, I was at Uber, where I was one of the first people called "data engineer." I stood up the original data warehouse there. I built a lot of analytics and data-based tooling for the rest of the company embedded in teams and within the data platform itself. At the end of this session, I'm going to actually walk through one of the projects that I did around experimentation analytics and how we scaled our data processing there, especially around backfilling the data and correcting it historically as necessary.
So what are we talking about today? We'll talk about why backfilling sucks and why it doesn't always have to. I personally love this meme:
This is the "this is fine" dog. It's probably one of my favorite depictions of life as a data engineer, where everything around you is on fire all the time. And you just kind of put on a happy face and say, "Yep, this is fine." Backfilling is definitely one of those concepts. But we can talk about how you can do work ahead of time in order to make backfills a little bit easier.
So we're gonna have three sections today. First, we'll talk about techniques. This is going to cover how to process and store the data so that you don't get stuck in a paradigm that makes it very complex to reprocess data.
We'll talk about the tools you already use or may want to use, some existing frameworks that are popular, and scripting that you might want to roll on your own or use to extend your current frameworks.
And finally, we're going to talk through that real-world example to bring it all together. As I mentioned, this is based on my time on the experimentation analytics team at Uber, and we'll talk about how we set ourselves up for success early when we were building out the processing pipeline, so that backfills became quite a bit easier.
Techniques for processing data
Let's start with techniques for processing the data itself. Why are we even talking about backfills? Let's take a quick step back and talk about the different ways that data is processed.
So if your data is very small, or if it changes all of the time, then full reloads are actually not a problem. Reprocessing all data for all its history is a very easy paradigm. There's never a need to separately reprocess history, because every single time your processing runs, it already processes all historical data. So you're always going to have the most up-to-date data with a full reload.
However, as the volume of data increases, this can become very expensive. Sometimes full reloads are simply required, especially for dimension tables.
Say you have an e-commerce store with users and orders. Users are going to scale fairly slowly, and properties about users can change at any point in time for any user in all history. Therefore, full reloads for your user dimension table are usually a good call. Your orders table, however, may start small, so at first you can reprocess all of its history into a fact table or an analytics table at any point in time. But at some point, you're going to scale to where it's prohibitively expensive from a computational perspective to reprocess your entire orders table.
That's why there are incremental updates, which process only the newest data available. That is cheaper computationally, and it scales really well. But the logic around it becomes much more difficult, because you have to start reasoning about what inputs you need to correctly process the right amount of newly arriving data. A lot of this is figuring out when the job last ran, what new data has arrived since, and what parts of the data may or may not be updated historically. Whenever something changes, let's say your business logic changes or updates, or you're remapping a new column and you're changing the output values, you might need to reprocess historical data.
This is really what I mean by backfilling: anytime you have to reprocess historical data. The word "backfill" traditionally means that you missed a certain time window of data and you want to literally fill in that window. But when I'm referring to backfilling, I really mean anytime you have to reprocess data, whether it's because you missed it, or because you are changing logic and you want the output to look slightly different.
So let's move on from this. We will mostly be talking about incremental processing today, and how you can optimize your data storage to improve your ability to backfill.
In this case, we're talking about partitioning. Partitioning is particularly important for incremental processing, because it allows you to separate your data on some predefined field. Usually, this is both a logical and physical separation, meaning you have to query specific partitions, but also physically, when they're stored on disk, they are usually separate files.
In this screenshot, you can see a table with users by date. If you partition it, in this case, by the month that the user signed up, you can actually see the three different partitions that exist there, each with all of the data for that month.
Partitioning is particularly important for incremental processing, because it allows you to update only the parts of the data that you care about. If you're processing one, two, or three days at a time, you're only touching those three physical partitions without changing anything else historically. This allows for much faster reads and writes, especially if you're accessing a limited number of partitions. Typically, you want to limit this to as few as possible.
But some systems will allow you to scale that part up. This is really available in most data warehouses. Snowflake's a little bit special here, because they have their own notion of micro-partitions. You can't control that; Snowflake decides for you. But if you're using something like BigQuery or Databricks, then you are probably already partitioning your data, because those are fairly native concepts in those systems.
Finally, partitioning allows you to backfill one partition at a time, which, as I mentioned, creates faster writes. But it also introduces a unit of parallelism that you can exploit in the future, by parallelizing how many backfills are running simultaneously. If they're all touching partitions that are completely disjoint, then you can achieve much faster backfills, especially when it's over a longer period of time.
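As a minimal sketch of what "one partition at a time" can look like (this is not code from the talk), the snippet below stores each day of a hypothetical orders table in its own directory. The `dt=YYYY-MM-DD` layout, the JSON file format, and the function names are all assumptions made for illustration.

```python
import json
import tempfile
from pathlib import Path


def write_partition(table_dir: Path, day: str, rows: list) -> Path:
    """Overwrite the single partition for `day`, leaving all others untouched."""
    part_dir = table_dir / f"dt={day}"
    part_dir.mkdir(parents=True, exist_ok=True)
    out = part_dir / "part-0.json"
    out.write_text(json.dumps(rows))
    return out


def read_partition(table_dir: Path, day: str) -> list:
    """Read only the file for `day`; no other partition is opened."""
    return json.loads((table_dir / f"dt={day}" / "part-0.json").read_text())


# Hypothetical table living in a temporary directory.
table = Path(tempfile.mkdtemp()) / "orders"
write_partition(table, "2023-07-01", [{"order_id": 1, "status": "placed"}])
write_partition(table, "2023-07-02", [{"order_id": 2, "status": "placed"}])

# Backfill: rewrite July 1 only. The July 2 partition is never touched.
write_partition(table, "2023-07-01", [{"order_id": 1, "status": "shipped"}])
```

Because each day lives in its own directory, a backfill for one day is a plain overwrite of that directory, and two backfills for different days never contend for the same file.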
So now that we know about how to store the data more efficiently for incremental processing and backfilling, let's talk about how to optimize our transformations for backfilling and incremental processing.
There are really two parts to this that are important. One is how you're expressing the business logic in your data processing. And the second one is how you're using the input data for your transformations in your data process.
So let's start with the business logic. The most important thing is to use idempotent and pure functions only. So let's break this down. First off, what does idempotent mean? It means that rerunning the same function, the same logic, is going to produce the same result no matter how many times you run it.
So a counterexample to this is "x equals x plus one." In SQL, this might be an update that says, when the column is not zero, "column equals column plus one." If I rerun this five times, x will have been incremented five times. Non-idempotent functions are problematic because when you go to backfill, when you reprocess your data, you'll find cases where you change data in a way that you do not expect.
You want most of your logic to result in the same thing no matter how often you run it. Let's say you needed to backfill something and then you updated your logic and reran it again, you don't want the results to change every single time you rerun that processing on that partition.
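To make the difference concrete, here is a small sketch (not from the talk) that loads a daily order total two ways. The function names and the in-memory `totals` dictionary are hypothetical stand-ins for a real table.

```python
def load_day_non_idempotent(totals: dict, day: str, amounts: list) -> None:
    # "x = x + 1" style: every rerun adds the same orders on top of the old value.
    totals[day] = totals.get(day, 0) + sum(amounts)


def load_day_idempotent(totals: dict, day: str, amounts: list) -> None:
    # Overwrite style: the stored value depends only on the inputs.
    totals[day] = sum(amounts)


amounts = [10, 20, 30]
bad, good = {}, {}
for _ in range(3):  # simulate rerunning the same backfill three times
    load_day_non_idempotent(bad, "2023-07-01", amounts)
    load_day_idempotent(good, "2023-07-01", amounts)

print(bad)   # {'2023-07-01': 180}
print(good)  # {'2023-07-01': 60}
```

The overwrite version can be rerun as often as needed, which is what makes it safe to use for backfills.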
Secondly, pure functions mean that the function will have the same output for the same input. Most transformations that you're doing are probably already pure. Remapping country codes to country names, or mapping IP addresses to regions: these are pure functions, for the most part. I'm not going to get into the IP range problem. But these are functions that will always produce the same output for the same input.
A good counterexample: anything that uses current timestamp, or current date, or some environmental variable that changes. These aren't pure functions, because their output will depend on the state of the system at the time. So for example, say you're computing churn using current timestamp: you take the time when a user left the system and deleted their account, and compute the difference between that and the current timestamp. If I run that today, and if I run that tomorrow, I'm gonna get different results. And that's not going to be good, because let's say I need to reprocess a user that churned a month ago. I'll reprocess that partition for that information. My "days since churn" is going to be different after I rerun it, because even though I have the same input of the same user, I do not have the same output, because I have a variable that is dependent on the environment. And that is always changing with every run.
Once you've locked down your business logic into only using pure and idempotent functions, you can make sure to parameterize your input data. The important part of incremental processing is deciding which data you need every single time you run that incremental processing. And part of that is parameterizing that, usually across time ranges.
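One common way around this, sketched below under the assumption that each partition has a known reference date, is to pass that date in explicitly instead of reading the clock. The function and variable names are illustrative, not from the talk.

```python
from datetime import date


def days_since_churn_impure(churned_on: date) -> int:
    # Depends on the wall clock, so a rerun tomorrow gives a different answer.
    return (date.today() - churned_on).days


def days_since_churn(churned_on: date, as_of: date) -> int:
    # Pure: the reference date is an explicit input, so reruns always agree.
    return (as_of - churned_on).days


churned_on = date(2023, 6, 1)
partition_date = date(2023, 7, 1)  # assumed reference date of the partition
print(days_since_churn(churned_on, as_of=partition_date))  # 30
```

Reprocessing the July 1 partition next month still yields 30, because nothing in the pure version depends on when the job happens to run.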
Going back to that orders example, if I have orders coming in every day, I might want to process the last week of orders. Maybe orders can get backfilled and their status can change in the system. Or I might want to reprocess the last three days of orders, or one day of orders, or two hours of orders. But that logic needs to be parameterized. I will want to be able to process some arbitrary time range, especially when I'm backfilling. When backfilling, I don't just process the last seven days; I might want to process the seven days from a month ago. By parameterizing the range of data that you're accessing, you're allowing yourself to have an easier time backfilling and reprocessing the historical data.
It's also important to default to something that works for that ongoing process. The default might be everything from seven days ago until now, and that "now" is a default, and you can change the end timestamp and parameterize that as you see fit.
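The sketch below shows one possible shape for this: a helper that defaults to "the last seven days until now" but accepts an explicit range for backfills. The names `resolve_window` and `select_orders`, and the injectable `now` argument, are assumptions for illustration rather than anything described in the talk.

```python
from datetime import datetime, timedelta


def resolve_window(start=None, end=None, now=None, lookback=timedelta(days=7)):
    """Return the (start, end) range to process.

    With no arguments this is the day-to-day default: the last `lookback`
    up to now. A backfill passes explicit start and end values instead.
    """
    if end is None:
        end = now if now is not None else datetime.now()
    if start is None:
        start = end - lookback
    if start >= end:
        raise ValueError("start must be before end")
    return start, end


def select_orders(orders, start, end):
    """Keep orders created in the half-open range [start, end)."""
    return [o for o in orders if start <= o["created_at"] < end]


orders = [
    {"order_id": 1, "created_at": datetime(2023, 6, 3)},
    {"order_id": 2, "created_at": datetime(2023, 7, 5)},
]

# Day-to-day run: the seven days before "now" (pinned here for the demo).
daily = resolve_window(now=datetime(2023, 7, 8))
print([o["order_id"] for o in select_orders(orders, *daily)])  # [2]

# Backfill: the same logic over seven days from a month earlier.
backfill = resolve_window(start=datetime(2023, 6, 1), end=datetime(2023, 6, 8))
print([o["order_id"] for o in select_orders(orders, *backfill)])  # [1]
```

The only place the current time enters is the default for `end`, so a backfill that supplies both bounds is fully determined by its parameters.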
Tools and frameworks
We've talked a little bit about storing your data efficiently, and building transformations so that you don't shoot yourself in the foot later when you're backfilling. Let's talk a little bit about the sorts of tools that you'll encounter, and how you can apply these techniques in those tools.
First, let's talk about frameworks. There are really two things that matter here. One is the processing framework: how can you express your business logic and your transformations? Second is your orchestration or execution framework, the thing that is going to actually run that transformation and processing, both on a day-to-day basis and maybe even for backfills.
Let's start with the processing frameworks. Really, this is where there are going to be two camps. One camp is going to be the code-based processing frameworks. Typically that's Spark, but also MapReduce, pandas, anything that is code-based. These read a bunch of data and run code-based transformations.
Now, this is usually easier to test and control, as your transformations and your logic are now expressed as Python or as Java or Scala. And you can create functions around those and test those functions independently. Make sure they're pure. And know that your transformations are doing the right thing.
Now parameterization becomes very easy. It's a built-in feature of all languages: you can pass parameters into functions, which generate the right input datasets for you.
SQL-based processing frameworks are a little bit tricky, because it becomes practically impossible to test individual transformation logic. At this point, you need to mostly test and validate the output of the system. Parameterization also becomes a lot trickier, and you typically need a layer on top of it.
So you need a framework that helps you do the parameterization of your SQL queries before they're executed. For this, if you haven't heard of dbt, it stands for "data build tool" and is a great framework for helping you build SQL-based transformations and logic around your models. It takes care of the parameterization for you. And it helps you work through a lot of the trickiness around incremental processing that you might encounter otherwise.
Also, we've seen many folks just build their own SQL parameterization and templating framework, often a Jinja-based SQL generation framework, and that can help.
The important thing to understand here is that you will need to scale this out and it will become extremely complex. If you are building your own, take into account the fact that you are going to need to parameterize your input data sets, and make that a first-class feature in your in-house framework.
Finally, once you've actually built out your logic, you know what it looks like and what it's written in, something needs to go and run it: both on a day-to-day basis, as well as whenever you want to backfill it. Ideally your data processing is the same as your backfilling logic. Those two should be pretty much the same from a logic perspective, not from an input perspective.
For orchestration, you need to be able to pass those parameters into your code. You're parameterizing those input data sets, and you need something to pass in which parameters to run with, or the defaults. Typically, existing orchestration frameworks have some way to pass parameters into the jobs that they're running. Airflow is the most common example here. You can do everything from variables to a number of other things inside of Airflow to actually pass these parameters down. And I'm happy to answer more questions about that.
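For a sense of what the simplest in-house version looks like, here is a sketch that uses Python's standard-library `string.Template` as a stand-in for a Jinja-style templating layer. The table name, column names, and `render_orders_query` helper are hypothetical, and a real framework would need far more than this.

```python
from datetime import date
from string import Template

# Hypothetical query; the date range is the first-class parameter.
ORDERS_SQL = Template(
    "SELECT order_id, status\n"
    "FROM orders\n"
    "WHERE order_date >= DATE '$start_date'\n"
    "  AND order_date < DATE '$end_date'"
)


def render_orders_query(start_date: date, end_date: date) -> str:
    """Render the query for the half-open date range [start_date, end_date)."""
    if start_date >= end_date:
        raise ValueError("start_date must be before end_date")
    # Only date objects are accepted, so the rendered values are always ISO dates.
    return ORDERS_SQL.substitute(
        start_date=start_date.isoformat(),
        end_date=end_date.isoformat(),
    )


print(render_orders_query(date(2023, 6, 1), date(2023, 6, 8)))
```

Accepting `date` objects rather than raw strings keeps arbitrary text out of the rendered SQL. Where the database driver supports bound parameters, those are generally a safer choice than string templating for values.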
If you're not using Airflow, there are a couple of others. But the most important thing here is to make sure that you're able to actually populate these jobs with the right parameters when they are running on these frameworks.
Finally, I recommended against building your own SQL processing framework, but you are going to need a little bit of your own scripting, especially if you are going to run backfills. And you will, regardless of what data set you have. You will have to backfill it at some point in time. Typically, running backfills is easier to do separately from the day-to-day processing.
So your orchestration-layer Airflow job is going to take your logic and run it with all those defaults, pushing the data forward every day. But having a script that takes the same logic and the same job and just passes in different backfill parameters is really important. Then you can run that separately from the day-to-day processing, without having to pause the day-to-day, and simply patch things historically.
Also, by having a separate script, you can run these backfill jobs simultaneously, so multiple partitions can process at the same time. Orchestration frameworks typically want to run one version of a job at a time, so you would have to create multiple jobs, which has a lot of overhead that you probably don't need.
In this case, make sure that your unit of parallelism is completely distinct. So if I'm updating data for July 1, I should not be touching July 2 at all, neither reading from it nor writing to it. Otherwise, I'm going to run into problems backfilling multiple partitions at the same time.
I sort of jumped the gun on this one a little bit, but having scripting around writing the transformations is really important for SQL-based processing. A lot of teams do roll their own SQL-templating framework, but be careful about the upkeep this requires.
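A rough sketch of such a backfill script is below. It assumes a `process_day` function that reads and writes only its own day (here a hypothetical stand-in that just returns a message) and fans the days out over a thread pool from the standard library.

```python
from concurrent.futures import ThreadPoolExecutor
from datetime import date, timedelta


def process_day(day: date) -> str:
    """Hypothetical stand-in for the real job: touches only the partition for `day`."""
    return f"processed dt={day.isoformat()}"


def backfill(start: date, end: date, max_workers: int = 4) -> list:
    """Run process_day for every day in [start, end), several days at a time."""
    days = [start + timedelta(days=i) for i in range((end - start).days)]
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() returns results in input order and re-raises worker exceptions.
        return list(pool.map(process_day, days))


results = backfill(date(2023, 7, 1), date(2023, 7, 4))
print(results)
# ['processed dt=2023-07-01', 'processed dt=2023-07-02', 'processed dt=2023-07-03']
```

This is only safe because no two workers share a partition. If `process_day` for July 1 also read or wrote July 2, running the days concurrently could produce inconsistent results.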
The hard part here is that requirements for a framework like this are going to expand. You're going to want to generate more and more complex logic and have more complex expressions. At some point, it makes more sense to look into something like dbt, if you are going to pick a SQL-templating framework.
And finally, with SQL, you can't really test the logic itself, because it's hard to isolate the logic for each individual column that you're outputting. So you're going to have to test the outputs and monitor the outputs. And this is really where a tool like Bigeye comes in, to help you monitor the outputs themselves rather than monitoring the logic.
Backfilling data: A real-world case study from Uber
With those concepts in mind, let's talk a little bit about the real-world example. So we're going to talk about Uber's experimentation platform use case. This is something that I worked on for two and a half years at Uber. We built a system to take records about which experiment each user is in, and aggregate them and then build an analytics tool around it so that our data science teams and our product teams could perform their own analytics around which experiments are driving the most value for the business and impacting metrics.
To start all this, we had a lot of raw exposure logs. And when we're talking about exposure logs, we mean logs that are sent anytime an experiment is evaluated for a given user.
As you can see in the diagram on the right, there was an SDK. That SDK would call our experimentation service and say, "What treatment should I show for this experiment for this user ID?" We would return a specific group. And then we would log an event into Kafka saying this user was just entered into this experiment with this group.
Because we had thousands of experiments and millions of users every day, that generated well over two terabytes of data a day and tens of billions of records daily for just these raw logs. And the logs were pretty straightforward.
The problem we had with these is that sometimes there was late-arriving data, because some of this logging was queued. Sometimes that would arrive late to Kafka. We had cross-region Kafka, and sometimes the replication took longer. And then other times, we just had mistakes in experiment configuration where there were typos, and we needed to correct them and clean them up after the fact. And so we needed to not only do that, but also reduce the volume of data that we were dealing with.
And so, we would process all of these raw logs into something that was cleaned up. We'd remove a lot of duplicates, we'd clean up a lot of the individual record-level things, and we'd remap experiment configurations into human-readable strings, away from integer IDs or UUIDs. And then we'd enrich the data with some experimental data. And this processing was obviously difficult.
We were dealing with terabytes of data per day, and then we had to remove duplicates, clean it up, enrich it, and spit that out. If we knew that data was late, or somebody made a mistake in an experiment that started a week ago, we had to reprocess a week's worth of data. And so with that, we needed to design something that would withstand the test of time, both for nailing the incremental processing going forward, as well as allowing us to reprocess the data historically.
And so what we did is we partitioned on the date of the log event. Once a log is published, it's not affected by any data before it or after it. And so we built a processing framework that just wrote a single day's worth of logs every single time. We couldn't get more granular than that, but we didn't need to. We simply said each run will process a day's worth of data, and that run would usually take about an hour. As for the transformations themselves, we wanted to make sure that we could test them, because they are fairly straightforward row-level enrichment and processing transformations.
We wrote all this in Spark, running Java methods over each record, and then we were able to test each individual method. The reason we avoided SQL for this is because there was some fairly complex logic, not just the joining, but more around the filtering of what we consider a duplicate and what is considered an invalid record or an event that we don't want to store in our processed logs.
The most important thing here is that we parameterized the date of the raw logs that we wanted to read. Going back to that first data structure, because we were outputting one day's worth of logs every single time, we would also want to only read one day's worth. Parameterizing that allowed us to actually process any day as we saw fit.
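The talk does not spell out the actual rules, and the original was written as Java methods in Spark. Purely as an illustration of why record-level functions are easy to test, here is a Python sketch with assumed field names (`user_id`, `experiment`, `group`) and an assumed definition of "duplicate" and "invalid".

```python
REQUIRED_FIELDS = ("user_id", "experiment", "group")  # assumed schema


def is_valid(record: dict) -> bool:
    """Assumed rule: a record is valid when every required field is non-empty."""
    return all(record.get(field) for field in REQUIRED_FIELDS)


def dedupe(records: list) -> list:
    """Assumed rule: the same (user, experiment, group) seen twice is a duplicate."""
    seen = set()
    unique = []
    for record in records:
        key = tuple(record[field] for field in REQUIRED_FIELDS)
        if key not in seen:
            seen.add(key)
            unique.append(record)
    return unique


def clean(records: list) -> list:
    """Drop invalid records first, then remove duplicates, keeping first occurrences."""
    return dedupe([r for r in records if is_valid(r)])


raw = [
    {"user_id": "u1", "experiment": "new_checkout", "group": "treatment"},
    {"user_id": "u1", "experiment": "new_checkout", "group": "treatment"},  # duplicate
    {"user_id": "", "experiment": "new_checkout", "group": "control"},      # invalid
    {"user_id": "u2", "experiment": "new_checkout", "group": "control"},
]
print(len(clean(raw)))  # 2
```

Each function takes plain records and returns plain records, so it can be unit tested without a cluster or any real data.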
So finally, when we got to the orchestration level, running the job meant that we would normally run the job for today's date. And after midnight, we processed yesterday's data and then today's, so we would just pass that default parameter through the orchestration framework every single time. Then, we wrote a small backfill script that took the same transformation and the same job that we were using to do the processing incrementally going forward.
That would pass in a user-defined date to read for backfilling. So if I had to reprocess seven days' worth of data because we had late-arriving data, then I could parallelize that and run one job for each backfill date that I wanted. And all seven of those would run simultaneously. I could backfill all seven partitions in seven commands that were all pretty much the same thing, or one command with a loop. That made it very easy for us to run these backfills, since we knew that the logic was exactly the same.
We didn't have to worry about having separate logic for backfill and for the day-to-day transformations. That removed a lot of headache from our backfilling process, because we did that work upfront to design the data and the transformations in a way that made it easy for us to run the backfills in the future.
Hopefully, that puts it all together and shows what we did with Uber's experimentation platform, applying the lessons that I gave a little bit earlier. Thanks again for joining and have a good rest of your week!
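The script itself is not shown in the talk. A minimal sketch of the idea, assuming a single `--date` flag that defaults to yesterday and a hypothetical `run_job` stand-in for the real transformation, might look like this:

```python
import argparse
from datetime import date, timedelta


def parse_args(argv, today=None):
    """Parse the one parameter the job needs: which day of raw logs to read."""
    today = today or date.today()
    parser = argparse.ArgumentParser(description="Process one day of raw logs.")
    parser.add_argument(
        "--date",
        type=date.fromisoformat,
        default=today - timedelta(days=1),  # day-to-day default: yesterday
        help="day to process, as YYYY-MM-DD",
    )
    return parser.parse_args(argv)


def run_job(day: date) -> str:
    """Hypothetical stand-in for the real job; same logic for daily runs and backfills."""
    return f"read raw logs for {day.isoformat()}, wrote partition dt={day.isoformat()}"


def main(argv, today=None):
    return run_job(parse_args(argv, today).date)


# Day-to-day run with no arguments ("today" pinned for the demo).
print(main([], today=date(2023, 7, 8)))
# Backfill run: same job, different date.
print(main(["--date", "2023-07-01"]))
```

With a script shaped like this, backfilling seven days is seven invocations with different `--date` values, or one shell loop over the dates.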