Package pyspark :: Module sql :: Class SQLContext

Class SQLContext

source code

Main entry point for Spark SQL functionality.

A SQLContext can be used to create SchemaRDDs, register SchemaRDDs as tables, execute SQL over tables, cache tables, and read Parquet files.
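
A SQLContext is typically constructed from an existing SparkContext; the doctests below assume such a context already exists as sqlCtx. A minimal sketch (the master URL and application name are illustrative, not required):

>>> from pyspark import SparkContext
>>> from pyspark.sql import SQLContext, Row
>>> sc = SparkContext("local", "sql-example")    # illustrative master and app name
>>> sqlCtx = SQLContext(sc)                      # wrap the SparkContext for SQL use
>>> srdd = sqlCtx.inferSchema(sc.parallelize([Row(a=1)]))
>>> sqlCtx.registerRDDAsTable(srdd, "t")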

Instance Methods

__init__(self, sparkContext, sqlContext=None)
Create a new SQLContext.

registerFunction(self, name, f, returnType=StringType())
Registers a lambda function as a UDF so it can be used in SQL statements.

inferSchema(self, rdd)
Infer and apply a schema to an RDD of Rows.

applySchema(self, rdd, schema)
Applies the given schema to the given RDD of tuples or lists.

registerRDDAsTable(self, rdd, tableName)
Registers the given RDD as a temporary table in the catalog.

parquetFile(self, path)
Loads a Parquet file, returning the result as a SchemaRDD.

jsonFile(self, path, schema=None)
Loads a text file storing one JSON object per line as a SchemaRDD.

jsonRDD(self, rdd, schema=None)
Loads an RDD storing one JSON object per string as a SchemaRDD.

sql(self, sqlQuery)
Returns a SchemaRDD representing the result of the given query.

table(self, tableName)
Returns the specified table as a SchemaRDD.

cacheTable(self, tableName)
Caches the specified table in memory.

uncacheTable(self, tableName)
Removes the specified table from the in-memory cache.

Method Details

__init__(self, sparkContext, sqlContext=None)
(Constructor)

Create a new SQLContext.

Parameters:
  • sparkContext - The SparkContext to wrap.
  • sqlContext - An optional JVM Scala SQLContext. If set, we do not instantiate a new SQLContext in the JVM; instead, all calls are made to this object.
    >>> srdd = sqlCtx.inferSchema(rdd)
    >>> sqlCtx.inferSchema(srdd) # doctest: +IGNORE_EXCEPTION_DETAIL
    Traceback (most recent call last):
        ...
    TypeError:...
    >>> bad_rdd = sc.parallelize([1,2,3])
    >>> sqlCtx.inferSchema(bad_rdd) # doctest: +IGNORE_EXCEPTION_DETAIL
    Traceback (most recent call last):
        ...
    ValueError:...
    >>> from datetime import datetime
    >>> allTypes = sc.parallelize([Row(i=1, s="string", d=1.0, l=1L,
    ...     b=True, list=[1, 2, 3], dict={"s": 0}, row=Row(a=1),
    ...     time=datetime(2014, 8, 1, 14, 1, 5))])
    >>> srdd = sqlCtx.inferSchema(allTypes)
    >>> srdd.registerTempTable("allTypes")
    >>> sqlCtx.sql('select i+1, d+1, not b, list[1], dict["s"], time, row.a '
    ...            'from allTypes where b and i > 0').collect()
    [Row(c0=2, c1=2.0, c2=False, c3=2, c4=0...8, 1, 14, 1, 5), a=1)]
    >>> srdd.map(lambda x: (x.i, x.s, x.d, x.l, x.b, x.time,
    ...                     x.row.a, x.list)).collect()
    [(1, u'string', 1.0, 1, True, ...(2014, 8, 1, 14, 1, 5), 1, [1, 2, 3])]

registerFunction(self, name, f, returnType=StringType())

Registers a lambda function as a UDF so it can be used in SQL statements.

In addition to a name and the function itself, the return type can optionally be specified. When the return type is not given, it defaults to a string and conversion is done automatically. For any other return type, the produced object must match the specified type.

>>> sqlCtx.registerFunction("stringLengthString", lambda x: len(x))
>>> sqlCtx.sql("SELECT stringLengthString('test')").collect()
[Row(c0=u'4')]
>>> sqlCtx.registerFunction("stringLengthInt", lambda x: len(x), IntegerType())
>>> sqlCtx.sql("SELECT stringLengthInt('test')").collect()
[Row(c0=4)]
>>> sqlCtx.registerFunction("twoArgs", lambda x, y: len(x) + y, IntegerType())
>>> sqlCtx.sql("SELECT twoArgs('test', 1)").collect()
[Row(c0=5)]

inferSchema(self, rdd)

Infer and apply a schema to an RDD of Rows.

We peek at the first row of the RDD to determine the fields' names and types. Nested collections are supported, including array, dict, list, Row, tuple, namedtuple, and object.

All the rows in `rdd` should have the same type as the first one, or runtime exceptions will be raised.

Each row can be a pyspark.sql.Row object, a namedtuple, or a plain object; using dict is deprecated.

>>> rdd = sc.parallelize(
...     [Row(field1=1, field2="row1"),
...      Row(field1=2, field2="row2"),
...      Row(field1=3, field2="row3")])
>>> srdd = sqlCtx.inferSchema(rdd)
>>> srdd.collect()[0]
Row(field1=1, field2=u'row1')
>>> NestedRow = Row("f1", "f2")
>>> nestedRdd1 = sc.parallelize([
...     NestedRow(array('i', [1, 2]), {"row1": 1.0}),
...     NestedRow(array('i', [2, 3]), {"row2": 2.0})])
>>> srdd = sqlCtx.inferSchema(nestedRdd1)
>>> srdd.collect()
[Row(f1=[1, 2], f2={u'row1': 1.0}), ..., f2={u'row2': 2.0})]
>>> nestedRdd2 = sc.parallelize([
...     NestedRow([[1, 2], [2, 3]], [1, 2]),
...     NestedRow([[2, 3], [3, 4]], [2, 3])])
>>> srdd = sqlCtx.inferSchema(nestedRdd2)
>>> srdd.collect()
[Row(f1=[[1, 2], [2, 3]], f2=[1, 2]), ..., f2=[2, 3])]

applySchema(self, rdd, schema)

Applies the given schema to the given RDD of tuples or lists.

These tuples or lists can contain complex nested structures like lists, maps or nested rows.

The schema should be a StructType.

It is important that the schema matches the types of the objects in each row or exceptions could be thrown at runtime.

>>> rdd2 = sc.parallelize([(1, "row1"), (2, "row2"), (3, "row3")])
>>> schema = StructType([StructField("field1", IntegerType(), False),
...     StructField("field2", StringType(), False)])
>>> srdd = sqlCtx.applySchema(rdd2, schema)
>>> sqlCtx.registerRDDAsTable(srdd, "table1")
>>> srdd2 = sqlCtx.sql("SELECT * from table1")
>>> srdd2.collect()
[Row(field1=1, field2=u'row1'),..., Row(field1=3, field2=u'row3')]
>>> from datetime import datetime
>>> rdd = sc.parallelize([(127, -128L, -32768, 32767, 2147483647L, 1.0,
...     datetime(2010, 1, 1, 1, 1, 1),
...     {"a": 1}, (2,), [1, 2, 3], None)])
>>> schema = StructType([
...     StructField("byte1", ByteType(), False),
...     StructField("byte2", ByteType(), False),
...     StructField("short1", ShortType(), False),
...     StructField("short2", ShortType(), False),
...     StructField("int", IntegerType(), False),
...     StructField("float", FloatType(), False),
...     StructField("time", TimestampType(), False),
...     StructField("map",
...         MapType(StringType(), IntegerType(), False), False),
...     StructField("struct",
...         StructType([StructField("b", ShortType(), False)]), False),
...     StructField("list", ArrayType(ByteType(), False), False),
...     StructField("null", DoubleType(), True)])
>>> srdd = sqlCtx.applySchema(rdd, schema)
>>> results = srdd.map(
...     lambda x: (x.byte1, x.byte2, x.short1, x.short2, x.int, x.float, x.time,
...         x.map["a"], x.struct.b, x.list, x.null))
>>> results.collect()[0]
(127, -128, -32768, 32767, 2147483647, 1.0, ...(2010, 1, 1, 1, 1, 1), 1, 2, [1, 2, 3], None)
>>> srdd.registerTempTable("table2")
>>> sqlCtx.sql(
...   "SELECT byte1 - 1 AS byte1, byte2 + 1 AS byte2, " +
...     "short1 + 1 AS short1, short2 - 1 AS short2, int - 1 AS int, " +
...     "float + 1.5 as float FROM table2").collect()
[Row(byte1=126, byte2=-127, short1=-32767, short2=32766, int=2147483646, float=2.5)]
>>> rdd = sc.parallelize([(127, -32768, 1.0,
...     datetime(2010, 1, 1, 1, 1, 1),
...     {"a": 1}, (2,), [1, 2, 3])])
>>> abstract = "byte short float time map{} struct(b) list[]"
>>> schema = _parse_schema_abstract(abstract)
>>> typedSchema = _infer_schema_type(rdd.first(), schema)
>>> srdd = sqlCtx.applySchema(rdd, typedSchema)
>>> srdd.collect()
[Row(byte=127, short=-32768, float=1.0, time=..., list=[1, 2, 3])]

registerRDDAsTable(self, rdd, tableName)

Registers the given RDD as a temporary table in the catalog.

Temporary tables exist only during the lifetime of this instance of SQLContext.

>>> srdd = sqlCtx.inferSchema(rdd)
>>> sqlCtx.registerRDDAsTable(srdd, "table1")

parquetFile(self, path)

Loads a Parquet file, returning the result as a SchemaRDD.

>>> import tempfile, shutil
>>> parquetFile = tempfile.mkdtemp()
>>> shutil.rmtree(parquetFile)
>>> srdd = sqlCtx.inferSchema(rdd)
>>> srdd.saveAsParquetFile(parquetFile)
>>> srdd2 = sqlCtx.parquetFile(parquetFile)
>>> sorted(srdd.collect()) == sorted(srdd2.collect())
True

jsonFile(self, path, schema=None)

Loads a text file storing one JSON object per line as a SchemaRDD.

If a schema is provided, it is applied to this JSON dataset; otherwise, the entire dataset is scanned once to determine the schema.

>>> import tempfile, shutil
>>> jsonFile = tempfile.mkdtemp()
>>> shutil.rmtree(jsonFile)
>>> ofn = open(jsonFile, 'w')
>>> for json in jsonStrings:
...   print>>ofn, json
>>> ofn.close()
>>> srdd1 = sqlCtx.jsonFile(jsonFile)
>>> sqlCtx.registerRDDAsTable(srdd1, "table1")
>>> srdd2 = sqlCtx.sql(
...   "SELECT field1 AS f1, field2 as f2, field3 as f3, "
...   "field6 as f4 from table1")
>>> for r in srdd2.collect():
...     print r
Row(f1=1, f2=u'row1', f3=Row(field4=11, field5=None), f4=None)
Row(f1=2, f2=None, f3=Row(field4=22,..., f4=[Row(field7=u'row2')])
Row(f1=None, f2=u'row3', f3=Row(field4=33, field5=[]), f4=None)
>>> srdd3 = sqlCtx.jsonFile(jsonFile, srdd1.schema())
>>> sqlCtx.registerRDDAsTable(srdd3, "table2")
>>> srdd4 = sqlCtx.sql(
...   "SELECT field1 AS f1, field2 as f2, field3 as f3, "
...   "field6 as f4 from table2")
>>> for r in srdd4.collect():
...    print r
Row(f1=1, f2=u'row1', f3=Row(field4=11, field5=None), f4=None)
Row(f1=2, f2=None, f3=Row(field4=22,..., f4=[Row(field7=u'row2')])
Row(f1=None, f2=u'row3', f3=Row(field4=33, field5=[]), f4=None)
>>> schema = StructType([
...     StructField("field2", StringType(), True),
...     StructField("field3",
...         StructType([
...             StructField("field5",
...                 ArrayType(IntegerType(), False), True)]), False)])
>>> srdd5 = sqlCtx.jsonFile(jsonFile, schema)
>>> sqlCtx.registerRDDAsTable(srdd5, "table3")
>>> srdd6 = sqlCtx.sql(
...   "SELECT field2 AS f1, field3.field5 as f2, "
...   "field3.field5[0] as f3 from table3")
>>> srdd6.collect()
[Row(f1=u'row1', f2=None, f3=None)...Row(f1=u'row3', f2=[], f3=None)]

jsonRDD(self, rdd, schema=None)

Loads an RDD storing one JSON object per string as a SchemaRDD.

If a schema is provided, it is applied to this JSON dataset; otherwise, the entire dataset is scanned once to determine the schema.

>>> srdd1 = sqlCtx.jsonRDD(json)
>>> sqlCtx.registerRDDAsTable(srdd1, "table1")
>>> srdd2 = sqlCtx.sql(
...   "SELECT field1 AS f1, field2 as f2, field3 as f3, "
...   "field6 as f4 from table1")
>>> for r in srdd2.collect():
...     print r
Row(f1=1, f2=u'row1', f3=Row(field4=11, field5=None), f4=None)
Row(f1=2, f2=None, f3=Row(field4=22..., f4=[Row(field7=u'row2')])
Row(f1=None, f2=u'row3', f3=Row(field4=33, field5=[]), f4=None)
>>> srdd3 = sqlCtx.jsonRDD(json, srdd1.schema())
>>> sqlCtx.registerRDDAsTable(srdd3, "table2")
>>> srdd4 = sqlCtx.sql(
...   "SELECT field1 AS f1, field2 as f2, field3 as f3, "
...   "field6 as f4 from table2")
>>> for r in srdd4.collect():
...     print r
Row(f1=1, f2=u'row1', f3=Row(field4=11, field5=None), f4=None)
Row(f1=2, f2=None, f3=Row(field4=22..., f4=[Row(field7=u'row2')])
Row(f1=None, f2=u'row3', f3=Row(field4=33, field5=[]), f4=None)
>>> schema = StructType([
...     StructField("field2", StringType(), True),
...     StructField("field3",
...         StructType([
...             StructField("field5",
...                 ArrayType(IntegerType(), False), True)]), False)])
>>> srdd5 = sqlCtx.jsonRDD(json, schema)
>>> sqlCtx.registerRDDAsTable(srdd5, "table3")
>>> srdd6 = sqlCtx.sql(
...   "SELECT field2 AS f1, field3.field5 as f2, "
...   "field3.field5[0] as f3 from table3")
>>> srdd6.collect()
[Row(f1=u'row1', f2=None,...Row(f1=u'row3', f2=[], f3=None)]
>>> sqlCtx.jsonRDD(sc.parallelize(['{}',
...         '{"key0": {"key1": "value1"}}'])).collect()
[Row(key0=None), Row(key0=Row(key1=u'value1'))]
>>> sqlCtx.jsonRDD(sc.parallelize(['{"key0": null}',
...         '{"key0": {"key1": "value1"}}'])).collect()
[Row(key0=None), Row(key0=Row(key1=u'value1'))]

sql(self, sqlQuery)

Returns a SchemaRDD representing the result of the given query.

>>> srdd = sqlCtx.inferSchema(rdd)
>>> sqlCtx.registerRDDAsTable(srdd, "table1")
>>> srdd2 = sqlCtx.sql("SELECT field1 AS f1, field2 as f2 from table1")
>>> srdd2.collect()
[Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')]

table(self, tableName)

Returns the specified table as a SchemaRDD.

>>> srdd = sqlCtx.inferSchema(rdd)
>>> sqlCtx.registerRDDAsTable(srdd, "table1")
>>> srdd2 = sqlCtx.table("table1")
>>> sorted(srdd.collect()) == sorted(srdd2.collect())
True
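
The cacheTable and uncacheTable methods summarized above have no examples on this page; a minimal sketch of their use, reusing the srdd and "table1" setup from the examples above:

>>> srdd = sqlCtx.inferSchema(rdd)
>>> sqlCtx.registerRDDAsTable(srdd, "table1")
>>> sqlCtx.cacheTable("table1")        # later scans of table1 are served from the in-memory cache
>>> rows = sqlCtx.sql("SELECT field1 FROM table1").collect()
>>> sqlCtx.uncacheTable("table1")      # remove table1 from the in-memory cache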