mahout推荐系统算法分析_第1页
mahout推荐系统算法分析_第2页
mahout推荐系统算法分析_第3页
mahout推荐系统算法分析_第4页
mahout推荐系统算法分析_第5页
已阅读5页,还剩19页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

RecommenderJob 源码分析 源码分析 Step by Step Maj 2014 年 6 月 3 日 11 58 09 0 RecommenderJob 简介简介 RecommenderJob 是 mahout 推荐系统的分布式实现 使用的算法是 Itembased 的协同 过滤算法 它的输入是一个含有用户喜好数据的 csv 文件 数据格式是 userID itemID prefValue 输出是 userIDs 对应的推荐物品和评分 这里以书中的例子作为输入 11015 11023 11032 5 21012 21022 5 21035 21042 31012 5 31044 31054 5 31075 41015 41033 41044 5 41064 51014 51023 51032 51044 51053 5 51064 其中第一列是用户 ID 第二列是物品 第三列是该用户对物品的评分 如用户 1 对 101 物品评分是 5 如此类推 将上面的评分保存为文件 存到 hdfs 中 路径为 usr testdata intro txt 执行以下命令 hadoop jar mahout core 0 9 job jar org apache mahout cf taste hadoop item RecommenderJob input usr testdata intro txt s SIMILARITY EUCLIDEAN DISTANCE output output1 该命令使用欧式距离求相似度 将结果输出到 output1 目录中 结果如下所示 这个结果也是很好理解的 对于用户 1 首要推荐 104 其次推荐 106 以此类推 RecommenderJob 使用 MR 分布式计算框架 所以所有的矩阵运算全部都映射为 key value 对并操作 运算不直观 推荐算法中用到了 9 个 MR 任务 所以对各个 MR 任务的 源码进行分析 为方便理解 提供对应步骤的中间结果显示 看完各个 MR 任务后 可以 对输入文件如何转换为向量 分布式矩阵乘法操作 如何计算相似度有比较好的理解 这 些中间结果绝大部分摘自网上参考文献 文章最后有附网站 不过网上用的是 0 7 我们 这边用 0 9 有个别流程不同 1 RecommenderJob 步骤一步骤一 PreparePreferenceMatrixJob PreparePreferenceMatrixJob 主要用于前期准备工作 如将输入文件的 key value 转为 key vector 等 该 Job 继承自 AbstractJob 实现了 run 方法就能被 main 调用了 下同 PreparePreferenceMatrixJob 由 3 个 PrepareJob 组成 Map reduce 下面来一个一个看 1 1ItemIDIndexMapper and ItemIDIndexReducer 用途 Convert items to an internal index 将 items 转化为内部索引 输入 输入 input 指定的输入文件 输出 输出 itemIDIndex ItemIDIndexMapper 这个 mapper 很简单 Setup 过程获取 transpose 布尔变量 默认是 false Map 代码如下 protected void map LongWritable key Text value Context context throws IOException InterruptedException String tokens TasteHadoopUtils splitPrefTokens value toString long itemID Long parseLong tokens transpose 0 1 int index TasteHadoopUtils idToIndex itemID indexWritable set index itemIDWritable set itemID context write indexWritable itemIDWritable 这里 itemID tokens 1 idToIndex 为 return 0 x7FFFFFFF 所以这里也等于 itemID 比如 itemID 为 101 则 idToIndex 也为 101 ItemIDIndexReducer 代码如下 protected void reduce VarIntWritable index Iterable possibleItemIDs Context context throws IOException InterruptedException long minimumItemID Long MAX VALUE for VarLongWritable varLongWritable possibleItemIDs long itemID varLongWritable get if itemID 101 不太清楚为什么会有 for 循环在里面 前面一个 key 应该只对 应一个 value 才对的 最后得出结果 1 ITEMID INDEX 2 102 102 103 103 101 101 106 106 107 107 104 104 105 105 ToItemPrefsMapper and ToUserVectorsReducer convert user preferences into a vector per user 将 user 评估值转换为每个 user 的向量 输入 input 指定的输入文件 输出 userVectors ToItemPrefsMapper ToItemPrefsMapper 继承自 ToEntityPrefsMapper 构造函数为 super false 即 itemKey false 因为 ToItemPrefsMapper 自身什么都没实现 所以可以直接看 ToEntityPrefsMapper Setup 设置 booleanData transpose ratingShift 这里的 ratingShift 没有设置项 所以为 0 0 transpose 为 false Map 代码如下 public void map LongWritable key Text value Context context throws IOException InterruptedException String tokens DELIMITER split value toString long userID Long parseLong tokens 0 long itemID Long parseLong tokens 1 if itemKey transpose If using items as keys and not transposing items and users then users are items Or if not using items as keys users are as usual but transposing items and users then users are items Confused long temp userID userID itemID itemID temp if booleanData context write new VarLongWritable userID new VarLongWritable itemID else float prefValue tokens length 2 Float parseFloat tokens 2 ratingShift 1 0f context write new VarLongWritable userID new EntityPrefWritable itemID prefValue 从上面代码不难看出最后输出是 userId itemID prefValue 如果 booleanData 设置为 true 则为 userID itemID ToUserVectorsReducer 代码如下 protected void reduce VarLongWritable userID Iterable itemPrefs Context context throws IOException InterruptedException Vector userVector new RandomAccessSparseVector Integer MAX VALUE 100 for VarLongWritable itemPref itemPrefs int index TasteHadoopUtils idToIndex itemPref get float value itemPref instanceof EntityPrefWritable EntityPrefWritable itemPref getPrefValue 1 0f userVector set index value if userVector getNumNondefaultElements minPreferences userVectorWritable set userVector userVectorWritable setWritesLaxPrecision true context getCounter Counters USERS increment 1 context write userID userVectorWritable 前面的 mapper 输出的 value 是EntityPrefWritable 但因为 EntityPrefWritable 是继承自 VarLongWritable 的 所以 Iterable 是 VarLongWritable 然后将用户所有的评分写到一个 vector 中 如果 vector 含有的 item 个数大于 minPreference 就输出 否则不输出 该值默 认为 1 这个 Job 的输出应该为 userId vector itmeId preValue itemId preValue 1 USER VECTORS 2 1 103 2 5 102 3 0 101 5 0 3 2 101 2 0 104 2 0 103 5 0 102 2 5 4 3 101 2 5 107 5 0 105 4 5 104 4 0 5 4 101 5 0 106 4 0 104 4 5 103 3 0 6 5 106 4 0 105 3 5 104 4 0 103 2 0 102 3 0 101 4 0 1 3 ToItemVectorsMapper and ToItemVectorsReducer 紧跟着上面 Job 之后 会将 users 写入到 hadoop 中 代码如下 int numberOfUsers int toUserVectors getCounters findCounter ToUserVectorsReducer Counters USERS getValue HadoopUtil writeInt numberOfUsers getOutputPath NUM USERS getConf 写入的文件为 numUsers bin 这个使得该 reduce 不能设置为多个 否则会出现多个 reduce 同时写一个文件的现象 不过这个 reducer 因为不需要干什么活 所以性能不差 这里的 numUsers bin 中的结果为 5 表明有 5 个用户 接下来的 Job 是 build the rating matrix 构建评级矩阵 ToItemVectorsMapper ToItemVectorsReducer 输入 userVectors 该输入是上个 Job 的输出 即userId vector itmeId preValue itemId preValue 输出 ratingMatrix ToItemVectorsMapper 代码如下 protected void map VarLongWritable rowIndex VectorWritable vectorWritable Context ctx throws IOException InterruptedException Vector userRatings vectorWritable get int column TasteHadoopUtils idToIndex rowIndex get itemVectorWritable setWritesLaxPrecision true Vector itemVector new RandomAccessSparseVector Integer MAX VALUE 1 for Vector Element elem userRatings nonZeroes itemID set elem index itemVector setQuick column elem get itemVectorWritable set itemVector ctx write itemID itemVectorWritable reset vector for reuse itemVector setQuick elem index 0 0 将原来userId vector itmeId preValue itemId preValue 改为 itemId userId prefValue ToItemVectorsReducer 这里和博客上的不一样 就是将 map 结果合并为 vector 输出的格式为 itemId vector userId prefValue userId prefValue 如下面结果 1 RATING MATRIX 2 102 5 3 0 2 2 5 1 3 0 3 103 5 2 0 4 3 0 2 5 0 1 2 5 4 101 5 4 0 4 5 0 3 2 5 2 2 0 1 5 0 5 106 5 4 0 4 4 0 6 107 3 5 0 7 104 5 4 0 4 4 5 3 4 0 2 2 0 8 105 5 3 5 3 4 5 2 RowSimilarityJob 该 Job 用于计算共生矩阵 是最重要的 Job 之一 该 Job 的 Input 是上一个 Job 的评级 矩阵 ratingMatrix 输出是 similarityMatrix 可以看出这个方法有 4 个 prepareJob Map reduce 这里和博客不同 博客只有 3 个 Mapreduce 2 0 初始化初始化 从 run 函数中可以看出该 Job 需要以下参数 numberOfColumns 获取 user 个数 similarityClassname 获取相似度参数 根据 overwrite 选项 确认是否删除中间结果文件夹 maxSimilaritiesPerRow 获取每行最大的相似度个数 默认是 100 excludeSelfSimilarity 是否计算自身相似性 默认是 false 不计算 maxObservationsPerRow 获取行最大个数 maxObservationsPerColumn 获取列最大个数 2 1 countObservation 输入 ratingMatrix 即 itemId vector userId prefValue userId prefValue 输出 notUsed CountObservationsMapper Map 代码如下 protected void map IntWritable rowIndex VectorWritable rowVectorWritable Context ctx throws IOException InterruptedException Vector row rowVectorWritable get for Vector Element elem row nonZeroes columnCounts setQuick elem index columnCounts getQuick elem index 1 该 Map 将 columnCounts 赋值 如 vector userId 1 userID 1 Cleanup protected void cleanup Context ctx throws IOException InterruptedException ctx write NullWritable get new VectorWritable columnCounts 将 vector userId 1 userID 1 写入到输出中 SumObservationsReducer 代码 public static class SumObservationsReducer extends Reducer Override protected void reduce NullWritable nullWritable Iterable partialVectors Context ctx throws IOException InterruptedException Vector counts Vectors sum partialVectors iterator Vectors write counts new Path ctx getConfiguration get OBSERVATIONS PER COLUMN PATH ctx getConfiguration 计算 vector 中值的和 并将其写入到 observationsPerColumnPath 中 实际运行时 通过写 读程序 可看出其写入的值为 Vector index value 2 4 5 6 3 4 1 3 4 4 可以看出上面是每个用户评分的 Item 个数 CombinerClass VectorSumCombiner 暂时没分析 总体来说 这个 mapreduce 比较诡异 除了 observationsPerColumnPath 有内容外 另外的 输出 notUsed 内容都是 null 2 2 normsAndTranspose 输入 ratingMatrix 即 itemId vector userId prefValue userId prefValue 输出 weights VectorNormMapper Setup 进行初始化 这里注意 observationsPerColumn 用到了 observationsPerColumnPath 中的结果 即用到 SumObservationsReducer 的结果 protected void setup Context ctx throws IOException InterruptedException Configuration conf ctx getConfiguration similarity ClassUtils instantiateAs conf get SIMILARITY CLASSNAME VectorSimilarityMeasure class norms new RandomAccessSparseVector Integer MAX VALUE nonZeroEntries new RandomAccessSparseVector Integer MAX VALUE maxValues new RandomAccessSparseVector Integer MAX VALUE threshold Double parseDouble conf get THRESHOLD observationsPerColumn Vectors readAsIntMap new Path conf get OBSERVATIONS PER COLUMN PATH conf maxObservationsPerRow conf getInt MAX OBSERVATIONS PER ROW DEFAULT MAX OBSERVATIONS PER ROW maxObservationsPerColumn conf getInt MAX OBSERVATIONS PER COLUMN DEFAULT MAX OBSERVATIONS PER COLUMN long seed Long parseLong conf get RANDOM SEED if seed NO FIXED RANDOM SEED random RandomUtils getRandom else random RandomUtils getRandom seed Map 函数中调用了 sampleDown 该代码如下 private Vector sampleDown Vector rowVector Context ctx int observationsPerRow rowVector getNumNondefaultElements double rowSampleRate double Math min maxObservationsPerRow observationsPerRow double observationsPerRow Vector downsampledRow rowVector like long usedObservations 0 long neglectedObservations 0 for Vector Element elem rowVector nonZeroes int columnCount observationsPerColumn get elem index double columnSampleRate double Math min maxObservationsPerColumn columnCount double columnCount if random nextDouble Math min rowSampleRate columnSampleRate maj random nextDouble 返回的是返回的是 0 1 downsampledRow setQuick elem index elem get usedObservations else neglectedObservations ctx getCounter Counters USED OBSERVATIONS increment usedObservations ctx getCounter Counters NEGLECTED OBSERVATIONS increment neglectedObservations return downsampledRow 从代码中看出获取一个 Item 的 Vector 计算出 rowSampleRate 然后复制一个空的 vector downsampledRow 然后 for 循环每一个用户 根据 observationsPerColumnPath 的结 果 计算出 columnSampleRate 如果 rowSampleRate 和 columnSampleRate 都小于 random 的值 则 downsampledRow 会设置 userId prefValue 否则忽略 这里是当实际数据大于 maxObservationsPerColumn 和 maxObservationsPerRow 时 用随机值随机获取 userID prefValue Map 代码如下 代码如下 protected void map IntWritable row VectorWritable vectorWritable Context ctx throws IOException InterruptedException Vector sampledRowVector sampleDown vectorWritable get ctx Vector rowVector similarity normalize sampledRowVector int numNonZeroEntries 0 double maxValue Double MIN VALUE for Vector Element element rowVector nonZeroes RandomAccessSparseVector partialColumnVector new RandomAccessSparseVector Integer MAX VALUE partialColumnVector setQuick row get element get ctx write new IntWritable element index new VectorWritable partialColumnVector numNonZeroEntries if maxValue vector maxValues 2147483646 0 x80000002 vector nonzeroEntries 2147483648 0 x80000000 vector norms 这里由于 map 中 threshhold 和 NO THRESHOLD 相等 所以 if 语句不执行 所以 maxValues 和 nonzeroEntries 都应该为空 normPath 为 1 normsPath 2 107 25 0 106 32 0 105 32 5 104 56 25 103 44 25 102 24 25 101 76 25 Combiner 函数函数 MergeVectorsCombiner 这里的 reduce 函数只有一行 protected void reduce IntWritable row Iterable partialVectors Context ctx throws IOException InterruptedException ctx write row new VectorWritable Vectors merge partialVectors 就是将 userID itemID prefValue 相同的 userID 合并为 userID vector itemID prefValue itmeID prefValue Reduce 函数 MergeVectorsReducer protected void reduce IntWritable row Iterable partialVectors Context ctx throws IOException InterruptedException Vector partialVector Vectors merge partialVectors if row get NORM VECTOR MARKER Vectors write partialVector normsPath ctx getConfiguration else if row get MAXVALUE VECTOR MARKER Vectors write partialVector maxValuesPath ctx getConfiguration else if row get NUM NON ZERO ENTRIES VECTOR MARKER Vectors write partialVector numNonZeroEntriesPath ctx getConfiguration true else ctx write row new VectorWritable partialVector 这里先将 mapper 中的输出再 merge 一次 然后判断 key 是否属于 cleanup 中的三个变 量 如果属于 则写入到对应的文件中 最后将 userID vector itemID prefValue itmeID prefValue 写入到结果文件 该分析完毕 Weight 结果如下 1 weights 2 1 103 2 5 102 3 0 101 5 0 3 2 101 2 0 104 2 0 103 5 0 102 2 5 4 3 101 2 5 107 5 0 105 4 5 104 4 0 5 4 101 5 0 106 4 0 104 4 5 103 3 0 6 5 106 4 0 105 3 5 104 4 0 103 2 0 102 3 0 101 4 0 2 3 pairwiseSimilarity 该 MR 用于计算相似度矩阵 输入 weightsPath 即 userID vector itemID prefValue itmeID prefValue 输出 pairwiseSimilarityPath CooccurrencesMapper setup 函数 函数 protected void setup Context ctx throws IOException InterruptedException similarity ClassUtils instantiateAs ctx getConfiguration get SIMILARITY CLASSNAME VectorSimilarityMeasure class numNonZeroEntries Vectors readAsIntMap new Path ctx getConfiguration get NUM NON ZERO ENTRIES PATH ctx getConfiguration maxValues Vectors read new Path ctx getConfiguration get MAXVALUES PATH ctx getConfiguration threshold Double parseDouble ctx getConfiguration get THRESHOLD 这里获取了 similarity 方法 还有 maxValue 和 nonzeroEntries 根据根据 2 2 Cleanup 函 数的分析 这两个向量的结果都是空的 CooccurrencesMapper map 函数函数 protected void map IntWritable column VectorWritable occurrenceVector Context ctx throws IOException InterruptedException Vector Element occurrences Vectors toArray occurrenceVector Arrays sort occurrences BY INDEX int cooccurrences 0 int prunedCooccurrences 0 for int n 0 n occurrences length n Vector Element occurrenceA occurrences n Vector dots new RandomAccessSparseVector Integer MAX VALUE for int m n m 102 prefValue101 prefValue102 103 prefValue101 prefValue103 104 prefValue101 prefVal ue104 然后是 然后是 102 103 prefValue102 prefValue103 104 prefValue102 prefValue104 以此类推 以此类推 其中其中similarity aggregate返回就是两个参数的乘积 返回就是两个参数的乘积 Combiner 函数 函数 VectorSumReducer protected void reduce WritableComparable key Iterable values Context ctx throws IOException InterruptedException ctx write key new VectorWritable Vectors sum values iterator 将相同 key 的 vector 进行相加 这个就是把 map 的输出整理一下 把相同 key 的 vector 中对应的项相加 比如 102 103 1prefValue102 1prefValue103 104 1prefValue102 1prefValue104 102 103 prefValue102 prefValue103 104 prefValue102 prefValue104 105 prefValue102 prefValu e105 那么整合就是 102 103 prefValue102 prefValue103 1prefValue102 1prefValue103 104 prefValue102 prefValue1 04 1prefValue102 1prefValue104 105 prefValue102 prefValue105 Reduce 函数 函数 SimilarityReducer Setup 最主要是读取 norms 这里是设置每个 itemID 的 prefValue 的平方和 接着分析 reduce 函数 protected void reduce IntWritable row Iterable partialDots Context ctx throws IOException InterruptedException Iterator partialDotsIterator partialDots iterator Vector dots partialDotsIterator next get while partialDotsIterator hasNext Vector toAdd partialDotsIterator next get for Element nonZeroElement toAdd nonZeroes dots setQuick nonZeroElement index dots getQuick nonZeroElement index nonZeroElement get Vector similarities dots like double normA norms getQuick row get for Element b dots nonZeroes double similarityValue similarity similarity b get normA norms getQuick b index numberOfColumns if similarityValue treshold similarities set b index similarityValue if excludeSelfSimilarity similarities setQuick row get 0 ctx write row new VectorWritable similarities 第一个 while 是将相同的 itemID 做一次相加计算 完成 combiner 中未完成的工作 第二个循环 for 循环 用于求相似度 这里说一下 similarity 的参数 如 norm 向量为 1 normsPath 2 107 25 0 106 32 0 105 32 5 104 56 25 103 44 25 102 24 25 101 76 25 上图第一行是 102 对应的 vector 第二行 dots 是 102 与其他各个 item 进行点乘后相加结果 第三行和第二行相同 感觉是博主打印多了一行 可以不用理会 b get 是 106 对应的值 12 是用户 5 的 102 prefValue 106 prefValue normA 是 102 的 norm 值 为 24 25 norms getQuick b index 为 106 的 norm 值 为 32 0 numberofColumns 为 5 根据欧式距离计算 可得对应的值为 0 149725067 可以看出 本 reduce 用于计算 itemID 与其他各个 itemID 的相似度 输出为 item vector itemId simi itemID simi 根据以上分析 矩阵的点乘公式为 相似度计算公式为 欧式距离为例 其中 I1 I2 分别代表项目 I1 和项目 I2 PuiI1 代表用户 ui 对项目 I1 的评分 PuiI2 代表用户 ui 对项目 I2 的评分 normsI1 就是 norms 中对应 I1 的平方和 比如 102 项目和 103 项目 102 5 3 0 2 2 5 1 3 0 103 5 2 0 4 3 0 2 5 0 1 2 5 可以得到 dot 26 norms 102 24 25 norms 103 44 25 带入上面的公式 可以 得到 simi 102 103 0 197 可见和上面的一样 2 4 asMatrix 输入 pairwiseSimilarityPath 即item vector itemId simi itemID simi 输出 getoutput 这里是 similarityMatrix UnsymmetrifyMapper setup 函数函数 就设置 maxSimilaritiesPerRow 变量 UnsymmetrifyMapper map 函数函数 protected void map IntWritable row VectorWritable similaritiesWritable Context ctx throws IOException InterruptedException Vector similarities similaritiesWritable get For performance the creation of transposedPartial is moved out of the while loop and it is reused inside Vector transposedPartial new RandomAccessSparseVector similarities size 1 TopElementsQueue topKQueue new TopElementsQueue maxSimilaritiesPerRow for Element nonZeroElement similarities nonZeroes MutableElement top topKQueue top double candidateValue nonZeroElement get if candidateValue top get top setIndex nonZeroElement index top set candidateValue topKQueue updateTop transposedPartial setQuick row get candidateValue ctx write new IntWritable nonZeroElement index new VectorWritable transposedPartial maj 被评价的被评价的itemId itemId pref transposedPartial setQuick row get 0 0 Vector topKSimilarities new RandomAccessSparseVector similarities size maxSimilaritiesPerRow for Vector Element topKSimilarity topKQueue getTopElements topKSimilarities setQuick topKSimilarity index topKSimilarity get ctx write row new VectorWritable topKSimilarities maj itemId vector 被评价被评价itemID pref 这里有两个循环 第一个循环 计算如 item105 vector itemId106 simi itemID107 simi 中 itme105 最大的相似度 然后将结果写成 item106 vector item105 simi item107 vector item105 simi 这里相当于把另外半个矩阵给补全了 很巧妙 第二个循环 将 itemId topKSimilarities 输出 这里不明白 topK 是否真的能把 topK 给输出来 应该是不对的 但不管怎么样 是有 两种输出的 Combiner 函数 函数 MergeToTopKSimilaritiesReducer 和 map 的 setup 一样 Reduce protected void reduce IntWritable row Iterable partials Context ctx throws IOException InterruptedException Vector allSimilarities Vectors merge partials Vector topKSimilarities Vectors topKElements maxSimilaritiesPerRow allSimilarities ctx write row new VectorWritable topKSimilarities 从代码上理解应该是先将相同 item 的 vector 给 merge 一下 然后进行 topk 排序 Reduce 函数 就是 MergeToTopKSimilaritiesReducer 不再分析 最后得到的结果如下 1 similarityMatrix 2 102 101 0 14201473202245876 106 0 14972

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论