public abstract class Receiver<T>
extends Object
implements scala.Serializable
onStart()
and onStop()
. onStart()
should define the setup steps necessary to start receiving data,
and onStop()
should define the cleanup steps necessary to stop receiving data.
Exceptions while receiving can be handled either by restarting the receiver with restart(...)
or stopped completely by stop(...)
.
A custom receiver in Scala would look like this.
class MyReceiver(storageLevel: StorageLevel) extends NetworkReceiver[String](storageLevel) {
def onStart() {
// Setup stuff (start threads, open sockets, etc.) to start receiving data.
// Must start new thread to receive data, as onStart() must be non-blocking.
// Call store(...) in those threads to store received data into Spark's memory.
// Call stop(...), restart(...) or reportError(...) on any thread based on how
// different errors need to be handled.
// See corresponding method documentation for more details
}
def onStop() {
// Cleanup stuff (stop threads, close sockets, etc.) to stop receiving data.
}
}
A custom receiver in Java would look like this.
class MyReceiver extends Receiver<String> {
public MyReceiver(StorageLevel storageLevel) {
super(storageLevel);
}
public void onStart() {
// Setup stuff (start threads, open sockets, etc.) to start receiving data.
// Must start new thread to receive data, as onStart() must be non-blocking.
// Call store(...) in those threads to store received data into Spark's memory.
// Call stop(...), restart(...) or reportError(...) on any thread based on how
// different errors need to be handled.
// See corresponding method documentation for more details
}
public void onStop() {
// Cleanup stuff (stop threads, close sockets, etc.) to stop receiving data.
}
}
Constructor and Description |
---|
Receiver(StorageLevel storageLevel) |
Modifier and Type | Method and Description |
---|---|
boolean |
isStarted()
Check if the receiver has started or not.
|
boolean |
isStopped()
Check if receiver has been marked for stopping.
|
abstract void |
onStart()
This method is called by the system when the receiver is started.
|
abstract void |
onStop()
This method is called by the system when the receiver is stopped.
|
scala.Option<String> |
preferredLocation()
Override this to specify a preferred location (hostname).
|
void |
reportError(String message,
Throwable throwable)
Report exceptions in receiving data.
|
void |
restart(String message)
Restart the receiver.
|
void |
restart(String message,
Throwable error)
Restart the receiver.
|
void |
restart(String message,
Throwable error,
int millisecond)
Restart the receiver.
|
void |
stop(String message)
Stop the receiver completely.
|
void |
stop(String message,
Throwable error)
Stop the receiver completely due to an exception
|
StorageLevel |
storageLevel() |
void |
store(scala.collection.mutable.ArrayBuffer<T> dataBuffer)
Store an ArrayBuffer of received data as a data block into Spark's memory.
|
void |
store(scala.collection.mutable.ArrayBuffer<T> dataBuffer,
Object metadata)
Store an ArrayBuffer of received data as a data block into Spark's memory.
|
void |
store(java.nio.ByteBuffer bytes)
Store the bytes of received data as a data block into Spark's memory.
|
void |
store(java.nio.ByteBuffer bytes,
Object metadata)
Store the bytes of received data as a data block into Spark's memory.
|
void |
store(scala.collection.Iterator<T> dataIterator)
Store an iterator of received data as a data block into Spark's memory.
|
void |
store(java.util.Iterator<T> dataIterator)
Store an iterator of received data as a data block into Spark's memory.
|
void |
store(java.util.Iterator<T> dataIterator,
Object metadata)
Store an iterator of received data as a data block into Spark's memory.
|
void |
store(scala.collection.Iterator<T> dataIterator,
Object metadata)
Store an iterator of received data as a data block into Spark's memory.
|
void |
store(T dataItem)
Store a single item of received data to Spark's memory.
|
int |
streamId()
Get the unique identifier the receiver input stream that this
receiver is associated with.
|
public Receiver(StorageLevel storageLevel)
public boolean isStarted()
public boolean isStopped()
public abstract void onStart()
store(data)
.
If there are errors in threads started here, then following options can be done
(i) reportError(...)
can be called to report the error to the driver.
The receiving of data will continue uninterrupted.
(ii) stop(...)
can be called to stop receiving data. This will call onStop()
to
clear up all resources allocated (threads, buffers, etc.) during onStart()
.
(iii) restart(...)
can be called to restart the receiver. This will call onStop()
immediately, and then onStart()
after a delay.
public abstract void onStop()
onStart()
must be cleaned up in this method.public scala.Option<String> preferredLocation()
public void reportError(String message, Throwable throwable)
public void restart(String message)
onStop()
and onStart()
) is performed asynchronously
in a background thread. The delay between the stopping and the starting
is defined by the Spark configuration spark.streaming.receiverRestartDelay
.
The message
will be reported to the driver.message
- (undocumented)public void restart(String message, Throwable error)
onStop()
and onStart()
) is performed asynchronously
in a background thread. The delay between the stopping and the starting
is defined by the Spark configuration spark.streaming.receiverRestartDelay
.
The message
and exception
will be reported to the driver.message
- (undocumented)error
- (undocumented)public void restart(String message, Throwable error, int millisecond)
onStop()
and onStart()
) is performed asynchronously
in a background thread.message
- (undocumented)error
- (undocumented)millisecond
- (undocumented)public void stop(String message)
public void stop(String message, Throwable error)
public StorageLevel storageLevel()
public void store(T dataItem)
dataItem
- (undocumented)public void store(scala.collection.mutable.ArrayBuffer<T> dataBuffer)
public void store(scala.collection.mutable.ArrayBuffer<T> dataBuffer, Object metadata)
dataBuffer
- (undocumented)metadata
- (undocumented)public void store(scala.collection.Iterator<T> dataIterator)
public void store(java.util.Iterator<T> dataIterator, Object metadata)
dataIterator
- (undocumented)metadata
- (undocumented)public void store(java.util.Iterator<T> dataIterator)
public void store(scala.collection.Iterator<T> dataIterator, Object metadata)
dataIterator
- (undocumented)metadata
- (undocumented)public void store(java.nio.ByteBuffer bytes)
bytes
- (undocumented)public void store(java.nio.ByteBuffer bytes, Object metadata)
bytes
- (undocumented)metadata
- (undocumented)public int streamId()