Tabela de Conteúdos
Tutorial de PySpark e MLlib
Nesse tutorial de Spark vamos utilizar PySpark e MLLib para uma atividade simples de processamento de Machine Learning.
Uma breve introdução ao Hadoop e Spark
Com o advento do Big Data, faz-se necessário inserir novas técnicas de processamento de dados. Estas técnicas incluem tanto armazenamento, quando velocidade de processamento e operações matemáticas.
Neste cenário, 2 ferramentas se destacam no mercado. São elas, o ecossistema Hadoop, e Spark.
Hadoop combina diversas ferramentas para armazenamento de dados e queries, como Hive, Pig, MapReduce, entre outras. Já o Spark surgiu como uma alternativa para o MapReduce do Hadoop, mas tem-se provado uma excelente ferramenta para realização de computação distribuída, ou seja, processamento paralelo entre diversos nós de um cluster de computadores.
Iniciando o Tutorial de PySpark
O objetivo deste artigo é a exploração da API do spark para Python, o Pyspark, e da biblioteca de Machine Learning, MLlib, para realizar a análise exploratória de um conjunto de dados e fazer uma Regressão Linear.
Vemos o PySpark como um excelente alternativa para programação em Spark, já que une uma linguagem que está se tornando muito popular : o Python e o Spark
Importe o FindSpark
Primeiro, deve-se importar o módúlo findspark de modo à utilizar o método .init, responsável por inicializar o spark.
# Import findspark
import findspark
# Initialize and provide path
findspark.init("/usr/local/spark/spark-2.2.0-bin-hadoop2.7/")
Importe SparkSession
Após o spark ter sido encontrado no sistema, é necessário criar a Sessão Spark, onde é possível configurar os nós do cluster, bem como a memória alocada para cada um deles.
# Import SparkSession
from pyspark.sql import SparkSession
# Build the SparkSession
spark = SparkSession.builder \
.master("local") \
.appName("Linear Regression Model") \
.config("spark.executor.memory", "1gb") \
.getOrCreate()
sc = spark.sparkContext
Iniciando o Desenvolvimento com Spark
Com a sessão spark criada, pode-se trabalhar no ambiente de desenvolvimento. A primeira etapa é importar o conjunto de dados, neste caso, o arquivo chama-se “Salary_Data.csv”, contendo dados de determinados funcionários, com os seus salários e anos de experiência em determinada função.
Note que os dados foram salvos em uma variável chamada rdd, que significa Resilient Distributed Dataset, a principal estrutura de dados do Spark. Essa estrutura permite trabalhar com computação distribuída, ou seja, os dados serão distribuídos entre os nós do cluster, e controlados pelo nó master. Desta forma pode-se processá-los em paralelo, aumentando a velocidade de processamento.
rdd = sc.textFile('<path-to-data>/Salary_Data.csv')
O método .take é indicado para visualização de uma parcela dos dados. Neste caso, o retorno da função trás 2 entradas do dataset. Note que a primeira entrada é ‘1.1,39343.00’ e a segunda entrada é ‘1.3,46205.00’. Os valores 1.1 e 1.3 representam os anos de experiência de um funcionário, já as entradas 39343.00 e 46205.00 representam os seus respectivos salários.
rdd.take(2)
Como as entradas vieram juntas na mesma string, é necessário separar os valores pela vírgula. Deste modo, usa-se o método split(“,”), responsável por isso. Também usa-se a função map, que mapeia a operação entre parênteses para todas as linhas do rdd.
Note que foi usado paradigma funcional de programação com a função lambda line: line.split(“,”). O Spark se beneficia deste paradigma, portanto é necessário utilizá-lo.
# Split lines on commas
rdd = rdd.map(lambda line: line.split(","))
# Inspect the first line
rdd.take(1)
Observe que os valores foram separados pela vírgula, como esperado.
Existem os métodos .first e .top, que mostram a primeira linha, e a linha do topo, respectivamente. São métodos semelhantes.
# Inspect the first line
rdd.first()
# Take top elements
rdd.top(1)
Nesta etapa, importa-se o módulo Row, onde o rdd faz a transformação para linhas do tipo Row. Essa transformação é necessária pois serão tratados os nomes das colunas, como YearsExperience e Salary, representados pela linha 0 e linha 1 respectivamente.
Mapeou-se todas as linhas para a formatação de colunas especificadas, e chamou-se o método .toDF(), onde é feita a transformação do rdd para DataFrame (semelhante ao DataFrame da biblioteca pandas).
# Import the necessary modules
from pyspark.sql import Row
# Map the RDD to a DF
df = rdd.map(lambda line: Row(YearsExperience=line[0], Salary=line[1])).toDF()
Usando o método .show, é possível inspecionar como o DataFrame está.
# Show the top 20 rows
df.show()
O método .printSchema mostra algumas informações sobre os tipos de dados presentes nas colunas, conforme linha abaixo.
df.printSchema()
Criou-se uma função chamada convertColumn, que recebe como argumento o dataframe df, os nomes das colunas, e o novo tipo para as quais serão feitos os casts das colunas.
Logo após a criação da função, define-se a variável columns, como uma lista contendo os nomes das colunas do df, e aplica-se a função para o dataframe em si, convertendo os valores para FloatType.
# Import all from `sql.types`
from pyspark.sql.types import *
# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names, newType):
for name in names:
df = df.withColumn(name, df[name].cast(newType))
return df
# Assign all column names to `columns`
columns = ['YearsExperience', 'Salary']
# Conver the `df` columns to `FloatType()`
df = convertColumn(df, columns, FloatType())
df.show()
Também é possível mostrar apenas uma coluna com o método .select.
df.select('Salary').show(10)
Outra operação bastante conhecida é o groupby, onde pode-se agrupar os dados por um determinado pivô. Neste caso, usa-se a coluna Salary como pivô, efetuando a contagem dos valores e ordenando-os em ordem decrescente.
df.groupBy("Salary").count().sort("Salary",ascending=False).show()
Por último, temos o método .describe, que faz a descrição do df baseado nas colunas, retornando uma contagem dos elementos, a média, desvio padrão, valores mínimo e máximo.
df.describe().show()
Assim, mapeia-se as linhas do df transformando-as em DenseVector, e cria-se um novo dataframe, chamado df, com as colunas ‘label’ e ‘features’.
Recordando que uma Regressão Linear é um problema de Aprendizado Supervisionado, ou seja, o algoritmo necessita do ‘ground truth’, os rótulos das entradas, de modo que ele possa comparar com sua saída e calcular alguma métrica de erro, como Erro Quadrático Médio (do inglês, Mean Squared Error, MSE), bastante empregado em problemas de Regressão.
# Import `DenseVector`
from pyspark.ml.linalg import DenseVector
# Define the `input_data`
input_data = df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))
# Replace `df` with the new DataFrame
df = spark.createDataFrame(input_data, ["label", "features"])
Outra etapa importante é deixar os dados na mesma escala. Isso se faz necessário pelo fato de que o algoritmo de Regressão Linear trabalha com distâncias euclidianas, ou seja, ele realiza operações de distância entre pontos no plano cartesiano, e isso exige que os dados estejam na mesma escala.
Para isso, usou-se o método de normalização conhecido como StantardScaler, onde subtrai-se a média do valor x definido, e divide-se pela diverença (xmax – xmin). Desta forma, os dados estarão distribuidos ao longo de 0 na mesma escala.
Realiza-se o fit e o transform em cima do df, desta forma a variável scaled_df contém nosso label, nossas features, e nossas features já escaladas, conforme output desta célula.
# Import `StandardScaler`
from pyspark.ml.feature import StandardScaler
# Initialize the `standardScaler`
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")
# Fit the DataFrame to the scaler
scaler = standardScaler.fit(df)
# Transform the data in `df` with the scaler
scaled_df = scaler.transform(df)
# Inspect the result
scaled_df.take(2)
Aqui é o ponto onde o Machine Learning começa. Como primeira etapa, é necessário dividir nosso conjunto de dados em treino e teste. Para isso, divide-se de forma aleatória com tamanhos 75% para treino e 25% para teste, com seed 1234.
Essa etapa é necessária, porque o principal objetivo para o algoritmo de Machine Learning é que ele tenha capacidade de generalização, ou seja, consiga generalizar bem, obtendo boas métricas para dados não presentes na etapa de treinamento. Desta forma, o workflow desejado seria que o algoritmo fosse treinado em dados conhecidos (conjunto de treino), e atingisse boas métricas para o conjunto de testes (dados nunca antes vistos pelo algoritmo).
O elemento seed 1234 insere um elemento de randomização padrão, ou seja, a divisão de treino e testes será feita de maneira aleatória, porém, sempre terá o mesmo resultado para o mesmo seed. Isso é importante para reprodução de resultados.
# Machine Learning Begins
# Split the data into train and test sets
train_data, test_data = scaled_df.randomSplit([.75,.25],seed=1234)
Importa-se o módulo LinearRegression, e instancia-se a classe com o objeto lr. Deve-se passar os parâmetros labelCol=’label’ sendo estes os labels, ou rótulos do nosso problema de aprendizado supervisionado. O parâmetro maxIter é o número máximo de iterações que o algoritmo deve considerar para convergir.
Criado o objeto da classe LinearRegression, pode-se aplicar o método fit, que é responsável pelo treinamento do algoritmo no train_data, conjunto de treino.
# Import `LinearRegression`
from pyspark.ml.regression import LinearRegression
# Initialize `lr`
lr = LinearRegression(labelCol="label", maxIter=10)
# Fit the data to the model
linearModel = lr.fit(train_data)
Cria-se a variável predicted, que é a predição do algoritmo para o conjunto de testes (test_data).
É realizada a extração de predictions da variável predicted, e também a extração dos labels, de modo que se possa compará-los lado a lado na variável predictionAndLabel.
Essa variável contém ambas as predições e os rótulos verdadeiros.
# Generate predictions
predicted = linearModel.transform(test_data)
# Extract the predictions and the "known" correct labels
predictions = predicted.select("prediction").rdd.map(lambda x: x[0])
labels = predicted.select("label").rdd.map(lambda x: x[0])
# Zip `predictions` and `labels` into a list
predictionAndLabel = predictions.zip(labels).collect()
# Print out first 5 instances of `predictionAndLabel`
predictionAndLabel[:5]
Pode-se extrair os coeficientes da equação da reta:
y = ao.x + a1
Onde coefficients é o valor de ao, e intercept é o valor de a1.
# Coefficients for the model
linearModel.coefficients
# Intercept for the model
linearModel.intercept
Também é possível extrair o Erro Quadrático Médio, em inglês MSE, neste caso sendo representado pela raiz quadrada deste valor(RMSE).
# Get the RMSE
linearModel.summary.rootMeanSquaredError
E como métrica de avaliação do modelo, extrai-se o R2, ou Coeficiente de Determinação, uma métrica estatística de proximidade de pontos e reta sobreposta.
# Get the R2
linearModel.summary.r2
spark.stop()
Encerrando Tutorial
Esperamos ter ajudado com uma introdução simples sobre PySpark, MLLib e talvez até mesmo o primeiro contato com Spark.