How Ataccama ONE uses Snowpark data frames to automate profiling



Ataccama is known and appreciated by our customers for large, complex deployments where massive amounts of data are processed. We were one of the first to support MapReduce and Spark as native pushdown engines, enabling distributed processing across different cluster nodes.

Ataccama’s engine works with the majority of enterprise data sources and processes them to provide valuable insights, such as data statistics, domain detection, and data quality results.

To provide the best experience for our customers who use Snowflake, we developed a tighter integration between our technologies. Specifically, we use Snowflake’s data processing capabilities in a pushdown manner, which means that data is processed directly in Snowflake.

The key advantages of this integration are:

  • Lower data transfer costs for the client.
  • Snowflake’s computational capabilities deliver results faster because of reduced I/O operations.
  • Customers don’t incur additional costs to maintain external systems for Big Data.
  • Improved security, as the data does not leave the system.

To deliver this integration, Ataccama uses Snowpark. With Snowpark data frames, users can keep transforming the shape of the data until it is ready to yield the desired results.

 

Example use case

To highlight the integration between Ataccama and Snowflake’s Snowpark, we’ll walk you through a use case with data samples and code samples.

In this use case, we assume a table payments with two columns: value of type integer and currency of type string. Our task is to:

  • Calculate the mean of the value column
  • Calculate the mean length of strings in the currency column
  • Determine whether currency contains abbreviations. A value is considered an abbreviation if it contains three or fewer characters.

To begin, we create a session with a hardcoded configuration for the Snowflake warehouse. In the real world, there are many variations of this setup; for example, it is possible to use configuration files by calling the method configFile(path) on the SessionBuilder class. Additional authentication methods are explained in the official Snowflake documentation.

Once the session is created, the next step is creating a data frame. There are multiple options for creating one. The simplest is to use session.table("payments"). To specify a more generic SQL query, the expression can be changed to session.sql("select * from payments").
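For illustration, here is a minimal sketch of the two alternatives mentioned above; the properties file path is a placeholder assumption, not part of the example project.

// Hypothetical alternative: read the connection configuration from a properties file.
Session fileSession = Session.builder()
        .configFile("/path/to/snowflake.properties")
        .create();

// Hypothetical alternative: build the data frame from an arbitrary SQL query instead of a table reference.
DataFrame payments = fileSession.sql("select * from payments");

The example program itself uses a hardcoded configuration and the plain table reference: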


Session session = Session.builder()
        .config("URL", "https://sampleIdentifier.snowflakecomputing.com")
        .config("USER", "SAMPLE_USER")
        .config("PASSWORD", "samplePassword")
        .config("ROLE", "SAMPLE_ROLE")
        .config("WAREHOUSE", "SAMPLE_WH")
        .config("DB", "SAMPLE_DB")
        .config("SCHEMA", "SAMPLE_SCHEMA")
        .create();

try {
    DataFrame table = session.table("payments");

    // Transforms table data frame to aggregated form.
    table = transformDataFrame(table);

    // Collects and prints the result.
    printResults(table);
} finally {
    session.close();
}

Creating a session and data frame


 

Modifying the data frame

With the data frame representing the table successfully created, we now need to modify it to get the desired result.

The manipulation of the data frame has two phases:

  1. Appending new columns
  2. Aggregating data

Let’s go through them.

1. Appending new columns

The first step is extending the table data frame with new virtual columns. These columns are derived from the existing ones and serve only as an intermediate step: they pre-process the table values so that the computed result is easier to reference later. An additional benefit of this approach is that Snowpark can reuse the precomputed value multiple times.

New columns are added using the method withColumn(columnName, expression).

This step is used to precompute the detection rule result on a row level. The rule uses the SQL expression length(currency) <= 3 to create a new virtual column called is_abbreviation.
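Condensed, this step looks roughly as follows, where df is the data frame created above (the same logic appears in full in the transformDataFrame method later in this post):

// Row-level detection rule: a currency value with three or fewer characters counts as an abbreviation.
Column isAbbreviation = Functions.length(df.col("currency")).leq(Functions.lit(3));
df = df.withColumn("is_abbreviation", isAbbreviation);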

 

2. Aggregating data

The ultimate goal is to extract useful insights from the whole dataset by using aggregation functions.

The example uses several aggregation functions. First, there is the count function, which computes the total number of rows. This function is expressed as Functions.count(Functions.lit(1)).

The other option is calling the method count() on a given data frame, which executes the SQL count query in the background and returns the resulting number.

Snowflake stores a cache of this information in its internal tables, which makes such queries fast; however, additional time is still spent waiting for the separate result. For this reason, we put the count expression into the same data frame as the other aggregation functions.
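As a small sketch, the two counting approaches compare as follows; df is again the data frame created above.

// Standalone call: executes its own SQL count query immediately and waits for the result.
long totalRows = df.count();

// Approach used in this example: the count is just another aggregate expression,
// so it is computed by the same pushed-down query as the rest of the statistics.
Column totalCount = Functions.count(Functions.lit(1)).as("total_count");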

Next, we use the following expression for computing average values: Functions.avg(expression).

Finally, we need to compute the number of rows which satisfy the detection rule. Usually, this is achieved by leveraging the SQL function count_if; however, no such function is directly exposed in Snowpark. As a workaround, the expression Functions.callUDF(functionName, parameters...) also works for built-in Snowflake functions.

Now we implement the method transformDataFrame to perform all of the above computations. Note that no computation happens in the Snowflake data warehouse at this point: all the data frame modifications take place only in the memory of the process and merely describe the steps that need to be taken.


private static DataFrame transformDataFrame(DataFrame df) {
    // Creates an expression that checks whether the given value obeys the abbreviation rule.
    Column currencyLength = Functions.length(df.col("currency"));
    Column ruleColumn = currencyLength.leq(Functions.lit(3));

    // First transformation of the data frame: here we add new columns to the table.
    // The change modifies the data frame only; no computation is happening at this point.
    df = df.withColumn("is_abbreviation", ruleColumn);

    // Counts the rows of the table. It is also possible to use the call df.count() instead;
    // this way we omit one SQL query to the warehouse, which slightly improves performance.
    Column count = Functions.count(Functions.lit(1)).as("total_count");

    // Expression for the average length of the currency column. We can reuse the existing currencyLength variable.
    Column avgLength = Functions.avg(currencyLength).as("avg_length_currency");

    // Expression that computes the average of the value column.
    Column avgValue = Functions.avg(df.col("value")).as("avg_value");

    // Counts how many rows satisfy our rule.
    // Note: Snowpark does not directly contain the function count_if, but we can work around that by
    // calling Functions.callUDF, which allows specifying an arbitrary function and its parameters.
    Column ruleCount = Functions.callUDF("count_if", ruleColumn);

    // This is the second transformation of the data frame. The transformation aggregates all the rows of the table
    // into one. The change to the data frame still does not compute anything.
    return df.select(count, avgLength, avgValue, ruleCount);
}

Modifying the data frame to fit our purposes
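Because every modification above is lazy, nothing reaches the warehouse until the results are requested. As a minimal sketch, and assuming the explain() and show() methods of the Snowpark DataFrame API (check the version you use), the generated query can be inspected or a small execution triggered explicitly:

// Prints the query that Snowpark would push down to Snowflake, without running the aggregation.
df.explain();

// Triggers execution and prints a small sample of the result.
df.show();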

There are two approaches to referencing data frame columns.

The first approach, used in the example, calls the method col(name) directly on the data frame. The benefit is that Snowpark checks that the column actually exists in the data frame.

The process is slightly slowed down, as Snowpark sends a query to Snowflake for information about the structure of the table (or query). This communication can be avoided by using Functions.col(name) instead. The downside is that we lose the automatic check by Snowpark, so if a mistake is made in the column name, the error is detected only later, during the compilation of the query in Snowflake.
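For illustration, the two referencing styles for the currency column look like this:

// Resolved against the data frame: Snowpark verifies that the column exists,
// at the cost of an extra metadata query to Snowflake.
Column checkedCurrency = df.col("currency");

// Resolved by name only: no metadata round trip, but a typo is caught only
// when Snowflake compiles the generated query.
Column lazyCurrency = Functions.col("currency");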

Processing the results

Once the data frame is ready, we can finalize it by calling the method collect(). Snowpark transforms the data frame into an SQL query and executes it in the Snowflake warehouse, meaning we are presented with the result only.

Values of each row are returned as objects of type Row. The result values are retrieved using the expression getXXX(pos), where XXX stands for the expected data type (e.g. getString or getBoolean). It is good practice to check whether the value is null by calling the method isNullAt(pos). Alternatively, the row object can be mapped to a list of Java objects using the method toList().

Accessing the values from the row by column names requires a workaround. The first step is to find the position of the column in the schema of the data frame. To get an ordered list of column names, we use schema().names() on the data frame.

Alternatively, calling schema().iterator() creates a Java iterator over objects that contain the name and data type of each column.
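A minimal sketch of that workaround, assuming row is a Row obtained from collect() (note that Snowflake typically reports unquoted identifiers in upper case, hence the case-insensitive comparison):

// Finds the position of the avg_value result column by name, then reads the value by index.
String[] columnNames = df.schema().names();
int avgValueIndex = -1;
for (int i = 0; i < columnNames.length; i++) {
    if (columnNames[i].equalsIgnoreCase("avg_value")) {
        avgValueIndex = i;
    }
}
double avgValue = avgValueIndex >= 0 && !row.isNullAt(avgValueIndex)
        ? row.getDecimal(avgValueIndex).doubleValue()
        : 0;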

Here is the implementation of the method printResults.


private static void printResults(DataFrame df) {
    // All the magic happens here. Snowpark transforms the data frame to SQL, pushes down the query into
    // the Snowflake Warehouse and retrieves the result we want.
    Row[] collected = df.collect();

    if (collected.length != 1) {
        String msg = MessageFormat.format(
                "Expected exactly 1 record returned from the server, got {0} instead.", collected.length);
        throw new IllegalStateException(msg);
    }

    Row res = collected[0];

    // Result retrieval.
    long cnt = res.getLong(0);
    double avgLength = res.isNullAt(1) ? 0 : res.getDecimal(1).doubleValue();
    double avgValue = res.isNullAt(2) ? 0 : res.getDecimal(2).doubleValue();
    long ruleCount = res.isNullAt(3) ? 0 : res.getLong(3);
    double satisfiedRatio = cnt == 0 ? 0 : (double) ruleCount / cnt;

    log.info("===== PROFILE RESULTS OF TABLE PAYMENTS =====");
    log.info("Number of rows: {}", cnt);
    log.info("Average length of the currency column: {}", avgLength);
    log.info("Average value of the value column: {}", avgValue);

    // Threshold value is 0.7 (70%)
    log.info("===== DOMAIN DETECTION =====");
    log.info("Detection threshold: {}%", THRESHOLD * 100);
    log.info("Rule abbreviation: The rule is satisfied in {}/{} ({}%) of cases. Detected: {}", ruleCount, cnt,
            satisfiedRatio * 100, satisfiedRatio >= THRESHOLD);
}

Providing the results of data processing

 

Running the program

The above program is ready to run against a replica of the sample table within Snowflake and produces the following results:


[main] INFO com.ataccama.snowpark.example.Main - ===== PROFILE RESULTS OF TABLE PAYMENTS =====
[main] INFO com.ataccama.snowpark.example.Main - Number of rows: 12
[main] INFO com.ataccama.snowpark.example.Main - Average length of the currency column: 4.0
[main] INFO com.ataccama.snowpark.example.Main - Average value of the value column: 118.36333333
[main] INFO com.ataccama.snowpark.example.Main - ===== DOMAIN DETECTION =====
[main] INFO com.ataccama.snowpark.example.Main - Detection threshold: 70.0%
[main] INFO com.ataccama.snowpark.example.Main - Rule abbreviation: The rule is satisfied in 9/12 (75.0%) of cases. Detected: true

The program successfully computed basic statistics on the table columns, and the abbreviation domain was successfully detected in the dataset.

The program is available in Ataccama’s public GitHub repository.

 

Conclusion

In this blog, we explained how Ataccama traditionally built its processing engine and how this changes with the new integration with Snowflake’s computing capabilities.

The example shows that computing statistics in a pushdown manner is beneficial. Ataccama chose Snowflake’s Snowpark because most of our code is written in Java. The example demonstrates the usage of Snowpark in Ataccama, assuming a sample table payments for which we want to compute basic statistics and check whether we can detect a domain. We have covered the following:

  • Creating a Snowflake session and a basic data frame
  • Modifying the data frame to get the required results
  • Processing the results
  • A successful execution log

In Ataccama ONE, we don’t require customers to write code manually. In this example, we demonstrated how our backend engine automatically runs both our automated rules and business user-defined rules using Snowpark.

Please reach out to Ondřej Marek at ondrej.marek@ataccama.com with further questions, or visit our team at booth #2023 at Snowflake Summit 2022 between June 13 - 16.


2 replies


Hello, thanks for the blog. We make heavy use of Snowflake tables in our ecosystem, and the above content is really helpful. I have a few queries:

  1. Will this be a default feature in the Ataccama Platform (as long as we are using Snowflake tables) with no external configuration required? In which release?
  2. Does that mean there is no caching of data from Snowflake → Ataccama servers during profile/DQ rule checks? Asking because we see storage problems whenever we use Snowflake data with 100 million records (30 columns), and we had to bump up the storage size.

 

Hi @Siva_Madhavan ,

We are happy to hear that the blog is helpful for you.
I would like to share these release notes with you.

Hope it helps.
