Package pyspark :: Module sql :: Class SchemaRDD

Class SchemaRDD

object --+    
         |    
   rdd.RDD --+
             |
            SchemaRDD

An RDD of Row objects that has an associated schema.

The underlying JVM object is a SchemaRDD, not a PythonRDD, so we can utilize the relational query API exposed by Spark SQL.

For normal pyspark.rdd.RDD operations (map, count, etc.), the SchemaRDD is not operated on directly, since its underlying implementation is an RDD composed of Java objects. Instead, it is converted to a PythonRDD in the JVM, on which Python operations can then be applied.
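
A minimal usage sketch (not part of the original documentation), assuming a local SparkContext; the sample data and the table name "people" are purely illustrative:

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext("local", "SchemaRDD example")   # illustrative app name
sqlCtx = SQLContext(sc)

# An ordinary RDD of dictionaries; inferSchema turns it into a SchemaRDD.
rdd = sc.parallelize([{"name": "Alice", "age": 1},
                      {"name": "Bob", "age": 2}])
srdd = sqlCtx.inferSchema(rdd)

srdd.printSchema()               # inspect the inferred schema
srdd.registerAsTable("people")   # expose it to SQL queries
adults = sqlCtx.sql("SELECT name FROM people WHERE age >= 2")
print(adults.collect())

# Normal RDD operations still work; the SchemaRDD is converted to a
# PythonRDD in the JVM before the Python lambda is applied.
print(srdd.map(lambda x: x).count())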

Instance Methods

__init__(self, jschema_rdd, sql_ctx)
    x.__init__(...) initializes x; see help(type(x)) for signature
saveAsParquetFile(self, path)
    Save the contents as a Parquet file, preserving the schema.
registerAsTable(self, name)
    Registers this RDD as a temporary table using the given name.
insertInto(self, tableName, overwrite=False)
    Inserts the contents of this SchemaRDD into the specified table.
saveAsTable(self, tableName)
    Creates a new table with the contents of this SchemaRDD.
schemaString(self)
    Returns the output schema in the tree format.
printSchema(self)
    Prints out the schema in the tree format.
count(self)
    Return the number of elements in this RDD.
cache(self)
    Persist this RDD with the default storage level (MEMORY_ONLY).
persist(self, storageLevel)
    Set this RDD's storage level to persist its values across operations after the first time it is computed.
unpersist(self)
    Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
checkpoint(self)
    Mark this RDD for checkpointing.
isCheckpointed(self)
    Return whether this RDD has been checkpointed or not.
getCheckpointFile(self)
    Gets the name of the file to which this RDD was checkpointed.
coalesce(self, numPartitions, shuffle=False)
    Return a new RDD that is reduced into `numPartitions` partitions.
distinct(self)
    Return a new RDD containing the distinct elements in this RDD.
intersection(self, other)
    Return the intersection of this RDD and another one.
repartition(self, numPartitions)
    Return a new RDD that has exactly numPartitions partitions.
subtract(self, other, numPartitions=None)
    Return each value in self that is not contained in other.

Inherited from rdd.RDD: __add__, __repr__, aggregate, cartesian, cogroup, collect, collectAsMap, combineByKey, context, countByKey, countByValue, filter, first, flatMap, flatMapValues, fold, foldByKey, foreach, foreachPartition, getStorageLevel, glom, groupBy, groupByKey, groupWith, id, join, keyBy, keys, leftOuterJoin, map, mapPartitions, mapPartitionsWithIndex, mapPartitionsWithSplit, mapValues, max, mean, min, name, partitionBy, pipe, reduce, reduceByKey, reduceByKeyLocally, rightOuterJoin, sample, sampleStdev, sampleVariance, saveAsTextFile, setName, sortByKey, stats, stdev, subtractByKey, sum, take, takeOrdered, takeSample, toDebugString, top, union, values, variance, zip

Inherited from object: __delattr__, __format__, __getattribute__, __hash__, __new__, __reduce__, __reduce_ex__, __setattr__, __sizeof__, __str__, __subclasshook__

Properties

Inherited from object: __class__

Method Details

__init__(self, jschema_rdd, sql_ctx)
(Constructor)

x.__init__(...) initializes x; see help(type(x)) for signature

Overrides: object.__init__
(inherited documentation)

saveAsParquetFile(self, path)

Save the contents as a Parquet file, preserving the schema.

Files that are written out using this method can be read back in as a SchemaRDD using the SQLContext.parquetFile method.

>>> import tempfile, shutil
>>> parquetFile = tempfile.mkdtemp()
>>> shutil.rmtree(parquetFile)
>>> srdd = sqlCtx.inferSchema(rdd)
>>> srdd.saveAsParquetFile(parquetFile)
>>> srdd2 = sqlCtx.parquetFile(parquetFile)
>>> sorted(srdd2.collect()) == sorted(srdd.collect())
True

registerAsTable(self, name)

Registers this RDD as a temporary table using the given name.

The lifetime of this temporary table is tied to the SQLContext that was used to create this SchemaRDD.

>>> srdd = sqlCtx.inferSchema(rdd)
>>> srdd.registerAsTable("test")
>>> srdd2 = sqlCtx.sql("select * from test")
>>> sorted(srdd.collect()) == sorted(srdd2.collect())
True

insertInto(self, tableName, overwrite=False)

Inserts the contents of this SchemaRDD into the specified table, optionally overwriting any existing data.
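
No doctest accompanies this method; the following sketch is an assumption about typical usage, not from the original docstring. It reuses srdd from the earlier example and a hypothetical table name:

# Create a catalog table from this SchemaRDD, then add rows to it.
# (These calls are typically used with a Hive-enabled context.)
srdd.saveAsTable("people_archive")                  # hypothetical table name
srdd.insertInto("people_archive")                   # append the same rows
srdd.insertInto("people_archive", overwrite=True)   # replace existing data instead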

count(self)

Return the number of elements in this RDD.

Unlike the base RDD implementation of count, this implementation leverages the query optimizer to compute the count on the SchemaRDD, which supports features such as filter pushdown.

>>> srdd = sqlCtx.inferSchema(rdd)
>>> srdd.count()
3L
>>> srdd.count() == srdd.map(lambda x: x).count()
True
Overrides: rdd.RDD.count

cache(self)

Persist this RDD with the default storage level (MEMORY_ONLY).

Overrides: rdd.RDD.cache
(inherited documentation)

persist(self, storageLevel)

Set this RDD's storage level to persist its values across operations after the first time it is computed. This can only be used to assign a new storage level if the RDD does not have a storage level set yet.

Overrides: rdd.RDD.persist
(inherited documentation)

unpersist(self)

Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.

Overrides: rdd.RDD.unpersist
(inherited documentation)
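
An illustrative sequence (not from the original docstrings) tying cache, persist, and unpersist together, assuming srdd from the earlier example; StorageLevel is the standard pyspark.StorageLevel:

from pyspark import StorageLevel

srdd.persist(StorageLevel.MEMORY_AND_DISK)   # pick an explicit storage level
print(srdd.count())                          # first action materializes the cache
srdd.unpersist()                             # release the cached blocks
srdd.cache()                                 # shorthand for MEMORY_ONLY persistence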

checkpoint(self)

Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint directory set with SparkContext.setCheckpointDir() and all references to its parent RDDs will be removed. This function must be called before any job has been executed on this RDD. It is strongly recommended that this RDD is persisted in memory, otherwise saving it on a file will require recomputation.

Overrides: rdd.RDD.checkpoint
(inherited documentation)

isCheckpointed(self)

Return whether this RDD has been checkpointed or not.

Overrides: rdd.RDD.isCheckpointed
(inherited documentation)

getCheckpointFile(self)

Gets the name of the file to which this RDD was checkpointed.

Overrides: rdd.RDD.getCheckpointFile
(inherited documentation)
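
A hedged sketch (not from the original docstrings) showing how checkpoint, isCheckpointed, and getCheckpointFile fit together, reusing sc, sqlCtx, and rdd from the earlier example; the checkpoint directory path is hypothetical:

sc.setCheckpointDir("/tmp/spark-checkpoints")   # hypothetical directory
srdd = sqlCtx.inferSchema(rdd)
srdd.cache()                      # recommended so the data is not recomputed
srdd.checkpoint()                 # must be called before any job runs on srdd
srdd.count()                      # first action also writes the checkpoint
print(srdd.isCheckpointed())      # True once the checkpoint has been saved
print(srdd.getCheckpointFile())   # location of the checkpoint data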

coalesce(self, numPartitions, shuffle=False)

Return a new RDD that is reduced into `numPartitions` partitions.

>>> sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect()
[[1], [2, 3], [4, 5]]
>>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect()
[[1, 2, 3, 4, 5]]

Overrides: rdd.RDD.coalesce
(inherited documentation)

distinct(self)

Return a new RDD containing the distinct elements in this RDD.

>>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())
[1, 2, 3]
Overrides: rdd.RDD.distinct
(inherited documentation)

intersection(self, other)

Return the intersection of this RDD and another one. The output will not contain any duplicate elements, even if the input RDDs did.

Note that this method performs a shuffle internally.

>>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
>>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
>>> rdd1.intersection(rdd2).collect()
[1, 2, 3]
Overrides: rdd.RDD.intersection
(inherited documentation)

repartition(self, numPartitions)

Return a new RDD that has exactly numPartitions partitions.

Can increase or decrease the level of parallelism in this RDD. Internally, this uses a shuffle to redistribute data. If you are decreasing the number of partitions in this RDD, consider using `coalesce`, which can avoid performing a shuffle.

>>> rdd = sc.parallelize([1,2,3,4,5,6,7], 4)
>>> sorted(rdd.glom().collect())
[[1], [2, 3], [4, 5], [6, 7]]
>>> len(rdd.repartition(2).glom().collect())
2
>>> len(rdd.repartition(10).glom().collect())
10

Overrides: rdd.RDD.repartition
(inherited documentation)

subtract(self, other, numPartitions=None)

Return each value in self that is not contained in other.

>>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
>>> y = sc.parallelize([("a", 3), ("c", None)])
>>> sorted(x.subtract(y).collect())
[('a', 1), ('b', 4), ('b', 5)]
Overrides: rdd.RDD.subtract
(inherited documentation)