#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from py4j.java_gateway import java_import, is_instance_of

from pyspark import RDD, SparkConf
from pyspark.serializers import NoOpSerializer, UTF8Deserializer, CloudPickleSerializer
from pyspark.context import SparkContext
from pyspark.storagelevel import StorageLevel
from pyspark.streaming.dstream import DStream
from pyspark.streaming.util import TransformFunction, TransformFunctionSerializer

__all__ = ["StreamingContext"]


class StreamingContext(object):
    """
    Main entry point for Spark Streaming functionality. A StreamingContext
    represents the connection to a Spark cluster, and can be used to create
    :class:`DStream` from various input sources. It can be created from an
    existing :class:`SparkContext`.
    After creating and transforming DStreams, the streaming computation can
    be started and stopped using `context.start()` and `context.stop()`,
    respectively. `context.awaitTermination()` allows the current thread
    to wait for the termination of the context by `stop()` or by an exception.

    Parameters
    ----------
    sparkContext : :class:`SparkContext`
        SparkContext object.
    batchDuration : int, optional
        the time interval (in seconds) at which streaming
        data will be divided into batches
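
    Examples
    --------
    A minimal, illustrative sketch of creating a context with a 1-second batch
    interval; the master URL and application name are placeholders:

    >>> from pyspark import SparkContext
    >>> from pyspark.streaming import StreamingContext
    >>> sc = SparkContext("local[2]", "NetworkWordCount")  # doctest: +SKIP
    >>> ssc = StreamingContext(sc, 1)  # doctest: +SKIP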

""" 

_transformerSerializer = None 

 

# Reference to a currently active StreamingContext 

_activeContext = None 

 

def __init__(self, sparkContext, batchDuration=None, jssc=None): 

 

self._sc = sparkContext 

self._jvm = self._sc._jvm 

self._jssc = jssc or self._initialize_context(self._sc, batchDuration) 

 

    def _initialize_context(self, sc, duration):
        self._ensure_initialized()
        return self._jvm.JavaStreamingContext(sc._jsc, self._jduration(duration))

    def _jduration(self, seconds):
        """
        Create a Duration object given a number of seconds.
        """
        return self._jvm.Duration(int(seconds * 1000))

    @classmethod
    def _ensure_initialized(cls):
        SparkContext._ensure_initialized()
        gw = SparkContext._gateway

        java_import(gw.jvm, "org.apache.spark.streaming.*")
        java_import(gw.jvm, "org.apache.spark.streaming.api.java.*")
        java_import(gw.jvm, "org.apache.spark.streaming.api.python.*")

        from pyspark.java_gateway import ensure_callback_server_started
        ensure_callback_server_started(gw)

        # register serializer for TransformFunction
        # it happens before creating SparkContext when loading from checkpointing
        cls._transformerSerializer = TransformFunctionSerializer(
            SparkContext._active_spark_context, CloudPickleSerializer(), gw)

    @classmethod
    def getOrCreate(cls, checkpointPath, setupFunc):
        """
        Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
        If checkpoint data exists in the provided `checkpointPath`, the StreamingContext will be
        recreated from that data. If the data does not exist, the provided setupFunc
        will be used to create a new context.

        Parameters
        ----------
        checkpointPath : str
            Checkpoint directory used in an earlier streaming program
        setupFunc : function
            Function to create a new context and set up DStreams
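
        Examples
        --------
        An illustrative sketch; the checkpoint path and the ``setup`` helper
        below are hypothetical placeholders:

        >>> def setup():
        ...     sc = SparkContext("local[2]", "recoverable")
        ...     ssc = StreamingContext(sc, 1)
        ...     # create DStreams and set the checkpoint directory here
        ...     return ssc
        >>> ssc = StreamingContext.getOrCreate("/tmp/checkpoint", setup)  # doctest: +SKIP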

""" 

cls._ensure_initialized() 

gw = SparkContext._gateway 

 

# Check whether valid checkpoint information exists in the given path 

ssc_option = gw.jvm.StreamingContextPythonHelper().tryRecoverFromCheckpoint(checkpointPath) 

if ssc_option.isEmpty(): 

ssc = setupFunc() 

ssc.checkpoint(checkpointPath) 

return ssc 

 

jssc = gw.jvm.JavaStreamingContext(ssc_option.get()) 

 

# If there is already an active instance of Python SparkContext use it, or create a new one 

if not SparkContext._active_spark_context: 

jsc = jssc.sparkContext() 

conf = SparkConf(_jconf=jsc.getConf()) 

SparkContext(conf=conf, gateway=gw, jsc=jsc) 

 

sc = SparkContext._active_spark_context 

 

# update ctx in serializer 

cls._transformerSerializer.ctx = sc 

return StreamingContext(sc, None, jssc) 

 

    @classmethod
    def getActive(cls):
        """
        Return either the currently active StreamingContext (i.e., if there is a context started
        but not stopped) or None.
        """
        activePythonContext = cls._activeContext
        if activePythonContext is not None:
            # Verify that the current running Java StreamingContext is active and is the same one
            # backing the supposedly active Python context
            activePythonContextJavaId = activePythonContext._jssc.ssc().hashCode()
            activeJvmContextOption = activePythonContext._jvm.StreamingContext.getActive()

            if activeJvmContextOption.isEmpty():
                cls._activeContext = None
            elif activeJvmContextOption.get().hashCode() != activePythonContextJavaId:
                cls._activeContext = None
                raise RuntimeError(
                    "JVM's active JavaStreamingContext is not the JavaStreamingContext "
                    "backing the active Python StreamingContext. This is unexpected.")
        return cls._activeContext

    @classmethod
    def getActiveOrCreate(cls, checkpointPath, setupFunc):
        """
        Either return the active StreamingContext (i.e. currently started but not stopped),
        or recreate a StreamingContext from checkpoint data, or create a new StreamingContext
        using the provided setupFunc function. If the checkpointPath is None or does not contain
        valid checkpoint data, then setupFunc will be called to create a new context and set up
        DStreams.

        Parameters
        ----------
        checkpointPath : str
            Checkpoint directory used in an earlier streaming program. Can be
            None if the intention is to always create a new context when there
            is no active context.
        setupFunc : function
            Function to create a new JavaStreamingContext and set up DStreams
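
        Examples
        --------
        An illustrative sketch, reusing the hypothetical ``setup`` helper from
        the ``getOrCreate`` example above (the path is a placeholder):

        >>> ssc = StreamingContext.getActiveOrCreate("/tmp/checkpoint", setup)  # doctest: +SKIP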

""" 

 

166 ↛ 167line 166 didn't jump to line 167, because the condition on line 166 was never true if not callable(setupFunc): 

raise TypeError("setupFunc should be callable.") 

activeContext = cls.getActive() 

if activeContext is not None: 

return activeContext 

elif checkpointPath is not None: 

return cls.getOrCreate(checkpointPath, setupFunc) 

else: 

return setupFunc() 

 

    @property
    def sparkContext(self):
        """
        Return the SparkContext which is associated with this StreamingContext.
        """
        return self._sc

    def start(self):
        """
        Start the execution of the streams.
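
        Examples
        --------
        An illustrative sketch, assuming an already configured context ``ssc``:

        >>> ssc.start()  # doctest: +SKIP
        >>> ssc.awaitTermination()  # doctest: +SKIP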

""" 

self._jssc.start() 

StreamingContext._activeContext = self 

 

    def awaitTermination(self, timeout=None):
        """
        Wait for the execution to stop.

        Parameters
        ----------
        timeout : int, optional
            time to wait in seconds
        """
        if timeout is None:
            self._jssc.awaitTermination()
        else:
            self._jssc.awaitTerminationOrTimeout(int(timeout * 1000))

    def awaitTerminationOrTimeout(self, timeout):
        """
        Wait for the execution to stop. Return ``True`` if it's stopped; or
        throw the reported error during the execution; or ``False`` if the
        waiting time elapsed before returning from the method.

        Parameters
        ----------
        timeout : int
            time to wait in seconds
        """
        return self._jssc.awaitTerminationOrTimeout(int(timeout * 1000))

    def stop(self, stopSparkContext=True, stopGraceFully=False):
        """
        Stop the execution of the streams, with the option of ensuring all
        received data has been processed.

        Parameters
        ----------
        stopSparkContext : bool, optional
            Stop the associated SparkContext or not
        stopGraceFully : bool, optional
            Stop gracefully by waiting for the processing of all received
            data to be completed
        """
        self._jssc.stop(stopSparkContext, stopGraceFully)
        StreamingContext._activeContext = None
        if stopSparkContext:
            self._sc.stop()

    def remember(self, duration):
        """
        Set each DStream in this context to remember the RDDs it generated
        in the last given duration. DStreams remember RDDs only for a
        limited duration of time and release them for garbage collection.
        This method allows the developer to specify how long to remember
        the RDDs (if the developer wishes to query old data outside the
        DStream computation).

        Parameters
        ----------
        duration : int
            Minimum duration (in seconds) that each DStream should remember its RDDs
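
        Examples
        --------
        An illustrative sketch, assuming an existing context ``ssc``; the
        60-second window is arbitrary:

        >>> ssc.remember(60)  # doctest: +SKIP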

""" 

self._jssc.remember(self._jduration(duration)) 

 

    def checkpoint(self, directory):
        """
        Set the context to periodically checkpoint the DStream operations for master
        fault-tolerance. The graph will be checkpointed every batch interval.

        Parameters
        ----------
        directory : str
            HDFS-compatible directory where the checkpoint data will be reliably stored
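
        Examples
        --------
        An illustrative sketch; the directory below is a placeholder, and an
        HDFS path could be used instead of a local one:

        >>> ssc.checkpoint("/tmp/streaming-checkpoint")  # doctest: +SKIP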

""" 

self._jssc.checkpoint(directory) 

 

    def socketTextStream(self, hostname, port, storageLevel=StorageLevel.MEMORY_AND_DISK_2):
        """
        Create an input stream from a TCP source hostname:port. Data is received
        using a TCP socket and the received bytes are interpreted as UTF-8 encoded,
        ``\\n`` delimited lines.

        Parameters
        ----------
        hostname : str
            Hostname to connect to for receiving data
        port : int
            Port to connect to for receiving data
        storageLevel : :class:`pyspark.StorageLevel`, optional
            Storage level to use for storing the received objects
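
        Examples
        --------
        An illustrative sketch; the hostname and port are placeholders and
        assume a text server is listening there:

        >>> lines = ssc.socketTextStream("localhost", 9999)  # doctest: +SKIP
        >>> lines.pprint()  # doctest: +SKIP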

""" 

jlevel = self._sc._getJavaStorageLevel(storageLevel) 

return DStream(self._jssc.socketTextStream(hostname, port, jlevel), self, 

UTF8Deserializer()) 

 

    def textFileStream(self, directory):
        """
        Create an input stream that monitors a Hadoop-compatible file system
        for new files and reads them as text files. Files must be written to the
        monitored directory by "moving" them from another location within the same
        file system. File names starting with . are ignored.
        The text files must be encoded as UTF-8.
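
        Examples
        --------
        An illustrative sketch; the monitored directory is a placeholder:

        >>> lines = ssc.textFileStream("hdfs:///tmp/streaming-input")  # doctest: +SKIP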

""" 

return DStream(self._jssc.textFileStream(directory), self, UTF8Deserializer()) 

 

    def binaryRecordsStream(self, directory, recordLength):
        """
        Create an input stream that monitors a Hadoop-compatible file system
        for new files and reads them as flat binary files with records of
        fixed length. Files must be written to the monitored directory by "moving"
        them from another location within the same file system.
        File names starting with . are ignored.

        Parameters
        ----------
        directory : str
            Directory to load data from
        recordLength : int
            Length of each record in bytes
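
        Examples
        --------
        An illustrative sketch; the directory and the 1024-byte record length
        are placeholders:

        >>> records = ssc.binaryRecordsStream("hdfs:///tmp/binary-input", 1024)  # doctest: +SKIP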

""" 

return DStream(self._jssc.binaryRecordsStream(directory, recordLength), self, 

NoOpSerializer()) 

 

    def _check_serializers(self, rdds):
        # make sure they have the same serializer
        if len(set(rdd._jrdd_deserializer for rdd in rdds)) > 1:
            for i in range(len(rdds)):
                # reset them to sc.serializer
                rdds[i] = rdds[i]._reserialize()

    def queueStream(self, rdds, oneAtATime=True, default=None):
        """
        Create an input stream from a queue of RDDs or a list. In each batch,
        it will process either one or all of the RDDs returned by the queue.

        Parameters
        ----------
        rdds : list
            Queue of RDDs
        oneAtATime : bool, optional
            pick one RDD each time, or pick all of them once.
        default : :class:`pyspark.RDD`, optional
            The default RDD if no more RDDs remain in `rdds`

        Notes
        -----
        Changes to the queue after the stream is created will not be recognized.
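
        Examples
        --------
        An illustrative sketch that feeds five small RDDs through the stream,
        one per batch:

        >>> rdd_queue = [ssc.sparkContext.parallelize([i] * 10) for i in range(5)]  # doctest: +SKIP
        >>> input_stream = ssc.queueStream(rdd_queue)  # doctest: +SKIP
        >>> input_stream.pprint()  # doctest: +SKIP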

""" 

335 ↛ 336line 335 didn't jump to line 336, because the condition on line 335 was never true if default and not isinstance(default, RDD): 

default = self._sc.parallelize(default) 

 

338 ↛ 339line 338 didn't jump to line 339, because the condition on line 338 was never true if not rdds and default: 

rdds = [rdds] 

 

if rdds and not isinstance(rdds[0], RDD): 

rdds = [self._sc.parallelize(input) for input in rdds] 

self._check_serializers(rdds) 

 

queue = self._jvm.PythonDStream.toRDDQueue([r._jrdd for r in rdds]) 

346 ↛ 347line 346 didn't jump to line 347, because the condition on line 346 was never true if default: 

default = default._reserialize(rdds[0]._jrdd_deserializer) 

jdstream = self._jssc.queueStream(queue, oneAtATime, default._jrdd) 

else: 

jdstream = self._jssc.queueStream(queue, oneAtATime) 

return DStream(jdstream, self, rdds[0]._jrdd_deserializer) 

 

    def transform(self, dstreams, transformFunc):
        """
        Create a new DStream in which each RDD is generated by applying
        a function on RDDs of the DStreams. The order of the JavaRDDs in
        the transform function parameter will be the same as the order
        of corresponding DStreams in the list.
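
        Examples
        --------
        An illustrative sketch; ``stream1`` and ``stream2`` stand in for two
        existing DStreams with the same batch interval. The function receives
        one RDD per input DStream, in the same order:

        >>> combined = ssc.transform(
        ...     [stream1, stream2], lambda rdds: rdds[0].union(rdds[1]))  # doctest: +SKIP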

""" 

jdstreams = [d._jdstream for d in dstreams] 

# change the final serializer to sc.serializer 

func = TransformFunction(self._sc, 

lambda t, *rdds: transformFunc(rdds), 

*[d._jrdd_deserializer for d in dstreams]) 

jfunc = self._jvm.TransformFunction(func) 

jdstream = self._jssc.transform(jdstreams, jfunc) 

return DStream(jdstream, self, self._sc.serializer) 

 

    def union(self, *dstreams):
        """
        Create a unified DStream from multiple DStreams of the same
        type and same slide duration.
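
        Examples
        --------
        An illustrative sketch; ``stream1`` and ``stream2`` stand in for two
        existing DStreams of the same type and slide duration:

        >>> merged = ssc.union(stream1, stream2)  # doctest: +SKIP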

""" 

374 ↛ 375line 374 didn't jump to line 375, because the condition on line 374 was never true if not dstreams: 

raise ValueError("should have at least one DStream to union") 

376 ↛ 377line 376 didn't jump to line 377, because the condition on line 376 was never true if len(dstreams) == 1: 

return dstreams[0] 

378 ↛ 379line 378 didn't jump to line 379, because the condition on line 378 was never true if len(set(s._jrdd_deserializer for s in dstreams)) > 1: 

raise ValueError("All DStreams should have same serializer") 

380 ↛ 381line 380 didn't jump to line 381, because the condition on line 380 was never true if len(set(s._slideDuration for s in dstreams)) > 1: 

raise ValueError("All DStreams should have same slide duration") 

jdstream_cls = SparkContext._jvm.org.apache.spark.streaming.api.java.JavaDStream 

jpair_dstream_cls = SparkContext._jvm.org.apache.spark.streaming.api.java.JavaPairDStream 

gw = SparkContext._gateway 

385 ↛ 387line 385 didn't jump to line 387, because the condition on line 385 was never false if is_instance_of(gw, dstreams[0]._jdstream, jdstream_cls): 

cls = jdstream_cls 

elif is_instance_of(gw, dstreams[0]._jdstream, jpair_dstream_cls): 

cls = jpair_dstream_cls 

else: 

cls_name = dstreams[0]._jdstream.getClass().getCanonicalName() 

raise TypeError("Unsupported Java DStream class %s" % cls_name) 

jdstreams = gw.new_array(cls, len(dstreams)) 

for i in range(0, len(dstreams)): 

jdstreams[i] = dstreams[i]._jdstream 

return DStream(self._jssc.union(jdstreams), self, dstreams[0]._jrdd_deserializer) 

 

    def addStreamingListener(self, streamingListener):
        """
        Add a `org.apache.spark.streaming.scheduler.StreamingListener` object for
        receiving system events related to streaming.
        """
        self._jssc.addStreamingListener(self._jvm.JavaStreamingListenerWrapper(
            self._jvm.PythonStreamingListenerWrapper(streamingListener)))