Class ThreadUtils
-
Constructor Summary
-
ThreadUtils()
Method Summary
Modifier and Type / Method / Description

static <T> scala.concurrent.Awaitable
  awaitReady(scala.concurrent.Awaitable<T> awaitable, scala.concurrent.duration.Duration atMost)
  Preferred alternative to Await.ready().

static <T> T
  awaitResult(Future<T> future, scala.concurrent.duration.Duration atMost)

static <T> T
  awaitResult(scala.concurrent.Awaitable<T> awaitable, scala.concurrent.duration.Duration atMost)
  Preferred alternative to Await.result().

static ThreadFactory
  namedThreadFactory(String prefix)
  Create a thread factory that names threads with a prefix and also sets the threads to daemon.

static ThreadPoolExecutor
  newDaemonCachedThreadPool(String prefix)
  Wrapper over newCachedThreadPool.

static ThreadPoolExecutor
  newDaemonCachedThreadPool(String prefix, int maxThreadNumber, int keepAliveSeconds)
  Create a cached thread pool whose max number of threads is maxThreadNumber.

static ThreadPoolExecutor
  newDaemonFixedThreadPool(int nThreads, String prefix)
  Wrapper over newFixedThreadPool.

static ThreadPoolExecutor
  newDaemonSingleThreadExecutor(String threadName)
  Wrapper over newFixedThreadPool with a single daemon thread.

static ThreadPoolExecutor
  newDaemonSingleThreadExecutorWithRejectedExecutionHandler(String threadName, int taskQueueCapacity, RejectedExecutionHandler rejectedExecutionHandler)
  Wrapper over newSingleThreadExecutor that allows the specification of a RejectedExecutionHandler.

static ScheduledExecutorService
  newDaemonSingleThreadScheduledExecutor(String threadName)
  Wrapper over ScheduledThreadPoolExecutor; the pool is created with daemon threads.

static ScheduledExecutorService
  newDaemonThreadPoolScheduledExecutor(String threadNamePrefix, int numThreads)
  Wrapper over ScheduledThreadPoolExecutor.

static ForkJoinPool
  newForkJoinPool(String prefix, int maxThreadNumber)
  Construct a new ForkJoinPool with a specified max parallelism and name prefix.

static ScheduledThreadPoolExecutor
  newSingleThreadScheduledExecutor(String threadName)
  Wrapper over ScheduledThreadPoolExecutor; the pool is created with non-daemon threads.

static <I,O> scala.collection.immutable.Seq<O>
  parmap(scala.collection.immutable.Seq<I> in, String prefix, int maxThreads, scala.Function1<I,O> f)
  Transforms the input collection by applying the given function to each element in parallel.

static <T> T
  runInNewThread(String threadName, boolean isDaemon, scala.Function0<T> body)
  Run a piece of code in a new thread and return the result.

static scala.concurrent.ExecutionContextExecutor
  sameThread()
  An ExecutionContextExecutor that runs each task in the thread that invokes execute/submit.

static ExecutorService
  sameThreadExecutorService()

static void
  shutdown(ExecutorService executor, scala.concurrent.duration.Duration gracePeriod)

static <T extends Throwable> T
  wrapCallerStacktrace(T realException, String combineMessage, int dropStacks)
  Adjust the exception stack trace to wrap it with the caller-side thread stack trace.
-
Constructor Details
-
ThreadUtils
public ThreadUtils()
-
-
Method Details
-
sameThreadExecutorService
public static ExecutorService sameThreadExecutorService()
-
sameThread
public static scala.concurrent.ExecutionContextExecutor sameThread()
An ExecutionContextExecutor that runs each task in the thread that invokes execute/submit. The caller should make sure the tasks running in this ExecutionContextExecutor are short and never block.
- Returns:
- (undocumented)
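Example (Scala; a minimal sketch, not part of the API docs): use sameThread for a callback that is cheap and non-blocking, so it runs directly on the thread that completes the future.

  import scala.concurrent.Future
  import scala.concurrent.ExecutionContext.Implicits.global
  import org.apache.spark.util.ThreadUtils

  // The callback below is trivial and never blocks, so run it on the
  // completing thread instead of scheduling it on another pool.
  val f = Future { 1 + 1 }
  f.foreach(n => println(s"computed $n"))(ThreadUtils.sameThread)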
-
namedThreadFactory
public static ThreadFactory namedThreadFactory(String prefix)
Create a thread factory that names threads with a prefix and also sets the threads to daemon.
- Parameters:
prefix - (undocumented)
- Returns:
- (undocumented)
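Example (Scala; a minimal sketch, with an illustrative prefix name): the factory can be handed to any standard java.util.concurrent pool builder.

  import java.util.concurrent.Executors
  import org.apache.spark.util.ThreadUtils

  // Threads created by this factory are daemons carrying the given prefix.
  val factory = ThreadUtils.namedThreadFactory("metrics-poller")
  val pool = Executors.newFixedThreadPool(2, factory)

  pool.execute(() => println(Thread.currentThread().getName))
  pool.shutdown()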
-
newDaemonCachedThreadPool
public static ThreadPoolExecutor newDaemonCachedThreadPool(String prefix)
Wrapper over newCachedThreadPool. Thread names are formatted as prefix-ID, where ID is a unique, sequentially assigned integer.
- Parameters:
prefix - (undocumented)
- Returns:
- (undocumented)
-
newDaemonCachedThreadPool
public static ThreadPoolExecutor newDaemonCachedThreadPool(String prefix, int maxThreadNumber, int keepAliveSeconds)
Create a cached thread pool whose max number of threads is maxThreadNumber. Thread names are formatted as prefix-ID, where ID is a unique, sequentially assigned integer.
- Parameters:
prefix - (undocumented)
maxThreadNumber - (undocumented)
keepAliveSeconds - (undocumented)
- Returns:
- (undocumented)
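Example (Scala; a minimal sketch, pool name and sizing are illustrative): a bounded cached pool for bursty, short-lived tasks, shut down via ThreadUtils.shutdown.

  import scala.concurrent.duration._
  import org.apache.spark.util.ThreadUtils

  // At most 8 daemon threads named "rpc-dispatcher-<id>"; idle threads
  // are reclaimed after 60 seconds.
  val pool = ThreadUtils.newDaemonCachedThreadPool("rpc-dispatcher", 8, 60)

  (1 to 16).foreach { i =>
    pool.execute(() => println(s"task $i on ${Thread.currentThread().getName}"))
  }

  // Shut the pool down, allowing a grace period for outstanding tasks.
  ThreadUtils.shutdown(pool, 10.seconds)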
-
newDaemonFixedThreadPool
public static ThreadPoolExecutor newDaemonFixedThreadPool(int nThreads, String prefix)
Wrapper over newFixedThreadPool. Thread names are formatted as prefix-ID, where ID is a unique, sequentially assigned integer.
- Parameters:
nThreads - (undocumented)
prefix - (undocumented)
- Returns:
- (undocumented)
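Example (Scala; a minimal sketch, names are illustrative): wrap the fixed pool in an ExecutionContext and drive Scala futures with it.

  import scala.concurrent.{ExecutionContext, Future}
  import scala.concurrent.duration._
  import org.apache.spark.util.ThreadUtils

  // Four daemon threads named "block-fetcher-<id>".
  val pool = ThreadUtils.newDaemonFixedThreadPool(4, "block-fetcher")
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

  val work = Future.traverse(Seq("a", "b", "c"))(id => Future(s"fetched $id"))
  println(ThreadUtils.awaitResult(work, 30.seconds))

  ThreadUtils.shutdown(pool, 5.seconds)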
-
newDaemonSingleThreadExecutor
public static ThreadPoolExecutor newDaemonSingleThreadExecutor(String threadName)
Wrapper over newFixedThreadPool with a single daemon thread.
- Parameters:
threadName - (undocumented)
- Returns:
- (undocumented)
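Example (Scala; a minimal sketch, the thread name is illustrative): all submitted tasks run sequentially on one daemon thread.

  import scala.concurrent.duration._
  import org.apache.spark.util.ThreadUtils

  val executor = ThreadUtils.newDaemonSingleThreadExecutor("state-cleaner")
  executor.execute(() => println("cleaning old state"))
  ThreadUtils.shutdown(executor, 5.seconds)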
-
newDaemonSingleThreadExecutorWithRejectedExecutionHandler
public static ThreadPoolExecutor newDaemonSingleThreadExecutorWithRejectedExecutionHandler(String threadName, int taskQueueCapacity, RejectedExecutionHandler rejectedExecutionHandler)
Wrapper over newSingleThreadExecutor that allows the specification of a RejectedExecutionHandler.
- Parameters:
threadName - (undocumented)
taskQueueCapacity - (undocumented)
rejectedExecutionHandler - (undocumented)
- Returns:
- (undocumented)
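Example (Scala; a minimal sketch, name and capacity are illustrative): a single daemon worker with a bounded queue and a standard JDK rejection policy.

  import java.util.concurrent.ThreadPoolExecutor
  import scala.concurrent.duration._
  import org.apache.spark.util.ThreadUtils

  // One daemon worker with a queue of 100 tasks; anything submitted while
  // the queue is full is silently dropped by DiscardPolicy.
  val executor = ThreadUtils.newDaemonSingleThreadExecutorWithRejectedExecutionHandler(
    "event-logger", 100, new ThreadPoolExecutor.DiscardPolicy())

  executor.execute(() => println("logging an event"))
  ThreadUtils.shutdown(executor, 5.seconds)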
-
newDaemonSingleThreadScheduledExecutor
public static ScheduledExecutorService newDaemonSingleThreadScheduledExecutor(String threadName)
Wrapper over ScheduledThreadPoolExecutor; the pool is created with daemon threads.
- Parameters:
threadName - (undocumented)
- Returns:
- (undocumented)
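Example (Scala; a minimal sketch, the thread name and interval are illustrative): schedule a periodic task on a single daemon thread.

  import java.util.concurrent.TimeUnit
  import scala.concurrent.duration._
  import org.apache.spark.util.ThreadUtils

  val scheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor("heartbeat")
  // Emit a heartbeat immediately and then every 10 seconds.
  scheduler.scheduleAtFixedRate(() => println("heartbeat"), 0L, 10L, TimeUnit.SECONDS)

  // Later, on shutdown:
  ThreadUtils.shutdown(scheduler, 5.seconds)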
-
newSingleThreadScheduledExecutor
public static ScheduledThreadPoolExecutor newSingleThreadScheduledExecutor(String threadName)
Wrapper over ScheduledThreadPoolExecutor; the pool is created with non-daemon threads.
- Parameters:
threadName - (undocumented)
- Returns:
- (undocumented)
-
newDaemonThreadPoolScheduledExecutor
public static ScheduledExecutorService newDaemonThreadPoolScheduledExecutor(String threadNamePrefix, int numThreads)
Wrapper over ScheduledThreadPoolExecutor.
- Parameters:
threadNamePrefix - (undocumented)
numThreads - (undocumented)
- Returns:
- (undocumented)
-
runInNewThread
public static <T> T runInNewThread(String threadName, boolean isDaemon, scala.Function0<T> body)
Run a piece of code in a new thread and return the result. An exception thrown in the new thread is rethrown in the caller thread with an adjusted stack trace that removes references to this method for clarity. The exception stack trace will look like the following:

  SomeException: exception-message
    at CallerClass.body-method (sourcefile.scala)
    at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
    at CallerClass.caller-method (sourcefile.scala)
    ...

- Parameters:
threadName - (undocumented)
isDaemon - (undocumented)
body - (undocumented)
- Returns:
- (undocumented)
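Example (Scala; a minimal sketch, the thread name is illustrative; from Scala the body is passed as a trailing block, whereas the Java signature above takes a Function0):

  import org.apache.spark.util.ThreadUtils

  // Runs the block on a fresh daemon thread named "config-loader" and
  // blocks the caller until the result is available.
  val checksum: Int = ThreadUtils.runInNewThread("config-loader", isDaemon = true) {
    "some-config-contents".hashCode
  }
  println(checksum)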
-
wrapCallerStacktrace
public static <T extends Throwable> T wrapCallerStacktrace(T realException, String combineMessage, int dropStacks)
Adjust the exception stack trace to wrap it with the caller-side thread stack trace. The exception stack trace will look like the following:

  SomeException: exception-message
    at CallerClass.body-method (sourcefile.scala)
    at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
    at CallerClass.caller-method (sourcefile.scala)
    ...

- Parameters:
realException - (undocumented)
combineMessage - (undocumented)
dropStacks - (undocumented)
- Returns:
- (undocumented)
-
newForkJoinPool
public static ForkJoinPool newForkJoinPool(String prefix, int maxThreadNumber)
Construct a new ForkJoinPool with a specified max parallelism and name prefix.
- Parameters:
prefix - (undocumented)
maxThreadNumber - (undocumented)
- Returns:
- (undocumented)
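Example (Scala; a minimal sketch, the prefix and parallelism are illustrative; the concrete ForkJoinPool type is whatever this method returns in the targeted Spark build):

  import org.apache.spark.util.ThreadUtils

  // Fork-join pool with max parallelism 8; worker thread names use the prefix.
  val pool = ThreadUtils.newForkJoinPool("broadcast-exchange", 8)

  // Submit a task and block for its result.
  val sum = pool.submit(new java.util.concurrent.Callable[Int] {
    override def call(): Int = (1 to 100).sum
  }).get()
  println(sum)
  pool.shutdown()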
-
awaitResult
public static <T> T awaitResult(scala.concurrent.Awaitable<T> awaitable, scala.concurrent.duration.Duration atMost) throws SparkException
Preferred alternative to Await.result().
This method wraps and re-throws any exceptions thrown by the underlying Await call, ensuring that this thread's stack trace appears in logs.
In addition, it calls Awaitable.result directly to avoid using ForkJoinPool's BlockingContext. Code running in the user's thread may be in a thread of the Scala ForkJoinPool, and since concurrent executions in a ForkJoinPool may see a ThreadLocal value unexpectedly, this method essentially prevents the ForkJoinPool from running other tasks in the current waiting thread. In general, we should use this method because many places in Spark use ThreadLocal and it is hard to debug when ThreadLocals leak to other tasks.
- Parameters:
awaitable - (undocumented)
atMost - (undocumented)
- Returns:
- (undocumented)
- Throws:
SparkException
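Example (Scala; a minimal sketch): block on a Scala Future with a timeout instead of calling Await.result directly.

  import scala.concurrent.Future
  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.concurrent.duration._
  import org.apache.spark.util.ThreadUtils

  val reply: Future[String] = Future { "pong" }

  // Blocks for at most 10 seconds; failures are re-thrown with this
  // thread's stack trace attached to the wrapping exception.
  val answer = ThreadUtils.awaitResult(reply, 10.seconds)
  println(answer)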
-
awaitResult
public static <T> T awaitResult(Future<T> future, scala.concurrent.duration.Duration atMost) throws SparkException
- Throws:
SparkException
-
awaitReady
public static <T> scala.concurrent.Awaitable awaitReady(scala.concurrent.Awaitable<T> awaitable, scala.concurrent.duration.Duration atMost) throws SparkException
Preferred alternative to Await.ready().
- Parameters:
awaitable - (undocumented)
atMost - (undocumented)
- Returns:
- (undocumented)
- Throws:
SparkException
- See Also:
-
shutdown
public static void shutdown(ExecutorService executor, scala.concurrent.duration.Duration gracePeriod)
-
parmap
public static <I,O> scala.collection.immutable.Seq<O> parmap(scala.collection.immutable.Seq<I> in, String prefix, int maxThreads, scala.Function1<I,O> f)
Transforms the input collection by applying the given function to each element in parallel. Compared to the map() method of Scala parallel collections, this method can be interrupted at any time. This is useful for canceling task execution, for example.
Functions are guaranteed to be executed in freshly-created threads that inherit the calling thread's Spark thread-local variables. These threads also inherit the calling thread's active SparkSession.
- Parameters:
in - the input collection which should be transformed in parallel.
prefix - the prefix assigned to the underlying thread pool.
maxThreads - the maximum number of threads that can be created during execution.
f - the lambda function that will be applied to each element of in.
- Returns:
a new collection in which each element was obtained by applying the lambda function f to the corresponding element of the input collection in.
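Example (Scala; a minimal sketch, the data and prefix are illustrative; from Scala the function is passed as a trailing block, whereas the Java signature above takes a Function1):

  import org.apache.spark.util.ThreadUtils

  val partitionIds = Seq(0, 1, 2, 3, 4)

  // Each element is processed in a freshly-created thread, with at most
  // 3 threads created; the call can be interrupted while in progress.
  val sizes = ThreadUtils.parmap(partitionIds, "partition-loader", 3) { id =>
    // Stand-in for real per-partition work.
    id * 100
  }
  println(sizes)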
-