#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from shutil import rmtree
import tempfile
import unittest
import numpy as np
from pyspark.ml.classification import FMClassifier, LogisticRegression, \
MultilayerPerceptronClassifier, OneVsRest
from pyspark.ml.clustering import DistributedLDAModel, KMeans, LocalLDAModel, LDA, LDAModel
from pyspark.ml.fpm import FPGrowth
from pyspark.ml.linalg import Matrices, Vectors, DenseVector
from pyspark.ml.recommendation import ALS
from pyspark.ml.regression import GeneralizedLinearRegression, LinearRegression
from pyspark.sql import Row
from pyspark.testing.mlutils import SparkSessionTestCase
class LogisticRegressionTest(SparkSessionTestCase):
def test_binomial_logistic_regression_with_bound(self):
df = self.spark.createDataFrame(
[(1.0, 1.0, Vectors.dense(0.0, 5.0)),
(0.0, 2.0, Vectors.dense(1.0, 2.0)),
(1.0, 3.0, Vectors.dense(2.0, 1.0)),
(0.0, 4.0, Vectors.dense(3.0, 3.0)), ], ["label", "weight", "features"])
lor = LogisticRegression(regParam=0.01, weightCol="weight",
lowerBoundsOnCoefficients=Matrices.dense(1, 2, [-1.0, -1.0]),
upperBoundsOnIntercepts=Vectors.dense(0.0))
model = lor.fit(df)
self.assertTrue(
np.allclose(model.coefficients.toArray(), [-0.2944, -0.0484], atol=1E-4))
self.assertTrue(np.isclose(model.intercept, 0.0, atol=1E-4))
def test_multinomial_logistic_regression_with_bound(self):
data_path = "data/mllib/sample_multiclass_classification_data.txt"
df = self.spark.read.format("libsvm").load(data_path)
lor = LogisticRegression(regParam=0.01,
lowerBoundsOnCoefficients=Matrices.dense(3, 4, range(12)),
upperBoundsOnIntercepts=Vectors.dense(0.0, 0.0, 0.0))
model = lor.fit(df)
expected = [[4.593, 4.5516, 9.0099, 12.2904],
[1.0, 8.1093, 7.0, 10.0],
[3.041, 5.0, 8.0, 11.0]]
for i in range(0, len(expected)):
self.assertTrue(
np.allclose(model.coefficientMatrix.toArray()[i], expected[i], atol=1E-4))
self.assertTrue(
np.allclose(model.interceptVector.toArray(), [-0.9057, -1.1392, -0.0033], atol=1E-4))
class MultilayerPerceptronClassifierTest(SparkSessionTestCase):
def test_raw_and_probability_prediction(self):
data_path = "data/mllib/sample_multiclass_classification_data.txt"
df = self.spark.read.format("libsvm").load(data_path)
mlp = MultilayerPerceptronClassifier(maxIter=100, layers=[4, 5, 4, 3],
blockSize=128, seed=123)
model = mlp.fit(df)
test = self.sc.parallelize([Row(features=Vectors.dense(0.1, 0.1, 0.25, 0.25))]).toDF()
result = model.transform(test).head()
expected_prediction = 2.0
expected_probability = [0.0, 0.0, 1.0]
expected_rawPrediction = [-11.6081922998, -8.15827998691, 22.17757045]
self.assertTrue(result.prediction, expected_prediction)
self.assertTrue(np.allclose(result.probability, expected_probability, atol=1E-4))
self.assertTrue(np.allclose(result.rawPrediction, expected_rawPrediction, rtol=0.1))
class OneVsRestTests(SparkSessionTestCase):
def test_copy(self):
df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)),
(1.0, Vectors.sparse(2, [], [])),
(2.0, Vectors.dense(0.5, 0.5))],
["label", "features"])
lr = LogisticRegression(maxIter=5, regParam=0.01)
ovr = OneVsRest(classifier=lr)
ovr1 = ovr.copy({lr.maxIter: 10})
self.assertEqual(ovr.getClassifier().getMaxIter(), 5)
self.assertEqual(ovr1.getClassifier().getMaxIter(), 10)
model = ovr.fit(df)
model1 = model.copy({model.predictionCol: "indexed"})
self.assertEqual(model1.getPredictionCol(), "indexed")
def test_output_columns(self):
df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)),
(1.0, Vectors.sparse(2, [], [])),
(2.0, Vectors.dense(0.5, 0.5))],
["label", "features"])
lr = LogisticRegression(maxIter=5, regParam=0.01)
ovr = OneVsRest(classifier=lr, parallelism=1)
model = ovr.fit(df)
output = model.transform(df)
self.assertEqual(output.columns, ["label", "features", "rawPrediction", "prediction"])
def test_raw_prediction_column_is_of_vector_type(self):
# SPARK-35142: `OneVsRestModel` outputs raw prediction as a string column
df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)),
(1.0, Vectors.sparse(2, [], [])),
(2.0, Vectors.dense(0.5, 0.5))],
["label", "features"])
lr = LogisticRegression(maxIter=5, regParam=0.01)
ovr = OneVsRest(classifier=lr, parallelism=1)
model = ovr.fit(df)
row = model.transform(df).head()
self.assertIsInstance(row["rawPrediction"], DenseVector)
def test_parallelism_does_not_change_output(self):
df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)),
(1.0, Vectors.sparse(2, [], [])),
(2.0, Vectors.dense(0.5, 0.5))],
["label", "features"])
ovrPar1 = OneVsRest(classifier=LogisticRegression(maxIter=5, regParam=.01), parallelism=1)
modelPar1 = ovrPar1.fit(df)
ovrPar2 = OneVsRest(classifier=LogisticRegression(maxIter=5, regParam=.01), parallelism=2)
modelPar2 = ovrPar2.fit(df)
for i, model in enumerate(modelPar1.models):
self.assertTrue(np.allclose(model.coefficients.toArray(),
modelPar2.models[i].coefficients.toArray(), atol=1E-4))
self.assertTrue(np.allclose(model.intercept, modelPar2.models[i].intercept, atol=1E-4))
def test_support_for_weightCol(self):
df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8), 1.0),
(1.0, Vectors.sparse(2, [], []), 1.0),
(2.0, Vectors.dense(0.5, 0.5), 1.0)],
["label", "features", "weight"])
# classifier inherits hasWeightCol
lr = LogisticRegression(maxIter=5, regParam=0.01)
ovr = OneVsRest(classifier=lr, weightCol="weight")
self.assertIsNotNone(ovr.fit(df))
# classifier doesn't inherit hasWeightCol
dt = FMClassifier()
ovr2 = OneVsRest(classifier=dt, weightCol="weight")
self.assertIsNotNone(ovr2.fit(df))
class KMeansTests(SparkSessionTestCase):
def test_kmeans_cosine_distance(self):
data = [(Vectors.dense([1.0, 1.0]),), (Vectors.dense([10.0, 10.0]),),
(Vectors.dense([1.0, 0.5]),), (Vectors.dense([10.0, 4.4]),),
(Vectors.dense([-1.0, 1.0]),), (Vectors.dense([-100.0, 90.0]),)]
df = self.spark.createDataFrame(data, ["features"])
kmeans = KMeans(k=3, seed=1, distanceMeasure="cosine")
model = kmeans.fit(df)
result = model.transform(df).collect()
self.assertTrue(result[0].prediction == result[1].prediction)
self.assertTrue(result[2].prediction == result[3].prediction)
self.assertTrue(result[4].prediction == result[5].prediction)
class LDATest(SparkSessionTestCase):
def _compare(self, m1, m2):
"""
Temp method for comparing instances.
TODO: Replace with generic implementation once SPARK-14706 is merged.
"""
self.assertEqual(m1.uid, m2.uid)
self.assertEqual(type(m1), type(m2))
self.assertEqual(len(m1.params), len(m2.params))
for p in m1.params:
if m1.isDefined(p):
self.assertEqual(m1.getOrDefault(p), m2.getOrDefault(p))
self.assertEqual(p.parent, m2.getParam(p.name).parent)
if isinstance(m1, LDAModel):
self.assertEqual(m1.vocabSize(), m2.vocabSize())
self.assertEqual(m1.topicsMatrix(), m2.topicsMatrix())
def test_persistence(self):
# Test save/load for LDA, LocalLDAModel, DistributedLDAModel.
df = self.spark.createDataFrame([
[1, Vectors.dense([0.0, 1.0])],
[2, Vectors.sparse(2, {0: 1.0})],
], ["id", "features"])
# Fit model
lda = LDA(k=2, seed=1, optimizer="em")
distributedModel = lda.fit(df)
self.assertTrue(distributedModel.isDistributed())
localModel = distributedModel.toLocal()
self.assertFalse(localModel.isDistributed())
# Define paths
path = tempfile.mkdtemp()
lda_path = path + "/lda"
dist_model_path = path + "/distLDAModel"
local_model_path = path + "/localLDAModel"
# Test LDA
lda.save(lda_path)
lda2 = LDA.load(lda_path)
self._compare(lda, lda2)
# Test DistributedLDAModel
distributedModel.save(dist_model_path)
distributedModel2 = DistributedLDAModel.load(dist_model_path)
self._compare(distributedModel, distributedModel2)
# Test LocalLDAModel
localModel.save(local_model_path)
localModel2 = LocalLDAModel.load(local_model_path)
self._compare(localModel, localModel2)
# Clean up
try:
rmtree(path)
except OSError:
pass
class FPGrowthTests(SparkSessionTestCase):
def setUp(self):
super(FPGrowthTests, self).setUp()
self.data = self.spark.createDataFrame(
[([1, 2], ), ([1, 2], ), ([1, 2, 3], ), ([1, 3], )],
["items"])
def test_association_rules(self):
fp = FPGrowth()
fpm = fp.fit(self.data)
expected_association_rules = self.spark.createDataFrame(
[([3], [1], 1.0, 1.0, 0.5), ([2], [1], 1.0, 1.0, 0.75)],
["antecedent", "consequent", "confidence", "lift", "support"]
)
actual_association_rules = fpm.associationRules
self.assertEqual(actual_association_rules.subtract(expected_association_rules).count(), 0)
self.assertEqual(expected_association_rules.subtract(actual_association_rules).count(), 0)
def test_freq_itemsets(self):
fp = FPGrowth()
fpm = fp.fit(self.data)
expected_freq_itemsets = self.spark.createDataFrame(
[([1], 4), ([2], 3), ([2, 1], 3), ([3], 2), ([3, 1], 2)],
["items", "freq"]
)
actual_freq_itemsets = fpm.freqItemsets
self.assertEqual(actual_freq_itemsets.subtract(expected_freq_itemsets).count(), 0)
self.assertEqual(expected_freq_itemsets.subtract(actual_freq_itemsets).count(), 0)
def tearDown(self):
del self.data
class ALSTest(SparkSessionTestCase):
def test_storage_levels(self):
df = self.spark.createDataFrame(
[(0, 0, 4.0), (0, 1, 2.0), (1, 1, 3.0), (1, 2, 4.0), (2, 1, 1.0), (2, 2, 5.0)],
["user", "item", "rating"])
als = ALS().setMaxIter(1).setRank(1)
# test default params
als.fit(df)
self.assertEqual(als.getIntermediateStorageLevel(), "MEMORY_AND_DISK")
self.assertEqual(als._java_obj.getIntermediateStorageLevel(), "MEMORY_AND_DISK")
self.assertEqual(als.getFinalStorageLevel(), "MEMORY_AND_DISK")
self.assertEqual(als._java_obj.getFinalStorageLevel(), "MEMORY_AND_DISK")
# test non-default params
als.setIntermediateStorageLevel("MEMORY_ONLY_2")
als.setFinalStorageLevel("DISK_ONLY")
als.fit(df)
self.assertEqual(als.getIntermediateStorageLevel(), "MEMORY_ONLY_2")
self.assertEqual(als._java_obj.getIntermediateStorageLevel(), "MEMORY_ONLY_2")
self.assertEqual(als.getFinalStorageLevel(), "DISK_ONLY")
self.assertEqual(als._java_obj.getFinalStorageLevel(), "DISK_ONLY")
class GeneralizedLinearRegressionTest(SparkSessionTestCase):
def test_tweedie_distribution(self):
df = self.spark.createDataFrame(
[(1.0, Vectors.dense(0.0, 0.0)),
(1.0, Vectors.dense(1.0, 2.0)),
(2.0, Vectors.dense(0.0, 0.0)),
(2.0, Vectors.dense(1.0, 1.0)), ], ["label", "features"])
glr = GeneralizedLinearRegression(family="tweedie", variancePower=1.6)
model = glr.fit(df)
self.assertTrue(np.allclose(model.coefficients.toArray(), [-0.4645, 0.3402], atol=1E-4))
self.assertTrue(np.isclose(model.intercept, 0.7841, atol=1E-4))
model2 = glr.setLinkPower(-1.0).fit(df)
self.assertTrue(np.allclose(model2.coefficients.toArray(), [-0.6667, 0.5], atol=1E-4))
self.assertTrue(np.isclose(model2.intercept, 0.6667, atol=1E-4))
def test_offset(self):
df = self.spark.createDataFrame(
[(0.2, 1.0, 2.0, Vectors.dense(0.0, 5.0)),
(0.5, 2.1, 0.5, Vectors.dense(1.0, 2.0)),
(0.9, 0.4, 1.0, Vectors.dense(2.0, 1.0)),
(0.7, 0.7, 0.0, Vectors.dense(3.0, 3.0))], ["label", "weight", "offset", "features"])
glr = GeneralizedLinearRegression(family="poisson", weightCol="weight", offsetCol="offset")
model = glr.fit(df)
self.assertTrue(np.allclose(model.coefficients.toArray(), [0.664647, -0.3192581],
atol=1E-4))
self.assertTrue(np.isclose(model.intercept, -1.561613, atol=1E-4))
class LinearRegressionTest(SparkSessionTestCase):
def test_linear_regression_with_huber_loss(self):
data_path = "data/mllib/sample_linear_regression_data.txt"
df = self.spark.read.format("libsvm").load(data_path)
lir = LinearRegression(loss="huber", epsilon=2.0)
model = lir.fit(df)
expectedCoefficients = [0.136, 0.7648, -0.7761, 2.4236, 0.537,
1.2612, -0.333, -0.5694, -0.6311, 0.6053]
expectedIntercept = 0.1607
expectedScale = 9.758
self.assertTrue(
np.allclose(model.coefficients.toArray(), expectedCoefficients, atol=1E-3))
self.assertTrue(np.isclose(model.intercept, expectedIntercept, atol=1E-3))
self.assertTrue(np.isclose(model.scale, expectedScale, atol=1E-3))
if __name__ == "__main__":
from pyspark.ml.tests.test_algorithms import * # noqa: F401
try:
import xmlrunner # type: ignore[import]
testRunner = xmlrunner.XMLTestRunner(output='target/test-reports', verbosity=2)
except ImportError:
testRunner = None
unittest.main(testRunner=testRunner, verbosity=2)
|