PySpark - Apache Spark in Python

Jul 16, 2024

PySpark - Interface for Apache Spark in Python

Introduction

  • PySpark is used for large-scale data processing and machine learning.
  • Course taught by Krish Naik.
  • Focus: using Apache Spark with Python (PySpark).

Why Use Apache Spark?

  • Required for large data sets that local systems can't handle.
  • Example: handles data sets larger than a single machine's RAM by distributing the work across a cluster.
  • Benefits:
    • Runs workloads up to 100 times faster than Hadoop MapReduce.
    • Ease of use with Java, Scala, Python, or R.
    • Supports SQL, streaming, and complex analytics.
    • Runs in multiple environments such as Hadoop, Mesos, Kubernetes, standalone, or various cloud platforms (AWS, Databricks, etc.).

Apache Spark Versions and Setups

  • Currently using PySpark 3.1.1.
  • APIs provided for Scala, Java, and Python.

Basic PySpark Operations

Installation

  • Start by installing PySpark: pip install pyspark
  • Create a new environment to avoid conflicts.
  • Read CSV data into PySpark using:
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName('practice').getOrCreate()
    df = spark.read.option('header', 'true').option('inferSchema', 'true').csv('path/to/csv')
    

SparkSession

  • spark = SparkSession.builder.appName('practice').getOrCreate()
  • Entry point to Spark; handles cluster connection and session management.

Data Pre-processing

  • Print Schema: df.printSchema()
  • Show data: df.show()
  • Select columns: df.select('column_name').show()
  • Creating new columns:
    df = df.withColumn('new_col', df['existing_col'] + 2)
    
  • Dropping columns:
    df = df.drop('col_name')
    
  • Renaming columns:
    df = df.withColumnRenamed('old_name', 'new_name')
    
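  • A minimal sketch combining these operations (the DataFrame and column names here are hypothetical examples, not from the course):
    data = [('Alice', 30, 25000), ('Bob', 35, 32000)]
    df_demo = spark.createDataFrame(data, ['name', 'age', 'salary'])
    df_demo.printSchema()                                   # inspect inferred column types
    df_demo.select('name', 'salary').show()                 # pick specific columns
    df_demo = df_demo.withColumn('bonus', df_demo['salary'] * 0.1)   # derived column
    df_demo = df_demo.withColumnRenamed('age', 'years')     # rename a column
    df_demo = df_demo.drop('bonus')                         # drop a column
    df_demo.show()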

Handling Null/Missing Values

  • Dropping rows with null values: df.na.drop()
  • Fill missing values:
    df = df.na.fill({'col_name': 'value'})
    
  • Impute missing values:
    from pyspark.ml.feature import Imputer
    imputer = Imputer(inputCols=['col1', 'col2'], outputCols=['out1', 'out2']).setStrategy('mean')
    df = imputer.fit(df).transform(df)
    
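  • Common options of na.drop() and na.fill(), as a sketch (the column names are hypothetical examples):
    df.na.drop(how='any').show()              # drop rows containing any null
    df.na.drop(how='all').show()              # drop rows where every value is null
    df.na.drop(thresh=2).show()               # keep rows with at least 2 non-null values
    df.na.drop(subset=['salary']).show()      # only consider the 'salary' column
    df.na.fill({'salary': 0, 'name': 'unknown'}).show()   # per-column fill values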

Filter Operations

  • Retrieve records based on conditions using .filter:
    df.filter(df['salary'] > 20000).show()
    
  • Combine multiple conditions with & (AND) and | (OR), as shown in the sketch below.
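  • A sketch of combined conditions (parentheses are required around each condition; column names are hypothetical examples):
    df.filter((df['salary'] > 20000) & (df['salary'] < 40000)).show()   # AND
    df.filter((df['salary'] <= 20000) | (df['age'] < 25)).show()        # OR
    df.filter(~(df['salary'] > 20000)).show()                           # NOT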

GroupBy and Aggregate Functions

  • GroupBy then Aggregate:
    df.groupBy('col_name').sum('salary').show()
    df.groupBy('col_name').avg('salary').show()
    
  • Count occurrences:
    df.groupBy('col_name').count().show()
    
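  • Several aggregations in one pass with .agg(), as a sketch (the 'department' column is a hypothetical example):
    from pyspark.sql import functions as F
    df.groupBy('department').agg(
        F.sum('salary').alias('total_salary'),
        F.avg('salary').alias('avg_salary'),
        F.count('*').alias('employees')
    ).show()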

Machine Learning with Spark ML

Initial Setup

  • Import ML libraries from pyspark.ml.
  • Combine independent features into a single vector column using VectorAssembler (see the sketch below).
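  • A minimal VectorAssembler sketch (the input columns 'age' and 'experience' and the label 'salary' are hypothetical examples):
    from pyspark.ml.feature import VectorAssembler
    assembler = VectorAssembler(inputCols=['age', 'experience'], outputCol='features')
    output = assembler.transform(df)
    final_data = output.select('features', 'salary')   # feature vector + label for training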

Example: Linear Regression

  1. Create Spark Session.
  2. Read data and print schema.
  3. Handle categorical features using StringIndexer.
  4. Combine independent features into one vector with VectorAssembler.
  5. Train-test split: 75-25%.
  6. Train model and make predictions:
     from pyspark.ml.regression import LinearRegression
     lr = LinearRegression(labelCol='label', featuresCol='features')
     lrModel = lr.fit(train_data)
     lrModel.transform(test_data).show()
  7. Evaluate model: compute R^2, mean squared error, etc. (see the sketch after this list).
  8. Save and load models using .save() and .load().
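  • A minimal sketch of steps 7 and 8 (evaluation and persistence); the save path 'models/lr_model' is a hypothetical example:
    from pyspark.ml.evaluation import RegressionEvaluator
    from pyspark.ml.regression import LinearRegressionModel

    predictions = lrModel.transform(test_data)
    evaluator = RegressionEvaluator(labelCol='label', predictionCol='prediction', metricName='r2')
    print('R^2:', evaluator.evaluate(predictions))
    print('MSE:', evaluator.evaluate(predictions, {evaluator.metricName: 'mse'}))

    lrModel.save('models/lr_model')                       # persist the trained model
    loaded = LinearRegressionModel.load('models/lr_model')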

Databricks

  • Platform: an open, unified platform for data engineering, data science, and machine learning.
  • Community vs. paid edition: the free Community Edition is enough to try out clusters.
  • Cluster setup: a single-node cluster with about 15 GB of memory in the Community Edition.
  • Uploading data from local files or cloud sources (e.g., AWS S3).
  • Handling large datasets with clusters: parallel processing in distributed environments.