While working with pyspark dataframes, we often need to filter rows based on different criteria. In this article, we will discuss different ways to filter rows in a pyspark dataframe.
The filter() Method
The filter() method, when invoked on a pyspark dataframe, takes a conditional statement as its input. The conditional statement generally uses one or more columns of the dataframe and evaluates to a column of True and False values, which acts as a mask. The filter() method keeps the rows for which the mask is True and discards the rest.
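To make the idea of the mask concrete, here is a minimal, self-contained sketch; the dataframe and column names below are made up for illustration. A comparison on a column does not produce a Python boolean but a pyspark Column, which filter() evaluates for every row.
import pyspark.sql as ps

spark = ps.SparkSession.builder.master("local[*]").appName("mask_example").getOrCreate()

# A tiny hand-made dataframe, used only to illustrate the mask.
df = spark.createDataFrame([("Aditya", 89), ("Sam", 62)], ["Name", "Physics"])

# The comparison builds a boolean Column (the mask); it is not evaluated immediately.
mask = df.Physics > 80
print(type(mask))  # a pyspark Column object, not a Python bool

# filter() keeps only the rows for which the mask evaluates to True.
df.filter(mask).show()

spark.stop()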
PySpark Filter DataFrame by Column Value
To filter a pyspark dataframe by a column value, we will use the filter() method. Here, we write a conditional statement on the column and pass it to the filter() method. After execution, we get a pyspark dataframe containing only the rows that satisfy the condition. You can observe this in the following example.
import pyspark.sql as ps

# Create a local SparkSession.
spark = ps.SparkSession.builder \
    .master("local[*]") \
    .appName("filter_example") \
    .getOrCreate()

# Read the CSV file into a pyspark dataframe.
dfs = spark.read.csv("sample_csv_file.csv", header=True)
print("The input dataframe is:")
dfs.show()

# Keep only the rows where the Physics column is greater than 80.
new_df = dfs.filter(dfs.Physics > 80)
print("The filtered rows are:")
new_df.show()

spark.sparkContext.stop()
Output:
The input dataframe is:
+-------+-----+-------+---------+
| Name|Maths|Physics|Chemistry|
+-------+-----+-------+---------+
| Aditya| 45| 89| 71|
| Chris| null| 85| 82|
| Joel| 45| 75| 87|
|Katrina| 49| 47| 83|
| Joel| 45| 75| 87|
| Agatha| 77| 76| 93|
| Sam| 99| 62| 95|
| Aditya| 65| 89| 71|
+-------+-----+-------+---------+
The filtered rows are:
+------+-----+-------+---------+
| Name|Maths|Physics|Chemistry|
+------+-----+-------+---------+
|Aditya| 45| 89| 71|
| Chris| null| 85| 82|
|Aditya| 65| 89| 71|
+------+-----+-------+---------+
In this example, we first read a CSV file into a pyspark dataframe. Then, we used the filter() method to filter rows from the dataframe. In the filter() method, we passed the condition dfs.Physics>80. Here, dfs is the dataframe created from the CSV file and Physics is the column name. Hence, the filter() method returns a dataframe containing only the rows with a value greater than 80 in the Physics column.
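The condition does not have to be a numeric comparison. The following is a minimal, self-contained sketch (with a small hand-made sample standing in for the CSV file above) that filters rows by an exact value in the Name column using the equality operator.
import pyspark.sql as ps

spark = ps.SparkSession.builder.master("local[*]").appName("filter_by_value").getOrCreate()

# Hand-made sample mirroring a few rows of the CSV file used above.
dfs = spark.createDataFrame(
    [("Aditya", 45, 89, 71), ("Chris", None, 85, 82), ("Joel", 45, 75, 87)],
    ["Name", "Maths", "Physics", "Chemistry"],
)

# Keep only the rows whose Name column equals "Aditya".
dfs.filter(dfs.Name == "Aditya").show()

spark.stop()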
Filter PySpark DataFrame Using SQL Statement
You can also use SQL statements to filter a pyspark dataframe by column value. For this, we can use the SQL SELECT statement with a WHERE clause that checks the condition on the given column. To filter a dataframe by column value using SQL in PySpark, we can use the following steps.
- First, we will create a view of the pyspark dataframe using the createOrReplaceTempView() method. The createOrReplaceTempView() method, when invoked on a pyspark dataframe, takes the name of the view as its input argument. After execution, it creates a view of the dataframe with the given name, and we can execute SQL statements on this view to filter data.
- Next, we will create an SQL statement to filter rows using the SELECT statement and the WHERE clause.
- Finally, we will use the sql() function to execute the SQL statement.
After executing the sql() function, we will get the output dataframe with the filtered rows. You can observe this in the following example.
import pyspark.sql as ps

spark = ps.SparkSession.builder \
    .master("local[*]") \
    .appName("filter_example") \
    .getOrCreate()

dfs = spark.read.csv("sample_csv_file.csv", header=True)
print("The input dataframe is:")
dfs.show()

# Create a temporary view so that SQL statements can be run against the dataframe.
dfs.createOrReplaceTempView("df_sql")
new_df = spark.sql("SELECT * FROM df_sql WHERE Physics>80")
print("The filtered rows are:")
new_df.show()

spark.sparkContext.stop()
Output:
The input dataframe is:
+-------+-----+-------+---------+
| Name|Maths|Physics|Chemistry|
+-------+-----+-------+---------+
| Aditya| 45| 89| 71|
| Chris| null| 85| 82|
| Joel| 45| 75| 87|
|Katrina| 49| 47| 83|
| Joel| 45| 75| 87|
| Agatha| 77| 76| 93|
| Sam| 99| 62| 95|
| Aditya| 65| 89| 71|
+-------+-----+-------+---------+
The filtered rows are:
+------+-----+-------+---------+
| Name|Maths|Physics|Chemistry|
+------+-----+-------+---------+
|Aditya| 45| 89| 71|
| Chris| null| 85| 82|
|Aditya| 65| 89| 71|
+------+-----+-------+---------+
In the above example, we first created a dataframe from the CSV file. Then, we used the createOrReplaceTempView() method to create a view of the pyspark dataframe, as we cannot directly execute SQL statements on a dataframe. Next, we used the sql() function to execute the SQL statement that filters the rows of the dataframe based on the Physics column.
Instead of using the above approach, you can also pass the condition from the SQL WHERE clause directly, as a string, to the filter() method invoked on the input dataframe. After this, you will get the output dataframe with the desired rows, as shown in the following example.
import pyspark.sql as ps

spark = ps.SparkSession.builder \
    .master("local[*]") \
    .appName("filter_example") \
    .getOrCreate()

dfs = spark.read.csv("sample_csv_file.csv", header=True)
print("The input dataframe is:")
dfs.show()

new_df = dfs.filter("Physics>80")
print("The filtered rows are:")
new_df.show()

spark.sparkContext.stop()
Output:
The input dataframe is:
+-------+-----+-------+---------+
| Name|Maths|Physics|Chemistry|
+-------+-----+-------+---------+
| Aditya| 45| 89| 71|
| Chris| null| 85| 82|
| Joel| 45| 75| 87|
|Katrina| 49| 47| 83|
| Joel| 45| 75| 87|
| Agatha| 77| 76| 93|
| Sam| 99| 62| 95|
| Aditya| 65| 89| 71|
+-------+-----+-------+---------+
The filtered rows are:
+------+-----+-------+---------+
| Name|Maths|Physics|Chemistry|
+------+-----+-------+---------+
|Aditya| 45| 89| 71|
| Chris| null| 85| 82|
|Aditya| 65| 89| 71|
+------+-----+-------+---------+
In this example, we passed the condition that we used in the WHERE clause of the previous example directly to the filter() method as a string. The output remains unchanged.
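Because the string passed to filter() is parsed as a SQL expression, other SQL predicates can be used in the same way. Here is a minimal, self-contained sketch (with a small hand-made sample in place of the CSV file) showing IN, LIKE, and BETWEEN.
import pyspark.sql as ps

spark = ps.SparkSession.builder.master("local[*]").appName("sql_string_filter").getOrCreate()

# Hand-made sample standing in for the CSV file used in the examples above.
dfs = spark.createDataFrame(
    [("Aditya", 89), ("Chris", 85), ("Katrina", 47)],
    ["Name", "Physics"],
)

# Any SQL boolean expression works as the filter string.
dfs.filter("Name IN ('Aditya', 'Chris')").show()  # membership test
dfs.filter("Name LIKE 'K%'").show()               # pattern match
dfs.filter("Physics BETWEEN 80 AND 90").show()    # range check

spark.stop()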
Filter PySpark DataFrame by Multiple Conditions
You can also filter pyspark dataframes by multiple conditions. For this, you need to include all the conditions inside the filter() method or in the SQL WHERE clause and combine them using conditional operators.
For instance, we can filter rows in the pyspark dataframe by multiple conditions using the filter() method as shown below.
import pyspark.sql as ps

spark = ps.SparkSession.builder \
    .master("local[*]") \
    .appName("filter_example") \
    .getOrCreate()

dfs = spark.read.csv("sample_csv_file.csv", header=True)
print("The input dataframe is:")
dfs.show()

new_df = dfs.filter((dfs.Physics > 70) & (dfs.Chemistry < 90))
print("The filtered rows are:")
new_df.show()

spark.sparkContext.stop()
Output:
The input dataframe is:
+-------+-----+-------+---------+
| Name|Maths|Physics|Chemistry|
+-------+-----+-------+---------+
| Aditya| 45| 89| 71|
| Chris| null| 85| 82|
| Joel| 45| 75| 87|
|Katrina| 49| 47| 83|
| Joel| 45| 75| 87|
| Agatha| 77| 76| 93|
| Sam| 99| 62| 95|
| Aditya| 65| 89| 71|
+-------+-----+-------+---------+
The filtered rows are:
+------+-----+-------+---------+
| Name|Maths|Physics|Chemistry|
+------+-----+-------+---------+
|Aditya| 45| 89| 71|
| Chris| null| 85| 82|
| Joel| 45| 75| 87|
| Joel| 45| 75| 87|
|Aditya| 65| 89| 71|
+------+-----+-------+---------+
In the above example, we passed two conditions to the filter() method, combined using the AND operator (&). Hence, the filter() method filters the input dataframe by both conditions and produces the result. Here, you need to keep in mind that each condition is enclosed in parentheses before the conditions are combined using conditional operators.
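The same pattern works with the other conditional operators, such as OR (|) and NOT (~). Here is a minimal, self-contained sketch (again with a small hand-made sample instead of the CSV file), with each condition kept inside its own parentheses.
import pyspark.sql as ps

spark = ps.SparkSession.builder.master("local[*]").appName("multi_condition_filter").getOrCreate()

# Hand-made sample standing in for the CSV file used in the examples above.
dfs = spark.createDataFrame(
    [("Aditya", 89, 71), ("Joel", 75, 87), ("Sam", 62, 95)],
    ["Name", "Physics", "Chemistry"],
)

# OR: keep rows that satisfy at least one of the conditions.
dfs.filter((dfs.Physics > 80) | (dfs.Chemistry > 90)).show()

# NOT: keep rows that do not satisfy the condition.
dfs.filter(~(dfs.Physics > 80)).show()

spark.stop()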
If you don’t use parentheses around the individual conditions in the filter() method, the program will run into an error. You can observe this in the following example.
import pyspark.sql as ps

spark = ps.SparkSession.builder \
    .master("local[*]") \
    .appName("filter_example") \
    .getOrCreate()

dfs = spark.read.csv("sample_csv_file.csv", header=True)
print("The input dataframe is:")
dfs.show()

# The individual conditions are intentionally not parenthesized here, which raises an error.
new_df = dfs.filter(dfs.Physics > 70 & dfs.Chemistry < 90)
print("The filtered rows are:")
new_df.show()

spark.sparkContext.stop()
Output:
Py4JError: An error occurred while calling o240.and. Trace:
py4j.Py4JException: Method and([class java.lang.Integer]) does not exist
In the above code, we haven’t used parentheses around the conditions in the filter() method. Because the & operator binds more tightly than the comparison operators in Python, the expression is grouped as dfs.Physics > (70 & dfs.Chemistry) < 90, and Spark ends up trying to apply & to an integer and a column. Hence, the program runs into a Py4JError.
PySpark Filter DataFrame by Multiple Conditions Using SQL
Instead of the filter() method, you can also use the SQL WHERE clause to filter a pyspark dataframe by multiple conditions. For this, you can pass all the conditions in the WHERE clause and combine them using conditional operators. After executing the SQL statement, you will get the desired output.
import pyspark.sql as ps

spark = ps.SparkSession.builder \
    .master("local[*]") \
    .appName("filter_example") \
    .getOrCreate()

dfs = spark.read.csv("sample_csv_file.csv", header=True)
print("The input dataframe is:")
dfs.show()

dfs.createOrReplaceTempView("df_sql")
new_df = spark.sql("SELECT * FROM df_sql WHERE Physics>70 AND Chemistry<90")
print("The filtered rows are:")
new_df.show()

spark.sparkContext.stop()
Output:
The input dataframe is:
+-------+-----+-------+---------+
| Name|Maths|Physics|Chemistry|
+-------+-----+-------+---------+
| Aditya| 45| 89| 71|
| Chris| null| 85| 82|
| Joel| 45| 75| 87|
|Katrina| 49| 47| 83|
| Joel| 45| 75| 87|
| Agatha| 77| 76| 93|
| Sam| 99| 62| 95|
| Aditya| 65| 89| 71|
+-------+-----+-------+---------+
The filtered rows are:
+------+-----+-------+---------+
| Name|Maths|Physics|Chemistry|
+------+-----+-------+---------+
|Aditya| 45| 89| 71|
| Chris| null| 85| 82|
| Joel| 45| 75| 87|
| Joel| 45| 75| 87|
|Aditya| 65| 89| 71|
+------+-----+-------+---------+
Instead of creating a view of the dataframe and executing the SQL statement using the sql() function, you can also pass the condition used in the WHERE clause directly to the filter() method. After this, you will get the same output as in the previous example.
import pyspark.sql as ps

spark = ps.SparkSession.builder \
    .master("local[*]") \
    .appName("filter_example") \
    .getOrCreate()

dfs = spark.read.csv("sample_csv_file.csv", header=True)
print("The input dataframe is:")
dfs.show()

new_df = dfs.filter("Physics>70 AND Chemistry<90")
print("The filtered rows are:")
new_df.show()

spark.sparkContext.stop()
Output:
The input dataframe is:
+-------+-----+-------+---------+
| Name|Maths|Physics|Chemistry|
+-------+-----+-------+---------+
| Aditya| 45| 89| 71|
| Chris| null| 85| 82|
| Joel| 45| 75| 87|
|Katrina| 49| 47| 83|
| Joel| 45| 75| 87|
| Agatha| 77| 76| 93|
| Sam| 99| 62| 95|
| Aditya| 65| 89| 71|
+-------+-----+-------+---------+
The filtered rows are:
+------+-----+-------+---------+
| Name|Maths|Physics|Chemistry|
+------+-----+-------+---------+
|Aditya| 45| 89| 71|
| Chris| null| 85| 82|
| Joel| 45| 75| 87|
| Joel| 45| 75| 87|
|Aditya| 65| 89| 71|
+------+-----+-------+---------+
Conclusion
In this article, we discussed multiple ways to filter rows in a pyspark dataframe. To learn more about pyspark, you can read this article on how to select distinct rows from a pyspark dataframe. You might also like this article on how to sort a pyspark dataframe.
I hope you enjoyed reading this article. Stay tuned for more informative articles.
Happy Learning!