U-SQL SearchLog Aggregations as ADF Data Flows

I’ve found that the U-SQL samples are a good starting point for new ADF Mapping Data Flow users. U-SQL scripts aim at many of the same goals, including data transformations at scale, flexible schema mapping, and other ETL functions. In this post, I’m going to take the seminal SearchLog U-SQL sample, which aggregates the time that visitors spent on URLs from a sample TSV file of URL visits, and rebuild it as an ADF Mapping Data Flow.

Please note that as of this writing, Mapping Data Flows are a limited public preview feature in Azure Data Factory (ADF), meaning that you must request that your Azure subscription be whitelisted in order to light up the feature in your ADF V2 factories. Click here to request access to ADF Data Flows.

I’ll walk you through the U-SQL steps with corresponding Data Flow steps in ADF. The sample TSV file for this demo is here. The final data flow design should look something like this:

[screenshot: the completed data flow design]

  1. First, map the schema for the SearchLog.tsv file. In ADF, you’ll notice that the Source and Sink transformations include an option for “Schema Drift”, an important setting if your data sources need to handle flexible schema changes.
    • U-SQL
      • @searchlog =
        EXTRACT UserId int,
        Start DateTime,
        Region string,
        Query string,
        Duration int?,
        Urls string,
        ClickedUrls string
        FROM "/Samples/Data/SearchLog.tsv"
        USING Extractors.Tsv();
        OUTPUT @searchlog
        TO "/output/SearchLog-first-u-sql.csv"
        USING Outputters.Csv();
    • ADF Mapping Data Flow
      • Add a Source transformation and a Select transformation
      • The source points to the TSV file, with TAB as the column delimiter and no header row defined in the dataset
      • You can define data types manually or you can use “Detect Types” to ask ADF to infer data types automatically
      • The Select transformation aliases the headerless columns with the appropriate schema names
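      • For reference, here’s a rough sketch of the Data Flow Script that ADF generates behind the canvas for this Source + Select pair. Treat it as illustrative only: the stream names and the _col0_-style default column names are my assumptions, and the script in your factory will differ in detail
      • source(output(
        _col0_ as integer,
        _col1_ as timestamp,
        _col2_ as string,
        _col3_ as string,
        _col4_ as integer,
        _col5_ as string,
        _col6_ as string),
        allowSchemaDrift: true,
        validateSchema: false) ~> SearchLogSource
        SearchLogSource select(mapColumn(
        UserId = _col0_,
        Start = _col1_,
        Region = _col2_,
        Query = _col3_,
        Duration = _col4_,
        Urls = _col5_,
        ClickedUrls = _col6_)) ~> RenameColumns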
  2. Let’s jump to the portion of the sample where region and start date are parsed and used for filtering
    • U-SQL
        • @rs1 =
          SELECT Start, Region, Duration
          FROM @searchlog
          WHERE Region == "en-gb";

          @rs1 =
          SELECT Start, Region, Duration
          FROM @rs1
          WHERE Start >= DateTime.Parse("2012/02/16") AND Start <= DateTime.Parse("2012/02/17");

          OUTPUT @rs1
          TO "/output/SearchLog-transform-datetime.csv"
          USING Outputters.Csv();
    • ADF Mapping Data Flow
      • In Data Flow, data moves from left to right through your design, and you can build filters like the U-SQL WHERE clause using the Filter transformation and the ADF Data Flow expression language. To achieve the date filtering, I used this expression:
      • Start >= toDate('2012-02-16','yyyy-MM-dd') && Start <= toDate('2012-02-17','yyyy-MM-dd')
      • In Debug mode, you can test your expressions to make sure that you are seeing the correct results from your transformations
      • ADF Mapping Data Flow allows you to interactively view your data as you build your transformations, and you can also view the execution plans of your runs, similar to the ADLA execution graph
      • Also similar to ADLA, ADF allows you to set the size of compute that you assign to each ETL job
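      • Behind the canvas, the Filter transformation serializes to a single line of Data Flow Script. Here’s a sketch that combines the region and date predicates from the U-SQL sample into one filter; the stream names are assumptions carried over from the sketch in step 1
      • RenameColumns filter(Region == 'en-gb' &&
        Start >= toDate('2012-02-16','yyyy-MM-dd') &&
        Start <= toDate('2012-02-17','yyyy-MM-dd')) ~> FilterRegionAndDate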
  3. Next, the U-SQL sample aggregates total duration per region and keeps only the regions whose total duration is larger than 200
    • U-SQL
        • @rs1 =
          SELECT
          Region,
          SUM(Duration) AS TotalDuration
          FROM @searchlog
          GROUP BY Region;

          @res =
          SELECT *
          FROM @rs1
          ORDER BY TotalDuration DESC
          FETCH 5 ROWS;

          @res =
          SELECT
          Region,
          SUM(Duration) AS TotalDuration
          FROM @searchlog
          GROUP BY Region
          HAVING SUM(Duration) > 200;
    • ADF Mapping Data Flow
      • I use the Aggregate transformation in ADF to Group By the region and use the SUM expression on duration to create a new aggregated field in my data called “TotalDuration”
      • To achieve the filtering, I again use the Filter transformation with this expression: TotalDuration > 200
      • Let’s check our results in Debug mode
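      • In Data Flow Script terms, the Aggregate plus the second Filter look roughly like this (again a sketch with assumed stream names, not the exact script ADF generates)
      • FilterRegionAndDate aggregate(groupBy(Region),
        TotalDuration = sum(Duration)) ~> AggregateDurations
        AggregateDurations filter(TotalDuration > 200) ~> FilterLargeDurations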
  4. Now that we’ve completed the transformations, I use the final step in my Data Flow to land the data directly in Azure SQL Data Warehouse, using the “Auto Map” feature with Schema Drift turned on so that whatever schema I receive from the input files is written to my data warehouse table automatically. A sketch of the sink script follows below.
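    • In the generated script, a sink with no explicit column mapping corresponds to “Auto Map”. A minimal sketch, assuming the Azure SQL Data Warehouse dataset is configured on the sink and reusing the stream names from the earlier sketches:
    • FilterLargeDurations sink(allowSchemaDrift: true,
      validateSchema: false) ~> DWSink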