This article is a continuation of a series on the Azure Data Platform. It continues the discussion on copying between a source and a destination, with a focus on declaring such a copy activity in Azure Data Factory.
A copy activity in Azure Data Factory that copies all the buckets under an S3 account to Azure Data Lake Storage Gen2 requires an iteration in the pipeline logic. For example:
{
    "name": "CopyPrjxPodItemsPipeline_23n",
    "properties": {
        "activities": [
            {
                "name": "ForEachItemInPod",
                "type": "ForEach",
                "dependsOn": [
                    {
                        "activity": "GetPodContents",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "items": {
                        "value": "@activity('GetPodContents').output.childItems",
                        "type": "Expression"
                    },
                    "isSequential": true,
                    "activities": [
                        {
                            "name": "CopyPodItem",
                            "type": "Copy",
                            "dependsOn": [],
                            "policy": {
                                "timeout": "0.12:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [
                                {
                                    "name": "preserve",
                                    "value": "Attributes"
                                }
                            ],
                            "typeProperties": {
                                "source": {
                                    "type": "BinarySource",
                                    "storeSettings": {
                                        "type": "AmazonS3CompatibleReadSettings",
                                        "recursive": true
                                    },
                                    "formatSettings": {
                                        "type": "BinaryReadSettings"
                                    }
                                },
                                "sink": {
                                    "type": "BinarySink",
                                    "storeSettings": {
                                        "type": "AzureBlobFSWriteSettings",
                                        "copyBehavior": "PreserveHierarchy"
                                    }
                                },
                                "preserve": [
                                    "Attributes"
                                ],
                                "enableStaging": false
                            },
                            "inputs": [
                                {
                                    "referenceName": "SourceDataset_23n",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "bucketName": "@item().name"
                                    }
                                }
                            ],
                            "outputs": [
                                {
                                    "referenceName": "DestinationDataset_23n",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "bucketName": "@item().name"
                                    }
                                }
                            ]
                        }
                    ]
                }
            },
            {
                "name": "GetPodContents",
                "type": "GetMetadata",
                "dependsOn": [],
                "policy": {
                    "timeout": "0.12:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "dataset": {
                        "referenceName": "SourceDataset_prjx",
                        "type": "DatasetReference"
                    },
                    "fieldList": [
                        "childItems"
                    ],
                    "storeSettings": {
                        "type": "AmazonS3CompatibleReadSettings",
                        "enablePartitionDiscovery": false
                    },
                    "formatSettings": {
                        "type": "BinaryReadSettings"
                    }
                }
            }
        ],
        "annotations": [],
        "lastPublishTime": "2023-04-25T15:18:34Z"
    },
    "type": "Microsoft.DataFactory/factories/pipelines"
}
There are a few things to note about the pipeline logic above:
1. It requires source and destination connections (linked services and parameterized datasets) as prerequisites to the copy activity; a sketch of the dataset definitions follows this list.
2. The copy activity is inside the ForEach loop.
3. The ForEach loop gets its item list from the GetPodContents activity, which reads the buckets at the source.
4. The metadata of each object is preserved as it is copied from source to destination.
5. The iteration is sequential because, if all of the copy activities wrote to the same location at once, each might read the original length of that location differently.
6. The destination happens to be a distributed file system and preserves the original hierarchy of the objects.
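As a point of reference for item 1, the pipeline expects the parameterized source and destination datasets to already exist, each bound to a linked service for its store. The following is a minimal sketch of what those two dataset definitions could look like; the linked service names (S3CompatibleLinkedService, AdlsGen2LinkedService) and the destination file system name (landing) are placeholder assumptions, while the dataset names, the bucketName parameter, and the Binary type follow from the pipeline above.
{
    "name": "SourceDataset_23n",
    "properties": {
        "linkedServiceName": {
            "referenceName": "S3CompatibleLinkedService",
            "type": "LinkedServiceReference"
        },
        "parameters": {
            "bucketName": {
                "type": "string"
            }
        },
        "type": "Binary",
        "typeProperties": {
            "location": {
                "type": "AmazonS3CompatibleLocation",
                "bucketName": {
                    "value": "@dataset().bucketName",
                    "type": "Expression"
                }
            }
        }
    }
}
{
    "name": "DestinationDataset_23n",
    "properties": {
        "linkedServiceName": {
            "referenceName": "AdlsGen2LinkedService",
            "type": "LinkedServiceReference"
        },
        "parameters": {
            "bucketName": {
                "type": "string"
            }
        },
        "type": "Binary",
        "typeProperties": {
            "location": {
                "type": "AzureBlobFSLocation",
                "folderPath": {
                    "value": "@dataset().bucketName",
                    "type": "Expression"
                },
                "fileSystem": "landing"
            }
        }
    }
}
Passing @item().name into bucketName on both references lets the same pair of datasets serve every bucket that GetPodContents enumerates; on the sink side, one way to use the parameter is as the destination folder path so that each bucket lands in a folder of its own name.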