pyspark.sql.protobuf.functions.to_protobuf

pyspark.sql.protobuf.functions.to_protobuf(data: ColumnOrName, messageName: str, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None) → pyspark.sql.column.Column[source]

Converts a column into binary of protobuf format. The Protobuf definition is provided in one of these two ways (a sketch of both call shapes follows this list):

  • Protobuf descriptor file: E.g. a descriptor file created with

    protoc --include_imports --descriptor_set_out=abc.desc abc.proto

  • Jar containing Protobuf Java class: The jar containing the Java class should be shaded. Specifically, com.google.protobuf.* should be shaded to org.sparkproject.spark_protobuf.protobuf.*. https://github.com/rangadi/shaded-protobuf-classes is useful to create a shaded jar from Protobuf files. The jar file can be added with the spark-submit option --jars.
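
The two approaches correspond to the two call shapes sketched below. This is a minimal sketch: df, the descriptor path, and the message names are placeholders, and fully runnable versions appear under Examples.

>>> from pyspark.sql.protobuf.functions import to_protobuf
>>> # 1. Descriptor file: message name plus the path to the .desc file.
>>> df.select(to_protobuf(df.value, "SimpleMessage", "/path/to/abc.desc"))  # doctest: +SKIP
>>> # 2. Shaded Java class: fully-qualified class name, no descriptor path.
>>> df.select(to_protobuf(df.value, "com.example.protos.ExampleEvent"))  # doctest: +SKIP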

New in version 3.4.0.

Parameters
data : Column or str

the data column.

messageName : str

the protobuf message name to look for in the descriptor file, or the Protobuf class name when the descFilePath parameter is not set. E.g. com.example.protos.ExampleEvent.

descFilePath : str, optional

the Protobuf descriptor file.

options : dict, optional

options to control how the struct column is converted into protobuf records.

Notes

Protobuf functionality is provided as a pluggable external module; it is not included in Spark by default and must be added to the classpath, for example with the spark-submit options --packages or --jars.

Examples

>>> import tempfile
>>> from pyspark.sql.protobuf.functions import to_protobuf
>>> data = [([(2, "Alice", 13093020)])]
>>> ddl_schema = "value struct<age: INTEGER, name: STRING, score: LONG>"
>>> df = spark.createDataFrame(data, ddl_schema)
>>> desc_hex = ('0ACE010A41636F6E6E6563746F722F70726F746F6275662F7372632F746573742F726'
...    '5736F75726365732F70726F746F6275662F7079737061726B5F746573742E70726F746F121D6F72672E61'
...    '70616368652E737061726B2E73716C2E70726F746F627566224B0A0D53696D706C654D657373616765121'
...    '00A03616765180120012805520361676512120A046E616D6518022001280952046E616D6512140A057363'
...    '6F7265180320012803520573636F72654215421353696D706C654D65737361676550726F746F736206707'
...    '26F746F33')
>>> # Write the protobuf descriptor (generated from the
>>> # connector/protobuf/src/test/resources/protobuf/pyspark_test.proto file) to disk.
>>> with tempfile.TemporaryDirectory() as tmp_dir:
...     desc_file_path = "%s/pyspark_test.desc" % tmp_dir
...     with open(desc_file_path, "wb") as f:
...         _ = f.write(bytearray.fromhex(desc_hex))
...         f.flush()
...         message_name = 'SimpleMessage'
...         proto_df = df.select(
...             to_protobuf(df.value, message_name, desc_file_path).alias("suite"))
...         proto_df.show(truncate=False)
+-------------------------------------------+
|suite                                      |
+-------------------------------------------+
|[08 02 12 05 41 6C 69 63 65 18 9C 91 9F 06]|
+-------------------------------------------+
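
The bytes shown are standard protobuf wire format: 08 02 encodes field 1 (age) as the varint 2, 12 05 41 6C 69 63 65 encodes field 2 (name) as the 5-byte string "Alice", and 18 9C 91 9F 06 encodes field 3 (score) as the varint 13093020. The next example provides the Protobuf definition through a shaded Java class instead of a descriptor file, using the Timestamp message that ships with the shaded runtime:
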
>>> data = [([(1668035962, 2020)])]
>>> ddl_schema = "value struct<seconds: LONG, nanos: INT>"
>>> df = spark.createDataFrame(data, ddl_schema)
>>> message_class_name = "org.sparkproject.spark_protobuf.protobuf.Timestamp"
>>> proto_df = df.select(to_protobuf(df.value, message_class_name).alias("suite"))
>>> proto_df.show(truncate=False)
+----------------------------+
|suite                       |
+----------------------------+
|[08 FA EA B0 9B 06 10 E4 0F]|
+----------------------------+
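
To check the encoding, the binary column can be decoded back with from_protobuf from the same module. A minimal round-trip sketch, reusing proto_df and message_class_name from the example above:

>>> from pyspark.sql.protobuf.functions import from_protobuf
>>> proto_df.select(
...     from_protobuf(proto_df.suite, message_class_name).alias("value")
... ).show(truncate=False)
+------------------+
|value             |
+------------------+
|{1668035962, 2020}|
+------------------+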