#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
A collection of built-in Avro functions.
"""


from pyspark import SparkContext
from pyspark.sql.column import Column, _to_java_column
from pyspark.util import _print_missing_jar
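
# `from_avro` and `to_avro` below need the external `spark-avro` module on the
# classpath. A typical way to supply it, sketched here for illustration (match
# the artifact's Scala version and release to your Spark build, e.g.
# spark-avro_2.12 for a Scala 2.12 build of Spark 3.0.0):
#
#   ./bin/spark-submit --packages org.apache.spark:spark-avro_2.12:3.0.0 app.py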

 

 

def from_avro(data, jsonFormatSchema, options=None):
    """
    Converts a binary column of Avro format into its corresponding catalyst value.
    The specified schema must match the read data, otherwise the behavior is undefined:
    it may fail or return arbitrary results.
    To deserialize the data with a compatible and evolved schema, the expected Avro schema can be
    set via the option avroSchema.

    .. versionadded:: 3.0.0

    Parameters
    ----------
    data : :class:`~pyspark.sql.Column` or str
        the binary column.
    jsonFormatSchema : str
        the Avro schema in JSON string format.
    options : dict, optional
        options to control how the Avro record is parsed.

    Notes
    -----
    Avro is a built-in but external data source module since Spark 2.4. Please deploy the
    application as per the deployment section of the "Apache Avro Data Source Guide".

    Examples
    --------
    >>> from pyspark.sql import Row
    >>> from pyspark.sql.avro.functions import from_avro, to_avro
    >>> data = [(1, Row(age=2, name='Alice'))]
    >>> df = spark.createDataFrame(data, ("key", "value"))
    >>> avroDf = df.select(to_avro(df.value).alias("avro"))
    >>> avroDf.collect()
    [Row(avro=bytearray(b'\\x00\\x00\\x04\\x00\\nAlice'))]

    >>> jsonFormatSchema = '''{"type":"record","name":"topLevelRecord","fields":
    ...     [{"name":"avro","type":[{"type":"record","name":"value","namespace":"topLevelRecord",
    ...     "fields":[{"name":"age","type":["long","null"]},
    ...     {"name":"name","type":["string","null"]}]},"null"]}]}'''
    >>> avroDf.select(from_avro(avroDf.avro, jsonFormatSchema).alias("value")).collect()
    [Row(value=Row(avro=Row(age=2, name='Alice')))]
    """

    sc = SparkContext._active_spark_context
    try:
        jc = sc._jvm.org.apache.spark.sql.avro.functions.from_avro(
            _to_java_column(data), jsonFormatSchema, options or {})
    except TypeError as e:
        if str(e) == "'JavaPackage' object is not callable":
            # The Avro JVM classes are missing: tell the user how to add the
            # spark-avro jar, then re-raise the original error.
            _print_missing_jar("Avro", "avro", "avro", sc.version)
        raise
    return Column(jc)
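
# A minimal usage sketch for the `options` argument (hypothetical inputs: a
# DataFrame `avroDf` with a binary column `avro` and a matching
# `jsonFormatSchema` string; "mode" -> "PERMISSIVE" is assumed here as an
# example parse option that yields null results for malformed records instead
# of failing):
#
#   parsed = avroDf.select(
#       from_avro(avroDf.avro, jsonFormatSchema, {"mode": "PERMISSIVE"}).alias("value"))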

 

 

def to_avro(data, jsonFormatSchema=""): 

""" 

Converts a column into binary of avro format. 

 

.. versionadded:: 3.0.0 

 

Parameters 

---------- 

data : :class:`~pyspark.sql.Column` or str 

the data column. 

jsonFormatSchema : str, optional 

user-specified output avro schema in JSON string format. 

 

Notes 

----- 

Avro is built-in but external data source module since Spark 2.4. Please deploy the 

application as per the deployment section of "Apache Avro Data Source Guide". 

 

Examples 

-------- 

>>> from pyspark.sql import Row 

>>> from pyspark.sql.avro.functions import to_avro 

>>> data = ['SPADES'] 

>>> df = spark.createDataFrame(data, "string") 

>>> df.select(to_avro(df.value).alias("suite")).collect() 

[Row(suite=bytearray(b'\\x00\\x0cSPADES'))] 

 

>>> jsonFormatSchema = '''["null", {"type": "enum", "name": "value", 

... "symbols": ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]}]''' 

>>> df.select(to_avro(df.value, jsonFormatSchema).alias("suite")).collect() 

[Row(suite=bytearray(b'\\x02\\x00'))] 

""" 

 

sc = SparkContext._active_spark_context 

try: 

if jsonFormatSchema == "": 

jc = sc._jvm.org.apache.spark.sql.avro.functions.to_avro(_to_java_column(data)) 

else: 

jc = sc._jvm.org.apache.spark.sql.avro.functions.to_avro( 

_to_java_column(data), jsonFormatSchema) 

except TypeError as e: 

if str(e) == "'JavaPackage' object is not callable": 

_print_missing_jar("Avro", "avro", "avro", sc.version) 

raise 

return Column(jc) 
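
# A round-trip sketch tying the two functions together (hypothetical DataFrame
# `df`; `struct` is pyspark.sql.functions.struct and packs all columns into one
# record column, the usual shape for Avro-encoded message values):
#
#   from pyspark.sql.functions import struct
#   binary = df.select(to_avro(struct(*df.columns)).alias("value"))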

 

 

def _test():
    import os
    import sys
    from pyspark.testing.utils import search_jar
    avro_jar = search_jar("external/avro", "spark-avro", "spark-avro")
    if avro_jar is None:
        print(
            "Skipping all Avro Python tests as the optional Avro project was "
            "not compiled into a JAR. To run these tests, "
            "you need to build Spark with 'build/sbt -Pavro package' or "
            "'build/mvn -Pavro package' before running this test.")
        sys.exit(0)
    else:
        existing_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "pyspark-shell")
        jars_args = "--jars %s" % avro_jar
        os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join([jars_args, existing_args])

    import doctest
    from pyspark.sql import SparkSession
    import pyspark.sql.avro.functions
    globs = pyspark.sql.avro.functions.__dict__.copy()
    spark = SparkSession.builder\
        .master("local[4]")\
        .appName("sql.avro.functions tests")\
        .getOrCreate()
    globs['spark'] = spark
    (failure_count, test_count) = doctest.testmod(
        pyspark.sql.avro.functions, globs=globs,
        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE)
    spark.stop()
    if failure_count:
        sys.exit(-1)


if __name__ == "__main__":
    _test()