Author: Eswara Pendli
Recently, MuleSoft released the BigQuery Connector in Anypoint Exchange, built by Connectivity Partners. The connector supports the Mule 4.x runtime.
For a basic understanding of BigQuery and its features, please refer to Mulesoft + BigQuery Series 1.
In this series, we will discuss the operations supported by the BigQuery Connector and walk through a simple demo of "Fetch Table Data" using the connector.
Quick Points:
- MuleSoft's BigQuery Connector provides connectivity to Google BigQuery through the Anypoint Platform.
- BigQuery is a serverless, highly scalable, and cost-effective cloud data warehouse designed to help you make informed decisions quickly and transform your business with ease.
- The connector gives organizations access to BigQuery by interfacing with the Google BigQuery API.
- The Google BigQuery Connector allows customers to create, manage, share, and query data.
Operations Supported by the Connector:
- Create Job:
This operation creates and starts an asynchronous job.
The major job options are:
- Copy Job
- Extract Job
- Load Job
- Query Job
- Get Job:
This operation returns information about a specific job, based on the given BigQuery job name.
- List Job:
This operation lists all jobs started in the specified project.
- Cancel Job:
This operation cancels a BigQuery job, based on the given job name.
- Create Dataset:
This operation creates a new, empty dataset, which can hold one or more tables. Dataset names must be unique.
- Get Dataset:
This operation returns the dataset for the given datasetId.
- List Dataset:
This operation retrieves information on each dataset. It returns only a limited amount of information about each dataset: DatasetId, FriendlyName, and GeneratedId. For complete information, use the "Get Dataset" operation.
- Update Dataset:
This operation updates information in an existing dataset.
- Delete Dataset:
This operation deletes the dataset for the given datasetId.
- Create Table:
This operation creates a new table in an existing dataset. We have to provide a new table name along with the datasetId.
- Get Table:
This operation returns the specified table from a dataset, based on the given datasetId and tableId.
- List Table:
This operation lists the tables in the specified dataset, based on the given datasetId.
It returns only a limited amount of information about each table: TableId, FriendlyName, GeneratedId, and type. For complete information, use the "Get Table" operation.
- Update Table:
This operation updates information in an existing table, based on the given datasetId and tableId.
- Delete Table:
This operation deletes the specified table from the dataset, based on the given datasetId and tableId.
- List Table Data:
This operation lists the content of a table, row by row, based on the given datasetId and tableId.
- Query:
This operation runs the query associated with the request, based on the given JobId.
- Get Query Result:
This operation returns the results of a query job, based on the given JobId.
- Insert All:
This operation streams data into BigQuery one record at a time, without running a load job, based on the given datasetId and tableId. (A minimal configuration sketch follows this list.)
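To give a flavor of how these operations are wired in a flow, here is a minimal sketch of the Get Job operation in Mule XML. Treat it as a sketch only: the element name (bigquery:get-job) and the jobName attribute are assumptions based on the connector's usual naming convention, so verify them against the connector reference in Anypoint Exchange. Verbatim, working examples of Create Dataset, Create Table, Insert All, and List Table Data appear in the demo below.

    <!-- Sketch only: bigquery:get-job and jobName are assumed names;
         check the connector reference in Anypoint Exchange for the exact ones. -->
    <bigquery:get-job doc:name="Get Job" config-ref="Google_bigquery_Config"
        jobName="#[attributes.queryParams.jobName]"/>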
Prerequisites:
- A Google Cloud account or sandbox account.
Demo:
Create a dataset and a table, and insert data into the table, from Anypoint Studio:
- Create a new project and add the "Google BigQuery" module to the project palette (as a dependency).

- Once the connector is added to the palette, we can see the multiple operations supported by BigQuery.

- Configure the BigQuery global configuration as shown below.
The global configuration for this connector is simple; we need to update it with our service account key details:
Where,
Project ID: the project with which BigQuery is associated.
Service Account Key: the JSON key file downloaded from the Credentials section.
(To learn how to set up the Client ID, please refer to the 5th point under "Configuration / Connection Establishment for Mulesoft" from Mulesoft + BigQuery Series 1.)
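For reference, this is the global element as it appears in the complete flow at the end of this post; replace the projectId and service account key file with your own values:

    <bigquery:config name="Google_bigquery_Config" doc:name="Google_bigquery Config">
        <!-- projectId: your GCP project; jsonServiceAccountKeyFile: the downloaded key JSON -->
        <bigquery:connection projectId="navigation-api-demo"
            jsonServiceAccountKeyFile="navigation-api-demo-29521a1df0a1.json"/>
    </bigquery:config>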


- Here is the Mule flow snapshot to create the dataset, create the table, and insert data into the table.

- In this flow, we expect the datasetId and tableId to arrive as query parameters, as shown in the excerpt below.
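The "fetch Query Params" transform, excerpted from the complete flow at the end of this post, stores both query parameters in a params variable:

    <ee:transform doc:name="fetch Query Params">
        <ee:variables>
            <ee:set-variable variableName="params"><![CDATA[%dw 2.0
    output application/json
    ---
    {
        dataSet: attributes.queryParams.dataSet,
        tableName: attributes.queryParams.tableName
    }]]></ee:set-variable>
        </ee:variables>
    </ee:transform>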

- We first create a new dataset using the datasetId, as shown in the excerpt below.
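This is the Create Dataset call from the complete flow; datasetName comes from the stored query parameters, and the result is saved to a dataSet target variable:

    <bigquery:create-dataset doc:name="Create Dataset" config-ref="Google_bigquery_Config"
        datasetName="#[vars.params.dataSet]" target="dataSet"/>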

- Ideally, a table lives under a dataset, so we create a new table in the existing dataset with a new table schema. The schema holds the relevant information for each field: fieldName, fieldType, and fieldMode, as shown below.
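The schema is kept in a tableSchema variable; this DataWeave is taken from the "Define Table Schema" transform in the complete flow:

    <ee:set-variable variableName="tableSchema"><![CDATA[%dw 2.0
    output application/json
    ---
    [{
        "fieldName": "Name",
        "fieldType": "STRING",
        "fieldDescription": "This is name",
        "fieldMode": "NULLABLE"
    },
    {
        "fieldName": "Age",
        "fieldType": "INTEGER",
        "fieldDescription": "This is Age",
        "fieldMode": "NULLABLE"
    },
    {
        "fieldName": "ID",
        "fieldType": "INTEGER",
        "fieldDescription": "This is Employee Id",
        "fieldMode": "NULLABLE"
    }]]]></ee:set-variable>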

- We then create the new table using the datasetId along with the tableId. Note that the table schema must be sent as an Object type, as in the excerpt below.
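This is the Create Table call from the complete flow; an inline DataWeave function maps the tableSchema variable into the Java objects the operation expects:

    <bigquery:create-table doc:name="Create Table" config-ref="Google_bigquery_Config" target="tableName" tableFields="#[%dw 2.0 output application/java fun parseSchema(schema) = schema map (item, index) -> { fieldName: item.fieldName, fieldType: item.fieldType, fieldDescription: item.fieldDescription, fieldMode: item.fieldMode } --- parseSchema(vars.tableSchema)]">
        <bigquery:table-info>
            <bigquery:table table="#[vars.params.tableName]" dataset="#[vars.params.dataSet]"/>
        </bigquery:table-info>
        <bigquery:table-option/>
    </bigquery:create-table>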

- On successful table creation, we can insert the data using the Insert All operation; we supply the payload in the "Row Data" section. Note the limitation on Insert All: it accepts at most 10,000 records/rows per request, so the flow batches the row data, as shown below.
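In the complete flow, a For Each scope with batchSize="10000" keeps each Insert All call within that limit:

    <foreach doc:name="For Each" collection="#[vars.rowData]" batchSize="10000">
        <bigquery:insert-all doc:name="Insert All" config-ref="Google_bigquery_Config"
            tableId="#[vars.params.tableName]" datasetId="#[vars.params.dataSet]"
            rowsData="#[output application/json --- payload]"/>
    </foreach>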

- Here is a snapshot of the newly created dataset and the data inserted into the table in Google BigQuery:


- Similarly, we can list/fetch the data from the table using the "List Table Data" operation by passing the datasetId and tableId, as shown below.
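The second flow in the complete XML exposes this on /ListTableData; the operation reads both identifiers straight from the query parameters:

    <bigquery:list-table-data doc:name="List Table Data" config-ref="Google_bigquery_Config"
        datasetId="#[attributes.queryParams.dataSet]" tableId="#[attributes.queryParams.tableName]"/>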

- Here is the complete XML code:
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:file="http://www.mulesoft.org/schema/mule/file"
    xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
    xmlns:bigquery="http://www.mulesoft.org/schema/mule/bigquery"
    xmlns:http="http://www.mulesoft.org/schema/mule/http"
    xmlns="http://www.mulesoft.org/schema/mule/core"
    xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.mulesoft.org/schema/mule/file http://www.mulesoft.org/schema/mule/file/current/mule-file.xsd
        http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
        http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
        http://www.mulesoft.org/schema/mule/bigquery http://www.mulesoft.org/schema/mule/bigquery/current/mule-bigquery.xsd
        http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd">

    <bigquery:config name="Google_bigquery_Config" doc:name="Google_bigquery Config" doc:id="d3e7f8c8-f574-451d-9601-8b84b870398f">
        <bigquery:connection projectId="navigation-api-demo" jsonServiceAccountKeyFile="navigation-api-demo-29521a1df0a1.json"/>
    </bigquery:config>

    <flow name="bigQuery-connector-operationsFlow" doc:id="f5fb8dee-cc67-4d55-92dc-2b70b6cf5408">
        <http:listener doc:name="Listener" doc:id="890546de-34a8-4b22-9c0d-4f15280811a9" config-ref="HTTP_Listener_config" path="/createDataset"/>

        <!-- Keep the request body (the rows to insert) in a variable -->
        <ee:transform doc:name="fetch RowData" doc:id="8f0e4925-adde-4075-b9b3-3f35c9a5cd0e">
            <ee:message/>
            <ee:variables>
                <ee:set-variable variableName="rowData"><![CDATA[%dw 2.0
output application/json
---
payload]]></ee:set-variable>
            </ee:variables>
        </ee:transform>

        <!-- Keep the dataset and table names passed as query parameters -->
        <ee:transform doc:name="fetch Query Params" doc:id="adfa1642-afb8-4b88-8b59-010d5cf20c52">
            <ee:message/>
            <ee:variables>
                <ee:set-variable variableName="params"><![CDATA[%dw 2.0
output application/json
---
{
    dataSet: attributes.queryParams.dataSet,
    tableName: attributes.queryParams.tableName
}]]></ee:set-variable>
            </ee:variables>
        </ee:transform>

        <bigquery:create-dataset doc:name="Create Dataset" doc:id="df761e1d-43ca-464d-82fc-820396f4774c" config-ref="Google_bigquery_Config" datasetName="#[vars.params.dataSet]" target="dataSet"/>
        <!-- <logger level="INFO" doc:name="Dataset Logger" doc:id="da427d49-8f39-4319-bc8f-ff498411685f" message="Dataset has been created with: #[vars.params.dataSet]!"/> -->

        <!-- Define the table schema: fieldName, fieldType, fieldDescription, fieldMode -->
        <ee:transform doc:name="Define Table Schema" doc:id="2fdfafad-eb22-40da-9dbd-91c1a0c26e6c">
            <ee:message/>
            <ee:variables>
                <ee:set-variable variableName="tableSchema"><![CDATA[%dw 2.0
output application/json
---
[{
    "fieldName": "Name",
    "fieldType": "STRING",
    "fieldDescription": "This is name",
    "fieldMode": "NULLABLE"
},
{
    "fieldName": "Age",
    "fieldType": "INTEGER",
    "fieldDescription": "This is Age",
    "fieldMode": "NULLABLE"
},
{
    "fieldName": "ID",
    "fieldType": "INTEGER",
    "fieldDescription": "This is Employee Id",
    "fieldMode": "NULLABLE"
}]]]></ee:set-variable>
            </ee:variables>
        </ee:transform>

        <!-- Map the schema variable into the Java objects the operation expects -->
        <bigquery:create-table doc:name="Create Table" doc:id="95d64707-8c9a-47d3-a705-aa446716d229" config-ref="Google_bigquery_Config" target="tableName" tableFields="#[%dw 2.0 output application/java fun parseSchema(schema) = schema map (item, index) -> { fieldName: item.fieldName, fieldType: item.fieldType, fieldDescription: item.fieldDescription, fieldMode: item.fieldMode } --- parseSchema(vars.tableSchema)]">
            <error-mapping targetType="TABLE:NOT_FOUND"/>
            <bigquery:table-info>
                <bigquery:table table="#[vars.params.tableName]" dataset="#[vars.params.dataSet]"/>
            </bigquery:table-info>
            <bigquery:table-option/>
        </bigquery:create-table>
        <!-- <logger level="INFO" doc:name="Table Logger" doc:id="1e5ab309-74eb-44e0-8fe9-843cf82716c4" message="Table has been created with: #[vars.params.tableName]!"/> -->

        <!-- Insert All accepts at most 10,000 rows per request, so batch the row data -->
        <foreach doc:name="For Each" doc:id="7658c0c7-196c-4eee-aa6d-f9e92b14b5fe" collection="#[vars.rowData]" batchSize="10000">
            <!-- <logger level="INFO" doc:name="Logger" doc:id="9ad00e68-6e48-4773-b074-0e56313a174f" message="Loop Started for: #[vars.counter]"/> -->
            <ee:transform doc:name="Transform Message" doc:id="4378d85a-0af8-4417-81b3-f9ded1e884ba">
                <ee:message>
                    <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
payload]]></ee:set-payload>
                </ee:message>
            </ee:transform>
            <bigquery:insert-all doc:name="Insert All" doc:id="b018af78-9b38-4fad-b56a-4ba4dee62c52" config-ref="Google_bigquery_Config" tableId="#[vars.params.tableName]" datasetId="#[vars.params.dataSet]" rowsData="#[output application/json --- payload]"/>
            <ee:transform doc:name="Transform Message" doc:id="2ddc192c-cfd0-4957-acfa-a279f815537d">
                <ee:message>
                    <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
payload]]></ee:set-payload>
                </ee:message>
            </ee:transform>
            <logger level="INFO" doc:name="Logger" doc:id="def0515f-e1cb-4e07-be8e-9461b0deb921" message="#[payload]"/>
        </foreach>
    </flow>

    <flow name="bigQuery-connector-operations-ListTableDataFlow" doc:id="5778fd4a-1afe-4209-9eff-869a04a7f6f4">
        <http:listener doc:name="Listener" doc:id="da61427f-a542-4cfc-af4c-a4a177561d97" config-ref="HTTP_Listener_config" path="/ListTableData"/>
        <bigquery:list-table-data doc:name="List Table Data" doc:id="d7483c77-9fc2-44d4-99cb-302164309cd2" config-ref="Google_bigquery_Config" datasetId="#[attributes.queryParams.dataSet]" tableId="#[attributes.queryParams.tableName]"/>
        <!-- Flatten the row/cell structure into a flat list of JSON objects -->
        <ee:transform doc:name="Transform Message" doc:id="cd2df2e2-a650-4641-9364-471c3a73bba5">
            <ee:message>
                <ee:set-payload><![CDATA[%dw 2.0
output application/json
var p1 = (flatten(payload.values map (value: $.value)))
---
p1.value map {
    Name: $[0],
    Age: $[1],
    ID: $[2]
}]]></ee:set-payload>
            </ee:message>
        </ee:transform>
        <!-- <file:write doc:name="Write" doc:id="6b66be79-f33e-4505-be5a-981d7146a9a1" path="ListTableData.json" config-ref="File_Config"/> -->
        <logger level="INFO" doc:name="Logger" doc:id="82140a01-1aae-42f0-9264-e6999b672a2a" message="Total Number of ListTableData is: #[sizeOf(payload)]"/>
    </flow>
</mule>
BigQuery is fully managed: we don't need to deploy any resources such as disks or virtual machines. It is easy to use and comes with a high SLA.
!!!!! Happy Learning !!!!!
References:
- https://www.mulesoft.com/press-center/runtime-fabric-google-cloud-gcp
- https://cloud.google.com/docs/authentication/getting-started
- https://eu1.anypoint.mulesoft.com/exchange/com.mulesoft.connectors/mule-bigquery-connector/
- https://opendoc.gslab.com/bq_release_notes.html
- https://opendoc.gslab.com/bq_user_guide.html
- https://cloud.google.com/bigquery/
- https://opendoc.gslab.com/bq_api_reference.html
- https://www.mulesoft.com/legal/versioning-back-support-policy#anypoint-connectors