#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
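"""DataFrame column helpers for pyspark.ml: conversions between MLlib vector
columns and plain array columns, backed by the Scala implementations in
``org.apache.spark.ml.functions``."""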

from pyspark import SparkContext
from pyspark.sql.column import Column, _to_java_column
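
# Both helpers below follow the same pattern: resolve the active SparkContext,
# call the matching Scala function through the Py4J gateway (``sc._jvm``), and
# wrap the returned Java column in a Python ``Column``. They therefore require
# an active SparkContext at call time.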

def vector_to_array(col, dtype="float64"):
    """
    Converts a column of MLlib sparse/dense vectors into a column of dense arrays.

    .. versionadded:: 3.0.0

    Parameters
    ----------
    col : :py:class:`pyspark.sql.Column` or str
        Input column
    dtype : str, optional
        The data type of the output array. Valid values: "float64" or "float32".

    Returns
    -------
    :py:class:`pyspark.sql.Column`
        The converted column of dense arrays.

    Examples
    --------
    >>> from pyspark.ml.linalg import Vectors
    >>> from pyspark.ml.functions import vector_to_array
    >>> from pyspark.mllib.linalg import Vectors as OldVectors
    >>> df = spark.createDataFrame([
    ...     (Vectors.dense(1.0, 2.0, 3.0), OldVectors.dense(10.0, 20.0, 30.0)),
    ...     (Vectors.sparse(3, [(0, 2.0), (2, 3.0)]),
    ...      OldVectors.sparse(3, [(0, 20.0), (2, 30.0)]))],
    ...     ["vec", "oldVec"])
    >>> df1 = df.select(vector_to_array("vec").alias("vec"),
    ...                 vector_to_array("oldVec").alias("oldVec"))
    >>> df1.collect()
    [Row(vec=[1.0, 2.0, 3.0], oldVec=[10.0, 20.0, 30.0]),
     Row(vec=[2.0, 0.0, 3.0], oldVec=[20.0, 0.0, 30.0])]
    >>> df2 = df.select(vector_to_array("vec", "float32").alias("vec"),
    ...                 vector_to_array("oldVec", "float32").alias("oldVec"))
    >>> df2.collect()
    [Row(vec=[1.0, 2.0, 3.0], oldVec=[10.0, 20.0, 30.0]),
     Row(vec=[2.0, 0.0, 3.0], oldVec=[20.0, 0.0, 30.0])]
    >>> df1.schema.fields
    [StructField(vec,ArrayType(DoubleType,false),false),
     StructField(oldVec,ArrayType(DoubleType,false),false)]
    >>> df2.schema.fields
    [StructField(vec,ArrayType(FloatType,false),false),
     StructField(oldVec,ArrayType(FloatType,false),false)]
    """
    sc = SparkContext._active_spark_context
    return Column(
        sc._jvm.org.apache.spark.ml.functions.vector_to_array(_to_java_column(col), dtype))
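
# Note that sparse inputs come back as *dense* arrays (in the doctest above,
# Vectors.sparse(3, [(0, 2.0), (2, 3.0)]) becomes [2.0, 0.0, 3.0]), so
# converting very high-dimensional sparse vectors can be memory-intensive.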

 

 

def array_to_vector(col):
    """
    Converts a column of arrays of numeric type into a column of
    :py:class:`pyspark.ml.linalg.DenseVector` instances.

    .. versionadded:: 3.1.0

    Parameters
    ----------
    col : :py:class:`pyspark.sql.Column` or str
        Input column

    Returns
    -------
    :py:class:`pyspark.sql.Column`
        The converted column of dense vectors.

    Examples
    --------
    >>> from pyspark.ml.functions import array_to_vector
    >>> df1 = spark.createDataFrame([([1.5, 2.5],),], schema='v1 array<double>')
    >>> df1.select(array_to_vector('v1').alias('vec1')).collect()
    [Row(vec1=DenseVector([1.5, 2.5]))]
    >>> df2 = spark.createDataFrame([([1.5, 3.5],),], schema='v1 array<float>')
    >>> df2.select(array_to_vector('v1').alias('vec1')).collect()
    [Row(vec1=DenseVector([1.5, 3.5]))]
    >>> df3 = spark.createDataFrame([([1, 3],),], schema='v1 array<int>')
    >>> df3.select(array_to_vector('v1').alias('vec1')).collect()
    [Row(vec1=DenseVector([1.0, 3.0]))]
    """
    sc = SparkContext._active_spark_context
    return Column(
        sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
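
# A minimal round-trip sketch (assumes an active SparkSession bound to
# ``spark``, as in the doctests above):
#
#   from pyspark.ml.functions import array_to_vector, vector_to_array
#   df = spark.createDataFrame([([1.0, 2.0],)], schema='arr array<double>')
#   df.select(vector_to_array(array_to_vector('arr')).alias('rt')).collect()
#   # [Row(rt=[1.0, 2.0])]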

 

 

def _test():
    import doctest
    from pyspark.sql import SparkSession
    import pyspark.ml.functions
    import sys
    globs = pyspark.ml.functions.__dict__.copy()
    spark = SparkSession.builder \
        .master("local[2]") \
        .appName("ml.functions tests") \
        .getOrCreate()
    sc = spark.sparkContext
    globs['sc'] = sc
    globs['spark'] = spark

    # Run the doctests embedded in this module against the local session,
    # tolerating ellipses and whitespace differences in expected output.
    (failure_count, test_count) = doctest.testmod(
        pyspark.ml.functions, globs=globs,
        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE)
    spark.stop()
    if failure_count:
        sys.exit(-1)

 

 

if __name__ == "__main__":
    _test()