AWS Glue to Redshift: is it possible to replace, update or delete data?

Here are a few points about how I have things set up:

  • I have CSV files uploaded to S3 and a Glue crawler set up to create the table and schema.
  • I have a Glue job set up that writes the data from the Glue table to our Amazon Redshift database using a JDBC connection. The job is also responsible for mapping the columns and creating the Redshift table.

When I re-run the job, I get duplicate rows in Redshift (as expected). However, is there a way to replace or delete rows before inserting the new data, using a key or the partition settings in Glue?

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from awsglue.transforms import SelectFields
from pyspark.sql.functions import lit

## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir', 'JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

columnMapping = [
    ("id", "int", "id", "int"),
    ("name", "string", "name", "string"),
]

datasource1 = glueContext.create_dynamic_frame.from_catalog(database = "db01", table_name = "table01", transformation_ctx = "datasource0")
applymapping1 = ApplyMapping.apply(frame = datasource1, mappings = columnMapping, transformation_ctx = "applymapping1")
resolvechoice1 = ResolveChoice.apply(frame = applymapping1, choice = "make_cols", transformation_ctx = "resolvechoice1")
dropnullfields1 = DropNullFields.apply(frame = resolvechoice1, transformation_ctx = "dropnullfields1")

df1 = dropnullfields1.toDF()
data1 = df1.withColumn('platform', lit('test'))
data1 = DynamicFrame.fromDF(data1, glueContext, "data_tmp1")

## Write data to Redshift
datasink1 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = data1, catalog_connection = "Test Connection", connection_options = {"dbtable": "table01", "database": "db01"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink1")

job.commit()
+21
jdbc amazon-web-services pyspark aws-glue
8 answers

This was the solution I got from AWS Glue support:

As you know, although you can create primary keys, Redshift does not enforce uniqueness. Therefore, if you re-run Glue jobs, duplicate rows can be inserted. Some ways to preserve uniqueness are:

  • Use a staging table to insert all rows and then perform an upsert / merge [1] into the main table; this has to be done outside of Glue (a sketch of this pattern follows after the references below).

  • Add another column to the Redshift table [2], such as an insert timestamp, so duplicates are allowed but you know which row came first or last, and then delete the duplicates afterwards if you need to.

  • Load the previously inserted data into a dataframe and then compare it with the data to be inserted, so that duplicates are never written [3].

[1] - http://docs.aws.amazon.com/redshift/latest/dg/c_best-practices-upsert.html and http://www.silota.com/blog/amazon-redshift-upsert-support-staging-table-replace-rows/

[2] - https://github.com/databricks/spark-redshift/issues/238

[3] - https://docs.databricks.com/spark/latest/faq/join-two-dataframes-duplicated-column.html
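
For concreteness, here is a minimal sketch of the first option (staging table plus merge), run outside of Glue. The table names (table01, table01_staging), the id key column and the connection details are assumptions for illustration; it uses pg8000, the same driver one of the answers below uses, and follows the delete-then-insert merge pattern from the Redshift docs [1]. The staging table itself would be loaded beforehand by the Glue job or a COPY.

import pg8000

# Hypothetical cluster endpoint and credentials; replace with your own.
conn = pg8000.connect(
    database="db01",
    user="my_user",
    password="my_password",
    host="my-cluster.xxxxxxxx.us-east-1.redshift.amazonaws.com",
    port=5439,  # default Redshift port
)
cur = conn.cursor()

# Remove the target rows that are about to be replaced, then copy everything
# over from the staging table; the commit makes the two statements atomic.
cur.execute("DELETE FROM table01 USING table01_staging "
            "WHERE table01.id = table01_staging.id")
cur.execute("INSERT INTO table01 SELECT * FROM table01_staging")
conn.commit()

# Empty the staging table for the next run.
cur.execute("DELETE FROM table01_staging")
conn.commit()

cur.close()
conn.close()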

+4

Job bookmarks are the key. Just edit the job and enable "Job bookmarks", and it will not process already processed data. Note that the job needs to run once more before it detects that it does not need to reprocess the old data.

For more information, see: http://docs.aws.amazon.com/glue/latest/dg/monitor-continuations.html.

The name "bookmark", in my opinion, is a little far-fetched. I would never have looked at this if I hadn’t accidentally stumbled upon it during my searches.
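
If you prefer to set this from code rather than the console, here is a minimal boto3 sketch; the job name is hypothetical, and --job-bookmark-option is the documented special parameter behind the console toggle. Bookmarks also rely on the script passing transformation_ctx to its sources and calling job.commit(), which the script in the question already does.

import boto3

glue = boto3.client("glue")

# Start a run with bookmarks enabled. Valid values for the special parameter are
# job-bookmark-enable, job-bookmark-pause and job-bookmark-disable.
glue.start_job_run(
    JobName="my-csv-to-redshift-job",  # hypothetical job name
    Arguments={"--job-bookmark-option": "job-bookmark-enable"},
)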

+14

Today I tested and got a working workaround for updating / deleting rows in the target table using a JDBC connection.

I did it as shown below:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import pg8000

args = getResolvedOptions(sys.argv, [
    'JOB_NAME',
    'PW',
    'HOST',
    'USER',
    'DB'
])

# ...

# Create Spark & Glue context
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# ...

config_port = ****  # Redshift port (typically 5439)
conn = pg8000.connect(
    database=args['DB'],
    user=args['USER'],
    password=args['PW'],
    host=args['HOST'],
    port=config_port
)

# Run an UPDATE against the target table
query = "UPDATE table .....;"
cur = conn.cursor()
cur.execute(query)
conn.commit()
cur.close()

# Delete the rows in AAA that have a matching id in BBB
# (Redshift uses the DELETE ... USING form rather than a multi-table FROM list)
query1 = "DELETE FROM AAA USING BBB WHERE AAA.id = BBB.id"
cur1 = conn.cursor()
cur1.execute(query1)
conn.commit()
cur1.close()

conn.close()
+2

The job bookmark feature in Glue should do the trick, as suggested above. I have used it successfully when my source was S3. http://docs.aws.amazon.com/glue/latest/dg/monitor-continuations.html

0

Unfortunately, it is not possible right now! I had a similar use case and checked with AWS support. They asked me to run the S3 COPY script, or the temp staging / delete / insert, from a standalone machine.

So yes, it's impossible!
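
For what it's worth, a minimal sketch of the kind of standalone COPY script they suggested; the bucket, IAM role, table name and connection details are all assumptions, and pg8000 is just one convenient driver for reaching Redshift:

import pg8000

# Hypothetical cluster endpoint and credentials
conn = pg8000.connect(
    database="db01",
    user="my_user",
    password="my_password",
    host="my-cluster.xxxxxxxx.us-east-1.redshift.amazonaws.com",
    port=5439
)
cur = conn.cursor()

# Load the CSVs straight from S3 into a staging (or target) table
cur.execute("""
    COPY table01_staging
    FROM 's3://my-bucket/incoming/csv/'
    IAM_ROLE 'arn:aws:iam::123456789012:role/MyRedshiftCopyRole'
    FORMAT AS CSV
    IGNOREHEADER 1
""")
conn.commit()

cur.close()
conn.close()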

0

According to my testing (with the same scenario), the bookmark functionality does not work: duplicate data is inserted when the job is run multiple times. I solved this problem by deleting the files from the S3 location daily (via Lambda) and implementing staging and target tables, so that data is inserted / updated based on the matching key columns.
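
For the cleanup part of this approach, a minimal sketch of the sort of Lambda that could do the daily delete; the bucket name and prefix are assumptions and should match whatever location the crawler and job read from:

import boto3

s3 = boto3.resource("s3")

def handler(event, context):
    # Delete the CSVs the Glue job has already loaded, so the next run only
    # picks up freshly arrived files. Bucket and prefix are placeholders.
    s3.Bucket("my-input-bucket").objects.filter(Prefix="incoming/csv/").delete()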

0

Please check this answer. It has an explanation and sample code showing how to upsert data into Redshift using a staging table. The same approach can be used to run any SQL queries before or after Glue writes the data, using the preactions and postactions options:

// Write data to staging table in Redshift
glueContext.getJDBCSink(
  catalogConnection = "redshift-glue-connections-test",
  options = JsonOptions(Map(
    "database" -> "conndb",
    "dbtable" -> staging,
    "overwrite" -> "true",
    "preactions" -> "<another SQL queries>",
    "postactions" -> "<some SQL queries>"
  )),
  redshiftTmpDir = tempDir,
  transformationContext = "redshift-output"
).writeDynamicFrame(datasetDf)
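
For a Python job like the one in the question, a minimal sketch of the same idea with write_dynamic_frame.from_jdbc_conf; the staging table name and the SQL in preactions / postactions are assumptions for illustration, and glueContext, data1 and args are reused from the question's script. Glue runs the preactions SQL before loading dbtable and the postactions SQL afterwards, which is enough to get delete-then-insert upsert behaviour:

pre_sql = "CREATE TABLE IF NOT EXISTS public.table01_staging (LIKE public.table01);"
post_sql = ("BEGIN;"
            "DELETE FROM public.table01 USING public.table01_staging "
            "WHERE public.table01.id = public.table01_staging.id;"
            "INSERT INTO public.table01 SELECT * FROM public.table01_staging;"
            "DROP TABLE public.table01_staging;"
            "END;")

# Load into the staging table; the postactions merge it into the real table.
datasink1 = glueContext.write_dynamic_frame.from_jdbc_conf(
    frame = data1,
    catalog_connection = "Test Connection",
    connection_options = {
        "dbtable": "public.table01_staging",
        "database": "db01",
        "preactions": pre_sql,
        "postactions": post_sql,
    },
    redshift_tmp_dir = args["TempDir"],
    transformation_ctx = "datasink1",
)
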
0

I just tested the "Job bookmark" functionality in Glue and realized that it does not prevent duplicate rows when the same job runs more than once. Is there a simpler, easier way to load files from S3 into Redshift via Glue?

0
