Spark Streaming Custom Receivers

A Spark Streaming receiver can ingest data from a simple network stream, a message queue, files, and so on. A receiver can also take on roles beyond just receiving data, such as filtering and preprocessing, to name a few of the possibilities. An API for plugging in user-defined custom receivers is therefore provided, to encourage development of receivers well suited to one's specific needs.

This guide shows the programming model and features by walking through a simple sample receiver and corresponding Spark Streaming application.

Writing a Simple Receiver

One way to write a custom receiver is to extend NetworkReceiver.

The following is a simple socket text-stream receiver.

       import java.io.{BufferedReader, InputStreamReader}
       import java.net.Socket

       class SocketTextStreamReceiver(host: String, port: Int)
         extends NetworkReceiver[String]
       {
         protected lazy val blocksGenerator: BlockGenerator =
           new BlockGenerator(StorageLevel.MEMORY_ONLY_SER_2)

         protected def onStart() = {
           blocksGenerator.start()
           val socket = new Socket(host, port)
           val dataInputStream = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"))
           var data: String = dataInputStream.readLine()
           while (data != null) {
             blocksGenerator += data
             data = dataInputStream.readLine()
           }
         }

         protected def onStop() {
           blocksGenerator.stop()
         }
       }

All we did here was extend NetworkReceiver and call blocksGenerator's API method (i.e. +=) to push our blocks of data. Please refer to the scala-docs of NetworkReceiver for more details.

An Actor as Receiver

Another way is to implement an Akka Actor that mixes in the Receiver trait.

The following is a simple socket text-stream receiver, considerably simplified by using Akka's IO API.

       class SocketTextStreamReceiver(host: String,
         port: Int,
         bytesToString: ByteString => String) extends Actor with Receiver {

         override def preStart = IOManager(context.system).connect(host, port)

         def receive = {
           case IO.Read(socket, bytes) => pushBlock(bytesToString(bytes))
         }

       }

All we did here was mix in the Receiver trait and call the pushBlock API method to push our blocks of data. Please refer to the scala-docs of Receiver for more details.
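Since a receiver may also filter or preprocess data before pushing it (as noted at the start of this guide), the receive handler can do more than forward bytes. The following is only a sketch, built on the same assumed Receiver trait and pushBlock method as above; the class name is illustrative:

       // Sketch: a receiver that drops empty lines before pushing data.
       class FilteringSocketTextStreamReceiver(host: String,
         port: Int,
         bytesToString: ByteString => String) extends Actor with Receiver {

         override def preStart = IOManager(context.system).connect(host, port)

         def receive = {
           case IO.Read(socket, bytes) =>
             val line = bytesToString(bytes)
             // Only push non-empty lines downstream.
             if (line.nonEmpty) pushBlock(line)
         }
       }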

A Sample Spark Application

    val ssc = new StreamingContext(master, "WordCountCustomStreamSource",
      Seconds(batchDuration))

    // Plug in the NetworkReceiver-based receiver ...
    val lines = ssc.networkStream[String](new SocketTextStreamReceiver(
      "localhost", 8445))

    // ... or, alternatively, the actor-based receiver:
    val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
      "localhost", 8445, z => z.utf8String)), "SocketReceiver")
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

    wordCounts.print()
    ssc.start()

Multiple Homogeneous/Heterogeneous Receivers

A DStream union operation is provided for taking the union of multiple input streams.

    val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
      "localhost", 8445, z => z.utf8String)), "SocketReceiver")

    // Another socket stream receiver (note the distinct actor name)
    val lines2 = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
      "localhost", 8446, z => z.utf8String)), "SocketReceiver2")

    val union = lines.union(lines2)

The unioned stream above can then be processed as described earlier.
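For more than two streams of the same type, the same union operation can be folded over a whole collection of streams. This is only a sketch; the port numbers and actor names are illustrative:

    val ports = Seq(8445, 8446, 8447)
    val streams = ports.map { p =>
      ssc.actorStream[String](Props(new SocketTextStreamReceiver(
        "localhost", p, z => z.utf8String)), "SocketReceiver-" + p)
    }
    // Fold union over all streams to get a single DStream.
    val union = streams.reduce(_ union _)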

A more comprehensive example is provided in the Spark Streaming examples.

References

1. Akka Actor documentation
2. NetworkReceiver