Hadoop to BigQuery Migration — New Edition

Vladimir Elvov
16 min readApr 19, 2021

In a story 1.5 years ago we looked for the first time at how to migrate from Impala and HDFS to BigQuery for cloud-based analytics. Since then we’ve seen several new projects in this area and with them increasing complexity. This article outlines how we have improved our solution for a migration since then, focusing particularly at further automating things and reducing manual workload. We also provide code examples for some parts of the migration process.

The on-prem world

The project we’re looking at this time covers the migration of a primarily DWH-focused, Cloudera-based on-prem data lake to GCP BigQuery. The architecture of this on-prem data platform follows a very common pattern that we’ve seen in many companies. Data gets ingested from several sources, like files servers, relational databases, or message queues. Ingestion happens either with ETL-vendor based solutions (e.g. Talend, Informatica, Pentaho, etc.) or with custom code (e.g. Spark). The ingested raw data arrives first in Hive or Impala tables that persist data in Avro format on HDFS. Since Avro is a row-oriented format, using it in the raw data layer is a good idea to quickly extract all columns of a table for later transformations. These transformations often happen SQL-based with data being moved to Impala tables that persist data in Parquet format. For simplicity, let’s call this part of the on-prem data platform the analytical layer. In this analytical area keeping data in a columnar format is a good idea to allow for fast analytical queries. Also Impala is faster than Hive so that analytics teams often choose it for creating reports. The tables with transformed data forming the basis for reports are then accessed by data visualization tools like Tableau, Microstrategy, etc. Additional consumers of raw data can be data wrangling pipelines for Data Science use cases to cleanse and prepare data for training models or performing batch predictions. Often the end consumer here is the Cloudera Data Science Workbench. The architecture looks typically as follows:

Standard Hadoop based architecture for DWH style use cases

Moving to the Cloud

Running that kind of SQL-based analytics in the Cloud is typically much more cost-efficient and light-weight. Instead of continuously running a Hadoop cluster, one does only pay for what one uses. Particularly a serverless solution like BigQuery allows to scale whenever requiring additional storage and compute power without any upfront investments. Therefore various enterprises are now looking to migrate their on-prem Hadoop clusters to the cloud and especially the SQL at scale part BigQuery is a popular choice. But how to do this? Generally, migrations are made up of two parts: The initial load and the delta load. The first part is where historical data is loaded into BigQuery, the second one is used for a continuous replication of data between the on-prem cluster and the GCP whilst data is still ingested from the sources to the on-prem cluster.

There are two reasons for this two-phased approach. One is that doing both the initial load and then switching to the cloud-based data pipeline immediately is a Big Bang approach and this is very risky. Generally speaking, migrations are complicated and the more you can reduce the risk the better for your data, your data platform, your data consumers and also for your chances to get a promotion or at least not to get angry mails from the ones who are using your data. The second reason is that if you don’t do a Big Bang approach, then you need to get continuous refreshes of data to your new cloud based pipeline. This delta load can be done by accessing the sources from the cloud right away, however, this would double the load to the source system (as the on-prem cluster also needs this data). If this source system is a relational database, this is a bad idea as its capacity is limited and you might break down the entire database. This is where the continuous replication from the on-prem cluster comes in handy. Of course if you’re ingesting data from a system that is designed to allow parallel consumption like Apache Kafka, you don’t face such an issue and can consume the data directly from the Kafka topic through a cloud-based Kafka consumer. However, in most cases data still does arrive in on-prem Cloudera clusters from relational databases, so the continuous replication will come in handy. In the next sections we will look in more depth at how the initial load and the delta load work.

The Initial Load

In the initial load data is first migrated and loaded from both the raw data layer and the analytical layer. The data is persisted on-prem in HDFS, so you will copy the data first from HDFS to Google Cloud Storage (GCS) in the form of files. This can be done using the HDFS connector for GCS that is installed on the on-prem cluster nodes and then allows to run a distcp command that will copy files to a GCS bucket by specifying the respective bucket URI. The folder structure in HDFS will be also copied over into GCS. We will later see why this is so helpful. The exact steps on how to install the connector and run the distcp command are pretty well documented in the official Google documentation, so we won’t be diving into the details here.

Data arriving in GCS will be available in the form of files and now needs to be loaded into BigQuery. For this the GCP offers bq load jobs and we’ve made use of them in our previous story. However, if you want to load to a BigQuery table, which is partitioned by a specific column and also encrypted, you will have to create this table first. Creating tables is simple and can be done through a DDL statement. Unfortunately it’s not possible to just run a SHOW CREATE TABLE statement in Hue and then copy it to BigQuery because BigQuery does not use the same data types as Hive or Impala do. So you will have to manually change these data types in your DDL statement. Now, if you have hundreds or even thousands of tables with more than 100 columns to migrate, this can become a very time consuming and error-prone exercise. At this point you might want to use third-party software, which is going to do this DDL conversion for you. Or you can leverage the schema auto-detection in BigQuery and save some of your migration budget. Data in GCS is available in Avro and Parquet formats — this means the files contain schema information themselves.

