sope-etl

YAML Transformer:

The YAML Transformer reads a YAML file and executes the transformations defined in it at runtime (during spark-submit). It is useful for building long, repetitive transformation pipelines from an external YAML file instead of writing Scala/Java code. The transformer also provides high-level abstractions for commonly used processing patterns.

Details of the Transformer constructs are provided in the following document: Transformer Constructs

Transformer Modes:

The transformer supports both end-to-end and intermediate modes.

Optimizations:

The YAML Transformer detects transformations that are reused within the pipeline and persists them using the MEMORY_ONLY storage level. This is useful if you would rather let the transformer decide on persistence than explicitly tag transformations with the 'persist' option.

Additionally, if a transformation selected for persistence is used in multiple joins, it is pre-partitioned on the join columns involved in the largest number of joins. This feature is enabled by default. To deactivate auto-persist, set the ‘auto_persist’ option to false through the utility shell.
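
As an illustration, in a pipeline like the following sketch the ‘txn_filtered’ transformation is referenced by two downstream transformations, so the transformer would consider it for automatic persistence (the aliases and queries here are purely illustrative):

    transformations:
        - input: transactions
          alias: txn_filtered        # reused by both transformations below
          sql: "select * from transactions where amount is not null"
        - input: txn_filtered
          alias: txn_by_product
          sql: "select product_id, count(*) as txn_count from txn_filtered group by product_id"
        - input: txn_filtered
          alias: txn_by_date
          sql: "select date_id, sum(amount) as total_amount from txn_filtered group by date_id"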

Streaming support:

The Transformer also supports a Structured Streaming mode. The input and output specifications for streaming mode can be found on the Transformer Constructs page. The following shows a sample use of streaming mode:

   inputs:
      - {type: csv, alias: streaming_input, path: "hdfs://streaming-folder", options: {header: true}, is_streaming: true, schema_file: csv_schema.yaml}
   transformations:
      - input: streaming_input
        alias: grouped_event_time
        actions:
         - {type: watermark, event_time: time, delay_threshold: "24 hours"}
         - {type: group_by, columns: ["window(time, '1 minutes')", age], expr: "count(*)"}
   outputs:
      - {type: custom, input: grouped_event_time, is_streaming: true, format: console, output_mode: complete}

Templates:

The Transformer supports a ‘yaml’ action construct which can be used to call another YAML file. It also supports substitutions to drive such templates with dynamic values. Some ETL-specific templates are provided for reference.
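
As a rough sketch, a template call might look like the following. The ‘yaml_file’ and ‘substitutions’ keys are illustrative placeholders (as is the template name); refer to the Transformer Constructs page for the exact specification of the ‘yaml’ action:

    transformations:
        - input: transactions
          alias: transactions_checked
          actions:
            - {type: yaml, yaml_file: null_check_template.yaml, substitutions: {check_column: amount}}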

Custom User Defined Functions:

An interface ‘com.sope.etl.register.UDFRegistration’ is provided for registering custom UDFs for use in the Transformer. You can package custom UDFs in a jar and register the implementing class with the Transformer using the ‘custom_udfs_class’ option on the utility shell. Scala UDFs can also be provided inline in the YAML file itself. The following sample shows how to provide such dynamic UDFs:

    udfs: {
      custom_upper: '(desc: String) => if (desc.startsWith("P")) desc.toUpperCase else desc'
      }
    inputs:
        - {type: hive, alias: product, db: dim, table: product}
        - {type: hive, alias: date, db: dim, table: date}
        - {type: hive, alias: transactions, db: stage, table: transactions}
    transformations:
        - description: "SQL can be used for simple transformations"
          input: product
          alias: product_filtered
          persist: memory_only
          sql: "select product_id, custom_upper(product_desc) as product from product where product_desc != 'N.A.'"
          ....

Example invocation for registering a custom UDFs jar:

./sope-spark-submit.sh  --yaml_folders "/home/user/yaml-files" --main_yaml_file demo.yaml --cluster_mode true  --custom_udfs_class=com.custom.CustomUDFs --name sope_etl_demo --master yarn --jars custom_udf.jar

Custom Transformation:

If you need to call complex or pre-built logic developed using the Scala/Java Spark SQL APIs, it can be integrated through the ‘named_transform’ action construct. For integration, implement the ‘com.sope.etl.register.TransformationRegistration’ trait and define the ‘registerTransformations’ method, which returns a Map of transformation name to transformation function ((DataFrame*) => DataFrame). The jar needs to be added to Spark’s classpath, and the class must be registered using the utility shell option ‘custom_transformations_class’.
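
Once registered, a custom transformation can be referenced from the YAML by its registered name. The sketch below assumes a transformation registered as ‘custom_dedup’; the ‘name’ key is an illustrative assumption, so check the Transformer Constructs page for the exact ‘named_transform’ action specification:

    transformations:
        - input: transactions
          alias: transactions_deduped
          actions:
            - {type: named_transform, name: custom_dedup}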

Example invocation for registering the custom transformations jar:

./sope-spark-submit.sh  --yaml_folders "/home/user/yaml-files" --main_yaml_file demo.yaml --cluster_mode true  --custom_transformations_class=com.custom.CustomTransformations --name sope_etl_demo --master yarn --jars custom_transformations.jar

Testing mode:

The testing mode samples data from all sources at once, without the need to edit the YAML transformation file. This is helpful when you need to test the transformation pipeline on a small subset of data. To enable testing mode, set the ‘testing_mode’ option during invocation using the utility shell. You can also control the sampled fraction using the optional ‘testing_data_fraction’ option, which defaults to 0.1.

Example:

./sope-spark-submit.sh  --yaml_folders "/home/user/yaml-files" --main_yaml_file demo.yaml --cluster_mode true  --testing_mode=true --testing_data_fraction=0.2 --name sope_etl_demo --master yarn