Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

274

275

276

277

278

279

280

281

# 

# Licensed to the Apache Software Foundation (ASF) under one or more 

# contributor license agreements. See the NOTICE file distributed with 

# this work for additional information regarding copyright ownership. 

# The ASF licenses this file to You under the Apache License, Version 2.0 

# (the "License"); you may not use this file except in compliance with 

# the License. You may obtain a copy of the License at 

# 

# http://www.apache.org/licenses/LICENSE-2.0 

# 

# Unless required by applicable law or agreed to in writing, software 

# distributed under the License is distributed on an "AS IS" BASIS, 

# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

# See the License for the specific language governing permissions and 

# limitations under the License. 

# 

 

from pyspark.util import _parse_memory 

 

 

class ExecutorResourceRequest(object):
    """
    An Executor resource request, used together with a ResourceProfile to declare,
    at the stage level, what resources an RDD's executors need.

    Not every parameter applies to every resource type. GPU-like resources carry the
    same semantics and limitations as the global configs
    ``spark.executor.resource.{resourceName}.{amount, discoveryScript, vendor}``:
    ``amount`` is how much of the resource each executor needs, ``discoveryScript``
    lets the executor find the resource addresses at startup on cluster managers
    (e.g. YARN) that do not report them, and ``vendor`` is only consulted by some
    cluster managers (e.g. Kubernetes).

    See the configuration and cluster specific docs for more details.

    Use :py:class:`pyspark.ExecutorResourceRequests` class as a convenience API.

    .. versionadded:: 3.1.0

    Parameters
    ----------
    resourceName : str
        Name of the resource
    amount : str
        Amount requesting
    discoveryScript : str, optional
        Optional script used to discover the resources. This is required on some
        cluster managers that don't tell Spark the addresses of the resources
        allocated. The script runs on Executors startup to discover the addresses
        of the resources available.
    vendor : str, optional
        Vendor, required for some cluster managers

    Notes
    -----
    This API is evolving.
    """
    def __init__(self, resourceName, amount, discoveryScript="", vendor=""):
        # Plain value holder; accessors below expose these read-only.
        self._resource_name = resourceName
        self._resource_amount = amount
        self._script = discoveryScript
        self._resource_vendor = vendor

    @property
    def resourceName(self):
        return self._resource_name

    @property
    def amount(self):
        return self._resource_amount

    @property
    def discoveryScript(self):
        return self._script

    @property
    def vendor(self):
        return self._resource_vendor

 

 

class ExecutorResourceRequests(object):

    """
    A set of Executor resource requests. This is used in conjunction with the
    :class:`pyspark.resource.ResourceProfileBuilder` to programmatically specify the
    resources needed for an RDD that will be applied at the stage level.

    .. versionadded:: 3.1.0

    Notes
    -----
    This API is evolving.
    """
    # Keys identifying the built-in executor resources inside ``_executor_resources``
    # and incoming ``_requests`` maps; anything else is treated as a custom resource.
    _CORES = "cores"
    _MEMORY = "memory"
    _OVERHEAD_MEM = "memoryOverhead"
    _PYSPARK_MEM = "pyspark.memory"
    _OFFHEAP_MEM = "offHeap"

    def __init__(self, _jvm=None, _requests=None):
        """
        When a JVM is available (driver side), mirror the requests into a JVM
        ``ExecutorResourceRequests`` object; otherwise keep them in a local dict
        so they can be replayed into the JVM later.
        """
        from pyspark import SparkContext
        _jvm = _jvm or SparkContext._jvm
        if _jvm is not None:
            self._java_executor_resource_requests = \
                _jvm.org.apache.spark.resource.ExecutorResourceRequests()
            if _requests is not None:
                # Replay previously-collected local requests into the JVM object,
                # routing each built-in resource through its dedicated setter.
                for k, v in _requests.items():
                    if k == self._MEMORY:
                        self._java_executor_resource_requests.memory(str(v.amount))
                    elif k == self._OVERHEAD_MEM:
                        self._java_executor_resource_requests.memoryOverhead(str(v.amount))
                    elif k == self._PYSPARK_MEM:
                        self._java_executor_resource_requests.pysparkMemory(str(v.amount))
                    elif k == self._OFFHEAP_MEM:
                        # Previously offHeap fell through to the generic resource()
                        # branch; route it to the dedicated setter like the other
                        # built-in memory resources (matches offheapMemory() below).
                        self._java_executor_resource_requests.offHeapMemory(str(v.amount))
                    elif k == self._CORES:
                        self._java_executor_resource_requests.cores(v.amount)
                    else:
                        self._java_executor_resource_requests.resource(
                            v.resourceName, v.amount, v.discoveryScript, v.vendor)
        else:
            self._java_executor_resource_requests = None
        self._executor_resources = {}

    def memory(self, amount):
        """Request executor heap memory; ``amount`` is a size string (e.g. ``"2g"``)."""
        if self._java_executor_resource_requests is not None:
            self._java_executor_resource_requests.memory(amount)
        else:
            self._executor_resources[self._MEMORY] = ExecutorResourceRequest(
                self._MEMORY, _parse_memory(amount))
        return self

    def memoryOverhead(self, amount):
        """Request executor memory overhead; ``amount`` is a size string."""
        if self._java_executor_resource_requests is not None:
            self._java_executor_resource_requests.memoryOverhead(amount)
        else:
            self._executor_resources[self._OVERHEAD_MEM] = \
                ExecutorResourceRequest(self._OVERHEAD_MEM, _parse_memory(amount))
        return self

    def pysparkMemory(self, amount):
        """Request pyspark worker memory; ``amount`` is a size string."""
        if self._java_executor_resource_requests is not None:
            self._java_executor_resource_requests.pysparkMemory(amount)
        else:
            self._executor_resources[self._PYSPARK_MEM] = \
                ExecutorResourceRequest(self._PYSPARK_MEM, _parse_memory(amount))
        return self

    def offheapMemory(self, amount):
        """Request executor off-heap memory; ``amount`` is a size string."""
        if self._java_executor_resource_requests is not None:
            self._java_executor_resource_requests.offHeapMemory(amount)
        else:
            self._executor_resources[self._OFFHEAP_MEM] = \
                ExecutorResourceRequest(self._OFFHEAP_MEM, _parse_memory(amount))
        return self

    def cores(self, amount):
        """Request ``amount`` cores per executor (an int, not a size string)."""
        if self._java_executor_resource_requests is not None:
            self._java_executor_resource_requests.cores(amount)
        else:
            self._executor_resources[self._CORES] = ExecutorResourceRequest(
                self._CORES, amount)
        return self

    def resource(self, resourceName, amount, discoveryScript="", vendor=""):
        """Request a custom resource (e.g. ``"gpu"``); see ExecutorResourceRequest."""
        if self._java_executor_resource_requests is not None:
            self._java_executor_resource_requests.resource(
                resourceName, amount, discoveryScript, vendor)
        else:
            self._executor_resources[resourceName] = \
                ExecutorResourceRequest(resourceName, amount, discoveryScript, vendor)
        return self

    @property
    def requests(self):
        """
        The accumulated requests as a dict of resource name ->
        :class:`ExecutorResourceRequest`, converting from the JVM object if present.
        """
        if self._java_executor_resource_requests is not None:
            result = {}
            execRes = self._java_executor_resource_requests.requestsJMap()
            for k, v in execRes.items():
                result[k] = ExecutorResourceRequest(v.resourceName(), v.amount(),
                                                    v.discoveryScript(), v.vendor())
            return result
        else:
            return self._executor_resources

 

 

class TaskResourceRequest(object):
    """
    A task resource request, used together with a
    :class:`pyspark.resource.ResourceProfile` to declare, at the stage level, what
    resources each task needs. The amount is kept as a Double so that more than one
    task can share a single resource; valid values are less than or equal to 0.5 or
    whole numbers.
    Use :class:`pyspark.resource.TaskResourceRequests` class as a convenience API.

    Parameters
    ----------
    resourceName : str
        Name of the resource
    amount : float
        Amount requesting as a float to support fractional resource requests.
        Valid values are less than or equal to 0.5 or whole numbers.

    .. versionadded:: 3.1.0

    Notes
    -----
    This API is evolving.
    """
    def __init__(self, resourceName, amount):
        self._resource_name = resourceName
        # Coerce to float so fractional per-task amounts are representable.
        self._resource_amount = float(amount)

    @property
    def resourceName(self):
        return self._resource_name

    @property
    def amount(self):
        return self._resource_amount

 

 

class TaskResourceRequests(object):

    """
    A set of task resource requests. This is used in conjunction with the
    :class:`pyspark.resource.ResourceProfileBuilder` to programmatically specify the resources
    needed for an RDD that will be applied at the stage level.

    .. versionadded:: 3.1.0

    Notes
    -----
    This API is evolving.
    """

    # Key identifying the built-in per-task CPU resource.
    _CPUS = "cpus"

    def __init__(self, _jvm=None, _requests=None):
        """
        When a JVM is available (driver side), mirror the requests into a JVM
        ``TaskResourceRequests`` object; otherwise keep them in a local dict so
        they can be replayed into the JVM later.
        """
        from pyspark import SparkContext
        _jvm = _jvm or SparkContext._jvm
        if _jvm is not None:
            # Bug fix: use the resolved local `_jvm` (which honors an explicitly
            # passed-in JVM) instead of re-reading SparkContext._jvm, which may
            # differ or be None even when `_jvm` was supplied by the caller.
            self._java_task_resource_requests = \
                _jvm.org.apache.spark.resource.TaskResourceRequests()
            if _requests is not None:
                for k, v in _requests.items():
                    if k == self._CPUS:
                        self._java_task_resource_requests.cpus(int(v.amount))
                    else:
                        self._java_task_resource_requests.resource(v.resourceName, v.amount)
        else:
            self._java_task_resource_requests = None
        self._task_resources = {}

    def cpus(self, amount):
        """Request ``amount`` CPUs per task."""
        if self._java_task_resource_requests is not None:
            self._java_task_resource_requests.cpus(amount)
        else:
            self._task_resources[self._CPUS] = TaskResourceRequest(self._CPUS, amount)
        return self

    def resource(self, resourceName, amount):
        """Request ``amount`` (possibly fractional) of a custom resource per task."""
        if self._java_task_resource_requests is not None:
            self._java_task_resource_requests.resource(resourceName, float(amount))
        else:
            self._task_resources[resourceName] = TaskResourceRequest(resourceName, amount)
        return self

    @property
    def requests(self):
        """
        The accumulated requests as a dict of resource name ->
        :class:`TaskResourceRequest`, converting from the JVM object if present.
        """
        if self._java_task_resource_requests is not None:
            result = {}
            taskRes = self._java_task_resource_requests.requestsJMap()
            for k, v in taskRes.items():
                result[k] = TaskResourceRequest(v.resourceName(), v.amount())
            return result
        else:
            return self._task_resources