BigQuery is able to identify the schema in these files and create a so called external table on top of the files. This works very similar to how Impala and HDFS work — just think of BigQuery being Impala (query engine) and GCS being HDFS (persistent storage). Besides just being able to query the data in files on GCS, BigQuery also supports Hive partitioning schemas in GCS. This means if you’ve had a partitioning schema by e.g. year, month and day on HDFS, then copied over the data into GCS with that exact folder structure, then BigQuery can pick up this schema and create additional columns for the partitions. Now you can query the external table and do partition pruning as well. Since BigQuery charges you for the amount of data queried, this is a significant mechanism to both improve performance and reduce cost.

The next steps will depend on what your use case looks like. If you want to stick to the approach you’ve done on-prem and use an append-only model for your tables then using a BigQuery external table and persist the data in GCS in Avro files is a good fit for the data from the raw data layer. For the analytical layer it makes more sense to load the data to BigQuery native tables as this offers the best performance for analytical queries. If your current use case is focused on getting data loaded as a copy of a table in a relational database and you need to run truncate and load scripts, then using BigQuery native tables is a better option for the raw data layer they support DML operations. Also if you want to run analytical queries on raw data then bringing it to BigQuery native tables is the best option due to better performance as compared to external tables.

Let’s assume that you want to have your data natively in BigQuery. Once you have an external table, you can now use a CREATE TABLE AS SELECT FROM SQL query to create a new BQ native table and move this data to this native table from the external one. It’s possible to specify partitioning for the BQ native table and an encryption key for encrypting the data with a Customer Managed Encryption Key (CMEK). Partitioning is important to configure at this stage, because in BigQuery the table partitioning can only be defined at table creation time. With this we now have a partitioned BigQuery table with all data from the Impala table.

The architecture for the initial load will look like this:

Initial load of data from the raw and analytical areas to BigQuery

Code example for Initial Load

