Building a Data Pipeline with Delta Lake, Spark, and Databricks
Delta Lake is an open-source storage layer that brings ACID (Atomicity, Consistency, Isolation, Durability) transactions and scalable metadata handling to big data workloads. It’s tightly integrated with Apache Spark and Databricks, making it ideal for building robust and scalable data pipelines.
This article will guide you through a hands-on project that simulates real-world challenges. We’ll explore:
- Importing CSV files into Databricks.
- Writing code to process data with Delta Lake.
The pipeline will include schema management, upserts, and SQL-based transformations while showcasing how to optimize data management for analytics.
1: Importing CSV Files into Databricks
Step 1: Setting Up Your Environment
Before we start, ensure you have:
- A Databricks workspace set up.
- A running cluster.
Step 2: Uploading CSV Files
- Open the Databricks console.
- Navigate to Data > Create Table > Upload File.
- Select the CSV files you want to import and specify the target directory as dbfs:/FileStore/tables/datalake/csv.
Note: We’re not creating actual tables here. This process is just to upload the files.
Choosing the DBFS target directory and importing the CSV files:
2: Writing Code to Process Data with Delta Lake
Before we start, here is the relationship diagram between the CSV files (tables) we've imported:
Step 1: Inspect the Directory
The command below lists the contents of a directory in the Databricks File System (DBFS).
Remember: this MUST be the same directory we used when importing the CSV files.
dbutils.fs.ls("dbfs:/FileStore/tables/datalake/csv")
Step 2: Creating Delta Tables
Convert each CSV file into a Delta table for structured and performant querying.
# Get the CSV files
files = dbutils.fs.ls("dbfs:/FileStore/tables/datalake/csv")

# Directory where the Delta tables of our data lake will be saved
delta_path = "dbfs:/FileStore/tables/datalake/delta"
datalake_extension = "delta"

for file in files:
    # Skip anything that is not a CSV file
    if not file.name.endswith(".csv"):
        continue

    # Read the CSV (semicolon-delimited, with header and schema inference)
    dataframe = (spark.read.format("csv")
                 .option("header", "true")
                 .option("delimiter", ";")
                 .option("inferSchema", "true")
                 .load(file.path))

    # File name without the ".csv" extension, used to name the Delta table
    file_name_without_extension = file.name[:-4]

    # Write to Delta
    dataframe.write.format("delta").mode("overwrite").save(
        f"{delta_path}/{file_name_without_extension}.{datalake_extension}"
    )
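Since schema management is one of the goals of this pipeline, you may want to confirm that inferSchema produced sensible column types. The sketch below is an optional sanity check (it uses the categories table as an example and reuses the delta_path variable from the loop above); it is not part of the pipeline itself.
# Optional sanity check: read one of the new Delta tables back and inspect
# the schema that was inferred from the CSV file
df_check = spark.read.format("delta").load(f"{delta_path}/categories.delta")
df_check.printSchema()
df_check.show(5)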
Step 3: Adding Data to Delta Tables
Add new rows to an existing Delta table.
# Load the existing categories Delta table
df = spark.read.format("delta").load("dbfs:/FileStore/tables/datalake/delta/categories.delta")

# Append a new row that matches the table's schema
new = spark.createDataFrame([(9, "coffee", "Moka pot, Aeropress, cappuccino")], df.schema)
new.write.format("delta").mode("append").save("dbfs:/FileStore/tables/datalake/delta/categories.delta")
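As a side note, every write to a Delta table is an ACID transaction, so the append above is recorded as a new version in the table's transaction log. The minimal sketch below inspects that history; it is optional and not required for the rest of the pipeline.
from delta.tables import DeltaTable

# Each commit (the initial overwrite and the append) appears as a table version
categories_table = DeltaTable.forPath(spark, "dbfs:/FileStore/tables/datalake/delta/categories.delta")
categories_table.history().select("version", "timestamp", "operation").show()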
Step 4: Handling Upserts
Upserts allow you to merge new and existing data seamlessly.
# Upsert (MERGE) new rows into the orders and order details tables
from delta.tables import DeltaTable

# Load the tables as DeltaTable objects
deltaTable_orders = DeltaTable.forPath(spark, "dbfs:/FileStore/tables/datalake/delta/orders.delta")
deltaTable_order_details = DeltaTable.forPath(spark, "dbfs:/FileStore/tables/datalake/delta/orderdetails.delta")

# New values to be merged
new_order = spark.createDataFrame([(11078, "ALFKI", 1, "2023-08-01")],
                                  ["OrderID", "CustomerID", "EmployeeID", "OrderDate"])
new_order_details = spark.createDataFrame([(11078, 1, 18, 3)],
                                          ["OrderID", "ProductID", "UnitPrice", "Quantity"])

# Update the order if it already exists, otherwise insert it
(deltaTable_orders.alias("orders")
    .merge(new_order.alias("newOrder"), "orders.OrderID = newOrder.OrderID")
    .whenMatchedUpdate(set={"CustomerID": "newOrder.CustomerID",
                            "EmployeeID": "newOrder.EmployeeID",
                            "OrderDate": "newOrder.OrderDate"})
    .whenNotMatchedInsert(values={"OrderID": "newOrder.OrderID",
                                  "CustomerID": "newOrder.CustomerID",
                                  "EmployeeID": "newOrder.EmployeeID",
                                  "OrderDate": "newOrder.OrderDate"})
    .execute())

# Same pattern for the order details, matched on OrderID + ProductID
(deltaTable_order_details.alias("order_details")
    .merge(new_order_details.alias("newOrderDetails"),
           "order_details.OrderID = newOrderDetails.OrderID AND "
           "order_details.ProductID = newOrderDetails.ProductID")
    .whenMatchedUpdate(set={"UnitPrice": "newOrderDetails.UnitPrice",
                            "Quantity": "newOrderDetails.Quantity"})
    .whenNotMatchedInsert(values={"OrderID": "newOrderDetails.OrderID",
                                  "ProductID": "newOrderDetails.ProductID",
                                  "UnitPrice": "newOrderDetails.UnitPrice",
                                  "Quantity": "newOrderDetails.Quantity"})
    .execute())
Step 5: Reading Some Data
# Reading some data using a basic filter
df_orders = spark.read.format("delta").load("dbfs:/FileStore/tables/datalake/delta/orders.delta").filter("OrderID == 11078")
df_orders.show()
df_order_details = spark.read.format("delta").load("dbfs:/FileStore/tables/datalake/delta/orderdetails.delta").filter("OrderID == 11078").filter("ProductID == 1")
df_order_details.show()
Step 6: Querying and Transforming Data
Create temporary views from each DataFrame. These views are only accessible within the session they are created in, and they let you use SQL queries to manipulate the data as if it were a table.
# Data lake Delta directory
delta_file = "dbfs:/FileStore/tables/datalake/delta/"
delta_path = dbutils.fs.ls(delta_file)
datalake_extension = "delta"

# Create one temporary view per Delta table
for file in delta_path:
    df = spark.read.format("delta").load(file.path[:-1])  # Remove the trailing "/"
    df.createOrReplaceTempView(file.name[:-7])  # Remove the last 7 characters (".delta/")
join_query = """
SELECT orderdetails.OrderID AS OrderID, orderdetails.Quantity, orderdetails.UnitPrice AS UnitPrice,
       products.ProductID AS ProductID, products.ProductName AS Product, suppliers.CompanyName AS Suppliers,
       employees.LastName AS Employee, orders.OrderDate AS Date, customers.CompanyName AS Customer
FROM orders
JOIN orderdetails ON orders.OrderID = orderdetails.OrderID
JOIN products ON orderdetails.ProductID = products.ProductID
JOIN categories ON products.CategoryID = categories.CategoryID
JOIN suppliers ON products.SupplierID = suppliers.SupplierID
JOIN employees ON orders.EmployeeID = employees.EmployeeID
JOIN shippers ON orders.ShipVia = shippers.ShipperID
JOIN customers ON orders.CustomerID = customers.CustomerID
"""
df_result = spark.sql(join_query)
# Write result in a new Delta Table
df_result.write.format("delta").mode("overwrite").save(f"{delta_file}join.{datalake_extension}")
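Optionally, you can compact the small files produced by the join to speed up later reads. The sketch below assumes you are on Databricks Runtime (or a Delta Lake version that supports the OPTIMIZE command) and reuses the delta_file and datalake_extension variables from above; it is not a required step.
# Optional: compact small files in the joined Delta table
# (requires Databricks Runtime or a Delta Lake release that supports OPTIMIZE)
spark.sql(f"OPTIMIZE delta.`{delta_file}join.{datalake_extension}`")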
Step 7: Reading Delta Using SQL
# Reading our Delta table using SQL
df = spark.read.format("delta").load("dbfs:/FileStore/tables/datalake/delta/join.delta")
df.createOrReplaceTempView("OrdersJoin")
results = spark.sql("SELECT * FROM OrdersJoin WHERE OrderID = 10248 AND ProductID = 11")
results.show()
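Because Delta keeps every version of a table, you can also query older snapshots with time travel. The sketch below is optional and just illustrates the feature: it reads the categories table as it was before the append in Step 3 (version 0 being the initial write from Step 2).
# Time travel: read the categories table as it was before the append in Step 3
df_before_append = (spark.read.format("delta")
                    .option("versionAsOf", 0)
                    .load("dbfs:/FileStore/tables/datalake/delta/categories.delta"))
df_before_append.show()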
Step 8: Cleaning Up
Delete all Delta tables when you’re done.
# Clean up
dbutils.fs.rm("dbfs:/FileStore/tables/datalake/delta/", recurse=True)
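The rm call removes the Delta files themselves. If you also want to clear the temporary views created in Steps 6 and 7 from the current session, you can drop them explicitly; the sketch below assumes the view names match the file names used earlier (there may be more views if you uploaded additional CSV files).
# Drop the session-scoped temporary views created in Steps 6 and 7
for view in ["orders", "orderdetails", "products", "categories",
             "suppliers", "employees", "shippers", "customers", "OrdersJoin"]:
    spark.catalog.dropTempView(view)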
Key Features of This Pipeline
- Data Ingestion: Imported CSV files are transformed into Delta tables.
- Upserts: Seamless merging of new and existing data.
- SQL-Based Transformation: Combines the power of Spark with SQL for analytics.
- Data Lakehouse Architecture: Delta Lake provides a unified platform for storage, processing, and querying.
Conclusion
This pipeline demonstrates how Delta Lake simplifies big data management and analytics. By leveraging Databricks and Spark, you can efficiently handle large-scale data workloads with ACID guarantees, making it a robust solution for modern data engineering challenges.
To get all the scripts and CSV source files, go to my GitHub.