Class ALS
- All Implemented Interfaces:
Serializable
,org.apache.spark.internal.Logging
,Params
,HasBlockSize
,HasCheckpointInterval
,HasMaxIter
,HasPredictionCol
,HasRegParam
,HasSeed
,ALSModelParams
,ALSParams
,DefaultParamsWritable
,Identifiable
,MLWritable
,scala.Serializable
ALS attempts to estimate the ratings matrix R
as the product of two lower-rank matrices,
X
and Y
, i.e. X * Yt = R
. Typically these approximations are called 'factor' matrices.
The general approach is iterative. During each iteration, one of the factor matrices is held
constant, while the other is solved for using least squares. The newly-solved factor matrix is
then held constant while solving for the other factor matrix.
This is a blocked implementation of the ALS factorization algorithm that groups the two sets of factors (referred to as "users" and "products") into blocks and reduces communication by only sending one copy of each user vector to each product block on each iteration, and only for the product blocks that need that user's feature vector. This is achieved by pre-computing some information about the ratings matrix to determine the "out-links" of each user (which blocks of products it will contribute to) and "in-link" information for each product (which of the feature vectors it receives from each user block it will depend on). This allows us to send only an array of feature vectors between each user block and product block, and have the product block find the users' ratings and update the products based on these messages.
For implicit preference data, the algorithm used is based on "Collaborative Filtering for Implicit Feedback Datasets", available at https://doi.org/10.1109/ICDM.2008.22, adapted for the blocked approach used here.
Essentially instead of finding the low-rank approximations to the rating matrix R
,
this finds the approximations for a preference matrix P
where the elements of P
are 1 if
r is greater than 0 and 0 if r is less than or equal to 0. The ratings then act as 'confidence'
values related to strength of indicated user
preferences rather than explicit ratings given to items.
Note: the input rating dataset to the ALS implementation should be deterministic.
Nondeterministic data can cause failure during fitting ALS model.
For example, an order-sensitive operation like sampling after a repartition makes dataset
output nondeterministic, like dataset.repartition(2).sample(false, 0.5, 1618)
.
Checkpointing sampled dataset or adding a sort before sampling can help make the dataset
deterministic.
- See Also:
-
Nested Class Summary
Modifier and TypeClassDescriptionstatic class
static interface
Trait for least squares solvers applied to the normal equation.static class
Rating class for better code readability.static class
static class
Nested classes/interfaces inherited from interface org.apache.spark.internal.Logging
org.apache.spark.internal.Logging.SparkShellLoggingFilter
-
Constructor Summary
-
Method Summary
Modifier and TypeMethodDescriptionalpha()
Param for the alpha parameter in the implicit preference formulation (nonnegative).final IntParam
Param for block size for stacking input data in matrices.final IntParam
Param for set checkpoint interval (>= 1) or disable checkpoint (-1).Param for strategy for dealing with unknown or new users/items at prediction time.Creates a copy of this instance with the same UID and some extra params.Param for StorageLevel for ALS model factors.Fits a model to the input data.Param to decide whether to use implicit preference.Param for StorageLevel for intermediate datasets.itemCol()
Param for the column name for item ids.static ALS
final IntParam
maxIter()
Param for maximum number of iterations (>= 0).Param for whether to apply nonnegativity constraints.Param for number of item blocks (positive).Param for number of user blocks (positive).static org.slf4j.Logger
static void
org$apache$spark$internal$Logging$$log__$eq
(org.slf4j.Logger x$1) Param for prediction column name.rank()
Param for rank of the matrix factorization (positive).Param for the column name for ratings.static MLReader<T>
read()
final DoubleParam
regParam()
Param for regularization parameter (>= 0).final LongParam
seed()
Param for random seed.setAlpha
(double value) setBlockSize
(int value) Set block size for stacking input data in matrices.setCheckpointInterval
(int value) setColdStartStrategy
(String value) setFinalStorageLevel
(String value) setImplicitPrefs
(boolean value) setItemCol
(String value) setMaxIter
(int value) setNonnegative
(boolean value) setNumBlocks
(int value) Sets both numUserBlocks and numItemBlocks to the specific value.setNumItemBlocks
(int value) setNumUserBlocks
(int value) setPredictionCol
(String value) setRank
(int value) setRatingCol
(String value) setRegParam
(double value) setSeed
(long value) setUserCol
(String value) train
(RDD<ALS.Rating<ID>> ratings, int rank, int numUserBlocks, int numItemBlocks, int maxIter, double regParam, boolean implicitPrefs, double alpha, boolean nonnegative, StorageLevel intermediateRDDStorageLevel, StorageLevel finalRDDStorageLevel, int checkpointInterval, long seed, scala.reflect.ClassTag<ID> evidence$1, scala.math.Ordering<ID> ord) Implementation of the ALS algorithm.transformSchema
(StructType schema) Check transform validity and derive the output schema from the input schema.uid()
An immutable unique ID for the object and its derivatives.userCol()
Param for the column name for user ids.Methods inherited from class org.apache.spark.ml.PipelineStage
params
Methods inherited from class java.lang.Object
equals, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface org.apache.spark.ml.recommendation.ALSModelParams
checkIntegers, getColdStartStrategy, getItemCol, getUserCol
Methods inherited from interface org.apache.spark.ml.recommendation.ALSParams
getAlpha, getFinalStorageLevel, getImplicitPrefs, getIntermediateStorageLevel, getNonnegative, getNumItemBlocks, getNumUserBlocks, getRank, getRatingCol, validateAndTransformSchema
Methods inherited from interface org.apache.spark.ml.util.DefaultParamsWritable
write
Methods inherited from interface org.apache.spark.ml.param.shared.HasBlockSize
getBlockSize
Methods inherited from interface org.apache.spark.ml.param.shared.HasCheckpointInterval
getCheckpointInterval
Methods inherited from interface org.apache.spark.ml.param.shared.HasMaxIter
getMaxIter
Methods inherited from interface org.apache.spark.ml.param.shared.HasPredictionCol
getPredictionCol
Methods inherited from interface org.apache.spark.ml.param.shared.HasRegParam
getRegParam
Methods inherited from interface org.apache.spark.ml.util.Identifiable
toString
Methods inherited from interface org.apache.spark.internal.Logging
initializeForcefully, initializeLogIfNecessary, initializeLogIfNecessary, initializeLogIfNecessary$default$2, isTraceEnabled, log, logDebug, logDebug, logError, logError, logInfo, logInfo, logName, logTrace, logTrace, logWarning, logWarning, org$apache$spark$internal$Logging$$log_, org$apache$spark$internal$Logging$$log__$eq
Methods inherited from interface org.apache.spark.ml.util.MLWritable
save
Methods inherited from interface org.apache.spark.ml.param.Params
clear, copyValues, defaultCopy, defaultParamMap, explainParam, explainParams, extractParamMap, extractParamMap, get, getDefault, getOrDefault, getParam, hasDefault, hasParam, isDefined, isSet, onParamChange, paramMap, params, set, set, set, setDefault, setDefault, shouldOwn
-
Constructor Details
-
ALS
-
ALS
public ALS()
-
-
Method Details
-
load
-
train
public static <ID> scala.Tuple2<RDD<scala.Tuple2<ID,float[]>>, trainRDD<scala.Tuple2<ID, float[]>>> (RDD<ALS.Rating<ID>> ratings, int rank, int numUserBlocks, int numItemBlocks, int maxIter, double regParam, boolean implicitPrefs, double alpha, boolean nonnegative, StorageLevel intermediateRDDStorageLevel, StorageLevel finalRDDStorageLevel, int checkpointInterval, long seed, scala.reflect.ClassTag<ID> evidence$1, scala.math.Ordering<ID> ord) Implementation of the ALS algorithm.This implementation of the ALS factorization algorithm partitions the two sets of factors among Spark workers so as to reduce network communication by only sending one copy of each factor vector to each Spark worker on each iteration, and only if needed. This is achieved by precomputing some information about the ratings matrix to determine which users require which item factors and vice versa. See the Scaladoc for
InBlock
for a detailed explanation of how the precomputation is done.In addition, since each iteration of calculating the factor matrices depends on the known ratings, which are spread across Spark partitions, a naive implementation would incur significant network communication overhead between Spark workers, as the ratings RDD would be repeatedly shuffled during each iteration. This implementation reduces that overhead by performing the shuffling operation up front, precomputing each partition's ratings dependencies and duplicating those values to the appropriate workers before starting iterations to solve for the factor matrices. See the Scaladoc for
OutBlock
for a detailed explanation of how the precomputation is done.Note that the term "rating block" is a bit of a misnomer, as the ratings are not partitioned by contiguous blocks from the ratings matrix but by a hash function on the rating's location in the matrix. If it helps you to visualize the partitions, it is easier to think of the term "block" as referring to a subset of an RDD containing the ratings rather than a contiguous submatrix of the ratings matrix.
- Parameters:
ratings
- (undocumented)rank
- (undocumented)numUserBlocks
- (undocumented)numItemBlocks
- (undocumented)maxIter
- (undocumented)regParam
- (undocumented)implicitPrefs
- (undocumented)alpha
- (undocumented)nonnegative
- (undocumented)intermediateRDDStorageLevel
- (undocumented)finalRDDStorageLevel
- (undocumented)checkpointInterval
- (undocumented)seed
- (undocumented)evidence$1
- (undocumented)ord
- (undocumented)- Returns:
- (undocumented)
-
read
-
org$apache$spark$internal$Logging$$log_
public static org.slf4j.Logger org$apache$spark$internal$Logging$$log_() -
org$apache$spark$internal$Logging$$log__$eq
public static void org$apache$spark$internal$Logging$$log__$eq(org.slf4j.Logger x$1) -
rank
Description copied from interface:ALSParams
Param for rank of the matrix factorization (positive). Default: 10 -
numUserBlocks
Description copied from interface:ALSParams
Param for number of user blocks (positive). Default: 10- Specified by:
numUserBlocks
in interfaceALSParams
- Returns:
- (undocumented)
-
numItemBlocks
Description copied from interface:ALSParams
Param for number of item blocks (positive). Default: 10- Specified by:
numItemBlocks
in interfaceALSParams
- Returns:
- (undocumented)
-
implicitPrefs
Description copied from interface:ALSParams
Param to decide whether to use implicit preference. Default: false- Specified by:
implicitPrefs
in interfaceALSParams
- Returns:
- (undocumented)
-
alpha
Description copied from interface:ALSParams
Param for the alpha parameter in the implicit preference formulation (nonnegative). Default: 1.0 -
ratingCol
Description copied from interface:ALSParams
Param for the column name for ratings. Default: "rating" -
nonnegative
Description copied from interface:ALSParams
Param for whether to apply nonnegativity constraints. Default: false- Specified by:
nonnegative
in interfaceALSParams
- Returns:
- (undocumented)
-
intermediateStorageLevel
Description copied from interface:ALSParams
Param for StorageLevel for intermediate datasets. Pass in a string representation ofStorageLevel
. Cannot be "NONE". Default: "MEMORY_AND_DISK".- Specified by:
intermediateStorageLevel
in interfaceALSParams
- Returns:
- (undocumented)
-
finalStorageLevel
Description copied from interface:ALSParams
Param for StorageLevel for ALS model factors. Pass in a string representation ofStorageLevel
. Default: "MEMORY_AND_DISK".- Specified by:
finalStorageLevel
in interfaceALSParams
- Returns:
- (undocumented)
-
seed
Description copied from interface:HasSeed
Param for random seed. -
checkpointInterval
Description copied from interface:HasCheckpointInterval
Param for set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext.- Specified by:
checkpointInterval
in interfaceHasCheckpointInterval
- Returns:
- (undocumented)
-
regParam
Description copied from interface:HasRegParam
Param for regularization parameter (>= 0).- Specified by:
regParam
in interfaceHasRegParam
- Returns:
- (undocumented)
-
maxIter
Description copied from interface:HasMaxIter
Param for maximum number of iterations (>= 0).- Specified by:
maxIter
in interfaceHasMaxIter
- Returns:
- (undocumented)
-
userCol
Description copied from interface:ALSModelParams
Param for the column name for user ids. Ids must be integers. Other numeric types are supported for this column, but will be cast to integers as long as they fall within the integer value range. Default: "user"- Specified by:
userCol
in interfaceALSModelParams
- Returns:
- (undocumented)
-
itemCol
Description copied from interface:ALSModelParams
Param for the column name for item ids. Ids must be integers. Other numeric types are supported for this column, but will be cast to integers as long as they fall within the integer value range. Default: "item"- Specified by:
itemCol
in interfaceALSModelParams
- Returns:
- (undocumented)
-
coldStartStrategy
Description copied from interface:ALSModelParams
Param for strategy for dealing with unknown or new users/items at prediction time. This may be useful in cross-validation or production scenarios, for handling user/item ids the model has not seen in the training data. Supported values: - "nan": predicted value for unknown ids will be NaN. - "drop": rows in the input DataFrame containing unknown ids will be dropped from the output DataFrame containing predictions. Default: "nan".- Specified by:
coldStartStrategy
in interfaceALSModelParams
- Returns:
- (undocumented)
-
blockSize
Description copied from interface:HasBlockSize
Param for block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data..- Specified by:
blockSize
in interfaceHasBlockSize
- Returns:
- (undocumented)
-
predictionCol
Description copied from interface:HasPredictionCol
Param for prediction column name.- Specified by:
predictionCol
in interfaceHasPredictionCol
- Returns:
- (undocumented)
-
uid
Description copied from interface:Identifiable
An immutable unique ID for the object and its derivatives.- Specified by:
uid
in interfaceIdentifiable
- Returns:
- (undocumented)
-
setRank
-
setNumUserBlocks
-
setNumItemBlocks
-
setImplicitPrefs
-
setAlpha
-
setUserCol
-
setItemCol
-
setRatingCol
-
setPredictionCol
-
setMaxIter
-
setRegParam
-
setNonnegative
-
setCheckpointInterval
-
setSeed
-
setIntermediateStorageLevel
-
setFinalStorageLevel
-
setColdStartStrategy
-
setBlockSize
Set block size for stacking input data in matrices. Default is 4096.- Parameters:
value
- (undocumented)- Returns:
- (undocumented)
-
setNumBlocks
Sets both numUserBlocks and numItemBlocks to the specific value.- Parameters:
value
- (undocumented)- Returns:
- (undocumented)
-
fit
Description copied from class:Estimator
Fits a model to the input data. -
transformSchema
Description copied from class:PipelineStage
Check transform validity and derive the output schema from the input schema.We check validity for interactions between parameters during
transformSchema
and raise an exception if any parameter value is invalid. Parameter value checks which do not depend on other parameters are handled byParam.validate()
.Typical implementation should first conduct verification on schema change and parameter validity, including complex parameter interaction checks.
- Specified by:
transformSchema
in classPipelineStage
- Parameters:
schema
- (undocumented)- Returns:
- (undocumented)
-
copy
Description copied from interface:Params
Creates a copy of this instance with the same UID and some extra params. Subclasses should implement this method and set the return type properly. SeedefaultCopy()
.
-