
Wednesday, 19 July 2023

How to modularize your PySpark code

Here is a code example to help you understand how to modularize your PySpark code.

Let's assume we have a PySpark project for text classification. We'll modularize the code into three modules: data_preprocessing.py, feature_engineering.py, and model_training.py.

In this example, each module (data_preprocessing.py, feature_engineering.py, and model_training.py) encapsulates one specific piece of functionality and groups its related functions together. The main script imports these modules and calls their functions to run the end-to-end job. This modular approach makes the code more organized, reusable, and easier to maintain.
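For reference, the project layout assumed here looks like this (the folder name and the main script's file name are illustrative; only the three module names come from the example itself):

text_classification/
    data_preprocessing.py
    feature_engineering.py
    model_training.py
    main.py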

data_preprocessing.py module:

from pyspark.sql.functions import col, lower, regexp_replace

def load_data(spark, input_path):
    # Load a CSV file from the input path and return it as a DataFrame
    data = spark.read.csv(input_path, header=True, inferSchema=True)
    return data

def clean_text(data, text_column):
    # Lower-case the text and strip everything except letters and whitespace
    cleaned_data = data.withColumn(
        "cleaned_text",
        lower(regexp_replace(col(text_column), "[^a-zA-Z\\s]", ""))
    )
    return cleaned_data
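Because clean_text takes an ordinary DataFrame and a column name, it can be exercised on its own. Here is a minimal sketch of such a check, assuming a local Spark installation and that data_preprocessing.py is on the Python path; the column name "text" and the sample row are made up for illustration:

from pyspark.sql import SparkSession
from data_preprocessing import clean_text

spark = SparkSession.builder.master("local[1]").getOrCreate()

# One made-up row with punctuation and digits that should be stripped out
df = spark.createDataFrame([("Hello, World! 123",)], ["text"])

# Prints a cleaned_text column containing only lower-case letters and spaces
clean_text(df, "text").select("cleaned_text").show(truncate=False)

spark.stop()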


feature_engineering.py module:

from pyspark.ml.feature import Tokenizer, CountVectorizer, StringIndexer
from pyspark.ml import Pipeline

def create_features(data, text_column):
    # Tokenize the text, then build term-count features with CountVectorizer;
    # CountVectorizer needs an array-of-strings column, so tokenization comes first
    tokenizer = Tokenizer(inputCol=text_column, outputCol="tokens")
    vectorizer = CountVectorizer(inputCol="tokens", outputCol="features")

    # Index the string label column (assumes the DataFrame has a "label" column)
    indexer = StringIndexer(inputCol="label", outputCol="label_index")

    pipeline = Pipeline(stages=[tokenizer, vectorizer, indexer])
    transformed_data = pipeline.fit(data).transform(data)
    return transformed_data


model_training.py module:

from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

def train_model(data):
    # Train a random forest classifier on the input data
    rf = RandomForestClassifier(labelCol="label_index", featuresCol="features")
    model = rf.fit(data)
    return model

def evaluate_model(model, data):
    # Evaluate the trained model on the input data and return its accuracy
    evaluator = MulticlassClassificationEvaluator(
        labelCol="label_index", predictionCol="prediction", metricName="accuracy"
    )
    accuracy = evaluator.evaluate(model.transform(data))
    return accuracy
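If the trained model needs to be reused by other jobs, it can be persisted with Spark ML's built-in writer and loaded back later. This is a sketch beyond the original example; the path "models/rf" is a placeholder:

from pyspark.ml.classification import RandomForestClassificationModel

# Save the fitted model, overwriting any previous copy at that path
model.write().overwrite().save("models/rf")

# Load it back in another job
loaded_model = RandomForestClassificationModel.load("models/rf")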


Main script:

from pyspark.sql import SparkSession

from data_preprocessing import load_data, clean_text
from feature_engineering import create_features
from model_training import train_model, evaluate_model

# Create SparkSession
spark = SparkSession.builder.getOrCreate()

# Load data
data = load_data(spark, "data.csv")

# Clean text
cleaned_data = clean_text(data, "text_column")

# Create features
transformed_data = create_features(cleaned_data, "cleaned_text")

# Train model
model = train_model(transformed_data)

# Evaluate model
accuracy = evaluate_model(model, transformed_data)
print("Accuracy:", accuracy)