The CSV file format is one of the most widely used formats for storing tabular data. In this article, we will discuss different ways to read a csv file in PySpark.
PySpark Read CSV File Using The csv() Method
To read a csv file into a pyspark dataframe, we can use the DataFrameReader.csv() method, which is available as spark.read.csv(). The csv() method takes the filename of the csv file as its input argument and returns a pyspark dataframe, as shown below.
import pyspark.sql as ps

# create a local SparkSession
spark = ps.SparkSession.builder \
    .master("local[*]") \
    .appName("readcsv_example") \
    .getOrCreate()

# read the csv file into a pyspark dataframe
dfs = spark.read.csv("sample_csv_file.csv")
print("The input csv file is:")
dfs.show()

# stop the underlying SparkContext when done
spark.sparkContext.stop()
Output:
The input csv file is:
+-------+-----+-------+---------+
| _c0| _c1| _c2| _c3|
+-------+-----+-------+---------+
| Name|Maths|Physics|Chemistry|
| Aditya| 45| 89| 1 |
| Chris| 86| 85| 2|
| Joel| null| 85| 3 |
|Katrina| 49| 47| 4|
| Agatha| 76| 89| 5|
| Sam| 76| 98| 6|
+-------+-----+-------+---------+
In the above example, we have used a csv file named sample_csv_file.csv containing the columns Name, Maths, Physics, and Chemistry with the marks of six students.
In the output, you can observe that the column names are given as _c0, _c1, _c2, and _c3. However, the actual column names should be Name, Maths, Physics, and Chemistry. Hence, we need a way to read the csv file with its column names.
Read CSV With Header as Column Names
To read a csv file with column names, you can use the header parameter in the csv() method. When we set the header parameter to True, the first row of the csv file is treated as the column names. You can observe this in the following example.
import pyspark.sql as ps
spark = ps.SparkSession.builder \
.master("local[*]") \
.appName("readcsv_example") \
.getOrCreate()
dfs=spark.read.csv("sample_csv_file.csv",header=True)
print("The input csv file is:")
dfs.show()
spark.sparkContext.stop()
Output:
The input csv file is:
+-------+-----+-------+---------+
| Name|Maths|Physics|Chemistry|
+-------+-----+-------+---------+
| Aditya| 45| 89| 1 |
| Chris| 86| 85| 2|
| Joel| null| 85| 3 |
|Katrina| 49| 47| 4|
| Agatha| 76| 89| 5|
| Sam| 76| 98| 6|
+-------+-----+-------+---------+
In this example, we have set the header parameter to True in the csv() method. Hence, the first line of the csv file is read as column names.
Read CSV With inferSchema Parameter
By default, the csv() method reads all the values as strings. For example, if we print the data types using the dtypes attribute of the pyspark dataframe, you can observe that all the columns have the string data type.
import pyspark.sql as ps
spark = ps.SparkSession.builder \
.master("local[*]") \
.appName("readcsv_example") \
.getOrCreate()
dfs=spark.read.csv("sample_csv_file.csv",header=True)
print("The input csv file is:")
dfs.show()
print("The data type of columns is:")
print(dfs.dtypes)
spark.sparkContext.stop()
Output:
The input csv file is:
+-------+-----+-------+---------+
| Name|Maths|Physics|Chemistry|
+-------+-----+-------+---------+
| Aditya| 45| 89| 1 |
| Chris| 86| 85| 2|
| Joel| null| 85| 3 |
|Katrina| 49| 47| 4|
| Agatha| 76| 89| 5|
| Sam| 76| 98| 6|
+-------+-----+-------+---------+
The data type of columns is:
[('Name', 'string'), ('Maths', 'string'), ('Physics', 'string'), ('Chemistry', 'string')]
In the above output, you can observe that all the columns have string data types irrespective of the values in the columns.
To read a csv file with the correct data types for its columns, we can use the inferSchema parameter in the csv() method. When we set the inferSchema parameter to True, the program scans all the values in the dataframe and assigns the best data type to each column. You can observe this in the following example.
import pyspark.sql as ps
spark = ps.SparkSession.builder \
.master("local[*]") \
.appName("readcsv_example") \
.getOrCreate()
dfs=spark.read.csv("sample_csv_file.csv",header=True,inferSchema=True)
print("The input csv file is:")
dfs.show()
print("The data type of columns is:")
print(dfs.dtypes)
spark.sparkContext.stop()
Output:
The input csv file is:
+-------+-----+-------+---------+
| Name|Maths|Physics|Chemistry|
+-------+-----+-------+---------+
| Aditya| 45| 89| 1.0|
| Chris| 86| 85| 2.0|
| Joel| null| 85| 3.0|
|Katrina| 49| 47| 4.0|
| Agatha| 76| 89| 5.0|
| Sam| 76| 98| 6.0|
+-------+-----+-------+---------+
The data type of columns is:
[('Name', 'string'), ('Maths', 'int'), ('Physics', 'int'), ('Chemistry', 'double')]
In this example, we have set the inferSchema parameter to True. Hence, the columns are given proper data types. Observe that the Chemistry column is inferred as double rather than int; some of its values appear to contain trailing whitespace in the csv file, which prevents them from being parsed as integers but not as doubles.
Why Should You Avoid Using The inferSchema Parameter in PySpark?
Using the inferSchema parameter to decide the data type for the columns of a pyspark dataframe is a costly operation. When we set the inferSchema parameter to True, the program needs to scan all the values in the csv file before it can decide the data type of each column. For large datasets, this extra pass over the data is expensive, which is why setting the inferSchema parameter to True isn't recommended for large datasets.
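If you still want Spark to infer the schema for a large csv file, one way to reduce the cost is to infer it from only a sample of the rows instead of the entire file. The following is a minimal sketch, assuming the samplingRatio parameter of the csv() method (which controls the fraction of rows used for schema inference) is available in your Spark version.
import pyspark.sql as ps

spark = ps.SparkSession.builder \
    .master("local[*]") \
    .appName("readcsv_example") \
    .getOrCreate()

# infer the schema from roughly 10% of the rows instead of the whole file
dfs = spark.read.csv("sample_csv_file.csv",
                     header=True,
                     inferSchema=True,
                     samplingRatio=0.1)
print("The inferred data types are:")
print(dfs.dtypes)
spark.sparkContext.stop()
Note that inferring the schema from a sample trades accuracy for speed; if the sampled rows are not representative, the inferred types may still be wrong, so an explicit schema remains the safer option.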
PySpark Read CSV File With Schema
Instead of using the inferSchema parameter, we can read csv files with a specified schema. A schema contains the column names, their data types, and a boolean value named nullable that specifies whether a particular column can contain null values or not.
To define the schema for a pyspark dataframe, we use the StructType() function and the StructField() function.
The StructField() function is used to define the name and data type of a particular column. It takes the column name as its first input argument and the data type of the column as its second input argument. To specify the data type of a column, we use StringType(), IntegerType(), FloatType(), DoubleType(), and the other types defined in the pyspark.sql.types module.
In the third input argument to the StructField() function, we pass True or False to specify whether the column can contain null values. If we set the third parameter to True, the column will allow null values. Otherwise, it will not.
The StructType() function is used to create the schema for the pyspark dataframe. It takes a list of StructField objects as its input argument and returns a StructType object that we can use as a schema.
To read a csv file with a schema using pyspark, we will use the following steps.
- First, we will define the data type for each column using the StructField() function.
- Next, we will pass a list of all the StructField objects to the StructType() function to create a schema.
- Finally, we will pass the StructType object to the schema parameter in the csv() method while reading the csv file.
By executing the above steps, we can read a csv file in pyspark with a given schema. You can observe this in the following example.
import pyspark.sql as ps
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
spark = ps.SparkSession.builder \
.master("local[*]") \
.appName("readcsv_example") \
.getOrCreate()
list_of_cols=[StructField("Name",StringType(),True),
StructField("Maths",IntegerType(),True),
StructField("Physics",IntegerType(),True),
StructField("Chemistry",IntegerType(),True)]
schema=StructType(list_of_cols)
print("The schema is:")
print(schema)
spark.sparkContext.stop()
Output:
The schema is:
StructType([StructField('Name', StringType(), True), StructField('Maths', IntegerType(), True), StructField('Physics', IntegerType(), True), StructField('Chemistry', IntegerType(), True)])
In the above code, we have defined the schema for the csv file using the StructField() function and the StructType() function. After defining the schema, you can pass it to the csv() method to read the csv file with the proper data type for each column, as shown in the following example.
import pyspark.sql as ps
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
spark = ps.SparkSession.builder \
.master("local[*]") \
.appName("readcsv_example") \
.getOrCreate()
list_of_cols=[StructField("Name",StringType(),True),
StructField("Maths",IntegerType(),True),
StructField("Physics",IntegerType(),True),
StructField("Chemistry",IntegerType(),True)]
schema=StructType(list_of_cols)
dfs=spark.read.csv("sample_csv_file.csv",header=True,schema=schema)
print("The input csv file is:")
dfs.show()
print("The data type of columns is:")
print(dfs.dtypes)
spark.sparkContext.stop()
Output:
The input csv file is:
+-------+-----+-------+---------+
| Name|Maths|Physics|Chemistry|
+-------+-----+-------+---------+
| Aditya| 45| 89| null|
| Chris| 86| 85| 2|
| Joel| null| 85| null|
|Katrina| 49| 47| 4|
| Agatha| 76| 89| 5|
| Sam| 76| 98| 6|
+-------+-----+-------+---------+
The data type of columns is:
[('Name', 'string'), ('Maths', 'int'), ('Physics', 'int'), ('Chemistry', 'int')]
In the above example, we have read the csv file using a schema. Observe that values that cannot be converted to the data type given in the schema are replaced with null. For instance, the Chemistry values for Aditya and Joel appear to contain trailing whitespace in the csv file (you can see the extra space in the earlier outputs), so they cannot be parsed as integers and show up as null.
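If the nulls are caused only by such stray whitespace, one possible workaround is to let the reader strip it while parsing. The following is a minimal sketch, assuming the ignoreLeadingWhiteSpace and ignoreTrailingWhiteSpace parameters of the csv() method are available in your Spark version.
import pyspark.sql as ps
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

spark = ps.SparkSession.builder \
    .master("local[*]") \
    .appName("readcsv_example") \
    .getOrCreate()

schema = StructType([StructField("Name", StringType(), True),
                     StructField("Maths", IntegerType(), True),
                     StructField("Physics", IntegerType(), True),
                     StructField("Chemistry", IntegerType(), True)])

# strip surrounding whitespace from each value before applying the schema
dfs = spark.read.csv("sample_csv_file.csv",
                     header=True,
                     schema=schema,
                     ignoreLeadingWhiteSpace=True,
                     ignoreTrailingWhiteSpace=True)
dfs.show()
spark.sparkContext.stop()
With the whitespace stripped, values such as "1 " can be parsed as integers instead of being replaced with null.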
Read CSV With Different Delimiter in PySpark
CSV files need not use the comma character as their delimiter. They might also use characters like tabs, spaces, colons (:), semicolons (;), pipe characters (|), etc., as delimiters. For example, let us take a file named demo_file.csv that uses the pipe character as its delimiter.
To read a csv file with a given delimiter in pyspark, you can use the sep parameter in the csv() method. The csv() method takes the delimiter character as the input to the sep parameter and returns the pyspark dataframe, as shown below.
import pyspark.sql as ps
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
spark = ps.SparkSession.builder \
.master("local[*]") \
.appName("readcsv_example") \
.getOrCreate()
dfs=spark.read.csv("demo_file.csv",header=True,inferSchema=True, sep="|")
print("The input csv file is:")
dfs.show()
print("The data type of columns is:")
print(dfs.dtypes)
spark.sparkContext.stop()
Output:
The input csv file is:
+------+----+----------+-----+
| Name|Roll| Language|Extra|
+------+----+----------+-----+
|Aditya| 1| Python| 11|
| Sam| 2| Java| 12|
| Chris| 3| C++| 13|
| Joel| 4|TypeScript| 14|
+------+----+----------+-----+
The data type of columns is:
[('Name', 'string'), ('Roll', 'int'), ('Language', 'string'), ('Extra', 'int')]
In the above example, the csv file uses the | character as its delimiter. To read the file, we have passed the | character to the sep parameter of the csv() method.
Read Multiple CSV Files into a Single PySpark DataFrame
To read multiple csv files into a single pyspark dataframe at once, you can pass a list of filenames to the csv() method as its first input argument. After execution, the csv() method returns a pyspark dataframe containing the data from all the files, as shown below.
import pyspark.sql as ps
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
spark = ps.SparkSession.builder \
.master("local[*]") \
.appName("readcsv_example") \
.getOrCreate()
dfs=spark.read.csv(["demo_file.csv","demo_file2.csv"],header=True,inferSchema=True, sep="|")
print("The input csv files are:")
dfs.show()
print("The data type of columns is:")
print(dfs.dtypes)
spark.sparkContext.stop()
Output:
The input csv files are:
+------+----+----------+-----+
| Name|Roll| Language|Extra|
+------+----+----------+-----+
|Aditya| 1| Python| 11|
| Sam| 2| Java| 12|
| Chris| 3| C++| 13|
| Joel| 4|TypeScript| 14|
|George| 12| C#| 15|
| Sean| 13| SQL| 16|
| Joe| 14| PHP| 17|
| Sam| 15|JavaScript| 18|
+------+----+----------+-----+
The data type of columns is:
[('Name', 'string'), ('Roll', 'int'), ('Language', 'string'), ('Extra', 'int')]
In the above example, we have read the files demo_file.csv and demo_file2.csv, both of which use the pipe character as their delimiter and have the same columns.
In the output, you can observe that the contents of the files are stacked vertically in the order in which they are passed to the csv() method.
Multiple CSV Files With Different Column Names
If the files that we pass to the csv() method have the same number of columns but different column names, the output dataframe will contain the column names of the first csv file. The data in the columns is stacked by position to create the output dataframe. You can observe this in the following example.
import pyspark.sql as ps
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
spark = ps.SparkSession.builder \
.master("local[*]") \
.appName("readcsv_example") \
.getOrCreate()
dfs=spark.read.csv(["demo_file.csv","demo_file2.csv"],header=True,inferSchema=True, sep="|")
print("The input csv files are:")
dfs.show()
print("The data type of columns is:")
print(dfs.dtypes)
spark.sparkContext.stop()
Output:
The input csv files are:
23/07/09 04:54:17 WARN CSVHeaderChecker: CSV header does not conform to the schema.
Header: Name, Roll, Language, Extra
Schema: Name, Roll, Language, Ratings
Expected: Ratings but found: Extra
CSV file: file:///home/aditya1117/codes/demo_file2.csv
+------+----+----------+-------+
| Name|Roll| Language|Ratings|
+------+----+----------+-------+
|Aditya| 1| Python| 11|
| Sam| 2| Java| 12|
| Chris| 3| C++| 13|
| Joel| 4|TypeScript| 14|
|George| 12| C#| 15|
| Sean| 13| SQL| 16|
| Joe| 14| PHP| 17|
| Sam| 15|JavaScript| 18|
+------+----+----------+-------+
The data type of columns is:
[('Name', 'string'), ('Roll', 'string'), ('Language', 'string'), ('Ratings', 'string')]
In the above example, the first csv file has the column names Name, Roll, Language, and Ratings. The second csv file has Extra as its last column instead of Ratings. In the output, you can observe that the column names of the first csv file are used as the schema. Hence, the csv() method prints a warning when it encounters the different column name in the second file.
CSV Files With Different Numbers of Columns in PySpark
If the input files contain different numbers of columns, the column names in the schema of the output dataframe are taken from the csv file with more columns. The rows from the csv file with fewer columns are filled with null values in the extra columns.
To understand this, let us add an extra column named Grade to demo_file.csv, so that it now has one more column than demo_file2.csv. Now, let us read both files into a pyspark dataframe using the csv() method.
import pyspark.sql as ps
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
spark = ps.SparkSession.builder \
.master("local[*]") \
.appName("readcsv_example") \
.getOrCreate()
dfs=spark.read.csv(["demo_file2.csv","demo_file.csv"],header=True,inferSchema=True, sep="|")
print("The input csv files are:")
dfs.show()
print("The data type of columns is:")
print(dfs.dtypes)
spark.sparkContext.stop()
Output:
The input csv files are:
23/07/09 04:57:08 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
Header length: 4, schema size: 5
CSV file: file:///home/aditya1117/codes/demo_file2.csv
+------+----+----------+-------+-----+
| Name|Roll| Language|Ratings|Grade|
+------+----+----------+-------+-----+
|Aditya| 1| Python| 11| A|
| Sam| 2| Java| 12| A|
| Chris| 3| C++| 13| A+|
| Joel| 4|TypeScript| 14| A+|
|George| 12| C#| 15| null|
| Sean| 13| SQL| 16| null|
| Joe| 14| PHP| 17| null|
| Sam| 15|JavaScript| 18| null|
+------+----+----------+-------+-----+
The data type of columns is:
[('Name', 'string'), ('Roll', 'string'), ('Language', 'string'), ('Ratings', 'string'), ('Grade', 'string')]
In the above code, demo_file.csv contains five columns while demo_file2.csv contains only four. Hence, the column names given in demo_file.csv are selected for the schema even though we passed it as the second file to the csv() method. You can also observe that the rows from demo_file.csv appear at the top of the output dataframe, with null values in the Grade column for the rows that come from demo_file2.csv.
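Relying on the order of the files and on positional stacking can be fragile when the files have different columns. If you are using Spark 3.1 or later, a more explicit approach is to read the files separately and combine them by column name. The following is a minimal sketch using the same filenames as above; unionByName with allowMissingColumns=True matches columns by name and fills the columns missing from either file with null values.
import pyspark.sql as ps

spark = ps.SparkSession.builder \
    .master("local[*]") \
    .appName("readcsv_example") \
    .getOrCreate()

df1 = spark.read.csv("demo_file.csv", header=True, inferSchema=True, sep="|")
df2 = spark.read.csv("demo_file2.csv", header=True, inferSchema=True, sep="|")

# combine the dataframes by column name instead of column position
combined = df1.unionByName(df2, allowMissingColumns=True)
combined.show()
spark.sparkContext.stop()
This way, the resulting dataframe contains the union of all the column names, and the data from each file ends up under the correct column regardless of the order in which the files are read.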
Conclusion
In this article, we have discussed different ways to read a CSV file in PySpark. To learn more about pyspark, you can read this article on pyspark vs pandas. You might also like this article on how to create an empty pyspark dataframe.
I hope you enjoyed reading this article. Stay tuned for more informative articles.
Happy Learning!