All Implemented Interfaces:
Serializable, org.apache.spark.internal.Logging, Params, HasBlockSize, HasCheckpointInterval, HasMaxIter, HasPredictionCol, HasRegParam, HasSeed, ALSModelParams, ALSParams, DefaultParamsWritable, Identifiable, MLWritable

public class ALS extends Estimator<ALSModel> implements ALSParams, DefaultParamsWritable
Alternating Least Squares (ALS) matrix factorization.

ALS attempts to estimate the ratings matrix R as the product of two lower-rank matrices, X and Y, i.e. X * Yt = R. Typically these approximations are called 'factor' matrices. The general approach is iterative. During each iteration, one of the factor matrices is held constant, while the other is solved for using least squares. The newly-solved factor matrix is then held constant while solving for the other factor matrix.
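The alternation described above can be illustrated with a minimal rank-1 sketch on a small, fully observed matrix. This is not Spark's blocked implementation: the class name `AlsRankOneSketch`, its methods, and the dense `double[][]` input are all hypothetical, and the regularization term `lambda` is applied in its simplest form.

```java
import java.util.Arrays;

// Illustrative rank-1 sketch on a small dense matrix; not Spark's blocked implementation.
final class AlsRankOneSketch {

    // Solves each row factor by regularized least squares while `fixed` is held constant.
    static double[] solveRows(double[][] r, double[] fixed, double lambda) {
        double denom = lambda;
        for (double f : fixed) {
            denom += f * f;
        }
        double[] solved = new double[r.length];
        for (int u = 0; u < r.length; u++) {
            double num = 0.0;
            for (int i = 0; i < fixed.length; i++) {
                num += r[u][i] * fixed[i];
            }
            solved[u] = num / denom;
        }
        return solved;
    }

    // Transposes the matrix so the same solver can be reused for the other factor.
    static double[][] transpose(double[][] m) {
        double[][] t = new double[m[0].length][m.length];
        for (int u = 0; u < m.length; u++) {
            for (int i = 0; i < m[0].length; i++) {
                t[i][u] = m[u][i];
            }
        }
        return t;
    }

    // Returns {x, y} such that x[u] * y[i] approximates r[u][i].
    static double[][] factorize(double[][] r, int maxIter, double lambda) {
        double[] x = new double[r.length];
        double[] y = new double[r[0].length];
        Arrays.fill(y, 1.0); // arbitrary nonzero starting point
        double[][] rt = transpose(r);
        for (int iter = 0; iter < maxIter; iter++) {
            x = solveRows(r, y, lambda);  // hold Y constant, solve for X
            y = solveRows(rt, x, lambda); // hold X constant, solve for Y
        }
        return new double[][] {x, y};
    }
}
```

For a matrix that is exactly rank 1, such as {{2, 4}, {3, 6}}, this sketch with `lambda = 0` reproduces every entry after the first alternation.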

This is a blocked implementation of the ALS factorization algorithm that groups the two sets of factors (referred to as "users" and "products") into blocks and reduces communication by only sending one copy of each user vector to each product block on each iteration, and only for the product blocks that need that user's feature vector. This is achieved by pre-computing some information about the ratings matrix to determine the "out-links" of each user (which blocks of products it will contribute to) and "in-link" information for each product (which of the feature vectors it receives from each user block it will depend on). This allows us to send only an array of feature vectors between each user block and product block, and have the product block find the users' ratings and update the products based on these messages.
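As a rough illustration of the "out-link" idea, the sketch below computes, for each user, the set of product blocks that need that user's feature vector. The class `OutLinkSketch`, the `{userId, itemId}` array layout, and the modulo block assignment are assumptions made for the example, not Spark's internal data structures.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical illustration of out-links; not Spark's internal representation.
final class OutLinkSketch {

    // ratings[k] = {userId, itemId}. Returns, per user, the sorted set of item
    // blocks that contain at least one item rated by that user.
    static Map<Integer, Set<Integer>> outLinks(int[][] ratings, int numItemBlocks) {
        Map<Integer, Set<Integer>> links = new TreeMap<>();
        for (int[] rating : ratings) {
            int user = rating[0];
            // Assumed block assignment: non-negative item id modulo the block count.
            int itemBlock = Math.floorMod(rating[1], numItemBlocks);
            links.computeIfAbsent(user, u -> new TreeSet<>()).add(itemBlock);
        }
        return links;
    }
}
```

A user who rated several items in the same block appears in that block's set only once, which is why a single copy of the user's vector per block is enough.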

For implicit preference data, the algorithm used is based on "Collaborative Filtering for Implicit Feedback Datasets", available at https://doi.org/10.1109/ICDM.2008.22, adapted for the blocked approach used here.

Essentially, instead of finding low-rank approximations to the rating matrix R, this finds approximations to a preference matrix P, where each element of P is 1 if the corresponding rating r is greater than 0 and 0 if r is less than or equal to 0. The ratings then act as 'confidence' values that reflect the strength of the indicated user preferences rather than as explicit ratings given to items.
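The mapping from raw ratings to preferences and confidences can be sketched as follows. The preference rule comes from the paragraph above. The confidence formula `1 + alpha * r` follows the formulation in the cited paper; taking the absolute value for negative ratings is an assumption of this sketch, and the class and method names are hypothetical.

```java
// Hypothetical helper illustrating the implicit-feedback formulation.
final class ImplicitFeedbackSketch {

    // Preference p is 1 when the rating is positive, 0 otherwise.
    static double preference(double rating) {
        return rating > 0 ? 1.0 : 0.0;
    }

    // Confidence grows with the magnitude of the rating, scaled by alpha.
    // Using Math.abs for negative ratings is an assumption of this sketch.
    static double confidence(double rating, double alpha) {
        return 1.0 + alpha * Math.abs(rating);
    }
}
```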

Note: the input rating dataset to the ALS implementation should be deterministic. Nondeterministic data can cause failures while fitting the ALS model. For example, an order-sensitive operation such as sampling after a repartition makes the dataset output nondeterministic, as in dataset.repartition(2).sample(false, 0.5, 1618). Checkpointing the sampled dataset or adding a sort before sampling can help make the dataset deterministic.

  • Constructor Details

    • ALS

      public ALS(String uid)
    • ALS

      public ALS()
  • Method Details

    • load

      public static ALS load(String path)
    • train

      public static <ID> scala.Tuple2<RDD<scala.Tuple2<ID,float[]>>,RDD<scala.Tuple2<ID,float[]>>> train(RDD<ALS.Rating<ID>> ratings, int rank, int numUserBlocks, int numItemBlocks, int maxIter, double regParam, boolean implicitPrefs, double alpha, boolean nonnegative, StorageLevel intermediateRDDStorageLevel, StorageLevel finalRDDStorageLevel, int checkpointInterval, long seed, scala.reflect.ClassTag<ID> evidence$1, scala.math.Ordering<ID> ord)
      Implementation of the ALS algorithm.

      This implementation of the ALS factorization algorithm partitions the two sets of factors among Spark workers so as to reduce network communication by only sending one copy of each factor vector to each Spark worker on each iteration, and only if needed. This is achieved by precomputing some information about the ratings matrix to determine which users require which item factors and vice versa. See the Scaladoc for InBlock for a detailed explanation of how the precomputation is done.

      In addition, since each iteration of calculating the factor matrices depends on the known ratings, which are spread across Spark partitions, a naive implementation would incur significant network communication overhead between Spark workers, as the ratings RDD would be repeatedly shuffled during each iteration. This implementation reduces that overhead by performing the shuffling operation up front, precomputing each partition's ratings dependencies and duplicating those values to the appropriate workers before starting iterations to solve for the factor matrices. See the Scaladoc for OutBlock for a detailed explanation of how the precomputation is done.

      Note that the term "rating block" is a bit of a misnomer, as the ratings are not partitioned by contiguous blocks from the ratings matrix but by a hash function on the rating's location in the matrix. If it helps you to visualize the partitions, it is easier to think of the term "block" as referring to a subset of an RDD containing the ratings rather than a contiguous submatrix of the ratings matrix.
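      To make the point about hash-based blocks concrete, the sketch below assigns integer ids to blocks by hash code rather than by contiguous range. The exact partitioner Spark uses is not stated on this page, so the non-negative modulo of `Integer.hashCode` is an assumption, as are the class and method names.

```java
// Hypothetical illustration of hash-based block assignment.
final class RatingBlockSketch {

    // Maps an id to a block index in [0, numBlocks) using its hash code.
    static int blockOf(int id, int numBlocks) {
        return Math.floorMod(Integer.hashCode(id), numBlocks);
    }

    // Returns the block index for each id, in input order.
    static int[] blocksOf(int[] ids, int numBlocks) {
        int[] blocks = new int[ids.length];
        for (int k = 0; k < ids.length; k++) {
            blocks[k] = blockOf(ids[k], numBlocks);
        }
        return blocks;
    }
}
```

      With three blocks, the consecutive ids 0 through 5 land in blocks 0, 1, 2, 0, 1, 2, so neighbouring rows of the ratings matrix do not share a block.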

      Parameters:
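      ratings - the input ratings
      rank - rank of the matrix factorization (positive)
      numUserBlocks - number of user blocks (positive)
      numItemBlocks - number of item blocks (positive)
      maxIter - maximum number of iterations (>= 0)
      regParam - regularization parameter (>= 0)
      implicitPrefs - whether to use implicit preference
      alpha - the alpha parameter in the implicit preference formulation (nonnegative)
      nonnegative - whether to apply nonnegativity constraints
      intermediateRDDStorageLevel - StorageLevel for intermediate datasets
      finalRDDStorageLevel - StorageLevel for ALS model factors
      checkpointInterval - checkpoint interval (>= 1), or -1 to disable checkpointing
      seed - random seed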
      evidence$1 - (undocumented)
      ord - (undocumented)
      Returns:
      (undocumented)
    • read

      public static MLReader<T> read()
    • org$apache$spark$internal$Logging$$log_

      public static org.slf4j.Logger org$apache$spark$internal$Logging$$log_()
    • org$apache$spark$internal$Logging$$log__$eq

      public static void org$apache$spark$internal$Logging$$log__$eq(org.slf4j.Logger x$1)
    • LogStringContext

      public static org.apache.spark.internal.Logging.LogStringContext LogStringContext(scala.StringContext sc)
    • rank

      public IntParam rank()
      Description copied from interface: ALSParams
      Param for rank of the matrix factorization (positive). Default: 10
      Specified by:
      rank in interface ALSParams
      Returns:
      (undocumented)
    • numUserBlocks

      public IntParam numUserBlocks()
      Description copied from interface: ALSParams
      Param for number of user blocks (positive). Default: 10
      Specified by:
      numUserBlocks in interface ALSParams
      Returns:
      (undocumented)
    • numItemBlocks

      public IntParam numItemBlocks()
      Description copied from interface: ALSParams
      Param for number of item blocks (positive). Default: 10
      Specified by:
      numItemBlocks in interface ALSParams
      Returns:
      (undocumented)
    • implicitPrefs

      public BooleanParam implicitPrefs()
      Description copied from interface: ALSParams
      Param to decide whether to use implicit preference. Default: false
      Specified by:
      implicitPrefs in interface ALSParams
      Returns:
      (undocumented)
    • alpha

      public DoubleParam alpha()
      Description copied from interface: ALSParams
      Param for the alpha parameter in the implicit preference formulation (nonnegative). Default: 1.0
      Specified by:
      alpha in interface ALSParams
      Returns:
      (undocumented)
    • ratingCol

      public Param<String> ratingCol()
      Description copied from interface: ALSParams
      Param for the column name for ratings. Default: "rating"
      Specified by:
      ratingCol in interface ALSParams
      Returns:
      (undocumented)
    • nonnegative

      public BooleanParam nonnegative()
      Description copied from interface: ALSParams
      Param for whether to apply nonnegativity constraints. Default: false
      Specified by:
      nonnegative in interface ALSParams
      Returns:
      (undocumented)
    • intermediateStorageLevel

      public Param<String> intermediateStorageLevel()
      Description copied from interface: ALSParams
      Param for StorageLevel for intermediate datasets. Pass in a string representation of StorageLevel. Cannot be "NONE". Default: "MEMORY_AND_DISK".

      Specified by:
      intermediateStorageLevel in interface ALSParams
      Returns:
      (undocumented)
    • finalStorageLevel

      public Param<String> finalStorageLevel()
      Description copied from interface: ALSParams
      Param for StorageLevel for ALS model factors. Pass in a string representation of StorageLevel. Default: "MEMORY_AND_DISK".

      Specified by:
      finalStorageLevel in interface ALSParams
      Returns:
      (undocumented)
    • seed

      public final LongParam seed()
      Description copied from interface: HasSeed
      Param for random seed.
      Specified by:
      seed in interface HasSeed
      Returns:
      (undocumented)
    • checkpointInterval

      public final IntParam checkpointInterval()
      Description copied from interface: HasCheckpointInterval
      Param for setting the checkpoint interval (>= 1) or disabling checkpointing (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext.
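      The rule described here can be sketched as a small predicate. This assumes iterations are counted from 1 and models the "checkpoint directory is set" condition as a plain boolean; the class and method names are hypothetical and this is not Spark's own code.

```java
// Hypothetical predicate mirroring the documented checkpointInterval semantics.
final class CheckpointSketch {

    // True when the given iteration should trigger a checkpoint.
    static boolean shouldCheckpoint(int iteration, int checkpointInterval, boolean checkpointDirSet) {
        if (!checkpointDirSet) {
            return false; // the setting is ignored without a checkpoint directory
        }
        if (checkpointInterval < 1) {
            return false; // -1 disables checkpointing
        }
        return iteration % checkpointInterval == 0;
    }
}
```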
      Specified by:
      checkpointInterval in interface HasCheckpointInterval
      Returns:
      (undocumented)
    • regParam

      public final DoubleParam regParam()
      Description copied from interface: HasRegParam
      Param for regularization parameter (>= 0).
      Specified by:
      regParam in interface HasRegParam
      Returns:
      (undocumented)
    • maxIter

      public final IntParam maxIter()
      Description copied from interface: HasMaxIter
      Param for maximum number of iterations (>= 0).
      Specified by:
      maxIter in interface HasMaxIter
      Returns:
      (undocumented)
    • userCol

      public Param<String> userCol()
      Description copied from interface: ALSModelParams
      Param for the column name for user ids. Ids must be integers. Other numeric types are supported for this column, but will be cast to integers as long as they fall within the integer value range. Default: "user"
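      The range condition on ids can be illustrated with a guarded narrowing cast. This sketch only covers `long` inputs and only the range check stated above; how Spark treats other numeric types or fractional values is not specified here, and the class and method names are hypothetical.

```java
// Hypothetical guard for narrowing a wider numeric id to an int.
final class IdCastSketch {

    // Returns the id as an int, or rejects it if it is outside the int range.
    static int toIntId(long value) {
        if (value < Integer.MIN_VALUE || value > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("id out of integer range: " + value);
        }
        return (int) value;
    }
}
```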
      Specified by:
      userCol in interface ALSModelParams
      Returns:
      (undocumented)
    • itemCol

      public Param<String> itemCol()
      Description copied from interface: ALSModelParams
      Param for the column name for item ids. Ids must be integers. Other numeric types are supported for this column, but will be cast to integers as long as they fall within the integer value range. Default: "item"
      Specified by:
      itemCol in interface ALSModelParams
      Returns:
      (undocumented)
    • coldStartStrategy

      public Param<String> coldStartStrategy()
      Description copied from interface: ALSModelParams
      Param for strategy for dealing with unknown or new users/items at prediction time. This may be useful in cross-validation or production scenarios, for handling user/item ids the model has not seen in the training data. Supported values:
        - "nan": predicted value for unknown ids will be NaN.
        - "drop": rows in the input DataFrame containing unknown ids will be dropped from the output DataFrame containing predictions.
      Default: "nan".
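      The difference between the two strategies can be sketched in plain Java, without Spark. Here a list of user ids stands in for the input rows, a fixed placeholder score stands in for a real prediction, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical illustration of the "nan" and "drop" cold-start strategies.
final class ColdStartSketch {

    // Returns one prediction per retained row; unknown ids yield NaN or are dropped.
    static List<Double> predict(int[] userIds, Set<Integer> knownUsers, String strategy) {
        if (!"nan".equals(strategy) && !"drop".equals(strategy)) {
            throw new IllegalArgumentException("unsupported strategy: " + strategy);
        }
        List<Double> predictions = new ArrayList<>();
        for (int id : userIds) {
            if (knownUsers.contains(id)) {
                predictions.add(1.0); // placeholder for a real model score
            } else if ("nan".equals(strategy)) {
                predictions.add(Double.NaN); // keep the row, mark it as unknown
            }
            // with "drop", rows for unknown ids are simply omitted
        }
        return predictions;
    }
}
```

      When evaluating with a metric such as RMSE, a single NaN prediction makes the whole metric NaN, which is one reason "drop" can be convenient in cross-validation.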
      Specified by:
      coldStartStrategy in interface ALSModelParams
      Returns:
      (undocumented)
    • blockSize

      public final IntParam blockSize()
      Description copied from interface: HasBlockSize
      Param for block size for stacking input data in matrices. Data is stacked within partitions. If the block size exceeds the data remaining in a partition, it is reduced to the size of that data.
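      The adjustment of the final block can be sketched as follows. The sketch treats a partition as a simple row count; the class and method names are hypothetical and do not correspond to Spark internals.

```java
// Hypothetical illustration of how rows in one partition split into blocks.
final class BlockSizeSketch {

    // Returns the size of each block; the last block shrinks to the remaining rows.
    static int[] blockSizes(int rowsInPartition, int blockSize) {
        if (rowsInPartition < 0 || blockSize < 1) {
            throw new IllegalArgumentException("rows must be >= 0 and blockSize >= 1");
        }
        int numBlocks = (int) (((long) rowsInPartition + blockSize - 1) / blockSize);
        int[] sizes = new int[numBlocks];
        for (int b = 0; b < numBlocks; b++) {
            int remaining = rowsInPartition - b * blockSize;
            sizes[b] = Math.min(blockSize, remaining);
        }
        return sizes;
    }
}
```

      For example, 10 rows with a block size of 4 give blocks of 4, 4 and 2 rows, while 3 rows with a block size of 4096 give a single block of 3 rows.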
      Specified by:
      blockSize in interface HasBlockSize
      Returns:
      (undocumented)
    • predictionCol

      public final Param<String> predictionCol()
      Description copied from interface: HasPredictionCol
      Param for prediction column name.
      Specified by:
      predictionCol in interface HasPredictionCol
      Returns:
      (undocumented)
    • uid

      public String uid()
      Description copied from interface: Identifiable
      An immutable unique ID for the object and its derivatives.
      Specified by:
      uid in interface Identifiable
      Returns:
      (undocumented)
    • setRank

      public ALS setRank(int value)
    • setNumUserBlocks

      public ALS setNumUserBlocks(int value)
    • setNumItemBlocks

      public ALS setNumItemBlocks(int value)
    • setImplicitPrefs

      public ALS setImplicitPrefs(boolean value)
    • setAlpha

      public ALS setAlpha(double value)
    • setUserCol

      public ALS setUserCol(String value)
    • setItemCol

      public ALS setItemCol(String value)
    • setRatingCol

      public ALS setRatingCol(String value)
    • setPredictionCol

      public ALS setPredictionCol(String value)
    • setMaxIter

      public ALS setMaxIter(int value)
    • setRegParam

      public ALS setRegParam(double value)
    • setNonnegative

      public ALS setNonnegative(boolean value)
    • setCheckpointInterval

      public ALS setCheckpointInterval(int value)
    • setSeed

      public ALS setSeed(long value)
    • setIntermediateStorageLevel

      public ALS setIntermediateStorageLevel(String value)
    • setFinalStorageLevel

      public ALS setFinalStorageLevel(String value)
    • setColdStartStrategy

      public ALS setColdStartStrategy(String value)
    • setBlockSize

      public ALS setBlockSize(int value)
      Set block size for stacking input data in matrices. Default is 4096.

      Parameters:
      value - (undocumented)
      Returns:
      (undocumented)
    • setNumBlocks

      public ALS setNumBlocks(int value)
      Sets both numUserBlocks and numItemBlocks to the specified value.

      Parameters:
      value - (undocumented)
      Returns:
      (undocumented)
    • fit

      public ALSModel fit(Dataset<?> dataset)
      Description copied from class: Estimator
      Fits a model to the input data.
      Specified by:
      fit in class Estimator<ALSModel>
      Parameters:
      dataset - (undocumented)
      Returns:
      (undocumented)
    • transformSchema

      public StructType transformSchema(StructType schema)
      Description copied from class: PipelineStage
      Check transform validity and derive the output schema from the input schema.

      We check validity for interactions between parameters during transformSchema and raise an exception if any parameter value is invalid. Parameter value checks which do not depend on other parameters are handled by Param.validate().

      A typical implementation should first verify the schema change and parameter validity, including complex parameter interaction checks.

      Specified by:
      transformSchema in class PipelineStage
      Parameters:
      schema - (undocumented)
      Returns:
      (undocumented)
    • copy

      public ALS copy(ParamMap extra)
      Description copied from interface: Params
      Creates a copy of this instance with the same UID and some extra params. Subclasses should implement this method and set the return type properly. See defaultCopy().
      Specified by:
      copy in interface Params
      Specified by:
      copy in class Estimator<ALSModel>
      Parameters:
      extra - (undocumented)
      Returns:
      (undocumented)