Event-Driven Databricks ETL with Azure Data Factory
Databricks notebooks are commonplace at my company; they're the perfect combination of notebooks and PySpark for EDA and simple Spark jobs. Notebooks are controversial, to say the least: they encourage bad coding standards and have a non-linear flow to their code.
However, they do have their uses.
The Problem
One of our clients runs Adobe Analytics as one of their analytics tools and they wanted to use the data collected by Adobe in our attribution solution. They've been running their Adobe instance for longer than they've been a client of ours, so they asked whether it was possible to convert that data and then add it into our services.
Adobe Analytics offers an option to export the collected data in something called a data feed, which is a compressed folder of hit data in TSV format along with a bunch of TSV lookup tables. Data feeds can be exported daily or hourly. Unfortunately, this unusual zipped folder format makes the data unwieldy to process, as it's not as simple as loading a single file and getting to work.
For our tech stack, we needed the data in our internal format so we could run it through the same processing pipeline we use with our own collection technology.
The Plan
The Conversion Workflow
This is where Databricks notebooks come into the picture. An entire day's hit data is almost guaranteed to be several gigabytes' worth, if not more (e.g. Black Friday), so we wanted to use PySpark to convert the data due to its ability to handle that scale easily. As this conversion process sits outside our main data processing pipeline, it gave us a fresh opportunity to develop a new method of using Databricks notebooks in our tech stack.
As I mentioned above, notebooks are not my favourite piece of tech, yet we decided to use them for this purpose for two main reasons:
- It made developing a conversion workflow really simple given that the Adobe data feed format is not the most user friendly of data formats.
- The conversion job is only ever to be run once, even for new clients. Once we have the converted data we have no further immediate use for the conversion workflow and this allows us to be a little more relaxed with the tech.
An alternative to notebooks is `spark-submit` jobs, which I mention at the foot of this blog post.
Running The Conversion Workflow
Seeing as we had a fresh slate to work with for this conversion process, we had free rein in working out how to trigger the workflow. The data feeds would take 30-40 minutes to export a day's worth of data, which caused issues with attempting to run this manually or on a batch schedule, as we had no accurate way of predicting exactly when the compressed folder would land in storage.
However, Azure Data Factory (ADF) offers an event trigger that is perfect for this use case. Adobe data feeds export directly to Azure storage, so all we would need is an ADF pipeline that is triggered whenever a new file is uploaded to a given storage account. Combined with ADF's ability to run Databricks notebooks, that makes it a good orchestrator for this conversion pipeline.
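For a flavour of what that trigger looks like outside the ADF UI, here's a minimal sketch using the `azure-mgmt-datafactory` Python SDK. All of the resource names, container name and the pipeline parameter mapping are placeholders and assumptions, not our real setup.

```python
from azure.identity import DefaultAzureCredential
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.models import (
    BlobEventsTrigger,
    PipelineReference,
    TriggerPipelineReference,
    TriggerResource,
)

# Placeholder names - swap in your own subscription, resource group, factory, etc.
SUBSCRIPTION_ID = "<subscription-id>"
RESOURCE_GROUP = "<resource-group>"
FACTORY_NAME = "<data-factory-name>"
STORAGE_ACCOUNT_ID = (
    f"/subscriptions/{SUBSCRIPTION_ID}/resourceGroups/{RESOURCE_GROUP}"
    "/providers/Microsoft.Storage/storageAccounts/<storage-account>"
)

adf_client = DataFactoryManagementClient(DefaultAzureCredential(), SUBSCRIPTION_ID)

# Fire the conversion pipeline whenever a new .zip lands in the data feed container,
# passing the blob's file name through to a pipeline parameter.
trigger = BlobEventsTrigger(
    events=["Microsoft.Storage.BlobCreated"],
    scope=STORAGE_ACCOUNT_ID,
    blob_path_begins_with="/zip-dump/blobs/",
    blob_path_ends_with=".zip",
    pipelines=[
        TriggerPipelineReference(
            pipeline_reference=PipelineReference(reference_name="<conversion-pipeline>"),
            parameters={"fileName": "@triggerBody().fileName"},
        )
    ],
)

adf_client.triggers.create_or_update(
    RESOURCE_GROUP, FACTORY_NAME, "datafeed-zip-trigger", TriggerResource(properties=trigger)
)
```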
Implementation
Overview:
Databricks
Our conversion process consists of two steps:
- Extraction + lookups
- Transformation
Step 1 is where the folder (in ZIP Dump) is unzipped and the lookups are joined to the hit data, which is then upserted to delta format in storage (Dataset). Step 2 takes this delta, transforms it to the internal format and upserts it to a different delta location, also in storage (Converted Data). The motivation behind splitting this into a two-part workflow is that the extracted hit data with lookup information is of actual use to us: if we ever want to dive deeper into the Adobe data feed format, we have it ready in this intermediary format, which feeds into my company's goals for self-serve analytics.
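To make the shape of step 1 a little more concrete, here's a minimal PySpark sketch of the extract-and-join idea. The file names, column names and paths are illustrative rather than the exact ones used in our notebooks.

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Illustrative paths - the zipped folder is assumed to have already been
# extracted to extract_dir on mounted storage.
extract_dir = "/mnt/datafeed/unzipped/report_2021-01-01"
dataset_path = "/mnt/datafeed/dataset"  # the intermediary "Dataset" delta

# Hit data is tab separated with no header row.
hits = spark.read.csv(f"{extract_dir}/hit_data.tsv", sep="\t", header=False)

# Example lookup table (id -> value). The real feed ships several of these.
browsers = (
    spark.read.csv(f"{extract_dir}/browser.tsv", sep="\t", header=False)
    .toDF("browser_id", "browser_name")
)

# Join the lookup onto the hit data so the intermediary delta is self-describing.
enriched = hits.join(browsers, hits["_c8"] == browsers["browser_id"], "left")

# Write to the intermediary delta location, partitioned by date for step 2.
# (The real notebooks upsert - see the merge sketch further down.)
(
    enriched.withColumn("date", F.to_date(F.col("_c0")))
    .write.format("delta")
    .mode("append")
    .partitionBy("date")
    .save(dataset_path)
)
```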
Each step was encapsulated in its own notebook, and each notebook was supplied a list of parameters in the form of widgets that can be populated via the ADF interface (a short widget sketch follows the list below). We used a pretty rigid folder structure in storage for this process, so all we needed to pass to each notebook was the following:
- Extraction and lookup: container name, file name and a mount name for mounting storage to DBFS.
- Transformation: client name (added to the dataframe), container name, date to process (the data is partitioned by date, so only the necessary date is loaded) and the mount name again.
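As mentioned above, the notebook parameters are declared as Databricks widgets and read back at the top of each notebook. A minimal sketch for the transformation notebook (parameter names here are illustrative; `dbutils` is provided by the Databricks notebook environment):

```python
# Declare the widgets; ADF fills them in via the notebook activity's parameters.
dbutils.widgets.text("container", "")
dbutils.widgets.text("client_name", "")
dbutils.widgets.text("date", "")
dbutils.widgets.text("mount_name", "")

# Read the values back for use in the rest of the notebook.
container = dbutils.widgets.get("container")
client_name = dbutils.widgets.get("client_name")
process_date = dbutils.widgets.get("date")
mount_name = dbutils.widgets.get("mount_name")
```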
Azure Data Factory
Time to orchestrate these bad boys.
The beauty of ADF is how quickly a working solution can be created. Whilst ADF has downsides (e.g. no version control outside of Azure DevOps and GitHub Enterprise on-premises), one thing it is very good at is getting something quick and simple up and running. The workflow is as follows:
The pipeline has four parameters:
- File name - name of zipped folder landing in storage
- Container - added as a parameter in case we want to change the container in the future
- Client name - in case we want to run this for other clients
- Mount name - for naming the mount for DBFS
Then the pipeline itself is simply comprised of the two aforementioned notebooks.
`extraction-notebook` is the extraction and lookup notebook and has the following parameters filled:
All of which are extracted from the pipeline.
`transformation-notebook` is the transformation notebook and uses the following filled parameters:
Most of these are extracted from the pipeline; `date` is populated with the result of the previous step. To identify which day to load from the intermediary delta location and transform, the date in ISO format is taken from the compressed folder name, which is of the format `<report_name>_<ISO_date>.zip`. This date is then output from the notebook like so:
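A minimal sketch of that step, assuming a `file_name` widget holding the name of the zipped folder:

```python
# file_name is the zipped folder name passed in from ADF,
# of the form <report_name>_<ISO_date>.zip (the widget name is illustrative).
file_name = dbutils.widgets.get("file_name")

# Strip the .zip extension and take everything after the final underscore.
DATE = file_name.rsplit(".", 1)[0].rsplit("_", 1)[-1]

# Hand the date back to ADF as the notebook's exit value.
dbutils.notebook.exit(DATE)
```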
Where `DATE` refers to the date of the exported data. This date is then picked up by ADF and can be used in future steps using the activity variable:
@activity('extraction-notebook').output.runOutput
Save locations for the various deltas and the saving logic are all contained within the notebooks themselves (we've had many issues with Azure Data Factory, so we feel it's best placed to act as a pure orchestrator).
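For illustration, the kind of upsert that lives inside those notebooks is a standard Delta Lake merge. A minimal sketch, where the paths and the `hit_id` key column are assumptions rather than our exact schema:

```python
from pyspark.sql import SparkSession
from delta.tables import DeltaTable

spark = SparkSession.builder.getOrCreate()

# Stand-in for the dataframe produced by the transformation step.
transformed = spark.read.format("delta").load("/mnt/datafeed/dataset")

converted_path = "/mnt/datafeed/converted"  # illustrative save location

if DeltaTable.isDeltaTable(spark, converted_path):
    # Upsert on an assumed key so re-running a day doesn't duplicate rows.
    (
        DeltaTable.forPath(spark, converted_path)
        .alias("target")
        .merge(transformed.alias("source"), "target.hit_id = source.hit_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # First run: create the delta, partitioned by date like the intermediary one.
    transformed.write.format("delta").partitionBy("date").save(converted_path)
```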
The trigger is a simple event trigger with the following parameters:
- File name is the `fileName` from the trigger body
- Container, client name and mount name are all hardcoded - this is only designed as a one-off job, not a permanent solution, so this works for us
Results
It works flawlessly. We set Adobe Analytics to start exporting ZIP files to a given storage location and, over the span of a night, it exported a full month's worth of data, which was then successfully converted to our internal format.
Looking at the logs from the run, we get this:
As can be seen, they all ran successfully. Let's look at the delta; below is the converted data grouped by date with the rows counted:
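A query along these lines produces that view (the path is illustrative; `spark` is the session provided by the notebook environment):

```python
(
    spark.read.format("delta")
    .load("/mnt/datafeed/converted")
    .groupBy("date")
    .count()
    .orderBy("date")
    .show(31)
)
```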
And there are all the days of data loaded in!
Conclusion
Combining Databricks notebooks and Azure Data Factory was a really simple and quick solution to the problem we faced. You may have noticed I'm not the greatest fan of either, but this is a fantastic first step in moving away from our legacy workflows.
Next steps:
- Investigate how to move Spark jobs away from Databricks notebooks and towards their `spark-submit` technology and/or Azure HDInsight. This should allow us better version control and better code review workflows.
- Investigate better orchestration alternatives to Azure Data Factory. This will allow us to use our existing git setups for version control and also allow better code review workflows.