End-to-end automated Analytics workload using Cloud Functions, Data Fusion, BigQuery and Data Studio
Summary:
In this article we will demonstrate how to trigger an automated, end-to-end data loading, transformation and visualization process using Google Cloud Platform. At the end of this exercise, a Cloud Data Fusion pipeline execution will be triggered automatically by a Cloud Function every time a new data file is uploaded to a Google Cloud Storage bucket. The pipeline will then perform some transformations on the data and load the results into a BigQuery table, which feeds a report with a couple of visualizations created in Data Studio.
Introduction:
A common use case for businesses that want to start leveraging data & analytics workloads in the cloud for rapid realization of business value, without having to undergo a major database/data warehouse migration (which may easily span from a few weeks to several months), is to quickly set up an automated data transformation and ingestion pipeline into an analytics data warehouse, from which simple and cost-effective reports and/or visualizations can be generated.
This article provides a detailed solution to do just that, using the data storage, processing and analytics services of Google Cloud Platform.
GCP Services used for the solution:
- Google Cloud Storage.
- Google Cloud Functions.
- Google Cloud Data Fusion.
- Google BigQuery.
- Google Data Studio.
Description of the solution we will put in place in this exercise:
The idea is to create a very simple report in Data Studio that feeds from a table previously created and loaded in BigQuery. The source data resides in a single cumulative CSV file on premises that needs some transformation before being loaded into BigQuery, so a data transformation pipeline in Data Fusion handles the required transformations and ingests the results into the BigQuery table, truncating the table first (although by changing literally one flag in the Data Fusion pipeline you can make it append data to the BigQuery table rather than truncating it on every run). The source file is updated continuously and is uploaded to Google Cloud Storage many times a day in an on-demand fashion: we don’t know when a new file may arrive, but we know we need to refresh the report with any new data as soon as it does. We therefore use a Cloud Function to trigger the Data Fusion pipeline whenever a new CSV file is uploaded to the Cloud Storage bucket. In this way, the whole process of data ingestion, transformation, load and visualization is triggered by simply uploading a new data file to a Google Cloud Storage bucket.
Main steps:
- Create the storage buckets we will use and prepare the source CSV file to be used to create the transformation pipeline.
- Create the Dataset and table in BigQuery that will store the data from the CSV file.
- Create the Data Fusion instance and assign proper roles to the Service Account it will use.
- Within the Data Fusion instance, create the Data Pipeline that will process the new CSV files and ingest them into the BigQuery table.
- Create the Cloud Function that will be triggered when a new file is uploaded to the defined Storage bucket, and will pass the newly added file to the Data Fusion pipeline for processing and ingestion to BigQuery.
- Create a simple report in Data Studio with some visualizations of the data in BigQuery.
Prerequisites:
- A GCP project with an associated billing account and at least one user with the Project Editor role on the project. For this exercise we created a project named “simplecsvbqload”.
- Enable the APIs that will be required for the project:
- Cloud Build
- Cloud Functions
- Cloud Data Fusion
Steps:
1. Create buckets that will be used for the data processing:
Using the Cloud Shell, create the following buckets in Cloud Storage. You can name them however you like, as long as the names are globally unique; you will need to provide these names again in later steps (remember to replace the bucket names in brackets [] with your own bucket names):
gsutil mb gs://[YOUR_DATA_SOURCE_BUCKET]
Bucket for the raw CSV source files to be uploaded to GCP (files uploaded to this bucket will trigger the data transformation and data load process; this bucket will be used as the triggering event when writing the Cloud Function).
gsutil mb gs://[YOUR_CDAP_TEMP_BUCKET]
Staging bucket that the Data Fusion pipeline will use as temporary storage while inserting data into BigQuery.
gsutil mb gs://[YOUR_CDAP_ERRORS_BUCKET]
Bucket that will hold the output of any errors during the data processing in the Cloud Data Fusion pipeline.
Finally, we will upload a sample CSV file with a subset of data to make it easier to derive the data structure and schema using the Wrangler later on when we build the data pipeline with Data Fusion.
gsutil cp SampleFile.csv gs://[YOUR_DATA_SOURCE_BUCKET]
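If you prefer to do this from Python rather than gsutil, here is a minimal, equivalent sketch using the google-cloud-storage client library (the bucket names are placeholders you must replace, and SampleFile.csv is assumed to be in the current directory):
from google.cloud import storage

client = storage.Client()  # uses your default project and credentials

# Replace these placeholders with your own globally unique bucket names.
for bucket_name in ["YOUR_DATA_SOURCE_BUCKET", "YOUR_CDAP_TEMP_BUCKET", "YOUR_CDAP_ERRORS_BUCKET"]:
    client.create_bucket(bucket_name)

# Upload the sample file so the Wrangler can infer the schema later on.
client.bucket("YOUR_DATA_SOURCE_BUCKET").blob("SampleFile.csv").upload_from_filename("SampleFile.csv")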
As seen in the data preview above, the source files that will feed the process have the following schema:
- id: Long
- status_code: Long
- invoice_number: Long
- item_category: String
- channel: String
- order_date: Datetime
- delivery_date: Datetime
- amount: Float
There are some changes that need to be made before inserting the data into BigQuery. The date columns in the dataset are in the format dd-MM-yyyy H:mm and will need to be parsed to yyyy-MM-dd hh:mm for BigQuery to ingest them correctly. In addition, the float column uses commas as the decimal separator rather than points, so we will also need to replace those in order to insert the values as a float data type in BigQuery. We’ll perform all of these transformations later on in the Data Fusion pipeline.
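To make the required transformations concrete, here is a minimal Python sketch of the same logic applied to a single parsed CSV row. This is purely illustrative; in this exercise the actual transformations are performed by the Data Fusion Wrangler, not by custom code, and the sample row below is hypothetical:
from datetime import datetime

def transform_row(row):
    # Illustrative equivalent of the Wrangler recipe, applied to one row (a dict of strings).
    # Parse dd-MM-yyyy H:mm dates into datetime objects so they load as DATETIME in BigQuery.
    row["order_date"] = datetime.strptime(row["order_date"], "%d-%m-%Y %H:%M")
    row["delivery_date"] = datetime.strptime(row["delivery_date"], "%d-%m-%Y %H:%M")
    # Replace the comma decimal separator with a point so amount becomes a float.
    row["amount"] = float(row["amount"].replace(",", "."))
    # Cast the integer-typed columns.
    for col in ("id", "status_code", "invoice_number"):
        row[col] = int(row[col])
    return row

# Hypothetical input row, for illustration only:
print(transform_row({"id": "1", "status_code": "10", "invoice_number": "1001",
                     "item_category": "Books", "channel": "Web",
                     "order_date": "03-05-2021 9:15", "delivery_date": "05-05-2021 14:00",
                     "amount": "1234,56"}))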
2. Create the Dataset and table in BigQuery:
As described in the previous section, we will use that schema as the input for creating the table in BigQuery that will hold the data.
Navigate to the BigQuery service and, within your project, create a new dataset; for simplicity I named it dataset1:
Then, within the newly created dataset, create a simple empty table named table1, with the same column names as the source CSV file and the corresponding data types. It looks like this:
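If you prefer to create the dataset and table from code rather than the console, a minimal sketch using the google-cloud-bigquery client library could look like the following (the dataset and table names match the ones used in this exercise; the location is an assumption and should match the region of your Data Fusion instance):
from google.cloud import bigquery

client = bigquery.Client()  # uses your default project and credentials

dataset = bigquery.Dataset(f"{client.project}.dataset1")
dataset.location = "us-central1"  # assumption: use the same region as your Data Fusion instance
client.create_dataset(dataset, exists_ok=True)

schema = [
    bigquery.SchemaField("id", "INTEGER"),
    bigquery.SchemaField("status_code", "INTEGER"),
    bigquery.SchemaField("invoice_number", "INTEGER"),
    bigquery.SchemaField("item_category", "STRING"),
    bigquery.SchemaField("channel", "STRING"),
    bigquery.SchemaField("order_date", "DATETIME"),
    bigquery.SchemaField("delivery_date", "DATETIME"),
    bigquery.SchemaField("amount", "FLOAT"),
]
client.create_table(bigquery.Table(f"{client.project}.dataset1.table1", schema=schema), exists_ok=True)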
3. Create the Data Fusion Instance:
Enable the Cloud Data Fusion API if you haven’t done so already:
Once the API is enabled, navigate to Data Fusion in the left menu and click on CREATE AN INSTANCE:
Name your instance with any name you like, I named it cdf-test-instance.
Data Fusion leverages Cloud Dataproc as its underlying big data processing engine. This means that when a data pipeline is executed, Data Fusion spawns an ephemeral Dataproc cluster, submits the pipeline to it as a Spark job, and deletes the cluster once the job finishes. Make sure to grant the service account that Data Fusion uses the permissions it needs to spawn Dataproc clusters: you will be prompted to authorize it, and when that happens, click the GRANT PERMISSION button (also keep in mind that the region in which the Data Fusion instance is deployed needs to match the region in which the BigQuery dataset was created):
Click on the CREATE button, the instance creation will take a few minutes.
Once the Data Fusion instance is created, copy the service account Data Fusion is using and grant it the “Cloud Data Fusion API Service Agent” role by navigating to IAM and clicking the +ADD button. With this role assigned to its service account, Data Fusion can access data in other services such as Cloud Storage, BigQuery and Dataproc:
Now that the instance is created and we have made sure the service account Data Fusion uses has the required permissions, we’re ready to create the data transformation pipeline that will take the CSV source file, perform some transformations on it and load the data into the BigQuery table we created earlier.
4. Create the data pipeline in Data Fusion
In the Data Fusion section you’ll see the Data Fusion instance, click on the View instance link to access the Data Fusion UI:
The Data Fusion UI will be opened in another tab on your browser:
If you’re not familiar with Data Fusion, you may click on the Start Tour button to take a look around and get familiar with it. For the purposes of this exercise we will skip it for now; click on the No Thanks button.
We will design the pipeline from scratch for clarity, so click on the Studio link under the Integrate card.
You will be redirected to the Studio page. First, give the pipeline a meaningful name in the top center of the screen (you will need to reference this pipeline name later on in the Cloud Function Python script, which uses it to start the execution):
Next, we will read the sample CSV file. In this first stage we will simply pass the whole file through to the next stage, and we will parse the CSV in the following step.
Hover your mouse over the green GCS box that was added to the canvas when you clicked the GCS icon on the left menu. A “Properties” button will appear; click on it.
In the properties page of the GCS source component, set the “Path” property to: gs://[YOUR_DATA_SOURCE_BUCKET]/${FileName}
Where ${FileName} acts as a Runtime Argument variable that will be passed to the pipeline at execution time (we will define the variable later on as part of the pipeline’s metadata).
In the output schema section to the right, delete the offset field so that it does not get passed through to the next step in the pipeline, and only the body of the text gets passed on to the data wrangler component we will set up in the next step of the pipeline.
Next, we’ll add a wrangler operation to the canvas in which we will perform the transformations and parsing mentioned in step 1. To do this, expand the Transform menu to the left, and click on the Wrangler tool. It will get added to the canvas as seen in the blue box below:
Now, we need to connect the output of the CSV file read to the wrangler, as depicted below. Then, hover over to the wrangler and click on the properties button that appears to access the wrangler properties:
Here you can give the label any name you like. For now we’ll leave everything at its defaults, but we’ll change the Error Handling behavior from the default “Skip error” to “Send to error port”, so that we can redirect errors during pipeline execution to a file in GCS for further processing later. We could write the transformation recipe directly as directives using JEXL syntax (https://commons.apache.org/proper/commons-jexl/reference/syntax.html), but for simplicity, and to take advantage of the Data Fusion UI, we’ll build it with the Wrangler tool; whatever we do in the Wrangler will later be translated into JEXL directives and displayed under the recipe here. Click on the WRANGLE button, below the Directives section.
This is where uploading the SampleFile.csv file to the source data bucket created in Step 1 comes in handy. In the GCS browser that opens, navigate to [YOUR_DATA_SOURCE_BUCKET] and select the file you uploaded in Step 1. By providing a sample of the data file to the Wrangler, we allow it to infer the data structure and let us perform the data wrangling tasks intuitively.
The file contents will be displayed to the left, and the columns that will go out of the wrangler are displayed in the section to the right. As of now, the wrangler reads each line in the file as a single field called “body” and outputs it as it is. We’ll begin the transformations now:
First we will have the Wrangler parse the file as a semicolon-delimited CSV file. Click on the little arrow pointing downwards to the left of the column name (body), and select the Parse → CSV option.
As the columns in the CSV file are separated by semicolons (;), select Custom delimiter in the pop-up that appears and enter a semicolon, check the flag to treat the first row as headers, and then click Apply:
The Wrangler has now separated all the input fields in the file and created a column for each, as reflected in the output columns list to the right. It keeps the original body input as a separate column, but we don’t need it, so delete it by opening its menu as before and choosing the “Delete column” option:
We can now begin with the parsing and transformations required to ingest the data into the BigQuery table.
By default, as this is a CSV file, Data Fusion treats all the inputs as strings; we need to change that to insert the data properly. We will use the Change Data Type option in each column’s arrow menu to set the data types to match how we defined the columns in the BigQuery table in Step 2.
First, by following this same procedure, we will change the data types of the fields id, status_code and invoice_number to Long, so they can be inserted in the BigQuery table:
Next, we will replace the commas with points in the amount field before changing its data type to float, so that it can be inserted into BigQuery as a float value and the commas don’t get misinterpreted as thousands separators (in many non-English locales, points are used as the thousands separator and commas as the decimal separator). We will use the Find and replace function for this: fill in a comma in the Old value field and a dot in the New value field, and then click on the Replace All button:
We can now change the data type of the field to float.
Now, we will parse the order_date field so it can be inserted correctly as of type Datetime into BigQuery.
As explained earlier, we will have to use a custom format, as the input dates are in the format dd-MM-yyyy H:mm, so select the “custom format” option, provide the format in the text field and click Apply.
Next, we’ll use the “Send to error” function to filter out the rows where the amount is 0. These rows will be sent to the error file, where they can be reviewed further and reprocessed later if required (because we previously changed the data type of this field to float, the Wrangler has automatically turned the input value 0 into 0.0, so we need to set the filter to 0.0 accordingly).
Select the Send to error transformation in the amount column, set the condition to “value is” and the value to 0.0, then click on Apply:
The source data has some nulls in the delivery_date field; we will also send those records to the error port so that they can be reviewed later.
Select the Send to error Transformation in the delivery_date column, set the condition to “value is empty” and click on Apply:
Finally, we’ll parse the delivery_date field as datetime, with the same procedure we used for the order_date field:
Now that we have built the recipe for all the transformations, on the right side of the Wrangler screen we can see a summary of the transformation steps; you can also delete steps from here if you wish:
Lastly, click the Apply button on the upper right side to get back to the Wrangler properties page, where you will now see the recipe in JEXL syntax and, to the right, the output fields that will come out of the Wrangler step in the pipeline:
Now you can click the Validate button to validate everything looks alright, and then when ready, click on the X next to the button to go back to the pipeline design canvas.
Now we’ll handle the errors. In the canvas, expand the “Error Handlers and Alerts” section in the left menu and click on “Error Collector”:
Then, drag an arrow from the Error port on the Wrangler to the Error Collector to direct and collect the errors output by the Wrangler; we won’t perform any further configuration in the ErrorCollector properties. Next, expand the Sink section in the left menu and click on “GCS” to add a GCS writer to the canvas, to which we will write the errors collected by the ErrorCollector:
Drag an arrow from the ErrorCollector to the GCS writer and then hover the mouse over the newly added GCS writer to access its properties. Here we will set the label to “GCS-WriteErrors”, the Path field to the gs://[YOUR_CDAP_ERRORS_BUCKET] bucket created in Step 1, the format to CSV and the Write Headers flag to True; finally, we’ll set the Output File Prefix to “errors”. This will write a CSV file to the path you defined and add a new line to it for each error that comes out of the Wrangler during the pipeline execution:
Now that we’ve handled the errors properly, we will add the final step in the pipeline: ingesting the output of the Wrangler into the BigQuery table. In the same Sink section in the left menu, click on the BigQuery icon and drag an arrow from the output of the Wrangler to this new icon:
The pipeline should now look like this; hover the mouse over the BigQuery icon in the canvas and access its properties to set up the final part of the pipeline:
In the BigQuery sink properties we will set a few values:
We will set the dataset and table to the dataset and table we created in BigQuery during Step 2. We’ll set the Temporary Bucket Name to gs://[YOUR_CDAP_TEMP_BUCKET]
We will also enable the Truncate Table flag so that the table gets truncated every single time the pipeline runs:
Now that the pipeline is complete, you can try a dry run by clicking on the Preview button at the top of the canvas and manually setting the FileName runtime argument we defined for the pipeline (after the pipeline is deployed and executed from the Cloud Function, this runtime argument will be filled with the actual name of the file uploaded to gs://[YOUR_DATA_SOURCE_BUCKET] and passed to the pipeline at runtime). For the purposes of this dry run, provide the name of the same file we uploaded to the bucket in Step 1, and click on Run:
The preview of the pipeline will run for about a minute, but no actual execution will be performed: no Dataproc cluster will be spawned and no records will be written to the BigQuery table:
Once the preview run finishes, you can Preview what data reached the input at each step and what came out after the processing of each step. You do so by clicking on the “Preview Data” links on each of the pipeline steps.
Once you have reviewed the preview and are satisfied with how the pipeline behaves, it’s time to deploy it. For this, go back to the canvas and disable the preview mode by clicking on the Preview button again, and then click on the Deploy button.
After a few seconds, the pipeline will be deployed and ready to be executed:
5. Create the Cloud Function that will trigger the execution of the pipeline upon a new file being uploaded to the Data Source Storage bucket.
Now that the Data Fusion pipeline that will transform the data and load it into BigQuery is deployed and ready to be executed, we will set up the Cloud Function that will get triggered whenever a new file is uploaded to the data source GCS bucket. The purpose of this function is to pass the newly uploaded file to the Data Fusion pipeline and start its execution.
For this, we will need to enable the Cloud Build API first, enable it if you haven’t done so already:
Then, navigate to Cloud Functions:
Once you’re in there, click on CREATE FUNCTION:
Set the basic function configuration and Triggering behavior:
Give the new function a meaningful name, set the Trigger type to Cloud Storage, the Event type to Finalize/Create and the Bucket to [YOUR_DATA_SOURCE_BUCKET], then hit the SAVE button.
Then click on the NEXT button below.
Note: If you did not enable the Cloud Build API yet, you will be prompted to do so (as depicted in the red banner in the image below, prompting you to enable it).
In the Code section of the Cloud Function wizard, to the left you see a list of file templates that are automatically created for you and are required depending on the Runtime and language you will use to create your function. As we will create the function with the Python 3.9 runtime, 2 files are created: main.py and requirements.txt
The requirements.txt file contains a list of the Python libraries and its versions that are referenced in the Python function itself and hence are required for the function to work. The main.py file contains the actual Python code that gets executed when the function is triggered.
First we will edit the requirements.txt file, where we list the Python libraries required by the Cloud Function. In this case, the only library that doesn’t come bundled with the runtime by default and that we will use in the main.py file is the requests library (which we will use in the script to send an HTTP POST request to the Data Fusion REST API to start the data pipeline we created earlier, and to retrieve an access token from the metadata server):
Select Python 3.9 in the Runtime dropdown, then click on the requirements.txt file below and add the following line to it:
requests>=2.25.1
So it looks like this:
Then we edit the main.py file and write the Python script that will run whenever a new file is uploaded to the storage bucket we specified earlier:
Python Script:
Remember to replace the values of the YOUR_CDAP_INSTANCE_ENDPOINT and YOUR_PIPELINE_NAME constants with your Data Fusion instance’s endpoint and the name of your pipeline, respectively. You can find your instance’s endpoint by running the following command in the Cloud Shell:
gcloud beta data-fusion instances describe \
    --location=${LOCATION} \
    --format="value(apiEndpoint)" \
    ${INSTANCE_ID}
Example:
You can find the full main.py code for the Cloud Function that triggers the execution of the Data Fusion pipeline in the davavillerga/gcp-functions-bq repository on GitHub (main.py on the main branch).
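For reference, a minimal sketch of such a function might look like the following. This is an illustrative approximation rather than the exact code from the repository; it assumes the pipeline was deployed in the default namespace, that the function’s entry point is trigger_pipeline, and that the two constants are replaced with your own values:
import requests

# Replace with your Data Fusion instance endpoint and the pipeline name you set in the Studio.
CDAP_ENDPOINT = "YOUR_CDAP_INSTANCE_ENDPOINT"
PIPELINE_NAME = "YOUR_PIPELINE_NAME"

def get_access_token():
    # Fetch an access token for the function's service account from the metadata server.
    resp = requests.get(
        "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token",
        headers={"Metadata-Flavor": "Google"},
    )
    resp.raise_for_status()
    return resp.json()["access_token"]

def trigger_pipeline(event, context):
    # Background function triggered by a GCS finalize/create event; event["name"] is the file name.
    file_name = event["name"]
    start_url = (
        f"{CDAP_ENDPOINT}/v3/namespaces/default/apps/{PIPELINE_NAME}"
        "/workflows/DataPipelineWorkflow/start"
    )
    # Pass the file name as the FileName runtime argument the pipeline expects.
    resp = requests.post(
        start_url,
        headers={"Authorization": f"Bearer {get_access_token()}"},
        json={"FileName": file_name},
    )
    resp.raise_for_status()
    print(f"Started pipeline {PIPELINE_NAME} for file {file_name}")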
Finally, click on DEPLOY; the function should take a couple of minutes to deploy. When it finishes, you should see it listed among your functions as correctly deployed, like this:
To test that everything is working as expected so far, simply upload a CSV file to gs://[YOUR_DATA_SOURCE_BUCKET]. It doesn’t matter how you name the file, as long as the data and headers have the same structure as the sample file we uploaded at the beginning.
You can confirm the Cloud Function ran by checking its logs:
You can also confirm that the Data Fusion pipeline was started by the function in the Data Fusion instance UI:
The Status will change from Deployed, to Provisioning and then to Running.
In the Dataproc Clusters page, you will see that Data Fusion has spawned an ephemeral cluster that will stay alive as long as the pipeline is running, and when it finishes it will get destroyed:
After around 5 to 6 minutes, the pipeline execution will finish, the status will change to Succeeded, and you will see in Data Fusion how many records were passed from each step to the next. By clicking on Runtime Args at the top, you can also see the value that the Cloud Function passed to the pipeline as the FileName argument:
Finally, in BigQuery you can check that the records were loaded into the table:
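If you prefer to check from code rather than the console, a quick sanity check with the BigQuery client library might look like this (assuming the dataset1.table1 names from Step 2 and your default project credentials):
from google.cloud import bigquery

client = bigquery.Client()
# Count the rows that the pipeline loaded into the table.
row = list(client.query("SELECT COUNT(*) AS row_count FROM `dataset1.table1`").result())[0]
print(f"Rows loaded: {row.row_count}")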
6. Create a simple report in Data Studio with some visualizations of the data in BigQuery.
Now that we have some data in BigQuery to play around with, we will very quickly put together a simple report in Data Studio. In the same BigQuery table explorer, export the table with the Export menu to the right to begin exploring the data in Data Studio:
The Data Studio UI will open in a separate tab with the table data preloaded:
We add three very simple visualizations: a sum of the amount column, a ring chart with the percentage distribution of records by item_category, and a time series chart of amount by order_date. Right now the input data only covers two days in the order_date column, so the time series looks a bit odd; we’ll add a new file with more records later to see the difference.
We save the data exploration with the Save button, then open the Share dropdown and select Create report and share to save the report and go from explore mode to report mode.
Confirm the prompt in the popup that appears so Data Studio can access the data in BigQuery. From here we can continue adding pages, visualizations and charts to the report, apply some styling so it looks nice and tidy, and prepare it to be shared in many different ways or even schedule it to be emailed.
We upload the file UpdateFile.csv (which contains much more data) to the data source bucket and wait for the data update process to complete. Once it’s done, we can simply refresh the report in Data Studio to get an up-to-date report:
The data gets refreshed and you can send another updated report:
That’s it! You now have a fully functional, automated initial ETL + analytics workload. All you have to do is keep feeding data into GCS, and whenever you need it, you can have Data Studio send scheduled reports with up-to-date data.