Building a Formula 1 Data Platform with Azure Databricks and PySpark

Rafael Rampineli
Sep 30, 2024


In this post, I’ll walk you through how I built a cloud-based Formula 1 data platform using Azure Databricks and PySpark. We will focus on ingesting raw race data from multiple sources, transforming it, and storing it in Delta Lake to enable fast and reliable analysis. The project relies on Azure Data Lake Storage, Delta Lake, and PySpark, a stack designed to scale to large data volumes.

1. Project Overview: Formula 1 Cloud Data Platform

We’re ingesting, transforming, and analyzing Formula 1 race data to generate insights such as driver standings, constructor rankings, and race results. The solution integrates Azure Data Lake Storage (ADLS), Databricks Notebooks, and Delta Lake for efficient processing of data.

Key Components:

  • Data Ingestion from multiple CSV and JSON files.
  • Data Transformation using PySpark.
  • Storing transformed data in Delta format.

2. Setting Up the Azure Databricks Environment

First, create a cluster and a notebook in Azure Databricks. We are using Databricks Notebooks to write our PySpark code and orchestrate the workflows.

from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("Formula1DataPlatform") \
.getOrCreate()

This initializes the Spark session, which acts as the entry point for running our PySpark code. In a Databricks notebook a session named spark already exists, so getOrCreate() simply returns it.

3. Ingesting Circuit Data

We begin by ingesting the Circuits data from a CSV file. Here’s how we read the file and define the schema using PySpark.

Ingesting the Circuits File:

# Define the schema for circuits
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

circuits_schemas = StructType(fields=[
    StructField("circuitId", IntegerType(), False),
    StructField("circuitRef", StringType(), True),
    StructField("name", StringType(), True),
    StructField("location", StringType(), True),
    StructField("country", StringType(), True),
    StructField("lat", DoubleType(), True),
    StructField("lng", DoubleType(), True),
    StructField("alt", DoubleType(), True),
    StructField("url", StringType(), True),
])

# Read the CSV file
circuits_df = spark.read \
.option("header",True) \
.schema(circuits_schemas) \
.csv(f'{raw_folder_path}/{var_filedate}/circuits.csv')

circuits_df.show(5)

In this step, we define the schema explicitly and ingest the circuits data from ADLS into a DataFrame.
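The read above relies on raw_folder_path and var_filedate, which are not defined in this post. Here is a minimal sketch of how such a path could be assembled, assuming the data lives in an ADLS Gen2 account addressed through the abfss:// scheme. The helper names, the storage account name formula1dl, the container raw, and the date 2021-03-21 are all hypothetical placeholders.

```python
def adls_folder_path(container: str, storage_account: str) -> str:
    """Build the root abfss:// URI of an ADLS Gen2 container."""
    return f"abfss://{container}@{storage_account}.dfs.core.windows.net"


def dated_file_path(folder_path: str, file_date: str, file_name: str) -> str:
    """Mirror the f-string used in the read: <folder>/<file date>/<file name>."""
    return f"{folder_path}/{file_date}/{file_name}"


# Hypothetical values; replace with your own account, container and load date.
raw_folder_path = adls_folder_path("raw", "formula1dl")
var_filedate = "2021-03-21"

circuits_path = dated_file_path(raw_folder_path, var_filedate, "circuits.csv")
print(circuits_path)
```

Keeping the load date as a separate path segment means the same notebook can be rerun for a different date by changing a single variable.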

Transforming the Data:

Once the data is ingested, we clean it up by selecting and renaming columns, and we add metadata such as the ingestion date and environment.

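from pyspark.sql.functions import current_timestamp, lit

# Keep the columns we need (assumption: every column except url)
circuits_selected_df = circuits_df.select(
    "circuitId", "circuitRef", "name", "location", "country", "lat", "lng", "alt")

# Rename columns to snake_case and more descriptive names
circuits_renamed_df = circuits_selected_df.withColumnRenamed("circuitId", "circuit_id") \
    .withColumnRenamed("circuitRef", "circuit_ref") \
    .withColumnRenamed("lat", "latitude") \
    .withColumnRenamed("lng", "longitude") \
    .withColumnRenamed("alt", "altitude")

# Add metadata columns (assumption: var_environment is defined alongside var_filedate)
circuits_final_df = circuits_renamed_df \
    .withColumn("ingestion_date", current_timestamp()) \
    .withColumn("env", lit(var_environment))

circuits_final_df.show(5)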
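When many files need the same kind of renaming, the mapping itself can be generated rather than typed out. The sketch below is one possible approach and not part of the original project: the helpers to_snake_case and build_rename_map are hypothetical names, and the column list is copied from the circuits schema above.

```python
import re


def to_snake_case(name: str) -> str:
    """Convert a camelCase column name such as 'circuitId' to 'circuit_id'."""
    return re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", name).lower()


def build_rename_map(columns, overrides=None):
    """Map each source column to its target name.

    Explicit overrides (e.g. 'lat' -> 'latitude') win over the
    automatic camelCase-to-snake_case conversion.
    """
    overrides = overrides or {}
    return {c: overrides.get(c, to_snake_case(c)) for c in columns}


source_columns = ["circuitId", "circuitRef", "name", "location",
                  "country", "lat", "lng", "alt"]
rename_map = build_rename_map(
    source_columns,
    overrides={"lat": "latitude", "lng": "longitude", "alt": "altitude"},
)
print(rename_map)
```

Each (old, new) pair in rename_map could then be applied to a DataFrame with withColumnRenamed(old, new) in a loop.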

Writing the Data to ADLS:

After transformation, we save the data as a Delta table backed by ADLS for efficient querying.

circuits_final_df.write.mode("overwrite").format("delta").saveAsTable("f1_processed.circuits")

This allows us to store the circuits data in a Delta Table for downstream transformations and analytics.

4. Joining Race Results

The race results data is another key dataset in our platform. We ingest multiple related datasets such as drivers, constructors, circuits, and race results, and perform joins to create a unified view.

Ingesting and Joining Data:

We load the drivers, constructors, circuits, and races data from the processed Delta tables and join them with the race results.

# Load dataframes
drivers_df = spark.read.format("delta").load(f"{processed_folder_path}/drivers") \
.withColumnRenamed("number","driver_number") \
.withColumnRenamed("name","driver_name") \
.withColumnRenamed("nationality","driver_nationality")

constructors_df = spark.read.format("delta").load(f"{processed_folder_path}/constructors") \
.withColumnRenamed("name","team")

circuits_df = spark.read.format("delta").load(f"{processed_folder_path}/circuits") \
.withColumnRenamed("location","circuit_location")

races_df = spark.read.format("delta").load(f"{processed_folder_path}/races") \
.withColumnRenamed("name","race_name") \
.withColumnRenamed("race_timestamp", "race_date")

results_df = spark.read.format("delta").load(f"{processed_folder_path}/results") \
.filter(f"file_date = '{var_filedate}'") \
.withColumnRenamed("time","race_time") \
.withColumnRenamed("race_id","result_race_id") \
.withColumnRenamed("file_date","result_file_date")

from pyspark.sql.functions import current_timestamp

# Join races to circuits, keeping only the columns needed downstream
races_circuits_df = races_df.join(circuits_df, on="circuit_id", how="inner") \
    .select(races_df.race_id, races_df.race_year, races_df.race_name, races_df.race_date, circuits_df.circuit_location)

# Join results to races/circuits, drivers, and constructors
race_results_df = results_df.join(races_circuits_df, results_df.result_race_id == races_circuits_df.race_id, how="inner") \
    .join(drivers_df, on="driver_id", how="inner") \
    .join(constructors_df, on="constructor_id", how="inner") \
    .select(races_circuits_df.race_id, races_circuits_df.race_year, races_circuits_df.race_name, races_circuits_df.race_date,
            races_circuits_df.circuit_location,
            drivers_df.driver_name, drivers_df.driver_number, drivers_df.driver_nationality,
            constructors_df.team,
            results_df.grid, results_df.fastest_lap, results_df.race_time, results_df.points, results_df.position, results_df.result_file_date) \
    .withColumn("created_date", current_timestamp()) \
    .withColumnRenamed("result_file_date", "file_date")

This step involves joining several dataframes to produce a race results table that combines race, driver, and constructor information.

Partitioning and Writing to Delta Lake:

To optimize the data storage and querying, we partition the data by race year before saving it as a Delta table.

race_results_df.write.mode("overwrite").partitionBy("race_year").format("delta").saveAsTable("f1_presentation.race_results")

This partitioning allows for more efficient queries when analyzing the race results.

5. Full Solution Architecture

[Figure: full solution architecture diagram]

6. Conclusion

Through this project, we’ve demonstrated how to use Azure Databricks and PySpark to build a scalable data platform for processing Formula 1 race data. By leveraging Delta Lake and ADLS, we’ve ensured that the data is ingested, transformed, and stored in a format optimized for both batch and real-time analysis.

If you want to see the full solution architecture, please check it out on my GitHub.
