国科大大数据系统与大规模数据分析第一次作业(hash distinct)

一、作业内容

  1. 从HDFS中读出数据

  2. 对读出的数据进行hash去重

  3. 将处理好的数据存入Hbase

在这里插入图片描述

二、作业代码

import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.io.*;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;

import org.apache.log4j.*;
/**
 * @ClassName Hw1Grp4
 * @Description: hw1 code
 * @Author: cgg
 * @CreateDate: 2022/3/21 11:22
 * @UpdateUser: cgg
 * @UpdateDate: 2022/3/25 10:22
 * @UpdateRemark: info
 * @Version: 1.1
 */
@SuppressWarnings("all")
public class Hw1Grp4 {

    public static ArrayList<String> TestData_List = null;//raw data
    public static ArrayList<String> DataAfterSelect = null;//Data After Select
    public static String FilePath = null;  //File path
    public static int SelectRow = 0;//Columns to select
    public static String SelectCondition = null; //Select Condition
    public static String SelectValue = null;//Condition value
    public static ArrayList<Integer> DistinctRow = null;//Columns to be redone
    public static LinkedHashMap<Integer, String> linkedHashMap = null; //Columns to be redone

    /**
     * @auther: cgg
     * @Description //TODO 	Read the data according to the parameters passed in the main function
     * @param: [args]  parameters passed in the main function
     * @return: void
     * @date: 2022/4/1 14:09
     */
    public static void Read_Data(String[] args) throws IOException {
        //Read Data
        TestData_List = new ArrayList<String>();
        if (args.length <= 0) {
			System.out.println("Usage: HDFSTest <hdfs-file-path>");
			System.exit(1);
		}

		String file = FilePath;

        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(file), conf);
        Path path = new Path(file);
        FSDataInputStream in_stream = fs.open(path);

