banner



How To Create Rdd From Csv File In Pyspark

Apache Spark Tutorial— How to Read and Write Data With PySpark

A PySpark cheat sheet for novice Data Engineers

Prashanth Xavier

Buddy is a novice Data Engineer who has recently come across Spark, a popular big data processing framework.

Considering the fact that Spark is being seamlessly integrated with cloud data platforms like Azure, AWS, and GCP Buddy has now realized its existential certainty. This has driven Buddy to jump-start his Spark journey, by tackling the most trivial exercise in a big data processing life cycle - "Reading and Writing Data"

TL;DR

Inundated with work Buddy and his impatient mind unanimously decided to take the shortcut with the following cheat sheet using Python.

TS; WM

In hindsi g ht, Buddy deems that it is imperative to come to terms with his impatient mind. The shortcut has proven to be effective, but a vast amount of time is being spent on solving minor errors and handling obscure behavior.

It is time to tackle the details.

Reading and writing data in Spark is a trivial task, more often than not it is the outset for any form of Big data processing. Buddy wants to know the core syntax for reading and writing data before moving onto specifics.

The core syntax for reading data in Apache Spark

              DataFrameReader.format(…).option("key", "value").schema(…).load()            

DataFrameReader is the foundation for reading data in Spark, it can be accessed via the attribute spark.read

  • format — specifies the file format as in CSV, JSON, or parquet. The default is parquet.
  • option — a set of key-value configurations to parameterize how to read data
  • schema — optional one used to specify if you would like to infer the schema from the data source.

Read Modes — Often while reading data from external sources we encounter corrupt data, read modes instruct Spark to handle corrupt data in a specific way.

There are 3 typical read modes and the default read mode is permissive.

  • permissive — All fields are set to null and corrupted records are placed in a string column called _corrupt_record
  • dropMalformed — Drops all rows containing corrupt records.
  • failFast — Fails when corrupt records are encountered.

The core syntax for writing data in Apache Spark

            DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy( ...).save()          

The foundation for writing data in Spark is the DataFrameWriter, which is accessed per-DataFrame using the attribute dataFrame.write

Save modes — specifies what will happen if Spark finds data already at the destination.

There are 4 typical save modes and the default mode is errorIfExists

  • append — appends output data to files that already exist
  • overwrite — completely overwrites any data present at the destination
  • errorIfExists — Spark throws an error if data already exists at the destination
  • ignore — if data exists do nothing with the dataFrame

That's a great primer! Buddy seems to now understand the reasoning behind the errors that have been tormenting him. He would like to expand on this knowledge by diving into some of the frequently encountered file types and how to handle them.

CSV files

How to read from CSV files?

To read a CSV file you must first create a DataFrameReader and set a number of options.

            df=spark.read.format("csv").option("header","true").load(filePath)          

Here we load a CSV file and tell Spark that the file contains a header row. This step is guaranteed to trigger a Spark job.

Spark job: block of parallel computation that executes some task.

A job is triggered every time we are physically required to touch the data. In this case, the DataFrameReader has to peek at the first line of the file to figure out how many columns of data we have in the file.

When reading data you always need to consider the overhead of datatypes. There are two ways to handle this in Spark, InferSchema or user-defined schema.

Reading CSV using InferSchema

df=spark.read.format("csv").option("inferSchema","true").load(filePath)

inferSchema option tells the reader to infer data types from the source file. This results in an additional pass over the file resulting in two Spark jobs being triggered. It is an expensive operation because Spark must automatically go through the CSV file and infer the schema for each column.

Reading CSV using user-defined Schema

The preferred option while reading any file would be to enforce a custom schema, this ensures that the data types are consistent and avoids any unexpected behavior.

In order to do that you first declare the schema to be enforced, and then read the data by setting schema option.

            csvSchema = StructType([StructField("id",IntegerType(),False)])            df=spark.read.format("csv").schema(csvSchema).load(filePath)          

As a result of pre-defining the schema for your data, you avoid triggering any jobs. Spark did not see the need to peek into the file since we took care of the schema. This is known as lazy evaluation which is a crucial optimization technique in Spark.

How to Write CSV Data?

