Issue
I am coding a DAG and want to execute an UPDATE statement to selectively set the values of certain fields in certain rows. The SQL statement is easy, but I am not sure how to execute it via Airflow.
The documentation for BigQueryUpdateTableOperator says that the entire dataset will be replaced. I searched for a long time but could not find the right operator.
I tried putting an UPDATE statement in BigQueryInsertJobOperator, and that threw an error.
How do I execute an UPDATE query on BigQuery via Airflow? My DAG runs in a GCP Composer environment.
Solution
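I used BigQueryInsertJobOperator and was able to run the UPDATE statement by storing it in a SQL file and passing the file name in the query parameter. This works because the operator's configuration argument is a templated field and .sql is one of its template extensions, so Airflow loads the file (looked up relative to the DAG file's folder) and sends its contents as the query. Note that useLegacySql must be False: BigQuery supports DML statements such as UPDATE only in standard SQL.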
Below is the code I used in my testing:
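```python
from airflow import models
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.utils.dates import days_ago

dag_id = "update-dag"
my_final_taskid = 'update-bq'
# SQL file holding the UPDATE statement; resolved relative to the DAG file's folder.
sql_file = 'my-query.sql'

with models.DAG(
    dag_id,
    schedule_interval=None,  # Override to match your needs
    start_date=days_ago(1),
    tags=["example"],
) as dag:
    update_bq_table = BigQueryInsertJobOperator(
        task_id=my_final_taskid,
        configuration={
            "query": {
                # "configuration" is templated and ".sql" is a template extension,
                # so Airflow replaces this file name with the file's contents.
                "query": sql_file,
                # DML statements such as UPDATE require standard SQL.
                "useLegacySql": False,
            }
        },
    )
```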
Content of my-query.sql:
```sql
update `your_dataset.your_table` set your_column = 'string' where another_column = 'string';
```
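To make the "pass the file name in the query parameter" step less magical, here is a simplified, plain-Python illustration of what Airflow's template rendering does with the nested configuration dict. It is not Airflow's code: resolve_templates and TEMPLATE_EXT are hypothetical names for this sketch, it assumes the operator's template extensions are ".json" and ".sql", and it skips Jinja rendering entirely, only showing how a string ending in ".sql" is swapped for the file's contents.

```python
import os
import tempfile

# Hypothetical stand-in for the operator's template extensions (assumed ".json", ".sql").
TEMPLATE_EXT = (".json", ".sql")


def resolve_templates(value, search_dir):
    """Simplified sketch of rendering a templated, nested field.

    Strings ending in a template extension are treated as file paths and replaced
    by the file's contents (looked up in search_dir); dicts and lists are walked
    recursively; everything else is returned unchanged. Real Airflow also renders
    Jinja expressions, which this sketch does not do.
    """
    if isinstance(value, str):
        if value.endswith(TEMPLATE_EXT):
            with open(os.path.join(search_dir, value), encoding="utf-8") as f:
                return f.read()
        return value
    if isinstance(value, dict):
        return {k: resolve_templates(v, search_dir) for k, v in value.items()}
    if isinstance(value, list):
        return [resolve_templates(v, search_dir) for v in value]
    return value


# Usage: a temporary folder plays the role of the DAG folder.
with tempfile.TemporaryDirectory() as dag_folder:
    with open(os.path.join(dag_folder, "my-query.sql"), "w", encoding="utf-8") as f:
        f.write("UPDATE `your_dataset.your_table` SET your_column = 'string' "
                "WHERE another_column = 'string';\n")
    configuration = {"query": {"query": "my-query.sql", "useLegacySql": False}}
    rendered = resolve_templates(configuration, dag_folder)
    print(rendered["query"]["query"].strip())
```

Only the string ending in ".sql" changes; the boolean useLegacySql passes through untouched, which is why the same dict can mix a file reference with plain settings.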
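If the values in the UPDATE change between runs, one option is to keep them out of the SQL text and send them as BigQuery query parameters in the same configuration dict. This is a sketch, assuming the job configuration's query section accepts parameterMode and queryParameters as in the BigQuery jobs API; build_update_configuration is a hypothetical helper, and every parameter is sent as a STRING for simplicity.

```python
def build_update_configuration(sql, params):
    """Hypothetical helper: build a job `configuration` dict for a parameterized
    DML query. `params` maps parameter names to string values; the SQL refers
    to them as @name."""
    return {
        "query": {
            "query": sql,
            # Standard SQL is required for DML and for query parameters.
            "useLegacySql": False,
            "parameterMode": "NAMED",
            "queryParameters": [
                {
                    "name": name,
                    "parameterType": {"type": "STRING"},
                    "parameterValue": {"value": value},
                }
                for name, value in params.items()
            ],
        }
    }


UPDATE_SQL = (
    "UPDATE `your_dataset.your_table` "
    "SET your_column = @new_value "
    "WHERE another_column = @match_value"
)

configuration = build_update_configuration(
    UPDATE_SQL, {"new_value": "string", "match_value": "string"}
)
# The resulting dict would be passed as BigQueryInsertJobOperator(configuration=...).
```

Keeping literals out of the statement avoids quoting problems and lets the same SQL file serve several runs with different values.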
Answered By - Scott B Answer Checked By - David Goodson (PHPFixing Volunteer)