Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

# 

# Licensed to the Apache Software Foundation (ASF) under one or more 

# contributor license agreements. See the NOTICE file distributed with 

# this work for additional information regarding copyright ownership. 

# The ASF licenses this file to You under the Apache License, Version 2.0 

# (the "License"); you may not use this file except in compliance with 

# the License. You may obtain a copy of the License at 

# 

# http://www.apache.org/licenses/LICENSE-2.0 

# 

# Unless required by applicable law or agreed to in writing, software 

# distributed under the License is distributed on an "AS IS" BASIS, 

# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

# See the License for the specific language governing permissions and 

# limitations under the License. 

# 

 

from collections import namedtuple 

 

# Public API of this module.
__all__ = ["SparkJobInfo", "SparkStageInfo", "StatusTracker"] 

 

 

class SparkJobInfo(namedtuple("SparkJobInfo", ["jobId", "stageIds", "status"])):
    """
    Immutable record exposing information about a Spark job:
    its id, the ids of its stages, and its current status string.
    """

 

 

class SparkStageInfo(namedtuple("SparkStageInfo",
                                ["stageId", "currentAttemptId", "name",
                                 "numTasks", "numActiveTasks",
                                 "numCompletedTasks", "numFailedTasks"])):
    """
    Immutable record exposing information about a Spark stage:
    its id, attempt id, name, and task counts (total / active /
    completed / failed).
    """

 

 

class StatusTracker(object):
    """
    Low-level status reporting APIs for monitoring job and stage progress.

    These APIs intentionally provide very weak consistency semantics;
    consumers of these APIs should be prepared to handle empty / missing
    information.  For example, a job's stage ids may be known but the status
    API may not have any information about the details of those stages, so
    `getStageInfo` could potentially return `None` for a valid stage id.

    To limit memory usage, these APIs only provide information on recent
    jobs / stages.  These APIs will provide information for the last
    `spark.ui.retainedStages` stages and `spark.ui.retainedJobs` jobs.
    """

    def __init__(self, jtracker):
        # JVM-side status tracker proxy; every query below delegates to it.
        self._jtracker = jtracker

    def getJobIdsForGroup(self, jobGroup=None):
        """
        Return a list of all known jobs in a particular job group.  If
        `jobGroup` is None, then returns all known jobs that are not
        associated with a job group.

        The returned list may contain running, failed, and completed jobs,
        and may vary across invocations of this method.  This method does
        not guarantee the order of the elements in its result.
        """
        return list(self._jtracker.getJobIdsForGroup(jobGroup))

    def getActiveStageIds(self):
        """
        Returns an array containing the ids of all active stages.
        """
        # sorted() accepts any iterable, so the intermediate list() that
        # previously wrapped the JVM result was redundant.
        return sorted(self._jtracker.getActiveStageIds())

    def getActiveJobsIds(self):
        """
        Returns an array containing the ids of all active jobs.
        """
        # NOTE: the method name contains a historical typo ("JobsIds"); it is
        # kept unchanged because it is part of the public API.
        return sorted(self._jtracker.getActiveJobIds())

    def getJobInfo(self, jobId):
        """
        Returns a :class:`SparkJobInfo` object, or None if the job info
        could not be found or was garbage collected.
        """
        job = self._jtracker.getJobInfo(jobId)
        if job is not None:
            return SparkJobInfo(jobId, job.stageIds(), str(job.status()))

    def getStageInfo(self, stageId):
        """
        Returns a :class:`SparkStageInfo` object, or None if the stage
        info could not be found or was garbage collected.
        """
        stage = self._jtracker.getStageInfo(stageId)
        if stage is not None:
            # TODO: fetch them in batch for better performance
            # Pull every field after stageId off the JVM object by name, in
            # the order the namedtuple declares them.
            attrs = [getattr(stage, f)() for f in SparkStageInfo._fields[1:]]
            return SparkStageInfo(stageId, *attrs)