Class ThreadUtils

Object
org.apache.spark.util.ThreadUtils

public class ThreadUtils extends Object
  • Constructor Details

    • ThreadUtils

      public ThreadUtils()
  • Method Details

    • sameThreadExecutorService

      public static ExecutorService sameThreadExecutorService()
    • sameThread

      public static scala.concurrent.ExecutionContextExecutor sameThread()
      An ExecutionContextExecutor that runs each task in the thread that invokes execute/submit. The caller should make sure the tasks running in this ExecutionContextExecutor are short and never block.
      Returns:
      (undocumented)
    • namedThreadFactory

      public static ThreadFactory namedThreadFactory(String prefix)
      Create a thread factory that names threads with a prefix and also sets the threads to daemon.
      Parameters:
      prefix - (undocumented)
      Returns:
      (undocumented)
    • newDaemonCachedThreadPool

      public static ThreadPoolExecutor newDaemonCachedThreadPool(String prefix)
      Wrapper over newCachedThreadPool. Thread names are formatted as prefix-ID, where ID is a unique, sequentially assigned integer.
      Parameters:
      prefix - (undocumented)
      Returns:
      (undocumented)
    • newDaemonCachedThreadPool

      public static ThreadPoolExecutor newDaemonCachedThreadPool(String prefix, int maxThreadNumber, int keepAliveSeconds)
      Create a cached thread pool whose max number of threads is maxThreadNumber. Thread names are formatted as prefix-ID, where ID is a unique, sequentially assigned integer.
      Parameters:
      prefix - (undocumented)
      maxThreadNumber - (undocumented)
      keepAliveSeconds - (undocumented)
      Returns:
      (undocumented)
    • newDaemonFixedThreadPool

      public static ThreadPoolExecutor newDaemonFixedThreadPool(int nThreads, String prefix)
      Wrapper over newFixedThreadPool. Thread names are formatted as prefix-ID, where ID is a unique, sequentially assigned integer.
      Parameters:
      nThreads - (undocumented)
      prefix - (undocumented)
      Returns:
      (undocumented)
    • newDaemonSingleThreadExecutor

      public static ThreadPoolExecutor newDaemonSingleThreadExecutor(String threadName)
      Wrapper over newFixedThreadPool with single daemon thread.
      Parameters:
      threadName - (undocumented)
      Returns:
      (undocumented)
    • newDaemonSingleThreadExecutorWithRejectedExecutionHandler

      public static ThreadPoolExecutor newDaemonSingleThreadExecutorWithRejectedExecutionHandler(String threadName, int taskQueueCapacity, RejectedExecutionHandler rejectedExecutionHandler)
      Wrapper over newSingleThreadExecutor that allows the specification of a RejectedExecutionHandler
      Parameters:
      threadName - (undocumented)
      taskQueueCapacity - (undocumented)
      rejectedExecutionHandler - (undocumented)
      Returns:
      (undocumented)
    • newDaemonSingleThreadScheduledExecutor

      public static ScheduledExecutorService newDaemonSingleThreadScheduledExecutor(String threadName)
      Wrapper over ScheduledThreadPoolExecutor the pool with daemon threads.
      Parameters:
      threadName - (undocumented)
      Returns:
      (undocumented)
    • newSingleThreadScheduledExecutor

      public static ScheduledThreadPoolExecutor newSingleThreadScheduledExecutor(String threadName)
      Wrapper over ScheduledThreadPoolExecutor the pool with non-daemon threads.
      Parameters:
      threadName - (undocumented)
      Returns:
      (undocumented)
    • newDaemonThreadPoolScheduledExecutor

      public static ScheduledExecutorService newDaemonThreadPoolScheduledExecutor(String threadNamePrefix, int numThreads)
      Wrapper over ScheduledThreadPoolExecutor.
      Parameters:
      threadNamePrefix - (undocumented)
      numThreads - (undocumented)
      Returns:
      (undocumented)
    • runInNewThread

      public static <T> T runInNewThread(String threadName, boolean isDaemon, scala.Function0<T> body)
      Run a piece of code in a new thread and return the result. Exception in the new thread is thrown in the caller thread with an adjusted stack trace that removes references to this method for clarity. The exception stack traces will be like the following:

      SomeException: exception-message at CallerClass.body-method (sourcefile.scala) at ... run in separate thread using org.apache.spark.util.ThreadUtils ... () at CallerClass.caller-method (sourcefile.scala) ...

      Parameters:
      threadName - (undocumented)
      isDaemon - (undocumented)
      body - (undocumented)
      Returns:
      (undocumented)
    • wrapCallerStacktrace

      public static <T extends Throwable> T wrapCallerStacktrace(T realException, String combineMessage, int dropStacks)
      Adjust exception stack stace to wrap with caller side thread stack trace. The exception stack traces will be like the following:

      SomeException: exception-message at CallerClass.body-method (sourcefile.scala) at ... run in separate thread using org.apache.spark.util.ThreadUtils ... () at CallerClass.caller-method (sourcefile.scala) ...

      Parameters:
      realException - (undocumented)
      combineMessage - (undocumented)
      dropStacks - (undocumented)
      Returns:
      (undocumented)
    • newForkJoinPool

      public static ForkJoinPool newForkJoinPool(String prefix, int maxThreadNumber)
      Construct a new ForkJoinPool with a specified max parallelism and name prefix.
      Parameters:
      prefix - (undocumented)
      maxThreadNumber - (undocumented)
      Returns:
      (undocumented)
    • awaitResult

      public static <T> T awaitResult(scala.concurrent.Awaitable<T> awaitable, scala.concurrent.duration.Duration atMost) throws SparkException
      Preferred alternative to Await.result().

      This method wraps and re-throws any exceptions thrown by the underlying Await call, ensuring that this thread's stack trace appears in logs.

      In addition, it calls Awaitable.result directly to avoid using ForkJoinPool's BlockingContext. Codes running in the user's thread may be in a thread of Scala ForkJoinPool. As concurrent executions in ForkJoinPool may see some ThreadLocal value unexpectedly, this method basically prevents ForkJoinPool from running other tasks in the current waiting thread. In general, we should use this method because many places in Spark use ThreadLocal and it's hard to debug when ThreadLocals leak to other tasks.

      Parameters:
      awaitable - (undocumented)
      atMost - (undocumented)
      Returns:
      (undocumented)
      Throws:
      SparkException
    • awaitResult

      public static <T> T awaitResult(Future<T> future, scala.concurrent.duration.Duration atMost) throws SparkException
      Throws:
      SparkException
    • awaitReady

      public static <T> scala.concurrent.Awaitable awaitReady(scala.concurrent.Awaitable<T> awaitable, scala.concurrent.duration.Duration atMost) throws SparkException
      Preferred alternative to Await.ready().

      Parameters:
      awaitable - (undocumented)
      atMost - (undocumented)
      Returns:
      (undocumented)
      Throws:
      SparkException
      See Also:
    • shutdown

      public static void shutdown(ExecutorService executor, scala.concurrent.duration.Duration gracePeriod)
    • parmap

      public static <I, O> scala.collection.immutable.Seq<O> parmap(scala.collection.immutable.Seq<I> in, String prefix, int maxThreads, scala.Function1<I,O> f)
      Transforms input collection by applying the given function to each element in parallel fashion. Comparing to the map() method of Scala parallel collections, this method can be interrupted at any time. This is useful on canceling of task execution, for example.

      Functions are guaranteed to be executed in freshly-created threads that inherit the calling thread's Spark thread-local variables. These threads also inherit the calling thread's active SparkSession.

      Parameters:
      in - - the input collection which should be transformed in parallel.
      prefix - - the prefix assigned to the underlying thread pool.
      maxThreads - - maximum number of thread can be created during execution.
      f - - the lambda function will be applied to each element of in.
      Returns:
      new collection in which each element was given from the input collection in by applying the lambda function f.