public class ThreadUtils
extends Object
Constructor and Description |
---|
ThreadUtils() |
Modifier and Type | Method and Description |
---|---|
static <T> scala.concurrent.Awaitable |
awaitReady(scala.concurrent.Awaitable<T> awaitable,
scala.concurrent.duration.Duration atMost)
Preferred alternative to
Await.ready() . |
static <T> T |
awaitResult(scala.concurrent.Awaitable<T> awaitable,
scala.concurrent.duration.Duration atMost)
Preferred alternative to
Await.result() . |
static <T> T |
awaitResult(java.util.concurrent.Future<T> future,
scala.concurrent.duration.Duration atMost) |
static java.util.concurrent.ThreadFactory |
namedThreadFactory(String prefix)
Create a thread factory that names threads with a prefix and also sets the threads to daemon.
|
static java.util.concurrent.ThreadPoolExecutor |
newDaemonCachedThreadPool(String prefix)
Wrapper over newCachedThreadPool.
|
static java.util.concurrent.ThreadPoolExecutor |
newDaemonCachedThreadPool(String prefix,
int maxThreadNumber,
int keepAliveSeconds)
Create a cached thread pool whose max number of threads is
maxThreadNumber . |
static java.util.concurrent.ThreadPoolExecutor |
newDaemonFixedThreadPool(int nThreads,
String prefix)
Wrapper over newFixedThreadPool.
|
static java.util.concurrent.ThreadPoolExecutor |
newDaemonSingleThreadExecutor(String threadName)
Wrapper over newSingleThreadExecutor.
|
static java.util.concurrent.ThreadPoolExecutor |
newDaemonSingleThreadExecutorWithRejectedExecutionHandler(String threadName,
int taskQueueCapacity,
java.util.concurrent.RejectedExecutionHandler rejectedExecutionHandler)
Wrapper over newSingleThreadExecutor that allows the specification
of a RejectedExecutionHandler
|
static java.util.concurrent.ScheduledExecutorService |
newDaemonSingleThreadScheduledExecutor(String threadName)
Wrapper over ScheduledThreadPoolExecutor.
|
static java.util.concurrent.ScheduledExecutorService |
newDaemonThreadPoolScheduledExecutor(String threadNamePrefix,
int numThreads)
Wrapper over ScheduledThreadPoolExecutor.
|
static java.util.concurrent.ForkJoinPool |
newForkJoinPool(String prefix,
int maxThreadNumber)
Construct a new ForkJoinPool with a specified max parallelism and name prefix.
|
static <I,O> scala.collection.Seq<O> |
parmap(scala.collection.Seq<I> in,
String prefix,
int maxThreads,
scala.Function1<I,O> f)
Transforms input collection by applying the given function to each element in parallel fashion.
|
static <T> T |
runInNewThread(String threadName,
boolean isDaemon,
scala.Function0<T> body)
Run a piece of code in a new thread and return the result.
|
static scala.concurrent.ExecutionContextExecutor |
sameThread()
An
ExecutionContextExecutor that runs each task in the thread that invokes execute/submit . |
static java.util.concurrent.ExecutorService |
sameThreadExecutorService() |
static void |
shutdown(java.util.concurrent.ExecutorService executor,
scala.concurrent.duration.Duration gracePeriod) |
public static java.util.concurrent.ExecutorService sameThreadExecutorService()
public static scala.concurrent.ExecutionContextExecutor sameThread()
ExecutionContextExecutor
that runs each task in the thread that invokes execute/submit
.
The caller should make sure the tasks running in this ExecutionContextExecutor
are short and
never block.public static java.util.concurrent.ThreadFactory namedThreadFactory(String prefix)
prefix
- (undocumented)public static java.util.concurrent.ThreadPoolExecutor newDaemonCachedThreadPool(String prefix)
prefix
- (undocumented)public static java.util.concurrent.ThreadPoolExecutor newDaemonCachedThreadPool(String prefix, int maxThreadNumber, int keepAliveSeconds)
maxThreadNumber
. Thread names
are formatted as prefix-ID, where ID is a unique, sequentially assigned integer.prefix
- (undocumented)maxThreadNumber
- (undocumented)keepAliveSeconds
- (undocumented)public static java.util.concurrent.ThreadPoolExecutor newDaemonFixedThreadPool(int nThreads, String prefix)
nThreads
- (undocumented)prefix
- (undocumented)public static java.util.concurrent.ThreadPoolExecutor newDaemonSingleThreadExecutor(String threadName)
threadName
- (undocumented)public static java.util.concurrent.ThreadPoolExecutor newDaemonSingleThreadExecutorWithRejectedExecutionHandler(String threadName, int taskQueueCapacity, java.util.concurrent.RejectedExecutionHandler rejectedExecutionHandler)
threadName
- (undocumented)taskQueueCapacity
- (undocumented)rejectedExecutionHandler
- (undocumented)public static java.util.concurrent.ScheduledExecutorService newDaemonSingleThreadScheduledExecutor(String threadName)
threadName
- (undocumented)public static java.util.concurrent.ScheduledExecutorService newDaemonThreadPoolScheduledExecutor(String threadNamePrefix, int numThreads)
threadNamePrefix
- (undocumented)numThreads
- (undocumented)public static <T> T runInNewThread(String threadName, boolean isDaemon, scala.Function0<T> body)
SomeException: exception-message at CallerClass.body-method (sourcefile.scala) at ... run in separate thread using org.apache.spark.util.ThreadUtils ... () at CallerClass.caller-method (sourcefile.scala) ...
threadName
- (undocumented)isDaemon
- (undocumented)body
- (undocumented)public static java.util.concurrent.ForkJoinPool newForkJoinPool(String prefix, int maxThreadNumber)
prefix
- (undocumented)maxThreadNumber
- (undocumented)public static <T> T awaitResult(scala.concurrent.Awaitable<T> awaitable, scala.concurrent.duration.Duration atMost) throws SparkException
Await.result()
.
This method wraps and re-throws any exceptions thrown by the underlying Await
call, ensuring
that this thread's stack trace appears in logs.
In addition, it calls Awaitable.result
directly to avoid using ForkJoinPool
's
BlockingContext
. Codes running in the user's thread may be in a thread of Scala ForkJoinPool.
As concurrent executions in ForkJoinPool may see some ThreadLocal
value unexpectedly, this
method basically prevents ForkJoinPool from running other tasks in the current waiting thread.
In general, we should use this method because many places in Spark use ThreadLocal
and it's
hard to debug when ThreadLocal
s leak to other tasks.
awaitable
- (undocumented)atMost
- (undocumented)SparkException
public static <T> T awaitResult(java.util.concurrent.Future<T> future, scala.concurrent.duration.Duration atMost) throws SparkException
SparkException
public static <T> scala.concurrent.Awaitable awaitReady(scala.concurrent.Awaitable<T> awaitable, scala.concurrent.duration.Duration atMost) throws SparkException
Await.ready()
.
awaitable
- (undocumented)atMost
- (undocumented)SparkException
awaitResult
public static void shutdown(java.util.concurrent.ExecutorService executor, scala.concurrent.duration.Duration gracePeriod)
public static <I,O> scala.collection.Seq<O> parmap(scala.collection.Seq<I> in, String prefix, int maxThreads, scala.Function1<I,O> f)
Functions are guaranteed to be executed in freshly-created threads that inherit the calling thread's Spark thread-local variables. These threads also inherit the calling thread's active SparkSession.
in
- - the input collection which should be transformed in parallel.prefix
- - the prefix assigned to the underlying thread pool.maxThreads
- - maximum number of thread can be created during execution.f
- - the lambda function will be applied to each element of in
.in
by
applying the lambda function f
.