Writing data in Spark is fairly simple, as we defined in the core syntax to write out data we need a dataFrame with actual data in it, through which we can access the DataFrameWriter.

            df.write.format("csv").mode("overwrite).save(outputPath/file.csv)          

Here we write the contents of the data frame into a CSV file. Setting the write mode to overwrite will completely overwrite any data that already exists in the destination.

What you expect as a result of the previous command is a single CSV file output, however, you would see that the file you intended to write is in fact a folder with numerous files within it. This is further confirmed by peeking into the contents of outputPath.

            %fs ls /outputPath/file.csv          

This is an important aspect of Spark distributed engine and it reflects the number of partitions in our dataFrame at the time we write it out. The number of files generated would be different if we had repartitioned the dataFrame before writing it out.

Partitioning simply means dividing a large data set into smaller chunks(partitions). In Spark they are the basic units of parallelism and it allows you to control where data is stored as you write it.

JSON files

How to Read from JSON file?

Reading JSON isn't that much different from reading CSV files, you can either read using inferSchema or by defining your own schema.

df=spark.read.format("json").option("inferSchema","true").load(filePath)

Here we read the JSON file by asking Spark to infer the schema, we only need one job even while inferring the schema because there is no header in JSON. The column names are extracted from the JSON object's attributes.

To maintain consistency we can always define a schema to be applied to the JSON data being read.

            jsonSchema = StructType([...])            df=spark.read.format("json").schema(jsonSchema).load(filePath)          

Remember that JSON files can be nested and for a small file manually creating the schema may not be worth the effort, but for a larger file, it is a better option as opposed to the really long and expensive schema-infer process.

How to Write to JSON file?

As you would expect writing to a JSON file is identical to a CSV file.

            df.write.format("json").mode("overwrite).save(outputPath/file.json)          

Again, as with writing to a CSV, the dataset is split into many files reflecting the number of partitions in the dataFrame.

Parquet files

Apache Parquet is a columnar storage format, free and open-source which provides efficient data compression and plays a pivotal role in Spark Big Data processing.

How to Read data from Parquet files?

Unlike CSV and JSON files, Parquet "file" is actually a collection of files the bulk of it containing the actual data and a few files that comprise meta-data.

To read a parquet file we can use a variation of the syntax as shown below both of which perform the same action.

            #option1            df=spark.read.format("parquet).load(parquetDirectory)            #option2            df=spark.read.parquet(parquetDirectory)          

As you notice we don't need to specify any kind of schema, the column names and data types are stored in the parquet files themselves.

The schema inference process is not as expensive as it is for CSV and JSON, since the Parquet reader needs to process only the small-sized meta-data files to implicitly infer the schema rather than the whole file.

How to Write data to Parquet files?

Writing Parquet is as easy as reading it. Simply specify the location for the file to be written.

            df.write.format("parquet").mode("overwrite").save("outputPath")          

The same partitioning rules we defined for CSV and JSON applies here.

Delta

Buddy has never heard of this before, seems like a fairly new concept; deserves a bit of background.

Delta Lake is a project initiated by Databricks, which is now opensource. Delta lake is an open-source storage layer that helps you build a data lake comprised of one or more tables in Delta Lake format.

It is an open format based on Parquet that brings ACID transactions into a data lake and other handy features that aim at improving the reliability, quality, and performance of existing data lakes.

In order to understand how to read from Delta format, it would make sense to first create a delta file.

How to Write data to Delta format?

In order to create a delta file, you must have a dataFrame with some data to be written. Once you have that, creating a delta is as easy as changing the file type while performing a write. Instead of parquet simply say delta.

someDataFrame.write.format("delta").partitionBy("someColumn").save(path)

How to Read data from Delta format?

If Delta files already exist you can directly run queries using Spark SQL on the directory of delta using the following syntax:

SELECT * FROM delta. `/path/to/delta_directory`

In most cases, you would want to create a table using delta files and operate on it using SQL. The notation is : CREATE TABLE USING DELTA LOCATION

            spark.sql(""" DROP TABLE IF EXISTS delta_table_name""")            spark.sql(""" CREATE TABLE delta_table_name USING DELTA LOCATION '{}' """.format(/path/to/delta_directory))          

This is called an unmanaged table in Spark SQL. It now serves as an interface between Spark and the data in the storage layer. Any changes made to this table will be reflected in the files and vice-versa. Once the table is created you can query it like any SQL table.

Apart from writing a dataFrame as delta format, we can perform other batch operations like Append and Merge on delta tables, some of the trivial operations in big data processing pipelines.

Conclusion

In this article, Buddy learned

  1. How to read and write data using Apache Spark.
  2. How to handle Big Data specific file formats like Apache Parquet and Delta format.

The details coupled with the cheat sheet has helped Buddy circumvent all the problems.

Spark can do a lot more, and we know that Buddy is not going to stop there!

If you are looking to serve ML models using Spark here is an interesting Spark end-end tutorial that I found quite insightful. Give it a thumbs up if you like it too!

How To Create Rdd From Csv File In Pyspark

Source: https://towardsdatascience.com/spark-essentials-how-to-read-and-write-data-with-pyspark-5c45e29227cd

Posted by: packerangem1981.blogspot.com

0 Response to "How To Create Rdd From Csv File In Pyspark"

Post a Comment

Iklan Atas Artikel

Iklan Tengah Artikel 1

Iklan Tengah Artikel 2

Iklan Bawah Artikel