国科大大数据系统与大规模数据分析课程第一次作业(hash distinct)
本文是大数据系统与大规模数据分析课程第一次作业的记录:从 HDFS 中读取数据,对数据按指定条件过滤并做 hash 去重,最后将结果写入 HBase,文末附完整 Java 代码与总结。
国科大大数据系统与大规模数据分析第一次作业(hash distinct)
一、作业内容
-
从HDFS中读出数据
-
对读出的数据进行hash去重
-
将处理好的数据存入Hbase
二、作业代码
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.io.*;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.log4j.*;
/**
* @ClassName Hw1Grp4
* @Description: hw1 code
* @Author: cgg
* @CreateDate: 2022/3/21 11:22
* @UpdateUser: cgg
* @UpdateDate: 2022/3/25 10:22
* @UpdateRemark: info
* @Version: 1.1
*/
@SuppressWarnings("all")
@SuppressWarnings("all")
public class Hw1Grp4 {
    public static ArrayList<String> TestData_List = null;   // raw rows read from HDFS, one "|"-separated line each
    public static ArrayList<String> DataAfterSelect = null; // rows that passed the filter, projected onto the distinct columns
    public static String FilePath = null;                   // HDFS file path, parsed from the "R=<path>" argument
    public static int SelectRow = 0;                        // index of the column the filter condition applies to
    public static String SelectCondition = null;            // comparison operator token: gt / ge / eq / ne / le / lt
    public static String SelectValue = null;                // value the selected column is compared against
    public static ArrayList<Integer> DistinctRow = null;    // indices of the columns to project and de-duplicate
    public static LinkedHashMap<Integer, String> linkedHashMap = null; // de-duplicated result rows, in insertion order

