Apache Spark is written in the Scala programming language. To let Python developers work with Spark, the Apache Spark community created PySpark, which also lets you work with RDDs from Python. It does this through a library called Py4J, which allows Python code to interact with JVM objects.
Apache Spark is an open-source analytics engine for distributed processing of large-scale datasets. It uses in-memory caching and optimized query execution to run fast analytic queries against data of any size.
It is fast because it distributes large tasks across multiple nodes and caches and processes data in RAM instead of going back to the file system at every step.
Data scientists and developers use it to quickly perform ETL jobs on large amounts of data from IoT devices, sensors, and other sources. Spark also has a Python DataFrame API that can read a JSON file into a DataFrame and infer the schema automatically.
Spark provides development APIs for Python, Java, Scala, and R. PySpark, the Python API, supports most of Spark’s features, including Spark SQL, DataFrame, Streaming, MLlib, and Spark Core. This article focuses on PySpark.
Python is a high-level language known for its simple syntax, which makes it easy to learn and productive to work in. Because PySpark is a Python API, the familiar Python visualization and data science libraries remain available alongside it. Much of what is commonly done in R can also be done in Python, and Python is one of the most widely used languages for implementing machine learning algorithms.
Spark is implemented in Scala, which runs on the JVM. PySpark is a Python-based wrapper on top of the Scala API.
PySpark is the Python interface to Apache Spark. It lets you work with Resilient Distributed Datasets (RDDs) and the other Spark abstractions from Python.
It not only allows you to write Spark applications in Python but also provides the PySpark shell for interactively analyzing your data in a distributed environment.
- Spark SQL brings native SQL support to Spark and simplifies the process of querying data stored in RDDs (Spark’s distributed datasets) as well as external sources. Spark SQL makes it easy to blend RDDs and relational tables. By combining these powerful abstractions, developers can easily mix SQL commands querying external data with complex analytics, all within a single application.
- A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a relational table, with advanced optimization techniques applied under the hood. DataFrames can be built from a variety of sources, including Hive tables, structured data files, external databases, and existing RDDs. This API was inspired by data frames in R and pandas in Python and is aimed at modern big data and data science applications.
- Streaming is a Spark API extension that allows data engineers and data scientists to process real-time data from a variety of sources such as Kafka and Amazon Kinesis. The processed data can then be pushed to file systems, databases, and live dashboards. Streaming is a fault-tolerant, scalable stream processing system that supports both batch and streaming workloads natively.
- Machine Learning Library (MLlib) is a scalable machine learning library made up of widely used learning tools and algorithms, such as dimensionality reduction, collaborative filtering, classification, regression, and clustering. MLlib integrates with the other Spark components, such as Spark SQL, Spark Streaming, and DataFrames.
- Spark Core is the general execution engine of Spark and the foundation upon which all other functionality is built. It provides the RDD (Resilient Distributed Dataset) abstraction and supports in-memory computing.
Setting up PySpark on Linux (Ubuntu)
Follow the steps below to set up and try PySpark:
Please note that Python version 3.7 or above is required.
Create a new directory and navigate to it.
Create and activate a new virtual environment.
Check the PySpark version:
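One way to check both requirements from Python itself is sketched below. It relies only on the standard library plus an optional import of pyspark. The helper name `pyspark_version` is illustrative and not part of PySpark.

```python
import sys


def pyspark_version():
    """Return the installed PySpark version string, or None if it is missing."""
    try:
        import pyspark
    except ImportError:
        return None
    return pyspark.__version__


# PySpark needs Python 3.7 or above, as noted earlier.
python_ok = sys.version_info >= (3, 7)

print("Python", sys.version.split()[0], "is supported" if python_ok else "is too old")
print("PySpark", pyspark_version() or "is not installed in this environment")
```

Running this inside the activated virtual environment confirms that the interpreter and the package you expect are the ones actually in use.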
PySpark comes with an interactive shell that helps us test, learn, and analyze data from the command line.
Launch the shell by running the ‘pyspark’ command. It gives you a prompt for interacting with Spark in the Python language. To leave the shell, call exit().
Create a PySpark DataFrame:
As in pandas, we can create a DataFrame manually, here with either of two methods, toDF() and createDataFrame(). We can also create one from JSON, CSV, TXT, or XML data read from S3, Azure Blob storage, and other file systems.
First, define the column names and the data.
An existing RDD is an easy way to manually create a PySpark DataFrame. First, let’s create a Spark RDD from a List collection by calling the parallelize() function from the SparkContext. This rdd object is required for all of the following examples.
A SparkSession is the entry point to Spark’s functionality. To create a DataFrame using the toDF() method, we build a Spark session, pass the data to parallelize() to get an RDD, and finally call toDF(columns) on that RDD to specify the column names, as in the code snippets below.
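A minimal sketch of these steps follows, assuming a local Spark installation. The column names, the sample rows, the application name, and the function name `dataframe_with_todf` are all illustrative.

```python
# Illustrative sample data: column names and rows are assumptions.
columns = ["language", "users_count"]
data = [("Java", 20000), ("Python", 100000), ("Scala", 3000)]


def dataframe_with_todf():
    """Build a DataFrame from an RDD with toDF(); requires pyspark."""
    # Imported here so the definitions above load even without pyspark.
    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .master("local[*]")   # assumed: run Spark locally on all cores
        .appName("todf_demo")
        .getOrCreate()
    )
    rdd = spark.sparkContext.parallelize(data)  # Python list -> RDD
    return rdd.toDF(columns)                    # RDD -> DataFrame with named columns
```

Calling `dataframe_with_todf().show()` prints the rows as a table. Inside the PySpark shell a session named `spark` already exists, so the builder call simply returns it.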
To create a DataFrame using the createDataFrame() method:
We have already created an RDD object with the data and a Spark session, and we can reuse that object to create a DataFrame. We pass the RDD to createDataFrame() and then call toDF(*columns) on the result to set the column names (DataFrame.toDF() takes the names as separate arguments, hence the unpacking).
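The same result with createDataFrame() might look like the sketch below. As before, the sample data, the application name, and the function name `dataframe_with_createdataframe` are assumptions made for illustration.

```python
# Illustrative sample data: column names and rows are assumptions.
columns = ["language", "users_count"]
data = [("Java", 20000), ("Python", 100000), ("Scala", 3000)]


def dataframe_with_createdataframe():
    """Build a DataFrame with createDataFrame(); requires pyspark."""
    # Imported here so the definitions above load even without pyspark.
    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .master("local[*]")   # assumed: run Spark locally on all cores
        .appName("createdataframe_demo")
        .getOrCreate()
    )
    rdd = spark.sparkContext.parallelize(data)
    # createDataFrame() infers the column types and assigns default names
    # (_1, _2); toDF(*columns) then renames the columns.
    return spark.createDataFrame(rdd).toDF(*columns)
```

createDataFrame() also accepts the list directly together with the column names, as in `spark.createDataFrame(data, schema=columns)`, which skips the RDD step entirely.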
Kafka and PySpark:
We are going to use PySpark to produce a stream DataFrame to Kafka and then consume it, so we need both Kafka and PySpark. PySpark is already set up on our system, so now we set up Kafka. If you already have Kafka running you can skip this part; otherwise, set it up by following these steps:
Set up Kafka using Docker compose:
Docker Compose is used to run multiple containers as a single service, and it works in all environments. Docker Compose files are written in YAML.
Now create a Docker Compose file named docker-compose.yml for Kafka. Enter the following and save the file. It will run everything for you via Docker.
From the terminal, navigate to a directory containing the docker-compose.yml (which was created in the previous step) and run the below command to start all services:
Now run the command below to open a new Bash session inside the kafka container.
Create a Kafka topic named test_topic by running the command below.
Exit the container’s Bash session by running the exit command at the bash-5.1# prompt.
We have now set up Kafka and created a Kafka topic to which we can produce, and from which we can consume, DataFrame data.
Produce CSV data to a Kafka topic and consume it using PySpark:
Produce CSV data to a Kafka topic:
For this we need a CSV file. Download one or create your own.
Install the kafka-python package in the virtual environment. kafka-python is a Python client for the Apache Kafka distributed stream processing system. It offers Pythonic interfaces and is designed to function much like the official Java client.
In the code below we configure a Kafka producer and create a producer object from that configuration.
In the configuration we provide settings such as the bootstrap server and value_serializer. A serializer tells the producer how to turn the key and value objects you pass to it into bytes; value_serializer handles the values.
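A minimal sketch of such a configuration with kafka-python follows. The broker address localhost:9092 and the choice of JSON as the wire format are assumptions, and the names `serialize_value` and `make_producer` are illustrative.

```python
import json

BOOTSTRAP_SERVERS = "localhost:9092"  # assumed broker address


def serialize_value(value):
    """Turn a Python object into UTF-8 encoded JSON bytes."""
    return json.dumps(value).encode("utf-8")


def make_producer():
    """Create a KafkaProducer that JSON-encodes every value it sends."""
    # Imported here so the serializer can be used without kafka-python installed.
    from kafka import KafkaProducer

    return KafkaProducer(
        bootstrap_servers=BOOTSTRAP_SERVERS,
        value_serializer=serialize_value,
    )
```

With this in place, `serialize_value({"id": "1"})` yields the bytes `b'{"id": "1"}'`, which is what the producer puts on the topic for that value.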
Now read the data from the CSV file as dictionaries:
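The standard library’s csv.DictReader does this, mapping each row to a dictionary keyed by the header names. The sketch below is one way to wrap it; the function names and the sample header are illustrative.

```python
import csv


def rows_from_lines(lines):
    """Parse an iterable of CSV text lines into a list of dictionaries."""
    return [dict(row) for row in csv.DictReader(lines)]


def read_csv_rows(path):
    """Read the CSV file at `path` and return its rows as dictionaries."""
    with open(path, newline="", encoding="utf-8") as handle:
        return rows_from_lines(handle)


# Small in-memory example: the first line is the header.
sample = rows_from_lines(["id,name", "1,Ada", "2,Linus"])
print(sample)
```

Note that DictReader returns every value as a string, so numeric columns arrive as text unless you convert them.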
We now have a producer object and the data from the CSV file. To produce the data to Kafka, iterate over the CSV rows and send each one.
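The loop can be sketched as a function that accepts any producer-like object with send() and flush() methods, which is the interface kafka-python’s KafkaProducer offers. `RecordingProducer` is a hypothetical stand-in used only to dry-run the loop without a broker, and the sample rows are made up.

```python
def produce_rows(producer, topic, rows):
    """Send each row to `topic` and return how many rows were sent."""
    sent = 0
    for row in rows:
        producer.send(topic, value=row)
        sent += 1
    producer.flush()  # wait until buffered messages have been handed off
    return sent


class RecordingProducer:
    """Hypothetical stand-in that records messages instead of sending them."""

    def __init__(self):
        self.messages = []
        self.flushed = False

    def send(self, topic, value=None):
        self.messages.append((topic, value))

    def flush(self):
        self.flushed = True


# Dry run with made-up rows; swap in a real KafkaProducer to hit a broker.
dry_run = RecordingProducer()
count = produce_rows(dry_run, "test_topic", [{"id": "1"}, {"id": "2"}])
print(count, dry_run.messages)
```

Calling flush() once after the loop, rather than after every send, lets the producer batch messages.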
Now create a Python file named demo_kafkaproducer.py in the pysaprk_demo directory and copy the code below into it. It reads the data from the CSV file and produces it to the Kafka topic.
We have produced data to Kafka. Next, we consume the data stream from Kafka.
How to read data stream from Kafka topic?
To read the data stream from the Kafka topic, follow the steps below:
First, add the packages required by the PySpark shell, Spark streaming, and Spark SQL to the environment.
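One common way to do this is through the PYSPARK_SUBMIT_ARGS environment variable, set before any Spark session is created. The sketch below assumes the Structured Streaming Kafka connector (spark-sql-kafka-0-10) is the package needed; the Scala version 2.12 and the Spark version 3.3.0 are placeholders to be matched to your installation, and `kafka_submit_args` is an illustrative helper.

```python
import os


def kafka_submit_args(spark_version, scala_version="2.12"):
    """Build PYSPARK_SUBMIT_ARGS that pull in Spark's Kafka connector."""
    package = (
        f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"
    )
    return f"--packages {package} pyspark-shell"


# "3.3.0" is a placeholder: use the version reported by your PySpark install.
os.environ["PYSPARK_SUBMIT_ARGS"] = kafka_submit_args("3.3.0")
print(os.environ["PYSPARK_SUBMIT_ARGS"])
```

The connector version should match the installed Spark version; a mismatch is a frequent cause of class-loading errors when the stream starts.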
We have set up the environment.
Now we have to create a Spark session. SparkSession is the entry point to PySpark, and a session is created using SparkSession.builder. The block of code below creates the SparkSession and reads a stream DataFrame from Kafka.
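A minimal sketch of this step, assuming a broker at localhost:9092, the topic test_topic created earlier, and the Kafka connector package already available to Spark. The names `KAFKA_OPTIONS` and `read_kafka_stream` and the application name are illustrative.

```python
KAFKA_OPTIONS = {
    "kafka.bootstrap.servers": "localhost:9092",  # assumed broker address
    "subscribe": "test_topic",                    # topic to read from
    "startingOffsets": "earliest",                # start from the oldest message
}


def read_kafka_stream():
    """Create a SparkSession and a streaming DataFrame backed by Kafka."""
    # Imported here so the options above load even without pyspark.
    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .master("local[*]")   # assumed: run Spark locally on all cores
        .appName("kafka_stream_demo")
        .getOrCreate()
    )
    stream_df = spark.readStream.format("kafka").options(**KAFKA_OPTIONS).load()
    return spark, stream_df
```

The resulting DataFrame has Kafka’s fixed columns (key, value, topic, partition, offset, timestamp, and so on), with the message payload stored as bytes in value.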
We have now consumed the stream data from Kafka and created a DataFrame named stream_df.
The block of code below creates the schema (StructType).
What is a schema/StructType in Spark?
It defines the structure of the DataFrame. We can define it using StructType, which is a collection of StructFields that define the column name, DataType, column nullability, and metadata.
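The sketch below shows a StructType built from StructFields and one way to apply it to the Kafka payload with from_json. The column names are hypothetical, since they depend on your CSV, and the function names are illustrative. It also assumes the values were produced as JSON objects.

```python
# Hypothetical column names; replace them with the header of your CSV.
COLUMN_NAMES = ["id", "name", "city"]


def build_schema():
    """Return a StructType with one nullable string field per column."""
    # Imported here so the column list loads even without pyspark.
    from pyspark.sql.types import StringType, StructField, StructType

    # csv.DictReader yields every value as a string, so all columns are
    # declared as strings here.
    return StructType(
        [StructField(name, StringType(), True) for name in COLUMN_NAMES]
    )


def parse_values(stream_df, schema):
    """Decode the Kafka `value` bytes as JSON and expand it into columns."""
    from pyspark.sql.functions import col, from_json

    parsed = stream_df.select(
        from_json(col("value").cast("string"), schema).alias("data")
    )
    return parsed.select("data.*")
```

Each StructField here takes the column name, the data type, and the nullability flag; the optional fourth argument is the metadata dictionary.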
The code below writes the DataFrame stream to the console.
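Writing a streaming DataFrame to the console can be sketched as follows. The append output mode and the disabled truncation are choices made for this example, and `write_to_console` is an illustrative name.

```python
def write_to_console(stream_df):
    """Start a streaming query that prints each micro-batch to the console."""
    return (
        stream_df.writeStream
        .format("console")            # print rows to standard output
        .outputMode("append")         # emit only rows that are new in each batch
        .option("truncate", "false")  # show full column values
        .start()
    )
```

start() returns a StreamingQuery; calling `awaitTermination()` on it keeps the script running until the stream is stopped.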
Now create a Python file named demo_kafkaconsumer.py in the pysaprk_demo directory and copy the code below into it. It reads the stream DataFrame from the Kafka topic using PySpark and writes the DataFrame data to the console.
Spark is one of the popular tools for working with big data, and PySpark is its API for Python users. This article covered what Spark and PySpark offer, how to install PySpark on Linux, the basics of DataFrames, and how to create DataFrames manually with the toDF() and createDataFrame() functions in the PySpark shell. Because it resembles pandas and SQL in how it is used, PySpark is simple to learn and use. We also looked at setting up Kafka, producing data to Kafka, and using PySpark to read data streams from Kafka. I hope you can put this information to use in your work.