`
yaven
  • 浏览: 62408 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

mahout中的PFPGrowth算法源码解析

 
阅读更多

略做分析,忘批评指正。

mahout源码版本:0.7

 

mahout官方算法说明是根据原google的几位大牛的论文进行实现,详情请参见:

https://cwiki.apache.org/confluence/display/MAHOUT/Parallel+Frequent+Pattern+Mining

 

在实际应用中,貌似有些问题:

I1: 不能通过指定谓词进行挖掘。

I2:  不能通过约束规则进行挖掘。

在实际的使用中还需要进行一些改动来实现这些功能。

 

1  起始类:FPGrowthDriver

输入参数:minSupport 最小支持计数,默认是3.

          maxHeapSize 最大的堆大小,表示top Kitem数,默认是50

          numGroups 分组数,即将数据分成多少组来进行频繁树的建立。

          splitterPattern  特征的分割方式。

          numTreeCacheEntries 树的entry的缓存数,用来防止树的重复建立。

          method  指定是顺序方式,还是mapreduce计算方式。

          encoding 文件的编码集。

          userFPG2 用替换的FPG实现方式。

 

2   并行频繁树。

如果使用的mapreduce计算方式,则调用PFPGrowthrunPFPGrowth方法。

第一个job统计所有feature的支持度计数,用于之后的排序等工作。

2.1 startingParallelCounting

   ParallelCountingMapper 将一行transaction的每个特征按其名字分布出去。

   ParallelCountingReducer 对特征值进行计数。

   结果存到PARALLEL_COUNTING中。

 

3   第二个步骤将第一步得到的feature存入DistributeCache

   然后会把支持度大于指定最小支持度(默认为3)以上的feature列表载入到内存中。这里如果feature列表大于内存能承受的大小,则需要将feature分到不同的文件中,因为hdfs不支持随机

saveFList feature列表存入到hdfs中。

 

4   第三步,对feature进行分布,各自建TransactionTree,然后导出频繁模式。

startParallelFPGrowth

     4.1   从原始feature文件中,读入feature,根据支持度计数排列后的feature进行分组,分组的feature即为规则的右值。

ParallelFPGrowthMappersetup方法中会载入所有的满足最小支持度的feature及其支持度,并对feature进行incrementindex的建立。根据item对应的indexitem进行分组。

对每个transaction,从后取feature作为分组分布的key,把列表中feature前的feature做为TransactionTree分布出去。

ParallelFPGrowthCombiner 构造TransactionTree,将所有相同indexfeature相关的TransactionTree合并到一个中。

TransactionTree

   childCount 是一个数组,保存节点的子节点的个数。

   nodeCount 是一个数组,保存节点的频繁度计数。

   attribute 是一个数组,保存feature的编号,不同分支下的相同名字的节点,所在位置不同。

 nodeChildren 是一个二维数组,保持某节点的第n个子节点在attribute数组中的位置编号。

addPattern() 在现有的树中添加一个模式。遍历新加入的模式的各属性,首先默认是增量状态,从根节点找与模式想匹配的频繁树的分支,在没有找到现有模式节点的地方,切换成新建模式的状态,开始创建模式的节点。

context.write(key, cTree.getCompressedTree()), combiner导出的是一个压缩树,这里又涉及到了TransactionTreeIterator对象对TransactionTree的遍历,对树的路径的遍历。computeNext中:

while(top[1] + 1 == transactionTree.childCount(top[0]))这行用于判断child node是否都被访问过了(因为top[1]-1开始增量计数)。

getCompressedTree()node表示树中独立节点的个数,size代表事物集中项的个数加上2乘以transaction的个数,如果node×4 + 树的所有节点的子节点个数小于等于 size,则直接返回重新构成的树,否则用compressedTransactionSet构造一个TransactionTree

   最后将压缩的TransactionTree分布出去。

 

    4.2   ParallelFPGrowthReducer

setup中,从Configuration中取到PFPGrowth.PFP_PARAMETERS参数;从DistributedCache中读入feature freq list需要ensure PFPGrowth.readFList方法读入的文件是否已经按freq从大到小排列好!!!),feature存入到featureReverseMap中,频繁计数值存入到freqList中。

reduce中,对接收到的Transaction列表进行合并,构造一棵新的TransactionTree—cTree;cTree中导出feature->freq的列表localFList这个是选用FPGrowth的处理方法时使用的,可以放在条件判断分支中???,记录了所有feature及其freq计数的列表,将localFList的元素按freq计数倒序排列。

判断是用FPGrowthIds还是用FPGrowth进行频繁模式的导出。

FPGrowthIds.generateTopKFreqentPatterns(cTree.iterator(),

freqList,

minSupport,

maxHeapSize,

PFPGrowth.getGroupMembers(key.get(),

maxPerGroup,

numFeatures),

IntegerStringOutputConverter,

ContextStatusUpdater)

首先,将freqList进行截断,只保留频繁度大于minSupportfeature,这个feature列表returnFeatures用于之后的频繁模式的导出;然后构造FPTree,遍历cTree中的所有事务,即树的所有分支,并将这些事务用来构造频繁树。

 

FPTree.java中添加事物项集到频繁树中的方法:

/**

     * Adds an itemset with the given occurrance count.

     */

    publicvoid accumulate(IntArrayList argItems, long count) {

        // boxed primitive used so we can use custom comparitor in sort

        List<Integer> items = Lists.newArrayList();

        for (int i = 0; i < argItems.size(); i++) {

            items.add(argItems.get(i));

        }

        /* 对这个transaction中的itemcode根据支持度计数进行排序。 */

        Collections.sort(items, attrComparator);

 

        FPNode currNode = root;

        for (int i = 0; i < items.size(); i++) {

            int item = items.get(i);

            long attrCount = 0;

            if (item < attrCountList.size())

                attrCount = attrCountList.get(item);

            /* 只收支持度高于minSupport */

            if (attrCount < minSupport)

                continue;

 

            /* 查看当前节点是否有子节点是item */

            FPNode next = currNode.child(item);

            /* 如果没有子节点是item,则创建此子节点 */

            if (next == null) {

                next = new FPNode(currNode, item, count);

                currNode.addChild(next);

                /* item的节点索引 */

                List<FPNode> nodeList = (List<FPNode>) attrNodeLists.get(item);

                if (nodeList == null) {

                    nodeList = Lists.newArrayList();

                    attrNodeLists.put(item, nodeList);

                }

                nodeList.add(next);

            } else {

                next.accumulate(count);

            }

            currNode = next;

        }

    }

 

   4.3   用频繁模树导出频繁模式。

调用fpGrowth()方法来对每个满足最小支持度的feature进行频繁模式FrequentPatternMaxHeap的导出;对某个feature,从FPTree中导出其频繁模式的growth()方法。

growth()方法,如果当前feature的频繁度计数不够大,则直接返回一个空的FrequentPatternMaxHeap;否则调用FPTreecreateMoreFreqConditionalTree方法,找到这个节点出现的所有位置,往根节点扫描,导出路径上的频繁节点,构造一个condTree;然后调用splitSinglePrefix方法,将共享的feature放入到pTree中,底层的feature放到qTree中。

ptreeqtree分别保存往上挖和往下挖频繁模式得到的结果。如下图,从x节点开始挖,ptree包含nqtree包含x的子节点中的频繁项。

 

 

 

导出的FrequentPatternMaxHeapTopKPatternsOutputConverter发送出去。结果的存储路径为“fpgrowth”

 

5   AggregateMapper

读入fpgrowth中的数据,根据模式中的每个feature将模式及其支持度发送出去。

 

6   AggregateReducer

对某一个feature,将所有相关的模式进行合并,即可得到最终的频繁模式的结果。

 

  • 大小: 8.5 KB
分享到:
评论
2 楼 yaven 2013-07-02  
comaple 写道
你好,我现在在研究mahout的PFPGrowth算法,想问一下你的下面这段话中,“对每个transaction,从后取feature作为分组分布的key”是从哪行代码中获得的,我的理解这个key就是groupid,但不知道对不对。请指教谢谢。
   
“对每个transaction,从后取feature作为分组分布的key,把列表中feature前的feature做为TransactionTree分布出去。”


是的,key就是PFPGrowth.getGroup(item, maxPerGroup);这行代码生成的groupID。
对应ParallelFPGrowthMapper类中的map方法的第2个for循环的处理。

你应该都已经找到代码了。之前的“ParrallelFPGrowthMapper”多写了一个“r",特此更正一下。
1 楼 comaple 2013-03-13  
你好,我现在在研究mahout的PFPGrowth算法,想问一下你的下面这段话中,“对每个transaction,从后取feature作为分组分布的key”是从哪行代码中获得的,我的理解这个key就是groupid,但不知道对不对。请指教谢谢。
   
“对每个transaction,从后取feature作为分组分布的key,把列表中feature前的feature做为TransactionTree分布出去。”

相关推荐

Global site tag (gtag.js) - Google Analytics