Wednesday, April 26, 2023

This article is part of a continuing series on the Azure Data Platform.

It continues the discussion of copying data between a source and a destination, with a focus on how such a copy activity is declared in Azure Data Factory.

 

A copy activity in Azure Data Factory that copies all the buckets under an S3 account to Azure Data Lake Storage Gen2 requires an iteration in the pipeline logic. For example:

{
    "name": "CopyPrjxPodItemsPipeline_23n",
    "properties": {
        "activities": [
            {
                "name": "ForEachItemInPod",
                "type": "ForEach",
                "dependsOn": [
                    {
                        "activity": "GetPodContents",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "items": {
                        "value": "@activity('GetPodContents').output.childItems",
                        "type": "Expression"
                    },
                    "isSequential": true,
                    "activities": [
                        {
                            "name": "CopyPodItem",
                            "type": "Copy",
                            "dependsOn": [],
                            "policy": {
                                "timeout": "0.12:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [
                                {
                                    "name": "preserve",
                                    "value": "Attributes"
                                }
                            ],
                            "typeProperties": {
                                "source": {
                                    "type": "BinarySource",
                                    "storeSettings": {
                                        "type": "AmazonS3CompatibleReadSettings",
                                        "recursive": true
                                    },
                                    "formatSettings": {
                                        "type": "BinaryReadSettings"
                                    }
                                },
                                "sink": {
                                    "type": "BinarySink",
                                    "storeSettings": {
                                        "type": "AzureBlobFSWriteSettings",
                                        "copyBehavior": "PreserveHierarchy"
                                    }
                                },
                                "preserve": [
                                    "Attributes"
                                ],
                                "enableStaging": false
                            },
                            "inputs": [
                                {
                                    "referenceName": "SourceDataset_23n",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "bucketName": "@item().name"
                                    }
                                }
                            ],
                            "outputs": [
                                {
                                    "referenceName": "DestinationDataset_23n",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "bucketName": "@item().name"
                                    }
                                }
                            ]
                        }
                    ]
                }
            },
            {
                "name": "GetPodContents",
                "type": "GetMetadata",
                "dependsOn": [],
                "policy": {
                    "timeout": "0.12:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "dataset": {
                        "referenceName": "SourceDataset_prjx",
                        "type": "DatasetReference"
                    },
                    "fieldList": [
                        "childItems"
                    ],
                    "storeSettings": {
                        "type": "AmazonS3CompatibleReadSettings",
                        "enablePartitionDiscovery": false
                    },
                    "formatSettings": {
                        "type": "BinaryReadSettings"
                    }
                }
            }
        ],
        "annotations": [],
        "lastPublishTime": "2023-04-25T15:18:34Z"
    },
    "type": "Microsoft.DataFactory/factories/pipelines"
}

There are a few things to note about the pipeline logic above:

1. It requires source and destination connections (linked services and parameterized datasets) as prerequisites to the copy activity; a sketch of such datasets appears after this list.

2. The copy activity is nested inside the ForEach loop.

3. The ForEach loop gets its item list from the GetPodContents activity, which reads the buckets at the source; a sample of that activity's output also appears after this list.

4. The metadata of each object is preserved as it is copied from source to destination.

5. The iteration is sequential because all of the copy activities write to the same destination location; if they ran in parallel, each could observe the location in a different state while writing.

6. The destination is a hierarchical, distributed file system, and the PreserveHierarchy copy behavior retains the original hierarchy of the objects.
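
As an illustration of the first note, the following is a minimal sketch of the parameterized source and destination datasets that the pipeline references. The linked service names (AmazonS3CompatibleLinkedService_23n, AzureDataLakeGen2LinkedService_23n) and the destination file system name (pods) are hypothetical placeholders; the actual connections would already have been created in the factory. Each dataset exposes a bucketName parameter, which the copy activity binds to @item().name:

{
    "name": "SourceDataset_23n",
    "properties": {
        "linkedServiceName": {
            "referenceName": "AmazonS3CompatibleLinkedService_23n",
            "type": "LinkedServiceReference"
        },
        "parameters": {
            "bucketName": { "type": "string" }
        },
        "type": "Binary",
        "typeProperties": {
            "location": {
                "type": "AmazonS3CompatibleLocation",
                "bucketName": {
                    "value": "@dataset().bucketName",
                    "type": "Expression"
                }
            }
        }
    },
    "type": "Microsoft.DataFactory/factories/datasets"
}

{
    "name": "DestinationDataset_23n",
    "properties": {
        "linkedServiceName": {
            "referenceName": "AzureDataLakeGen2LinkedService_23n",
            "type": "LinkedServiceReference"
        },
        "parameters": {
            "bucketName": { "type": "string" }
        },
        "type": "Binary",
        "typeProperties": {
            "location": {
                "type": "AzureBlobFSLocation",
                "fileSystem": "pods",
                "folderPath": {
                    "value": "@dataset().bucketName",
                    "type": "Expression"
                }
            }
        }
    },
    "type": "Microsoft.DataFactory/factories/datasets"
}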
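
Similarly, for the third note, the item list that the ForEach activity iterates over comes from the childItems field of the GetPodContents output. A hypothetical output, assuming two buckets at the source, might look like the following, with @item().name resolving to each bucket name in turn:

{
    "childItems": [
        { "name": "bucket-alpha", "type": "Folder" },
        { "name": "bucket-beta", "type": "Folder" }
    ]
}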

