Snowflake Streaming: Loading from AWS S3 using SnowPipe with Change Data Capture (CDC) to DW
The landscape of data management has drastically changed over the past few decades, shifting from traditional on-premise systems to cloud-based solutions that promise scalability, flexibility, and real-time capabilities. The rise of cloud computing has revolutionized data storage, processing, and analysis, making it possible to handle vast amounts of data with unprecedented speed and efficiency.
In this context, real-time data processing emerged as a critical need for businesses aiming to remain competitive in a fast-paced world. In particular, Change Data Capture (CDC) has become a key technique for capturing changes in databases and immediately reflecting them in data warehouses. CDC allows organizations to track every change made to their data, ensuring they always have access to the most up-to-date information, often with little to no delay.
The combination of cloud platforms like Snowflake and cloud services like AWS S3, SQS, and SnowPipe has made real-time (or near real-time) data ingestion and processing easier than ever before. This shift allows businesses to not only capture real-time transactional data but also perform continuous analytics on it, driving faster, more informed decisions.
In this guide, we will explore how to harness the power of Snowflake Streaming — a cutting-edge tool that enables automatic and real-time data loading from AWS S3 using SnowPipe, alongside the power of Change Data Capture (CDC). By the end of this walkthrough, we will have set up a seamless pipeline that takes data from S3 bucket source, ingests it in near real-time, and feeds it into a Snowflake data warehouse, ready for analytics and reporting. This solution ensures that data remains fresh and actionable, enhancing your organization’s ability to make timely, data-driven decisions.
We will split this into 3 main configurations:
1. Configuring Snowflake Databases;
2. Configuring AWS Resources;
3. Configuring a CDC Type 2;
So, enough “blablabla”, let’s get to work!
1. Configuring Snowflake Databases
First of all, let’s create our Snowflake databases and table structure.
- Creating our Snowflake Databases
Snowflake Tables architecture
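As a minimal sketch (assuming the database names STAGING and DW used throughout this guide, with default settings for everything else), the two databases can be created with:
CREATE DATABASE IF NOT EXISTS STAGING;
CREATE DATABASE IF NOT EXISTS DW;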
Using the STAGING database, let’s create some tables:
CREATE TABLE customers (
customer_id INTEGER PRIMARY KEY,
customer_name VARCHAR(50) NOT NULL,
email VARCHAR(50) UNIQUE,
address VARCHAR(100)
);
CREATE TABLE products (
product_id INTEGER PRIMARY KEY,
product_name VARCHAR(50) NOT NULL,
description VARCHAR(500),
price DECIMAL(10,2) NOT NULL
);
CREATE TABLE sales (
sale_id INTEGER PRIMARY KEY,
customer_id INTEGER NOT NULL,
product_id INTEGER NOT NULL,
sale_date DATE NOT NULL,
quantity INTEGER NOT NULL,
FOREIGN KEY (customer_id) REFERENCES customers(customer_id),
FOREIGN KEY (product_id) REFERENCES products(product_id)
);
Using the DW database, let’s create some tables and insert some data:
CREATE TABLE dim_customers (
customer_sk INTEGER AUTOINCREMENT PRIMARY KEY,
customer_id INTEGER NOT NULL, -- no UNIQUE constraint: Type 2 keeps multiple versions per customer
customer_name VARCHAR(50) NOT NULL,
email VARCHAR(50),
address VARCHAR(100)
);
CREATE TABLE dim_products (
product_sk INTEGER AUTOINCREMENT PRIMARY KEY,
product_id INTEGER NOT NULL, -- no UNIQUE constraint: Type 2 keeps multiple versions per product
product_name VARCHAR(50) NOT NULL,
description VARCHAR(500),
price DECIMAL(10,2) NOT NULL
);
CREATE TABLE dim_dates (
date_sk INTEGER AUTOINCREMENT PRIMARY KEY,
date DATE NOT NULL UNIQUE,
day INTEGER NOT NULL,
month INTEGER NOT NULL,
year INTEGER NOT NULL,
quarter INTEGER NOT NULL
);
CREATE TABLE fact_sales (
sale_sk INTEGER AUTOINCREMENT PRIMARY KEY,
sale_id INTEGER,
customer_sk INTEGER NOT NULL,
product_sk INTEGER NOT NULL,
date_sk INTEGER NOT NULL,
quantity INTEGER NOT NULL,
FOREIGN KEY (customer_sk) REFERENCES dim_customers(customer_sk),
FOREIGN KEY (product_sk) REFERENCES dim_products(product_sk),
FOREIGN KEY (date_sk) REFERENCES dim_dates(date_sk)
);
Pre-populating this date dimension is useful for building queries and filters later on.
INSERT INTO DIM_DATES ( DATE,DAY,MONTH, YEAR, QUARTER)
WITH date_range AS (
SELECT
DATEADD(day, ROW_NUMBER() OVER (ORDER BY seq4()) - 1, '2020-01-01') AS date
FROM
TABLE(GENERATOR(rowcount => 2192)) -- 2192 days from '2020-01-01' through '2025-12-31' inclusive
),
date_components AS (
SELECT
date,
EXTRACT(day FROM date) AS day,
EXTRACT(month FROM date) AS month,
EXTRACT(year FROM date) AS year,
EXTRACT(quarter FROM date) AS quarter
FROM
date_range
)
SELECT date, day, month, year, quarter FROM date_components;
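A quick, optional sanity check to confirm the generated date range:
SELECT MIN(date) AS first_date, MAX(date) AS last_date, COUNT(*) AS total_days
FROM DIM_DATES;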
Now that our base structure is created, let’s move to the AWS console to configure policies, roles, the S3 bucket, and SQS.
2. Configuring AWS Resources
- Creating AWS IAM Policy
Choose IAM (Identity & Access Management) in the AWS console.
Here is a JSON policy you can use to configure it:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:GetObjectVersion"
            ],
            "Resource": "arn:aws:s3:::<Bucket_s3>/*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket",
                "s3:GetBucketLocation"
            ],
            "Resource": "arn:aws:s3:::<Bucket_s3>"
        }
    ]
}
- Creating an AWS IAM Role
In the IAM console, create a new role for another AWS account (use your own account ID and any placeholder external ID for now), attach the policy created above, and note the role ARN; we will update its trust relationship later with the values returned by the Snowflake integration.
- Creating an Integration between Snowflake and AWS
The identity generated on AWS is stored in Snowflake. This helps avoid passing credentials during every transaction.
This command must be run inside Snowflake:
CREATE STORAGE INTEGRATION S3_Snowflake
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = S3
ENABLED = TRUE
STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::<account_id>:role/<iam_role_name>'
STORAGE_ALLOWED_LOCATIONS = ('s3://<bucket>/');
- Updating our AWS IAM Role
After creating the integration object, run the command below and note the values of STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID.
DESC INTEGRATION S3_Snowflake;
The IAM role created needs to be updated with the new values retrieved from the Snowflake integration.
In the AWS console, open IAM, edit the role’s trust relationship JSON, and fill in the values of STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID.
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "",
            "Effect": "Allow",
            "Principal": {
                "AWS": "<snowflake_user_arn>"
            },
            "Action": "sts:AssumeRole",
            "Condition": {
                "StringEquals": {
                    "sts:ExternalId": "<snowflake_external_id>"
                }
            }
        }
    ]
}
- Creating an External Snowflake Stage
We need an external Snowflake stage so Snowpipe can load data from S3.
We’re only showing the stage for the customers table, but you should do the same for every table in the STAGING database.
create stage stageCustomer
url = 's3://<bucket>/<Folder>/'
storage_integration = S3_Snowflake;
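To confirm the stage can reach the bucket (assuming the integration and URL above are correct), you can list its contents:
LIST @stageCustomer;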
- Creating a Pipe to Auto-Ingest Data
The pipe gives us near real-time data loading.
We’re only showing the pipe for the customers table, but you should do the same for every table in the STAGING database.
create pipe S3_Snowflake_pipe auto_ingest=true as
copy into customers
from @stageCustomer
file_format = (type = 'CSV');
The statement assumes there is a table called customers in the database and that its structure matches the structure of the CSV files that will be uploaded to the S3 bucket.
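If your CSV files contain a header row or quoted fields (an assumption about your files, not a requirement), a sketch of the same pipe with an explicit file format could look like this:
create or replace pipe S3_Snowflake_pipe auto_ingest=true as
copy into customers
from @stageCustomer
file_format = (type = 'CSV' skip_header = 1 field_optionally_enclosed_by = '"');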
- Configuring SQS on AWS S3 Bucket
We need to create an event notification on S3. This notification will inform Snowpipe when a new CSV is uploaded to the S3 bucket.
Execute the SHOW PIPES command in a Snowflake worksheet and copy the ARN of the SQS queue for the stage from the notification_channel column.
SHOW PIPES;
Save this value, go to the S3 bucket’s Properties tab, create an event notification for object creation events, and paste the SQS queue ARN returned by the command above.
The near real-time streaming of data between S3 and Snowflake is now set up. Upload a CSV to the S3 bucket and refresh the corresponding table in Snowflake; you will see the data populated in the corresponding Snowflake table.
- Some important and helpful Snowflake commands
-- Check information about the pipe
SELECT SYSTEM$PIPE_STATUS('S3_Snowflake_pipe');
-- Force the pipe to refresh if some data still needs to be processed
ALTER PIPE S3_Snowflake_pipe REFRESH;
-- Resume or pause a Snowpipe
ALTER PIPE S3_Snowflake_pipe SET PIPE_EXECUTION_PAUSED = <FALSE/TRUE>;
-- Check information about a table
DESCRIBE TABLE CUSTOMERS;
-- Use this command to check whether the Snowpipe loads succeeded
SELECT *
FROM snowflake.account_usage.copy_history
ORDER BY LAST_LOAD_TIME DESC;
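Note that ACCOUNT_USAGE views can lag by up to a couple of hours. For a near real-time view of loads into a single table, a sketch using the INFORMATION_SCHEMA table function (the table name and time window are examples) is:
SELECT *
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
TABLE_NAME => 'CUSTOMERS',
START_TIME => DATEADD(hour, -1, CURRENT_TIMESTAMP())
));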
3. Configuring a CDC Type 2
At this step of the guide, we will configure streams to capture data changes in the tables of the STAGING database.
Based on these changes, we will ingest data into the DW database tables.
Microsoft has a nice post about Change Data Capture types that is worth checking out.
First, let’s create the streams responsible for capturing changes in our source tables.
USE DATABASE STAGING;
-- CUSTOMERS
CREATE STREAM CUSTOMERS_STREAM ON TABLE CUSTOMERS;
-- PRODUCTS
CREATE STREAM PRODUCTS_STREAM ON TABLE PRODUCTS;
-- SALES
CREATE STREAM SALES_STREAM ON TABLE SALES;
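To check that a stream exists and has pending changes (a quick sanity check, not required for the pipeline), you can run:
SHOW STREAMS;
SELECT SYSTEM$STREAM_HAS_DATA('CUSTOMERS_STREAM');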
Now that we have our streams, we can write the code responsible for reading these changes and sending them to the DW tables:
USE DATABASE DW;
INSERT INTO DW.PUBLIC.DIM_CUSTOMERS (CUSTOMER_ID, CUSTOMER_NAME, EMAIL, ADDRESS)
SELECT CS.CUSTOMER_ID, CS.CUSTOMER_NAME, CS.EMAIL, CS.ADDRESS
FROM STAGING.PUBLIC.CUSTOMERS_STREAM CS
WHERE CS.METADATA$ACTION = 'INSERT';
INSERT INTO DW.PUBLIC.DIM_PRODUCTS (PRODUCT_ID, PRODUCT_NAME, DESCRIPTION, PRICE)
SELECT PS.PRODUCT_ID, PS.PRODUCT_NAME,PS.DESCRIPTION, PS.PRICE
FROM STAGING.PUBLIC.PRODUCTS_STREAM PS
WHERE PS.METADATA$ACTION = 'INSERT';
INSERT INTO DW.PUBLIC.FACT_SALES (SALE_ID, CUSTOMER_SK, PRODUCT_SK, DATE_SK, QUANTITY)
SELECT SS.SALE_ID,
(SELECT MAX(CUSTOMER_SK) FROM DW.PUBLIC.DIM_CUSTOMERS DM WHERE DM.CUSTOMER_ID = SS.CUSTOMER_ID) AS CUSTOMER_SK,
(SELECT MAX(PRODUCT_SK) FROM DW.PUBLIC.DIM_PRODUCTS DP WHERE DP.PRODUCT_ID = SS.PRODUCT_ID) AS PRODUCT_SK,
(SELECT MAX(DATE_SK) FROM DW.PUBLIC.DIM_DATES DD WHERE DD.DATE = SS.SALE_DATE) AS DATE_SK,
SS.QUANTITY
FROM STAGING.PUBLIC.SALES_STREAM SS
WHERE SS.METADATA$ACTION = 'INSERT';
When filtering on METADATA$ACTION = 'INSERT' we capture both INSERT and UPDATE operations, because a stream records an update as a DELETE of the old row followed by an INSERT of the new one.
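You can see this by inspecting the stream’s metadata columns before the stream is consumed (a sketch against the customers stream):
SELECT CUSTOMER_ID,
METADATA$ACTION,
METADATA$ISUPDATE,
METADATA$ROW_ID
FROM STAGING.PUBLIC.CUSTOMERS_STREAM;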
Here is a basic query that we could run on the DW data:
SELECT DD.QUARTER
, DD.YEAR
, SUM(DP.PRICE) AS TOTAL_PRICE
, SUM(FS.QUANTITY) AS TOTAL_QUANTITY
FROM DW.PUBLIC.FACT_SALES FS
INNER JOIN DW.PUBLIC.DIM_CUSTOMERS DC ON FS.CUSTOMER_SK = DC.CUSTOMER_SK
INNER JOIN DW.PUBLIC.DIM_PRODUCTS DP ON FS.PRODUCT_SK = DP.PRODUCT_SK
INNER JOIN DW.PUBLIC.DIM_DATES DD ON FS.DATE_SK = DD.DATE_SK
GROUP BY DD.QUARTER, DD.YEAR
ORDER BY DD.QUARTER, DD.YEAR;
With this kind of DW solution we can also use the DW tables as version storage, thanks to the SK (surrogate key) columns: if some data is updated in the STAGING database, a new row is inserted in the DW, and you can still see what the information looked like before.
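For example, a query like the following (a sketch, using a hypothetical customer_id of 1) lists every version of a customer, oldest surrogate key first:
SELECT CUSTOMER_SK, CUSTOMER_ID, CUSTOMER_NAME, EMAIL, ADDRESS
FROM DW.PUBLIC.DIM_CUSTOMERS
WHERE CUSTOMER_ID = 1
ORDER BY CUSTOMER_SK;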
To simulate some operations we can follow these steps (see the sketch after this list):
- Insert some rows manually;
- Select from one of the stream “tables” created before;
- Run the INSERT INTO code above; and
- Check the data in the DW tables.
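A minimal sketch of steps 1 and 2, using hypothetical values:
-- 1. Insert a row manually into the staging table (hypothetical values)
INSERT INTO STAGING.PUBLIC.CUSTOMERS (CUSTOMER_ID, CUSTOMER_NAME, EMAIL, ADDRESS)
VALUES (1, 'Jane Doe', 'jane@example.com', '123 Main St');
-- 2. Check that the change was captured by the stream
SELECT * FROM STAGING.PUBLIC.CUSTOMERS_STREAM;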
We won’t do this now, because our process starts when a CSV is uploaded to the S3 bucket, but we can schedule step 3 by configuring TASKS! Let’s do it:
- Configuring TASK on Snowflake
How do we configure this? We should configure our tasks in a sequence, so the fact table is only populated after the dimension tables have been loaded.
We’ll do this by configuring a schedule that runs every minute for each table.
Attention: this is not the best solution; it is just a simple way to show how tasks can be used in Snowflake.
CREATE OR REPLACE TASK DIM_CUSTOMER_TASK
WAREHOUSE = COMPUTE_WH
SCHEDULE = '1 MINUTE'
AS
INSERT INTO DW.PUBLIC.DIM_CUSTOMERS (CUSTOMER_ID, CUSTOMER_NAME, EMAIL, ADDRESS)
SELECT CS.CUSTOMER_ID, CS.CUSTOMER_NAME, CS.EMAIL, CS.ADDRESS
FROM STAGING.PUBLIC.CUSTOMERS_STREAM CS
WHERE CS.METADATA$ACTION = 'INSERT';
CREATE OR REPLACE TASK DIM_PRODUCTS_TASK
WAREHOUSE = COMPUTE_WH
SCHEDULE = '1 MINUTE'
AS
INSERT INTO DW.PUBLIC.DIM_PRODUCTS (PRODUCT_ID, PRODUCT_NAME, DESCRIPTION, PRICE)
SELECT PS.PRODUCT_ID, PS.PRODUCT_NAME,PS.DESCRIPTION, PS.PRICE
FROM STAGING.PUBLIC.PRODUCTS_STREAM PS
WHERE PS.METADATA$ACTION = 'INSERT';
CREATE OR REPLACE TASK FACT_SALES_TASK
WAREHOUSE = COMPUTE_WH
SCHEDULE = '1 MINUTE'
AS
INSERT INTO DW.PUBLIC.FACT_SALES (SALE_ID, CUSTOMER_SK, PRODUCT_SK, DATE_SK, QUANTITY)
SELECT SS.SALE_ID,
(SELECT MAX(CUSTOMER_SK) FROM DW.PUBLIC.DIM_CUSTOMERS DM WHERE DM.CUSTOMER_ID = SS.CUSTOMER_ID) AS CUSTOMER_SK,
(SELECT MAX(PRODUCT_SK) FROM DW.PUBLIC.DIM_PRODUCTS DP WHERE DP.PRODUCT_ID = SS.PRODUCT_ID) AS PRODUCT_SK,
(SELECT MAX(DATE_SK) FROM DW.PUBLIC.DIM_DATES DD WHERE DD.DATE = SS.SALE_DATE) AS DATE_SK,
SS.QUANTITY
FROM STAGING.PUBLIC.SALES_STREAM SS
WHERE SS.METADATA$ACTION = 'INSERT';
To enable our tasks, run the following commands:
ALTER TASK DIM_CUSTOMER_TASK RESUME;
ALTER TASK DIM_PRODUCTS_TASK RESUME;
ALTER TASK FACT_SALES_TASK RESUME;
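As a refinement (a sketch reusing the customer task above, not something the original setup requires), a task can include a WHEN clause so it only runs, and only resumes the warehouse, when the stream actually has new data:
CREATE OR REPLACE TASK DIM_CUSTOMER_TASK
WAREHOUSE = COMPUTE_WH
SCHEDULE = '1 MINUTE'
WHEN SYSTEM$STREAM_HAS_DATA('STAGING.PUBLIC.CUSTOMERS_STREAM')
AS
INSERT INTO DW.PUBLIC.DIM_CUSTOMERS (CUSTOMER_ID, CUSTOMER_NAME, EMAIL, ADDRESS)
SELECT CS.CUSTOMER_ID, CS.CUSTOMER_NAME, CS.EMAIL, CS.ADDRESS
FROM STAGING.PUBLIC.CUSTOMERS_STREAM CS
WHERE CS.METADATA$ACTION = 'INSERT';
-- CREATE OR REPLACE leaves the task suspended, so resume it again afterwards
ALTER TASK DIM_CUSTOMER_TASK RESUME;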
Done!
Now, with this basic demonstration solution, we have imported data from S3 buckets into Snowflake using SQS and Snowpipe, and inside Snowflake we have built a stream-based CDC Type 2 process to populate our DW database.
Conclusion
By combining the power of Snowflake’s cloud architecture with the flexibility of AWS S3, SQS, and Snowpipe, we’ve successfully created a highly efficient, near real-time data streaming pipeline. This solution allows us to automate the ingestion of data, ensuring that our data warehouse is always up-to-date and ready for analysis. With the implementation of Change Data Capture (CDC), we can ensure that even as data evolves, our historical records remain intact, providing valuable insights into business trends over time.
The flexibility offered by Snowflake’s streaming and task scheduling capabilities not only simplifies the data integration process but also opens up a world of possibilities for advanced analytics and reporting. By automating key processes such as data loading and transformation, businesses can focus on driving actionable insights, improving operational efficiency, and making more informed decisions in real-time.
The solution we’ve built serves as the foundation for scalable, efficient, and dynamic data pipelines, ensuring that data is always available and ready to deliver business value.