Azure Data Factory (Data Flow)

Recently I’ve been working in Azure implementing ETL jobs. The main tool is ADF (Azure Data Factory). This post shows some solutions to issues I ran into at work.

Process CSV files and merge files from different systems into one file

  • Source: CSV files with the filename format abcd_yyyymmdd_uuid.csv, where abcd is the system ID.
    • a_20180101_9ca2bed1-2ed0-eaeb-8401-784f43755025.csv
    • a_20180101_cca2bed1-aed0-11eb-8401-784f73755025.csv
    • b_20190202_ece2bed1-2ed0-abeb-8401-784f43755025.csv
    • c_20180101_ada2bed1-2ed0-22eb-8401-784f43755025.csv
  • Sink: yyyymmdd.csv
    • 20180101.csv
    • 20190202.csv

Pipeline

  • Input: source directory/parameters
  • Output: metadata of each object

The Get Metadata activity iterates over the source directory to obtain each object. The most important setting is its Field list argument, which must include Child Items.
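For reference, the childItems field of the Get Metadata output is a list of name/type objects. A minimal sketch of its shape in Python, using the sample file names above (values illustrative):

# Shape of @activity('Get Metadata1').output.childItems (illustrative):
child_items = [
    {"name": "a_20180101_9ca2bed1-2ed0-eaeb-8401-784f43755025.csv", "type": "File"},
    {"name": "b_20190202_ece2bed1-2ed0-abeb-8401-784f43755025.csv", "type": "File"},
]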

  • Input: output of Get Metadata
  • Output: None

The ForEach activity is used to process each object in the source directory. Its Items setting is the child item list collected above:

@activity('Get Metadata1').output.childItems

Inside the loop each file is available as @item(). It’s convenient to predefine a value here that is used in the next step.
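For example (the exact wiring is an assumption, since the file names look like a_20180101_<uuid>.csv), a Set Variable activity with the expression @split(item().name, '_')[1] would yield the date key such as 20180101, provided the system ID itself contains no underscore.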

The data flow merges all files with the same date, and source1 and the sink point to the same destination. So source1 is initially empty, and the source option that allows it to run with no files found must be checked.

The only configuration needed in the sink is the File name option.
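A hedged sketch of that setting: with the sink set to output to a single file, the name can be computed from a data flow parameter, e.g. concat($date, '.csv'), where $date is an assumed string parameter holding the yyyymmdd value passed in from the pipeline.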

The last problem in the data flow is how to merge files with the same date, which means we first have to find all of those files. The solution to this problem is a regular expression.
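As a local illustration in plain Python (not the data flow expression itself), the pattern below captures the yyyymmdd part of each name and groups the sample files above by date:

import re
from collections import defaultdict

names = [
    'a_20180101_9ca2bed1-2ed0-eaeb-8401-784f43755025.csv',
    'a_20180101_cca2bed1-aed0-11eb-8401-784f73755025.csv',
    'b_20190202_ece2bed1-2ed0-abeb-8401-784f43755025.csv',
    'c_20180101_ada2bed1-2ed0-22eb-8401-784f43755025.csv',
]

# abcd_yyyymmdd_uuid.csv -> capture the date part
pattern = re.compile(r'^[^_]+_(\d{8})_[0-9a-f-]+\.csv$')

groups = defaultdict(list)
for name in names:
    m = pattern.match(name)
    if m:
        groups[m.group(1)].append(name)

for date, files in sorted(groups.items()):
    print('{0}.csv merges {1} file(s)'.format(date, len(files)))
# 20180101.csv merges 3 file(s)
# 20190202.csv merges 1 file(s)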

Generally a CSV file has one header and we can process it easily in ADF. However, a special case is a large CSV file that contains multiple different headers, and we need to automatically split it into regular CSV files, each with its own header.

  • Sample data:

    h1,h1_col1,h1_col2,h1_col3
    h2,h2_col1,h2_col2,h2_col3,h2_col4,h2_col5
    h3,h3_col1,h3_col2
    h1,h1_row1_1,h1_row1_2,h1_row1_3
    h1,h1_row2_1,h1_row2_2,h1_row2_3
    h1,h1_row3_1,h1_row3_2,h1_row3_3
    h2,h2_row1_1,h2_row1_2,h2_row1_3,h2_row1_4,h2_row1_5
    h2,h2_row2_1,h2_row2_2,h2_row2_3,h2_row2_4,h2_row2_5
    h2,h2_row3_1,h2_row3_2,h2_row3_3,h2_row3_4,h2_row3_5
    h2,h2_row4_1,h2_row4_2,h2_row4_3,h2_row4_4,h2_row4_5
    h2,h2_row5_1,h2_row5_2,h2_row5_3,h2_row5_4,h2_row5_5
    h3,h3_row1_1,h3_row1_2
    h3,h3_row2_1,h3_row2_2

  • Explanation:

    • header format: header name, then the column names
    • 3 headers: h1, h2 and h3
    • the first column of each row is the header name and the rest of the columns are values
  • Output:

    • h1 file

      h1_col1,h1_col2,h1_col3
      h1_row1_1,h1_row1_2,h1_row1_3
      h1_row2_1,h1_row2_2,h1_row2_3
      h1_row3_1,h1_row3_2,h1_row3_3

    • h2 file

      h2_col1,h2_col2,h2_col3,h2_col4,h2_col5
      h2_row1_1,h2_row1_2,h2_row1_3,h2_row1_4,h2_row1_5
      h2_row2_1,h2_row2_2,h2_row2_3,h2_row2_4,h2_row2_5
      h2_row3_1,h2_row3_2,h2_row3_3,h2_row3_4,h2_row3_5
      h2_row4_1,h2_row4_2,h2_row4_3,h2_row4_4,h2_row4_5
      h2_row5_1,h2_row5_2,h2_row5_3,h2_row5_4,h2_row5_5

    • h3 file

      h3_col1,h3_col2
      h3_row1_1,h3_row1_2
      h3_row2_1,h3_row2_2

The dataset used in the source and sink must have the "First row as header" option unchecked.

Because the dataset has no header, ADF automatically assigns a name to each column. The column name format is _col{index}_.

In this task the header column is _col0_, and we can map it to another name such as filename.

This data flow will automatically split the composite CSV file into separate files and save them at the container root path. To save them in another directory, you can prepend a folder name to the mapped column value in the Derived Column activity.
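A hedged sketch of that derived column, assuming the sink names its output files from the mapped filename column: the data flow expression concat('output/', _col0_, '.csv'), where the output/ prefix and the .csv suffix are illustrative assumptions rather than settings shown in the original.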

We use a blob event trigger to implement the automation: once a new file finishes uploading, these pipelines process it automatically. See the official documentation for how to create an event trigger.

Two values from the trigger are used by the pipeline:

  • @triggerBody().folderPath : /container name/folder/
  • @triggerBody().fileName : blob name
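These system variables are usually handed to the pipeline through parameters when the trigger is attached, for example a pipeline parameter whose value is @triggerBody().fileName; the parameter itself is an illustrative assumption.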
For reference, the same split logic can also be prototyped locally with pandas:

import pandas as pd

# Split each line at the first comma: the capture group keeps the header
# name (h1/h2/h3) as its own column and leaves the rest of the line intact.
df = pd.read_csv('sample.csv', sep='^([^,]+),', engine='python', header=None)
# re.split with a capture group produces a leading empty field; drop it.
df.drop(df.columns[0], axis=1, inplace=True)

# Group rows by header name and write each group to its own file.
for h, group in df.groupby(df.columns[0]):
    rest = group[group.columns[1]]  # the comma-joined remainder of each line
    # sep=' ' stops pandas from quoting the commas inside the single column
    rest.to_csv('{0}.csv'.format(h), sep=' ', index=False, header=False)