基于Hadoop框架实现的对历年四级单词的词频分析(入门级Hadoop项目)
前情提要:飞物作者屡次四级考试未能通过,进而恼羞成怒,制作了基于Hadoop实现的对历年四级单词的词频分析项目,希望督促自己尽快通过四级(然而并没有什么卵用)
项目需求:Pycharm、IDEA、Linux、Hadoop运行环境、Hive、beeline、八爪鱼采集器
数据来源:https://zhenti.burningvocabulary.cn/cet4
“如果你想要数据,就得自己来拿,这规矩你早就懂得” ——某V姓男子
一、 数据采集
1.从目标网站上获取所需要的网址
用来获取数据的网站是一个由主界面指向各个题目页面的分支结构,所以需要使用Python爬虫从主界面获取每一个题目页面的网址
# 从该四级真题主网站上获取各个具体题目页面的链接网址
import re
import requests
# 防止爬虫被拦截
header = {
'User-Agent': 'User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.193 Safari/537.36'
}
httpsGet = requests.get("https://zhenti.burningvocabulary.cn/cet4", headers=header) # 爬取整个页面
httpsTxt = open(r"E:\ShuJu\SiJiDanCi\AllWWW.txt", 'w', encoding='utf-8') # 创建储存网站信息的txt文件(真正的英语大佬都在使用拼音来命名文件夹)
httpsTxt.write(httpsGet.text) # 将爬取信息存入txt
httpsTxt.close() # 关闭并保存文件
a = open(r"E:\ShuJu\SiJiDanCi\AllWWW.txt", encoding='utf-8').read() # a为主网页的信息
httpsAns = re.findall(
r'<a class="link-primary" href="(.*?)"><div class="card">', a, # 正则表达式查找各页面网址
re.S)
httpsTxt.close() # 关闭存有主网站信息的txt
print(httpsAns) # 输出正则表达式的查找结果
with open(r"E:\ShuJu\SiJiDanCi\cet4WWW.txt", 'w', encoding="UTF8") as f: # 将结果存入文件
for item in httpsAns:
f.write(str(item) + "\t")
f.close()
获得了每一期题目的页面地址
2.使用八爪鱼采集器获取题目数据
(飞物作者用自己的爬虫爬了一天都没能拿到数据,只好借助外力)
点击新建,自定义任务,把需要爬取的网址全部复制粘贴过去,保存设置,然后在页面中点击你需要的文本,然后点击采集按钮就可以启动采集了
采集完成后获得一个存有历年所有英语题目的csv格式文件shuju.csv
之后对该csv文件进行数据处理
二、数据处理
使用Python将存有所有英语题目中的无用数据剔除,获得仅存有所有单词的文本数据
# 将csv格式的文件转换为txt格式
import pandas as pd
df = pd.read_csv(r"E:\ShuJu\SiJiDanCi\shuju.csv", index_col=0)
print(df)
df.to_csv(r"E:\ShuJu\SiJiDanCi\shuju.txt", sep='\t', index=False)
#清洗无用数据
import re
with open(r"E:\ShuJu\SiJiDanCi\shuju.txt", "r", encoding="UTF8") as f: # 读入原始数据
data = f.read()
f.close()
str = re.sub(r"[^a-zA-Z]+", " ", data) # 将非字母型的字符全部替换为空格
str = str.lower() # 将大写字母替换为小写字母
print(str) # 将数据处理情况显示在控制台,查看效果
with open(r"E:\ShuJu\SiJiDanCi\sijiShaitext.txt", 'w', encoding="UTF8") as f: # 将数据写出
f.write(str)
f.close()
最终效果长这个模样
三、Hadoop计算
作者在这里使用三台Linux虚拟机搭建的完全分布式Hadoop集群来进行计算
1.词频统计
首先是大家耳熟能详的wordcount计数 : map和reduce操作
打开IDEA,复制粘贴代码然后导包
Driver
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 1 获取配置信息以及获取job对象
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2 关联本Driver程序的jar
job.setJarByClass(WordCountDriver.class);
// 3 关联Mapper和Reducer的jar
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 4 设置Mapper输出的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5 设置最终输出kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 6 设置输入和输出路径
FileInputFormat.setInputPaths(job, new Path("E:\\ShuJu\\SiJiDanCi\\sijiShaitext.txt"));
FileOutputFormat.setOutputPath(job, new Path("E:\\ShuJu\\SiJiDanCi\\sijiOUT.txt"));
// 7 提交job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
Mapper
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
Text outK = new Text();
IntWritable outV = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//map被循环遍历,读取文件中的每一行字符串,重写map方法
//1、获取一行存入字符串line
String line = value.toString();
//2、切割存入名为words的字符串数组
String[] words = line.split(" ");//以空格为分界线拆分单词
//3、循环写出,读取words中的各个字符串
for (String word : words) {//从words数组中读出的字符串word
//封装outK
outK.set(word);
//写出
context.write(outK, outV);
}
}
}
Reducer
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
int sum;
IntWritable value = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
// 1 累加求和
sum = 0;
for (IntWritable count : values) {
sum += count.get();//数据类型切换为int
}
// 2 输出
value.set(sum);
context.write(key, value);
}
}
这样就得到了初步的词频统计结果,默认是按照字母表顺序来排列的
接下来进行一个排序,可以更直观的看出单词的出现情况
(作为一个懒人肯定要挑出现次数高的单词来背)
2.按照单词出现次数递减排序
FlowBean
public class FlowBean implements WritableComparable<FlowBean> {
private long num; //每个单词的个数
//提供无参构造
public FlowBean() {
}
public long getnum() {
return num;
}
public void setnum(long num) {
this.num = num;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(this.num);
}
@Override
public void readFields(DataInput in) throws IOException {
this.num = in.readLong();
}
@Override
public String toString() {
return String.valueOf(num);
}
@Override
public int compareTo(FlowBean o) { //按照单词个数进行递减排序
if (this.num > o.num) {
return -1;
} else if (this.num < o.num) {
return 1;
} else {
return 0;
}
}
}
Driver
public class FlowDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//1 获取job对象
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//2 关联本Driver类
job.setJarByClass(FlowDriver.class);
//3 关联Mapper和Reducer
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
//4 设置Map端输出数据的KV类型
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);
//5 设置程序最终输出的KV类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//6 设置输入输出路径
FileInputFormat.setInputPaths(job, new Path("E:\\ShuJu\\SiJiDanCi\\sijiOUT.txt\\part-r-00000"));
FileOutputFormat.setOutputPath(job, new Path("E:\\ShuJu\\SiJiDanCi\\sijiOUTCompare"));
//7 提交Job
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
Mapper
public class FlowMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
private FlowBean outK = new FlowBean();
private Text outV = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] split = line.split("\t");
try {
outK.setnum(Long.parseLong(split[1].trim()));
} catch (ArrayIndexOutOfBoundsException e) {
outK.setnum(Long.parseLong(split[0].trim()));
}
outV.set(split[0]);
//4 写出outK outV
context.write(outK, outV);
}
}
Reducer
public class FlowReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
@Override
protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
for (Text value : values) {
context.write(value, key);
}
}
}
这样就得到了我们想要的数据
四、Hive存储
接下来将数据存入Hive表格便于查询
首先把上面处理好的单词文件上传至HDFS中的/test文件夹中
然后在HDFS上创建一个表来存入数据,位置在/test/WordsData
使用Linux上的beeline客户端连接Hive,输入建表语句,word列存单词,num列存对应出现次数
CREATE EXTERNAL TABLE Words
(
word STRING,
num INT
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION '/test/WordsData';
将文本文件中的单词数据写入Hive表格
LOAD DATA INPATH '/test/part-r-00000' OVERWRITE INTO TABLE Words;
接下来就可以使用SQL语句愉快的查询各种单词的数据了
select * from Words where num>100;
想必这些操作对大家来说简直有手就行
点我跳转 Ciallo~(∠・ω< )⌒★