




已阅读5页,还剩19页未读, 继续免费阅读
版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
基于MapReduce的K-Means并行算法设计目录一、实验设计21.实验说明22.算法说明2(1).聚类2(2).基于划分的聚类方法2(3).K-Means算法2(4).MapReduce并行化聚类算法设计思路33.类说明3(1).Instance3(2).Cluster4(3).EuclideanDistance4(4).RandomClusterGenerator4(5).KMeans4(6).KmeansCluster4(7).KMeansDriver5二、实验结果说明51.实验设置52.源数据点63.聚类结果7三、性能和不足7四、源程序81.Instance类82.Cluster类103.EuclideanDistance类124.PageRankDriver类125.KMeans类156.KMeansCluster类197.KMeansDriver类21五、心得体会23一、 实验设计1. 实验说明在Eclipse环境下编写实现k-means算法。2. 算法说明(1). 聚类将给定的多个对象分成若干组,组内的各个对象是相似的,组间的对象是不相似的。进行划分的过程就是聚类过程,划分后的组称为簇(cluster)。几种聚类方法: 基于划分的方法; 基于层次的方法; 基于密度的方法; .(2). 基于划分的聚类方法给定N个对象,构造K个分组,每个分组就代表一个聚类。K个分组满足以下条件:(1)每个分组至少包含一个对象;(2)每个对象属于且仅属于一个分组。K-Means算法是最常见和典型的基于划分的聚类方法(3). K-Means算法输入:待聚类的N个数据点,期望生成的聚类的个数K输出:K个聚类算法描述: 选出K个点作为初始的cluster center Loop: 对输入中的每一个点p: 计算p到各个cluster的距离; 将p归入最近的cluster; 重新计算各个cluster的中心 如果不满足停止条件,goto Loop; 否则,停止(4). MapReduce并行化聚类算法设计思路将所有的数据分布到不同的MapReduce节点上,每个节点只对自己的数据进行计算。每个Map节点能够读取上一次迭代生成的cluster centers,并判断自己的各个数据点应该属于哪一个cluster。Reduce节点综合每个属于每个cluster的数据点,计算出新的cluster centers。每一个节点需要访问如下的全局文件:当前的迭代计数和K个表示不同聚类中心的如下的数据结构: cluster idcluster center属于该cluster center的数据点的个数这是唯一的全局文件。3. 类说明(1). Instance该类对应文件中原始数据点的格式,以ArrayList存放数据点的各个分量。需要编写加法,乘法,除法函数,用于计算簇中心:簇中所有点相加/簇中数据点的个数。如果是在原有的簇中追加一个数据点,则用(簇中心点*簇中数据点个数+新的数据点)/(簇中数据点个数+1)。(2). Cluster该类记录簇的信息各个节点共享的全局信息,记录每次划分的簇的信息。(3). EuclideanDistance计算欧式距离,通过计算数据点与各个簇中心的距离,选择最小的欧式距离对应的簇中心作为该数据点属于的簇。(4). RandomClusterGenerator该类用于初始时生成k个簇中心,一开始将读入的每个节点作为簇中心,并为其分配一个ID,当达到k个时,以1/(1+k)的概率返回一个0,k-1中的正整数,将其对应的簇ID的簇中心替换。(5). KMeans该类即是kmeans算法的实现。它的mapper类应该读入所有的簇中心的信息kclusters。应在setup()时将之前计算的簇中心读入,作为全局文件。map()所要做的是,读取每个数据点寻找离该点最近的簇id,通过计算欧式距离,选择距离最小的那个簇中心,输出的格式是。它的combiner类将同一个节点的数据点各个簇计算新的簇中心。簇中所有点相加/簇中数据点的个数=新的簇中心。它的reducer类将不同节点的计算结果汇总,计算全局的新的簇中心。(6). KmeansCluster该类做的工作其实与KMeans类差不多,只不过是没有KMeans的reducer类。在收敛条件满足且所有簇中心的文件最后产生后,再对输入文件中的所有实例进行划分簇的工作,最后把所有实例按照(实例,簇id)的方式写进结果文件。它的Mapper类也是在setup()时将之前计算的簇中心读入,作为全局文件。map()所要做的是,读取每个数据点寻找离该点最近的簇id,通过计算欧式距离,选择距离最小的那个簇中心,输出的格式是。(7). KMeansDriver该类是用来启动整个MapReduce,启动参数包括k: 簇中心数,iteration num: 迭代数,input path: 输入路径,output path: 输出路径。首先调用RandomClusterGenerator类的generateInitialCluster()初始化簇中心,然后用KMeans类进行簇中心的迭代计算,最后用KMeansCluster类为每个数据点选择最终确定的距离最近的簇。二、 实验结果说明1. 实验设置簇个数设置为5,迭代次数设置为102. 源数据点3. 聚类结果三、 性能和不足k-means算法对初始cluster centers的选取会影响到最终的聚类结果,能得到局部最优解,不保证得到全局最优解。相似度计算和比较时的计算量较大。四、 源程序1. Instance类public class Instance implements WritableArrayList value;public Instance()value = new ArrayList();public Instance(String line)String valueString = line.split(,);value = new ArrayList();for(int i = 0; i valueString.length; i+)value.add(Double.parseDouble(valueStringi);public Instance(Instance ins)value = new ArrayList();for(int i = 0; i ins.getValue().size(); i+)value.add(new Double(ins.getValue().get(i);public ArrayList getValue()return value;public Instance add(Instance instance)if(value.size() = 0)return new Instance(instance);else if(instance.getValue().size() = 0)return new Instance(this);else if(value.size() != instance.getValue().size()try throw new Exception(can not add! dimension not compatible! + value.size() + ,+ instance.getValue().size(); catch (Exception e) e.printStackTrace();return null;elseInstance result = new Instance();for(int i = 0;i value.size(); i+)result.getValue().add(value.get(i) + instance.getValue().get(i);return result;public Instance multiply(double num)Instance result = new Instance();for(int i = 0; i value.size(); i+)result.getValue().add(value.get(i) * num);return result;public Instance divide(double num)Instance result = new Instance();for(int i = 0; i value.size(); i+)result.getValue().add(value.get(i) / num);return result;Overridepublic void write(DataOutput out) throws IOException out.writeInt(value.size();for(int i = 0; i value.size(); i+)out.writeDouble(value.get(i);Overridepublic void readFields(DataInput in) throws IOException int size = 0;value = new ArrayList();if(size = in.readInt() != 0)for(int i = 0; i size; i+)value.add(in.readDouble();2. Cluster类public class Cluster implements Writableprivate int clusterID;private long numOfPoints;private Instance center;public Cluster()this.setClusterID(-1);this.setNumOfPoints(0);this.setCenter(new Instance();public Cluster(int clusterID,Instance center)this.setClusterID(clusterID);this.setNumOfPoints(0);this.setCenter(center);public Cluster(String line)String value = line.split(,3);clusterID = Integer.parseInt(value0);numOfPoints = Long.parseLong(value1);center = new Instance(value2);Overridepublic void write(DataOutput out) throws IOException out.writeInt(clusterID);out.writeLong(numOfPoints);center.write(out);Overridepublic void readFields(DataInput in) throws IOException clusterID = in.readInt();numOfPoints = in.readLong();center.readFields(in);3. EuclideanDistance类public interface Distance double getDistance(List a,List b) throws Exception;public class EuclideanDistance implements Distance public double getDistance(List a, List b) throws Exception if(a.size() != b.size()throw new Exception(size not compatible!);elsedouble sum = 0.0;for(int i = 0;i a.size();i+)sum += Math.pow(a.get(i).doubleValue() - b.get(i).doubleValue(), 2);sum = Math.sqrt(sum);return sum;4. PageRankDriver类public final class RandomClusterGenerator private int k;private FileStatus fileList;private FileSystem fs;private ArrayList kClusters;private Configuration conf;public RandomClusterGenerator(Configuration conf,String filePath,int k)this.k = k;try fs = FileSystem.get(URI.create(filePath),conf);fileList = fs.listStatus(new Path(filePath);kClusters = new ArrayList(k);this.conf = conf; catch (IOException e) e.printStackTrace();public void generateInitialCluster(String destinationPath)Text line = new Text();FSDataInputStream fsi = null;try for(int i = 0;i 0)Instance instance = new Instance(line.toString();makeDecision(instance); catch (IOException e) e.printStackTrace(); finally try fsi.close(); catch (IOException e) e.printStackTrace();writeBackToFile(destinationPath);public void makeDecision(Instance instance)if(kClusters.size() k)Cluster cluster = new Cluster(kClusters.size() + 1, instance);kClusters.add(cluster);elseint choice = randomChoose(k);if(!(choice = -1)int id = kClusters.get(choice).getClusterID();kClusters.remove(choice);Cluster cluster = new Cluster(id, instance);kClusters.add(cluster);public int randomChoose(int k)Random random = new Random();if(random.nextInt(k + 1) = 0)return new Random().nextInt(k);elsereturn -1;public void writeBackToFile(String destinationPath)Path path = new Path(destinationPath + cluster-0/clusters);FSDataOutputStream fsi = null;try fsi = fs.create(path);for(Cluster cluster : kClusters)fsi.write(cluster.toString() + n).getBytes(); catch (IOException e) e.printStackTrace(); finally try fsi.close(); catch (IOException e) e.printStackTrace();5. KMeans类public class KMeans public static class KMeansMapper extends Mapperprivate ArrayList kClusters = new ArrayList();protected void setup(Context context) throws IOException,InterruptedExceptionsuper.setup(context);FileSystem fs = FileSystem.get(context.getConfiguration(); FileStatus fileList = fs.listStatus(new Path(context.getConfiguration().get(clusterPath); BufferedReader in = null;FSDataInputStream fsi = null;String line = null; for(int i = 0; i fileList.length; i+) if(!fileListi.isDir() fsi = fs.open(fileListi.getPath();in = new BufferedReader(new InputStreamReader(fsi,UTF-8);while(line = in.readLine() != null)Cluster cluster = new Cluster(line);cluster.setNumOfPoints(0);kClusters.add(cluster); in.close(); fsi.close();public void map(LongWritable key, Text value, Context context)throws IOException, InterruptedExceptionInstance instance = new Instance(value.toString();int id;try id = getNearest(instance);if(id = -1)throw new InterruptedException(id = -1);elseCluster cluster = new Cluster(id, instance);cluster.setNumOfPoints(1);context.write(new IntWritable(id), cluster); catch (Exception e) e.printStackTrace();public int getNearest(Instance instance) throws Exceptionint id = -1;double distance = Double.MAX_VALUE;Distance distanceMeasure = new EuclideanDistance();double newDis = 0.0;for(Cluster cluster : kClusters)newDis = distanceMeasure.getDistance(cluster.getCenter().getValue(), instance.getValue();if(newDis distance)id = cluster.getClusterID();distance = newDis;return id; public static class KMeansCombiner extends Reducerpublic void reduce(IntWritable key, Iterable value, Context context)throws IOException, InterruptedExceptionInstance instance = new Instance();int numOfPoints = 0;for(Cluster cluster : value)numOfPoints += cluster.getNumOfPoints();instance = instance.add(cluster.getCenter().multiply(cluster.getNumOfPoints();Cluster cluster = new Cluster(key.get(),instance.divide(numOfPoints);cluster.setNumOfPoints(numOfPoints);System.out.println(combiner emit cluster: + cluster.toString();context.write(key, cluster); public static class KMeansReducer extends Reducerpublic void reduce(IntWritable key, Iterable value, Context context)throws IOException, InterruptedExceptionInstance instance = new Instance();int numOfPoints = 0;for(Cluster cluster : value)numOfPoints += cluster.getNumOfPoints();instance = instance.add(cluster.getCenter().multiply(cluster.getNumOfPoints();Cluster cluster = new Cluster(key.get(),instance.divide(numOfPoints);cluster.setNumOfPoints(numOfPoints);context.write(NullWritable.get(), cluster);6. KMeansCluster类public class KMeansCluster public static class KMeansClusterMapper extends Mapperprivate ArrayList kClusters = new ArrayList();protected void setup(Context context) throws IOException,InterruptedExceptionsuper.setup(context);FileSystem fs = FileSystem.get(context.getConfiguration(); FileStatus fileList = fs.listStatus(new Path(context.getConfiguration().get(clusterPath); BufferedReader in = null;FSDataInputStream fsi = null;String line = null; for(int i = 0; i fileList.length; i+) if(!fileListi.isDir() fsi = fs.open(fileListi.getPath();in = new BufferedReader(new InputStreamReader(fsi,UTF-8);while(line = in.readLine() != null)System.out.println(read a line: + line);Cluster cluster = new Cluster(line);cluster.setNumOfPoints(0);kClusters.add(cluster); in.close(); fsi.close();public void map(LongWritable key, Text value, Context context)throws IOException, InterruptedExceptionInstance instance = new Instance(value.toString();int id;try id = getNearest(instance);if(id = -1)throw new InterruptedException(id = -1);elsecontext.write(value, new IntWritable(id); catch (Exception e) e.printStackTrace();public int getNearest(Instance instance) throws Exceptionint id = -1;double distance = Double.MAX_VALUE;Distance distanceMeasure = new EuclideanDistance();double newDis = 0.0;for(Cluster cluster : kClusters)newDis = distanceMeasure.getDistance(cluster.getCenter().getValue(), instance.getValue();if(newDis distance)id = cluster.getClusterID();distance = newDis;return id;7. KMeansDriver类public class KMeansDriver private int k;private int iterationNum;private String sourcePath;private String outputPath;private Configuration conf;public KMeansDriver(int k, int iterationNum, String sourcePath, String outputPath, Configuration conf)this.k = k;this.iterationNum = iterationNum;this.sourcePath = sourcePath;this.outputPath = outputPath;this.conf = conf;public void clusterCenterJob() throws IOException, InterruptedException, ClassNotFoundExceptionfor(int i = 0;i iterationNum; i+)Job clusterCenterJob = new Job();clusterCenterJob .setJobName(clusterCenterJob + i);clusterCenterJob .setJarByClass(KMeans.class);clusterCenterJob.getConfiguration().set(clusterPath, outputPath + /cluster- + i +/);clusterCenterJob.setMapperClass(KMeans.KMeansMapper.class);clusterCenterJob.setMapOutputKeyClass(IntWritable.class);clusterCenterJob.setMapOutputValueClass(Cluster.class);clusterCenterJob.setCombinerClass(KMeans.KMeansCombiner.class);clusterCenterJob.setReducerClass(KMeans.KMeansReducer .class);clusterCenterJob.setOutputKeyClass(NullWritable.class);clusterCenterJob.setOutputValueClass(Cluster.class);FileInputFormat.addInputPath(clusterCenterJob, new Path(sourcePath);FileOutputFormat.setOutputPath(clusterCenterJob, new Path(outputPath + /cluster- + (i + 1) +/);clusterCenterJob.waitForCompletion(true);System.out.println(finished!);public void KMeansClusterJob() throws IOException, InterruptedException, ClassNotFoundExceptionJob kMeansClusterJob = new Job();kMeansClusterJob.setJobName(KMeansClusterJob)
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 渣土车运营管理办法
- 上下属关系管理办法
- 集体荒地管理办法
- 集中采暖管理办法
- 鳌江流域管理办法
- 电动工器具管理办法
- 煤矿保卫部管理办法
- 高校命名管理办法
- 防疫经费管理办法
- 鸽场养殖管理办法
- 【课件】破茧 逐光-2026届新高三启航主题班会:挑战极限成就梦想(含规划指南、学法指导、心理护航)
- 2025至2030中国GPU芯片行业市场发展现状调研及竞争格局与产业运行态势及投资规划深度研究报告
- 氢能产业协同-洞察及研究
- 三聚氰胺基复合气凝胶:制备工艺、性能表征与应用前景探究
- 2025广西中医药大学赛恩斯新医药学院教师招聘考试试题
- 医院存货盘点管理制度
- 社区干部考试题库及答案
- 学校特殊教师管理制度
- 电玩城制度管理制度
- DB31/T 779-2014学校物业管理服务规范
- 颈椎病试题及答案选择题
评论
0/150
提交评论