    /**
     * Reads every line of the HDFS file at {@link #FilePath} into {@link #TestData_List}.
     *
     * @param args the command-line arguments (only used for the usage check; the
     *             actual path comes from {@link #FilePath}, set by {@link #init})
     * @throws IOException if the HDFS file cannot be opened or read
     */
    public static void Read_Data(String[] args) throws IOException {
        TestData_List = new ArrayList<String>();
        if (args.length <= 0) {
            System.out.println("Usage: HDFSTest <hdfs-file-path>");
            System.exit(1);
        }
        String file = FilePath;
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(file), conf);
        Path path = new Path(file);
        // try-with-resources guarantees the reader (and the underlying HDFS
        // stream) is closed even if readLine throws mid-file.
        try (BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(path)))) {
            String s;
            while ((s = in.readLine()) != null) {
                TestData_List.add(s);
            }
        }
        fs.close();
    }

    /**
     * Extracts the digits from a column token such as "R3" or "R12".
     * The original code took only the last character, which silently broke
     * for column indices >= 10; stripping all non-digits generalizes it
     * while keeping single-digit behavior identical.
     */
    private static int parseColumnIndex(String token) {
        return Integer.parseInt(token.replaceAll("\\D", ""));
    }

    /**
     * Parses the three command-line arguments into the static configuration
     * fields: the file path ("R=<path>"), the filter spec
     * ("select:R<col>,<cond>,<value>") and the distinct column list
     * ("distinct:R<col>[,R<col>...]").
     *
     * @param args the raw command-line arguments; args[0..2] are required
     */
    public static void init(String[] args) {
        String arg0 = args[0];
        String arg1 = args[1];
        String arg2 = args[2];
        // arg0: "R=<hdfs path>"
        String regStr = "^R=(.*)$";
        Pattern pattern = Pattern.compile(regStr);
        Matcher matcher = pattern.matcher(arg0);
        while (matcher.find()) {
            FilePath = matcher.group(1);
            break;
        }
        // arg1: "select:R<col>,<cond>,<value>"
        regStr = "^select:(.*)$";
        pattern = Pattern.compile(regStr);
        matcher = pattern.matcher(arg1);
        while (matcher.find()) {
            arg1 = matcher.group(1);
            break;
        }
        String[] temp = arg1.split(",");
        SelectRow = parseColumnIndex(temp[0]);
        SelectCondition = temp[1];
        SelectValue = temp[2];
        // arg2: "distinct:R<col>[,R<col>...]"
        regStr = "^distinct:(.*)$";
        pattern = Pattern.compile(regStr);
        matcher = pattern.matcher(arg2);
        while (matcher.find()) {
            arg2 = matcher.group(1);
            break;
        }
        temp = arg2.split(",");
        DistinctRow = new ArrayList<Integer>();
        for (int i = 0; i < temp.length; i++) {
            DistinctRow.add(parseColumnIndex(temp[i]));
        }
    }

    /**
     * Validates the parsed arguments and exits the program if any of them is
     * malformed: missing file path, column index out of range, or an unknown
     * comparison operator. Must run after {@link #init} and {@link #Read_Data}
     * (it uses the first data row to learn the column count).
     */
    public static void Robustness() {
        // BUG FIX: the original compared Strings with == ("FilePath == \"\""),
        // which tests reference identity, not content. Use isEmpty() instead.
        if (FilePath == null || FilePath.isEmpty()) System.exit(1);
        if (SelectRow < 0 || SelectRow >= TestData_List.get(0).split("\\|").length) System.exit(1);
        if (!SelectCondition.equals("gt") && !SelectCondition.equals("ge") && !SelectCondition.equals("eq")
                && !SelectCondition.equals("ne") && !SelectCondition.equals("le") && !SelectCondition.equals("lt")) {
            System.exit(1);
        }
        for (int i = 0; i < DistinctRow.size(); i++) {
            if (DistinctRow.get(i) < 0 || DistinctRow.get(i) >= TestData_List.get(0).split("\\|").length) {
                System.exit(1);
            }
        }
    }

    /**
     * Evaluates one filter comparison.
     *
     * @param content   the column value from the data row
     * @param condition one of gt/ge/eq/ne/le/lt; gt/ge/le/lt compare
     *                  numerically, eq/ne compare as strings
     * @param value     the value to compare against
     * @return true if the row satisfies the condition, false otherwise
     *         (also false for an unknown operator, after printing a message)
     */
    public static boolean IsSelectData(String content, String condition, String value) {
        switch (condition) {
            case "gt": // >
                return Double.parseDouble(content) > Double.parseDouble(value);
            case "ge": // >=
                return Double.parseDouble(content) >= Double.parseDouble(value);
            case "eq": // ==
                return content.equals(value);
            case "ne": // !=
                return !content.equals(value);
            case "le": // <=
                return Double.parseDouble(content) <= Double.parseDouble(value);
            case "lt": // <
                return Double.parseDouble(content) < Double.parseDouble(value);
            default:
                System.out.println("Illegal instruction!!");
                break;
        }
        return false;
    }

    /**
     * Applies the filter to every raw row and, for each row that passes,
     * appends the selected distinct columns (joined with "|") to
     * {@link #DataAfterSelect}.
     */
    public static void Select() {
        DataAfterSelect = new ArrayList<String>();
        for (String data : TestData_List) {
            String[] temp = data.split("\\|");
            if (IsSelectData(temp[SelectRow], SelectCondition, SelectValue)) {
                StringBuilder projected = new StringBuilder();
                for (int i = 0; i < DistinctRow.size(); i++) {
                    if (i > 0) projected.append('|');
                    projected.append(temp[DistinctRow.get(i)]);
                }
                DataAfterSelect.add(projected.toString());
            }
        }
    }

    /**
     * De-duplicates {@link #DataAfterSelect} into {@link #linkedHashMap},
     * preserving first-seen order.
     *
     * BUG FIX: the original keyed the map by data.hashCode(), so two distinct
     * rows with colliding hash codes silently overwrote each other (data
     * loss). We now de-duplicate on the row text itself and key the map by an
     * insertion index — downstream code only iterates the values in order, so
     * its behavior is unchanged.
     */
    public static void Distinct() {
        linkedHashMap = new LinkedHashMap<Integer, String>();
        LinkedHashSet<String> unique = new LinkedHashSet<String>(DataAfterSelect);
        int idx = 0;
        for (String data : unique) {
            linkedHashMap.put(idx++, data);
        }
    }

    /**
     * Prints the de-duplicated result to stdout, one line per row, in the
     * same "(row key=<n>,res:R<col>=<value>)" format that is written to HBase.
     */
    public static void GetAnser() {
        int cnt = 0;
        Iterator it = linkedHashMap.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry entry = (Map.Entry) it.next();
            String[] temp = ((String) entry.getValue()).split("\\|");
            for (int i = 0; i < temp.length; i++) {
                System.out.print("(row key=" + cnt + ",res:R" + DistinctRow.get(i) + "=" + temp[i] + ")");
            }
            System.out.println();
            cnt++;
        }
    }

    /**
     * Writes the de-duplicated result into the HBase table "Result"
     * (column family "res", qualifier "R<col>"), recreating the table if it
     * already exists.
     *
     * @throws MasterNotRunningException    if the HBase master is unreachable
     * @throws ZooKeeperConnectionException if ZooKeeper cannot be contacted
     * @throws IOException                  on any other HBase I/O failure
     */
    public static void WritertoHbase() throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
        Logger.getRootLogger().setLevel(Level.WARN);
        // create table descriptor
        String tableName = "Result";
        HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
        // create column descriptor
        HColumnDescriptor cf = new HColumnDescriptor("res");
        htd.addFamily(cf);
        // configure HBase
        Configuration configuration = HBaseConfiguration.create();
        HBaseAdmin hAdmin = new HBaseAdmin(configuration);
        if (hAdmin.tableExists(tableName)) {
            // BUG FIX: an enabled table cannot be deleted — deleteTable throws
            // TableNotDisabledException. Disable it first, then drop and recreate.
            hAdmin.disableTable(tableName);
            hAdmin.deleteTable(tableName);
            hAdmin.createTable(htd);
            System.out.println("Table already exists");
        } else {
            hAdmin.createTable(htd);
            System.out.println("table " + tableName + " created successfully");
        }
        hAdmin.close();
        HTable table = new HTable(configuration, tableName);
        // one Put per cell: row key "row key=<n>", family "res", qualifier "R<col>"
        int cnt = 0;
        Iterator it = linkedHashMap.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry entry = (Map.Entry) it.next();
            Put put;
            String[] temp = ((String) entry.getValue()).split("\\|");
            for (int i = 0; i < temp.length; i++) {
                put = new Put(("row key=" + cnt).getBytes());
                put.add("res".getBytes(), ("R" + DistinctRow.get(i)).getBytes(), temp[i].getBytes());
                table.put(put);
            }
            cnt++;
        }
        table.close();
        System.out.println("put successfully");
    }

    /**
     * Entry point: parse arguments, read the HDFS file, validate, filter,
     * de-duplicate, print the result, and store it into HBase.
     *
     * @param args args[0]="R=<path>", args[1]="select:R<col>,<cond>,<value>",
     *             args[2]="distinct:R<col>[,R<col>...]"
     * @throws IOException on HDFS or HBase I/O failure
     */
    public static void main(String[] args) throws IOException {
        init(args);
        Read_Data(args);
        Robustness();
        Select();
        Distinct();
        GetAnser();
        System.out.println("start write");
        WritertoHbase();
        System.out.println("end write");
    }
}
三、总结
1. 使用LinkedHashMap能够去重且保证顺序,大大减少去重编程的难度
2. 要熟练掌握在HDFS上存取数据操作,要注意Hbase的存储格式
3. 要学会使用正则表达式进行文本字符的处理
更多推荐
所有评论(0)