Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

# 

# Licensed to the Apache Software Foundation (ASF) under one or more 

# contributor license agreements. See the NOTICE file distributed with 

# this work for additional information regarding copyright ownership. 

# The ASF licenses this file to You under the Apache License, Version 2.0 

# (the "License"); you may not use this file except in compliance with 

# the License. You may obtain a copy of the License at 

# 

# http://www.apache.org/licenses/LICENSE-2.0 

# 

# Unless required by applicable law or agreed to in writing, software 

# distributed under the License is distributed on an "AS IS" BASIS, 

# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

# See the License for the specific language governing permissions and 

# limitations under the License. 

# 

 

# This file is ported from spark/util/StatCounter.scala 

 

import copy 

import math 

 

try: 

from numpy import maximum, minimum, sqrt 

except ImportError: 

maximum = max 

minimum = min 

sqrt = math.sqrt 

 

 

class StatCounter(object):
    """Running summary statistics over a stream of numeric values.

    Tracks count, mean, the sum of squared deviations from the mean (``m2``), and the
    min/max. Values are folded in one at a time with :meth:`merge`. Independently
    built counters can be combined with :meth:`mergeStats`.
    """

    def __init__(self, values=None):
        """Create a counter, optionally pre-populated by merging each item of *values*."""
        if values is None:
            values = list()
        self.n = 0  # Running count of our values
        self.mu = 0.0  # Running mean of our values
        self.m2 = 0.0  # Running variance numerator (sum of (x - mean)^2)
        self.maxValue = float("-inf")
        self.minValue = float("inf")

        for v in values:
            self.merge(v)

    def merge(self, value):
        """Add a single value into this StatCounter, updating the internal statistics.

        Returns ``self`` so calls can be chained.
        """
        # Welford's online update: `delta` is taken against the old mean, and the
        # m2 increment uses the *new* mean, so the statement order here matters.
        delta = value - self.mu
        self.n += 1
        self.mu += delta / self.n
        self.m2 += delta * (value - self.mu)
        self.maxValue = maximum(self.maxValue, value)
        self.minValue = minimum(self.minValue, value)

        return self

    def mergeStats(self, other):
        """Merge another StatCounter into this one, combining the internal statistics.

        Parameters
        ----------
        other : StatCounter
            The counter to fold in. It may be ``self``, in which case a snapshot
            copy is merged so that fields are not read while being overwritten.

        Returns
        -------
        StatCounter
            ``self``, to allow chaining.

        Raises
        ------
        TypeError
            If *other* is not a StatCounter.
        """
        if not isinstance(other, StatCounter):
            raise TypeError("Can only merge StatCounter but got %s" % type(other))

        if other is self:  # reference equality holds
            # Merge a snapshot so we never read fields we are in the middle of
            # overwriting. This must go through mergeStats: merge() expects a single
            # numeric value and would fail on a StatCounter.
            self.mergeStats(copy.deepcopy(other))
        else:
            if self.n == 0:
                # Nothing accumulated yet: adopt the other counter's state wholesale.
                self.mu = other.mu
                self.m2 = other.m2
                self.n = other.n
                self.maxValue = other.maxValue
                self.minValue = other.minValue

            elif other.n != 0:
                delta = other.mu - self.mu
                # The three mean formulas are algebraically equal. When one side has
                # more than 10x the count, the larger side's mean gets a small
                # correction instead of a full weighted average (presumably to limit
                # floating-point error, as in the Scala original).
                if other.n * 10 < self.n:
                    self.mu = self.mu + (delta * other.n) / (self.n + other.n)
                elif self.n * 10 < other.n:
                    self.mu = other.mu - (delta * self.n) / (self.n + other.n)
                else:
                    self.mu = (self.mu * self.n + other.mu * other.n) / (self.n + other.n)

                self.maxValue = maximum(self.maxValue, other.maxValue)
                self.minValue = minimum(self.minValue, other.minValue)

                # Pairwise m2 combination; uses self.n *before* it is incremented below.
                self.m2 += other.m2 + (delta * delta * self.n * other.n) / (self.n + other.n)
                self.n += other.n
        return self

    def copy(self):
        """Return an independent deep copy of this StatCounter."""
        return copy.deepcopy(self)

    def count(self):
        """Return the number of values merged, as an ``int``."""
        return int(self.n)

    def mean(self):
        """Return the running mean (0.0 when no values have been merged)."""
        return self.mu

    def sum(self):
        """Return the sum of the values, computed as ``count * mean``."""
        return self.n * self.mu

    def min(self):
        """Return the smallest value seen (``inf`` when empty)."""
        return self.minValue

    def max(self):
        """Return the largest value seen (``-inf`` when empty)."""
        return self.maxValue

    def variance(self):
        """Return the population variance of the values (divides by N); NaN when empty."""
        if self.n == 0:
            return float('nan')
        else:
            return self.m2 / self.n

    def sampleVariance(self):
        """Return the sample variance, which corrects for bias by dividing by N-1.

        Returns NaN when fewer than two values have been merged.
        """
        if self.n <= 1:
            return float('nan')
        else:
            return self.m2 / (self.n - 1)

    def stdev(self):
        """Return the population standard deviation of the values."""
        return sqrt(self.variance())

    def sampleStdev(self):
        """Return the sample standard deviation (N-1 denominator) of the values."""
        return sqrt(self.sampleVariance())

    def asDict(self, sample=False):
        """Returns the :class:`StatCounter` members as a ``dict``.

        NOTE(review): the ``sample`` flag behaves opposite to its name. The default
        (``sample=False``) reports the *sample* (N-1) stdev/variance, and
        ``sample=True`` reports the *population* (N) ones. This is kept for backward
        compatibility; confirm with callers before changing it.

        Examples
        --------
        >>> sc.parallelize([1., 2., 3., 4.]).stats().asDict()  # doctest: +SKIP
        {'count': 4,
         'max': 4.0,
         'mean': 2.5,
         'min': 1.0,
         'stdev': 1.2909944487358056,
         'sum': 10.0,
         'variance': 1.6666666666666667}
        """
        return {
            'count': self.count(),
            'mean': self.mean(),
            'sum': self.sum(),
            'min': self.min(),
            'max': self.max(),
            'stdev': self.stdev() if sample else self.sampleStdev(),
            'variance': self.variance() if sample else self.sampleVariance()
        }

    def __repr__(self):
        return ("(count: %s, mean: %s, stdev: %s, max: %s, min: %s)" %
                (self.count(), self.mean(), self.stdev(), self.max(), self.min()))