ADF Mapping Data Flows: Iterate multiple files with Source Transformation

Azure Data Factory has a number of different options to filter files and folders in Azure and then process those files in a pipeline. You can use the pipeline iterator ForEach in conjunction with a Get Metadata activity, for example:

[Screenshot: wildcards2]

But when you are processing large numbers of files using Mapping Data Flows, the best practice is to instead simplify the pipeline with a single Execute Data Flow activity and let the Source Transformation inside of the Data Flow handle iterating over several files:

[Screenshot: wildcards1]

This works better inside a data flow because each request for a Data Flow execution spins up an Azure Databricks environment. If you were to process a folder in Azure Blob Storage containing 100 files, one file at a time, inside a ForEach, you would be asking ADF to stand up and tear down the execution environment 100 times in a just-in-time (JIT) manner, which is not very efficient.
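To make that overhead concrete, here is a back-of-the-envelope sketch in Python. The startup and per-file timings are hypothetical placeholders, not measured ADF figures, and the model assumes the ForEach iterations run one after another; substitute numbers from your own runs.

```python
# Hypothetical timings, for illustration only (not measured ADF values).
STARTUP_MINUTES = 5.0     # assumed time to stand up the execution environment
PER_FILE_MINUTES = 0.5    # assumed time to process a single file


def foreach_total(file_count: int) -> float:
    """One Data Flow execution per file: the startup cost is paid every time."""
    return file_count * (STARTUP_MINUTES + PER_FILE_MINUTES)


def single_execution_total(file_count: int) -> float:
    """One Data Flow execution for all files: the startup cost is paid once."""
    return STARTUP_MINUTES + file_count * PER_FILE_MINUTES


if __name__ == "__main__":
    files = 100
    print(f"ForEach, one execution per file: {foreach_total(files):.1f} min")
    print(f"Single execution, all files:     {single_execution_total(files):.1f} min")
```

Under these assumed numbers, the startup cost dominates the ForEach total, which is the inefficiency described above.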

Instead, use a generic Blob or ADLS folder dataset inside your Source Transformation that points only to your container, and leave the folder and file settings blank:

[Screenshot: wildcards5]

Now you can use a combination of the wildcard, path, and parameter features in the Data Flow source transformation to pick the folders and files you wish to process. ADF will process every file that matches your settings inside a single Data Flow execution, which performs much better:

[Screenshot: wildcards]

In the example above, I am processing every file that ends in “.txt” in my “DateFiles” directory that sits in my container “mycontainer” and has the date that I’ve set in the parameter called “$mydate”.

I’ve set the value of my parameter to “201907” to match the naming convention of this folder and file set:

[Screenshot: wildcards3]
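Since the screenshots are not reproduced here, the following Python sketch approximates the idea of a parameterized wildcard. The blob names and the pattern `DateFiles/*{mydate}*.txt` are assumptions made for illustration, and Python's `fnmatch` is only a rough stand-in for how ADF resolves wildcard paths.

```python
from fnmatch import fnmatchcase
from typing import List

# Hypothetical blob paths inside "mycontainer"; the real names are not in the post.
BLOB_PATHS = [
    "DateFiles/sales_20190701.txt",
    "DateFiles/sales_20190715.txt",
    "DateFiles/sales_20190731.txt",
    "DateFiles/sales_20190801.txt",
    "DateFiles/sales_20190701.csv",
    "OtherFiles/sales_20190701.txt",
]


def build_wildcard(mydate: str) -> str:
    """Assumed pattern: any .txt file in DateFiles whose name contains mydate."""
    return f"DateFiles/*{mydate}*.txt"


def matching_paths(mydate: str) -> List[str]:
    """Return the hypothetical blobs that the assumed pattern would select."""
    pattern = build_wildcard(mydate)
    return [path for path in BLOB_PATHS if fnmatchcase(path, pattern)]


if __name__ == "__main__":
    for path in matching_paths("201907"):
        print(path)
```

With the assumed names, passing "201907" selects the three July 2019 .txt files and skips the August file, the .csv file, and the file in the other folder.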

Now, when I execute this data flow from my pipeline, every file whose name includes the value I pass in through the “mydate” parameter gets processed. For this example, I have a single row in each of 3 files for July 2019, and you can see that ADF uses the Data Flow Source transformation to take the rows from each matching file and process them in a single execution:

[Screenshot: wildcards4]

If you’d like to keep track of the source lineage of each record, enter a new column name in the “Column to store file name” field in Source Options. Another common pattern is to choose “Move” or “Delete” as the “After Completion” action so that ADF can clean up your processed files when the process is completed.
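Conceptually, the file name column works like the Python sketch below: rows from every matching file are combined, and each row carries the path it came from. The file contents and the column name `source_file` are hypothetical; this illustrates the idea only and is not how ADF implements it.

```python
import csv
import io

# Hypothetical file contents standing in for blobs; one data row per file.
FILES = {
    "DateFiles/sales_20190701.txt": "id,amount\n1,10.0\n",
    "DateFiles/sales_20190715.txt": "id,amount\n2,12.5\n",
    "DateFiles/sales_20190731.txt": "id,amount\n3,7.25\n",
}


def read_with_lineage(files, name_column="source_file"):
    """Combine the rows of every file and record which file each row came from."""
    rows = []
    for path, text in files.items():
        for row in csv.DictReader(io.StringIO(text)):
            row[name_column] = path  # lineage column added to each record
            rows.append(row)
    return rows


if __name__ == "__main__":
    for row in read_with_lineage(FILES):
        print(row)
```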

By leveraging the parameter capability above, each time you call this single Execute Data Flow activity you can send in a different date value to filter your Blob Storage container / folder path.
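As a small illustration, the Python helper below generates the kind of yyyyMM values you might send in on successive runs. The function name `month_values` is hypothetical and not part of ADF; how each value reaches the “mydate” parameter depends on your pipeline.

```python
from datetime import date
from typing import List


def month_values(start: date, count: int) -> List[str]:
    """Return `count` consecutive month stamps in yyyyMM form, starting at `start`."""
    values = []
    year, month = start.year, start.month
    for _ in range(count):
        values.append(f"{year:04d}{month:02d}")
        month += 1
        if month > 12:  # roll over into January of the next year
            year, month = year + 1, 1
    return values


if __name__ == "__main__":
    # Each value is a candidate for the "mydate" parameter on one run.
    print(month_values(date(2019, 7, 1), 3))
```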

8 comments

  1. Yeah, I tried that. My Data Lake folder is set up like this: 'MyFolder/2020/11', which has a bunch of CSV files underneath. I created two parameters, $WildcardPathYear = toString(currentUTC(), 'yyyy') and $WilcardPathMonth = toString(currentUTC(), 'MM'). My Source option Wildcard Path is set up like this: 'MyFolder/*'+$WildcardPathYear+'/'+$WildcardPathMonth+'/'+'*.csv'. It's throwing an error below with my first file name. Any idea what I am missing?

    • What is the error? What you can do is enter your expression in a Derived Column just for the purpose of testing that the path is being interpreted correctly. Use the data preview to look at the results.
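As an offline analogue of the Derived Column test suggested above, this Python sketch mirrors the commenter's concatenation so the resolved string can be inspected. The Python variable names and the fixed sample date are assumptions for illustration; `strftime` stands in for the data flow `toString(currentUTC(), ...)` calls.

```python
from datetime import datetime, timezone


def wildcard_path(now: datetime) -> str:
    """Mirror the commenter's string concatenation for a given UTC timestamp."""
    wildcard_path_year = now.strftime("%Y")    # stands in for toString(currentUTC(), 'yyyy')
    wildcard_path_month = now.strftime("%m")   # stands in for toString(currentUTC(), 'MM')
    return "MyFolder/*" + wildcard_path_year + "/" + wildcard_path_month + "/" + "*.csv"


if __name__ == "__main__":
    # Fixed sample date so the result is reproducible.
    print(wildcard_path(datetime(2020, 11, 15, tzinfo=timezone.utc)))
```

For November 2020 this resolves to MyFolder/*2020/11/*.csv, which can then be compared by eye against the actual folder layout ('MyFolder/2020/11').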

  2. Hi Mark!
    I want to follow your recommendation to use the loop inside Data Flows, but with a drifting schema.
    Somehow that does not work with “Drifting” checked when the files in the folder have different structures; it gives the error “The schema of file ‘…….’ is different compared with others!”
    How can I force a sequential loop over the file list?

    • A single Source transformation cannot handle different schemas. Schema Drift is built to handle evolving schemas or schema patterns over time, i.e. subsequent iterations of that same data flow can handle files with evolving schemas. We are working on enabling this scenario natively in ADF, but today the solution is this:

      Iterate over the file list in a ForEach activity in the pipeline and use a parameterized dataset where you set the file name to the current item from the ForEach. This way, each execution of the data flow can handle a different schema, and this will work.

      • I see, thank you.
        I was taking the approach you described, looping inside the data flow rather than starting a cluster run for each single file…
        Maybe we will see more in the future… 😉

  3. Hi Mark, do you know how I can read a list of .pdf files from blob storage and merge them into one single file?

  4. Within a Mapping Data Flow (trying to avoid ForEach):

    Is this possible with a structure like this: container/directory/file?

    Land/’Folder1/20210815’/a.csv
    Land/’Folder1/20210816’/b.csv
    Land/’Folder2/20210815’/c.csv

    I want to load Folder1 and its subfiles and sink them to Base in the data lake:

    Base/’Folder1/20210815’/a.csv
    Base/’Folder1/20210816’/b.csv

    In dataflow, I will add one derived column which is loaddate.

    All files contain the same csv columns structure column1,column2,column3,loaddate

    I do not want to specify a date; basically, bulk load any CSV under a subfolder.
