Wednesday, May 20, 2020

Salesforce Einstein Analytics Dataflow to Deduplicate a Dataset

Let's say you have the following data:

Key,Value,Timestamp
K1,V1,2020-05-01
K1,V2,2020-05-02
K1,V3,2020-05-03
K1,V4,2020-05-04
K2,V1,2020-05-04
K2,V2,2020-05-04
K2,V3,2020-05-02
K2,V4,2020-05-01
K3,V4,2020-05-01
K3,V3,2020-05-02
K3,V2,2020-05-03
K3,V1,2020-05-04
K4,V4,2020-05-04
K4,V3,2020-05-03
K4,V2,2020-05-02
K4,V1,2020-05-01
K5,V1,2020-05-01
K5,V2,2020-05-01
K5,V3,2020-05-01
K5,V4,2020-05-01

Further, let's say that you want to remove duplicate rows (based on the Key) and keep only the latest row (based on the Timestamp), with the added complication that some Keys have several rows sharing the same Timestamp.

Your target result would be the following reduced data:

Key,Value,Timestamp
K1,V4,2020-05-04
K2,V2,2020-05-04
K3,V1,2020-05-04
K4,V4,2020-05-04
K5,V4,2020-05-01
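
To pin down the target semantics before getting into the dataflow, here is a plain Python sketch (not Einstein Analytics) that reduces the sample data to one row per Key. The tie-break rule, where the last of several rows sharing the latest Timestamp wins, is my own assumption. It happens to reproduce the table above, but Einstein Analytics makes no such promise; there, the choice among tied rows is arbitrary.

```python
# Sample data from the table above: Key,Value,Timestamp.
DATA = """\
K1,V1,2020-05-01
K1,V2,2020-05-02
K1,V3,2020-05-03
K1,V4,2020-05-04
K2,V1,2020-05-04
K2,V2,2020-05-04
K2,V3,2020-05-02
K2,V4,2020-05-01
K3,V4,2020-05-01
K3,V3,2020-05-02
K3,V2,2020-05-03
K3,V1,2020-05-04
K4,V4,2020-05-04
K4,V3,2020-05-03
K4,V2,2020-05-02
K4,V1,2020-05-01
K5,V1,2020-05-01
K5,V2,2020-05-01
K5,V3,2020-05-01
K5,V4,2020-05-01"""
ROWS = [tuple(line.split(",")) for line in DATA.splitlines()]


def latest_per_key(rows):
    """Keep one row per Key: the row with the greatest Timestamp.

    Assumed tie-break: among rows sharing that Timestamp, the last one in
    input order wins. ISO dates compare correctly as plain strings.
    """
    latest = {}
    for key, value, ts in rows:
        if key not in latest or ts >= latest[key][2]:
            latest[key] = (key, value, ts)
    # Dicts keep the order in which each Key was first inserted.
    return list(latest.values())


print("Key,Value,Timestamp")
for row in latest_per_key(ROWS):
    print(",".join(row))
```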

To achieve this result, you can generalize the following approach. Each step corresponds to a node in the sample dataflow, which is included in its entirety at the bottom of this post.

  1. Get the dataset ;-)
  2. computeRelative (computePartitionCounter)
    • Partition by the Key
    • Order by the Timestamp (ascending)
    • Compute a partitionCounter column (number, default = 1)
      • saqlExpression: previous(partitionCounter) + 1
  3. computeRelative (computeKeepRow)
    • Partition by the Key
    • Order by the partitionCounter (descending)
      • Note that whether you sort by descending Timestamp and then ascending partitionCounter, or vice versa (as I have done here), is immaterial
    • Compute a keepRow column (text)
      • saqlExpression: case when current(partitionCounter) == first(partitionCounter) then "true" else "false" end
        • Note that for K2 and K5, where some or all of the rows share the same Timestamp, partitionCounter still differs on every row, so keepRow will be "true" for exactly one of the tied rows. Which one is arbitrary, most likely determined by the internal ordering within Einstein Analytics.
  4. Filter by keepRow == "true"
  5. Slice off (drop) partitionCounter and keepRow
  6. Register the de-duplicated dataset
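
If it helps to see the mechanics outside of Einstein Analytics, here is a rough Python sketch that mimics steps 2 through 5 on the sample data. It is only an illustration under stated assumptions: each computeRelative node is approximated with a sort plus itertools.groupby, and Python's stable sort stands in for whatever ordering Einstein Analytics actually applies to tied Timestamps, so the tie-break here (last row in input order wins) is an assumption rather than something the dataflow guarantees. The function names are my own.

```python
from itertools import groupby

# Sample data from the top of the post: Key,Value,Timestamp.
DATA = """\
K1,V1,2020-05-01
K1,V2,2020-05-02
K1,V3,2020-05-03
K1,V4,2020-05-04
K2,V1,2020-05-04
K2,V2,2020-05-04
K2,V3,2020-05-02
K2,V4,2020-05-01
K3,V4,2020-05-01
K3,V3,2020-05-02
K3,V2,2020-05-03
K3,V1,2020-05-04
K4,V4,2020-05-04
K4,V3,2020-05-03
K4,V2,2020-05-02
K4,V1,2020-05-01
K5,V1,2020-05-01
K5,V2,2020-05-01
K5,V3,2020-05-01
K5,V4,2020-05-01"""
ROWS = [tuple(line.split(",")) for line in DATA.splitlines()]


def compute_partition_counter(rows):
    """Step 2: partition by Key, order by Timestamp ascending, and set
    partitionCounter = previous(partitionCounter) + 1, defaulting to 1."""
    result = []
    # sorted() is stable, so rows with equal Timestamps keep their input order
    # (an assumption standing in for Einstein Analytics' internal ordering).
    ordered = sorted(rows, key=lambda r: (r[0], r[2]))
    for _, partition in groupby(ordered, key=lambda r: r[0]):
        previous = None
        for key, value, ts in partition:
            counter = 1 if previous is None else previous + 1
            result.append({"Key": key, "Value": value, "Timestamp": ts,
                           "partitionCounter": counter})
            previous = counter
    return result


def compute_keep_row(rows):
    """Step 3: partition by Key, order by partitionCounter descending, and set
    keepRow to "true" only where current(...) == first(partitionCounter)."""
    result = []
    ordered = sorted(rows, key=lambda r: (r["Key"], -r["partitionCounter"]))
    for _, partition in groupby(ordered, key=lambda r: r["Key"]):
        partition = list(partition)
        first = partition[0]["partitionCounter"]
        for row in partition:
            keep = "true" if row["partitionCounter"] == first else "false"
            result.append({**row, "keepRow": keep})
    return result


def deduplicate(rows):
    """Steps 4 and 5: keep rows where keepRow == "true", then drop the helper columns."""
    kept = [r for r in compute_keep_row(compute_partition_counter(rows))
            if r["keepRow"] == "true"]
    return [(r["Key"], r["Value"], r["Timestamp"]) for r in kept]


for row in deduplicate(ROWS):
    print(",".join(row))
```

Running it prints the same five rows as the target table above. Because partitionCounter is unique within each Key, the equality test in step 3 can only ever match one row per Key, which is why ties on Timestamp never produce duplicates.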
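In my example, I went from this...

[Image: the original KeyValueTimestamp dataset]

...to this...

[Image: the deduplicated KeyValueTimestampDeduplicated dataset]
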
Cheers! 


Dataflow:

{
  "getDataset": {
    "action": "edgemart",
    "parameters": {
      "alias": "KeyValueTimestamp"
    }
  },
  "computePartitionCounter": {
    "action": "computeRelative",
    "parameters": {
      "source": "getDataset",
      "computedFields": [
        {
          "name": "partitionCounter",
          "label": "partitionCounter",
          "expression": {
            "saqlExpression": "previous(partitionCounter) + 1",
            "type": "Numeric",
            "scale": 2,
            "default": "1"
          }
        }
      ],
      "orderBy": [
        {
          "name": "Timestamp",
          "direction": "asc"
        }
      ],
      "partitionBy": [
        "Key"
      ]
    }
  },
  "computeKeepRow": {
    "action": "computeRelative",
    "parameters": {
      "source": "computePartitionCounter",
      "computedFields": [
        {
          "name": "keepRow",
          "label": "keepRow",
          "expression": {
            "saqlExpression": "case when current(partitionCounter) == first(partitionCounter) then \"true\" else \"false\" end",
            "type": "Text"
          }
        }
      ],
      "orderBy": [
        {
          "name": "partitionCounter",
          "direction": "desc"
        }
      ],
      "partitionBy": [
        "Key"
      ]
    }
  },
  "filterDataset": {
    "action": "filter",
    "parameters": {
      "source": "computeKeepRow",
      "saqlFilter": "keepRow == \"true\""
    }
  },
  "sliceDataset": {
    "action": "sliceDataset",
    "parameters": {
      "mode": "drop",
      "fields": [
        {
          "name": "keepRow"
        },
        {
          "name": "partitionCounter"
        }
      ],
      "source": "filterDataset"
    }
  },
  "registerDataset": {
    "action": "sfdcRegister",
    "parameters": {
      "alias": "KeyValueTimestampDeduplicated",
      "name": "KeyValueTimestampDeduplicated",
      "source": "sliceDataset"
    }
  }
}
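
Each node in the dataflow names its input in "source", so the nodes form a simple chain from getDataset to registerDataset. As a quick sanity check before uploading an edited dataflow, a small Python script along these lines could confirm that every source refers to an existing node and list the nodes in dependency order. This is a hypothetical helper (execution_order is my own name, not part of any Salesforce tooling). It only follows the single "source" parameter, which is all this dataflow uses, and the embedded JSON keeps just the action and source of each node from the dataflow above.

```python
import json
from graphlib import TopologicalSorter  # Python 3.9+

# Skeleton of the dataflow above: only each node's action and source are kept.
DATAFLOW_JSON = """
{
  "getDataset": {"action": "edgemart", "parameters": {"alias": "KeyValueTimestamp"}},
  "computePartitionCounter": {"action": "computeRelative", "parameters": {"source": "getDataset"}},
  "computeKeepRow": {"action": "computeRelative", "parameters": {"source": "computePartitionCounter"}},
  "filterDataset": {"action": "filter", "parameters": {"source": "computeKeepRow"}},
  "sliceDataset": {"action": "sliceDataset", "parameters": {"source": "filterDataset"}},
  "registerDataset": {"action": "sfdcRegister", "parameters": {"source": "sliceDataset"}}
}
"""


def execution_order(dataflow):
    """Return node names so that each node comes after the node named in its "source".

    Raises ValueError for a dangling source; TopologicalSorter raises CycleError
    if the sources loop back on themselves.
    """
    graph = {}
    for name, node in dataflow.items():
        source = node.get("parameters", {}).get("source")
        if source is not None and source not in dataflow:
            raise ValueError(f"{name} references unknown source {source!r}")
        graph[name] = {source} if source else set()
    return list(TopologicalSorter(graph).static_order())


print(" -> ".join(execution_order(json.loads(DATAFLOW_JSON))))
```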
