Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

# 

# Licensed to the Apache Software Foundation (ASF) under one or more 

# contributor license agreements. See the NOTICE file distributed with 

# this work for additional information regarding copyright ownership. 

# The ASF licenses this file to You under the Apache License, Version 2.0 

# (the "License"); you may not use this file except in compliance with 

# the License. You may obtain a copy of the License at 

# 

# http://www.apache.org/licenses/LICENSE-2.0 

# 

# Unless required by applicable law or agreed to in writing, software 

# distributed under the License is distributed on an "AS IS" BASIS, 

# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

# See the License for the specific language governing permissions and 

# limitations under the License. 

# 

 

import sys 

import random 

import math 

 

 

class RDDSamplerBase(object):
    """Shared state and random-draw helpers for the per-split samplers.

    An instance stores a seed, the ``withReplacement`` flag and a random
    generator that stays ``None`` until ``initRandomGenerator`` is called.
    ``func`` is the sampling entry point and must be supplied by subclasses.
    """

    def __init__(self, withReplacement, seed=None):
        """Record the sampling mode and seed.

        :param withReplacement: flag stored for subclasses to consult.
        :param seed: seed to use; when ``None`` a random integer in
            ``[0, sys.maxsize]`` is drawn from the module-level generator.
        """
        if seed is None:
            seed = random.randint(0, sys.maxsize)
        self._seed = seed
        self._withReplacement = withReplacement
        # Created lazily by initRandomGenerator().
        self._random = None

    def initRandomGenerator(self, split):
        """Create the generator for ``split``, seeded with ``seed ^ split``."""
        generator = random.Random(self._seed ^ split)

        # The initial seeds are close to each other, so discard a few draws
        # to mix the generator state before it is used.
        for _ in range(10):
            generator.randint(0, 1)

        self._random = generator

    def getUniformSample(self):
        """Return the next ``random()`` value from the split's generator."""
        generator = self._random
        return generator.random()

    def getPoissonSample(self, mean):
        """Draw a Poisson-distributed count with the given ``mean``.

        Uses Knuth's algorithm described in
        http://en.wikipedia.org/wiki/Poisson_distribution

        :param mean: rate of the distribution.
        :return: the sampled non-negative integer count.
        """
        generator = self._random
        events = 0

        if mean < 20.0:
            # Product form: one exp and k+1 random calls.
            threshold = math.exp(-mean)
            product = generator.random()
            while product > threshold:
                events += 1
                product *= generator.random()
            return events

        # Log domain: k+1 expovariate (random + log) calls, accumulated
        # until the running total reaches 1.0.
        elapsed = generator.expovariate(mean)
        while elapsed < 1.0:
            events += 1
            elapsed += generator.expovariate(mean)
        return events

    def func(self, split, iterator):
        """Sample ``iterator`` for partition ``split``; not implemented here."""
        raise NotImplementedError

 

 

class RDDSampler(RDDSamplerBase):
    """Sampler that applies a single fraction to every element."""

    def __init__(self, withReplacement, fraction, seed=None):
        """Store the sampling ``fraction`` on top of the base-class state.

        :param withReplacement: selects Poisson counts (true) or a uniform
            keep/drop test (false) in ``func``.
        :param fraction: Poisson mean, or the threshold a uniform draw must
            fall below for an element to be kept.
        :param seed: forwarded to ``RDDSamplerBase.__init__``.
        """
        RDDSamplerBase.__init__(self, withReplacement, seed)
        self._fraction = fraction

    def func(self, split, iterator):
        """Lazily yield the sampled elements of ``iterator`` for ``split``.

        The random generator is (re)initialised for ``split`` when the
        generator starts running.
        """
        self.initRandomGenerator(split)

        if not self._withReplacement:
            # Without replacement: keep an element when its uniform draw
            # falls below the fraction.
            for item in iterator:
                if self.getUniformSample() < self._fraction:
                    yield item
            return

        for item in iterator:
            # For large datasets, the expected number of occurrences of each
            # element in a sample with replacement is Poisson(frac), so a
            # Poisson draw gives the number of copies to emit.
            copies = self.getPoissonSample(self._fraction)
            for _ in range(copies):
                yield item

 

 

class RDDRangeSampler(RDDSamplerBase):
    """Sampler that keeps elements whose uniform draw lands in a range."""

    def __init__(self, lowerBound, upperBound, seed=None):
        """Store the half-open acceptance range ``[lowerBound, upperBound)``.

        The base class is initialised with ``withReplacement=False``.

        :param lowerBound: inclusive lower limit for a draw to be accepted.
        :param upperBound: exclusive upper limit for a draw to be accepted.
        :param seed: forwarded to ``RDDSamplerBase.__init__``.
        """
        RDDSamplerBase.__init__(self, False, seed)
        self._lowerBound = lowerBound
        self._upperBound = upperBound

    def func(self, split, iterator):
        """Lazily yield elements of ``iterator`` whose draw is in range.

        One uniform sample is drawn per element, after the generator has
        been initialised for ``split``.
        """
        self.initRandomGenerator(split)
        for item in iterator:
            draw = self.getUniformSample()
            if self._lowerBound <= draw and draw < self._upperBound:
                yield item

 

 

class RDDStratifiedSampler(RDDSamplerBase):
    """Sampler for ``(key, value)`` pairs with a per-key fraction."""

    def __init__(self, withReplacement, fractions, seed=None):
        """Store the per-key ``fractions`` on top of the base-class state.

        :param withReplacement: selects Poisson counts (true) or a uniform
            keep/drop test (false) in ``func``.
        :param fractions: object indexed by key (``fractions[key]``) giving
            the fraction to use for that key.
        :param seed: forwarded to ``RDDSamplerBase.__init__``.
        """
        RDDSamplerBase.__init__(self, withReplacement, seed)
        self._fractions = fractions

    def func(self, split, iterator):
        """Lazily yield the sampled ``(key, value)`` pairs for ``split``.

        Each element of ``iterator`` is unpacked into a key and a value; the
        fraction is looked up with ``self._fractions[key]`` for every pair.
        """
        self.initRandomGenerator(split)

        if not self._withReplacement:
            # Without replacement: keep a pair when its uniform draw falls
            # below the fraction registered for its key.
            for key, val in iterator:
                if self.getUniformSample() < self._fractions[key]:
                    yield (key, val)
            return

        for key, val in iterator:
            # For large datasets, the expected number of occurrences of each
            # element in a sample with replacement is Poisson(frac), so a
            # Poisson draw gives the number of copies to emit.
            copies = self.getPoissonSample(self._fractions[key])
            for _ in range(copies):
                yield (key, val)