Package pyspark :: Module sql :: Class SchemaRDD

Class SchemaRDD

source code

object --+    
         |    
   rdd.RDD --+
             |
            SchemaRDD

An RDD of Row objects that has an associated schema.

The underlying JVM object is a SchemaRDD, not a PythonRDD, so we can utilize the relational query API exposed by Spark SQL.

For normal pyspark.rdd.RDD operations (map, count, etc.) the SchemaRDD is not operated on directly, as its underlying implementation is an RDD composed of Java objects. Instead, it is converted to a PythonRDD in the JVM, on which Python operations can be performed.

This class receives raw tuples from Java but assigns a class to them in all of its data-collection methods (mapPartitionsWithIndex, collect, take, etc.) so that PySpark sees them as Row objects with named fields.
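
As a rough sketch (assuming the sqlCtx, sc, and rdd objects set up by the surrounding doctests, and a hypothetical column name field1), a SchemaRDD is usually obtained from a SQLContext, and ordinary RDD operations then see its elements as Row objects:

>>> srdd = sqlCtx.inferSchema(rdd)              # build a SchemaRDD from an existing RDD
>>> srdd.map(lambda row: row.field1).collect()  # plain RDD operations receive Row objects; 'field1' is hypothetical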

Instance Methods
 
__init__(self, jschema_rdd, sql_ctx)
x.__init__(...) initializes x; see help(type(x)) for signature
source code
 
saveAsParquetFile(self, path)
Save the contents as a Parquet file, preserving the schema.
source code
 
registerTempTable(self, name)
Registers this RDD as a temporary table using the given name.
source code
 
registerAsTable(self, name) source code
 
insertInto(self, tableName, overwrite=False)
Inserts the contents of this SchemaRDD into the specified table.
source code
 
saveAsTable(self, tableName)
Creates a new table with the contents of this SchemaRDD.
source code
 
schema(self)
Returns the schema of this SchemaRDD (represented by a StructType).
source code
 
schemaString(self)
Returns the output schema in the tree format.
source code
 
printSchema(self)
Prints out the schema in the tree format.
source code
 
count(self)
Return the number of elements in this RDD.
source code
 
collect(self)
Return a list that contains all of the rows in this RDD.
source code
 
mapPartitionsWithIndex(self, f, preservesPartitioning=False)
Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition.
source code
 
cache(self)
Persist this RDD with the default storage level (MEMORY_ONLY_SER).
source code
 
persist(self, storageLevel)
Set this RDD's storage level to persist its values across operations after the first time it is computed.
source code
 
unpersist(self, blocking=True)
Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
source code
 
checkpoint(self)
Mark this RDD for checkpointing.
source code
 
isCheckpointed(self)
Return whether this RDD has been checkpointed or not.
source code
 
getCheckpointFile(self)
Gets the name of the file to which this RDD was checkpointed.
source code
 
coalesce(self, numPartitions, shuffle=False)
Return a new RDD that is reduced into `numPartitions` partitions.
source code
 
distinct(self)
Return a new RDD containing the distinct elements in this RDD.
source code
 
intersection(self, other)
Return the intersection of this RDD and another one.
source code
 
repartition(self, numPartitions)
Return a new RDD that has exactly numPartitions partitions.
source code
 
subtract(self, other, numPartitions=None)
Return each value in self that is not contained in other.
source code

Inherited from rdd.RDD: __add__, __repr__, aggregate, aggregateByKey, cartesian, cogroup, collectAsMap, combineByKey, context, countByKey, countByValue, filter, first, flatMap, flatMapValues, fold, foldByKey, foreach, foreachPartition, getNumPartitions, getStorageLevel, glom, groupBy, groupByKey, groupWith, histogram, id, join, keyBy, keys, leftOuterJoin, map, mapPartitions, mapPartitionsWithSplit, mapValues, max, mean, min, name, partitionBy, pipe, reduce, reduceByKey, reduceByKeyLocally, rightOuterJoin, sample, sampleByKey, sampleStdev, sampleVariance, saveAsHadoopDataset, saveAsHadoopFile, saveAsNewAPIHadoopDataset, saveAsNewAPIHadoopFile, saveAsPickleFile, saveAsSequenceFile, saveAsTextFile, setName, sortBy, sortByKey, stats, stdev, subtractByKey, sum, take, takeOrdered, takeSample, toDebugString, top, union, values, variance, zip, zipWithIndex, zipWithUniqueId

Inherited from object: __delattr__, __format__, __getattribute__, __hash__, __new__, __reduce__, __reduce_ex__, __setattr__, __sizeof__, __str__, __subclasshook__

Properties

Inherited from object: __class__

Method Details

__init__(self, jschema_rdd, sql_ctx)
(Constructor)

source code 

x.__init__(...) initializes x; see help(type(x)) for signature

Overrides: object.__init__
(inherited documentation)

saveAsParquetFile(self, path)

source code 

Save the contents as a Parquet file, preserving the schema.

Files that are written out using this method can be read back in as a SchemaRDD using the SQLContext.parquetFile method.

>>> import tempfile, shutil
>>> parquetFile = tempfile.mkdtemp()
>>> shutil.rmtree(parquetFile)
>>> srdd = sqlCtx.inferSchema(rdd)
>>> srdd.saveAsParquetFile(parquetFile)
>>> srdd2 = sqlCtx.parquetFile(parquetFile)
>>> sorted(srdd2.collect()) == sorted(srdd.collect())
True

registerTempTable(self, name)

source code 

Registers this RDD as a temporary table using the given name.

The lifetime of this temporary table is tied to the SQLContext that was used to create this SchemaRDD.

>>> srdd = sqlCtx.inferSchema(rdd)
>>> srdd.registerTempTable("test")
>>> srdd2 = sqlCtx.sql("select * from test")
>>> sorted(srdd.collect()) == sorted(srdd2.collect())
True

insertInto(self, tableName, overwrite=False)

source code 

Inserts the contents of this SchemaRDD into the specified table, optionally overwriting any existing data.
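
A minimal sketch (the table name "people" is hypothetical and must already exist, typically in a Hive-backed catalog):

>>> srdd = sqlCtx.inferSchema(rdd)
>>> srdd.insertInto("people")                  # append to the existing table
>>> srdd.insertInto("people", overwrite=True)  # replace the table's current contents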

count(self)

source code 

Return the number of elements in this RDD.

Unlike the base RDD implementation of count, this implementation leverages the query optimizer to compute the count on the SchemaRDD, which supports features such as filter pushdown.

>>> srdd = sqlCtx.inferSchema(rdd)
>>> srdd.count()
3L
>>> srdd.count() == srdd.map(lambda x: x).count()
True
Overrides: rdd.RDD.count

collect(self)

source code 

Return a list that contains all of the rows in this RDD.

Each object in the list is a Row; the fields can be accessed as attributes.
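
A brief sketch (the column name field1 is hypothetical; sqlCtx and rdd are assumed to be set up as in the other doctests):

>>> srdd = sqlCtx.inferSchema(rdd)
>>> rows = srdd.collect()   # a Python list of Row objects
>>> rows[0].field1          # fields are read as attributes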

Overrides: rdd.RDD.collect

mapPartitionsWithIndex(self, f, preservesPartitioning=False)

source code 

Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition.

>>> rdd = sc.parallelize([1, 2, 3, 4], 4)
>>> def f(splitIndex, iterator): yield splitIndex
>>> rdd.mapPartitionsWithIndex(f).sum()
6
Overrides: rdd.RDD.mapPartitionsWithIndex

cache(self)

source code 

Persist this RDD with the default storage level (MEMORY_ONLY_SER).

Overrides: rdd.RDD.cache
(inherited documentation)

persist(self, storageLevel)

source code 

Set this RDD's storage level to persist its values across operations after the first time it is computed. This can only be used to assign a new storage level if the RDD does not have a storage level set yet.
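
A short sketch of choosing a non-default storage level (MEMORY_AND_DISK is just one option; srdd is assumed to come from sqlCtx.inferSchema as in the other examples):

>>> from pyspark import StorageLevel
>>> srdd = srdd.persist(StorageLevel.MEMORY_AND_DISK)  # must be set before the first action computes the RDD
>>> srdd.count()                                        # first action computes and persists the rows
>>> srdd.unpersist()                                    # release the cached blocks when done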

Overrides: rdd.RDD.persist
(inherited documentation)

unpersist(self, blocking=True)

source code 

Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.

Overrides: rdd.RDD.unpersist
(inherited documentation)

checkpoint(self)

source code 

Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint directory set with SparkContext.setCheckpointDir(), and all references to its parent RDDs will be removed. This function must be called before any job has been executed on this RDD. It is strongly recommended that this RDD is persisted in memory; otherwise saving it to a file will require recomputation.
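
A rough sketch of the checkpointing flow (the directory path is hypothetical; on a real cluster it should be on a fault-tolerant filesystem such as HDFS):

>>> sc.setCheckpointDir("/tmp/spark-checkpoints")  # hypothetical path
>>> srdd = sqlCtx.inferSchema(rdd)
>>> srdd.cache()              # recommended, so checkpointing does not trigger recomputation
>>> srdd.checkpoint()         # mark for checkpointing; written out by the next job
>>> srdd.count()              # run a job, which materializes the checkpoint
>>> srdd.isCheckpointed()
>>> srdd.getCheckpointFile()  # path of the saved checkpoint data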

Overrides: rdd.RDD.checkpoint
(inherited documentation)

isCheckpointed(self)

source code 

Return whether this RDD has been checkpointed or not.

Overrides: rdd.RDD.isCheckpointed
(inherited documentation)

getCheckpointFile(self)

source code 

Gets the name of the file to which this RDD was checkpointed.

Overrides: rdd.RDD.getCheckpointFile
(inherited documentation)

coalesce(self, numPartitions, shuffle=False)

source code 

Return a new RDD that is reduced into `numPartitions` partitions.

>>> sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect()
[[1], [2, 3], [4, 5]]
>>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect()
[[1, 2, 3, 4, 5]]
Overrides: rdd.RDD.coalesce
(inherited documentation)

distinct(self)

source code 

Return a new RDD containing the distinct elements in this RDD.

>>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())
[1, 2, 3]
Overrides: rdd.RDD.distinct
(inherited documentation)

intersection(self, other)

source code 

Return the intersection of this RDD and another one. The output will not contain any duplicate elements, even if the input RDDs did.

Note that this method performs a shuffle internally.

>>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
>>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
>>> rdd1.intersection(rdd2).collect()
[1, 2, 3]
Overrides: rdd.RDD.intersection
(inherited documentation)

repartition(self, numPartitions)

source code 

Return a new RDD that has exactly numPartitions partitions.

Can increase or decrease the level of parallelism in this RDD. Internally, this uses a shuffle to redistribute data. If you are decreasing the number of partitions in this RDD, consider using `coalesce`, which can avoid performing a shuffle.

>>> rdd = sc.parallelize([1,2,3,4,5,6,7], 4)
>>> sorted(rdd.glom().collect())
[[1], [2, 3], [4, 5], [6, 7]]
>>> len(rdd.repartition(2).glom().collect())
2
>>> len(rdd.repartition(10).glom().collect())
10
Overrides: rdd.RDD.repartition
(inherited documentation)

subtract(self, other, numPartitions=None)

source code 

Return each value in self that is not contained in other.

>>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
>>> y = sc.parallelize([("a", 3), ("c", None)])
>>> sorted(x.subtract(y).collect())
[('a', 1), ('b', 4), ('b', 5)]
Overrides: rdd.RDD.subtract
(inherited documentation)