Class ALS
All Implemented Interfaces:
Serializable, org.apache.spark.internal.Logging, Params, HasBlockSize, HasCheckpointInterval, HasMaxIter, HasPredictionCol, HasRegParam, HasSeed, ALSModelParams, ALSParams, DefaultParamsWritable, Identifiable, MLWritable
ALS attempts to estimate the ratings matrix R as the product of two lower-rank matrices,
X and Y, i.e. X * Yᵀ = R. Typically these approximations are called 'factor' matrices.
 The general approach is iterative. During each iteration, one of the factor matrices is held
 constant, while the other is solved for using least squares. The newly-solved factor matrix is
 then held constant while solving for the other factor matrix.
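A minimal end-to-end sketch of using this estimator, assuming a local SparkSession and a toy in-memory DataFrame (column names match the documented defaults; all values are illustrative):

import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("als-sketch").getOrCreate()
import spark.implicits._

// Toy ratings: (user id, item id, rating). Real data would come from storage.
val ratings = Seq((0, 10, 4.0f), (0, 11, 1.0f), (1, 10, 5.0f), (1, 12, 2.0f))
  .toDF("user", "item", "rating")

val als = new ALS()
  .setRank(10)      // width of the factor matrices X and Y
  .setMaxIter(10)   // number of alternating least-squares passes
  .setRegParam(0.1)

val model = als.fit(ratings)       // returns an ALSModel
model.transform(ratings).show()    // appends a "prediction" column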
 
This is a blocked implementation of the ALS factorization algorithm that groups the two sets of factors (referred to as "users" and "products") into blocks and reduces communication by only sending one copy of each user vector to each product block on each iteration, and only for the product blocks that need that user's feature vector. This is achieved by pre-computing some information about the ratings matrix to determine the "out-links" of each user (which blocks of products it will contribute to) and "in-link" information for each product (which of the feature vectors it receives from each user block it will depend on). This allows us to send only an array of feature vectors between each user block and product block, and have the product block find the users' ratings and update the products based on these messages.
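The following is an illustrative sketch, not Spark's internal code: it shows how "out-link" information of the kind described above could be computed, using a simple modulus as a stand-in for Spark's hash partitioning. All names here are hypothetical.

// Hypothetical out-link computation: for each user block, record which item
// blocks need each user's factor vector.
final case class LocalRating(user: Int, item: Int, rating: Float)

def userOutLinks(
    ratings: Seq[LocalRating],
    numUserBlocks: Int,
    numItemBlocks: Int): Map[Int, Map[Int, Set[Int]]] =
  ratings
    .groupBy(r => Math.floorMod(r.user, numUserBlocks))      // user block id
    .map { case (userBlock, rs) =>
      userBlock -> rs.groupBy(_.user).map { case (u, urs) =>
        // Item blocks that must receive user u's feature vector each iteration.
        u -> urs.map(r => Math.floorMod(r.item, numItemBlocks)).toSet
      }
    }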
For implicit preference data, the algorithm used is based on "Collaborative Filtering for Implicit Feedback Datasets", available at https://doi.org/10.1109/ICDM.2008.22, adapted for the blocked approach used here.
Essentially, instead of finding the low-rank approximations to the rating matrix R,
this finds the approximations for a preference matrix P where the elements of P are 1 if
r is greater than 0 and 0 if r is less than or equal to 0. The ratings then act as 'confidence'
values related to the strength of indicated user preferences rather than explicit ratings
given to items.
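As a sketch of that transformation (following the cited paper), each rating r is mapped to a binary preference and a confidence weight controlled by the alpha param documented below; using the absolute value of r in the confidence is an assumption about how negative ratings are weighted.

// p is the preference matrix entry; c is its confidence weight.
def preference(r: Float): Float = if (r > 0) 1.0f else 0.0f
def confidence(r: Float, alpha: Double): Double = 1.0 + alpha * math.abs(r)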
 
Note: the input rating dataset to the ALS implementation should be deterministic.
Nondeterministic data can cause failure while fitting the ALS model.
For example, an order-sensitive operation like sampling after a repartition makes the
dataset output nondeterministic, as in dataset.repartition(2).sample(false, 0.5, 1618).
Checkpointing the sampled dataset or adding a sort before sampling can help make the
dataset deterministic; both fixes are sketched below.
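Both suggested fixes, sketched against the toy ratings DataFrame from the example above (dataset stands in for any ratings DataFrame):

// Nondeterministic: the sample depends on the row order after repartition.
val dataset = ratings
val risky = dataset.repartition(2).sample(false, 0.5, 1618)

// Fix 1: pin the row order with a sort before sampling.
val viaSort = dataset.repartition(2)
  .sortWithinPartitions("user", "item")
  .sample(false, 0.5, 1618)

// Fix 2: checkpoint the sampled data (requires a checkpoint directory, e.g.
// spark.sparkContext.setCheckpointDir("/tmp/ckpt")).
val viaCheckpoint = dataset.repartition(2).sample(false, 0.5, 1618).checkpoint()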
Nested Class Summary
Nested Classes
- static interface ALS.LeastSquaresNESolver: Trait for least squares solvers applied to the normal equation.
- static class ALS.Rating<ID>: Rating class for better code readability.
Nested classes/interfaces inherited from interface org.apache.spark.internal.Logging:
org.apache.spark.internal.Logging.LogStringContext, org.apache.spark.internal.Logging.SparkShellLoggingFilter
- 
Constructor Summary
Constructors
- ALS()
- 
Method Summary
- alpha(): Param for the alpha parameter in the implicit preference formulation (nonnegative).
- final IntParam blockSize(): Param for block size for stacking input data in matrices.
- final IntParam checkpointInterval(): Param for set checkpoint interval (>= 1) or disable checkpoint (-1).
- coldStartStrategy(): Param for strategy for dealing with unknown or new users/items at prediction time.
- copy(ParamMap extra): Creates a copy of this instance with the same UID and some extra params.
- long estimateModelSize(Dataset<?> dataset)
- finalStorageLevel(): Param for StorageLevel for ALS model factors.
- fit(Dataset<?> dataset): Fits a model to the input data.
- implicitPrefs(): Param to decide whether to use implicit preference.
- intermediateStorageLevel(): Param for StorageLevel for intermediate datasets.
- itemCol(): Param for the column name for item ids.
- static ALS load(String path)
- static org.apache.spark.internal.Logging.LogStringContext LogStringContext(scala.StringContext sc)
- final IntParam maxIter(): Param for maximum number of iterations (>= 0).
- nonnegative(): Param for whether to apply nonnegativity constraints.
- numItemBlocks(): Param for number of item blocks (positive).
- numUserBlocks(): Param for number of user blocks (positive).
- static org.slf4j.Logger org$apache$spark$internal$Logging$$log_()
- static void org$apache$spark$internal$Logging$$log__$eq(org.slf4j.Logger x$1)
- predictionCol(): Param for prediction column name.
- rank(): Param for rank of the matrix factorization (positive).
- ratingCol(): Param for the column name for ratings.
- static MLReader<T> read()
- final DoubleParam regParam(): Param for regularization parameter (>= 0).
- final LongParam seed(): Param for random seed.
- setAlpha(double value)
- setBlockSize(int value): Set block size for stacking input data in matrices.
- setCheckpointInterval(int value)
- setColdStartStrategy(String value)
- setFinalStorageLevel(String value)
- setImplicitPrefs(boolean value)
- setItemCol(String value)
- setMaxIter(int value)
- setNonnegative(boolean value)
- setNumBlocks(int value): Sets both numUserBlocks and numItemBlocks to the specified value.
- setNumItemBlocks(int value)
- setNumUserBlocks(int value)
- setPredictionCol(String value)
- setRank(int value)
- setRatingCol(String value)
- setRegParam(double value)
- setSeed(long value)
- setUserCol(String value)
- train(RDD<ALS.Rating<ID>> ratings, int rank, int numUserBlocks, int numItemBlocks, int maxIter, double regParam, boolean implicitPrefs, double alpha, boolean nonnegative, StorageLevel intermediateRDDStorageLevel, StorageLevel finalRDDStorageLevel, int checkpointInterval, long seed, scala.reflect.ClassTag<ID> evidence$1, scala.math.Ordering<ID> ord): Implementation of the ALS algorithm.
- transformSchema(StructType schema): Check transform validity and derive the output schema from the input schema.
- uid(): An immutable unique ID for the object and its derivatives.
- userCol(): Param for the column name for user ids.

Methods inherited from class org.apache.spark.ml.PipelineStage: params
Methods inherited from class java.lang.Object: equals, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface org.apache.spark.ml.recommendation.ALSModelParams: checkIntegers, getColdStartStrategy, getItemCol, getUserCol
Methods inherited from interface org.apache.spark.ml.recommendation.ALSParams: getAlpha, getFinalStorageLevel, getImplicitPrefs, getIntermediateStorageLevel, getNonnegative, getNumItemBlocks, getNumUserBlocks, getRank, getRatingCol, validateAndTransformSchema
Methods inherited from interface org.apache.spark.ml.util.DefaultParamsWritable: write
Methods inherited from interface org.apache.spark.ml.param.shared.HasBlockSize: getBlockSize
Methods inherited from interface org.apache.spark.ml.param.shared.HasCheckpointInterval: getCheckpointInterval
Methods inherited from interface org.apache.spark.ml.param.shared.HasMaxIter: getMaxIter
Methods inherited from interface org.apache.spark.ml.param.shared.HasPredictionCol: getPredictionCol
Methods inherited from interface org.apache.spark.ml.param.shared.HasRegParam: getRegParam
Methods inherited from interface org.apache.spark.ml.util.Identifiable: toString
Methods inherited from interface org.apache.spark.internal.Logging: initializeForcefully, initializeLogIfNecessary, initializeLogIfNecessary, initializeLogIfNecessary$default$2, isTraceEnabled, log, logBasedOnLevel, logDebug, logDebug, logDebug, logDebug, logError, logError, logError, logError, logInfo, logInfo, logInfo, logInfo, logName, LogStringContext, logTrace, logTrace, logTrace, logTrace, logWarning, logWarning, logWarning, logWarning, MDC, org$apache$spark$internal$Logging$$log_, org$apache$spark$internal$Logging$$log__$eq, withLogContext
Methods inherited from interface org.apache.spark.ml.util.MLWritable: save
Methods inherited from interface org.apache.spark.ml.param.Params: clear, copyValues, defaultCopy, defaultParamMap, estimateMatadataSize, explainParam, explainParams, extractParamMap, extractParamMap, get, getDefault, getOrDefault, getParam, hasDefault, hasParam, isDefined, isSet, onParamChange, paramMap, params, set, set, set, setDefault, setDefault, shouldOwn
- 
Constructor Details
- ALS
public ALS()
- 
- 
Method Details- 
load
public static ALS load(String path)
- 
train
public static <ID> scala.Tuple2<RDD<scala.Tuple2<ID,float[]>>,RDD<scala.Tuple2<ID,float[]>>> train(RDD<ALS.Rating<ID>> ratings, int rank, int numUserBlocks, int numItemBlocks, int maxIter, double regParam, boolean implicitPrefs, double alpha, boolean nonnegative, StorageLevel intermediateRDDStorageLevel, StorageLevel finalRDDStorageLevel, int checkpointInterval, long seed, scala.reflect.ClassTag<ID> evidence$1, scala.math.Ordering<ID> ord)

Implementation of the ALS algorithm.

This implementation of the ALS factorization algorithm partitions the two sets of factors among Spark workers so as to reduce network communication by only sending one copy of each factor vector to each Spark worker on each iteration, and only if needed. This is achieved by precomputing some information about the ratings matrix to determine which users require which item factors and vice versa. See the Scaladoc for InBlock for a detailed explanation of how the precomputation is done.

In addition, since each iteration of calculating the factor matrices depends on the known ratings, which are spread across Spark partitions, a naive implementation would incur significant network communication overhead between Spark workers, as the ratings RDD would be repeatedly shuffled during each iteration. This implementation reduces that overhead by performing the shuffling operation up front, precomputing each partition's ratings dependencies and duplicating those values to the appropriate workers before starting iterations to solve for the factor matrices. See the Scaladoc for OutBlock for a detailed explanation of how the precomputation is done.

Note that the term "rating block" is a bit of a misnomer, as the ratings are not partitioned by contiguous blocks from the ratings matrix but by a hash function on the rating's location in the matrix. If it helps you to visualize the partitions, it is easier to think of the term "block" as referring to a subset of an RDD containing the ratings rather than a contiguous submatrix of the ratings matrix. A usage sketch follows the parameter list below.
- Parameters:
- ratings- (undocumented)
- rank- (undocumented)
- numUserBlocks- (undocumented)
- numItemBlocks- (undocumented)
- maxIter- (undocumented)
- regParam- (undocumented)
- implicitPrefs- (undocumented)
- alpha- (undocumented)
- nonnegative- (undocumented)
- intermediateRDDStorageLevel- (undocumented)
- finalRDDStorageLevel- (undocumented)
- checkpointInterval- (undocumented)
- seed- (undocumented)
- evidence$1- (undocumented)
- ord- (undocumented)
- Returns:
- (undocumented)
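A hedged usage sketch of this developer API from Scala, spelling out the parameters documented above (values are illustrative; assumes the SparkSession from the earlier example):

import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

val ratingsRdd: RDD[ALS.Rating[Int]] = spark.sparkContext.parallelize(Seq(
  ALS.Rating(0, 10, 4.0f), ALS.Rating(1, 10, 5.0f), ALS.Rating(1, 12, 2.0f)))

// Returns (userFactors, itemFactors): RDDs of (id, factor array) pairs.
val (userFactors, itemFactors) = ALS.train(
  ratings = ratingsRdd,
  rank = 10,
  numUserBlocks = 2,
  numItemBlocks = 2,
  maxIter = 5,
  regParam = 0.1,
  implicitPrefs = false,
  alpha = 1.0,
  nonnegative = false,
  intermediateRDDStorageLevel = StorageLevel.MEMORY_AND_DISK,
  finalRDDStorageLevel = StorageLevel.MEMORY_AND_DISK,
  checkpointInterval = 10,
  seed = 42L)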
 
- 
read
public static <T> MLReader<T> read()
- 
org$apache$spark$internal$Logging$$log_
public static org.slf4j.Logger org$apache$spark$internal$Logging$$log_()
- 
org$apache$spark$internal$Logging$$log__$eq
public static void org$apache$spark$internal$Logging$$log__$eq(org.slf4j.Logger x$1)
- 
LogStringContext
public static org.apache.spark.internal.Logging.LogStringContext LogStringContext(scala.StringContext sc)
- 
rank
Description copied from interface: ALSParams
Param for rank of the matrix factorization (positive). Default: 10
- 
numUserBlocks
Description copied from interface: ALSParams
Param for number of user blocks (positive). Default: 10
- Specified by:
- numUserBlocks in interface ALSParams
- Returns:
- (undocumented)
 
- 
numItemBlocks
Description copied from interface: ALSParams
Param for number of item blocks (positive). Default: 10
- Specified by:
- numItemBlocks in interface ALSParams
- Returns:
- (undocumented)
 
- 
implicitPrefs
Description copied from interface: ALSParams
Param to decide whether to use implicit preference. Default: false
- Specified by:
- implicitPrefs in interface ALSParams
- Returns:
- (undocumented)
 
- 
alpha
Description copied from interface: ALSParams
Param for the alpha parameter in the implicit preference formulation (nonnegative). Default: 1.0
- 
ratingCol
Description copied from interface: ALSParams
Param for the column name for ratings. Default: "rating"
- 
nonnegative
Description copied from interface: ALSParams
Param for whether to apply nonnegativity constraints. Default: false
- Specified by:
- nonnegative in interface ALSParams
- Returns:
- (undocumented)
 
- 
intermediateStorageLevel
Description copied from interface: ALSParams
Param for StorageLevel for intermediate datasets. Pass in a string representation of StorageLevel. Cannot be "NONE". Default: "MEMORY_AND_DISK".
- Specified by:
- intermediateStorageLevel in interface ALSParams
- Returns:
- (undocumented)
 
- 
finalStorageLevel
Description copied from interface: ALSParams
Param for StorageLevel for ALS model factors. Pass in a string representation of StorageLevel. Default: "MEMORY_AND_DISK".
- Specified by:
- finalStorageLevel in interface ALSParams
- Returns:
- (undocumented)
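For example, both storage-level params accept the string names of StorageLevel constants (a sketch; the level choices are illustrative):

val tunedAls = new ALS()
  .setIntermediateStorageLevel("MEMORY_ONLY")      // cannot be "NONE"
  .setFinalStorageLevel("MEMORY_AND_DISK_SER")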
 
- 
seed
Description copied from interface: HasSeed
Param for random seed.
- 
checkpointInterval
Description copied from interface: HasCheckpointInterval
Param for set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext.
- Specified by:
- checkpointInterval in interface HasCheckpointInterval
- Returns:
- (undocumented)
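A sketch of the note above: the interval only takes effect once a checkpoint directory is set on the SparkContext.

// Without this line, setCheckpointInterval(10) below is silently ignored.
spark.sparkContext.setCheckpointDir("/tmp/als-checkpoints")

val longRunningAls = new ALS()
  .setMaxIter(30)
  .setCheckpointInterval(10)  // checkpoint cached intermediates every 10 iterations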
 
- 
regParam
Description copied from interface: HasRegParam
Param for regularization parameter (>= 0).
- Specified by:
- regParam in interface HasRegParam
- Returns:
- (undocumented)
 
- 
maxIter
Description copied from interface: HasMaxIter
Param for maximum number of iterations (>= 0).
- Specified by:
- maxIter in interface HasMaxIter
- Returns:
- (undocumented)
 
- 
userCol
Description copied from interface: ALSModelParams
Param for the column name for user ids. Ids must be integers. Other numeric types are supported for this column, but will be cast to integers as long as they fall within the integer value range. Default: "user"
- Specified by:
- userCol in interface ALSModelParams
- Returns:
- (undocumented)
 
- 
itemCol
Description copied from interface: ALSModelParams
Param for the column name for item ids. Ids must be integers. Other numeric types are supported for this column, but will be cast to integers as long as they fall within the integer value range. Default: "item"
- Specified by:
- itemCol in interface ALSModelParams
- Returns:
- (undocumented)
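For instance, Long id columns are accepted as long as every value fits in the integer range (a sketch; the cast happens inside the implementation):

// Long ids within Int range are cast; an id such as 10000000000L would fail.
val longIdRatings = Seq((1L, 10L, 4.0f), (2L, 12L, 3.0f))
  .toDF("user", "item", "rating")
val modelFromLongIds = new ALS().fit(longIdRatings)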
 
- 
coldStartStrategy
Description copied from interface: ALSModelParams
Param for strategy for dealing with unknown or new users/items at prediction time. This may be useful in cross-validation or production scenarios, for handling user/item ids the model has not seen in the training data. Supported values:
- "nan": predicted value for unknown ids will be NaN.
- "drop": rows in the input DataFrame containing unknown ids will be dropped from the output DataFrame containing predictions.
Default: "nan".
- Specified by:
- coldStartStrategy in interface ALSModelParams
- Returns:
- (undocumented)
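A sketch of the "drop" strategy in an evaluation setting, where NaN predictions would otherwise poison metrics such as RMSE (reuses the ratings DataFrame from the first sketch; split and evaluator settings are illustrative):

import org.apache.spark.ml.evaluation.RegressionEvaluator

val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2), 1L)

val evalModel = new ALS().setColdStartStrategy("drop").fit(training)
val predictions = evalModel.transform(test)  // rows with unseen ids are dropped

val rmse = new RegressionEvaluator()
  .setMetricName("rmse")
  .setLabelCol("rating")
  .setPredictionCol("prediction")
  .evaluate(predictions)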
 
- 
blockSize
Description copied from interface: HasBlockSize
Param for block size for stacking input data in matrices. Data is stacked within partitions. If the block size is more than the remaining data in a partition, then it is adjusted to the size of this data.
- Specified by:
- blockSize in interface HasBlockSize
- Returns:
- (undocumented)
 
- 
predictionCol
Description copied from interface: HasPredictionCol
Param for prediction column name.
- Specified by:
- predictionCol in interface HasPredictionCol
- Returns:
- (undocumented)
 
- 
uid
Description copied from interface: Identifiable
An immutable unique ID for the object and its derivatives.
- Specified by:
- uid in interface Identifiable
- Returns:
- (undocumented)
 
- 
setRank
public ALS setRank(int value)
- 
setNumUserBlocks
public ALS setNumUserBlocks(int value)
- 
setNumItemBlocks
public ALS setNumItemBlocks(int value)
- 
setImplicitPrefs
public ALS setImplicitPrefs(boolean value)
- 
setAlpha
public ALS setAlpha(double value)
- 
setUserCol
public ALS setUserCol(String value)
- 
setItemCol
public ALS setItemCol(String value)
- 
setRatingCol
public ALS setRatingCol(String value)
- 
setPredictionCol
public ALS setPredictionCol(String value)
- 
setMaxIter
public ALS setMaxIter(int value)
- 
setRegParam
public ALS setRegParam(double value)
- 
setNonnegative
public ALS setNonnegative(boolean value)
- 
setCheckpointInterval
public ALS setCheckpointInterval(int value)
- 
setSeed
public ALS setSeed(long value)
- 
setIntermediateStorageLevel
public ALS setIntermediateStorageLevel(String value)
- 
setFinalStorageLevel
public ALS setFinalStorageLevel(String value)
- 
setColdStartStrategy
public ALS setColdStartStrategy(String value)
- 
setBlockSize
public ALS setBlockSize(int value)
Set block size for stacking input data in matrices. Default is 4096.
- Parameters:
- value- (undocumented)
- Returns:
- (undocumented)
 
- 
setNumBlocks
public ALS setNumBlocks(int value)
Sets both numUserBlocks and numItemBlocks to the specified value.
- Parameters:
- value- (undocumented)
- Returns:
- (undocumented)
 
- 
fit
public ALSModel fit(Dataset<?> dataset)
Description copied from class: Estimator
Fits a model to the input data.
- 
transformSchema
public StructType transformSchema(StructType schema)
Description copied from class: PipelineStage
Check transform validity and derive the output schema from the input schema.

We check validity for interactions between parameters during transformSchema and raise an exception if any parameter value is invalid. Parameter value checks which do not depend on other parameters are handled by Param.validate().

A typical implementation should first conduct verification on schema change and parameter validity, including complex parameter interaction checks.
- Specified by:
- transformSchema in class PipelineStage
- Parameters:
- schema- (undocumented)
- Returns:
- (undocumented)
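One use of this method is validating a stage's configuration against a schema without running a fit (a sketch using the toy DataFrame from the first example):

// Throws if, e.g., the configured user column is missing or non-numeric;
// otherwise returns the input schema plus the prediction column.
val outputSchema = new ALS().transformSchema(ratings.schema)
println(outputSchema.treeString)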
 
- 
copy
public ALS copy(ParamMap extra)
Description copied from interface: Params
Creates a copy of this instance with the same UID and some extra params. Subclasses should implement this method and set the return type properly. See defaultCopy().
- 
estimateModelSize
public long estimateModelSize(Dataset<?> dataset)
 