
Spark Pushdown Optimizations

Figure 1: Spark queries with pushdown optimization

Spark can load data from many sources, such as databases and CSV, Parquet, or ORC files. Loading copies the data from the source into Spark, typically over the network.
To reduce the amount of data transferred, Spark applies pushdown optimizations, which delegate part of the query to the data source. The main pushdown optimizations are described below.

Projection pushdown:

If a query fetches many columns but later uses only a few of them, Spark fetches only the columns that are actually used. This feature is also known as projection pruning or column pruning.
In the example below, we select all the columns from the table city and then keep only name and population. Spark optimizes the scan to fetch only the name and population columns.

Dataset<Row> ds1 = session.sql("select * from h2.citydb.city");
ds1 = ds1.select("name", "population");
ds1.explain();

We can see in the optimized physical plan that ReadSchema has only name and population columns:

*(1) Scan org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCScan$$anon$1@20a46227 [NAME#25,POPULATION#27L] PushedAggregates: [], PushedFilters: [], PushedGroupby: [], ReadSchema: struct<NAME:string,POPULATION:bigint>
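To make the idea concrete, here is a toy model in plain Java of what projection pushdown achieves. It is a sketch under stated assumptions, not Spark's actual implementation: the class ProjectionPushdownSketch and the method buildScanQuery are hypothetical names, and an empty column list is used here simply to model the unoptimized case.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Toy model of projection pushdown. This is NOT Spark's internal code;
// ProjectionPushdownSketch and buildScanQuery are hypothetical names.
class ProjectionPushdownSketch {

    // Builds the query a reader could send to the database.
    // Assumption: an empty list models the unoptimized "fetch everything" case.
    static String buildScanQuery(String table, List<String> requiredColumns) {
        String columns = requiredColumns.isEmpty()
                ? "*"
                : String.join(", ", requiredColumns);
        return "SELECT " + columns + " FROM " + table;
    }

    public static void main(String[] args) {
        // Without pruning: every column crosses the network.
        System.out.println(buildScanQuery("citydb.city", Collections.<String>emptyList()));
        // With pruning: only the columns used later are requested.
        System.out.println(buildScanQuery("citydb.city", Arrays.asList("NAME", "POPULATION")));
    }
}
```

The second call produces a query that names only NAME and POPULATION, which corresponds to the narrowed ReadSchema in the plan above.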

Predicate pushdown:

Projection pushdown reduces the number of columns to be fetched. Predicate pushdown reduces the number of rows to be fetched from the underlying storage.
A predicate is a condition in a where or filter clause. When such a condition reduces the number of records, Spark pushes the filtering down to the underlying data storage, provided the storage supports the operation.
In this example, we filter the city records with the condition population >= 5000000.

Dataset<Row> ds2 = session.sql("select * from h2.citydb.city where population >= 5000000");
ds2.explain(true);

This condition can be seen in PushedFilters in the optimized physical plan:

*(1) Scan org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCScan$$anon$1@5315210d [NAME#46,COUNTRY#47,POPULATION#48L] PushedAggregates: [], PushedFilters: [IsNotNull(POPULATION), GreaterThanOrEqual(POPULATION,5000000)], PushedGroupby: [], ReadSchema: struct<NAME:string,COUNTRY:string,POPULATION:bigint>
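The "if the data storage supports the operation" part can be illustrated with a small plain-Java model. This is only a sketch, not Spark's implementation: the class PredicatePushdownSketch, its Filter type, and the SUPPORTED operator set are all hypothetical. Supported filters end up in the WHERE clause sent to the source; anything else would have to be evaluated later on the fetched rows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of predicate pushdown. This is NOT Spark's internal code;
// all names here are hypothetical and chosen for illustration.
class PredicatePushdownSketch {

    // A simple "column operator value" condition.
    static final class Filter {
        final String column;
        final String operator;
        final String value;

        Filter(String column, String operator, String value) {
            this.column = column;
            this.operator = operator;
            this.value = value;
        }

        String toSql() {
            return column + " " + operator + " " + value;
        }
    }

    // Assumption: operators this imaginary source can evaluate by itself.
    static final Set<String> SUPPORTED =
            new HashSet<>(Arrays.asList("=", "<", "<=", ">", ">="));

    // Keeps only the filters the source is able to evaluate.
    static List<Filter> pushable(List<Filter> filters) {
        List<Filter> pushed = new ArrayList<>();
        for (Filter f : filters) {
            if (SUPPORTED.contains(f.operator)) {
                pushed.add(f);
            }
        }
        return pushed;
    }

    // Builds the scan query with the pushable filters in the WHERE clause.
    static String buildScanQuery(String table, List<Filter> filters) {
        List<String> conditions = new ArrayList<>();
        for (Filter f : pushable(filters)) {
            conditions.add(f.toSql());
        }
        String where = conditions.isEmpty()
                ? ""
                : " WHERE " + String.join(" AND ", conditions);
        return "SELECT * FROM " + table + where;
    }

    public static void main(String[] args) {
        List<Filter> filters = Arrays.asList(
                new Filter("POPULATION", ">=", "5000000"),
                new Filter("NAME", "RLIKE", "'^S.*'")); // not in SUPPORTED, stays behind
        System.out.println(buildScanQuery("citydb.city", filters));
    }
}
```

Only the POPULATION condition reaches the generated query in this model, so fewer rows would be transferred, while the unsupported condition would still need to be applied after the fetch.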

Aggregate and group by pushdown:

Spark pushes down group by operations and aggregations such as MIN, MAX, and SUM to the underlying storage whenever possible.
In the example below, the query has a where condition, a group by operation, and aggregation operations:

Dataset<Row> ds3 = session.sql("select country, MAX(population), " +
	"MIN(population) from h2.citydb.city where country in ('China', 'India') group by country");
ds3.explain(true);

The optimized physical plan shows the pushed operations:

Scan org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCScan$$anon$1@779cb140 [COUNTRY#72,MAX(POPULATION)#82L,MIN(POPULATION)#83L] PushedAggregates: [MAX(POPULATION), MIN(POPULATION)], PushedFilters: [In(COUNTRY, [China,India])], PushedGroupby: [COUNTRY], ReadSchema: struct<COUNTRY:string,MAX(POPULATION):bigint,MIN(POPULATION):bigint>
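The benefit of aggregate pushdown is that the source returns one row per group instead of every matching row. The plain-Java sketch below models that effect; it is not Spark's implementation. The class AggregatePushdownSketch, the method aggregateAtSource, and the sample rows (placeholder city names with made-up population values) are all assumptions for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of aggregate and group by pushdown. This is NOT Spark's
// internal code; names and sample data are hypothetical.
class AggregatePushdownSketch {

    static final class City {
        final String name;
        final String country;
        final long population;

        City(String name, String country, long population) {
            this.name = name;
            this.country = country;
            this.population = population;
        }
    }

    // Made-up rows standing in for the city table (values are placeholders).
    static final List<City> TABLE = Arrays.asList(
            new City("CityA", "China", 24),
            new City("CityB", "China", 21),
            new City("CityC", "India", 20),
            new City("CityD", "India", 16),
            new City("CityE", "Japan", 13));

    // Models the work done at the source when the filter, the group by and
    // MAX/MIN are all pushed down: the result has one entry per country,
    // holding {max population, min population}.
    static Map<String, long[]> aggregateAtSource(List<String> countries) {
        Map<String, long[]> result = new TreeMap<>();
        for (City c : TABLE) {
            if (!countries.contains(c.country)) {
                continue; // filtered out at the source
            }
            long[] maxMin = result.get(c.country);
            if (maxMin == null) {
                result.put(c.country, new long[] {c.population, c.population});
            } else {
                maxMin[0] = Math.max(maxMin[0], c.population);
                maxMin[1] = Math.min(maxMin[1], c.population);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, long[]> rows = aggregateAtSource(Arrays.asList("China", "India"));
        System.out.println("Rows in table: " + TABLE.size());
        System.out.println("Rows transferred with pushdown: " + rows.size());
        for (Map.Entry<String, long[]> e : rows.entrySet()) {
            System.out.println(e.getKey() + " max=" + e.getValue()[0] + " min=" + e.getValue()[1]);
        }
    }
}
```

In this model, five table rows shrink to two transferred rows, one for each country in the where condition, which mirrors the PushedAggregates and PushedGroupby entries in the plan above.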

Complete implementation of Spark Pushdown Optimizations Example




Copyright © 2020-2021 Gajanan Bhat. All rights reserved.