        BufferedReader in = new BufferedReader(new InputStreamReader(in_stream));
        String s;
        while ((s=in.readLine())!=null) {
             TestData_List.add(s);
        }
        in.close();
        fs.close();
    }
    /**
     * @auther: cgg
     * @Description //TODO  Initialize to get the file name, selected column,selection condition and column to be de duplicated
     * @param: [args]   parameters passed in the main function
     * @return: void
     * @date: 2022/4/1 14:17
     */
    public static void init(String[] args) {
        String arg0 = args[0];
        String arg1 = args[1];
        String arg2 = args[2];

        //regular expression  arg0
        String regStr = "^R=(.*)$";
        Pattern pattern = Pattern.compile(regStr);
        Matcher matcher = pattern.matcher(arg0);
        while (matcher.find()) {
            FilePath = matcher.group(1);
            break;
        }
        //regular expression  arg1
        regStr = "^select:(.*)$";
        pattern = Pattern.compile(regStr);
        matcher = pattern.matcher(arg1);
        while (matcher.find()) {
            arg1 = matcher.group(1);
            break;
        }
        String[] temp = arg1.split(",");
        SelectRow = Integer.parseInt(temp[0].substring(temp[0].length() - 1));
        SelectCondition = temp[1];
        SelectValue = temp[2];

        //regular expression  arg2
        regStr = "^distinct:(.*)$";
        pattern = Pattern.compile(regStr);
        matcher = pattern.matcher(arg2);
        while (matcher.find()) {
            arg2 = matcher.group(1);
            break;
        }
        temp = arg2.split(",");
        DistinctRow = new ArrayList<Integer>();
        for (int i = 0; i < temp.length; i++) {
            DistinctRow.add(Integer.parseInt(temp[i].substring(temp[i].length() - 1)));
        }
    }
    /**
     * @auther: cgg
     * @Description //TODO 	Robustness test, if the input parameters are irregular, exit the program
     * @param: []
     * @return: void
     * @date: 2022/4/1 14:21
     */
    public static void Robustness() {
        if (FilePath == null || FilePath == "") System.exit(1);
        if (SelectRow < 0 || SelectRow >= TestData_List.get(0).split("\\|").length) System.exit(1);
        if (!SelectCondition.equals("gt") && !SelectCondition.equals("ge") && !SelectCondition.equals("eq")
                && !SelectCondition.equals("ne") && !SelectCondition.equals("le") && !SelectCondition.equals("lt")) {
            System.exit(1);
        }
        for (int i = 0; i < DistinctRow.size(); i++) {
            if (DistinctRow.get(i) < 0 || DistinctRow.get(i) >= TestData_List.get(0).split("\\|").length) {
                System.exit(1);
            }
        }
    }
    /**
     * @auther: cgg
     * @Description //TODO 	Judge whether the data in this column meets the filtering conditions
     * @param: [content, condition, value]	content:Content value to compare; condition:Conditions to compare; value: Comparison value
     * @return: boolean
     * @date: 2022/4/1 14:22
     */
    public static boolean IsSelectData(String content, String condition, String value) {
        switch (condition) {
            case "gt"://>
                return Double.parseDouble(content) > Double.parseDouble(value);
            case "ge"://>=
                return Double.parseDouble(content) >= Double.parseDouble(value);
            case "eq"://==
                return content.equals(value);
            case "ne"://!=
                return !content.equals(value);
            case "le"://<=
                return Double.parseDouble(content) <= Double.parseDouble(value);
            case "lt"://<
                return Double.parseDouble(content) < Double.parseDouble(value);
            default:
                System.out.println("Illegal instruction!!");
                break;
        }
        return false;
    }
    /**
     * @auther: cgg
     * @Description //TODO 	//Select:The filtered data is put into a list
     * @param: []
     * @return: void
     * @date: 2022/4/1 14:25
     */
    public static void Select() {
        DataAfterSelect = new ArrayList<String>();
        for (Object data : TestData_List) {
            String[] temp = ((String) data).split("\\|");
            if (IsSelectData(temp[SelectRow], SelectCondition, SelectValue)) {
                String TempDataStr = "";
                for (int i = 0; i < DistinctRow.size(); i++) {
                    if (i == 0) TempDataStr += temp[(int) DistinctRow.get(i)];
                    else TempDataStr += "|" + temp[(int) DistinctRow.get(i)];
                }
                DataAfterSelect.add(TempDataStr);
            }
        }
    }
    /**
     * @auther: cgg
     * @Description //TODO 	//De duplication of filtered data
     * @param: []
     * @return: void
     * @date: 2022/4/1 14:26
     */
    public static void Distinct() {
        linkedHashMap = new LinkedHashMap<Integer, String>();
        for (Object o : DataAfterSelect) {
            String data = (String) o;
            linkedHashMap.put(data.hashCode(), data);
        }
    }
    /**
     * @auther: cgg
     * @Description //TODO 	//get anser print
     * @param: []
     * @return: void
     * @date: 2022/4/1 14:27
     */
    public static void GetAnser() {
        int cnt = 0;
        Iterator it = linkedHashMap.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry entry = (Map.Entry) it.next();
            String[] temp = ((String) entry.getValue()).split("\\|");
            for (int i = 0; i < temp.length; i++) {
                System.out.print("(row key=" + cnt + ",res:R" + DistinctRow.get(i) + "=" + temp[i] + ")");
            }
            System.out.println();
            cnt++;
        }
    }
    /**
     * @auther: cgg
     * @Description //TODO 	//write in Hbase
     * @param: []
     * @return: void
     * @date: 2022/4/1 14:29
     */
    public static void WritertoHbase() throws MasterNotRunningException, ZooKeeperConnectionException, IOException{
        Logger.getRootLogger().setLevel(Level.WARN);
         // create table descriptor
        String tableName= "Result";
        HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));

        // create column descriptor
        HColumnDescriptor cf = new HColumnDescriptor("res");
        htd.addFamily(cf);

        // configure HBase
        Configuration configuration = HBaseConfiguration.create();
        HBaseAdmin hAdmin = new HBaseAdmin(configuration);

        if (hAdmin.tableExists(tableName)) {
            hAdmin.deleteTable(tableName);
            hAdmin.createTable(htd);
            System.out.println("Table already exists");
        }
        else {
            hAdmin.createTable(htd);
            System.out.println("table "+tableName+ " created successfully");
        }
        hAdmin.close();

        HTable table = new HTable(configuration,tableName);
        //Traversal result writing
        int cnt=0;
        Iterator it = linkedHashMap.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry entry = (Map.Entry) it.next();
            Put put;
            String[] temp = ((String) entry.getValue()).split("\\|");
            for (int i = 0; i < temp.length; i++) {
                put  = new Put(("row key="+cnt).getBytes());
                put.add("res".getBytes(),("R" + DistinctRow.get(i)).getBytes(),temp[i].getBytes());
                table.put(put);
            }
            cnt++;
        }
        table.close();
        System.out.println("put successfully");
    }
    public static void main(String[] args) throws IOException {
        init(args);
        Read_Data(args);
        Robustness();
        Select();
        Distinct();
        GetAnser();
        System.out.println("start write");
        WritertoHbase();
        System.out.println("end write");
    }
}

三、总结

1. 使用LinkedHashMap能够去重且保证顺序,大大减少去重编程的难度
2. 要熟练掌握在HDFS上存取数据操作,要注意Hbase的存储格式
3. 要学会使用正则表达式进行文本字符的处理
Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