A Simple Data Warehouse Project for the New-Energy Trend, to Help You Understand Warehouse Modeling
1. Project Background
Driven by current national policy, competition among new-energy projects and companies keeps intensifying. Many new-energy companies have started building data warehouses at scale to extract value from the large volumes of data they generate and so secure a foothold in the new-energy market. Against that background, this project uses Spark SQL to build, from zero to one, a project that is "small but complete".
Assume our data source is a new-energy company's database with three tables: electricity_meter, solar_panel, and weather_data, which hold electricity meter readings, solar panel output, and weather observations respectively. The table structures are as follows:
1. electricity_meter table structure
id: meter ID
date: date
energy_consumption: energy consumed
location: location
2. solar_panel table structure
id: solar panel ID
date: date
solar_power: solar power generated
location: location
3. weather_data table structure
id: weather record ID
date: date
temperature: temperature
humidity: humidity
location: location
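The listings above give field names and meanings only. For reference, a typed view of the three tables might look like the following Scala case classes; the concrete types (Int for identifiers and measurements, String for dates and locations) are assumptions read off the sample data below, not an authoritative schema:

// Assumed typed views of the three source tables; field types are
// inferred from the sample data shown below.
case class ElectricityMeter(id: Int, date: String, energyConsumption: Int, location: String)
case class SolarPanel(id: Int, date: String, solarPower: Int, location: String)
case class WeatherData(id: Int, date: String, temperature: Int, humidity: Int, location: String)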
Sample Data
electricity_meter sample data
| id | date | energy_consumption | location |
|----|------------|--------------------|----------|
| 1 | 2023-07-01 | 100 | New York |
| 2 | 2023-07-01 | 80 | London |
| 3 | 2023-07-02 | 120 | New York |
| 4 | 2023-07-02 | 90 | London |
solar_panel sample data
| id | date | solar_power | location |
|----|------------|-------------|----------|
| 1 | 2023-07-01 | 50 | New York |
| 2 | 2023-07-01 | 40 | London |
| 3 | 2023-07-02 | 60 | New York |
| 4 | 2023-07-02 | 45 | London |
weather_data sample data
| id | date | temperature | humidity | location |
|----|------------|-------------|----------|----------|
| 1 | 2023-07-01 | 30 | 70 | New York |
| 2 | 2023-07-01 | 25 | 65 | London |
| 3 | 2023-07-02 | 32 | 75 | New York |
| 4 | 2023-07-02 | 28 | 68 | London |
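If you want to experiment with the SQL in this article without standing up MySQL and Hive, the sample rows above can be loaded as in-memory DataFrames and registered as temporary views. This is a local-testing convenience of mine, not part of the original pipeline; a minimal sketch:

import org.apache.spark.sql.SparkSession

object SampleData {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("Sample Data")
      .master("local")
      .getOrCreate()
    import spark.implicits._

    // The sample rows from the three tables above, loaded in memory
    val electricityMeterDf = Seq(
      (1, "2023-07-01", 100, "New York"),
      (2, "2023-07-01", 80, "London"),
      (3, "2023-07-02", 120, "New York"),
      (4, "2023-07-02", 90, "London")
    ).toDF("id", "date", "energy_consumption", "location")

    val solarPanelDf = Seq(
      (1, "2023-07-01", 50, "New York"),
      (2, "2023-07-01", 40, "London"),
      (3, "2023-07-02", 60, "New York"),
      (4, "2023-07-02", 45, "London")
    ).toDF("id", "date", "solar_power", "location")

    val weatherDataDf = Seq(
      (1, "2023-07-01", 30, 70, "New York"),
      (2, "2023-07-01", 25, 65, "London"),
      (3, "2023-07-02", 32, 75, "New York"),
      (4, "2023-07-02", 28, 68, "London")
    ).toDF("id", "date", "temperature", "humidity", "location")

    // Register temp views so the SQL from the walkthrough can be tried directly
    electricityMeterDf.createOrReplaceTempView("electricity_meter")
    solarPanelDf.createOrReplaceTempView("solar_panel")
    weatherDataDf.createOrReplaceTempView("weather_data")

    // For example, the daily energy usage aggregation from the DWS layer:
    spark.sql("SELECT date, SUM(energy_consumption) AS energy_usage FROM electricity_meter GROUP BY date").show()
  }
}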
2. Requirements Analysis
Our goal is to integrate the company's data into a data warehouse that supports analytics and reporting. Metrics of interest include daily energy consumption, daily solar power generation, and the impact of weather on generation.
3. Requirements Breakdown
1. ODS layer (Operational Data Store)
Create an ods database containing the following tables:
electricity_meter_ods
solar_panel_ods
weather_data_ods
2. DWD layer (Data Warehouse Detail)
Create a dwd database containing the following tables:
electricity_meter_dwd
solar_panel_dwd
weather_data_dwd
3. DWS layer (Data Warehouse Summary, wide tables)
Create a dws database containing the following tables:
daily_energy_usage_dws
solar_power_generation_dws
4. DIM layer (dimension tables)
Create a dim database containing the following tables:
date_dim
location_dim
5. ADS layer (Application Data Store)
Create an ads database containing the following tables:
daily_energy_report_ads
solar_power_report_ads
Next, with Scala as the development language and Spark as the compute engine, we walk through the complete project code; it makes the warehouse build-out for a single business very easy to follow.
import org.apache.spark.sql.SparkSession

import java.util.Properties

object NewEnergy {
  def main(args: Array[String]): Unit = {
    // Build a Hive-enabled SparkSession; dynamic partitioning is enabled
    // here for convenience, although this demo does not partition its tables
    val spark = SparkSession.builder
      .appName("New Energy Data Warehouse Project")
      .master("local")
      .config("hive.exec.dynamic.partition", "true")
      .config("hive.exec.dynamic.partition.mode", "nonstrict")
      .enableHiveSupport()
      .getOrCreate()

    // JDBC connection details for the MySQL source database
    val url = "jdbc:mysql://hadoop001:3306/test?useSSL=false"
    val properties: Properties = new Properties()
    properties.put("driver", "com.mysql.jdbc.Driver")
    properties.put("user", "root")
    properties.put("password", "root")

    // Read the three source tables from MySQL
    val electricity_meter_df = spark.read.jdbc(url, "electricity_meter", properties)
    val solar_panel_df = spark.read.jdbc(url, "solar_panel", properties)
    val weather_data_df = spark.read.jdbc(url, "weather_data", properties)
spark.sql("CREATE DATABASE IF NOT EXISTS ods")
spark.sql("USE ods")
electricity_meter_df.write.saveAsTable("electricity_meter_ods")
solar_panel_df.write.saveAsTable("solar_panel_ods")
weather_data_df.write.saveAsTable("weather_data_ods")
spark.sql("CREATE DATABASE IF NOT EXISTS dwd")
spark.sql("USE dwd")
spark.sql("SELECT * FROM ods.electricity_meter_ods")
.write
.saveAsTable("electricity_meter_dwd")
spark.sql("SELECT * FROM ods.solar_panel_ods")
.write
.saveAsTable("solar_panel_dwd")
spark.sql("SELECT * FROM ods.weather_data_ods")
.write
.saveAsTable("weather_data_dwd")
spark.sql("CREATE DATABASE IF NOT EXISTS dws")
spark.sql("USE dws")
spark.sql("""
|SELECT
| date, SUM(energy_consumption) AS energy_usage
|FROM
| dwd.electricity_meter_dwd
|GROUP BY
| date
|""".stripMargin)
.write
.saveAsTable("daily_energy_usage_dws")
spark.sql("""
|SELECT
| date, SUM(solar_power) AS solar_power_generation
|FROM
| dwd.solar_panel_dwd
|GROUP BY
| date
|""".stripMargin)
.write
.saveAsTable("solar_power_generation_dws")
spark.sql("CREATE DATABASE IF NOT EXISTS dim")
spark.sql("USE dim")
spark.sql("""
|SELECT
| DISTINCT date
|FROM
| dwd.electricity_meter_dwd
|""".stripMargin)
.write
.saveAsTable("date_dim")
spark.sql("""
|SELECT
| DISTINCT location
|FROM
| dwd.solar_panel_dwd
|""".stripMargin)
.write
.saveAsTable("location_dim")
spark.sql("CREATE DATABASE IF NOT EXISTS ads")
spark.sql("USE ads")
spark.sql("""
|SELECT
| d.date, u.energy_usage
|FROM
| dim.date_dim d
|JOIN
| dws.daily_energy_usage_dws u
|ON
| d.date = u.date
|""".stripMargin)
.write
.saveAsTable("daily_energy_report_ads")
spark.sql("""
|SELECT
| d.date, s.solar_power_generation
|FROM
| dim.date_dim d
|JOIN
| dws.solar_power_generation_dws s
|ON
| d.date = s.date
|""".stripMargin)
.write
.saveAsTable("solar_power_report_ads")
}
}
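The requirements also mention measuring the impact of weather on generation, which the walkthrough above does not implement. A minimal sketch of such an ADS job is shown below; it assumes the dwd tables built above, and the object name WeatherImpact and table name weather_impact_report_ads are my own additions. Each side is pre-aggregated by date and location before the join so that row fan-out does not inflate the sums:

import org.apache.spark.sql.SparkSession

object WeatherImpact {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("Weather Impact Report")
      .master("local")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("USE ads")
    // Aggregate weather and solar output separately, then join on
    // (date, location) to relate daily conditions to daily generation
    spark.sql("""
        |SELECT
        |  w.date, w.location, w.avg_temperature, w.avg_humidity, s.solar_power_generation
        |FROM
        |  (SELECT date, location, AVG(temperature) AS avg_temperature, AVG(humidity) AS avg_humidity
        |   FROM dwd.weather_data_dwd GROUP BY date, location) w
        |JOIN
        |  (SELECT date, location, SUM(solar_power) AS solar_power_generation
        |   FROM dwd.solar_panel_dwd GROUP BY date, location) s
        |ON
        |  w.date = s.date AND w.location = s.location
        |""".stripMargin)
      .write
      .saveAsTable("weather_impact_report_ads")
  }
}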
4. Appendix: Data Generation Tool
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class DataGenerator {
    private static final String DB_URL = "jdbc:mysql://hadoop001:3306/test";
    private static final String DB_USER = "root";
    private static final String DB_PASSWORD = "root";
    private static final int NUM_RECORDS = 1000000;

    public static void main(String[] args) {
        try {
            Class.forName("com.mysql.jdbc.Driver");
            Connection conn = DriverManager.getConnection(DB_URL, DB_USER, DB_PASSWORD);
            generateElectricityMeterData(conn);
            generateSolarPanelData(conn);
            generateWeatherData(conn);
            conn.close();
        } catch (ClassNotFoundException | SQLException e) {
            e.printStackTrace();
        }
    }
    // Insert NUM_RECORDS random meter readings; rows are batched to keep
    // the inserts reasonably fast at this volume
    private static void generateElectricityMeterData(Connection conn) throws SQLException {
        String sql = "INSERT INTO electricity_meter (id, date, energy_consumption, location) VALUES (?, ?, ?, ?)";
        Random random = new Random();
        PreparedStatement stmt = conn.prepareStatement(sql);
        for (int i = 1; i <= NUM_RECORDS; i++) {
            stmt.setInt(1, i);
            stmt.setString(2, generateRandomDate());
            stmt.setInt(3, random.nextInt(200) + 1); // 1..200 units consumed
            stmt.setString(4, generateRandomLocation());
            stmt.addBatch();
            if (i % 10000 == 0) {
                stmt.executeBatch();
            }
        }
        stmt.executeBatch();
        stmt.close();
    }
    // Insert NUM_RECORDS random solar panel readings, batched as above
    private static void generateSolarPanelData(Connection conn) throws SQLException {
        String sql = "INSERT INTO solar_panel (id, date, solar_power, location) VALUES (?, ?, ?, ?)";
        Random random = new Random();
        PreparedStatement stmt = conn.prepareStatement(sql);
        for (int i = 1; i <= NUM_RECORDS; i++) {
            stmt.setInt(1, i);
            stmt.setString(2, generateRandomDate());
            stmt.setInt(3, random.nextInt(100) + 1); // 1..100 units generated
            stmt.setString(4, generateRandomLocation());
            stmt.addBatch();
            if (i % 10000 == 0) {
                stmt.executeBatch();
            }
        }
        stmt.executeBatch();
        stmt.close();
    }
    // Insert NUM_RECORDS random weather observations, batched as above
    private static void generateWeatherData(Connection conn) throws SQLException {
        String sql = "INSERT INTO weather_data (id, date, temperature, humidity, location) VALUES (?, ?, ?, ?, ?)";
        Random random = new Random();
        PreparedStatement stmt = conn.prepareStatement(sql);
        for (int i = 1; i <= NUM_RECORDS; i++) {
            stmt.setInt(1, i);
            stmt.setString(2, generateRandomDate());
            stmt.setInt(3, random.nextInt(40) + 20); // temperature 20..59
            stmt.setInt(4, random.nextInt(70) + 30); // humidity 30..99
            stmt.setString(5, generateRandomLocation());
            stmt.addBatch();
            if (i % 10000 == 0) {
                stmt.executeBatch();
            }
        }
        stmt.executeBatch();
        stmt.close();
    }
    // Pick a random date within +/-10 days of 2023-07-01
    private static String generateRandomDate() {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
        LocalDate baseDate = LocalDate.parse("2023-07-01", formatter);
        List<String> dates = new ArrayList<>();
        for (int i = -10; i <= 10; i++) {
            dates.add(baseDate.plusDays(i).format(formatter));
        }
        return dates.get(new Random().nextInt(dates.size()));
    }
    // Pick one of seven fixed cities at random
    private static String generateRandomLocation() {
        String[] locations = {"New York", "London", "Tokyo", "Sydney", "Beijing", "Shanghai", "Tianjin"};
        int randomIndex = new Random().nextInt(locations.length);
        return locations[randomIndex];
    }
}
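Note that DataGenerator assumes the three tables already exist in the test database. As an alternative, the same mock data can be produced with Spark itself and written over JDBC, which also creates (or overwrites) the tables on each run. This is a sketch of my own, not part of the original article; the object name SparkDataGenerator is assumed, while the row count, value ranges, and city list mirror the Java generator above:

import org.apache.spark.sql.{SaveMode, SparkSession}

import java.time.LocalDate
import java.util.Properties
import scala.util.Random

object SparkDataGenerator {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("Spark Data Generator")
      .master("local")
      .getOrCreate()
    import spark.implicits._

    val url = "jdbc:mysql://hadoop001:3306/test?useSSL=false"
    val properties = new Properties()
    properties.put("driver", "com.mysql.jdbc.Driver")
    properties.put("user", "root")
    properties.put("password", "root")

    val random = new Random()
    val locations = Array("New York", "London", "Tokyo", "Sydney", "Beijing", "Shanghai", "Tianjin")
    val baseDate = LocalDate.parse("2023-07-01")

    // Same distributions as the Java generator: a date within +/-10 days
    // of 2023-07-01 and one of seven cities
    def randomDate(): String = baseDate.plusDays(random.nextInt(21) - 10).toString
    def randomLocation(): String = locations(random.nextInt(locations.length))

    val numRecords = 1000000

    // Rows are built on the driver (fine at demo scale) and bulk-written
    // over JDBC; SaveMode.Overwrite recreates each table on every run
    (1 to numRecords)
      .map(i => (i, randomDate(), random.nextInt(200) + 1, randomLocation()))
      .toDF("id", "date", "energy_consumption", "location")
      .write.mode(SaveMode.Overwrite).jdbc(url, "electricity_meter", properties)

    (1 to numRecords)
      .map(i => (i, randomDate(), random.nextInt(100) + 1, randomLocation()))
      .toDF("id", "date", "solar_power", "location")
      .write.mode(SaveMode.Overwrite).jdbc(url, "solar_panel", properties)

    (1 to numRecords)
      .map(i => (i, randomDate(), random.nextInt(40) + 20, random.nextInt(70) + 30, randomLocation()))
      .toDF("id", "date", "temperature", "humidity", "location")
      .write.mode(SaveMode.Overwrite).jdbc(url, "weather_data", properties)
  }
}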