
Class SQLContext

Main entry point for Spark SQL functionality.

A SQLContext can be used to create SchemaRDDs, register SchemaRDDs as tables, execute SQL over tables, cache tables, and read Parquet files.
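
A minimal sketch of the typical workflow, assuming a running SparkContext named sc and an RDD of dicts named rdd, as used in the doctests below:

>>> sqlCtx = SQLContext(sc)                    # wrap the existing SparkContext
>>> srdd = sqlCtx.inferSchema(rdd)             # SchemaRDD with a schema inferred from the dicts
>>> sqlCtx.registerRDDAsTable(srdd, "table1")  # make it queryable by name
>>> results = sqlCtx.sql("SELECT field1, field2 FROM table1")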

Instance Methods

__init__(self, sparkContext, sqlContext=None)
    Create a new SQLContext.

inferSchema(self, rdd)
    Infer and apply a schema to an RDD of dicts.

registerRDDAsTable(self, rdd, tableName)
    Registers the given RDD as a temporary table in the catalog.

parquetFile(self, path)
    Loads a Parquet file, returning the result as a SchemaRDD.

sql(self, sqlQuery)
    Return a SchemaRDD representing the result of the given query.

table(self, tableName)
    Returns the specified table as a SchemaRDD.

cacheTable(self, tableName)
    Caches the specified table in memory.

uncacheTable(self, tableName)
    Removes the specified table from the in-memory cache.

Method Details

__init__(self, sparkContext, sqlContext=None)
(Constructor)

Create a new SQLContext.

Parameters:
  • sparkContext - The SparkContext to wrap.

>>> srdd = sqlCtx.inferSchema(rdd)
>>> sqlCtx.inferSchema(srdd) # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
    ...
ValueError:...
>>> bad_rdd = sc.parallelize([1,2,3])
>>> sqlCtx.inferSchema(bad_rdd) # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
    ...
ValueError:...
>>> allTypes = sc.parallelize([{"int" : 1, "string" : "string", "double" : 1.0, "long": 1L,
... "boolean" : True}])
>>> srdd = sqlCtx.inferSchema(allTypes).map(lambda x: (x.int, x.string, x.double, x.long,
... x.boolean))
>>> srdd.collect()[0]
(1, u'string', 1.0, 1, True)

inferSchema(self, rdd)

Infer and apply a schema to an RDD of dicts.

We peek at the first row of the RDD to determine the field names and types, and then use that schema to extract all the dictionaries.

>>> srdd = sqlCtx.inferSchema(rdd)
>>> srdd.collect() == [{"field1" : 1, "field2" : "row1"}, {"field1" : 2, "field2": "row2"},
...                    {"field1" : 3, "field2": "row3"}]
True

registerRDDAsTable(self, rdd, tableName)

Registers the given RDD as a temporary table in the catalog.

Temporary tables exist only during the lifetime of this instance of SQLContext.

>>> srdd = sqlCtx.inferSchema(rdd)
>>> sqlCtx.registerRDDAsTable(srdd, "table1")

parquetFile(self, path)

Loads a Parquet file, returning the result as a SchemaRDD.

>>> import tempfile, shutil
>>> parquetFile = tempfile.mkdtemp()
>>> shutil.rmtree(parquetFile)
>>> srdd = sqlCtx.inferSchema(rdd)
>>> srdd.saveAsParquetFile(parquetFile)
>>> srdd2 = sqlCtx.parquetFile(parquetFile)
>>> srdd.collect() == srdd2.collect()
True

sql(self, sqlQuery)

Return a SchemaRDD representing the result of the given query.

>>> srdd = sqlCtx.inferSchema(rdd)
>>> sqlCtx.registerRDDAsTable(srdd, "table1")
>>> srdd2 = sqlCtx.sql("SELECT field1 AS f1, field2 as f2 from table1")
>>> srdd2.collect() == [{"f1" : 1, "f2" : "row1"}, {"f1" : 2, "f2": "row2"},
...                     {"f1" : 3, "f2": "row3"}]
True

table(self, tableName)

Returns the specified table as a SchemaRDD.

>>> srdd = sqlCtx.inferSchema(rdd)
>>> sqlCtx.registerRDDAsTable(srdd, "table1")
>>> srdd2 = sqlCtx.table("table1")
>>> srdd.collect() == srdd2.collect()
True
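
cacheTable and uncacheTable have no example of their own above; a minimal usage sketch, reusing the same sqlCtx and rdd:

>>> srdd = sqlCtx.inferSchema(rdd)
>>> sqlCtx.registerRDDAsTable(srdd, "table1")
>>> sqlCtx.cacheTable("table1")    # mark the table for in-memory caching
>>> sqlCtx.uncacheTable("table1")  # remove the table from the in-memory cache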