PySpark - Interface for Apache Spark in Python
Introduction
- PySpark is used for large-scale data processing and machine learning.
- Course taught by Krish Naik.
- Focus: Using Apache Spark with Python (PySpark).
Why Use Apache Spark?
- Needed when data sets grow beyond what a local system can handle.
- Example: processes data sets larger than a single machine's RAM by distributing the work across multiple machines.
- Benefits:
- Runs workloads up to 100x faster than Hadoop MapReduce.
- Easy to use from Java, Scala, Python, or R.
- Supports SQL, streaming, and complex analytics.
- Runs on multiple environments: Hadoop, Mesos, Kubernetes, standalone, or cloud platforms (AWS, Databricks, etc.).
Apache Spark Versions and Setups
- Currently using PySpark 3.1.1.
- APIs provided for Scala, Java, Python, and R.
Basic PySpark Operations
Installation
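- Install via pip (the standard route; the exact setup used in the course isn't recorded in these notes):
pip install pyspark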
SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('practice').getOrCreate()
- Handles cluster and session management
Data Pre-processing
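- The operations below assume a DataFrame df has already been loaded; a typical read (the file name and options are illustrative, not from the notes):
df = spark.read.csv('data.csv', header=True, inferSchema=True)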
- Print Schema:
df.printSchema()
- Show data:
df.show()
- Select columns:
df.select('column_name').show()
- Creating new columns:
df = df.withColumn('new_col', df['existing_col'] + 2)
- Dropping columns:
df = df.drop('col_name')
- Renaming columns:
df = df.withColumnRenamed('old_name', 'new_name')
Handling Null/Missing Values
- Dropping rows with null values:
df = df.na.drop()
- Fill missing values:
df = df.na.fill({'col_name': 'value'})
- Impute missing values:
from pyspark.ml.feature import Imputer
imputer = Imputer(inputCols=['col1', 'col2'], outputCols=['out1', 'out2']).setStrategy('mean')
df = imputer.fit(df).transform(df)
Filter Operations
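- The notes leave this section empty; a minimal sketch, assuming df has a salary column:
df.filter(df['salary'] <= 20000).show()  # single condition
df.filter((df['salary'] >= 15000) & (df['salary'] <= 20000)).show()  # combine conditions with &
df.filter(~(df['salary'] <= 20000)).show()  # negate a condition with ~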
GroupBy and Aggregate Functions
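- Also empty in the notes; a sketch of grouped aggregations, assuming name and salary columns:
df.groupBy('name').sum('salary').show()  # total salary per name
df.groupBy('name').avg('salary').show()  # average salary per name
df.agg({'salary': 'sum'}).show()  # aggregate over the whole DataFrame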
Machine Learning with Spark ML
Initial Setup
- Import ML libraries from pyspark.ml.
- Transform features using VectorAssembler, as sketched below.
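- A minimal VectorAssembler sketch (the input column names are assumptions, not from the notes):
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=['age', 'experience'], outputCol='features')
df = assembler.transform(df)  # adds a combined 'features' vector column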
Example: Linear Regression
- Create Spark Session.
- Read data and print schema.
- Handle categorical features using StringIndexer (see the sketch below).
- Group independent features with VectorAssembler.
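- A StringIndexer sketch (column names are illustrative):
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol='sex', outputCol='sex_indexed')
df = indexer.fit(df).transform(df)  # maps string categories to numeric indices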
- Train-test split: 75/25, using randomSplit as sketched below.
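- A split sketch (the DataFrame name df is an assumption; train_data and test_data match the model code below):
train_data, test_data = df.randomSplit([0.75, 0.25])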
- Train model and make predictions:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(labelCol='label', featuresCol='features')
lrModel = lr.fit(train_data)
predictions = lrModel.transform(test_data)
predictions.show()
- Evaluate model: compute R², mean squared error, etc. (see the sketch below).
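- An evaluation sketch with RegressionEvaluator, using the predictions DataFrame from the step above:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(labelCol='label', predictionCol='prediction', metricName='r2')
print(evaluator.evaluate(predictions))  # R^2
print(evaluator.setMetricName('mse').evaluate(predictions))  # mean squared error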
- Save and load models using .save() and .load(), as sketched below.
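- A save/load sketch (the path is illustrative):
from pyspark.ml.regression import LinearRegressionModel

lrModel.save('lr_model_path')
loaded_model = LinearRegressionModel.load('lr_model_path')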
Databricks
- Platform: Open and unified for data engineering, data science, and machine learning.
- Community vs. Paid Edition: the free Community Edition is sufficient for learning and includes a basic cluster.
- Cluster Setup: single node with about 15 GB of memory in the Community Edition.
- Uploading data from local files or cloud sources (e.g., AWS S3).
- Handling large datasets with clusters: Parallel processing in distributed environments.