# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
from typing import Any, Dict, Optional, TYPE_CHECKING
from pyspark import keyword_only, since
from pyspark.sql import DataFrame
from pyspark.ml.util import JavaMLWritable, JavaMLReadable
from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaParams
from pyspark.ml.param.shared import HasPredictionCol, Param, TypeConverters, Params
from py4j.java_gateway import JavaObject
__all__ = ["FPGrowth", "FPGrowthModel", "PrefixSpan"]
class _FPGrowthParams(HasPredictionCol):
Params for :py:class:`FPGrowth` and :py:class:`FPGrowthModel`.
.. versionadded:: 3.0.0
itemsCol: Param[str] = Param(
Params._dummy(), "itemsCol", "items column name", typeConverter=TypeConverters.toString
minSupport: Param[float] = Param(
"Minimal support level of the frequent pattern. [0.0, 1.0]. "
+ "Any pattern that appears more than (minSupport * size-of-the-dataset) "
+ "times will be output in the frequent itemsets.",
numPartitions: Param[int] = Param(
"Number of partitions (at least 1) used by parallel FP-growth. "
+ "By default the param is not set, "
+ "and partition number of the input dataset is used.",
minConfidence: Param[float] = Param(
"Minimal confidence for generating Association Rule. [0.0, 1.0]. "
+ "minConfidence will not affect the mining for frequent itemsets, "
+ "but will affect the association rules generation.",
def __init__(self, *args: Any):
super(_FPGrowthParams, self).__init__(*args)
minSupport=0.3, minConfidence=0.8, itemsCol="items", predictionCol="prediction"
def getItemsCol(self) -> str:
Gets the value of itemsCol or its default value.
return self.getOrDefault(self.itemsCol)
def getMinSupport(self) -> float:
Gets the value of minSupport or its default value.
return self.getOrDefault(self.minSupport)
def getNumPartitions(self) -> int:
Gets the value of :py:attr:`numPartitions` or its default value.
return self.getOrDefault(self.numPartitions)
def getMinConfidence(self) -> float:
Gets the value of minConfidence or its default value.
return self.getOrDefault(self.minConfidence)
[docs]class FPGrowthModel(JavaModel, _FPGrowthParams, JavaMLWritable, JavaMLReadable["FPGrowthModel"]):
Model fitted by FPGrowth.
.. versionadded:: 2.2.0
[docs] @since("3.0.0")
def setItemsCol(self, value: str) -> "FPGrowthModel":
Sets the value of :py:attr:`itemsCol`.
return self._set(itemsCol=value)
[docs] @since("3.0.0")
def setMinConfidence(self, value: float) -> "FPGrowthModel":
Sets the value of :py:attr:`minConfidence`.
return self._set(minConfidence=value)
[docs] @since("3.0.0")
def setPredictionCol(self, value: str) -> "FPGrowthModel":
Sets the value of :py:attr:`predictionCol`.
return self._set(predictionCol=value)
@property # type: ignore[misc]
def freqItemsets(self) -> DataFrame:
DataFrame with two columns:
* `items` - Itemset of the same type as the input column.
* `freq` - Frequency of the itemset (`LongType`).
return self._call_java("freqItemsets")
@property # type: ignore[misc]
def associationRules(self) -> DataFrame:
DataFrame with four columns:
* `antecedent` - Array of the same type as the input column.
* `consequent` - Array of the same type as the input column.
* `confidence` - Confidence for the rule (`DoubleType`).
* `lift` - Lift for the rule (`DoubleType`).
return self._call_java("associationRules")
[docs]class FPGrowth(
JavaEstimator[FPGrowthModel], _FPGrowthParams, JavaMLWritable, JavaMLReadable["FPGrowth"]
A parallel FP-growth algorithm to mine frequent itemsets.
.. versionadded:: 2.2.0
The algorithm is described in
Li et al., PFP: Parallel FP-Growth for Query Recommendation [1]_.
PFP distributes computation in such a way that each worker executes an
independent group of mining tasks. The FP-Growth algorithm is described in
Han et al., Mining frequent patterns without candidate generation [2]_
NULL values in the feature column are ignored during `fit()`.
Internally `transform` `collects` and `broadcasts` association rules.
.. [1] Haoyuan Li, Yi Wang, Dong Zhang, Ming Zhang, and Edward Y. Chang. 2008.
Pfp: parallel fp-growth for query recommendation.
In Proceedings of the 2008 ACM conference on Recommender systems (RecSys '08).
Association for Computing Machinery, New York, NY, USA, 107-114.
DOI: https://doi.org/10.1145/1454008.1454027
.. [2] Jiawei Han, Jian Pei, and Yiwen Yin. 2000.
Mining frequent patterns without candidate generation.
SIGMOD Rec. 29, 2 (June 2000), 1-12.
DOI: https://doi.org/10.1145/335191.335372
>>> from pyspark.sql.functions import split
>>> data = (spark.read
... .text("data/mllib/sample_fpgrowth.txt")
... .select(split("value", "\s+").alias("items")))
>>> data.show(truncate=False)
|items |
|[r, z, h, k, p] |
|[z, y, x, w, v, u, t, s]|
|[s, x, o, n, r] |
|[x, z, y, m, t, s, q, e]|
|[z] |
|[x, z, y, r, q, t, p] |
>>> fp = FPGrowth(minSupport=0.2, minConfidence=0.7)
>>> fpm = fp.fit(data)
>>> fpm.setPredictionCol("newPrediction")
>>> fpm.freqItemsets.sort("items").show(5)
| items|freq|
| [p]| 2|
| [p, r]| 2|
|[p, r, z]| 2|
| [p, z]| 2|
| [q]| 2|
only showing top 5 rows
>>> fpm.associationRules.sort("antecedent", "consequent").show(5)
|antecedent|consequent|confidence|lift| support|
| [p]| [r]| 1.0| 2.0|0.3333333333333333|
| [p]| [z]| 1.0| 1.2|0.3333333333333333|
| [p, r]| [z]| 1.0| 1.2|0.3333333333333333|
| [p, z]| [r]| 1.0| 2.0|0.3333333333333333|
| [q]| [t]| 1.0| 2.0|0.3333333333333333|
only showing top 5 rows
>>> new_data = spark.createDataFrame([(["t", "s"], )], ["items"])
>>> sorted(fpm.transform(new_data).first().newPrediction)
['x', 'y', 'z']
>>> model_path = temp_path + "/fpm_model"
>>> fpm.save(model_path)
>>> model2 = FPGrowthModel.load(model_path)
>>> fpm.transform(data).take(1) == model2.transform(data).take(1)
_input_kwargs: Dict[str, Any]
def __init__(
minSupport: float = 0.3,
minConfidence: float = 0.8,
itemsCol: str = "items",
predictionCol: str = "prediction",
numPartitions: Optional[int] = None,
__init__(self, \\*, minSupport=0.3, minConfidence=0.8, itemsCol="items", \
predictionCol="prediction", numPartitions=None)
super(FPGrowth, self).__init__()
self._java_obj = self._new_java_obj("org.apache.spark.ml.fpm.FPGrowth", self.uid)
kwargs = self._input_kwargs
[docs] @keyword_only
def setParams(
minSupport: float = 0.3,
minConfidence: float = 0.8,
itemsCol: str = "items",
predictionCol: str = "prediction",
numPartitions: Optional[int] = None,
) -> "FPGrowth":
setParams(self, \\*, minSupport=0.3, minConfidence=0.8, itemsCol="items", \
predictionCol="prediction", numPartitions=None)
kwargs = self._input_kwargs
return self._set(**kwargs)
[docs] def setItemsCol(self, value: str) -> "FPGrowth":
Sets the value of :py:attr:`itemsCol`.
return self._set(itemsCol=value)
[docs] def setMinSupport(self, value: float) -> "FPGrowth":
Sets the value of :py:attr:`minSupport`.
return self._set(minSupport=value)
[docs] def setNumPartitions(self, value: int) -> "FPGrowth":
Sets the value of :py:attr:`numPartitions`.
return self._set(numPartitions=value)
[docs] def setMinConfidence(self, value: float) -> "FPGrowth":
Sets the value of :py:attr:`minConfidence`.
return self._set(minConfidence=value)
[docs] def setPredictionCol(self, value: str) -> "FPGrowth":
Sets the value of :py:attr:`predictionCol`.
return self._set(predictionCol=value)
def _create_model(self, java_model: "JavaObject") -> FPGrowthModel:
return FPGrowthModel(java_model)
[docs]class PrefixSpan(JavaParams):
A parallel PrefixSpan algorithm to mine frequent sequential patterns.
The PrefixSpan algorithm is described in J. Pei, et al., PrefixSpan: Mining Sequential Patterns
Efficiently by Prefix-Projected Pattern Growth
(see `here <https://doi.org/10.1109/ICDE.2001.914830">`_).
This class is not yet an Estimator/Transformer, use :py:func:`findFrequentSequentialPatterns`
method to run the PrefixSpan algorithm.
.. versionadded:: 2.4.0
See `Sequential Pattern Mining (Wikipedia) \
>>> from pyspark.ml.fpm import PrefixSpan
>>> from pyspark.sql import Row
>>> df = sc.parallelize([Row(sequence=[[1, 2], [3]]),
... Row(sequence=[[1], [3, 2], [1, 2]]),
... Row(sequence=[[1, 2], [5]]),
... Row(sequence=[[6]])]).toDF()
>>> prefixSpan = PrefixSpan()
>>> prefixSpan.getMaxLocalProjDBSize()
>>> prefixSpan.getSequenceCol()
>>> prefixSpan.setMinSupport(0.5)
>>> prefixSpan.setMaxPatternLength(5)
>>> prefixSpan.findFrequentSequentialPatterns(df).sort("sequence").show(truncate=False)
|sequence |freq|
|[[1]] |3 |
|[[1], [3]]|2 |
|[[2]] |3 |
|[[2, 1]] |3 |
|[[3]] |2 |
_input_kwargs: Dict[str, Any]
minSupport: Param[float] = Param(
"The minimal support level of the "
+ "sequential pattern. Sequential pattern that appears more than "
+ "(minSupport * size-of-the-dataset) times will be output. Must be >= 0.",
maxPatternLength: Param[int] = Param(
"The maximal length of the sequential pattern. Must be > 0.",
maxLocalProjDBSize: Param[int] = Param(
"The maximum number of items (including delimiters used in the "
+ "internal storage format) allowed in a projected database before "
+ "local processing. If a projected database exceeds this size, "
+ "another iteration of distributed prefix growth is run. "
+ "Must be > 0.",
sequenceCol: Param[str] = Param(
"The name of the sequence column in "
+ "dataset, rows with nulls in this column are ignored.",
def __init__(
minSupport: float = 0.1,
maxPatternLength: int = 10,
maxLocalProjDBSize: int = 32000000,
sequenceCol: str = "sequence",
__init__(self, \\*, minSupport=0.1, maxPatternLength=10, maxLocalProjDBSize=32000000, \
super(PrefixSpan, self).__init__()
self._java_obj = self._new_java_obj("org.apache.spark.ml.fpm.PrefixSpan", self.uid)
minSupport=0.1, maxPatternLength=10, maxLocalProjDBSize=32000000, sequenceCol="sequence"
kwargs = self._input_kwargs
[docs] @keyword_only
def setParams(
minSupport: float = 0.1,
maxPatternLength: int = 10,
maxLocalProjDBSize: int = 32000000,
sequenceCol: str = "sequence",
) -> "PrefixSpan":
setParams(self, \\*, minSupport=0.1, maxPatternLength=10, maxLocalProjDBSize=32000000, \
kwargs = self._input_kwargs
return self._set(**kwargs)
[docs] @since("3.0.0")
def setMinSupport(self, value: float) -> "PrefixSpan":
Sets the value of :py:attr:`minSupport`.
return self._set(minSupport=value)
[docs] @since("3.0.0")
def getMinSupport(self) -> float:
Gets the value of minSupport or its default value.
return self.getOrDefault(self.minSupport)
[docs] @since("3.0.0")
def setMaxPatternLength(self, value: int) -> "PrefixSpan":
Sets the value of :py:attr:`maxPatternLength`.
return self._set(maxPatternLength=value)
[docs] @since("3.0.0")
def getMaxPatternLength(self) -> int:
Gets the value of maxPatternLength or its default value.
return self.getOrDefault(self.maxPatternLength)
[docs] @since("3.0.0")
def setMaxLocalProjDBSize(self, value: int) -> "PrefixSpan":
Sets the value of :py:attr:`maxLocalProjDBSize`.
return self._set(maxLocalProjDBSize=value)
[docs] @since("3.0.0")
def getMaxLocalProjDBSize(self) -> int:
Gets the value of maxLocalProjDBSize or its default value.
return self.getOrDefault(self.maxLocalProjDBSize)
[docs] @since("3.0.0")
def setSequenceCol(self, value: str) -> "PrefixSpan":
Sets the value of :py:attr:`sequenceCol`.
return self._set(sequenceCol=value)
[docs] @since("3.0.0")
def getSequenceCol(self) -> str:
Gets the value of sequenceCol or its default value.
return self.getOrDefault(self.sequenceCol)
[docs] def findFrequentSequentialPatterns(self, dataset: DataFrame) -> DataFrame:
Finds the complete set of frequent sequential patterns in the input sequences of itemsets.
.. versionadded:: 2.4.0
dataset : :py:class:`pyspark.sql.DataFrame`
A dataframe containing a sequence column which is
`ArrayType(ArrayType(T))` type, T is the item type for the input dataset.
A `DataFrame` that contains columns of sequence and corresponding frequency.
The schema of it will be:
- `sequence: ArrayType(ArrayType(T))` (T is the item type)
- `freq: Long`
assert self._java_obj is not None
jdf = self._java_obj.findFrequentSequentialPatterns(dataset._jdf)
return DataFrame(jdf, dataset.sparkSession)
if __name__ == "__main__":
import doctest
import pyspark.ml.fpm
from pyspark.sql import SparkSession
globs = pyspark.ml.fpm.__dict__.copy()
# The small batch size here ensures that we see multiple batches,
# even in these small test examples:
spark = SparkSession.builder.master("local[2]").appName("ml.fpm tests").getOrCreate()
sc = spark.sparkContext
globs["sc"] = sc
globs["spark"] = spark
import tempfile
temp_path = tempfile.mkdtemp()
globs["temp_path"] = temp_path
(failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
from shutil import rmtree
except OSError:
if failure_count: