# 

# Licensed to the Apache Software Foundation (ASF) under one or more 

# contributor license agreements. See the NOTICE file distributed with 

# this work for additional information regarding copyright ownership. 

# The ASF licenses this file to You under the Apache License, Version 2.0 

# (the "License"); you may not use this file except in compliance with 

# the License. You may obtain a copy of the License at 

# 

# http://www.apache.org/licenses/LICENSE-2.0 

# 

# Unless required by applicable law or agreed to in writing, software 

# distributed under the License is distributed on an "AS IS" BASIS, 

# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

# See the License for the specific language governing permissions and 

# limitations under the License. 

# 

 

import operator 

import time 

from itertools import chain 

from datetime import datetime 

 

from py4j.protocol import Py4JJavaError 

 

from pyspark import RDD 

from pyspark.storagelevel import StorageLevel 

from pyspark.streaming.util import rddToFileName, TransformFunction 

from pyspark.rdd import portable_hash 

from pyspark.resultiterable import ResultIterable 

 

__all__ = ["DStream"] 

 

 

class DStream(object): 

""" 

A Discretized Stream (DStream), the basic abstraction in Spark Streaming, 

is a continuous sequence of RDDs (of the same type) representing a 

continuous stream of data (see :class:`RDD` in the Spark core documentation 

for more details on RDDs). 

 

DStreams can either be created from live data (such as data from TCP

sockets) using a :class:`StreamingContext`, or they can be generated by

transforming existing DStreams using operations such as `map`, `window`

and `reduceByKeyAndWindow`. While a Spark Streaming program is running,

each DStream periodically generates an RDD, either from live data or by

transforming the RDD generated by a parent DStream.

 

Internally, a DStream is characterized by a few basic properties: 

- A list of other DStreams that the DStream depends on 

- A time interval at which the DStream generates an RDD 

- A function that is used to generate an RDD after each time interval 

""" 

def __init__(self, jdstream, ssc, jrdd_deserializer): 

self._jdstream = jdstream 

self._ssc = ssc 

self._sc = ssc._sc 

self._jrdd_deserializer = jrdd_deserializer 

self.is_cached = False 

self.is_checkpointed = False 

 

def context(self): 

""" 

Return the StreamingContext associated with this DStream 

""" 

return self._ssc 

 

def count(self): 

""" 

Return a new DStream in which each RDD has a single element 

generated by counting each RDD of this DStream. 

""" 

return self.mapPartitions(lambda i: [sum(1 for _ in i)]).reduce(operator.add) 

 

def filter(self, f): 

""" 

Return a new DStream containing only the elements that satisfy the predicate. 

""" 

def func(iterator): 

return filter(f, iterator) 

return self.mapPartitions(func, True) 

 

def flatMap(self, f, preservesPartitioning=False): 

""" 

Return a new DStream by applying a function to all elements of 

this DStream, and then flattening the results. 

""" 

def func(s, iterator): 

return chain.from_iterable(map(f, iterator)) 

return self.mapPartitionsWithIndex(func, preservesPartitioning) 

 

def map(self, f, preservesPartitioning=False): 

""" 

Return a new DStream by applying a function to each element of DStream. 

""" 

def func(iterator): 

return map(f, iterator) 

return self.mapPartitions(func, preservesPartitioning) 

 

def mapPartitions(self, f, preservesPartitioning=False): 

""" 

Return a new DStream in which each RDD is generated by applying 

mapPartitions() to each RDD of this DStream. 

""" 

def func(s, iterator): 

return f(iterator) 

return self.mapPartitionsWithIndex(func, preservesPartitioning) 

 

def mapPartitionsWithIndex(self, f, preservesPartitioning=False): 

""" 

Return a new DStream in which each RDD is generated by applying 

mapPartitionsWithIndex() to each RDD of this DStream. 

""" 

return self.transform(lambda rdd: rdd.mapPartitionsWithIndex(f, preservesPartitioning)) 
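
# Usage sketch (editorial addition, not part of the original source). Assuming
# `lines` is a DStream of text lines, e.g. created via
# ssc.socketTextStream("localhost", 9999), the element-wise transformations
# above compose just like their RDD counterparts:
#
#     words = lines.flatMap(lambda line: line.split(" "))
#     long_words = words.filter(lambda w: len(w) > 3)
#     lengths = long_words.map(len)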

 

def reduce(self, func): 

""" 

Return a new DStream in which each RDD has a single element 

generated by reducing each RDD of this DStream. 

""" 

return self.map(lambda x: (None, x)).reduceByKey(func, 1).map(lambda x: x[1]) 

 

def reduceByKey(self, func, numPartitions=None): 

""" 

Return a new DStream by applying reduceByKey to each RDD. 

""" 

if numPartitions is None: 

numPartitions = self._sc.defaultParallelism 

return self.combineByKey(lambda x: x, func, func, numPartitions) 
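
# Usage sketch (editorial addition, not part of the original source). A
# per-batch word count, assuming `words` is a DStream of strings:
#
#     pairs = words.map(lambda w: (w, 1))
#     counts = pairs.reduceByKey(lambda a, b: a + b)
#     counts.pprint()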

 

def combineByKey(self, createCombiner, mergeValue, mergeCombiners, 

numPartitions=None): 

""" 

Return a new DStream by applying combineByKey to each RDD. 

""" 

if numPartitions is None: 

numPartitions = self._sc.defaultParallelism 

 

def func(rdd): 

return rdd.combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions) 

return self.transform(func) 

 

def partitionBy(self, numPartitions, partitionFunc=portable_hash): 

""" 

Return a copy of the DStream in which each RDD is partitioned 

using the specified partitioner. 

""" 

return self.transform(lambda rdd: rdd.partitionBy(numPartitions, partitionFunc)) 

 

def foreachRDD(self, func): 

""" 

Apply a function to each RDD in this DStream. 

""" 

if func.__code__.co_argcount == 1: 

old_func = func 

func = lambda t, rdd: old_func(rdd) 

jfunc = TransformFunction(self._sc, func, self._jrdd_deserializer) 

api = self._ssc._jvm.PythonDStream 

api.callForeachRDD(self._jdstream, jfunc) 
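
# Usage sketch (editorial addition, not part of the original source).
# foreachRDD is the generic output operation: the function is invoked on the
# driver for every batch and can use the full RDD API. Assuming `counts` is a
# DStream of (word, count) pairs:
#
#     def process(time, rdd):
#         top = rdd.takeOrdered(5, key=lambda kv: -kv[1])
#         print(time, top)
#
#     counts.foreachRDD(process)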

 

def pprint(self, num=10): 

""" 

Print the first num elements of each RDD generated in this DStream. 

 

Parameters 

---------- 

num : int, optional 

the number of elements to print from the beginning of each RDD. 

""" 

def takeAndPrint(time, rdd): 

taken = rdd.take(num + 1) 

print("-------------------------------------------") 

print("Time: %s" % time) 

print("-------------------------------------------") 

for record in taken[:num]: 

print(record) 

if len(taken) > num: 

print("...") 

print("") 

 

self.foreachRDD(takeAndPrint) 

 

def mapValues(self, f): 

""" 

Return a new DStream by applying a map function to the value of 

each key-value pair in this DStream without changing the key. 

""" 

map_values_fn = lambda kv: (kv[0], f(kv[1])) 

return self.map(map_values_fn, preservesPartitioning=True) 

 

def flatMapValues(self, f): 

""" 

Return a new DStream by applying a flatmap function to the value 

of each key-value pair in this DStream without changing the key. 

""" 

flat_map_fn = lambda kv: ((kv[0], x) for x in f(kv[1])) 

return self.flatMap(flat_map_fn, preservesPartitioning=True) 

 

def glom(self): 

""" 

Return a new DStream in which each RDD is generated by applying glom() 

to each RDD of this DStream. 

""" 

def func(iterator): 

yield list(iterator) 

return self.mapPartitions(func) 

 

def cache(self): 

""" 

Persist the RDDs of this DStream with the default storage level 

(`MEMORY_ONLY`). 

""" 

self.is_cached = True 

self.persist(StorageLevel.MEMORY_ONLY) 

return self 

 

def persist(self, storageLevel): 

""" 

Persist the RDDs of this DStream with the given storage level 

""" 

self.is_cached = True 

javaStorageLevel = self._sc._getJavaStorageLevel(storageLevel) 

self._jdstream.persist(javaStorageLevel) 

return self 

 

def checkpoint(self, interval): 

""" 

Enable periodic checkpointing of RDDs of this DStream 

 

Parameters 

---------- 

interval : int 

time in seconds; after each such period, the generated 

RDDs will be checkpointed 

""" 

self.is_checkpointed = True 

self._jdstream.checkpoint(self._ssc._jduration(interval)) 

return self 

 

def groupByKey(self, numPartitions=None): 

""" 

Return a new DStream by applying groupByKey on each RDD. 

""" 

if numPartitions is None: 

numPartitions = self._sc.defaultParallelism 

return self.transform(lambda rdd: rdd.groupByKey(numPartitions)) 

 

def countByValue(self): 

""" 

Return a new DStream in which each RDD contains the counts of each 

distinct value in each RDD of this DStream. 

""" 

return self.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x+y) 

 

def saveAsTextFiles(self, prefix, suffix=None): 

""" 

Save each RDD in this DStream as a text file, using the string 

representation of elements. 

""" 

def saveAsTextFile(t, rdd): 

path = rddToFileName(prefix, suffix, t) 

try: 

rdd.saveAsTextFile(path) 

except Py4JJavaError as e: 

# after recovering from checkpointing, the foreachRDD may 

# be called twice 

if 'FileAlreadyExistsException' not in str(e): 

raise 

return self.foreachRDD(saveAsTextFile) 

 

# TODO: uncomment this once we have ssc.pickleFileStream() 

# def saveAsPickleFiles(self, prefix, suffix=None): 

# """ 

# Save each RDD in this DStream as a binary file; the elements are 

# serialized by pickle. 

# """ 

# def saveAsPickleFile(t, rdd): 

# path = rddToFileName(prefix, suffix, t) 

# try: 

# rdd.saveAsPickleFile(path) 

# except Py4JJavaError as e: 

# # after recovering from checkpointing, the foreachRDD may 

# # be called twice 

# if 'FileAlreadyExistsException' not in str(e): 

# raise 

# return self.foreachRDD(saveAsPickleFile) 

 

def transform(self, func): 

""" 

Return a new DStream in which each RDD is generated by applying a function 

on each RDD of this DStream. 

 

`func` can take one argument (`rdd`) or two arguments 

(`time`, `rdd`). 

""" 

if func.__code__.co_argcount == 1: 

oldfunc = func 

func = lambda t, rdd: oldfunc(rdd) 

assert func.__code__.co_argcount == 2, "func should take one or two arguments" 

return TransformedDStream(self, func) 
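
# Usage sketch (editorial addition, not part of the original source). transform
# exposes arbitrary RDD-to-RDD operations; for example, dropping keys that
# appear in a static `blacklist_rdd` (an assumed, pre-built pair RDD) from a
# DStream of key-value pairs:
#
#     cleaned = pairs.transform(lambda rdd: rdd.subtractByKey(blacklist_rdd))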

 

def transformWith(self, func, other, keepSerializer=False): 

""" 

Return a new DStream in which each RDD is generated by applying a function 

on each RDD of this DStream and 'other' DStream. 

 

`func` can take two arguments (`rdd_a`, `rdd_b`) or three 

arguments (`time`, `rdd_a`, `rdd_b`). 

""" 

if func.__code__.co_argcount == 2: 

oldfunc = func 

func = lambda t, a, b: oldfunc(a, b) 

assert func.__code__.co_argcount == 3, "func should take two or three arguments" 

jfunc = TransformFunction(self._sc, func, self._jrdd_deserializer, other._jrdd_deserializer) 

dstream = self._sc._jvm.PythonTransformed2DStream(self._jdstream.dstream(), 

other._jdstream.dstream(), jfunc) 

jrdd_serializer = self._jrdd_deserializer if keepSerializer else self._sc.serializer 

return DStream(dstream.asJavaDStream(), self._ssc, jrdd_serializer) 

 

def repartition(self, numPartitions): 

""" 

Return a new DStream with an increased or decreased level of parallelism. 

""" 

return self.transform(lambda rdd: rdd.repartition(numPartitions)) 

 

@property 

def _slideDuration(self): 

""" 

Return the slideDuration in seconds of this DStream 

""" 

return self._jdstream.dstream().slideDuration().milliseconds() / 1000.0 

 

def union(self, other): 

""" 

Return a new DStream by unifying data of another DStream with this DStream. 

 

Parameters 

---------- 

other : :class:`DStream` 

Another DStream having the same interval (i.e., slideDuration) 

as this DStream. 

""" 

if self._slideDuration != other._slideDuration: 

raise ValueError("the two DStream should have same slide duration") 

return self.transformWith(lambda a, b: a.union(b), other, True) 

 

def cogroup(self, other, numPartitions=None): 

""" 

Return a new DStream by applying 'cogroup' between RDDs of this 

DStream and `other` DStream. 

 

Hash partitioning is used to generate the RDDs with `numPartitions` partitions. 

""" 

if numPartitions is None: 

numPartitions = self._sc.defaultParallelism 

return self.transformWith(lambda a, b: a.cogroup(b, numPartitions), other) 

 

def join(self, other, numPartitions=None): 

""" 

Return a new DStream by applying 'join' between RDDs of this DStream and 

`other` DStream. 

 

Hash partitioning is used to generate the RDDs with `numPartitions` 

partitions. 

""" 

if numPartitions is None: 

numPartitions = self._sc.defaultParallelism 

return self.transformWith(lambda a, b: a.join(b, numPartitions), other) 
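
# Usage sketch (editorial addition, not part of the original source). Joining
# two key-value DStreams that share the same batch interval, e.g. `clicks` of
# (userId, url) pairs and `profiles` of (userId, name) pairs:
#
#     joined = clicks.join(profiles)   # DStream of (userId, (url, name))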

 

def leftOuterJoin(self, other, numPartitions=None): 

""" 

Return a new DStream by applying 'left outer join' between RDDs of this DStream and 

`other` DStream. 

 

Hash partitioning is used to generate the RDDs with `numPartitions` 

partitions. 

""" 

if numPartitions is None: 

numPartitions = self._sc.defaultParallelism 

return self.transformWith(lambda a, b: a.leftOuterJoin(b, numPartitions), other) 

 

def rightOuterJoin(self, other, numPartitions=None): 

""" 

Return a new DStream by applying 'right outer join' between RDDs of this DStream and 

`other` DStream. 

 

Hash partitioning is used to generate the RDDs with `numPartitions` 

partitions. 

""" 

if numPartitions is None: 

numPartitions = self._sc.defaultParallelism 

return self.transformWith(lambda a, b: a.rightOuterJoin(b, numPartitions), other) 

 

def fullOuterJoin(self, other, numPartitions=None): 

""" 

Return a new DStream by applying 'full outer join' between RDDs of this DStream and 

`other` DStream. 

 

Hash partitioning is used to generate the RDDs with `numPartitions` 

partitions. 

""" 

if numPartitions is None: 

numPartitions = self._sc.defaultParallelism 

return self.transformWith(lambda a, b: a.fullOuterJoin(b, numPartitions), other) 

 

def _jtime(self, timestamp): 

""" Convert datetime or unix_timestamp into Time 

""" 

if isinstance(timestamp, datetime): 

timestamp = time.mktime(timestamp.timetuple()) 

return self._sc._jvm.Time(int(timestamp * 1000)) 

 

def slice(self, begin, end): 

""" 

Return all the RDDs between 'begin' and 'end' (both inclusive) 

 

`begin` and `end` can be datetime.datetime objects or unix timestamps 

""" 

jrdds = self._jdstream.slice(self._jtime(begin), self._jtime(end)) 

return [RDD(jrdd, self._sc, self._jrdd_deserializer) for jrdd in jrdds] 

 

def _validate_window_param(self, window, slide): 

duration = self._jdstream.dstream().slideDuration().milliseconds() 

if int(window * 1000) % duration != 0: 

raise ValueError("windowDuration must be multiple of the parent " 

"dstream's slide (batch) duration (%d ms)" 

% duration) 

if slide and int(slide * 1000) % duration != 0: 

raise ValueError("slideDuration must be multiple of the parent " 

"dstream's slide (batch) duration (%d ms)" 

% duration) 

 

def window(self, windowDuration, slideDuration=None): 

""" 

Return a new DStream in which each RDD contains all the elements seen in a 

sliding window of time over this DStream. 

 

Parameters 

---------- 

windowDuration : int 

width of the window; must be a multiple of this DStream's 

batching interval 

slideDuration : int, optional 

sliding interval of the window (i.e., the interval after which 

the new DStream will generate RDDs); must be a multiple of this 

DStream's batching interval 

""" 

self._validate_window_param(windowDuration, slideDuration) 

d = self._ssc._jduration(windowDuration) 

if slideDuration is None: 

return DStream(self._jdstream.window(d), self._ssc, self._jrdd_deserializer) 

s = self._ssc._jduration(slideDuration) 

return DStream(self._jdstream.window(d, s), self._ssc, self._jrdd_deserializer) 
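
# Usage sketch (editorial addition, not part of the original source). With a
# 10-second batch interval, the following keeps the last 30 seconds of data
# and recomputes it every 10 seconds (both durations are in seconds and must
# be multiples of the batch interval):
#
#     windowed = lines.window(30, 10)
#     windowed.count().pprint()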

 

def reduceByWindow(self, reduceFunc, invReduceFunc, windowDuration, slideDuration): 

""" 

Return a new DStream in which each RDD has a single element generated by reducing all 

elements in a sliding window over this DStream. 

 

If `invReduceFunc` is not None, the reduction is done incrementally 

using the old window's reduced value: 

 

1. reduce the new values that entered the window (e.g., adding new counts) 

 

2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) 

This is more efficient than recomputing the whole window when `invReduceFunc` is None. 

 

Parameters 

---------- 

reduceFunc : function 

associative and commutative reduce function 

invReduceFunc : function 

inverse reduce function of `reduceFunc`; such that for all y, 

and invertible x: 

`invReduceFunc(reduceFunc(x, y), x) = y` 

windowDuration : int 

width of the window; must be a multiple of this DStream's 

batching interval 

slideDuration : int 

sliding interval of the window (i.e., the interval after which 

the new DStream will generate RDDs); must be a multiple of this 

DStream's batching interval 

""" 

keyed = self.map(lambda x: (1, x)) 

reduced = keyed.reduceByKeyAndWindow(reduceFunc, invReduceFunc, 

windowDuration, slideDuration, 1) 

return reduced.map(lambda kv: kv[1]) 

 

def countByWindow(self, windowDuration, slideDuration): 

""" 

Return a new DStream in which each RDD has a single element generated 

by counting the number of elements in a window over this DStream. 

windowDuration and slideDuration are as defined in the window() operation. 

 

This is equivalent to window(windowDuration, slideDuration).count(), 

but will be more efficient if the window is large. 

""" 

return self.map(lambda x: 1).reduceByWindow(operator.add, operator.sub, 

windowDuration, slideDuration) 

 

def countByValueAndWindow(self, windowDuration, slideDuration, numPartitions=None): 

""" 

Return a new DStream in which each RDD contains the count of each distinct 

element seen in a sliding window over this DStream. 

 

Parameters 

---------- 

windowDuration : int 

width of the window; must be a multiple of this DStream's 

batching interval 

slideDuration : int 

sliding interval of the window (i.e., the interval after which 

the new DStream will generate RDDs); must be a multiple of this 

DStream's batching interval 

numPartitions : int, optional 

number of partitions of each RDD in the new DStream. 

""" 

keyed = self.map(lambda x: (x, 1)) 

counted = keyed.reduceByKeyAndWindow(operator.add, operator.sub, 

windowDuration, slideDuration, numPartitions) 

return counted.filter(lambda kv: kv[1] > 0) 

 

def groupByKeyAndWindow(self, windowDuration, slideDuration, numPartitions=None): 

""" 

Return a new DStream by applying `groupByKey` over a sliding window. 

Similar to `DStream.groupByKey()`, but applies it over a sliding window. 

 

Parameters 

---------- 

windowDuration : int 

width of the window; must be a multiple of this DStream's 

batching interval 

slideDuration : int 

sliding interval of the window (i.e., the interval after which 

the new DStream will generate RDDs); must be a multiple of this 

DStream's batching interval 

numPartitions : int, optional 

Number of partitions of each RDD in the new DStream. 

""" 

ls = self.mapValues(lambda x: [x]) 

grouped = ls.reduceByKeyAndWindow(lambda a, b: a.extend(b) or a, lambda a, b: a[len(b):], 

windowDuration, slideDuration, numPartitions) 

return grouped.mapValues(ResultIterable) 

 

def reduceByKeyAndWindow(self, func, invFunc, windowDuration, slideDuration=None, 

numPartitions=None, filterFunc=None): 

""" 

Return a new DStream by applying incremental `reduceByKey` over a sliding window. 

 

The reduced value over a new window is calculated using the old window's reduced value: 

1. reduce the new values that entered the window (e.g., adding new counts) 

2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) 

 

If `invFunc` is None, all the RDDs in the window are reduced from scratch, 

which can be slower than providing `invFunc`. 

 

Parameters 

---------- 

func : function 

associative and commutative reduce function 

invFunc : function 

inverse function of `reduceFunc` 

windowDuration : int 

width of the window; must be a multiple of this DStream's 

batching interval 

slideDuration : int, optional 

sliding interval of the window (i.e., the interval after which 

the new DStream will generate RDDs); must be a multiple of this 

DStream's batching interval 

numPartitions : int, optional 

number of partitions of each RDD in the new DStream. 

filterFunc : function, optional 

function to filter expired key-value pairs; 

only pairs that satisfy the function are retained; 

set this to None if you do not want to filter 

""" 

self._validate_window_param(windowDuration, slideDuration) 

if numPartitions is None: 

numPartitions = self._sc.defaultParallelism 

 

reduced = self.reduceByKey(func, numPartitions) 

 

if invFunc: 

def reduceFunc(t, a, b): 

b = b.reduceByKey(func, numPartitions) 

r = a.union(b).reduceByKey(func, numPartitions) if a else b 

if filterFunc: 

r = r.filter(filterFunc) 

return r 

 

def invReduceFunc(t, a, b): 

b = b.reduceByKey(func, numPartitions) 

joined = a.leftOuterJoin(b, numPartitions) 

return joined.mapValues(lambda kv: invFunc(kv[0], kv[1]) 

if kv[1] is not None else kv[0]) 

 

jreduceFunc = TransformFunction(self._sc, reduceFunc, reduced._jrdd_deserializer) 

jinvReduceFunc = TransformFunction(self._sc, invReduceFunc, reduced._jrdd_deserializer) 

if slideDuration is None: 

slideDuration = self._slideDuration 

dstream = self._sc._jvm.PythonReducedWindowedDStream( 

reduced._jdstream.dstream(), 

jreduceFunc, jinvReduceFunc, 

self._ssc._jduration(windowDuration), 

self._ssc._jduration(slideDuration)) 

return DStream(dstream.asJavaDStream(), self._ssc, self._sc.serializer) 

else: 

return reduced.window(windowDuration, slideDuration).reduceByKey(func, numPartitions) 
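
# Usage sketch (editorial addition, not part of the original source). An
# incrementally maintained windowed word count over (word, 1) `pairs`; passing
# an inverse function requires checkpointing to be enabled first:
#
#     ssc.checkpoint("checkpoint")
#     windowed_counts = pairs.reduceByKeyAndWindow(
#         lambda a, b: a + b,   # add counts entering the window
#         lambda a, b: a - b,   # subtract counts leaving the window
#         windowDuration=30, slideDuration=10)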

 

def updateStateByKey(self, updateFunc, numPartitions=None, initialRDD=None): 

""" 

Return a new "state" DStream where the state for each key is updated by applying 

the given function on the previous state of the key and the new values of the key. 

 

Parameters 

---------- 

updateFunc : function 

State update function. If this function returns None, then 

the corresponding state key-value pair will be eliminated. 

""" 

if numPartitions is None: 

numPartitions = self._sc.defaultParallelism 

 

if initialRDD and not isinstance(initialRDD, RDD): 

initialRDD = self._sc.parallelize(initialRDD) 

 

def reduceFunc(t, a, b): 

if a is None: 

g = b.groupByKey(numPartitions).mapValues(lambda vs: (list(vs), None)) 

else: 

g = a.cogroup(b.partitionBy(numPartitions), numPartitions) 

g = g.mapValues(lambda ab: (list(ab[1]), list(ab[0])[0] if len(ab[0]) else None)) 

state = g.mapValues(lambda vs_s: updateFunc(vs_s[0], vs_s[1])) 

return state.filter(lambda k_v: k_v[1] is not None) 

 

jreduceFunc = TransformFunction(self._sc, reduceFunc, 

self._sc.serializer, self._jrdd_deserializer) 

if initialRDD: 

initialRDD = initialRDD._reserialize(self._jrdd_deserializer) 

dstream = self._sc._jvm.PythonStateDStream(self._jdstream.dstream(), jreduceFunc, 

initialRDD._jrdd) 

else: 

dstream = self._sc._jvm.PythonStateDStream(self._jdstream.dstream(), jreduceFunc) 

 

return DStream(dstream.asJavaDStream(), self._ssc, self._sc.serializer) 
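
# Usage sketch (editorial addition, not part of the original source). A running
# count per key maintained across batches; stateful operations also require a
# checkpoint directory:
#
#     ssc.checkpoint("checkpoint")
#
#     def update(new_values, running_count):
#         return sum(new_values) + (running_count or 0)
#
#     running_counts = pairs.updateStateByKey(update)
#     running_counts.pprint()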

 

 

class TransformedDStream(DStream): 

""" 

TransformedDStream is a DStream generated by a Python function 

transforming each RDD of a DStream into another RDD. 

 

Multiple continuous transformations of DStream can be combined into 

one transformation. 

""" 

def __init__(self, prev, func): 

self._ssc = prev._ssc 

self._sc = self._ssc._sc 

self._jrdd_deserializer = self._sc.serializer 

self.is_cached = False 

self.is_checkpointed = False 

self._jdstream_val = None 

 

# Using type() to avoid folding the functions and compacting the DStreams which is 

# not strictly an object of TransformedDStream. 

if (type(prev) is TransformedDStream and 

not prev.is_cached and not prev.is_checkpointed): 

prev_func = prev.func 

self.func = lambda t, rdd: func(t, prev_func(t, rdd)) 

self.prev = prev.prev 

else: 

self.prev = prev 

self.func = func 

 

@property 

def _jdstream(self): 

if self._jdstream_val is not None: 

return self._jdstream_val 

 

jfunc = TransformFunction(self._sc, self.func, self.prev._jrdd_deserializer) 

dstream = self._sc._jvm.PythonTransformedDStream(self.prev._jdstream.dstream(), jfunc) 

self._jdstream_val = dstream.asJavaDStream() 

return self._jdstream_val
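
# End-to-end usage sketch (editorial addition, not part of the original
# source), following the standard network word count pattern:
#
#     from pyspark import SparkContext
#     from pyspark.streaming import StreamingContext
#
#     sc = SparkContext("local[2]", "NetworkWordCount")
#     ssc = StreamingContext(sc, 1)  # 1-second batch interval
#     lines = ssc.socketTextStream("localhost", 9999)
#     counts = (lines.flatMap(lambda line: line.split(" "))
#                    .map(lambda w: (w, 1))
#                    .reduceByKey(lambda a, b: a + b))
#     counts.pprint()
#     ssc.start()
#     ssc.awaitTermination()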