对于用户兴趣偏好的数据分析工作一般是基于评论数据进行的,如用户出行的评价数据、租房的评价数据或电影的评论数据等。一些网站运营商会基于用户的评论数据挖掘出客户群体对于某种事物或某件事情的看法,以便根据用户的兴趣偏好推荐用户可能感兴趣的产品。其中,对电影的影评进行分析,可以从多维度了解一部电影的质量和受欢迎程度。
常规的数据分析工具在大数据场景下处理数据的效率低下,显然不适用于大数据处理分析。分布式计算框架的出现,为分析处理大数据的计算提供了很好的解决方案。本节将使用Hadoop分布式框架并结合电影评分数据,编写MapReduce程序,实现用户影评分析,从多维度分析用户的观影兴趣偏好,同时帮助读者更好地掌握MapReduce编程操作。
在进行用户观影兴趣偏好的数据分析之前,我们需要了解分析对象,了解数据字段的含义以及数据字段之间的关系。电影网站提供了与用户信息相关的3份数据,分别为用户对电影的评分数据(ratings.dat)、已知性别的用户信息数据(users.dat)以及电影信息数据(movies.dat)。3份数据的介绍说明如下。
1)用户对电影的评分数据ratings.dat包含4个字段,即UserID(用户ID)、MovieID(电影ID)、Rating(评分)和Timestamp(时间戳),如表2-12所示。其中UserID的范围是1~6040,MovieID的范围是1~3952,Rating采用5分好评制度,即最高分为5分,最低分为1分。
表2-12 用户对电影的评分数据
2)已知性别的用户信息数据users.dat包含5个字段,分别为UserID(用户ID)、Gender(性别)、Age(年龄段)、Occupation(职业)和Zip-code(编码),如表2-13所示。其中,Occupation字段表示21种不同的职业类型。
表2-13 已知性别的用户信息数据
3)电影信息数据movies.dat包含2个数据字段,分别为MovieID(电影ID)和Genres(电影类型),如表2-14所示。数据中总共记录了18种电影类型,包括喜剧片、动作片、爱情片等。
表2-14 电影信息部分数据
电影网站提供的3份数据详细记录了每位用户的基本信息及对电影的评论信息。通过对电影网站用户及电影评论数据进行分析,我们可以从不同角度了解用户对电影的喜好偏向。结合MapReduce编程知识,对3份用户影评数据进行统计分析,可以分别从评价次数、性别、年龄段等维度分析用户的观影喜好。具体的统计分析需求如下。
1)计算所有电影的评分次数。
2)按性别和电影分组计算每部电影影评的平均评分。
3)计算某给定电影各年龄段的平均电影评分。
明确数据字段含义及数据分析任务描述之后,可以根据任务需求实施MapReduce编程方案。为方便数据共享,下面将在一个项目中完成2.7.1节所提出的分析需求,再根据不同的分析任务进行任务分析,创建不同的Java类,将每个分析任务分解为若干小的统计任务,分步实现各影评分析任务。
在IDEA中创建一个名为hadoop的maven项目,并配置pom.xml文件,配置内容如代码清单2-33所示。
<dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>3.1.4</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>3.1.4</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>3.1.4</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-jobclient</artifactId> <version>3.1.4</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> <version>3.1.4</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-common</artifactId> <version>3.1.4</version> </dependency> </dependencies>
配置完成pom.xml文件后,需要单击右侧边栏的Maven按钮,同时单击刷新按钮重新加载所有的Maven项目所需的依赖包,如图2-92所示。
图2-92 加载Maven依赖包
之后,需要将Hadoop中的配置文件core-site.xml和hdfs-site.xml放至hadoop项目的resources目录下,如图2-93所示。
图2-93 core.xml和hdfs.xml配置存放目录
完成工程创建及配置后,我们即可开始编写MapReduce程序实现用户影评分析。
通过电影的评分次数,我们可以直观地看出该部电影的受欢迎程度。评分次数越多,也意味着观众对该部电影的关注度越高。若计算所有电影的评分次数,则需要求出电影ID(Moviesid)及电影评分次数(RateNum),涉及movies.dat和ratings.dat两份数据,因此,需要先将这两份数据进行连接。MapReduce中常用的多表连接的方法有两种,分别是reducejoin()方法和mapjoin()方法。其中,因为reducejoin()方法容易造成数据倾斜,所以对于并行执行的数据文件而言,更常用的是mapjoin()方法。mapjoin()方法在Mapper阶段即可完成数据连接,且一般不会造成数据倾斜,即使发生数据倾斜,倾斜的数据量也很小。
在Mapper阶段,我们需要将movies.dat数据提前加载至各个节点的内存中,在执行map()方法时,通过内连接完成组合。具体的操作过程分为如下两个步骤。
1)实现movies.dat和ratings.dat两份数据的连接。
2)通过连接之后的数据计算所有电影的评分次数。
通过上述两个步骤,即可求出所有电影的评分次数。本节中所定义的所有代码类将分别放至com.cqyti.film.mapreduce和com.cqyti.fim.filmBean两个包下。
(1)连接movies.dat和ratings.dat数据
首先在hadoop项目com.cqyti.film.mapreduce包下定义一个名为Movies_Join_Ratings的类。该类主要完成movies.dat和ratings.dat两份数据的连接,如代码清单2-34所示。
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.BufferedReader; import java.io.FileReader; import java.io.IOException; import java.net.URI; import java.util.HashMap; public class Movies_Join_Ratings { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(conf); Job job = Job.getInstance(conf); // 设置环境参数 job.setJarByClass(Movies_Join_Ratings.class); // 设置整个程序的类名 job.setMapperClass(Movies_Join_Ratings_Mapper.class); // 添加Mapper类 job.setOutputKeyClass(Text.class); // 输出类型 job.setOutputValueClass(NullWritable.class); // 输出类型 Path inputPath = new Path("/Tipdm/Hadoop/MapReduce/ratings.dat"); // ratings.dat输入路径 Path outputPath = new Path("/join/output/"); // ratings和movies连接后的输出路径 if (fs.exists(outputPath)) { // 判断,如果输出路径存在,那么将其删掉 fs.delete(outputPath, true); } FileInputFormat.setInputPaths(job, inputPath); FileOutputFormat.setOutputPath(job, outputPath); job.setNumReduceTasks(0); // 无Reduce任务 boolean isdone = job.waitForCompletion(true); System.exit(isdone ? 0 : 1); job.addCacheFile(new URI("hdfs://master:8020/Tipdm/Hadoop/MapReduce/movies.dat")); //movies.dat的读取路径 } public static class Movies_Join_Ratings_Mapper extends Mapper<LongWritable, Text, Text, NullWritable> { Text kout = new Text(); Text valueout = new Text(); // 执行map任务之前提前加载movies.dat,将movies.dat加载到movieMap中 private HashMap<String, String> movieMap = new HashMap<String, String>(); @Override protected void setup(Context context) throws IOException, InterruptedException { FileReader fr = new FileReader("/opt/data/Hadoop/NO8/movies.dat"); BufferedReader br = new BufferedReader(fr); String readLine = ""; while ((readLine = br.readLine()) != null) { String[] reads = readLine.split("::"); String movieid = reads[0]; String movietype = reads[1]; movieMap.put(movieid, movietype); } } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 拿到一行数据并将其转换成String类型 String line = value.toString().trim(); // 对原数据按::进行切分,可取出各字段信息 String[] reads = line.split("::"); // 提取电影属性:1::1193::5::978300760 String userid = reads[0]; String movieid = reads[1]; int rate = Integer.parseInt(reads[2]); long ts = Long.parseLong(reads[3]); // 通过movieid 在movieMap中获取电影ID和电影类型 String moivetype = movieMap.get(movieid); // 将信息组合输出 String kk = userid + "::" + movieid + "::" + rate + "::" + ts + "::" + moivetype; kout.set(kk); context.write(kout, NullWritable.get()); } } }
在代码清单2-34中,通过Movies_Join_Ratings中的configuration()方法获得程序运行时的参数情况,并将参数存储在String[] Args数组中,随后,通过类Job设置环境参数。首先,设置整个程序的类名为Movies_Join_Ratings(该类中包含两份数据连接的全部实现代码);再添加已经写好的Movies_Join_Ratings_Mapper类。由于本次计算不需要Reduce模块参与,所以并无Reduce类。接着设置整个Hadoop程序的输出类型,即Map输出结果<key,value>和value各自的类型。最后根据程序运行的参数,设置输入和输出路径。Movies_Join_Ratings类是YARN资源调度器的一个客户端,主要功能是将MapReduce程序的Jar包提交给YARN,再将jar包分发到多个NodeManager上执行。
将整个项目打包并上传至Hadoop集群中,通过hadoop jar命令接上jar包名称(MoviesRatesAll.jar)和类名(com.cqyti.film.mapreduce.Movies_Join_Ratings),并按“Enter”键执行该MapReduce程序,即可在HDFS的/join/output目录下生成part-m-00000文件。part-m-00000文件存放的是movies.dat和ratings.dat两份数据连接后的结果,如图2-94所示。由于数据量比较大,所以这里仅读取前10行数据进行展示。
图2-94 movies和ratings两份数据连接后的结果
在图2-94所示的结果中,每行数据的各字段属性分别是用户ID、电影ID、评分、时间戳和电影类型。该结果已保存至HDFS,后续将以此文件为基础计算所有电影的评分次数。
(2)计算所有电影的评分次数
完成movies.dat和ratings.dat两份数据的连接后,计算所有电影的评分次数。首先,创建一个名为MoviesRatesAll的类,计算所有电影的评分次数,如代码清单2-35所示。
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class MoviesRatesAll { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(conf); Job job = Job.getInstance(conf); job.setJarByClass(MoviesRatesAll.class); job.setMapperClass(MovieRatesAll_Mapper.class); job.setReducerClass(MovieRatesAll_Reducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); Path inputPath = new Path("/join/output/"); //将movies.dat和rating.dat连接后的结果目录作为输出目录 Path outputPath = new Path("/join/outputAll/"); // 输出所有电影的评分次数到该目录下 if (fs.exists(outputPath)) { fs.delete(outputPath, true); } FileInputFormat.setInputPaths(job, inputPath); FileOutputFormat.setOutputPath(job, outputPath); boolean isdone = job.waitForCompletion(true); System.exit(isdone ? 0 : 1); } public static class MovieRatesAll_Mapper extends Mapper<LongWritable, Text, Text, Text> { Text kout = new Text(); Text valueout = new Text(); @Override protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException { String [] reads = value.toString().trim().split("::"); // 用户id::电影id::评分::时间戳::电影类型 // 1::1193::5::978300760::One Flew Over the Cuckoo's Nest (1975)::Drama String kk = reads[1]; // 获取Movieid作为key输出 String vv = reads[4]; // 获取电影类型作为value值输出 kout.set(kk); valueout.set(vv); context.write(kout, valueout); } } // 根据map阶段的结果<k:v>统计value的次数,存入rateNum中,即为某一电影的评分次数 public static class MovieRatesAll_Reducer extends Reducer<Text, Text, Text, NullWritable> { Text kout = new Text(); Text valueout = new Text(); @Override protected void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException { int rateNum = 0; String moiveType = ""; for(Text text : values){ rateNum++; moiveType = text.toString(); } String kk = key.toString() + "\t" + moiveType + "\t" + rateNum; kout.set(kk); context.write(kout, NullWritable.get()); } } }
在代码清单2-35中,MoviesRatesAll类的main()方法的所有配置基本与代码清单8-2中的配置保持一致,不同的是需要将/join/output/目录作为本次计算的输入路径,同时将计算结果保存至/join/outputAll/目录下。数据输出目录将自动创建。
将项目打成jar包上传至集群,最后使用hadoop jar MoviesRatesAll.jar com.cqyti.film.mapreduce.MoviesRatesAll命令将MapReduce程序提交至集群中运行。运行完成后即可在HDFS的/join/outputAll/目录下生成part-r-00000文件。part-r-00000文件中保存的内容即为所有电影的评分次数。在Shell中通过hdfs dfs -cat /join/outputAll/part-r-00000 | head -10命令即可查看前10条电影的评分次数,如图2-95所示。
图2-95 所有电影的评分次数
在图2-95所示的结果中,各字段分别为电影ID、电影类型和电影评分次数(该字段为新生成的数据),并按照电影ID进行升序排序。
由于男女在观影喜好上可能会有所差别,所以在向用户推荐电影时,我们也可以根据不同性别的大众观影喜好向用户推荐相关电影。有关用户性别的信息在users.dat数据中,因此,按性别和电影分组统计每部电影影评的平均评分,需要连接movies.dat、ratings.dat、users.dat这3份数据,再将连接后的结果依据性别和电影ID进行分组,分别计算不同组内每部电影的总评分,并除以每部电影的评分次数,即每部电影的平均评分。根据需求,具体的操作过程分为如下两个步骤。
1)创建类,实现movies.dat、ratings.dat、users.dat数据的连接。
2)将连接好的数据根据性别和电影ID进行分组,并计算组内每部电影的平均评分。
通过上述两个步骤,即可按性别和电影分组计算每部电影的平均评分。
(1)连接movies.dat、users.dat、ratings.dat数据
首先创建一个MapjoinThreeTables类,实现3份数据连接。该类中的代码基本与8.2.1节中的两份数据连接的代码相似,均无reduce任务。在编写MapReduce程序前,我们同样需要将movies.dat、users.dat和ratings.dat的文件提前加载至每个节点的内存中,如代码清单2-36所示。
import java.io.BufferedReader; import java.io.FileReader; import java.io.IOException; import java.net.URI; import java.util.HashMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class MapjoinThreeTables { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(conf); Job job = Job.getInstance(conf); job.setJarByClass(MapjoinThreeTables.class); job.setMapperClass(MapjoinThreeTables_Mapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); Path inputPath = new Path("/Tipdm/Hadoop/MapReduce/ratings.dat"); // ratings.dat输入路径 Path outputPath = new Path("/join/outPutMapjoinThreeTables/"); // 结果输出路径,无须创建,将自动生成 job.setNumReduceTasks(0); //无reduce任务 if (fs.exists(outputPath)) { fs.delete(outputPath, true); } FileInputFormat.setInputPaths(job, inputPath); FileOutputFormat.setOutputPath(job, outputPath); boolean isdone = job.waitForCompletion(true); System.exit(isdone ? 0 : 1); job.addCacheFile(new URI("hdfs://master:8020/Tipdm/Hadoop/MapReduce/movies.dat")); // 需提前加载至内存的movies.dat job.addCacheFile(new URI("hdfs://master:8020/Tipdm/Hadoop/MapReduce/users.dat")); // users.dat的输入路径 } public static class MapjoinThreeTables_Mapper extends Mapper<LongWritable, Text, Text, NullWritable> { Text kout = new Text(); Text valueout = new Text(); private static HashMap<String, String> moviemap = new HashMap<String, String>(); private static HashMap<String, String> usersmap = new HashMap<String, String>(); @SuppressWarnings("deprecation") @Override protected void setup(Context context) throws IOException, InterruptedException { // 1::Toy Story (1995)::Animation|Children's|Comedy // 通过地址读取电影数据 FileReader fr1 = new FileReader("/opt/data/Hadoop/NO8/movies.dat"); BufferedReader bf1 = new BufferedReader(fr1); String stringLine = null; while ((stringLine = bf1.readLine()) != null) { String[] reads = stringLine.split("::"); String movieid = reads[0]; String movieInfo = reads[1]; moviemap.put(movieid, movieInfo); } // 1::F::1::10::48067 // 通过地址读取用户数据 FileReader fr2 = new FileReader("/opt/data/Hadoop/NO8/users.dat"); BufferedReader bf2 = new BufferedReader(fr2); String stringLine2 = null; while ((stringLine2 = bf2.readLine()) != null) { String[] reads = stringLine2.split("::"); String userid = reads[0]; String userInfo = reads[1] + "::" + reads[2] + "::" + reads[3] + "::" + reads[4]; usersmap.put(userid, userInfo); } // 关闭资源 IOUtils.closeStream(bf1); IOUtils.closeStream(bf2); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] reads1 = value.toString().trim().split("::"); // 1::1193::5::978300760 :用户ID、电影ID、评分、评分时间戳 // 通过电影ID和用户ID在对应的map中获取信息,ratings不存在空信息,如果存在空信息,那么需要进行map.contain判断 String struser = usersmap.get(reads1[0]); String strmovie = moviemap.get(reads1[1]); // 进行多表连接,数据格式为userid,movieId,rate,ts,sex,age,occupation,zipcode,movieType String[] userinfo = struser.split("::");//sex, age, occupation, zipcode String kk = reads1[0] + "::" + reads1[1] + "::" + reads1[2] + "::" + reads1[3] + "::" + userinfo[0] + "::" + userinfo[1] + "::" + userinfo[2] + "::" + userinfo[3] + "::" + strmovie; kout.set(kk); context.write(kout, NullWritable.get()); } } }
在代码清单2-36中,根据ratings.dat数据中的电影ID和用户ID完成3份数据的连接,其中ratings.dat作为主数据文件与其他两份数据进行左连接,即可获得全面的用户观影信息。
打包并提交MapReduce程序至Hadoop集群运行,最终连接结果将保存至/join/outPutMapjoinThreeTables/目录下的part-m-00000文件中。
在Shell中通过hdfs dfs -cat /join/outPutMapjoinThreeTables/part-m-00000 | head -10命令可查看前10条记录,如图2-96所示。
图2-96 连接movies.dat、users.dat、ratings.dat
在图2-96中,每条记录的字段信息分别是用户ID、电影ID、评分、时间戳、性别、年龄段、职业、邮政编码和电影类型。至此,我们完成了3份数据的连接,并为后续的影评分析提供了一份完整的数据文件。
(2)按性别和电影分组计算每部电影的平均评分
创建一个MoviesRatesAllGroupByGender类,该类中主要完成两个计算过程,一是按性别和电影进行分组,二是分别在组内计算每部电影的平均评分,具体的计算过程是以组内每部电影的总评分除以每部电影的评分次数,得到每部电影的平均评分。实现过程如代码清单2-37所示。
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class MoviesRatesAllGroupByGender { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(conf); Job job = Job.getInstance(conf); job.setJarByClass(MoviesRatesAllGroupByGender.class); job.setMapperClass(MoviesRatesAllGroupByGender_Mapper.class); job.setReducerClass(MoviesRatesAllGroupByGender_Reducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(DoubleWritable.class); Path inputPath = new Path("/join/outPutMapjoinThreeTables/"); Path outputPath = new Path("/join/outPutMoviesRatesAllGroupByGender/"); if (fs.exists(outputPath)) { fs.delete(outputPath, true); } FileInputFormat.setInputPaths(job, inputPath); FileOutputFormat.setOutputPath(job, outputPath); boolean isdone = job.waitForCompletion(true); System.exit(isdone ? 0 : 1); } public static class MoviesRatesAllGroupByGender_Mapper extends Mapper<LongWritable, Text, Text, Text>{ Text kout = new Text(); Text valueout = new Text(); @Override protected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException { String [] reads = value.toString().trim().split("::"); // 1::1193::5::978300760::F::1::10::48067::Drama // 性别、电影ID、评分 String sex = reads[4]; String mID = reads[1]; int rate = Integer.parseInt(reads[2]); // 每部电影的评分:组内每部电影的总评分/每部电影的评分次数 // 按照性别和电影ID进行分组 String kk = sex + "\t" +mID; String vv = reads[2]; kout.set(kk); valueout.set(vv); context.write(kout, valueout); } } public static class MoviesRatesAllGroupByGender_Reducer extends Reducer<Text, Text, Text, DoubleWritable>{ Text kout = new Text(); Text valueout = new Text(); @Override protected void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException { int totalRate = 0; // 初始化总评分为0 int rateNum = 0; // 初始化总评分次数为0 double avgRate = 0; // 初始化每部电影的平均评分为0 for(Text text : values){ // 计算每部电影的总评分及评分次数 int rate = Integer.parseInt(text.toString()); totalRate += rate; rateNum ++; } avgRate = 1.0 * totalRate / rateNum; // 计算每部电影的平均评分 DoubleWritable vv = new DoubleWritable(avgRate); context.write(key, vv); } } }
在代码清单2-37中,Map阶段的map()方法主要以用户性别和电影ID作为key,将对应的rate(评分)作为value输出,并将中间结果传输至Reducer端。Reduce阶段的reduce()方法根据Map阶段的<k,v>键值对数据统计v(值)结果,最终将结果输出保存至/join/outPutMoviesRatesAllGroupByGender/目录下。
打包并提交MapReduce程序至Hadoop集群运行,即可将最终结果保存至/join/outPutMoviesRatesAllGroupByGender/目录的part-r-00000文件中。part-r-0000文件中以性别分组保存了所有电影的平均评分。由于文件内容较多,且每条记录是以性别进行排序,所以分别使用hdfs dfs -cat /join/outPutMoviesRatesAllGroupByGender/part-r-00000 | head -10命令和hdfs dfs -cat /join/outPutMoviesRatesAllGroupByGender/part-r-00000 | tail -10命令查看part-r-00000文件中的前10条和后10条记录,即性别为女性(F)的分组对所有电影的平均评分和性别为男性(M)的分组对所有电影的平均评分,分别如图2-97和图2-98所示。
图2-97 性别为F组中所有电影的平均评分
图2-98 性别为M组中所有电影的平均评分
从图2-97和图2-98所示的结果可以看到,因为电影的平均评分还未进行降序排序,所以我们暂时无法判别不同性别的观影喜好是否有比较大的差异,还需进行进一步的处理,即根据电影的平均评分进行降序排序。
根据users.dat中数据的描述信息得知,字段Age并不是用户的真实年龄,而是年龄段。查看users.dat中的年龄段,该文件中Age的取值共7个,分别为0、1、2、3、4、5、6,分别表示7个年龄段,对应关系如表2-15所示。
表2-15 Age年龄组及其说明
要计算某给定电影各年龄段的平均电影评分,需要确定计算的电影ID。在前面小节中我们已经对电影的评分次数进行了计算,发现电影ID为2858的电影的评分次数最多,该部电影的用户年龄分布可能相对较广,因此确定分析电影ID为2858的电影的各年龄段的平均评分。
定义一个MoviesAvgScore_GroupByAge类,将movies.dat、ratings.dat、users.dat这3份数据进行连接后的数据作为输入,同时在map()方法中以电影ID为2858作为筛选条件过滤其他电影。由于要分析各年龄段的影评,所以需要将Age作为map()方法的key(键),将电影评分和电影ID作为value(值),再将Map阶段的中间结果传输至Reduce端,在Reduce端中得到不同年龄段用户对指定电影的平均评分。具体实现如代码清单2-38所示。
import java.io.IOException; import java.text.DecimalFormat; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class MoviesAvgScore_GroupByAge { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(conf); Job job = Job.getInstance(conf); job.setJarByClass(MoviesAvgScore_GroupByAge.class); job.setMapperClass(MovieAvgScore_GroupByAge_Mapper.class); job.setReducerClass(MovieAvgScore_GroupByAge_Reducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); Path inputPath = new Path("/join/outPutMapjoinThreeTables/"); // 以3表连接的输出路径作为本次任务的输入路径 Path outputPath = new Path("/join/MoviesAvgScore_GroupByAge"); // 设置输出路径 if (fs.exists(outputPath)) { fs.delete(outputPath, true); } FileInputFormat.setInputPaths(job, inputPath); FileOutputFormat.setOutputPath(job, outputPath); boolean isdone = job.waitForCompletion(true); System.exit(isdone ? 0 : 1); } public static class MovieAvgScore_GroupByAge_Mapper extends Mapper<LongWritable, Text, Text, Text>{ Text kout = new Text(); Text valueout = new Text(); // 求movieid = 2858这部电影各年龄段的平均影评 // userid, movieId, rate, ts, gender, age, occupation, zipcode, movieType // 用户ID、电影ID、评分、评分时间戳、性别、年龄、职业、邮政编码、电影类型 @Override protected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException { String [] reads = value.toString().trim().split("::"); String movieid = reads[1]; String age = reads[5]; String rate = reads[2]; if (movieid.equals("2858")) { // 判断电影ID是否为2858,进行过滤 kout.set(age); // 输出k值为age valueout.set(rate + "\t" + movieid); // v值为电影评分和电影ID context.write(kout, valueout); // 输出到Reduce端 } } } public static class MovieAvgScore_GroupByAge_Reducer extends Reducer<Text, Text, Text, Text>{ Text kout = new Text(); Text valueout = new Text(); @Override protected void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException { int totalRate = 0; // 初始化电影总评分 int rateNum = 0; // 初始化电影评论次数 double avgRate = 0; // 初始化平均评分 String movieid = ""; for(Text text : values){ String[] reads = text.toString().split("\t"); totalRate += Integer.parseInt(reads[0]); rateNum ++; // 累加评分次数 movieid = reads[1]; // 验证一下 } avgRate = 1.0 * totalRate / rateNum; // 计算电影平均评分 DecimalFormat df = new DecimalFormat("#.#"); // 设置评分格式 String string = df.format(avgRate); String vv = string + "\t" +movieid; // 将电影平均评分与电影ID连接 valueout.set(vv); context.write(key, valueout); } } }
在代码清单2-38中,通过map()方法与reduce()方法可计算出电影ID为2858的电影各年龄段的平均评分。打包并提交项目至Hadoop集群运行,即可在HDFS的/join/MoviesAvgScore_GroupByAge/目录下生成part-r-00000文件。在Shell中通过hdfs dfs -cat/join/MoviesAvgScore_GroupByAge/part-r-00000命令可查看最终结果,如图2-99所示。
图2-99 各年龄段用户对ID为2858的电影的平均评分
由图2-99的结果可以看出,随着年龄的增长,用户对该电影的评分逐渐降低。这可能是因为随着用户年龄的增长,用户的观影阅历越多,思考会更加深入,所以对电影的评分会更加严谨。但从总体而言,各年龄段用户对这部电影的评分均在4以上,因此这部电影是比较受大众欢迎的。