This is the code to create an external BigQuery table. First a table definition has to be created, specifying the file type, the uri prefix (highest level of folder structure before the partitioning comes in) and the file location (typically the uri prefix followed by a /* for a wildcard):

bq mkdef --source_format=PARQUET --hive_partitioning_mode=AUTO \
--hive_partitioning_source_uri_prefix=gcs_uri_shared_prefix \
gcs_uri >/tmp/table_def_file.json

Once the table definition file is there, you can view its contents. It will look similar to this, assuming that the files are located in a bucket with the URI gs://bucketNameand it also is the top level directory above the partitioning directories:

"hivePartitioningOptions": {
"mode": "AUTO",
"sourceUriPrefix": "gs://bucketName"
},
"sourceFormat": "PARQUET",
"sourceUris": [
"gs://transaction_data_bucket_for_bigquery/bucketName/*"
]
}

Pay attention when using Avro files to ensure that using Avro logical types has been whitelisted for your project and the respective flag is set to true in the table definition file. Otherwise you might face issues with your data, especially with timestamp columns. When using Parquet files, it’s important to get the encoding of String fields right. More details on dealing with specifics of Avro and Parquet files can be found in our previous story. Then the external BigQuery table is created:

bq mk --table --external_table_definition=/tmp/table_def_file.json DatasetName.external_staging_table

Once the external table is created a simple CREATE TABLE AS SELECT statement can be used to load data to a BigQuery native table. It is possible to specify a partitioning column, clustering columns and also encryption options for Customer Managed Encryption Keys (CMEK). Here is an example for a table partitioned by day on a timestamp column with data coming from the federated table created before without using the partitioning columns from Hadoop. The partitioning columns from the external table get removed because they are not required anymore as partitioning is done natively in BigQuery now:

CREATE TABLE avro_native
PARTITION BY TIMESTAMP_TRUNC (columnNameForTimestampPartition, DAY)
CLUSTER BY columnNameForClustering
OPTIONS(kms_key_name='projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY')
AS SELECT * except (year, month, day)
FROM external_staging_table

Using these steps ensures that one does not have to deal with the schema of tables, converting DDLs or even creating bq load jobs and making sure they will run through for a large amount of files. Thus migrating data from Hadoop to BigQuery becomes much easier and less error-prone.

The Delta Load

Completing the initial load is a big step forward and will allow users to start working with the new cloud data platform. But they typically will want to have their data refreshed and this is where the Delta Load comes in. There are two main differences between the Initial Load and the Delta Load. The first is that data is now replicated only for the raw data layer, not for the analytical layer. This is done so that it is possible to build the actual data transformation pipeline directly on the cloud. In most cases these pipelines are SQL scripts in BigQuery, more complex transformations done on-prem in Spark can be migrated to Cloud Dataproc. Once it is clear that the output of the analytical layer both on-prem and in the cloud are the same, the new cloud based pipeline is ready. The second difference is that since it has to be done in a continuous way instead of just once as with the initial load, using DistCp is sometimes not the best way ahead. Instead, it is better to use a solution that will be loading data through the GCS API. Typically this is a simple replication app running either on-prem or in the cloud, depending on whether it’s possible to push or pull data from on-prem. This will depend on your networking and security policies. Also, usingDistCP might be required if the size of a single file is very large and a copy operation, which would require to assemble the file first from separate HDFS blocks before actually calling the GCS API to load the file, would take too long. The high-level architecture for the Delta Load looks as follows:

Replication Architecture for Delta Load

One of the major questions is how do you identify the delta? This heavily depends your on-prem implementation, i.e. on how exactly data is ingested t the raw data layer. In some cases the delta is made available in a staging directory in HDFS (often the case for truncate&load scenarios, see our previous story). In some, rather rare cases, a messaging mechanism is available that will publish the URI of a delta file on HDFS to e.g. a Kafka topic. In both these cases you can just fetch the file from HDFS and copy it to GCS through an API call. If the delta is not simply made available, then you need to calculate it. Typically, a table in a DWH-style Hadoop use case has a column with an insert-timestamp in it. Then it’s required to run a query from the replication app to calculate the delta using the insert-timestamp column. The delta will be calculated by comparing the insert-timestamp to a timestamp value when the delta calculation was previously run. This timestamp value can be stored on a per-table basis in a log table. Once the delta is calculated, the replication app loads the delta through an API call to GCS. In the past we’ve typically used a simple Java app running on a Kubernetes cluster to do this replication.

How to load data from GCS to BigQuery?

Once the delta is available in GCS, the data needs to be loaded to BigQuery. This part depends on how you implement your use case. It depends on whether you need to transform the data before loading it to BigQuery and whether you have an append-only data model or want to run DML operations on it. The range of options goes from using external BigQuery tables (so no more loading of data to BigQuery native storage), over using load jobs run through a Cloud Function, using an orchestration tool like Cloud Composer or leveraging heavy-weight distributed compute frameworks like Spark or Beam/Dataflow. Generally speaking it makes sense to use the last option only if you need to perform transformations on data before loading it into BigQuery. Otherwise, using Cloud Functions or Cloud Composer to move data from GCS to native BigQuery tables is a viable approach as well. We will focus on the usage of Cloud Functions here.

A Cloud Function is a serverless construct, i.e. you don’t need to manage any infrastructure for it. Instead, when the triggering event is fired in our case a new file arrives in a GCS bucket it will run a piece of code. This piece of code can be a simple BigQuery load job that will load the contents of the replicated file from GCS to a BigQuery table in append only mode. If you want to replace truncate & load scripts because eventually you want to have a copy of a relational database table, then the DML functionality offered by BigQuery can be used out of the box. To do this, you would first load a the delta file to a staging table in BigQuery and then run a Merge operation between the target table and the staging table. The lifecycle of such a Cloud Function would look as follows:

Cloud Function with Merge functionality

It’s important to point out that the Merge operation works as an Upsert, i.e. when there is no DML operation column in the record in the delta file that says what kind of DML operation has to happen with a record (like there is for example in a CDC stream), then only updates and inserts will be executed on the BigQuery target table. If you need to delete records, you will need to provide information on the type of DML operation as part of the delta file on a per-record basis.

However, Cloud Functions can hit limits if you want to run DML operations and need to ensure ordering on data. The issue you could face is that data arrives in multiple files at the same time and these files contain data for the same id. Cloud Functions kick off immediately when their triggering event happens. If the file with a record for an id with a later point in time gets processed before a file with a record for the same id for an earlier point in time, you will end up with corrupted data which is out of order. If your ingestion mechanism to GCS can generate multiple files containing records for the same id at the same time, you should rather use sequential processing of the files on a time-based schedule, e.g. using Cloud Composer. For example, you could run the code shown below using the Python Operator and achieve DML functionality without data inconsistencies using sequential processing. If on the other hand your ingestion mechanism can ensure that files will contain only one record per id in a batch load (e.g. when you have daily or hourly batch loads of deltas) then you can use Cloud Functions with DML safely without worrying about data inconsistencies.

Finally, it’s important to point out that a Cloud Function triggered through a GCS event only guarantees at-least-once processing. This means, that in some rare cases, it is possible that a function will fire twice for the same file being written to GCS. If your data load process is idempotent (i.e. you use Upserts/DMLs) then this is not a problem. If you load data in append-only mode, then it is best to use a SQL query based transformation inside BigQuery in order to provide consistent data and remove potential duplicates.

Code example of the Cloud Function for loading a delta to BigQuery

Here is an example of what a Cloud Function can look like that loads a delta file to BigQuery in Append mode:

def load_gcs_to_bq(event, context):

file = event
bucket = 'bucketName'
uri= "gs://" + bucket + "/" + event['name']
client = bigquery.Client()
job_config = bigquery.LoadJobConfig()
# job_config: Defines file type and insert type (append vs
truncate)
job_config.write_disposition =
bigquery.WriteDisposition.WRITE_APPEND

job_config.source_format = bigquery.SourceFormat.PARQUET
# Run load job

load_job = client.load_table_from_uri(uri, client.dataset(
dataset_id='datasetName', project=None).table('tableName'),
job_config=job_config)

It is also possible to extend the Cloud Function code to support DML functionality by adding the following function. In this case the file would be first loaded to the staging table in Truncate mode before the processMerge function will be called:

# Load of file to BQ staging table completed beforedef processMerge (client,dataset,tableNameStage,tableNameTarget):# Define Merge query with Upsert functionality    queryMerge = (    " MERGE " + '`' + dataset.tableNameTarget + '`' + " T USING " +     
'`' + dataset.tableNameStage + '`' + " S ON T.id = S.id WHEN
MATCHED THEN UPDATE SET T.id=S.id , T.column1= S.column1,
T.column2=S.column2, WHEN NOT MATCHED THEN INSERT VALUES ( id ,
column1 , column2 );")
# Run Merge query
query_job = client.query(queryMerge)

To increase code flexibility we can also avoid hardcoding the table names. For that we extract them from the GCS file URI. This requires the GCS files to follow a specific naming convention, in this case they have the term prefix in the filename right before the table name:

# Tables in BigQuery follow a naming convention. The uri is extracted from the Cloud Function eventtableNamebase=getTableNames(uri)

tableNameStage = projectid + '.' + dataset + '.'+tableNamebase+"_stage"

tableNameTarget = projectid + '.' + dataset + '.'+tableNamebase+"_target"
# Function to extract tablename from the GCS event that triggers the Cloud Function.def getTableNames(uri):
split_uri = uri.split("/")
tindex=split_uri.index('prefix')+1
tablename=split_uri[tindex]
return tablename

Ensuring Data Quality

A data platform is only as successful as the quality of the data it can offer to its users. Therefore both after the initial load and during the delta load you should perform some checks of the data replicated from the on-prem Hadoop cluster. The most basic form is to compare the counts of the amount of records per table between Hive/Impala and BigQuery. You can also perform some spot-checksums. For more advanced checks you can calculate hashes or checksums and compare them. Especially if you have changed your data model it also makes sense to let the users of your data platform (in this case the ones who will consume data from BigQuery output tables) check the results in the output tables in the analytical area before making the final switch from the Hadoop cluster to BigQuery.

Conclusion

This article gives an overview of how data can be migrated from a Hadoop cluster to BigQuery for SQL at scale style use cases. The procedure for the initial load will be very similar for most migrations, whilst the delta load will heavily depend on how you want to deal with your data model. Depending on whether you want to change it or not, there are different options to set up a fast and cost-efficient loading of data to GCS and BigQuery.

Another important topic is the encryption of data and the networking setup. If you have data encrypted on-prem using Ranger or Cloudera KMS, then you will have to decrypt data before moving it to the cloud or share keys between the Hadoop cluster or GCP KMS. Alternatively, you can also replicate the encrypted file (client-side encryption) without sharing the keys with GCP KMS but then you will need to spin up a Cloudera instance with Ranger or Cloudera KMS in it to perform the decryption in the cloud before encrypting with a KMS key again. This choice will depend on your security team’s requirements on how data can be moved between on-prem and the cloud. Also the networking setup is very organization specific, this is why we do not go into the details here.

Finally, we hope that this article will help you to get started and give guidance on how to do a migration from Hadoop in a simple and efficient way. After all, being able to bring your data to the cloud offers not only cost-savings for your organization, but also the possibility to build a whole lot of new use cases.

About the Authors: Mark and Vladimir Elvov work in different functions in Data Architecture. New technologies are our core passion — using architecture to drive their adoption is our daily business. In this blog we write about everything that is related to Big Data and Cloud technologies: From High-Level strategies via use cases down into what we’ve seen in the front-line trenches of production and daily operations work.

--

--