头歌 (EduCoder) Ride-Hailing Big Data Project: Spark-Based Data Cleaning
This article walks through two data-cleaning tasks, one for cancelled ride-hailing orders and one for successfully created orders. Task 1 covers reading the .dat files, trimming whitespace from column names, filtering on mandatory fields, handling literal "null" values, converting time formats, mapping administrative-division codes, deduplicating, selecting and ordering the output columns, and finally writing the result as a single file. Task 2 covers reading the raw data, dropping unneeded columns, converting empty strings to null, fixing the longitude/latitude fields, processing the time fields, filtering out nulls and out-of-range dates, formatting the time fields, reading the t_address table from MySQL, adding a districtname column, reordering the columns, deduplicating by orderid, and writing the result as a single file.
If the code does not pass, the failure may be caused by insufficient memory; rerun it a few times and it will usually go through.
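If the out-of-memory failures keep happening, one thing that may be worth trying is tuning the session slightly. The snippet below is a sketch of my own, not part of the reference solution, and whether the grading environment honors these settings is an assumption: lowering spark.sql.shuffle.partitions makes the local run schedule fewer shuffle tasks, while spark.driver.memory normally only takes effect when set before the JVM starts (e.g. via spark-submit --driver-memory), so setting it from inside the program may simply be ignored.

import org.apache.spark.sql.SparkSession;

public class SessionWithTunedMemory {
    public static void main(String[] args) {
        // Sketch: a local session with fewer shuffle partitions to reduce task overhead.
        // spark.driver.memory usually only applies when set at launch time
        // (e.g. spark-submit --driver-memory 2g), so treat that line as best-effort.
        SparkSession spark = SparkSession.builder()
                .master("local")
                .appName("TrainClean")
                .config("spark.driver.memory", "2g")          // may be ignored once the JVM is already running
                .config("spark.sql.shuffle.partitions", "4")  // default is 200; fewer tasks for a small local job
                .getOrCreate();
        spark.range(10).groupBy().count().show(); // trivial action just to confirm the session works
        spark.stop();
    }
}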
Task 1: Cleaning Cancelled Ride-Hailing Order Data
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.functions;

public class TrainClean {
    public static void main(String[] args) {
        Logger.getLogger("org").setLevel(Level.ERROR);
        SparkSession spark = SparkSession.builder().master("local").appName("TrainClean").getOrCreate();
        // 1. Read the .dat files
        Dataset<Row> df = spark.read()
                .option("header", "true")
                .option("delimiter", ",") // explicitly use comma as the field delimiter
                .csv("/data/workspace/myshixun/ProvOrderCancel/*.dat");
        // 2. Trim whitespace from the column names
        String[] columns = df.columns();
        for (String col : columns) {
            df = df.withColumnRenamed(col, col.trim());
        }
        // 3. Filter on the mandatory fields
        String[] mandatoryColumns = {"companyid", "address", "orderid", "ordertime", "canceltime", "cancelreason"};
        Column filterCond = functions.lit(true);
        // Check that every field is present (i.e. each raw row has exactly 8 fields)
        String[] allColumns = df.columns();
        for (String col : allColumns) {
            filterCond = filterCond.and(df.col(col).isNotNull()); // the field must exist (not null)
        }
        // Non-empty check on the mandatory fields
        for (String col : mandatoryColumns) {
            filterCond = filterCond.and(
                    functions.trim(df.col(col)).isNotNull()
                            .and(functions.trim(df.col(col)).notEqual(""))
            );
        }
        // Add the time-prefix filter (keep only 20190307 records)
        filterCond = filterCond.and(df.col("ordertime").startsWith("20190307"))
                .and(df.col("canceltime").startsWith("20190307"));
        df = df.filter(filterCond);
        // 4. Handle the literal "null" in cancelreason
        df = df.withColumn("cancelreason",
                functions.when(
                        functions.trim(functions.lower(df.col("cancelreason"))).equalTo("null"),
                        "未知"
                ).otherwise(df.col("cancelreason"))
        );
        // 5. Convert the time format
        df = df.withColumn("ordertime",
                functions.date_format(
                        functions.to_timestamp(df.col("ordertime"), "yyyyMMddHHmmss"),
                        "yyyy-MM-dd HH:mm:ss"
                ))
                .withColumn("canceltime",
                        functions.date_format(
                                functions.to_timestamp(df.col("canceltime"), "yyyyMMddHHmmss"),
                                "yyyy-MM-dd HH:mm:ss"
                        ));
        // 6. Map administrative-division codes to district names
        Dataset<Row> addressDF = spark.read()
                .format("jdbc")
                .option("url", "jdbc:mysql://127.0.0.1:3306/mydb")
                .option("dbtable", "t_address")
                .option("user", "root")
                .option("password", "123123")
                .load()
                .withColumn("address_code", functions.trim(functions.col("address_code")))
                .dropDuplicates("address_code");
        df = df.withColumn("address", functions.trim(df.col("address")));
        df = df.join(addressDF,
                df.col("address").equalTo(addressDF.col("address_code")),
                "left")
                .withColumn("districtname",
                        functions.coalesce(addressDF.col("address_name"), functions.lit("未知")))
                .drop("address_code", "address_name");
        // 7. Deduplicate (keep the first occurrence of each order)
        df = df.dropDuplicates("orderid");
        // 8. Select the output columns and sort by orderid
        df = df.select(
                "companyid", "address", "districtname", "orderid",
                "ordertime", "canceltime", "operator", "canceltypecode", "cancelreason"
        ).orderBy("orderid");
        // 9. Write the output as a single file
        df.coalesce(1)
                .write()
                .option("sep", "|")
                .option("header", false)
                .mode("overwrite")
                .csv("/root/files");
        spark.stop();
    }
}
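Before submitting, it can help to read the pipe-delimited output back and eyeball a few rows. The sketch below is my own quick check rather than part of the graded solution; the /root/files path and the column order are simply copied from the code above.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class TrainCleanCheck {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local").appName("TrainCleanCheck").getOrCreate();
        // Read the pipe-delimited output back (no header) and reapply the expected column names.
        Dataset<Row> out = spark.read()
                .option("sep", "|")
                .csv("/root/files")
                .toDF("companyid", "address", "districtname", "orderid",
                      "ordertime", "canceltime", "operator", "canceltypecode", "cancelreason");
        System.out.println("rows: " + out.count());
        out.show(5, false); // eyeball the first few cleaned records
        spark.stop();
    }
}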
Task 2: Cleaning Successful Ride-Hailing Order Data
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.functions;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OrderClean {
    public static void main(String[] args) {
        Logger.getLogger("org").setLevel(Level.ERROR);
        SparkSession spark = SparkSession.builder().master("local").appName("OrderClean").getOrCreate();
        // Read the raw data and assign column names
        Dataset<Row> rawData = spark.read()
                .option("sep", ",")
                .csv("/data/workspace/myshixun/ProvOrderCreate/*")
                .toDF("companyid", "address", "orderid", "departtime", "ordertime", "passengernote", "departure",
                        "deplongitude", "deplatitude", "destination", "destlongitude", "destlatitude", "encrypt_c", "faretype");
        // Drop the columns that are not needed
        Dataset<Row> filteredData = rawData.select(
                rawData.col("companyid"),
                rawData.col("address"),
                rawData.col("orderid"),
                rawData.col("departtime"),
                rawData.col("ordertime"),
                rawData.col("departure"),
                rawData.col("deplongitude"),
                rawData.col("deplatitude"),
                rawData.col("destination"),
                rawData.col("destlongitude"),
                rawData.col("destlatitude")
        );
        // Convert empty strings in every column to null
        for (String colName : filteredData.columns()) {
            filteredData = filteredData.withColumn(colName,
                    functions.when(functions.col(colName).notEqual(""), functions.col(colName)).otherwise(functions.lit(null)));
        }
        // Register UDFs that insert the decimal point into the longitude/latitude strings
        UDF1<String, String> processLon = s -> {
            if (s == null || s.isEmpty()) return null;
            return s.length() < 3 ? s : s.substring(0, 3) + "." + s.substring(3);
        };
        spark.udf().register("processLon", processLon, DataTypes.StringType);
        UDF1<String, String> processLat = s -> {
            if (s == null || s.isEmpty()) return null;
            return s.length() < 2 ? s : s.substring(0, 2) + "." + s.substring(2);
        };
        spark.udf().register("processLat", processLat, DataTypes.StringType);
        // Apply the UDFs to the longitude/latitude columns
        filteredData = filteredData.withColumn("deplongitude", functions.callUDF("processLon", functions.col("deplongitude")))
                .withColumn("destlongitude", functions.callUDF("processLon", functions.col("destlongitude")))
                .withColumn("deplatitude", functions.callUDF("processLat", functions.col("deplatitude")))
                .withColumn("destlatitude", functions.callUDF("processLat", functions.col("destlatitude")));
        // Parse the time fields into temporary timestamp columns
        filteredData = filteredData
                .withColumn("ordertime_ts", functions.to_timestamp(functions.col("ordertime"), "yyyyMMddHHmmss"))
                .withColumn("departtime_ts", functions.to_timestamp(functions.col("departtime"), "yyyyMMddHHmmss"));
        // Drop rows containing nulls
        filteredData = filteredData.na().drop();
        // Keep only records within the target date range
        filteredData = filteredData.filter(
                functions.col("ordertime_ts").between("2019-03-07 00:00:00", "2019-03-07 23:59:59")
                        .and(functions.col("departtime_ts").between("2019-03-07 00:00:00", "2019-03-07 23:59:59"))
        );
        // Format the time fields and drop the temporary columns
        filteredData = filteredData
                .withColumn("ordertime", functions.date_format(functions.col("ordertime_ts"), "yyyy-MM-dd HH:mm:ss"))
                .withColumn("departtime", functions.date_format(functions.col("departtime_ts"), "yyyy-MM-dd HH:mm:ss"))
                .drop("ordertime_ts", "departtime_ts");
        // Read the t_address table from MySQL
        Dataset<Row> addressDF = spark.read()
                .format("jdbc")
                .option("url", "jdbc:mysql://127.0.0.1:3306/mydb?useUnicode=true&characterEncoding=utf-8")
                .option("dbtable", "t_address")
                .option("user", "root")
                .option("password", "123123")
                .load();
        // Collect the table into a Map (address_code -> address_name)
        List<Row> addressList = addressDF.collectAsList();
        Map<String, String> addressMap = new HashMap<>();
        for (Row row : addressList) {
            addressMap.put(row.getString(0), row.getString(1));
        }
        // Register a lookup UDF and add the districtname column
        spark.udf().register("getDistrict", (String code) -> addressMap.getOrDefault(code, "未知"), DataTypes.StringType);
        filteredData = filteredData.withColumn("districtname", functions.callUDF("getDistrict", functions.col("address")));
        // Reorder the columns and deduplicate by orderid
        filteredData = filteredData.select(
                functions.col("companyid"),
                functions.col("address"),
                functions.col("districtname"),
                functions.col("orderid"),
                functions.col("departtime"),
                functions.col("ordertime"),
                functions.col("departure"),
                functions.col("deplongitude"),
                functions.col("deplatitude"),
                functions.col("destination"),
                functions.col("destlongitude"),
                functions.col("destlatitude")
        ).dropDuplicates("orderid");
        // Write the result to a single file
        filteredData.coalesce(1)
                .write()
                .option("sep", "\t")
                .option("header", false)
                .mode("overwrite")
                .csv("/root/files");
        spark.stop();
    }
}
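The fiddliest part of this task is the decimal-point insertion for the coordinate fields, so it can be worth checking that string logic in isolation. The following self-contained sketch is my own test harness, not part of the assignment; it mirrors the rules used by the processLon/processLat UDFs, and the sample input values are made up purely for illustration.

public class CoordinateFormatCheck {
    // Same rule as the processLon UDF: insert a '.' after the first 3 digits of a longitude string.
    static String formatLongitude(String s) {
        if (s == null || s.isEmpty()) return null;
        return s.length() < 3 ? s : s.substring(0, 3) + "." + s.substring(3);
    }

    // Same rule as the processLat UDF: insert a '.' after the first 2 digits of a latitude string.
    static String formatLatitude(String s) {
        if (s == null || s.isEmpty()) return null;
        return s.length() < 2 ? s : s.substring(0, 2) + "." + s.substring(2);
    }

    public static void main(String[] args) {
        // Hypothetical raw values, only to illustrate the transformation.
        System.out.println(formatLongitude("113956237")); // -> 113.956237
        System.out.println(formatLatitude("22547891"));   // -> 22.547891
        System.out.println(formatLongitude(""));          // -> null (empty input)
    }
}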