pyspark.sql.protobuf.functions.to_protobuf

pyspark.sql.protobuf.functions.to_protobuf(data: ColumnOrName, messageName: str, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None, binaryDescriptorSet: Optional[bytes] = None) → pyspark.sql.column.Column

Converts a column into its binary Protobuf representation. The Protobuf definition is provided in one of these ways:

  • Protobuf descriptor file: for example, a descriptor file created with

    protoc --include_imports --descriptor_set_out=abc.desc abc.proto

  • Protobuf descriptor as binary: instead of a file path as in the previous option, the binary content of the descriptor file is passed directly. This allows flexibility in how the descriptor set is created and fetched.

  • Jar containing Protobuf Java class: the jar containing the Java class should be shaded. Specifically, com.google.protobuf.* should be shaded to org.sparkproject.spark_protobuf.protobuf.*. https://github.com/rangadi/shaded-protobuf-classes is useful for creating a shaded jar from Protobuf files. The jar file can be added with the spark-submit option --jars.
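The second option (descriptor as binary) can be sketched as follows. This is a minimal illustration, not part of the API: `read_descriptor_set` and `encode_with_binary_descriptor` are hypothetical helper names, and the sketch assumes a descriptor file produced by the protoc command shown in the first option and a DataFrame whose struct column matches the message definition.

```python
from pathlib import Path


def read_descriptor_set(path):
    """Return the raw bytes of a serialized FileDescriptorSet file."""
    return Path(path).read_bytes()


def encode_with_binary_descriptor(df, column, message_name, desc_path):
    """Encode a struct column using descriptor bytes rather than a file path.

    Hypothetical helper: `df` is assumed to be a Spark DataFrame and `column`
    the name of a struct column compatible with `message_name`.
    """
    # Imported lazily because the Protobuf functions live in an external module.
    from pyspark.sql.protobuf.functions import to_protobuf

    descriptor_bytes = read_descriptor_set(desc_path)
    return df.select(
        to_protobuf(
            df[column], message_name, binaryDescriptorSet=descriptor_bytes
        ).alias(column)
    )
```

Because only the bytes are passed, the descriptor could equally be fetched from another source, such as a schema registry or object store, as long as the result is a serialized FileDescriptorSet.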

New in version 3.4.0.

Changed in version 3.5.0: Supports the binaryDescriptorSet argument to pass a binary descriptor directly. Supports Spark Connect.

Parameters
data: Column or str

the data column.

messageName: str

the Protobuf message name to look for in the descriptor file, or the Protobuf class name when the descFilePath parameter is not set, e.g. com.example.protos.ExampleEvent.

descFilePath: str, optional

the Protobuf descriptor file.

options: dict, optional

binaryDescriptorSet: bytes, optional

the Protobuf FileDescriptorSet serialized as binary.

Notes

Protobuf functionality is provided as a pluggable external module.

Examples

>>> import tempfile
>>> from pyspark.sql.protobuf.functions import to_protobuf
>>> data = [([(2, "Alice", 13093020)])]
>>> ddl_schema = "value struct<age: INTEGER, name: STRING, score: LONG>"
>>> df = spark.createDataFrame(data, ddl_schema)
>>> desc_hex = str('0ACE010A41636F6E6E6563746F722F70726F746F6275662F7372632F746573742F726'
...    '5736F75726365732F70726F746F6275662F7079737061726B5F746573742E70726F746F121D6F72672E61'
...    '70616368652E737061726B2E73716C2E70726F746F627566224B0A0D53696D706C654D657373616765121'
...    '00A03616765180120012805520361676512120A046E616D6518022001280952046E616D6512140A057363'
...    '6F7265180320012803520573636F72654215421353696D706C654D65737361676550726F746F736206707'
...    '26F746F33')
>>> # Write the Protobuf descriptor to a file. The descriptor was generated from
>>> # connector/protobuf/src/test/resources/protobuf/pyspark_test.proto
>>> with tempfile.TemporaryDirectory() as tmp_dir:
...     desc_file_path = "%s/pyspark_test.desc" % tmp_dir
...     with open(desc_file_path, "wb") as f:
...         _ = f.write(bytearray.fromhex(desc_hex))
...         f.flush()
...         message_name = 'SimpleMessage'
...         proto_df = df.select( # With file name for descriptor
...             to_protobuf(df.value, message_name, desc_file_path).alias("suite"))
...         proto_df.show(truncate=False)
...         proto_df_2 = df.select( # With binary for descriptor
...             to_protobuf(df.value, message_name,
...                         binaryDescriptorSet=bytearray.fromhex(desc_hex))
...             .alias("suite"))
...         proto_df_2.show(truncate=False)
+-------------------------------------------+
|suite                                      |
+-------------------------------------------+
|[08 02 12 05 41 6C 69 63 65 18 9C 91 9F 06]|
+-------------------------------------------+
+-------------------------------------------+
|suite                                      |
+-------------------------------------------+
|[08 02 12 05 41 6C 69 63 65 18 9C 91 9F 06]|
+-------------------------------------------+
>>> data = [([(1668035962, 2020)])]
>>> ddl_schema = "value struct<seconds: LONG, nanos: INT>"
>>> df = spark.createDataFrame(data, ddl_schema)
>>> message_class_name = "org.sparkproject.spark_protobuf.protobuf.Timestamp"
>>> proto_df = df.select(to_protobuf(df.value, message_class_name).alias("suite"))
>>> proto_df.show(truncate=False)
+----------------------------+
|suite                       |
+----------------------------+
|[08 FA EA B0 9B 06 10 E4 0F]|
+----------------------------+
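To see what the output bytes above contain, they can be decoded by hand according to the Protobuf wire format. The following is a minimal sketch in plain Python, independent of Spark; `read_varint` and `decode_fields` are hypothetical helper names, and the decoder only handles the two wire types that appear in these examples (varint and length-delimited).

```python
def read_varint(buf, pos):
    """Decode a base-128 varint starting at pos; return (value, next_pos)."""
    result = 0
    shift = 0
    while True:
        byte = buf[pos]
        pos += 1
        # The low 7 bits carry data; the high bit marks a continuation.
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


def decode_fields(buf):
    """Decode top-level fields into {field_number: value}.

    Supports wire type 0 (varint) and wire type 2 (length-delimited) only.
    """
    fields = {}
    pos = 0
    while pos < len(buf):
        key, pos = read_varint(buf, pos)
        field_number, wire_type = key >> 3, key & 0x07
        if wire_type == 0:
            value, pos = read_varint(buf, pos)
        elif wire_type == 2:
            length, pos = read_varint(buf, pos)
            value = bytes(buf[pos:pos + length])
            pos += length
        else:
            raise ValueError("unsupported wire type %d" % wire_type)
        fields[field_number] = value
    return fields


# Byte strings copied from the show() output in the examples above.
simple_message = bytes.fromhex("08 02 12 05 41 6C 69 63 65 18 9C 91 9F 06")
timestamp = bytes.fromhex("08 FA EA B0 9B 06 10 E4 0F")

print(decode_fields(simple_message))  # {1: 2, 2: b'Alice', 3: 13093020}
print(decode_fields(timestamp))       # {1: 1668035962, 2: 2020}
```

The field numbers 1, 2 and 3 in the first result line up with the age, name and score columns of the input struct, and the second result holds the seconds and nanos values of the Timestamp.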