Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

# 

# Licensed to the Apache Software Foundation (ASF) under one or more 

# contributor license agreements. See the NOTICE file distributed with 

# this work for additional information regarding copyright ownership. 

# The ASF licenses this file to You under the Apache License, Version 2.0 

# (the "License"); you may not use this file except in compliance with 

# the License. You may obtain a copy of the License at 

# 

# http://www.apache.org/licenses/LICENSE-2.0 

# 

# Unless required by applicable law or agreed to in writing, software 

# distributed under the License is distributed on an "AS IS" BASIS, 

# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

# See the License for the specific language governing permissions and 

# limitations under the License. 

# 

 

import sys 

 

from collections import namedtuple 

 

from pyspark import since 

from pyspark.mllib.common import JavaModelWrapper, callMLlibFunc 

from pyspark.mllib.util import JavaSaveable, JavaLoader, inherit_doc 

 

__all__ = ['FPGrowth', 'FPGrowthModel', 'PrefixSpan', 'PrefixSpanModel'] 

 

 

@inherit_doc
class FPGrowthModel(JavaModelWrapper, JavaSaveable, JavaLoader):
    """
    Frequent-itemset model produced by the Parallel FP-Growth algorithm.

    .. versionadded:: 1.4.0

    Examples
    --------
    >>> data = [["a", "b", "c"], ["a", "b", "d", "e"], ["a", "c", "e"], ["a", "c", "f"]]
    >>> rdd = sc.parallelize(data, 2)
    >>> model = FPGrowth.train(rdd, 0.6, 2)
    >>> sorted(model.freqItemsets().collect())
    [FreqItemset(items=['a'], freq=4), FreqItemset(items=['c'], freq=3), ...
    >>> model_path = temp_path + "/fpm"
    >>> model.save(sc, model_path)
    >>> sameModel = FPGrowthModel.load(sc, model_path)
    >>> sorted(model.freqItemsets().collect()) == sorted(sameModel.freqItemsets().collect())
    True
    """

    @since("1.4.0")
    def freqItemsets(self):
        """
        Returns the frequent itemsets of this model.

        Every element obtained from the wrapped model's ``getFreqItemsets``
        call is converted into a :py:class:`FPGrowth.FreqItemset` built
        from its first two fields.
        """
        def to_itemset(pair):
            # pair[0] holds the items, pair[1] their frequency.
            return FPGrowth.FreqItemset(pair[0], pair[1])

        raw_itemsets = self.call("getFreqItemsets")
        return raw_itemsets.map(to_itemset)

    @classmethod
    @since("2.0.0")
    def load(cls, sc, path):
        """
        Load a model from the given path.

        The loaded Java model is wrapped in the JVM-side
        ``FPGrowthModelWrapper`` before being handed to
        :py:class:`FPGrowthModel`.
        """
        java_model = cls._load_java(sc, path)
        python_api = sc._jvm.org.apache.spark.mllib.api.python
        return FPGrowthModel(python_api.FPGrowthModelWrapper(java_model))

 

 

class FPGrowth(object):
    """
    Parallel FP-growth implementation for mining frequent itemsets.

    .. versionadded:: 1.4.0
    """

    @classmethod
    def train(cls, data, minSupport=0.3, numPartitions=-1):
        """
        Builds an FP-Growth model holding the frequent itemsets of `data`.

        .. versionadded:: 1.4.0

        Parameters
        ----------
        data : :py:class:`pyspark.RDD`
            The input data set, each element contains a transaction.
        minSupport : float, optional
            The minimal support level.
            (default: 0.3)
        numPartitions : int, optional
            The number of partitions used by parallel FP-growth. A value
            of -1 will use the same number as input data.
            (default: -1)

        Returns
        -------
        :py:class:`FPGrowthModel`
            Wrapper around the model returned by ``trainFPGrowthModel``.
        """
        # Normalise the numeric arguments before they cross into the JVM.
        support = float(minSupport)
        partitions = int(numPartitions)
        java_model = callMLlibFunc("trainFPGrowthModel", data, support, partitions)
        return FPGrowthModel(java_model)

    class FreqItemset(namedtuple("FreqItemset", ["items", "freq"])):
        """
        Named ``(items, freq)`` tuple describing one frequent itemset.

        .. versionadded:: 1.4.0
        """

 

 

