pyspark.sql.protobuf.functions.from_protobuf

pyspark.sql.protobuf.functions.from_protobuf(data: ColumnOrName, messageName: str, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None, binaryDescriptorSet: Optional[bytes] = None) → pyspark.sql.column.Column

Converts a binary column of Protobuf format into its corresponding catalyst value. The Protobuf definition is provided in one of these ways:

  • Protobuf descriptor file: E.g. a descriptor file created with

    protoc --include_imports --descriptor_set_out=abc.desc abc.proto

  • Protobuf descriptor as binary: Instead of a file path as in the previous option, the binary content of the descriptor file can be passed directly. This allows flexibility in how the descriptor set is created and fetched.

  • Jar containing Protobuf Java class: The jar containing the Java class should be shaded. Specifically, com.google.protobuf.* should be shaded to org.sparkproject.spark_protobuf.protobuf.*. https://github.com/rangadi/shaded-protobuf-classes is useful for creating a shaded jar from Protobuf files. The jar file can be added with the spark-submit option --jars.
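The second option above (descriptor as binary) can be sketched as follows. This is a minimal illustration, not part of the PySpark API: the helper names load_descriptor_set and decode_with_binary_descriptor are hypothetical, and the descriptor file is assumed to have been produced by the protoc command shown earlier.

```python
from pathlib import Path


def load_descriptor_set(desc_path: str) -> bytes:
    """Read a compiled descriptor set file and return its raw bytes.

    Hypothetical helper: the file is assumed to come from
    `protoc --include_imports --descriptor_set_out=...`.
    """
    return Path(desc_path).read_bytes()


def decode_with_binary_descriptor(df, column: str, message_name: str, desc_path: str):
    """Decode a binary Protobuf column using the descriptor bytes, not the path.

    Hypothetical wrapper around from_protobuf. The import is done lazily so
    that load_descriptor_set stays usable without the Protobuf module.
    """
    from pyspark.sql.protobuf.functions import from_protobuf

    descriptor_bytes = load_descriptor_set(desc_path)
    return df.select(
        from_protobuf(
            df[column], message_name, binaryDescriptorSet=descriptor_bytes
        ).alias(column)
    )
```

Because the descriptor travels as bytes, it could just as well be fetched from a schema registry or object store; only load_descriptor_set would need to change.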

New in version 3.4.0.

Changed in version 3.5.0: Supports binaryDescriptorSet arg to pass binary descriptor directly. Supports Spark Connect.

Parameters
data: Column or str

the binary column.

messageName: str

the Protobuf message name to look for in the descriptor file, or the Protobuf class name when the descFilePath parameter is not set. E.g. com.example.protos.ExampleEvent.

descFilePath: str, optional

The Protobuf descriptor file.

options: dict, optional

options to control how the protobuf record is parsed.

binaryDescriptorSet: bytes, optional

The Protobuf FileDescriptorSet serialized as binary.
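Since options is typed as Dict[str, str], every value has to be passed as a string. The sketch below builds such a dictionary. The helper build_protobuf_options is hypothetical, and the option keys "mode" and "recursive.fields.max.depth" are assumptions that should be checked against the Protobuf data source guide for the Spark version in use.

```python
from typing import Dict, Optional


def build_protobuf_options(
    mode: str = "FAILFAST",
    max_recursion_depth: Optional[int] = None,
) -> Dict[str, str]:
    """Build an options dict whose keys and values are all strings.

    Hypothetical helper. The option keys used here are assumptions;
    verify them for your Spark version before relying on them.
    """
    options: Dict[str, str] = {"mode": mode}
    if max_recursion_depth is not None:
        # Numeric settings must be stringified to match Dict[str, str].
        options["recursive.fields.max.depth"] = str(max_recursion_depth)
    return options
```

The result can then be passed as the options argument, for example from_protobuf(df.value, message_name, desc_file_path, options=build_protobuf_options("PERMISSIVE")).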

Notes

Protobuf functionality is provided as a pluggable external module.

Examples

>>> import tempfile
>>> from pyspark.sql.protobuf.functions import from_protobuf, to_protobuf
>>> data = [("1", (2, "Alice", 109200))]
>>> ddl_schema = "key STRING, value STRUCT<age: INTEGER, name: STRING, score: LONG>"
>>> df = spark.createDataFrame(data, ddl_schema)
>>> desc_hex = str('0ACE010A41636F6E6E6563746F722F70726F746F6275662F7372632F746573742F726'
...    '5736F75726365732F70726F746F6275662F7079737061726B5F746573742E70726F746F121D6F72672E61'
...    '70616368652E737061726B2E73716C2E70726F746F627566224B0A0D53696D706C654D657373616765121'
...    '00A03616765180120012805520361676512120A046E616D6518022001280952046E616D6512140A057363'
...    '6F7265180320012803520573636F72654215421353696D706C654D65737361676550726F746F736206707'
...    '26F746F33')
>>> # Write the Protobuf descriptor to a file. The descriptor was generated from the
>>> # connector/protobuf/src/test/resources/protobuf/pyspark_test.proto file
>>> with tempfile.TemporaryDirectory() as tmp_dir:
...     desc_file_path = "%s/pyspark_test.desc" % tmp_dir
...     with open(desc_file_path, "wb") as f:
...         _ = f.write(bytearray.fromhex(desc_hex))
...         f.flush()
...         message_name = 'SimpleMessage'
...         proto_df = df.select(
...             to_protobuf(df.value, message_name, desc_file_path).alias("value"))
...         proto_df.show(truncate=False)
...         proto_df_1 = proto_df.select( # With file name for descriptor
...             from_protobuf(proto_df.value, message_name, desc_file_path).alias("value"))
...         proto_df_1.show(truncate=False)
...         proto_df_2 = proto_df.select( # With binary for descriptor
...             from_protobuf(proto_df.value, message_name,
...                           binaryDescriptorSet=bytearray.fromhex(desc_hex))
...             .alias("value"))
...         proto_df_2.show(truncate=False)
+----------------------------------------+
|value                                   |
+----------------------------------------+
|[08 02 12 05 41 6C 69 63 65 18 90 D5 06]|
+----------------------------------------+
+------------------+
|value             |
+------------------+
|{2, Alice, 109200}|
+------------------+
+------------------+
|value             |
+------------------+
|{2, Alice, 109200}|
+------------------+
>>> data = [([(1668035962, 2020)])]
>>> ddl_schema = "value struct<seconds: LONG, nanos: INT>"
>>> df = spark.createDataFrame(data, ddl_schema)
>>> message_class_name = "org.sparkproject.spark_protobuf.protobuf.Timestamp"
>>> to_proto_df = df.select(to_protobuf(df.value, message_class_name).alias("value"))
>>> from_proto_df = to_proto_df.select(
...     from_protobuf(to_proto_df.value, message_class_name).alias("value"))
>>> from_proto_df.show(truncate=False)
+------------------+
|value             |
+------------------+
|{1668035962, 2020}|
+------------------+