Constructor and Description |
---|
Utils() |
Modifier and Type | Method and Description |
---|---|
static String | bytesToString(long size): Convert a quantity in bytes to a human-readable string such as "4.0 MB". |
static void | checkHost(String host, String message) |
static void | checkHostPort(String hostPort, String message) |
static boolean | chmod700(java.io.File file): JDK equivalent of chmod 700 file. |
static Class<?> | classForName(String className): Preferred alternative to Class.forName(className). |
static boolean | classIsLoadable(String clazz): Determines whether the provided class is loadable in the current thread. |
static <T> T | clone(T value, SerializerInstance serializer, scala.reflect.ClassTag<T> evidence$2): Clone an object using a Spark serializer. |
static void | configTestLog4j(String level): Configure log4j properties for use by the test suite. |
static java.net.URI | constructURIForAuthentication(java.net.URI uri, SecurityManager securityMgr): Construct a URI containing information used for authentication. |
static long | copyStream(java.io.InputStream in, java.io.OutputStream out, boolean closeStreams, boolean transferToEnabled): Copy all data from an InputStream to an OutputStream. |
static java.io.File | createDirectory(String root, String namePrefix): Create a directory inside the given parent directory. |
static java.io.File | createTempDir(String root, String namePrefix): Create a temporary directory inside the given parent directory. |
static void | deleteRecursively(java.io.File file): Delete a file or directory and its contents recursively. |
static void | deleteRecursively(tachyon.client.TachyonFile dir, tachyon.client.TachyonFS client): Delete a file or directory and its contents recursively. |
static <T> T | deserialize(byte[] bytes): Deserialize an object using Java serialization. |
static <T> T | deserialize(byte[] bytes, ClassLoader loader): Deserialize an object using Java serialization and the given ClassLoader. |
static long | deserializeLongValue(byte[] bytes): Deserialize a Long value (used for PythonPartitioner). |
static void | deserializeViaNestedStream(java.io.InputStream is, SerializerInstance ser, scala.Function1<DeserializationStream,scala.runtime.BoxedUnit> f): Deserialize via a nested stream using a specific serializer. |
static boolean | doesDirectoryContainAnyNewFiles(java.io.File dir, long cutoff): Determines whether a directory contains any files newer than cutoff seconds. |
static org.json4s.JsonAST.JObject | emptyJson(): Return an empty JSON object. |
static String | exceptionString(Throwable e): Return a nice string representation of the exception. |
static String | executeAndGetOutput(scala.collection.Seq<String> command, java.io.File workingDir, scala.collection.Map<String,String> extraEnvironment, boolean redirectStderr): Execute a command and get its output, throwing an exception if it yields a code other than 0. |
static Process | executeCommand(scala.collection.Seq<String> command, java.io.File workingDir, scala.collection.Map<String,String> extraEnvironment, boolean redirectStderr): Execute a command and return the process running the command. |
static scala.Tuple2<String,Object> | extractHostPortFromSparkUrl(String sparkUrl): Return a pair of host and port extracted from the sparkUrl. |
static void | fetchFile(String url, java.io.File targetDir, SparkConf conf, SecurityManager securityMgr, org.apache.hadoop.conf.Configuration hadoopConf, long timestamp, boolean useCache): Download a file or directory to the target directory. |
static void | fetchHcfsFile(org.apache.hadoop.fs.Path path, java.io.File targetDir, org.apache.hadoop.fs.FileSystem fs, SparkConf conf, org.apache.hadoop.conf.Configuration hadoopConf, boolean fileOverwrite, scala.Option<String> filename): Fetch a file or directory from a Hadoop-compatible filesystem. |
static String | formatWindowsPath(String path): Format a Windows path so that it can be safely passed to a URI. |
static String | getAddressHostName(String address) |
static CallSite | getCallSite(scala.Function1<String,Object> skipClass): When called inside a class in the spark package, returns the name of the user code class (outside the spark package) that called into Spark, as well as which Spark method they called. |
static ClassLoader | getContextOrSparkClassLoader(): Get the context ClassLoader on this thread or, if not present, the ClassLoader that loaded Spark. |
static String | getCurrentUserName(): Returns the current user name. |
static String | getDefaultPropertiesFile(scala.collection.Map<String,String> env): Return the path of the default Spark properties file. |
static org.apache.hadoop.fs.Path | getFilePath(java.io.File dir, String fileName): Return the absolute path of a file in the given directory. |
static String | getFormattedClassName(Object obj): Return the class name of the given object, with all dollar signs removed. |
static org.apache.hadoop.fs.FileSystem | getHadoopFileSystem(String path, org.apache.hadoop.conf.Configuration conf): Return a Hadoop FileSystem for the scheme encoded in the given path. |
static org.apache.hadoop.fs.FileSystem | getHadoopFileSystem(java.net.URI path, org.apache.hadoop.conf.Configuration conf): Return a Hadoop FileSystem for the scheme encoded in the given path. |
static <T> long | getIteratorSize(scala.collection.Iterator<T> iterator): Counts the number of elements of an iterator using a while loop rather than calling TraversableOnce.size(), because that uses a for loop, which is slightly slower in the current version of Scala. |
static String | getLocalDir(SparkConf conf): Get the path of a temporary directory. |
static long | getMaxResultSize(SparkConf conf) |
static String[] | getOrCreateLocalRootDirs(SparkConf conf): Gets or creates the directories listed in spark.local.dir or SPARK_LOCAL_DIRS, and returns only the directories that exist or could be created. |
static scala.collection.Map<String,String> | getPropertiesFromFile(String filename): Load the properties present in the given file. |
static ClassLoader | getSparkClassLoader(): Get the ClassLoader which loaded Spark. |
static String | getSparkOrYarnConfig(SparkConf conf, String key, String default_): Return the value of a config, either from the SparkConf or from the Hadoop configuration if this is YARN mode. |
static scala.Option<String> | getStderr(Process process, long timeoutMs): Return the stderr of a process after waiting for the process to terminate. |
static scala.collection.Map<String,String> | getSystemProperties(): Returns a system properties map that is thread-safe to iterate over. |
static ThreadStackTrace[] | getThreadDump(): Return a thread dump of all threads' stack traces. |
static String | getUsedTimeMs(long startTimeMs): Return a string describing how much time has passed, in milliseconds. |
static boolean | hasRootAsShutdownDeleteDir(java.io.File file) |
static boolean | hasRootAsShutdownDeleteDir(tachyon.client.TachyonFile file) |
static boolean | hasShutdownDeleteDir(java.io.File file) |
static boolean | hasShutdownDeleteTachyonDir(tachyon.client.TachyonFile file) |
static boolean | inShutdown(): Detect whether this thread might be executing a shutdown hook. |
static Object | invoke(Class<?> clazz, Object obj, String methodName, scala.collection.Seq<scala.Tuple2<Class<?>,Object>> args) |
static boolean | isBindCollision(Throwable exception): Return whether the exception is caused by an address-port collision when binding. |
static boolean | isFatalError(Throwable e): Returns true if the given exception was fatal. |
static boolean | isMac(): Whether the underlying operating system is Mac OS X. |
static boolean | isRunningInYarnContainer(SparkConf conf) |
static boolean | isSymlink(java.io.File file): Check whether the file is a symbolic link. |
static boolean | isTesting(): Indicates whether Spark is currently running unit tests. |
static boolean | isWindows(): Whether the underlying operating system is Windows. |
static scala.Option<org.json4s.JsonAST.JValue> | jsonOption(org.json4s.JsonAST.JValue json): Return an option that translates JNothing to None. |
static String | libraryPathEnvName(): Return the name of the current system's library path environment variable (for example, LD_LIBRARY_PATH). |
static String | libraryPathEnvPrefix(scala.collection.Seq<String> libraryPaths): Return the prefix of a command that appends the given library paths to the system-specific library path environment variable. |
static String | loadDefaultSparkProperties(SparkConf conf, String filePath): Load default Spark properties from the given file. |
static String | localHostName(): Get the local machine's hostname. |
static String | localIpAddress(): Get the local host's IP address in dotted-quad format (e.g. 1.2.3.4). |
static String | localIpAddressHostname() |
static <T> T | logUncaughtExceptions(scala.Function0<T> f): Execute the given block, logging and re-throwing any uncaught exception. |
static String | megabytesToString(long megabytes): Convert a quantity in megabytes to a human-readable string such as "4.0 MB". |
static int | memoryStringToMb(String str): Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes. |
static String | msDurationToString(long ms): Returns a human-readable string representing a duration such as "35ms". |
static java.util.concurrent.ThreadFactory | namedThreadFactory(String prefix): Create a thread factory that names threads with a prefix and also sets the threads to daemon. |
static java.util.concurrent.ThreadPoolExecutor | newDaemonCachedThreadPool(String prefix): Wrapper over newCachedThreadPool. |
static java.util.concurrent.ThreadPoolExecutor | newDaemonFixedThreadPool(int nThreads, String prefix): Wrapper over newFixedThreadPool. |
static String[] | nonLocalPaths(String paths, boolean testWindows): Return all non-local paths from a comma-separated list of paths. |
static int | nonNegativeHash(Object obj) |
static int | nonNegativeMod(int x, int mod) |
static String | offsetBytes(scala.collection.Seq<java.io.File> files, long start, long end): Return a string containing data across a set of files. |
static String | offsetBytes(String path, long start, long end): Return a string containing part of a file from byte 'start' to 'end'. |
static scala.Tuple2<String,Object> | parseHostPort(String hostPort) |
static int | portMaxRetries(SparkConf conf): Maximum number of retries when binding to a port before giving up. |
static Thread | processStreamByLine(String threadName, java.io.InputStream inputStream, scala.Function1<String,scala.runtime.BoxedUnit> processLine): Return and start a daemon thread that processes the content of the input stream line by line. |
static java.util.Random | random() |
static <T> scala.collection.Seq<T> | randomize(scala.collection.TraversableOnce<T> seq, scala.reflect.ClassTag<T> evidence$1): Shuffle the elements of a collection into a random order, returning the result in a new collection. |
static <T> Object | randomizeInPlace(Object arr, java.util.Random rand): Shuffle the elements of an array into a random order, modifying the original array. |
static void | registerShutdownDeleteDir(java.io.File file) |
static void | registerShutdownDeleteDir(tachyon.client.TachyonFile tachyonfile) |
static java.net.URI | resolveURI(String path, boolean testWindows): Return a well-formed URI for the file described by a user input string. |
static String | resolveURIs(String paths, boolean testWindows): Resolve a comma-separated list of paths. |
static <T> byte[] | serialize(T o): Serialize an object using Java serialization. |
static void | serializeViaNestedStream(java.io.OutputStream os, SerializerInstance ser, scala.Function1<SerializationStream,scala.runtime.BoxedUnit> f): Serialize via a nested stream using a specific serializer. |
static void | setCustomHostname(String hostname): Allow setting a custom hostname, because when we run on Mesos we need to use the same hostname it reports to the master. |
static java.net.URLConnection | setupSecureURLConnection(java.net.URLConnection urlConnection, SecurityManager sm): If the given URL connection is an HttpsURLConnection, sets the SSL socket factory and the hostname verifier from the given security manager. |
static scala.collection.Seq<String> | sparkJavaOpts(SparkConf conf, scala.Function1<String,Object> filterKey): Convert all Spark properties set in the given SparkConf to a sequence of Java options. |
static scala.collection.Seq<String> | splitCommandString(String s): Split a string of potentially quoted arguments from the command line the way that a shell would, to determine the arguments to a command. |
static <T> scala.Tuple2<T,Object> | startServiceOnPort(int startPort, scala.Function1<Object,scala.Tuple2<T,Object>> startService, SparkConf conf, String serviceName): Attempt to start a service on the given port, or fail after a number of attempts. |
static String | stripDirectory(String path): Strip the directory from a path name. |
static void | symlink(java.io.File src, java.io.File dst): Creates a symlink. |
static long | timeIt(int numIters, scala.Function0<scala.runtime.BoxedUnit> f, scala.Option<scala.Function0<scala.runtime.BoxedUnit>> prepare): Timing method based on iterations that permit JVM JIT optimization. |
static void | times(int numIters, scala.Function0<scala.runtime.BoxedUnit> f): Run a task repeatedly for its side effects. |
static <T> scala.util.Try<T> | tryLog(scala.Function0<T> f): Executes the given block in a Try, logging any uncaught exceptions. |
static void | tryOrExit(scala.Function0<scala.runtime.BoxedUnit> block): Execute a block of code that evaluates to Unit, forwarding any uncaught exceptions to the default UncaughtExceptionHandler. |
static void | tryOrIOException(scala.Function0<scala.runtime.BoxedUnit> block): Execute a block of code that evaluates to Unit, re-throwing any non-fatal uncaught exceptions as IOException. |
static <T> T | tryOrIOException(scala.Function0<T> block): Execute a block of code that returns a value, re-throwing any non-fatal uncaught exceptions as IOException. |
static boolean | waitForProcess(Process process, long timeoutMs): Wait for a process to terminate for at most the specified duration. |
static scala.util.matching.Regex | windowsDrive(): Pattern for matching a Windows drive, which contains only a single alphabetic character. |
static void | writeByteBuffer(java.nio.ByteBuffer bb, java.io.ObjectOutput out): Primitive often used when writing a ByteBuffer to a DataOutput. |
Methods inherited from class java.lang.Object: equals, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface org.apache.spark.Logging: initializeIfNecessary, initializeLogging, isTraceEnabled, log_, log, logDebug, logDebug, logError, logError, logInfo, logInfo, logName, logTrace, logTrace, logWarning, logWarning
public static java.util.Random random()
public static <T> byte[] serialize(T o)
public static <T> T deserialize(byte[] bytes)
public static <T> T deserialize(byte[] bytes, ClassLoader loader)
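The serialize/deserialize pair above is plain Java serialization to and from a byte array. A minimal standalone sketch of that round trip (a hypothetical `SerializationSketch` class, not Spark's actual implementation):

```java
import java.io.*;

// Sketch of Utils.serialize / Utils.deserialize: Java serialization
// to and from an in-memory byte array.
public class SerializationSketch {
    public static <T extends Serializable> byte[] serialize(T o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o); // writes the whole object graph
        }
        return bos.toByteArray();
    }

    @SuppressWarnings("unchecked")
    public static <T> T deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (T) ois.readObject();
        }
    }
}
```

The ClassLoader overload corresponds to resolving classes against a caller-supplied loader during `readObject`.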
public static long deserializeLongValue(byte[] bytes)
Deserialize a Long value (used for PythonPartitioner).
public static void serializeViaNestedStream(java.io.OutputStream os, SerializerInstance ser, scala.Function1<SerializationStream,scala.runtime.BoxedUnit> f)
public static void deserializeViaNestedStream(java.io.InputStream is, SerializerInstance ser, scala.Function1<DeserializationStream,scala.runtime.BoxedUnit> f)
public static ClassLoader getSparkClassLoader()
public static ClassLoader getContextOrSparkClassLoader()
This should be used whenever passing a ClassLoader to Class.forName or finding the currently active loader when setting up ClassLoader delegation chains.
public static boolean classIsLoadable(String clazz)
public static Class<?> classForName(String className)
public static void writeByteBuffer(java.nio.ByteBuffer bb, java.io.ObjectOutput out)
Primitive often used when writing a ByteBuffer to a DataOutput.
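Writing a ByteBuffer to a DataOutput can be sketched as below (an illustrative `ByteBufferWrite` helper under my own assumptions, not Spark's exact code): heap buffers expose their backing array, while direct buffers need a temporary copy.

```java
import java.io.*;
import java.nio.ByteBuffer;

// Sketch: write a ByteBuffer's remaining bytes to a DataOutput
// without disturbing the buffer's position.
public class ByteBufferWrite {
    public static void writeByteBuffer(ByteBuffer bb, DataOutput out) throws IOException {
        if (bb.hasArray()) {
            // Heap buffer: write the backing array slice directly.
            out.write(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining());
        } else {
            // Direct buffer: copy through a temporary array.
            byte[] tmp = new byte[bb.remaining()];
            bb.duplicate().get(tmp); // duplicate() keeps bb's position untouched
            out.write(tmp);
        }
    }
}
```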
public static void registerShutdownDeleteDir(java.io.File file)
public static void registerShutdownDeleteDir(tachyon.client.TachyonFile tachyonfile)
public static boolean hasShutdownDeleteDir(java.io.File file)
public static boolean hasShutdownDeleteTachyonDir(tachyon.client.TachyonFile file)
public static boolean hasRootAsShutdownDeleteDir(java.io.File file)
public static boolean hasRootAsShutdownDeleteDir(tachyon.client.TachyonFile file)
public static boolean chmod700(java.io.File file)
JDK equivalent of chmod 700 file.
file - the file whose permissions will be modified
public static java.io.File createDirectory(String root, String namePrefix)
public static java.io.File createTempDir(String root, String namePrefix)
public static long copyStream(java.io.InputStream in, java.io.OutputStream out, boolean closeStreams, boolean transferToEnabled)
public static java.net.URI constructURIForAuthentication(java.net.URI uri, SecurityManager securityMgr)
Note that this relies on Authenticator.setDefault being set properly to decode the user name and password; this is currently set in the SecurityManager.
public static void fetchFile(String url, java.io.File targetDir, SparkConf conf, SecurityManager securityMgr, org.apache.hadoop.conf.Configuration hadoopConf, long timestamp, boolean useCache)
If useCache is true, first attempts to fetch the file to a local cache that's shared across executors running the same application. useCache is used mainly for the executors, and not in local mode.
Throws SparkException if the target file already exists and has different contents than the requested file.
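The shared-cache idea can be illustrated with the following sketch. Everything here is hypothetical (the helper name, the `Supplier`-based download, the cache key of name plus timestamp); it is only the pattern, not Spark's implementation, and unlike the real fetchFile it silently overwrites an existing target instead of checking for conflicting contents.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.function.Supplier;

// Sketch: a local cache shared by callers, so the expensive download runs
// once while each caller copies the cached bytes into its own target dir.
public class FetchCacheSketch {
    public static File fetchWithCache(String name, long timestamp, File cacheDir,
                                      File targetDir, Supplier<byte[]> download) throws IOException {
        File cached = new File(cacheDir, name + "_" + timestamp);
        if (!cached.exists()) {
            Files.write(cached.toPath(), download.get()); // first caller populates the cache
        }
        File target = new File(targetDir, name);
        Files.copy(cached.toPath(), target.toPath(), StandardCopyOption.REPLACE_EXISTING);
        return target;
    }
}
```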
public static void fetchHcfsFile(org.apache.hadoop.fs.Path path, java.io.File targetDir, org.apache.hadoop.fs.FileSystem fs, SparkConf conf, org.apache.hadoop.conf.Configuration hadoopConf, boolean fileOverwrite, scala.Option<String> filename)
Visible for testing
public static String getLocalDir(SparkConf conf)
- If called from inside of a YARN container, this will return a directory chosen by YARN.
- If the SPARK_LOCAL_DIRS environment variable is set, this will return a directory from it.
- Otherwise, if spark.local.dir is set, this will return a directory from it.
- Otherwise, this will return java.io.tmpdir.
Some of these configuration options might be lists of multiple paths, but this method will always return a single directory.
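That precedence order can be sketched as follows. The helper below is hypothetical (it takes plain maps, whereas the real method reads a SparkConf and the YARN container environment, and the YARN case is omitted here):

```java
import java.util.Map;

// Sketch of getLocalDir's fallback chain: SPARK_LOCAL_DIRS, then
// spark.local.dir, then java.io.tmpdir; multi-path values yield one dir.
public class LocalDirSketch {
    public static String getLocalDir(Map<String, String> env, Map<String, String> conf) {
        String fromEnv = env.get("SPARK_LOCAL_DIRS");
        if (fromEnv != null) {
            return fromEnv.split(",")[0]; // first entry of a possibly comma-separated list
        }
        String fromConf = conf.get("spark.local.dir");
        if (fromConf != null) {
            return fromConf.split(",")[0];
        }
        return System.getProperty("java.io.tmpdir"); // final fallback
    }
}
```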
public static boolean isRunningInYarnContainer(SparkConf conf)
public static String[] getOrCreateLocalRootDirs(SparkConf conf)
If no directories could be created, this will return an empty list.
public static <T> scala.collection.Seq<T> randomize(scala.collection.TraversableOnce<T> seq, scala.reflect.ClassTag<T> evidence$1)
public static <T> Object randomizeInPlace(Object arr, java.util.Random rand)
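In-place shuffling of an array, as randomizeInPlace does, is conventionally a Fisher-Yates shuffle. A sketch (typed to a generic array here for simplicity; the binding to Spark's exact implementation is an assumption):

```java
import java.util.Random;

// Sketch of an in-place Fisher-Yates shuffle: walk from the end,
// swapping each slot with a random slot at or before it.
public class ShuffleSketch {
    public static <T> T[] randomizeInPlace(T[] arr, Random rand) {
        for (int i = arr.length - 1; i > 0; i--) {
            int j = rand.nextInt(i + 1); // uniform index in [0, i]
            T tmp = arr[i];
            arr[i] = arr[j];
            arr[j] = tmp;
        }
        return arr;
    }
}
```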
public static String localIpAddress()
public static String localIpAddressHostname()
public static void setCustomHostname(String hostname)
public static String localHostName()
public static String getAddressHostName(String address)
public static void checkHost(String host, String message)
public static void checkHostPort(String hostPort, String message)
public static scala.Tuple2<String,Object> parseHostPort(String hostPort)
public static java.util.concurrent.ThreadFactory namedThreadFactory(String prefix)
public static java.util.concurrent.ThreadPoolExecutor newDaemonCachedThreadPool(String prefix)
public static java.util.concurrent.ThreadPoolExecutor newDaemonFixedThreadPool(int nThreads, String prefix)
public static String getUsedTimeMs(long startTimeMs)
public static void deleteRecursively(java.io.File file)
public static void deleteRecursively(tachyon.client.TachyonFile dir, tachyon.client.TachyonFS client)
public static boolean isSymlink(java.io.File file)
public static boolean doesDirectoryContainAnyNewFiles(java.io.File dir, long cutoff)
dir - must be the path to a directory, or an IllegalArgumentException is thrown
cutoff - measured in seconds. Returns true if there are any files or directories in the given directory whose last modified time is later than this many seconds ago.
public static int memoryStringToMb(String str)
public static String bytesToString(long size)
public static String msDurationToString(long ms)
public static String megabytesToString(long megabytes)
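The two directions of the size conversions above (-Xmx-style strings to megabytes, and bytes to a human-readable string such as "4.0 MB") can be sketched like this, assuming 1024-based units; exact rounding of the real methods may differ:

```java
import java.util.Locale;

// Sketch of memoryStringToMb and bytesToString.
public class MemoryStrings {
    public static int memoryStringToMb(String str) {
        String lower = str.toLowerCase(Locale.ROOT).trim();
        char unit = lower.charAt(lower.length() - 1);
        switch (unit) {
            case 'k': return (int) (Long.parseLong(lower.substring(0, lower.length() - 1)) / 1024);
            case 'm': return (int) Long.parseLong(lower.substring(0, lower.length() - 1));
            case 'g': return (int) (Long.parseLong(lower.substring(0, lower.length() - 1)) * 1024);
            case 't': return (int) (Long.parseLong(lower.substring(0, lower.length() - 1)) * 1024 * 1024);
            default:  // no suffix: interpret the value as bytes
                return (int) (Long.parseLong(lower) / (1024 * 1024));
        }
    }

    public static String bytesToString(long size) {
        String[] units = {"B", "KB", "MB", "GB", "TB"};
        double value = size;
        int i = 0;
        while (value >= 1024 && i < units.length - 1) { // scale down to the largest unit
            value /= 1024;
            i++;
        }
        return String.format(Locale.US, "%.1f %s", value, units[i]);
    }
}
```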
public static Process executeCommand(scala.collection.Seq<String> command, java.io.File workingDir, scala.collection.Map<String,String> extraEnvironment, boolean redirectStderr)
public static String executeAndGetOutput(scala.collection.Seq<String> command, java.io.File workingDir, scala.collection.Map<String,String> extraEnvironment, boolean redirectStderr)
public static Thread processStreamByLine(String threadName, java.io.InputStream inputStream, scala.Function1<String,scala.runtime.BoxedUnit> processLine)
public static void tryOrExit(scala.Function0<scala.runtime.BoxedUnit> block)
public static void tryOrIOException(scala.Function0<scala.runtime.BoxedUnit> block)
public static <T> T tryOrIOException(scala.Function0<T> block)
public static CallSite getCallSite(scala.Function1<String,Object> skipClass)
skipClass - Function that is used to exclude non-user-code classes.
public static String offsetBytes(String path, long start, long end)
public static String offsetBytes(scala.collection.Seq<java.io.File> files, long start, long end)
startIndex and endIndex are based on the cumulative size of all the files taken in the given order. See the figure below for more details.
public static <T> T clone(T value, SerializerInstance serializer, scala.reflect.ClassTag<T> evidence$2)
public static boolean inShutdown()
Currently, this detects whether the JVM is shutting down by Runtime#addShutdownHook throwing an IllegalStateException.
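That detection trick can be sketched directly: try to register (and immediately remove) a throwaway shutdown hook, and treat the resulting IllegalStateException as the signal that the JVM is shutting down.

```java
// Sketch of inShutdown: Runtime.addShutdownHook throws
// IllegalStateException ("Shutdown in progress") once shutdown has begun.
public class ShutdownCheck {
    public static boolean inShutdown() {
        try {
            Thread hook = new Thread(() -> { });
            Runtime.getRuntime().addShutdownHook(hook);
            Runtime.getRuntime().removeShutdownHook(hook);
            return false; // hook registration still works: not shutting down
        } catch (IllegalStateException e) {
            return true;  // JVM is shutting down
        }
    }
}
```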
public static scala.collection.Seq<String> splitCommandString(String s)
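Shell-like splitting of a quoted command line, as splitCommandString does, can be sketched with a small state machine. This is a minimal illustration under my own assumptions; the real method's handling of escapes and nested quotes may be more elaborate.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: split on unquoted whitespace, keeping single- or double-quoted
// spans (including their spaces) inside one argument.
public class CommandSplit {
    public static List<String> splitCommandString(String s) {
        List<String> out = new ArrayList<>();
        StringBuilder cur = new StringBuilder();
        boolean inWord = false;
        char quote = 0; // 0 means "not inside quotes"
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            if (quote != 0) {
                if (c == quote) quote = 0;      // closing quote
                else cur.append(c);             // quoted chars, including spaces
            } else if (c == '\'' || c == '"') {
                quote = c;                      // opening quote starts/continues a word
                inWord = true;
            } else if (Character.isWhitespace(c)) {
                if (inWord) {                   // unquoted whitespace ends the word
                    out.add(cur.toString());
                    cur.setLength(0);
                    inWord = false;
                }
            } else {
                cur.append(c);
                inWord = true;
            }
        }
        if (inWord) out.add(cur.toString());
        return out;
    }
}
```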
public static int nonNegativeMod(int x, int mod)
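nonNegativeMod exists because Java's `%` operator can return a negative result for a negative left operand, which is unsafe for uses like bucketing by hash. The standard fix:

```java
// Sketch of nonNegativeMod: shift a possibly negative remainder into [0, mod).
public class MathSketch {
    public static int nonNegativeMod(int x, int mod) {
        int rawMod = x % mod;                    // in (-mod, mod)
        return rawMod + (rawMod < 0 ? mod : 0);  // now in [0, mod)
    }
}
```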
public static int nonNegativeHash(Object obj)
public static scala.collection.Map<String,String> getSystemProperties()
public static void times(int numIters, scala.Function0<scala.runtime.BoxedUnit> f)
public static long timeIt(int numIters, scala.Function0<scala.runtime.BoxedUnit> f, scala.Option<scala.Function0<scala.runtime.BoxedUnit>> prepare)
numIters - number of iterations
f - function to be executed. If prepare is not None, the running time of each call to f must be an order of magnitude longer than one millisecond for accurate timing.
prepare - function to be executed before each call to f. Its running time doesn't count.
public static <T> long getIteratorSize(scala.collection.Iterator<T> iterator)
Counts the number of elements of an iterator using a while loop rather than calling TraversableOnce.size(), because that uses a for loop, which is slightly slower in the current version of Scala.
public static void symlink(java.io.File src, java.io.File dst)
src - absolute path to the source
dst - relative path for the destination
public static String getFormattedClassName(Object obj)
public static scala.Option<org.json4s.JsonAST.JValue> jsonOption(org.json4s.JsonAST.JValue json)
public static org.json4s.JsonAST.JObject emptyJson()
public static org.apache.hadoop.fs.FileSystem getHadoopFileSystem(java.net.URI path, org.apache.hadoop.conf.Configuration conf)
public static org.apache.hadoop.fs.FileSystem getHadoopFileSystem(String path, org.apache.hadoop.conf.Configuration conf)
public static org.apache.hadoop.fs.Path getFilePath(java.io.File dir, String fileName)
public static boolean isWindows()
public static boolean isMac()
public static scala.util.matching.Regex windowsDrive()
public static String formatWindowsPath(String path)
public static boolean isTesting()
public static String stripDirectory(String path)
public static boolean waitForProcess(Process process, long timeoutMs)
public static scala.Option<String> getStderr(Process process, long timeoutMs)
public static <T> T logUncaughtExceptions(scala.Function0<T> f)
public static <T> scala.util.Try<T> tryLog(scala.Function0<T> f)
public static boolean isFatalError(Throwable e)
public static java.net.URI resolveURI(String path, boolean testWindows)
If the supplied path does not contain a scheme, or is a relative path, it will be converted into an absolute path with a file:// scheme.
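That scheme-defaulting behavior can be sketched as follows (a simplified standalone helper; the real method also has the testWindows flag and Windows path handling):

```java
import java.io.File;
import java.net.URI;
import java.net.URISyntaxException;

// Sketch of resolveURI: keep inputs that already carry a URI scheme,
// otherwise fall back to an absolute file: URI.
public class UriResolve {
    public static URI resolveURI(String path) {
        try {
            URI uri = new URI(path);
            if (uri.getScheme() != null) {
                return uri; // already a well-formed URI such as hdfs://...
            }
        } catch (URISyntaxException e) {
            // not parseable as a URI: treat it as a local file path below
        }
        return new File(path).getAbsoluteFile().toURI();
    }
}
```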
public static String resolveURIs(String paths, boolean testWindows)
public static String[] nonLocalPaths(String paths, boolean testWindows)
public static String loadDefaultSparkProperties(SparkConf conf, String filePath)
public static scala.collection.Map<String,String> getPropertiesFromFile(String filename)
public static String getDefaultPropertiesFile(scala.collection.Map<String,String> env)
public static String exceptionString(Throwable e)
public static ThreadStackTrace[] getThreadDump()
public static scala.collection.Seq<String> sparkJavaOpts(SparkConf conf, scala.Function1<String,Object> filterKey)
public static int portMaxRetries(SparkConf conf)
public static <T> scala.Tuple2<T,Object> startServiceOnPort(int startPort, scala.Function1<Object,scala.Tuple2<T,Object>> startService, SparkConf conf, String serviceName)
startPort - The initial port to start the service on.
startService - Function to start service on a given port. This is expected to throw java.net.BindException on port collision.
conf - A SparkConf used to get the maximum number of retries when binding to a port.
serviceName - Name of the service.
public static boolean isBindCollision(Throwable exception)
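The retry-on-bind-collision loop behind startServiceOnPort can be sketched like this, using a plain ServerSocket as the "service" (an assumption for the sake of a runnable example; the real method takes a start function and reads the retry count from the SparkConf via portMaxRetries):

```java
import java.io.IOException;
import java.net.BindException;
import java.net.ServerSocket;

// Sketch: try startPort, then successive ports, until a bind succeeds
// or the retry budget is exhausted.
public class PortRetry {
    public static int startServiceOnPort(int startPort, int maxRetries) throws IOException {
        for (int offset = 0; offset <= maxRetries; offset++) {
            int tryPort = startPort + offset;
            try (ServerSocket socket = new ServerSocket(tryPort)) {
                return tryPort; // bound successfully (closed again by try-with-resources)
            } catch (BindException e) {
                // port collision: fall through and retry on the next port
            }
        }
        throw new IOException("Failed to bind after " + (maxRetries + 1) + " attempts");
    }
}
```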
public static void configTestLog4j(String level)
public static java.net.URLConnection setupSecureURLConnection(java.net.URLConnection urlConnection, SecurityManager sm)
public static Object invoke(Class<?> clazz, Object obj, String methodName, scala.collection.Seq<scala.Tuple2<Class<?>,Object>> args)
public static long getMaxResultSize(SparkConf conf)
public static String libraryPathEnvName()
public static String libraryPathEnvPrefix(scala.collection.Seq<String> libraryPaths)
public static String getSparkOrYarnConfig(SparkConf conf, String key, String default_)
public static scala.Tuple2<String,Object> extractHostPortFromSparkUrl(String sparkUrl)
Return a pair of host and port extracted from the sparkUrl.
A spark url (spark://host:port) is a special URI whose scheme is spark and which contains only a host and a port.
Throws SparkException if the sparkUrl is invalid.
public static String getCurrentUserName()
Returns the current user name. This can be overridden by the SPARK_USER environment variable.
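The spark://host:port validation described under extractHostPortFromSparkUrl above can be sketched with java.net.URI. This helper is hypothetical and throws IllegalArgumentException where the real method throws SparkException:

```java
import java.net.URI;
import java.net.URISyntaxException;

// Sketch: accept only URIs whose scheme is "spark" and that carry
// nothing but a host and a port.
public class SparkUrlParse {
    public static String[] extractHostPort(String sparkUrl) {
        try {
            URI uri = new URI(sparkUrl);
            boolean valid = "spark".equals(uri.getScheme())
                    && uri.getHost() != null
                    && uri.getPort() >= 0
                    && (uri.getPath() == null || uri.getPath().isEmpty());
            if (!valid) throw new IllegalArgumentException("Invalid Spark URL: " + sparkUrl);
            return new String[] { uri.getHost(), Integer.toString(uri.getPort()) };
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Invalid Spark URL: " + sparkUrl, e);
        }
    }
}
```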