基于Hadoop框架实现的对历年四级单词的词频分析(入门级Hadoop项目)

前情提要:飞物作者屡次四级考试未能通过,进而恼羞成怒,制作了基于Hadoop实现的对历年四级单词的词频分析项目,希望督促自己尽快通过四级(然而并没有什么卵用)


项目需求:Pycharm、IDEA、Linux、Hadoop运行环境、Hive、beeline、八爪鱼采集器
数据来源:https://zhenti.burningvocabulary.cn/cet4

“如果你想要数据,就得自己来拿,这规矩你早就懂得” ——某V姓男子

一、 数据采集

1.从目标网站上获取所需要的网址

用来获取数据的网站是一个由主界面指向各个题目页面的分支结构,所以需要使用Python爬虫从主界面获取每一个题目页面的网址

# 从该四级真题主网站上获取各个具体题目页面的链接网址
import re
import requests

# 防止爬虫被拦截
header = {
    'User-Agent': 'User-Agent:  Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.193 Safari/537.36'
}

httpsGet = requests.get("https://zhenti.burningvocabulary.cn/cet4", headers=header)  # 爬取整个页面
httpsTxt = open(r"E:\ShuJu\SiJiDanCi\AllWWW.txt", 'w', encoding='utf-8')  # 创建储存网站信息的txt文件(真正的英语大佬都在使用拼音来命名文件夹)
httpsTxt.write(httpsGet.text)  # 将爬取信息存入txt
httpsTxt.close()  # 关闭并保存文件
a = open(r"E:\ShuJu\SiJiDanCi\AllWWW.txt", encoding='utf-8').read()  # a为主网页的信息
httpsAns = re.findall(
    r'<a class="link-primary" href="(.*?)"><div class="card">', a,  # 正则表达式查找各页面网址
    re.S)
httpsTxt.close()  # 关闭存有主网站信息的txt
print(httpsAns)  # 输出正则表达式的查找结果

with open(r"E:\ShuJu\SiJiDanCi\cet4WWW.txt", 'w', encoding="UTF8") as f:  # 将结果存入文件
    for item in httpsAns:
        f.write(str(item) + "\t")
f.close()

获得了每一期题目的页面地址


2.使用八爪鱼采集器获取题目数据

(飞物作者用自己的爬虫爬了一天都没能拿到数据,只好借助外力)

点击新建,自定义任务,把需要爬取的网址全部复制粘贴过去,保存设置,然后在页面中点击你需要的文本,然后点击采集按钮就可以启动采集了

采集完成后获得一个存有历年所有英语题目的csv格式文件shuju.csv
之后对该csv文件进行数据处理

二、数据处理

使用Python将存有所有英语题目中的无用数据剔除,获得仅存有所有单词的文本数据

# 将csv格式的文件转换为txt格式
import pandas as pd

df = pd.read_csv(r"E:\ShuJu\SiJiDanCi\shuju.csv", index_col=0)
print(df)
df.to_csv(r"E:\ShuJu\SiJiDanCi\shuju.txt", sep='\t', index=False)
#清洗无用数据
import re

with open(r"E:\ShuJu\SiJiDanCi\shuju.txt", "r", encoding="UTF8") as f:  # 读入原始数据
    data = f.read()
f.close()
str = re.sub(r"[^a-zA-Z]+", " ", data)  # 将非字母型的字符全部替换为空格
str = str.lower()  # 将大写字母替换为小写字母
print(str)  # 将数据处理情况显示在控制台,查看效果
with open(r"E:\ShuJu\SiJiDanCi\sijiShaitext.txt", 'w', encoding="UTF8") as f:  # 将数据写出
    f.write(str)
f.close()

最终效果长这个模样

三、Hadoop计算


作者在这里使用三台Linux虚拟机搭建的完全分布式Hadoop集群来进行计算

1.词频统计

首先是大家耳熟能详的wordcount计数 : map和reduce操作
打开IDEA,复制粘贴代码然后导包

Driver

public class WordCountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1 获取配置信息以及获取job对象
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2 关联本Driver程序的jar
        job.setJarByClass(WordCountDriver.class);
        // 3 关联Mapper和Reducer的jar
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // 4 设置Mapper输出的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 5 设置最终输出kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 6 设置输入和输出路径
        FileInputFormat.setInputPaths(job, new Path("E:\\ShuJu\\SiJiDanCi\\sijiShaitext.txt"));
        FileOutputFormat.setOutputPath(job, new Path("E:\\ShuJu\\SiJiDanCi\\sijiOUT.txt"));
        // 7 提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

