PHPFixing

Sunday, October 23, 2022

[FIXED] How to convert SQL UPDATE query to PySpark (tables to dataframes)

Tags: apache-spark, apache-spark-sql, dataframe, pyspark, sql-update

Issue

I have this UPDATE SQL query that I need to convert to PySpark to work with dataframes. I'd like to know if it's possible to do it with dataframes and how to do it.

The SQL query:

UPDATE TBL1
SET COL_C=1
FROM TBL1
INNER JOIN TBL2 ON TBL1.COL_A=TBL2.COL_A AND TBL1.COL_B=TBL2.COL_B
INNER JOIN TBL3 ON TBL2.COL_A=TBL3.COL_A AND TBL2.COL_B=TBL3.COL_B
    df_TBL1=TBL1
+-------+--------+----------+------+-----+
|  COL_A|   COL_B|       dob|gender|COL_C|
+-------+--------+----------+------+-----+
|  James|   Smith|1991-04-01|     M| 3000|
|Michael|    Rose|2000-05-19|     M| 4000|
| Robert|Williams|1978-09-05|     M| 4000|
|  Maria|   Jones|1967-12-01|     F| 4000|
|    Jen|   Brown|1980-02-17|     F| 1000|
+-------+--------+----------+------+-----+
    df_TBL2=TBL2
+-------+---------+----------+------+-----+
|  COL_A|    COL_B|       dob|gender|COL_C|
+-------+---------+----------+------+-----+
|   John|     Snow|1791-04-01|     M| 9000|
|Michael|     Rose|2000-05-19|     M| 4000|
| Robert|Baratheon|1778-09-05|     M| 9500|
|  Maria|    Jones|1967-12-01|     F| 4000|
+-------+---------+----------+------+-----+
    df_TBL3=TBL3 
+--------+------+----------+------+-----+
|   COL_A| COL_B|       dob|gender|COL_C|
+--------+------+----------+------+-----+
| Michael|  Rose|2000-05-19|     M| 4000|
|   Peter|Parker|1978-09-05|     M| 4000|
|   Maria| Jones|1967-12-01|     F| 4000|
|MaryJane| Brown|1980-12-17|     F|10000|
+--------+------+----------+------+-----+

The joins give me:

from pyspark.sql import functions as spf

df_TBL_ALL=df_TBL1 \
.join(df_TBL2,(df_TBL1.COL_A==df_TBL2.COL_A) & (df_TBL1.COL_B==df_TBL2.COL_B),how="inner") \
.join(df_TBL3,(df_TBL2.COL_A==df_TBL3.COL_A) & (df_TBL2.COL_B==df_TBL3.COL_B),how="inner") \
.select(df_TBL1["*"]) \
.withColumn("COL_C",spf.lit(1))

Then I left-join that result back onto df_TBL1, taking the new COL_C where it exists:

df_TBL1_JOINED=df_TBL1 \
.join(df_TBL_ALL,(df_TBL1.COL_A==df_TBL_ALL.COL_A) & (df_TBL1.COL_B==df_TBL_ALL.COL_B),how="left") \
.select(df_TBL1["*"], \
        spf.coalesce(df_TBL_ALL.COL_C,df_TBL1.COL_C).alias("COL_C"))

df_TBL1_JOINED.show()
# +-------+--------+----------+------+-----+-----+
# |  COL_A|   COL_B|       dob|gender|COL_C|COL_C|
# +-------+--------+----------+------+-----+-----+
# |  James|   Smith|1991-04-01|     M| 3000| 3000|
# |    Jen|   Brown|1980-02-17|     F| 1000| 1000|
# |  Maria|   Jones|1967-12-01|     F| 4000|    1|
# |Michael|    Rose|2000-05-19|     M| 4000|    1|
# | Robert|Williams|1978-09-05|     M| 4000| 4000|
# +-------+--------+----------+------+-----+-----+

But I'm confused about how to go on.

I did:

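# Assumption: TBL01_R is the joined dataframe from above,
# with the coalesced column named "COL_Nova"
TBL01_R=TBL01_R \
.drop("COL_C")

TBL01_R=TBL01_R \
.withColumnRenamed("COL_Nova","COL_C")

# show() returns None, so call it separately instead of assigning its result
TBL01_R.show()

TBL01=TBL01_R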

# +-------+--------+----------+------+-----+
# |  COL_A|   COL_B|       dob|gender|COL_C|
# +-------+--------+----------+------+-----+
# |  James|   Smith|1991-04-01|     M| 3000|
# |    Jen|   Brown|1980-02-17|     F| 1000|
# |  Maria|   Jones|1967-12-01|     F|    1|
# |Michael|    Rose|2000-05-19|     M|    1|
# | Robert|Williams|1978-09-05|     M| 4000|
# +-------+--------+----------+------+-----+

I got the expected result, but I don't know whether this is the best-performing way to achieve it.

Expected result: df_tbl1 with COL_C updated with a 1 in all rows present in the join of df_tbl1 with df_tbl2 and df_tbl3.

df_TBL1:

+-------+--------+----------+------+-----+
|  COL_A|   COL_B|       dob|gender|COL_C|
+-------+--------+----------+------+-----+
|  James|   Smith|1991-04-01|     M| 3000|
|Michael|    Rose|2000-05-19|     M|    1|
| Robert|Williams|1978-09-05|     M| 4000|
|  Maria|   Jones|1967-12-01|     F|    1|
|    Jen|   Brown|1980-02-17|     F| 1000|
+-------+--------+----------+------+-----+

Solution

Here is a concise and performant option. It does just the 2 necessary joins and avoids the last join (the left join back onto df_TBL1) that you used in your question. Note that F.forall requires Spark 3.1 or later.

from pyspark.sql import functions as F

updating = F.forall(F.array('t2', 't3'), lambda x: x)
df_TBL1 = (
    df_TBL1.withColumnRenamed('COL_C', 'COL_C_old').alias('T1')
    .join(df_TBL2.withColumn('t2', F.lit(True)), ['COL_A', 'COL_B'], 'left')
    .join(df_TBL3.withColumn('t3', F.lit(True)), ['COL_A', 'COL_B'], 'left')
    .withColumn('updated_c', F.when(updating, 1).otherwise(F.col('COL_C_old')))
    .select('T1.*', F.col('updated_c').alias('COL_C'))
    .drop('COL_C_old')
)
df_TBL1.show()
# +-------+--------+----------+------+-----+
# |  COL_A|   COL_B|       dob|gender|COL_C|
# +-------+--------+----------+------+-----+
# |  James|   Smith|1991-04-01|     M| 3000|
# |    Jen|   Brown|1980-02-17|     F| 1000|
# |  Maria|   Jones|1967-12-01|     F|    1|
# |Michael|    Rose|2000-05-19|     M|    1|
# | Robert|Williams|1978-09-05|     M| 4000|
# +-------+--------+----------+------+-----+

How the update works, line-by-line

First, join the three tables on COL_A and COL_B, each with a slight modification. df_TBL1 has COL_C renamed to 'COL_C_old' and gets the alias 'T1', so that its columns can be selected as a group later. df_TBL2 and df_TBL3 each get one extra column, 't2' and 't3' respectively, which is always True. After the left joins, True in that column indicates that the row's key exists in that table, and null indicates that it does not.

.withColumnRenamed('COL_C', 'COL_C_old').alias('T1')
+-------+--------+----------+------+---------+
|  COL_A|   COL_B|       dob|gender|COL_C_old|
+-------+--------+----------+------+---------+
|  James|   Smith|1991-04-01|     M|     3000|
|Michael|    Rose|2000-05-19|     M|     4000|
| Robert|Williams|1978-09-05|     M|     4000|
|  Maria|   Jones|1967-12-01|     F|     4000|
|    Jen|   Brown|1980-02-17|     F|     1000|
+-------+--------+----------+------+---------+
.join(df_TBL2.withColumn('t2', F.lit(True)), ['COL_A', 'COL_B'], 'left')
+-------+--------+----------+------+---------+----------+------+-----+----+
|  COL_A|   COL_B|       dob|gender|COL_C_old|       dob|gender|COL_C|  t2|
+-------+--------+----------+------+---------+----------+------+-----+----+
|  James|   Smith|1991-04-01|     M|     3000|      null|  null| null|null|
|    Jen|   Brown|1980-02-17|     F|     1000|      null|  null| null|null|
|  Maria|   Jones|1967-12-01|     F|     4000|1967-12-01|     F| 4000|true|
|Michael|    Rose|2000-05-19|     M|     4000|2000-05-19|     M| 4000|true|
| Robert|Williams|1978-09-05|     M|     4000|      null|  null| null|null|
+-------+--------+----------+------+---------+----------+------+-----+----+
.join(df_TBL3.withColumn('t3', F.lit(True)), ['COL_A', 'COL_B'], 'left')
+-------+--------+----------+------+---------+----------+------+-----+----+----------+------+-----+----+
|  COL_A|   COL_B|       dob|gender|COL_C_old|       dob|gender|COL_C|  t2|       dob|gender|COL_C|  t3|
+-------+--------+----------+------+---------+----------+------+-----+----+----------+------+-----+----+
|  James|   Smith|1991-04-01|     M|     3000|      null|  null| null|null|      null|  null| null|null|
|    Jen|   Brown|1980-02-17|     F|     1000|      null|  null| null|null|      null|  null| null|null|
|  Maria|   Jones|1967-12-01|     F|     4000|1967-12-01|     F| 4000|true|1967-12-01|     F| 4000|true|
|Michael|    Rose|2000-05-19|     M|     4000|2000-05-19|     M| 4000|true|2000-05-19|     M| 4000|true|
| Robert|Williams|1978-09-05|     M|     4000|      null|  null| null|null|      null|  null| null|null|
+-------+--------+----------+------+---------+----------+------+-----+----+----------+------+-----+----+
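To make the role of the marker columns concrete, here is a minimal plain-Python sketch of a left join that adds such a marker. It is only a model of the idea, not Spark code: the helper left_join_with_marker and the tbl2_dup variant with a repeated key are hypothetical, and only the key columns are carried along. It also shows one thing to watch for: if a key occurred more than once on the right-hand side, a left join would emit the left-hand row once per match.

```python
def left_join_with_marker(left, right, keys, marker):
    """Left join on `keys`; matched rows get marker=True, unmatched get marker=None."""
    joined = []
    for left_row in left:
        matches = [r for r in right if all(left_row[k] == r[k] for k in keys)]
        if matches:
            # One output row per matching right-hand row, as in a SQL left join
            joined.extend({**left_row, marker: True} for _ in matches)
        else:
            # No match: keep the left row, marker stays "null"
            joined.append({**left_row, marker: None})
    return joined


keys = ["COL_A", "COL_B"]
tbl1 = [
    {"COL_A": "James", "COL_B": "Smith"},
    {"COL_A": "Maria", "COL_B": "Jones"},
]
tbl2 = [{"COL_A": "Maria", "COL_B": "Jones"}]
# Hypothetical variant of TBL2 in which the Maria/Jones key appears twice
tbl2_dup = tbl2 + [{"COL_A": "Maria", "COL_B": "Jones"}]

once = left_join_with_marker(tbl1, tbl2, keys, "t2")
twice = left_join_with_marker(tbl1, tbl2_dup, keys, "t2")

print([row["t2"] for row in once])  # markers for James, then Maria
print(len(once), len(twice))        # row counts without and with the duplicate key
```

With the sample tables from the question every key is unique, so each df_TBL1 row appears exactly once after both joins.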

After the joins, withColumn creates one more column named 'updated_c': if BOTH 't2' and 't3' are True, it gets 1; otherwise it takes the value from 'COL_C_old'. The array function builds an array from columns 't2' and 't3', and forall checks whether ALL of its elements satisfy the lambda function. lambda x: x simply returns the element itself, so the check passes only when every element is True. If the row existed in df_TBL2 and df_TBL3, the values are True; if not, they are null, the condition is not True, and the otherwise branch is used.

updating = F.forall(F.array('t2', 't3'), lambda x: x)
.withColumn('updated_c', F.when(updating, 1).otherwise(F.col('COL_C_old')))
+-------+--------+----------+------+---------+----------+------+-----+----+----------+------+-----+----+---------+
|  COL_A|   COL_B|       dob|gender|COL_C_old|       dob|gender|COL_C|  t2|       dob|gender|COL_C|  t3|updated_c|
+-------+--------+----------+------+---------+----------+------+-----+----+----------+------+-----+----+---------+
|  James|   Smith|1991-04-01|     M|     3000|      null|  null| null|null|      null|  null| null|null|     3000|
|    Jen|   Brown|1980-02-17|     F|     1000|      null|  null| null|null|      null|  null| null|null|     1000|
|  Maria|   Jones|1967-12-01|     F|     4000|1967-12-01|     F| 4000|true|1967-12-01|     F| 4000|true|        1|
|Michael|    Rose|2000-05-19|     M|     4000|2000-05-19|     M| 4000|true|2000-05-19|     M| 4000|true|        1|
| Robert|Williams|1978-09-05|     M|     4000|      null|  null| null|null|      null|  null| null|null|     4000|
+-------+--------+----------+------+---------+----------+------+-----+----+----------+------+-----+----+---------+
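The null handling is the subtle part, so here is a small plain-Python model of it, with None standing in for SQL null. This is a sketch of the semantics as I understand them (SQL three-valued logic, and a when that fires only on a condition that is exactly True), not Spark's actual implementation; the helper names sql_and, forall_identity and when_otherwise are made up for illustration.

```python
def sql_and(a, b):
    """AND under SQL three-valued logic; None stands for NULL."""
    if a is False or b is False:
        return False
    if a is None or b is None:
        return None
    return True


def forall_identity(values):
    """Model of forall(array, lambda x: x): the AND of all elements."""
    result = True
    for value in values:
        result = sql_and(result, value)
    return result


def when_otherwise(condition, then_value, else_value):
    # The "when" branch is taken only if the condition is exactly True;
    # both False and None (null) fall through to "otherwise".
    return then_value if condition is True else else_value


# (t2, t3, COL_C_old) combinations that can occur after the two left joins
rows = [
    (True, True, 4000),   # key found in both tables
    (True, None, 4000),   # key found in df_TBL2 only
    (None, None, 3000),   # key found in neither table
]
updated_c = [when_otherwise(forall_identity([t2, t3]), 1, old) for t2, t3, old in rows]
print(updated_c)
```

Only the first combination is updated to 1; the other two keep their old value because a null condition is treated like a failed check.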

Then select all of df_TBL1's columns through its alias with 'T1.*' (this still includes 'COL_C_old'), and add one more column: 'updated_c', renamed to 'COL_C'.

.select('T1.*', F.col('updated_c').alias('COL_C'))
+-------+--------+----------+------+---------+-----+
|  COL_A|   COL_B|       dob|gender|COL_C_old|COL_C|
+-------+--------+----------+------+---------+-----+
|  James|   Smith|1991-04-01|     M|     3000| 3000|
|    Jen|   Brown|1980-02-17|     F|     1000| 1000|
|  Maria|   Jones|1967-12-01|     F|     4000|    1|
|Michael|    Rose|2000-05-19|     M|     4000|    1|
| Robert|Williams|1978-09-05|     M|     4000| 4000|
+-------+--------+----------+------+---------+-----+

Finally, drop the now-unnecessary 'COL_C_old' column.

.drop('COL_C_old')
+-------+--------+----------+------+-----+
|  COL_A|   COL_B|       dob|gender|COL_C|
+-------+--------+----------+------+-----+
|  James|   Smith|1991-04-01|     M| 3000|
|    Jen|   Brown|1980-02-17|     F| 1000|
|  Maria|   Jones|1967-12-01|     F|    1|
|Michael|    Rose|2000-05-19|     M|    1|
| Robert|Williams|1978-09-05|     M| 4000|
+-------+--------+----------+------+-----+


Answered By - ZygD
Answer Checked By - Willingham (PHPFixing Volunteer)