Hadoop/MapReduce Common Friends Solution: Computing Pairwise Intersections of Many Sets
Published: 2019-05-11


Common friends: computing the pairwise intersection of many sets

Goal: let U = {U1, U2, ..., Un} be the set of all users. For every pair (Ui, Uj) with i != j, find the common friends of Ui and Uj.

Premise: the friendship relation is bidirectional (if A is B's friend, then B is A's friend).

Input: one line per user, of the form

<person><,><friend1> <friend2> <friend3> ...

that is, a user ID, then a comma, then that user's friends separated by single spaces.
Sample input:

100,200 300 400 500 600
200,100 300 400
300,100 200 400 500
400,100 200 300
500,100 300
600,100

Solution 1: POJO common friends solution

Let {A1, A2, ..., Am} be the friend set of User1 and {B1, B2, ..., Bn} be the friend set of User2. The common friends of User1 and User2 are then the intersection (the common elements) of the two sets.

public static Set<Integer> intersection(Set<Integer> user1friends, Set<Integer> user2friends) {
    if (user1friends == null || user2friends == null) {
        return null;
    }
    if (user1friends.isEmpty() || user2friends.isEmpty()) {
        return null;
    }
    // iterate over the smaller set to improve performance
    if (user1friends.size() < user2friends.size()) {
        return intersect(user1friends, user2friends);
    } else {
        return intersect(user2friends, user1friends);
    }
}

public static Set<Integer> intersect(Set<Integer> small, Set<Integer> large) {
    Set<Integer> result = new TreeSet<Integer>();
    // iterate over the small set to improve performance
    for (Integer x : small) {
        if (large.contains(x)) {
            result.add(x);
        }
    }
    return result;
}

Solution 2: Hadoop/MapReduce implementation

Idea: for the line 100,200 300 400 500 600, the mapper generates

([100,200], [200 300 400 500 600]) -------- (1), meaning one of users 100 and 200 has the friend list [200 300 400 500 600]
([100,300], [200 300 400 500 600]), meaning one of users 100 and 300 has the friend list [200 300 400 500 600]
([100,400], [200 300 400 500 600]), meaning one of users 100 and 400 has the friend list [200 300 400 500 600]
([100,500], [200 300 400 500 600]), meaning one of users 100 and 500 has the friend list [200 300 400 500 600]
([100,600], [200 300 400 500 600]), meaning one of users 100 and 600 has the friend list [200 300 400 500 600]

For the line 200,100 300 400, the mapper generates

([100,200], [100 300 400]) -------- (2), meaning one of users 100 and 200 has the friend list [100 300 400]
([200,300], [100 300 400]), meaning one of users 200 and 300 has the friend list [100 300 400]
([200,400], [100 300 400]), meaning one of users 200 and 400 has the friend list [100 300 400]
...

The pairs are then grouped by key, so for example (1) and (2) arrive at the same reducer:

([100,200], ([200 300 400 500 600], [100 300 400]))

Because friendship is bidirectional, each key [Ui,Uj] receives exactly two values: Ui's friend list and Uj's friend list. The reducer only has to compute the intersection of the two lists. To do so, maintain a map<friend, count> that records how often each element occurs across the lists:

(100,1) (200,1) (300,2) (400,2) (500,1) (600,1)

Then walk the map and collect the keys that occur twice, i.e. appear in both lists: 300 and 400. Output ([100,200], [300 400]).
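Before moving on to the Hadoop code, here is a minimal local sketch of this counting step for the key [100,200] (the class name and the hard-coded lists are illustrative, not part of the original solution; a TreeMap is used so the IDs come out sorted):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CountingIntersectionDemo {
    public static void main(String[] args) {
        // the two friend lists that arrive at the reducer for key [100,200]
        List<List<Integer>> values = Arrays.asList(
                Arrays.asList(200, 300, 400, 500, 600),  // from user 100's line
                Arrays.asList(100, 300, 400));           // from user 200's line

        // count how often each user ID occurs across the lists
        Map<Integer, Integer> counts = new TreeMap<Integer, Integer>();
        for (List<Integer> friends : values) {
            for (Integer id : friends) {
                Integer c = counts.get(id);
                counts.put(id, (c == null) ? 1 : c + 1);
            }
        }

        // a user is a common friend exactly when it occurs in every list
        List<Integer> common = new ArrayList<Integer>();
        for (Map.Entry<Integer, Integer> entry : counts.entrySet()) {
            if (entry.getValue() == values.size()) {
                common.add(entry.getKey());
            }
        }
        System.out.println("[100,200] " + common);  // prints: [100,200] [300, 400]
    }
}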
 
 
Implementation 1: when generating key-value pairs like ([100,200], [200 300 400 500 600]), use Text to hold [200 300 400 500 600]
package commonfriends;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.commons.lang.StringUtils;

public class CommonFriendsMapper
        extends Mapper<LongWritable, Text, Text, Text> {

    private static final Text REDUCER_KEY = new Text();
    private static final Text REDUCER_VALUE = new Text();

    // tokens[0] is the person, tokens[1..] are the friends;
    // returns the friends joined by "," (empty string if there is only one friend)
    static String getFriends(String[] tokens) {
        if (tokens.length == 2) {
            return "";
        }
        StringBuilder builder = new StringBuilder();
        for (int i = 1; i < tokens.length; i++) {
            builder.append(tokens[i]);
            if (i < (tokens.length - 1)) {
                builder.append(",");
            }
        }
        return builder.toString();
    }

    // build the key so that the smaller user ID always comes first:
    // (100,200) and (200,100) must reach the same reducer
    static String buildSortedKey(String person, String friend) {
        long p = Long.parseLong(person);
        long f = Long.parseLong(friend);
        if (p < f) {
            return person + "," + friend;
        } else {
            return friend + "," + person;
        }
    }

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // parse input; the line format is <person>,<friend1> <friend2> ...,
        // so split on the comma and on single spaces
        String[] tokens = StringUtils.split(value.toString(), ", ");

        // create the reducer value: this person's friend list
        String friends = getFriends(tokens);
        REDUCER_VALUE.set(friends);

        String person = tokens[0];
        for (int i = 1; i < tokens.length; i++) {
            String friend = tokens[i];
            String reducerKeyAsString = buildSortedKey(person, friend);
            REDUCER_KEY.set(reducerKeyAsString);
            context.write(REDUCER_KEY, REDUCER_VALUE);
        }
    }
}

package commonfriends;

import java.util.Map;
import java.util.HashMap;
import java.util.List;
import java.util.ArrayList;
import java.util.Iterator;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.commons.lang.StringUtils;

public class CommonFriendsReducer
        extends Reducer<Text, Text, Text, Text> {

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        System.out.println("key=" + key);

        // map<friend, count>: how often each friend occurs across the value lists
        Map<String, Integer> map = new HashMap<String, Integer>();
        Iterator<Text> iterator = values.iterator();
        int numOfValues = 0;
        while (iterator.hasNext()) {
            String friends = iterator.next().toString();
            System.out.println("friends=" + friends);
            if (friends.equals("")) {
                // one side has a single friend (the other side of this pair),
                // so the pair cannot have any common friends
                context.write(key, new Text("[]"));
                return;
            }
            addFriends(map, friends);
            numOfValues++;
        }

        // now iterate the map and keep the friends that occurred numOfValues times,
        // i.e. in every list
        List<String> commonFriends = new ArrayList<String>();
        for (Map.Entry<String, Integer> entry : map.entrySet()) {
            //System.out.println(entry.getKey() + "/" + entry.getValue());
            if (entry.getValue() == numOfValues) {
                commonFriends.add(entry.getKey());
            }
        }

        // send it to output
        context.write(key, new Text(commonFriends.toString()));
    }

    static void addFriends(Map<String, Integer> map, String friendsList) {
        String[] friends = StringUtils.split(friendsList, ",");
        for (String friend : friends) {
            Integer count = map.get(friend);
            if (count == null) {
                map.put(friend, 1);
            } else {
                map.put(friend, count + 1);
            }
        }
    }
}

package commonfriends;

import org.apache.log4j.Logger;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;

public class CommonFriendsDriver extends Configured implements Tool {

    private static final Logger theLogger = Logger.getLogger(CommonFriendsDriver.class);

    @Override
    public int run(String[] args) throws Exception {
        Job job = new Job(getConf());
        job.setJobName("CommonFriendsDriver");

        // add jars to distributed cache
        //HadoopUtil.addJarsToDistributedCache(job, "/lib/");

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // the mapper generates keys as Text (the keys look like (person1,person2))
        job.setOutputKeyClass(Text.class);
        // the mapper generates values as Text (a list of friends)
        job.setOutputValueClass(Text.class);

        job.setMapperClass(CommonFriendsMapper.class);
        job.setReducerClass(CommonFriendsReducer.class);

        // args[0] = input directory
        // args[1] = output directory
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean status = job.waitForCompletion(true);
        theLogger.info("run(): status=" + status);
        return status ? 0 : 1;
    }

    /**
     * The main driver for the common friends map/reduce program.
     * Invoke this method to submit the map/reduce job.
     *
     * @throws Exception when there are communication problems with the job tracker.
     */
    public static void main(String[] args) throws Exception {
        // for local testing the input/output directories are hard-coded;
        // remove the next three lines to pass them on the command line instead
        args = new String[2];
        args[0] = "input/friends.txt";
        args[1] = "output/friends1";

        // make sure there are exactly 2 parameters
        if (args.length != 2) {
            throw new IllegalArgumentException("usage: Argument 1: input dir, Argument 2: output dir");
        }

        theLogger.info("inputDir=" + args[0]);
        theLogger.info("outputDir=" + args[1]);
        int jobStatus = submitJob(args);
        theLogger.info("jobStatus=" + jobStatus);
        System.exit(jobStatus);
    }

    /**
     * Submits the map/reduce job via ToolRunner.
     *
     * @throws Exception when there are communication problems with the job tracker.
     */
    public static int submitJob(String[] args) throws Exception {
        int jobStatus = ToolRunner.run(new CommonFriendsDriver(), args);
        return jobStatus;
    }
}
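Assuming the three classes above are packed into a jar (the name commonfriends.jar below is only an example) and the hard-coded arguments in main() are removed, the job could be submitted roughly like this:

hadoop jar commonfriends.jar commonfriends.CommonFriendsDriver input/friends.txt output/friends1

With TextOutputFormat, the reducer output then appears under output/friends1/ (e.g. part-r-00000) as tab-separated key/value pairs.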
Implementation 2: when generating key-value pairs like ([100,200], [200 300 400 500 600]), use ArrayListOfLongsWritable to hold [200 300 400 500 600]
 
package commonfriends;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.commons.lang.StringUtils;
import edu.umd.cloud9.io.array.ArrayListOfLongsWritable;

public class CommonFriendsMapperUsingList
        extends Mapper<LongWritable, Text, Text, ArrayListOfLongsWritable> {

    private static final Text REDUCER_KEY = new Text();

    // tokens[0] is the person, tokens[1..] are the friends
    static ArrayListOfLongsWritable getFriends(String[] tokens) {
        if (tokens.length == 2) {
            return new ArrayListOfLongsWritable();
        }
        ArrayListOfLongsWritable list = new ArrayListOfLongsWritable();
        for (int i = 1; i < tokens.length; i++) {
            list.add(Long.parseLong(tokens[i]));
        }
        return list;
    }

    // build the key so that the smaller user ID always comes first
    static String buildSortedKey(String person, String friend) {
        long p = Long.parseLong(person);
        long f = Long.parseLong(friend);
        if (p < f) {
            return person + "," + friend;
        } else {
            return friend + "," + person;
        }
    }

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // parse input; the line format is <person>,<friend1> <friend2> ...,
        // so split on the comma and on single spaces
        String[] tokens = StringUtils.split(value.toString(), ", ");

        // create the reducer value: this person's friend list
        ArrayListOfLongsWritable friends = getFriends(tokens);

        String person = tokens[0];
        for (int i = 1; i < tokens.length; i++) {
            String friend = tokens[i];
            String reducerKeyAsString = buildSortedKey(person, friend);
            REDUCER_KEY.set(reducerKeyAsString);
            context.write(REDUCER_KEY, friends);
        }
    }
}

package commonfriends;

import java.util.Map;
import java.util.HashMap;
import java.util.List;
import java.util.ArrayList;
import java.util.Iterator;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import edu.umd.cloud9.io.array.ArrayListOfLongsWritable;

public class CommonFriendsReducerUsingList
        extends Reducer<Text, ArrayListOfLongsWritable, Text, Text> {

    @Override
    public void reduce(Text key, Iterable<ArrayListOfLongsWritable> values, Context context)
            throws IOException, InterruptedException {
        // map<k, v> where k is a user ID and v is the count
        Map<Long, Integer> map = new HashMap<Long, Integer>();
        Iterator<ArrayListOfLongsWritable> iterator = values.iterator();
        int numOfValues = 0;
        while (iterator.hasNext()) {
            ArrayListOfLongsWritable friends = iterator.next();
            if (friends == null) {
                context.write(key, null);
                return;
            }
            addFriends(map, friends);
            numOfValues++;
        }

        // now iterate the map and keep the friends that occurred numOfValues times,
        // i.e. in every list
        List<Long> commonFriends = new ArrayList<Long>();
        for (Map.Entry<Long, Integer> entry : map.entrySet()) {
            //System.out.println(entry.getKey() + "/" + entry.getValue());
            if (entry.getValue() == numOfValues) {
                commonFriends.add(entry.getKey());
            }
        }

        // send it to output
        context.write(key, new Text(commonFriends.toString()));
    }

    static void addFriends(Map<Long, Integer> map, ArrayListOfLongsWritable friendsList) {
        Iterator<Long> iterator = friendsList.iterator();
        while (iterator.hasNext()) {
            long id = iterator.next();
            Integer count = map.get(id);
            if (count == null) {
                map.put(id, 1);
            } else {
                map.put(id, count + 1);
            }
        }
    }
}

package commonfriends;

import org.apache.log4j.Logger;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import edu.umd.cloud9.io.array.ArrayListOfLongsWritable;

public class CommonFriendsDriverUsingList extends Configured implements Tool {

    private static final Logger theLogger = Logger.getLogger(CommonFriendsDriverUsingList.class);

    @Override
    public int run(String[] args) throws Exception {
        Job job = new Job(getConf());
        job.setJobName("CommonFriendsDriverUsingList");

        // add jars to distributed cache
        //HadoopUtil.addJarsToDistributedCache(job, "/lib/");

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // the mapper generates keys as Text (the keys look like (person1,person2))
        job.setOutputKeyClass(Text.class);
        // the mapper generates values as ArrayListOfLongsWritable (a list of friends)
        job.setOutputValueClass(ArrayListOfLongsWritable.class);

        job.setMapperClass(CommonFriendsMapperUsingList.class);
        job.setReducerClass(CommonFriendsReducerUsingList.class);

        // args[0] = input directory
        // args[1] = output directory
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean status = job.waitForCompletion(true);
        theLogger.info("run(): status=" + status);
        return status ? 0 : 1;
    }

    /**
     * The main driver for the common friends map/reduce program.
     * Invoke this method to submit the map/reduce job.
     *
     * @throws Exception when there are communication problems with the job tracker.
     */
    public static void main(String[] args) throws Exception {
        // make sure there are exactly 2 parameters
        if (args.length != 2) {
            throw new IllegalArgumentException("usage: Argument 1: input dir, Argument 2: output dir");
        }

        theLogger.info("inputDir=" + args[0]);
        theLogger.info("outputDir=" + args[1]);
        int jobStatus = submitJob(args);
        theLogger.info("jobStatus=" + jobStatus);
        System.exit(jobStatus);
    }

    /**
     * Submits the map/reduce job via ToolRunner.
     *
     * @throws Exception when there are communication problems with the job tracker.
     */
    public static int submitJob(String[] args) throws Exception {
        int jobStatus = ToolRunner.run(new CommonFriendsDriverUsingList(), args);
        return jobStatus;
    }
}

Result:

100,200 [300, 400]
100,300 [200, 400, 500]
100,400 [200, 300]
100,500 [300]
100,600 []
200,300 [100, 400]
200,400 [100, 300]
300,400 [100, 200]
300,500 [100]
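As a quick cross-check of the first result line, Solution 1's set intersection can be reproduced locally with the JDK's retainAll instead of the hand-rolled intersect (a minimal sketch; the class name and the hard-coded sets are illustrative):

import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

public class ResultSanityCheck {
    public static void main(String[] args) {
        // friend lists of users 100 and 200 from the sample input
        Set<Integer> friendsOf100 = new TreeSet<Integer>(Arrays.asList(200, 300, 400, 500, 600));
        Set<Integer> friendsOf200 = new TreeSet<Integer>(Arrays.asList(100, 300, 400));

        // retainAll computes the set intersection in place
        friendsOf100.retainAll(friendsOf200);
        System.out.println("100,200 " + friendsOf100);  // prints: 100,200 [300, 400]
    }
}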
