#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"DataType", "NullType", "StringType", "BinaryType", "BooleanType", "DateType", "TimestampType", "DecimalType", "DoubleType", "FloatType", "ByteType", "IntegerType", "LongType", "ShortType", "ArrayType", "MapType", "StructField", "StructType"]
"""Base class for data types."""
return hash(str(self))
def typeName(cls):
separators=(',', ':'), sort_keys=True)
""" Does this type needs conversion between Python object and internal SQL object.
This is used to avoid the unnecessary conversion for ArrayType/MapType/StructType. """
""" Converts a Python object into an internal SQL object. """
""" Converts an internal SQL object into a native Python object. """
# This singleton pattern does not work with pickle, you will get
# another object after pickle and unpickle
"""Metaclass for DataType"""
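# The metaclass is only hinted at above; the following is a minimal, hedged sketch
# of how a per-class singleton metaclass can be written (the names _SingletonSketch
# and _DemoType are illustrative only). As the comment above notes, an instance
# obtained this way does not survive a pickle round trip as the same object.
class _SingletonSketch(type):
    """Metaclass that caches one instance per class."""

    _instances = {}

    def __call__(cls):
        if cls not in cls._instances:
            cls._instances[cls] = super(_SingletonSketch, cls).__call__()
        return cls._instances[cls]


class _DemoType(metaclass=_SingletonSketch):
    pass


assert _DemoType() is _DemoType()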
"""Null type.
The data type representing None, used for the types that cannot be inferred. """
"""An internal type used to represent everything that is not null, UDTs, arrays, structs, and maps."""
"""Numeric data types. """
"""Integral data types. """
"""Fractional data types. """
"""String data type. """
"""Binary (byte array) data type. """
"""Boolean data type. """
"""Date (datetime.date) data type. """
"""Timestamp (datetime.datetime) data type. """
else time.mktime(dt.timetuple()))
# using int to avoid precision loss in float
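# A hedged illustration of the point above: epoch microseconds for present-day
# timestamps are roughly 2**50, close to the 53 significant bits of a double, so
# exact Python ints are kept instead of floats. The helper below is illustrative,
# not this module's actual TimestampType.toInternal.
import datetime
import time

def _timestamp_to_internal_sketch(dt):
    seconds = time.mktime(dt.timetuple())           # whole seconds, local time
    return int(seconds) * 1000000 + dt.microsecond  # exact integer microseconds

_micros = _timestamp_to_internal_sketch(datetime.datetime(2021, 6, 1, 12, 30, 45, 123456))
assert _micros % 1000000 == 123456                  # sub-second part preserved exactly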
"""Decimal (decimal.Decimal) data type.
The DecimalType must have fixed precision (the maximum total number of digits) and scale (the number of digits on the right of dot). For example, (5, 2) can support the value from [-999.99 to 999.99].
The precision can be up to 38, the scale must be less or equal to precision.
When creating a DecimalType, the default precision and scale is (10, 0). When inferring schema from decimal.Decimal objects, it will be DecimalType(38, 18).
Parameters ---------- precision : int, optional the maximum (i.e. total) number of digits (default: 10) scale : int, optional the number of digits on right side of dot. (default: 0) """
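# A short hedged example of the precision/scale semantics documented above;
# assumes pyspark is installed when run outside this module.
from pyspark.sql.types import DecimalType

_price = DecimalType(5, 2)                      # at most 5 digits, 2 after the dot
assert (_price.precision, _price.scale) == (5, 2)
assert _price.simpleString() == 'decimal(5,2)'
assert (DecimalType().precision, DecimalType().scale) == (10, 0)   # documented defaults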
"""Double data type, representing double precision floats. """
"""Float data type, representing single precision floats. """
"""Byte data type, i.e. a signed integer in a single byte. """ return 'tinyint'
"""Int data type, i.e. a signed 32-bit integer. """
"""Long data type, i.e. a signed 64-bit integer.
If the values are beyond the range of [-9223372036854775808, 9223372036854775807], please use :class:`DecimalType`. """
"""Short data type, i.e. a signed 16-bit integer. """ return 'smallint'
"""Array data type.
Parameters ---------- elementType : :class:`DataType` :class:`DataType` of each element in the array. containsNull : bool, optional whether the array can contain null (None) values.
Examples -------- >>> ArrayType(StringType()) == ArrayType(StringType(), True) True >>> ArrayType(StringType(), False) == ArrayType(StringType()) False """
"elementType %s should be an instance of %s" % (elementType, DataType)
str(self.containsNull).lower())
"elementType": self.elementType.jsonValue(), "containsNull": self.containsNull}
def fromJson(cls, json): json["containsNull"])
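# A hedged sketch of the jsonValue()/fromJson() round trip suggested by the
# fragments above; assumes pyspark is installed when run outside this module.
from pyspark.sql.types import ArrayType, StringType

_at = ArrayType(StringType(), containsNull=False)
# Expected shape, per the keys referenced above:
# {'type': 'array', 'elementType': 'string', 'containsNull': False}
assert ArrayType.fromJson(_at.jsonValue()) == _at
assert _at.simpleString() == 'array<string>'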
"""Map data type.
Parameters ---------- keyType : :class:`DataType` :class:`DataType` of the keys in the map. valueType : :class:`DataType` :class:`DataType` of the values in the map. valueContainsNull : bool, optional indicates whether values can contain null (None) values.
Notes ----- Keys in a map data type are not allowed to be null (None).
Examples -------- >>> (MapType(StringType(), IntegerType()) ... == MapType(StringType(), IntegerType(), True)) True >>> (MapType(StringType(), IntegerType(), False) ... == MapType(StringType(), FloatType())) False """
"keyType %s should be an instance of %s" % (keyType, DataType) "valueType %s should be an instance of %s" % (valueType, DataType)
str(self.valueContainsNull).lower())
"keyType": self.keyType.jsonValue(), "valueType": self.valueType.jsonValue(), "valueContainsNull": self.valueContainsNull}
def fromJson(cls, json): _parse_datatype_json_value(json["valueType"]), json["valueContainsNull"])
return obj for k, v in obj.items())
return obj for k, v in obj.items())
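# A companion hedged example for MapType; assumes pyspark is installed when run
# outside this module and that simpleString() follows the usual map<key,value> form.
from pyspark.sql.types import MapType, StringType, IntegerType

_mt = MapType(StringType(), IntegerType(), valueContainsNull=False)
assert _mt.simpleString() == 'map<string,int>'
assert MapType.fromJson(_mt.jsonValue()) == _mt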
"""A field in :class:`StructType`.
Parameters
----------
name : str
    name of the field.
dataType : :class:`DataType`
    :class:`DataType` of the field.
nullable : bool, optional
    whether the field can be null (None) or not.
metadata : dict, optional
    a dict from string to simple type that can be converted to JSON automatically

Examples
--------
>>> (StructField("f1", StringType(), True)
...      == StructField("f1", StringType(), True))
True
>>> (StructField("f1", StringType(), True)
...      == StructField("f2", StringType(), True))
False
"""
"dataType %s should be an instance of %s" % (dataType, DataType)
str(self.nullable).lower())
"type": self.dataType.jsonValue(), "nullable": self.nullable, "metadata": self.metadata}
def fromJson(cls, json): _parse_datatype_json_value(json["type"]), json["nullable"], json["metadata"])
"StructField does not have typeName. " "Use typeName on its type explicitly instead.")
"""Struct type, consisting of a list of :class:`StructField`.
This is the data type representing a :class:`Row`.
Iterating a :class:`StructType` will iterate over its :class:`StructField`\\s. A contained :class:`StructField` can be accessed by its name or position.
Examples
--------
>>> struct1 = StructType([StructField("f1", StringType(), True)])
>>> struct1["f1"]
StructField(f1,StringType,true)
>>> struct1[0]
StructField(f1,StringType,true)

>>> struct1 = StructType([StructField("f1", StringType(), True)])
>>> struct2 = StructType([StructField("f1", StringType(), True)])
>>> struct1 == struct2
True
>>> struct1 = StructType([StructField("f1", StringType(), True)])
>>> struct2 = StructType([StructField("f1", StringType(), True),
...     StructField("f2", IntegerType(), False)])
>>> struct1 == struct2
False
"""
else:
"fields should be a list of StructField"
# Precalculated list of fields that need conversion with fromInternal/toInternal functions
""" Construct a :class:`StructType` by adding new elements to it, to define the schema. The method accepts either:
a) A single parameter which is a :class:`StructField` object. b) Between 2 and 4 parameters as (name, data_type, nullable (optional), metadata(optional). The data_type parameter may be either a String or a :class:`DataType` object.
Parameters ---------- field : str or :class:`StructField` Either the name of the field or a :class:`StructField` object data_type : :class:`DataType`, optional If present, the DataType of the :class:`StructField` to create nullable : bool, optional Whether the field to add should be nullable (default True) metadata : dict, optional Any additional metadata (default None)
Returns ------- :class:`StructType`
Examples -------- >>> struct1 = StructType().add("f1", StringType(), True).add("f2", StringType(), True, None) >>> struct2 = StructType([StructField("f1", StringType(), True), \\ ... StructField("f2", StringType(), True, None)]) >>> struct1 == struct2 True >>> struct1 = StructType().add(StructField("f1", StringType(), True)) >>> struct2 = StructType([StructField("f1", StringType(), True)]) >>> struct1 == struct2 True >>> struct1 = StructType().add("f1", "string", True) >>> struct2 = StructType([StructField("f1", StringType(), True)]) >>> struct1 == struct2 True """ else:
else:
# Precalculated list of fields that need conversion with fromInternal/toInternal functions
"""Iterate the fields"""
"""Return the number of fields."""
"""Access fields by name or slice.""" else:
",".join(str(field) for field in self))
"fields": [f.jsonValue() for f in self]}
def fromJson(cls, json):
""" Returns all field names in a list.
Examples -------- >>> struct = StructType([StructField("f1", StringType(), True)]) >>> struct.fieldNames() ['f1'] """
# We need convert Row()/namedtuple into tuple()
return
# Only calling toInternal function for fields that need conversion
return tuple(f.toInternal(obj.get(n)) if c else obj.get(n)
             for n, f, c in zip(self.names, self.fields, self._needConversion))
             for f, v, c in zip(self.fields, obj, self._needConversion))
elif hasattr(obj, "__dict__"):
    d = obj.__dict__
    return tuple(f.toInternal(d.get(n)) if c else d.get(n)
                 for n, f, c in zip(self.names, self.fields, self._needConversion))
else:
    raise ValueError("Unexpected tuple %r with StructType" % obj)
else:
elif hasattr(obj, "__dict__"):
    d = obj.__dict__
    return tuple(d.get(n) for n in self.names)
else:
    raise ValueError("Unexpected tuple %r with StructType" % obj)
return  # it's already converted by pickler
# Only calling fromInternal function for fields that need conversion
for f, v, c in zip(self.fields, obj, self._needConversion)]
else:
"""User-defined type (UDT).
.. note:: WARN: Spark Internal Use Only """
def typeName(cls):
    return cls.__name__.lower()

def sqlType(cls):
    """
    Underlying SQL storage type for this UDT.
    """
    raise NotImplementedError("UDT must implement sqlType().")

def module(cls):
    """
    The Python module of the UDT.
    """
    raise NotImplementedError("UDT must implement module().")

def scalaUDT(cls):
    """
    The class name of the paired Scala UDT (could be '', if there
    is no corresponding one).
    """

def _cachedSqlType(cls):
    """
    Cache the sqlType() into class, because it's heavily used in `toInternal`.
    """

""" Converts a user-type object into a SQL datum. """
raise NotImplementedError("UDT must implement toInternal().")

""" Converts a SQL datum into a user-type object. """
raise NotImplementedError("UDT must implement fromInternal().")
return 'udt'
"type": "udt", "class": self.scalaUDT(), "pyClass": "%s.%s" % (self.module(), type(self).__name__), "sqlType": self.sqlType().jsonValue() } else: "type": "udt", "pyClass": "%s.%s" % (self.module(), type(self).__name__), "serializedClass": base64.b64encode(b).decode('utf8'), "sqlType": self.sqlType().jsonValue() }
def fromJson(cls, json): s = base64.b64decode(json['serializedClass'].encode('utf-8')) UDT = CloudPickleSerializer().loads(s) else:
ByteType, ShortType, IntegerType, LongType, DateType, TimestampType, NullType] for v in [ArrayType, MapType, StructType])
""" Parses the given data type string to a :class:`DataType`. The data type string format equals :class:`DataType.simpleString`, except that the top level struct type can omit the ``struct<>`` and atomic types use ``typeName()`` as their format, e.g. use ``byte`` instead of ``tinyint`` for :class:`ByteType`. We can also use ``int`` as a short name for :class:`IntegerType`. Since Spark 2.3, this also supports a schema in a DDL-formatted string and case-insensitive strings.
Examples -------- >>> _parse_datatype_string("int ") IntegerType >>> _parse_datatype_string("INT ") IntegerType >>> _parse_datatype_string("a: byte, b: decimal( 16 , 8 ) ") StructType(List(StructField(a,ByteType,true),StructField(b,DecimalType(16,8),true))) >>> _parse_datatype_string("a DOUBLE, b STRING") StructType(List(StructField(a,DoubleType,true),StructField(b,StringType,true))) >>> _parse_datatype_string("a: array< short>") StructType(List(StructField(a,ArrayType(ShortType,true),true))) >>> _parse_datatype_string(" map<string , string > ") MapType(StringType,StringType,true)
>>> # Error cases >>> _parse_datatype_string("blabla") # doctest: +IGNORE_EXCEPTION_DETAIL Traceback (most recent call last): ... ParseException:... >>> _parse_datatype_string("a: int,") # doctest: +IGNORE_EXCEPTION_DETAIL Traceback (most recent call last): ... ParseException:... >>> _parse_datatype_string("array<int") # doctest: +IGNORE_EXCEPTION_DETAIL Traceback (most recent call last): ... ParseException:... >>> _parse_datatype_string("map<int, boolean>>") # doctest: +IGNORE_EXCEPTION_DETAIL Traceback (most recent call last): ... ParseException:... """
sc._jvm.org.apache.spark.sql.types.StructType.fromDDL(type_str).json())
sc._jvm.org.apache.spark.sql.api.python.PythonSQLUtils.parseDataType(type_str).json())
# DDL format, "fieldname datatype, fieldname datatype". # For backwards compatibility, "integer", "struct<fieldname: datatype>" and etc. # For backwards compatibility, "fieldname: datatype, fieldname: datatype" case.
"""Parses the given data type JSON string.
Examples
--------
>>> import pickle
>>> def check_datatype(datatype):
...     pickled = pickle.loads(pickle.dumps(datatype))
...     assert datatype == pickled
...     scala_datatype = spark._jsparkSession.parseDataType(datatype.json())
...     python_datatype = _parse_datatype_json_string(scala_datatype.json())
...     assert datatype == python_datatype
>>> for cls in _all_atomic_types.values():
...     check_datatype(cls())

>>> # Simple ArrayType.
>>> simple_arraytype = ArrayType(StringType(), True)
>>> check_datatype(simple_arraytype)

>>> # Simple MapType.
>>> simple_maptype = MapType(StringType(), LongType())
>>> check_datatype(simple_maptype)

>>> # Simple StructType.
>>> simple_structtype = StructType([
...     StructField("a", DecimalType(), False),
...     StructField("b", BooleanType(), True),
...     StructField("c", LongType(), True),
...     StructField("d", BinaryType(), False)])
>>> check_datatype(simple_structtype)

>>> # Complex StructType.
>>> complex_structtype = StructType([
...     StructField("simpleArray", simple_arraytype, True),
...     StructField("simpleMap", simple_maptype, True),
...     StructField("simpleStruct", simple_structtype, True),
...     StructField("boolean", BooleanType(), False),
...     StructField("withMeta", DoubleType(), False, {"name": "age"})])
>>> check_datatype(complex_structtype)

>>> # Complex ArrayType.
>>> complex_arraytype = ArrayType(complex_structtype, True)
>>> check_datatype(complex_arraytype)

>>> # Complex MapType.
>>> complex_maptype = MapType(complex_structtype,
...                           complex_arraytype, False)
>>> check_datatype(complex_maptype)
"""
return DecimalType() else: raise ValueError("Could not parse datatype: %s" % json_value) else: else: raise ValueError("not supported type: %s" % tpe)
# Mapping Python types to Spark SQL DataType
_type_mappings = {
    type(None): NullType,
    bool: BooleanType,
    int: LongType,
    float: DoubleType,
    str: StringType,
    bytearray: BinaryType,
    decimal.Decimal: DecimalType,
    datetime.date: DateType,
    datetime.datetime: TimestampType,
    datetime.time: TimestampType,
    bytes: BinaryType,
}
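# A hedged sketch of how a mapping like the one above can drive type inference:
# look up the exact Python type of a value and instantiate the corresponding
# DataType class. The helper below is illustrative only, not this module's actual
# _infer_type; it assumes pyspark is installed when run on its own.
from pyspark.sql.types import BooleanType, DoubleType, LongType, StringType

_sketch_mappings = {bool: BooleanType, int: LongType, float: DoubleType, str: StringType}

def _infer_atomic_type_sketch(obj):
    cls = _sketch_mappings.get(type(obj))
    if cls is None:
        raise TypeError("not supported type: %s" % type(obj))
    return cls()

assert _infer_atomic_type_sketch(42) == LongType()      # Python int -> LongType
assert _infer_atomic_type_sketch(1.5) == DoubleType()   # Python float -> DoubleType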
# Mapping Python array types to Spark SQL DataType
# We should be careful here. The size of these types in python depends on the C
# implementation. We need to make sure that this conversion does not lose any
# precision. Also, the JVM only supports signed types; when converting unsigned
# types, keep in mind that they require 1 more bit when stored as signed types.
#
# Reference for C integer size, see:
# ISO/IEC 9899:201x specification, chapter 5.2.4.2.1 Sizes of integer types <limits.h>.
# Reference for python array typecode, see:
# https://docs.python.org/2/library/array.html
# https://docs.python.org/3.6/library/array.html
# Reference for JVM's supported integral types:
# http://docs.oracle.com/javase/specs/jvms/se8/html/jvms-2.html#jvms-2.3.1
_array_signed_int_typecode_ctype_mappings = {
    'b': ctypes.c_byte,
    'h': ctypes.c_short,
    'i': ctypes.c_int,
    'l': ctypes.c_long,
}

_array_unsigned_int_typecode_ctype_mappings = {
    'B': ctypes.c_ubyte,
    'H': ctypes.c_ushort,
    'I': ctypes.c_uint,
    'L': ctypes.c_ulong
}
""" Return the Catalyst datatype from the size of integers. """
# The list of all supported array typecodes is stored here.
_array_type_mappings = {
    # Warning: The actual properties of float and double are not specified by the
    # C standard. On almost every system supported by both python and JVM, they
    # are IEEE 754 single-precision and double-precision binary floating-point
    # formats, and we do assume the same thing here for now.
    'f': FloatType,
    'd': DoubleType
}
# compute array typecode mappings for signed integer types
# compute array typecode mappings for unsigned integer types
# JVM does not have unsigned types, so use signed types that are at least 1
# bit larger to store
# Type code 'u' in Python's array is deprecated since version 3.3, and will be # removed in version 4.0. See: https://docs.python.org/3/library/array.html
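# A hedged sketch of how the signed/unsigned typecode tables above can be turned
# into DataType mappings: measure each ctypes type's width in bits and, for
# unsigned codes, require one extra bit because the JVM only has signed integer
# types. Names below are illustrative and may differ from the module's internals.
import ctypes
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType

def _int_size_to_type_sketch(size_bits):
    if size_bits <= 8:
        return ByteType
    if size_bits <= 16:
        return ShortType
    if size_bits <= 32:
        return IntegerType
    if size_bits <= 64:
        return LongType
    return None  # too wide to fit any JVM integral type

_signed = {'b': ctypes.c_byte, 'h': ctypes.c_short, 'i': ctypes.c_int, 'l': ctypes.c_long}
_unsigned = {'B': ctypes.c_ubyte, 'H': ctypes.c_ushort, 'I': ctypes.c_uint, 'L': ctypes.c_ulong}

_sketch_array_mappings = {}
for _code, _ctype in _signed.items():
    _sketch_array_mappings[_code] = _int_size_to_type_sketch(ctypes.sizeof(_ctype) * 8)
for _code, _ctype in _unsigned.items():
    # one extra bit so the full unsigned range still fits in a signed JVM type
    _sketch_array_mappings[_code] = _int_size_to_type_sketch(ctypes.sizeof(_ctype) * 8 + 1)

assert _sketch_array_mappings['B'] is ShortType   # unsigned byte needs a 16-bit signed type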
"""Infer the DataType from obj """
# the precision and scale of `obj` may be different from row to row.
else: _infer_type(value, infer_dict_as_struct), True) else: else: except TypeError: raise TypeError("not supported type: %s" % type(obj))
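# Hedged sanity checks of the inference behaviour sketched above. _infer_type is
# an internal helper, so these are illustrative rather than a public contract;
# they assume pyspark is installed when run outside this module.
import decimal
from pyspark.sql.types import _infer_type, LongType, DoubleType, StringType, DecimalType

assert _infer_type(1) == LongType()
assert _infer_type(1.0) == DoubleType()
assert _infer_type("a") == StringType()
# Decimals get the widest precision, since it may differ from row to row (see above).
assert _infer_type(decimal.Decimal("1.5")) == DecimalType(38, 18)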
"""Infer the schema from dict/namedtuple/object"""
else:
else: raise TypeError("Can not infer schema for type: %s" % type(row))
""" Return whether there is a NullType in `dt` or not """ else:
else:
# TODO: type cast (such as int -> long)
# same type name=new_name(f.name))) for f in a.fields]
name='element in array %s' % name), True)
_merge_type(a.valueType, b.valueType, name='value of map %s' % name), True) else:
return True else:
"""Create a converter to drop the names of fields in obj """
return lambda x: None
return lambda x: x
# dataType must be StructType
return
else:
else: raise TypeError("Unexpected obj type: %s" % type(obj))
return tuple([conv(d.get(name)) for name, conv in zip(names, converters)]) else:
_acceptable_types = {
    BooleanType: (bool,),
    ByteType: (int,),
    ShortType: (int,),
    IntegerType: (int,),
    LongType: (int,),
    FloatType: (float,),
    DoubleType: (float,),
    DecimalType: (decimal.Decimal,),
    StringType: (str,),
    BinaryType: (bytearray, bytes),
    DateType: (datetime.date, datetime.datetime),
    TimestampType: (datetime.datetime,),
    ArrayType: (list, tuple, array),
    MapType: (dict,),
    StructType: (tuple, list, dict),
}
""" Make a verifier that checks the type of obj against dataType and raises a TypeError if they do not match.
This verifier also checks the value of obj against the datatype and raises a
ValueError if it's not within the allowed range, e.g. using 128 as ByteType
will overflow. Note that Python float is not checked, so it will become infinity
when cast to Java float, if it overflows.

Examples
--------
>>> _make_type_verifier(StructType([]))(None)
>>> _make_type_verifier(StringType())("")
>>> _make_type_verifier(LongType())(0)
>>> _make_type_verifier(LongType())(1 << 64) # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
    ...
ValueError:...
>>> _make_type_verifier(ArrayType(ShortType()))(list(range(3)))
>>> _make_type_verifier(ArrayType(StringType()))(set()) # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
    ...
TypeError:...
>>> _make_type_verifier(MapType(StringType(), IntegerType()))({})
>>> _make_type_verifier(StructType([]))(())
>>> _make_type_verifier(StructType([]))([])
>>> _make_type_verifier(StructType([]))([1]) # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
    ...
ValueError:...
>>> # Check if numeric values are within the allowed range.
>>> _make_type_verifier(ByteType())(12)
>>> _make_type_verifier(ByteType())(1234) # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
    ...
ValueError:...
>>> _make_type_verifier(ByteType(), False)(None) # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
    ...
ValueError:...
>>> _make_type_verifier(
...     ArrayType(ShortType(), False))([1, None]) # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
    ...
ValueError:...
>>> _make_type_verifier(MapType(StringType(), IntegerType()))({None: 1})
Traceback (most recent call last):
    ...
ValueError:...
>>> schema = StructType().add("a", IntegerType()).add("b", StringType(), False)
>>> _make_type_verifier(schema)((1, None)) # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
    ...
ValueError:...
"""
else:
else: else:
new_msg("unknown datatype: %s for object %r" % (dataType, obj))
# subclass of them can not be fromInternal in JVM % (dataType, obj, type(obj))))
# StringType can work with any types
new_msg("object of IntegerType out of range, got: %s" % obj))
new_msg("object of LongType out of range, got: %s" % obj))
dataType.elementType, dataType.containsNull, name="element in array %s" % name)
dataType.valueType, dataType.valueContainsNull, name="value of map %s" % name)
new_msg("Length of object (%d) does not match with " "length of fields (%d)" % (len(obj), len(verifiers)))) else: raise TypeError(new_msg("StructType can not accept object %r in type %s" % (obj, type(obj))))
else:
# This is used to unpickle a Row from JVM
""" A row in :class:`DataFrame`. The fields in it can be accessed:
* like attributes (``row.key``)
* like dictionary values (``row[key]``)
``key in row`` will search through row keys.
Row can be used to create a row object by using named arguments. It is not allowed to omit a named argument to represent that the value is None or missing. This should be explicitly set to None in this case.
.. versionchanged:: 3.0.0
    Rows created from named arguments no longer have
    field names sorted alphabetically and will be ordered in the position as
    entered.

Examples
--------
>>> row = Row(name="Alice", age=11)
>>> row
Row(name='Alice', age=11)
>>> row['name'], row['age']
('Alice', 11)
>>> row.name, row.age
('Alice', 11)
>>> 'name' in row
True
>>> 'wrong_key' in row
False

Row can also be used to create another Row-like class, which can then be used
to create Row objects, such as

>>> Person = Row("name", "age")
>>> Person
<Row('name', 'age')>
>>> 'name' in Person
True
>>> 'wrong_key' in Person
False
>>> Person("Alice", 11)
Row(name='Alice', age=11)

This form can also be used to create rows as tuple values, i.e. with unnamed
fields.

>>> row1 = Row("Alice", 11)
>>> row2 = Row(name="Alice", age=11)
>>> row1 == row2
True
"""
raise ValueError("Can not use both args " "and kwargs to create Row") # create row objects else: # create row class or objects
""" Return as a dict
Parameters
----------
recursive : bool, optional
    turns the nested Rows to dict (default: False).

Notes
-----
If a row contains duplicate field names, e.g., the rows of a join
between two :class:`DataFrame` that both have the fields of same names,
one of the duplicate fields will be selected by ``asDict``. ``__getitem__``
will also return one of the duplicate fields, however the returned value
might be different from that of ``asDict``.

Examples
--------
>>> Row(name="Alice", age=11).asDict() == {'name': 'Alice', 'age': 11}
True
>>> row = Row(key=1, value=Row(name='a', age=2))
>>> row.asDict() == {'key': 1, 'value': Row(name='a', age=2)}
True
>>> row.asDict(True) == {'key': 1, 'value': {'name': 'a', 'age': 2}}
True
"""
raise TypeError("Cannot convert a Row class into dict")
return [conv(o) for o in obj] return dict((k, conv(v)) for k, v in obj.items()) else: else:
else:
# let object act like class
"""create new Row object"""
"but got %s" % (self, len(self), args))
# it will be slow when it has many fields,
# but this will not be used in normal cases
except IndexError:
    raise KeyError(item)
except ValueError:
    raise ValueError(item)
# it will be slow when it has many fields,
# but this will not be used in normal cases
raise AttributeError(item)
raise RuntimeError("Row is read-only")
"""Returns a tuple so Python knows how to pickle Row.""" else:
"""Printable representation of Row used in Python REPL.""" for k, v in zip(self.__fields__, tuple(self))) else:
else time.mktime(obj.timetuple()))
# datetime is a subclass of date, we should register DatetimeConverter first
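# Why registration order matters (hedged illustration): datetime.datetime is a
# subclass of datetime.date, so any isinstance-based converter dispatch must try
# the datetime case before the date case, or every datetime value would be
# handled by the date branch. The helper below is illustrative only.
import datetime

def _pick_converter_sketch(obj):
    if isinstance(obj, datetime.datetime):   # must be checked before datetime.date
        return "timestamp"
    if isinstance(obj, datetime.date):
        return "date"
    return "other"

assert _pick_converter_sketch(datetime.datetime(2020, 1, 1, 12, 0)) == "timestamp"
assert _pick_converter_sketch(datetime.date(2020, 1, 1)) == "date"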
sys.exit(-1)