Mapper

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    Text outK = new Text();
    IntWritable outV = new IntWritable(1);
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //map被循环遍历,读取文件中的每一行字符串,重写map方法
        //1、获取一行存入字符串line
        String line = value.toString();
        //2、切割存入名为words的字符串数组
        String[] words = line.split(" ");//以空格为分界线拆分单词
        //3、循环写出,读取words中的各个字符串
        for (String word : words) {//从words数组中读出的字符串word
            //封装outK
            outK.set(word);
            //写出
            context.write(outK, outV);
        }
    }
}

Reducer

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    int sum;
    IntWritable value = new IntWritable();
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // 1 累加求和
        sum = 0;
        for (IntWritable count : values) {
            sum += count.get();//数据类型切换为int
        }
        // 2 输出
        value.set(sum);
        context.write(key, value);
    }
}

这样就得到了初步的词频统计结果,默认是按照字母表顺序来排列的
接下来进行一个排序,可以更直观的看出单词的出现情况
(作为一个懒人肯定要挑出现次数高的单词来背)

2.按照单词出现次数递减排序

FlowBean

public class FlowBean implements WritableComparable<FlowBean> {
    private long num; //每个单词的个数
    //提供无参构造
    public FlowBean() {
    }
    public long getnum() {
        return num;
    }
    public void setnum(long num) {
        this.num = num;
    }
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(this.num);
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        this.num = in.readLong();
    }
    @Override
    public String toString() {
        return String.valueOf(num);
    }
    @Override
    public int compareTo(FlowBean o) { //按照单词个数进行递减排序
        if (this.num > o.num) {
            return -1;
        } else if (this.num < o.num) {
            return 1;
        } else {
            return 0;
        }
    }
}

Driver

public class FlowDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //1 获取job对象
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        //2 关联本Driver类
        job.setJarByClass(FlowDriver.class);
        //3 关联Mapper和Reducer
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);
        //4 设置Map端输出数据的KV类型
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);
        //5 设置程序最终输出的KV类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        //6 设置输入输出路径
        FileInputFormat.setInputPaths(job, new Path("E:\\ShuJu\\SiJiDanCi\\sijiOUT.txt\\part-r-00000"));
        FileOutputFormat.setOutputPath(job, new Path("E:\\ShuJu\\SiJiDanCi\\sijiOUTCompare"));
        //7 提交Job
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

Mapper

public class FlowMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
    private FlowBean outK = new FlowBean();
    private Text outV = new Text();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] split = line.split("\t");
        try {
            outK.setnum(Long.parseLong(split[1].trim()));
        } catch (ArrayIndexOutOfBoundsException e) {
            outK.setnum(Long.parseLong(split[0].trim()));
        }
        outV.set(split[0]);
        //4 写出outK outV
        context.write(outK, outV);
    }
}

Reducer

public class FlowReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        for (Text value : values) {
            context.write(value, key);
        }
    }
}

这样就得到了我们想要的数据

四、Hive存储

接下来将数据存入Hive表格便于查询
首先把上面处理好的单词文件上传至HDFS中的/test文件夹中
然后在HDFS上创建一个表来存入数据,位置在/test/WordsData

使用Linux上的beeline客户端连接Hive,输入建表语句,word列存单词,num列存对应出现次数

CREATE EXTERNAL TABLE Words
(
  word  STRING,
  num   INT
)
ROW FORMAT DELIMITED              
FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n'
STORED AS TEXTFILE            
LOCATION '/test/WordsData'; 

将文本文件中的单词数据写入Hive表格

LOAD DATA INPATH '/test/part-r-00000' OVERWRITE INTO TABLE Words;

接下来就可以使用SQL语句愉快的查询各种单词的数据了

select * from Words where num>100;


想必这些操作对大家来说简直有手就行
点我跳转 Ciallo~(∠・ω< )⌒★

热门相关:误踩老公底线:甜心难招架!   重生不嫁豪门   闪婚总裁很惧内   前夫有毒:1000万夺子契约   灭世魔帝