Here are a few points about how I have things set up:
- I have CSV files uploaded to S3 and a Glue crawler set up to create the table and schema.
- I have a Glue job set up that writes the data from the Glue table to our Amazon Redshift database using a JDBC connection. The job is also responsible for mapping the columns and creating the Redshift table.
When I re-run the job, I get duplicate rows in Redshift (as expected). Is there a way to replace or delete rows before inserting the new data, using a key or some option in the Glue job settings?
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import lit

## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir', 'JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

columnMapping = [
    ("id", "int", "id", "int"),
    ("name", "string", "name", "string"),
]

datasource1 = glueContext.create_dynamic_frame.from_catalog(database = "db01", table_name = "table01", transformation_ctx = "datasource0")
applymapping1 = ApplyMapping.apply(frame = datasource1, mappings = columnMapping, transformation_ctx = "applymapping1")
resolvechoice1 = ResolveChoice.apply(frame = applymapping1, choice = "make_cols", transformation_ctx = "resolvechoice1")
dropnullfields1 = DropNullFields.apply(frame = resolvechoice1, transformation_ctx = "dropnullfields1")

df1 = dropnullfields1.toDF()
data1 = df1.withColumn('platform', lit('test'))
data1 = DynamicFrame.fromDF(data1, glueContext, "data_tmp1")

## Write data to Redshift
datasink1 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = data1, catalog_connection = "Test Connection", connection_options = {"dbtable": "table01", "database": "db01"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink1")

job.commit()
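To make the question concrete, the sketch below is the kind of thing I am hoping is possible: run SQL before/after the load so that existing rows are deleted or merged by key. This is only a rough, untested sketch; I am assuming the Redshift connection_options can take "preactions"/"postactions" SQL strings, and the staging table name table01_stage and the key column id are placeholders for illustration.

## Hypothetical sketch (untested): load into a staging table, then merge by key via post-actions.
## Assumes "preactions"/"postactions" are accepted in connection_options for a Redshift write;
## "table01_stage" and the key column "id" are placeholder names.
pre_sql = "CREATE TABLE IF NOT EXISTS table01_stage (LIKE table01);"
post_sql = (
    "BEGIN;"
    "DELETE FROM table01 USING table01_stage WHERE table01.id = table01_stage.id;"
    "INSERT INTO table01 SELECT * FROM table01_stage;"
    "DROP TABLE table01_stage;"
    "END;"
)

datasink1 = glueContext.write_dynamic_frame.from_jdbc_conf(
    frame = data1,
    catalog_connection = "Test Connection",
    connection_options = {
        "dbtable": "table01_stage",   # load into the staging table instead of the target
        "database": "db01",
        "preactions": pre_sql,        # SQL run before the load
        "postactions": post_sql       # SQL run after the load: delete by key, then insert
    },
    redshift_tmp_dir = args["TempDir"],
    transformation_ctx = "datasink1"
)

If something along these lines (or any other Glue setting that deduplicates by key) exists, that is exactly what I am after.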
jdbc amazon-web-services pyspark aws-glue
krchun