@inherit_doc
class PrefixSpanModel(JavaModelWrapper):
    """
    Model produced by fitting PrefixSpan.

    .. versionadded:: 1.6.0

    Examples
    --------
    >>> data = [
    ...    [["a", "b"], ["c"]],
    ...    [["a"], ["c", "b"], ["a", "b"]],
    ...    [["a", "b"], ["e"]],
    ...    [["f"]]]
    >>> rdd = sc.parallelize(data, 2)
    >>> model = PrefixSpan.train(rdd)
    >>> sorted(model.freqSequences().collect())
    [FreqSequence(sequence=[['a']], freq=3), FreqSequence(sequence=[['a'], ['a']], freq=1), ...
    """

    @since("1.6.0")
    def freqSequences(self):
        """Gets the frequent sequences of this model."""
        def to_sequence(pair):
            # pair[0] holds the sequence, pair[1] its frequency.
            return PrefixSpan.FreqSequence(pair[0], pair[1])

        raw_sequences = self.call("getFreqSequences")
        return raw_sequences.map(to_sequence)

 

 

class PrefixSpan(object):
    """
    A parallel PrefixSpan algorithm to mine frequent sequential patterns.
    The PrefixSpan algorithm is described in Jian Pei et al (2001) [1]_

    .. versionadded:: 1.6.0

    .. [1] Jian Pei et al.,
        "PrefixSpan: mining sequential patterns efficiently by prefix-projected pattern growth,"
        Proceedings 17th International Conference on Data Engineering, Heidelberg,
        Germany, 2001, pp. 215-224,
        doi: https://doi.org/10.1109/ICDE.2001.914830
    """

    @classmethod
    def train(cls, data, minSupport=0.1, maxPatternLength=10, maxLocalProjDBSize=32000000):
        """
        Finds the complete set of frequent sequential patterns in the
        input sequences of itemsets.

        .. versionadded:: 1.6.0

        Parameters
        ----------
        data : :py:class:`pyspark.RDD`
            The input data set, each element contains a sequence of
            itemsets.
        minSupport : float, optional
            The minimal support level of the sequential pattern, any
            pattern that appears more than (minSupport *
            size-of-the-dataset) times will be output.
            (default: 0.1)
        maxPatternLength : int, optional
            The maximal length of the sequential pattern; patterns
            longer than this are not output.
            (default: 10)
        maxLocalProjDBSize : int, optional
            The maximum number of items (including delimiters used in the
            internal storage format) allowed in a projected database before
            local processing. If a projected database exceeds this size,
            another iteration of distributed prefix growth is run.
            (default: 32000000)

        Returns
        -------
        :py:class:`PrefixSpanModel`
            Wrapper around the model returned by ``trainPrefixSpanModel``.
        """
        # Normalise the numeric arguments before handing them to the JVM,
        # in the same way FPGrowth.train does, so that numeric-like values
        # (e.g. an int support or a float length) are passed with the
        # types the parameters are documented to have.
        model = callMLlibFunc("trainPrefixSpanModel",
                              data, float(minSupport), int(maxPatternLength),
                              int(maxLocalProjDBSize))
        return PrefixSpanModel(model)

    class FreqSequence(namedtuple("FreqSequence", ["sequence", "freq"])):
        """
        Represents a (sequence, freq) tuple.

        .. versionadded:: 1.6.0
        """

 

 

def _test():
    """
    Run this module's doctests against a local Spark session.

    A ``local[4]`` session and a scratch directory are created and exposed
    to the doctests as ``sc`` and ``temp_path``. The session is stopped and
    the directory removed even if the doctest run raises. The process exits
    with status -1 when any doctest fails.
    """
    import doctest
    import tempfile
    from shutil import rmtree

    from pyspark.sql import SparkSession
    import pyspark.mllib.fpm

    globs = pyspark.mllib.fpm.__dict__.copy()
    spark = SparkSession.builder\
        .master("local[4]")\
        .appName("mllib.fpm tests")\
        .getOrCreate()
    globs['sc'] = spark.sparkContext

    temp_path = tempfile.mkdtemp()
    globs['temp_path'] = temp_path
    try:
        failure_count, _ = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    finally:
        # Always release the Spark session, then make a best-effort attempt
        # to remove the scratch directory; a failed removal must not mask
        # the doctest outcome.
        try:
            spark.stop()
        finally:
            try:
                rmtree(temp_path)
            except OSError:
                pass
    if failure_count:
        sys.exit(-1)

 

 

if __name__ == "__main__":
    # Executed as a script: run the module's doctest suite.
    _test()