pyspark.sql.protobuf.functions.from_protobuf

pyspark.sql.protobuf.functions.from_protobuf(data: ColumnOrName, messageName: str, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None) → pyspark.sql.column.Column

Converts a binary column of Protobuf format into its corresponding Catalyst value. The Protobuf definition is provided in one of these two ways:

  • Protobuf descriptor file: E.g. a descriptor file created with

    protoc --include_imports --descriptor_set_out=abc.desc abc.proto

  • Jar containing the Protobuf Java class: The jar containing the Java class should be shaded. Specifically, com.google.protobuf.* should be shaded to org.sparkproject.spark_protobuf.protobuf.*. https://github.com/rangadi/shaded-protobuf-classes is useful for creating a shaded jar from Protobuf files. The jar file can be added with the spark-submit option --jars. Both call styles are sketched below.
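For orientation, a minimal sketch of the two call styles; df, the message name, the class name, and the file path are placeholders for illustration, not artifacts shipped with Spark:

>>> from pyspark.sql.protobuf.functions import from_protobuf
>>> # Option 1: message name resolved from a compiled descriptor file.
>>> parsed = df.select(
...     from_protobuf(df.value, "ExampleEvent", "/tmp/abc.desc").alias("event"))  # doctest: +SKIP
>>> # Option 2: fully qualified, shaded Protobuf Java class; no descriptor file.
>>> parsed = df.select(
...     from_protobuf(df.value, "com.example.protos.ExampleEvent").alias("event"))  # doctest: +SKIP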

New in version 3.4.0.

Parameters

data : Column or str
    the binary column.

messageName : str
    the protobuf message name to look for in the descriptor file, or the Protobuf class name when the descFilePath parameter is not set. E.g. com.example.protos.ExampleEvent.

descFilePath : str, optional
    the Protobuf descriptor file.

options : dict, optional
    options to control how the Protobuf record is parsed.
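The supported option keys depend on the Spark version. As a hedged illustration (the option name and values here are assumptions to verify against your Spark release), a permissive parse that tolerates malformed records might look like:

>>> parsed = df.select(
...     from_protobuf(df.value, "ExampleEvent", "/tmp/abc.desc",
...                   options={"mode": "PERMISSIVE"}).alias("event"))  # doctest: +SKIP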

Notes

Protobuf functionality is provided as a pluggable external module.
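Because the module is not on the default classpath, it can be pulled in at launch time with --packages. The coordinates below assume Scala 2.12 and Spark 3.4.0; adjust them to your build:

bin/pyspark --packages org.apache.spark:spark-protobuf_2.12:3.4.0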

Examples

>>> import tempfile
>>> from pyspark.sql.protobuf.functions import from_protobuf, to_protobuf
>>> data = [("1", (2, "Alice", 109200))]
>>> ddl_schema = "key STRING, value STRUCT<age: INTEGER, name: STRING, score: LONG>"
>>> df = spark.createDataFrame(data, ddl_schema)
>>> desc_hex = str('0ACE010A41636F6E6E6563746F722F70726F746F6275662F7372632F746573742F726'
...    '5736F75726365732F70726F746F6275662F7079737061726B5F746573742E70726F746F121D6F72672E61'
...    '70616368652E737061726B2E73716C2E70726F746F627566224B0A0D53696D706C654D657373616765121'
...    '00A03616765180120012805520361676512120A046E616D6518022001280952046E616D6512140A057363'
...    '6F7265180320012803520573636F72654215421353696D706C654D65737361676550726F746F736206707'
...    '26F746F33')
>>> # Write the protobuf descriptor (generated from the
>>> # connector/protobuf/src/test/resources/protobuf/pyspark_test.proto file) to disk.
>>> with tempfile.TemporaryDirectory() as tmp_dir:
...     desc_file_path = "%s/pyspark_test.desc" % tmp_dir
...     with open(desc_file_path, "wb") as f:
...         _ = f.write(bytearray.fromhex(desc_hex))
...         f.flush()
...         message_name = 'SimpleMessage'
...         proto_df = df.select(
...             to_protobuf(df.value, message_name, desc_file_path).alias("value"))
...         proto_df.show(truncate=False)
...         proto_df = proto_df.select(
...             from_protobuf(proto_df.value, message_name, desc_file_path).alias("value"))
...         proto_df.show(truncate=False)
+----------------------------------------+
|value                                   |
+----------------------------------------+
|[08 02 12 05 41 6C 69 63 65 18 90 D5 06]|
+----------------------------------------+
+------------------+
|value             |
+------------------+
|{2, Alice, 109200}|
+------------------+
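The second example parses the binary column using a shaded Protobuf Java class name instead of a descriptor file: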
>>> data = [[(1668035962, 2020)]]
>>> ddl_schema = "value struct<seconds: LONG, nanos: INT>"
>>> df = spark.createDataFrame(data, ddl_schema)
>>> message_class_name = "org.sparkproject.spark_protobuf.protobuf.Timestamp"
>>> to_proto_df = df.select(to_protobuf(df.value, message_class_name).alias("value"))
>>> from_proto_df = to_proto_df.select(
...     from_protobuf(to_proto_df.value, message_class_name).alias("value"))
>>> from_proto_df.show(truncate=False)
+------------------+
|value             |
+------------------+
|{1668035962, 2020}|
+------------------+
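Here org.sparkproject.spark_protobuf.protobuf.Timestamp is the shaded copy of the well-known com.google.protobuf.Timestamp class (per the shading rule above), which is why no descriptor file is needed.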