BigQuery Migration using GCP native methods

Vladimir Elvov
7 min read · Mar 1, 2020

In our previous story we looked at the challenge of migrating from a Hadoop-based analytics ecosystem with Impala and HDFS to GCP's BigQuery. We tried to solve this problem with a commercial data integrator called Striim. Whilst this worked pretty well, reading data from files on HDFS and writing to BigQuery via Striim requires parsing the files in order to impose a schema on the data. When dealing with 50 tables, some of which have up to 300 columns, this is a lot of manual and particularly error-prone work. So how can this be avoided?

After trying multiple approaches, including Cloud Data Fusion and looking into custom development, we found that BigQuery is actually a very smart system that offers multiple ways of loading data. One of these ways is BigQuery load jobs (bq load jobs). When loading data through a bq load job, the data format can be specified; options include CSV, JSON, Avro and Parquet. Avro and Parquet are superior to the first two because they are binary formats and also contain the schema. Loading binary files means that the load process is much faster compared to loading files in human-readable formats. Also, the schema of the data is detected automatically by BigQuery, so you don't have to deal with parsing a file, which reduces your workload a lot!

At the same time, Hadoop can generate both Avro and Parquet files automatically. We started out with Avro and generated an Avro test table on our Hadoop cluster. After some issues with getting timestamps to show up correctly when executing the bq load job (the parameter use_avro_logical_types must be set to true), we were able to get a test table into BigQuery. However, when using Avro tables it is required to create an Avro schema file, so we have to deal at least partially with the schema again.
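For illustration, a load of such an Avro export could look like this (the dataset, table and bucket names are placeholders, not our actual ones):

bq load \
--source_format=AVRO \
--use_avro_logical_types=true \
datasetName.tableName \
"gs://BucketName/folder/FileName.avro"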

So eventually we started to look into using Parquet as the file format. Parquet excels at compression and requires no schema files for creating a Parquet table on Hadoop's side. Depending on the data we were able to achieve a compression ratio of 40: a table of 200 GB in CSV format gets reduced to Parquet files totaling 5 GB, which boosts the initial load speed heavily!

For exporting the data on Hadoop's side we used Impala "export" tables. Such a table is basically a copy of the actual source table in Impala (more details on our source Hadoop architecture can be found here). When creating these tables we could copy the exact schema of the source table. Some additional parameters are also required to boost the solution's performance. When loading data from the source tables into the export tables, the following settings are required:

set PARQUET_FILE_SIZE=512m
set PARQUET_ANNOTATE_STRINGS_UTF8=true

The first parameter controls the size of the actual Parquet files in HDFS where the data of the Impala export table is physically stored. When setting it to a larger size we found that the bq load job hits a memory exception, so we recommend keeping it at 512 MB. The second parameter ensures that string data loaded into BigQuery is decoded as human-readable strings. Without it, all string columns end up in binary format in BigQuery, which is not usable for anyone who wants to work with the table.
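Putting this together, creating and filling such an export table could look roughly like this in impala-shell (the database and table names here are placeholders, not our actual ones):

set PARQUET_FILE_SIZE=512m;
set PARQUET_ANNOTATE_STRINGS_UTF8=true;
-- copy the exact schema of the source table, stored as Parquet
CREATE TABLE export_db.my_table_export LIKE source_db.my_table STORED AS PARQUET;
-- write the data into the export table's Parquet files on HDFS
INSERT OVERWRITE export_db.my_table_export SELECT * FROM source_db.my_table;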

On BigQuery's side we had to create the target tables first; however, dealing with data types in BigQuery is fairly easy because BigQuery covers most of the source types.
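For the small example table used later in this post (id, column1, column2), such a target table definition might look like this; the names and types are purely illustrative:

CREATE TABLE `datasetName.tableName` (
  id INT64,
  column1 STRING,
  column2 TIMESTAMP
);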

With the files now available in HDFS and the tables ready in BigQuery, we are good to go for the initial load of 3 TB of data. We needed to transfer the HDFS files to Google Cloud Storage (GCS) to eventually load them into BigQuery via bq load jobs. To use as much automation as possible, a simple Java app fetches a file from the specified HDFS directory, calls the GCS API and thus copies the file to a GCS bucket. As soon as all files associated with a specific table are available in GCS, we start a script that iterates through the files in the GCS bucket and triggers a bq load job for every file. It's important to note that it is not recommended to load all files in a table's GCS folder with one bq load command, because BigQuery guarantees data consistency only within one loaded file. The bq load command used looks like this:

bq load \
--source_format=Parquet \
datasetName.tableName \
"gs://BucketName/folder/FileName.parquet"

But we don't just want to conduct an initial load, we also want to be able to load updates with a daily delta to our BigQuery target tables. An additional concern is that BigQuery is a data warehouse solution, which is good for creating historized tables with append-only records. However, what if the data scientists and analysts working with these tables want an exact copy of the source tables in Oracle, something that requires not only inserts but also updates of already existing records? BigQuery supports DML operations, which makes designing a solution easier. But how to solve this in detail?

In our Hadoop cluster, Oracle Data Integrator already loads a daily delta into an Impala Parquet staging table for our daily provisioning of data into the Hadoop-based analytics solution. Since the data is available in an Impala table, the same data is physically stored in an HDFS directory in the form of Parquet files. The next step is to move these files to the Cloud, specifically to GCS, because this is where our bq load jobs pick the data up to propagate it to BigQuery. This is taken care of by a Java app that is scheduled through an Oozie workflow and copies files from HDFS to GCS daily.
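As a rough illustration of that copy step (not the actual app; the paths, bucket and class names are hypothetical), the core of such a Java app could look like this, using the Hadoop FileSystem API and the google-cloud-storage client:

import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class HdfsToGcsCopy {
  public static void main(String[] args) throws Exception {
    // Hypothetical locations; the real app reads these from its configuration
    Path hdfsDir = new Path("/data/export/my_table");
    String bucket = "BucketName";
    String gcsFolder = "folder/";

    FileSystem hdfs = FileSystem.get(new Configuration());
    Storage gcs = StorageOptions.getDefaultInstance().getService();

    // Copy every Parquet file of the export table into the GCS bucket
    for (FileStatus status : hdfs.listStatus(hdfsDir)) {
      if (!status.isFile()) continue;
      try (InputStream in = hdfs.open(status.getPath())) {
        BlobInfo blob = BlobInfo.newBuilder(bucket, gcsFolder + status.getPath().getName()).build();
        gcs.createFrom(blob, in);
      }
    }
  }
}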

We can't, however, just load our Parquet files from GCS into a target BigQuery table via a bq load job, because load jobs don't support DML operations; updates can only be done through SQL. So we do need a staging table in BigQuery where the data from the delta file is kept first, before we can actually use it to update the target table. This staging table is loaded through a bq load job. When the data is available in the staging table, a MERGE statement against the target table is executed using a unique id column, the primary key column from the Oracle table. If there is a match for a record we update the old value; if there is no match we simply insert the new record into the target table.

For a table with the three fields id (which is also the key column), column1 and column2, the MERGE statement would look as follows:

" MERGE " + '`' + Dataset.TargetTable + '`' + " T USING " + '`' + Dataset.SourceTable + '`' + " S ON  T.id = S.id WHEN MATCHED THEN UPDATE SET targetTable.id=sourceTable.id ,  targetTable.column1= sourceTable.column1, targetTable.column2=sourceTable.column2, WHEN NOT MATCHED THEN INSERT VALUES ( id ,   column1 ,   column2 );"

We can now manually execute bq load jobs and MERGE statements to load data into our BigQuery target tables. But of course we don't want to do this manually, and we also need some housekeeping. So what can trigger the bq load job and the MERGE statement?

Although this use case can hardly be viewed as a streaming one, the answer is, as so often, an event-driven architecture. Every file arriving in the GCS bucket can be viewed as an event, and this event in turn triggers a further chain of data processing. To deal with such triggering events, Google Cloud Platform offers a serverless solution called Cloud Functions. A Cloud Function has a trigger that it listens to and a set of actions that are executed when the event fires. In our case we configure the trigger to be a newly arriving file in the GCS bucket; the set of actions is the bq load job and the MERGE statement. The Cloud Function gets the schema of the staging table as a parameter from a config file, so it can create the staging table when it is executed. In addition to these steps we also need some housekeeping: the staging table has to be deleted after the MERGE statement has been executed successfully. The following diagram depicts the entire workflow inside the Cloud Function:
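To make this workflow concrete in code as well, here is a minimal, hypothetical Java sketch of such a Cloud Function, assuming the Functions Framework and the BigQuery client library; all dataset, table and column names are placeholders, and the config-driven creation of the staging table is omitted for brevity:

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.LoadJobConfiguration;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.functions.BackgroundFunction;
import com.google.cloud.functions.Context;

public class DeltaLoadFunction implements BackgroundFunction<DeltaLoadFunction.GcsEvent> {

  // Minimal GCS event payload; in reality the function would also read its
  // table names and staging schema from a config file.
  public static class GcsEvent {
    public String bucket;
    public String name;
  }

  private static final String DATASET = "datasetName";        // placeholder
  private static final String STAGING_TABLE = "stagingTable"; // placeholder
  private static final String TARGET_TABLE = "targetTable";   // placeholder

  @Override
  public void accept(GcsEvent event, Context context) throws Exception {
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
    TableId staging = TableId.of(DATASET, STAGING_TABLE);

    // 1. Load the newly arrived Parquet file into the staging table.
    String sourceUri = String.format("gs://%s/%s", event.bucket, event.name);
    LoadJobConfiguration load = LoadJobConfiguration.newBuilder(staging, sourceUri)
        .setFormatOptions(FormatOptions.parquet())
        .build();
    Job loadJob = bigquery.create(JobInfo.of(load));
    loadJob.waitFor();

    // 2. Merge the staged delta into the target table.
    String merge = String.format(
        "MERGE `%s.%s` T USING `%s.%s` S ON T.id = S.id "
            + "WHEN MATCHED THEN UPDATE SET column1 = S.column1, column2 = S.column2 "
            + "WHEN NOT MATCHED THEN INSERT (id, column1, column2) VALUES (S.id, S.column1, S.column2)",
        DATASET, TARGET_TABLE, DATASET, STAGING_TABLE);
    bigquery.query(QueryJobConfiguration.of(merge));

    // 3. Housekeeping: drop the staging table after a successful merge.
    bigquery.delete(staging);
  }
}

In the real setup the function would first create the staging table from the schema held in its configuration before the load job runs, and handle errors and retries around each step.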

This diagram shows our full BigQuery delta load architecture from the Hadoop cluster to the Cloud:

The advantage of this architecture is that it is extendable to as many tables and datasets as required, due to the parametrization of the Cloud Function. It also scales thanks to the scalability of the utilized components such as Cloud Functions, BigQuery load jobs and Parquet files. It can also be extended to serve much more frequent update cycles than once per day; it can easily handle hourly or even minute-level updates if required.

About the Authors: Mark and Vladimir Elvov work in different functions in data architecture. New technologies are our core passion, and using architecture to drive their adoption is our daily business. In this blog we write about everything related to Big Data and Cloud technologies: from high-level strategies via use cases down to what we've seen in the front-line trenches of production and daily operations work.
