Here is a code example to help you understand how to modularize your PySpark code.
Let's assume we have a PySpark project for text classification. We'll modularize the code into three modules: data_preprocessing.py, feature_engineering.py, and model_training.py.
In this example, each module (data_preprocessing.py, feature_engineering.py, and model_training.py) encapsulates a specific piece of functionality and groups its related functions together. The main script imports these modules and calls their functions to run the pipeline end to end. This modular approach makes the code more organized, reusable, and easier to maintain.
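A typical layout for such a project might look like this (the top-level directory name and main.py are illustrative; only the three module names come from the example itself):

text_classification/
    data_preprocessing.py
    feature_engineering.py
    model_training.py
    main.py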
data_preprocessing.py module:
from pyspark.sql.functions import col, lower, regexp_replace

def load_data(spark, input_path):
    # Load a CSV file from the input path and return it as a DataFrame
    data = spark.read.csv(input_path, header=True, inferSchema=True)
    return data

def clean_text(data, text_column):
    # Lowercase the text and strip out everything except letters and whitespace
    cleaned_data = data.withColumn(
        "cleaned_text",
        regexp_replace(lower(col(text_column)), "[^a-zA-Z\\s]", "")
    )
    return cleaned_data
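Because each module is self-contained, it can be exercised (or unit-tested) on its own. A minimal sketch, assuming a local SparkSession and a made-up two-row DataFrame:

from pyspark.sql import SparkSession
from data_preprocessing import clean_text

spark = SparkSession.builder.master("local[1]").appName("clean_text_check").getOrCreate()
sample = spark.createDataFrame([("Hello, World!",), ("PySpark 3.x rocks?!",)], ["text_column"])
# Expect cleaned_text values "hello world" and "pyspark x rocks"
clean_text(sample, "text_column").show(truncate=False)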
feature_engineering.py module:
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer, StringIndexer, Tokenizer

def create_features(data, text_column):
    # CountVectorizer expects an array of tokens rather than a raw string,
    # so tokenize the text first, then build term-count features and
    # index the string label column into a numeric label_index
    tokenizer = Tokenizer(inputCol=text_column, outputCol="tokens")
    vectorizer = CountVectorizer(inputCol="tokens", outputCol="features")
    indexer = StringIndexer(inputCol="label", outputCol="label_index")
    pipeline = Pipeline(stages=[tokenizer, vectorizer, indexer])
    transformed_data = pipeline.fit(data).transform(data)
    return transformed_data
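Note that create_features assumes the input DataFrame also has a string column named label. A minimal sketch with made-up data:

from pyspark.sql import SparkSession
from feature_engineering import create_features

spark = SparkSession.builder.master("local[1]").appName("features_check").getOrCreate()
sample = spark.createDataFrame(
    [("good movie great acting", "positive"), ("terrible plot boring", "negative")],
    ["cleaned_text", "label"],
)
create_features(sample, "cleaned_text").select("features", "label_index").show(truncate=False)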
model_training.py module:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

def train_model(data):
    # Train a random forest classifier on the input data
    rf = RandomForestClassifier(labelCol="label_index", featuresCol="features")
    model = rf.fit(data)
    return model

def evaluate_model(model, data):
    # Score the input data with the trained model and return its accuracy
    evaluator = MulticlassClassificationEvaluator(
        labelCol="label_index", predictionCol="prediction", metricName="accuracy"
    )
    accuracy = evaluator.evaluate(model.transform(data))
    return accuracy
Main script:
from pyspark.sql import SparkSession

from data_preprocessing import load_data, clean_text
from feature_engineering import create_features
from model_training import train_model, evaluate_model

# Create SparkSession
spark = SparkSession.builder.appName("TextClassification").getOrCreate()

# Load data
data = load_data(spark, "data.csv")

# Clean text
cleaned_data = clean_text(data, "text_column")

# Create features
transformed_data = create_features(cleaned_data, "cleaned_text")

# Hold out part of the data so the model is evaluated on rows it was not trained on
train_data, test_data = transformed_data.randomSplit([0.8, 0.2], seed=42)

# Train model
model = train_model(train_data)

# Evaluate model
accuracy = evaluate_model(model, test_data)
print("Accuracy:", accuracy)

spark.stop()