- Data structure analysis:
(1) The raw 2015 sci-tech achievement tables for the three regions (Beijing, Tianjin, Hebei) are Access databases;
(2) The achievement data of all three regions must be consolidated into a single table (the result table must be a MySQL table);
(3) The three source tables have inconsistent schemas. The result table must contain every field: fields expressing the same or a similar meaning must be merged, and no field may be lost (for a field that exists in only one table, rows coming from the other two tables get NULL in that field).
- Data synchronization exercise: implement, in code, synchronization of the three source tables into a single result table.
- Data cleaning exercise:
(1) Duplicate-record cleaning: check the result table for duplicate records. Records with the same region and achievement name count as duplicates; keep one record, back-fill it with any field values that only the other duplicates contain, then delete the rest (see the Spark sketch after this list).
(2) Append two standard dimension fields, year (年份) and region (地域), to the result table. If a source table already has such a field, convert it into the dimension field directly; otherwise derive the region from the organization name. The Tianjin table has no year field, so its year dimension is fixed at 2015.
- Data analysis:
Using the provided field names, automatically classify the achievements and analyze the scientific and technological strengths of Beijing, Tianjin, and Hebei.
- Export the final MySQL table, put it together with the source code into a folder named class-studentID-name, and submit it.
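The dropDuplicates call in the Spark cleaning job further below keeps one row per (地域, 成果名称) pair but discards the other copies wholesale, so by itself it does not back-fill fields that only the dropped duplicates contain. A minimal Spark (Scala) sketch of the keep-one-and-back-fill behavior item (1) of the cleaning exercise asks for; it assumes the 地域 dimension field has already been appended, and mergeDuplicates is a name chosen here for illustration:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

// Keep one row per (地域, 成果名称) group and back-fill every other column
// with the first non-null value found among the duplicates.
def mergeDuplicates(df: DataFrame): DataFrame = {
  val keys = Seq("地域", "成果名称")
  val aggs = df.columns.filterNot(keys.contains)
    .map(c => first(col(c), ignoreNulls = true).as(c))
  df.groupBy(keys.map(col): _*).agg(aggs.head, aggs.tail: _*)
}

first with ignoreNulls = true picks a non-null value per column when one exists, which matches the requirement of keeping one record and supplementing it from the rest.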
Reading the data, merging it, and saving it to MySQL
import com.healthmarketscience.jackcess.*;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.*;
import lombok.SneakyThrows;
public class FileRead {
    private static Connection conn;
    private static List<List<String>> columnList = new ArrayList<>();
    private static Map<String, String> columnMap = new HashMap<>();

    @SneakyThrows
    public static void main(String[] args) {
        String mysqlUrl = "jdbc:mysql://localhost:3307/sparktest";
        String mysqlUsername = "root";
        String mysqlPassword = "123456";
        conn = DriverManager.getConnection(mysqlUrl, mysqlUsername, mysqlPassword);
        read_data("src/main/resources/data/1.1京津冀科技成果(技术).accdb");
        read_data("src/main/resources/data/1.2河北科技成果2015年第1-2-3期(技术).mdb");
        read_data("src/main/resources/data/1.3天津科技成果(技术).mdb");
        read_data("src/main/resources/data/1.4北京推介项目成果.accdb");
        // System.out.println(compute_columns());
        createConTable();
        insertTotalData("src/main/resources/data/1.1京津冀科技成果(技术).accdb");
        insertTotalData("src/main/resources/data/1.2河北科技成果2015年第1-2-3期(技术).mdb");
        insertTotalData("src/main/resources/data/1.3天津科技成果(技术).mdb");
        insertTotalData("src/main/resources/data/1.4北京推介项目成果.accdb");
        conn.close();
    }
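    // Maps a synonymous column name from a source table onto the canonical
    // field name used in the merged result table; unknown names pass through unchanged.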
    private static String check(String s) {
        switch (s) {
            case "评价单位名称":
                return "鉴定部门";
            case "主要人员":
                return "完成人";
            case "单位名称":
                return "完成单位";
            case "行业":
                return "应用行业名称";
            case "年度":
                return "成果分布年份";
            case "序号":
                return "ID";
            case "单位":
                return "完成单位";
            case "负责人":
                return "完成人";
            case "所属技术领域":
                return "应用行业名称";
            case "项目简介":
                return "成果简介";
            case "领域":
                return "应用行业名称";
            case "成果产出単位":
                return "完成单位";
            case "技术领域":
                return "应用行业名称";
            case "成果产出单位":
                return "完成单位";
            case "取得的知识产权":
                return "知识产权情况";
            case "名称":
                return "成果名称";
            case "知识产权":
                return "知识产权情况";
            default:
                return s;
        }
    }
    private static List<String> compute_columns() {
        List<String> columns = new ArrayList<>();
        List<String> temp = columnList.get(0);
        for (String i : temp) {
            columnMap.put(i, i);
            columns.add(i);
        }
        for (int i = 1; i < columnList.size(); i++) {
            temp = columnList.get(i);
            for (String j : temp) {
                String s = check(j);
                if (s.equals(j) && !columns.contains(s)) {
                    columns.add(j);
                    columnMap.put(j, j);
                } else {
                    // Ensure the canonical name exists in the result schema even if
                    // no source table carries it verbatim; otherwise the INSERT
                    // built from columnMap would reference a missing column.
                    if (!columns.contains(s)) {
                        columns.add(s);
                    }
                    columnMap.put(j, s);
                }
            }
        }
        return columns;
    }
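    // Creates the merged MySQL table TOTAL_DATA whose columns are the union of
    // all canonical source columns, each typed TEXT.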
    @SneakyThrows
    private static void createConTable() {
        List<String> columns = compute_columns();
        StringBuilder createTableSql = new StringBuilder("CREATE TABLE IF NOT EXISTS TOTAL_DATA");
        createTableSql.append(" (");
        for (String column : columns) {
            createTableSql.append("`" + column + "`").append(" TEXT, ");
        }
        createTableSql.delete(createTableSql.length() - 2, createTableSql.length());
        createTableSql.append(")");
        PreparedStatement createTableStmt = conn.prepareStatement(createTableSql.toString());
        createTableStmt.executeUpdate();
    }
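    // Reads every table in one Access file and inserts its rows into TOTAL_DATA,
    // translating each source column name through columnMap.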
    @SneakyThrows
    private static void insertTotalData(String filepath) {
        Database db = DatabaseBuilder.open(new File(filepath));
        for (String tableName : db.getTableNames()) {
            Table table = db.getTable(tableName);
            List<Column> columns = (List<Column>) table.getColumns();
            StringBuilder insertDataSql = new StringBuilder("INSERT INTO ");
            insertDataSql.append("TOTAL_DATA").append(" (");
            for (Column column : columns) {
                String v = columnMap.get(column.getName().replace(":", ""));
                insertDataSql.append("`" + v + "`").append(",");
            }
            insertDataSql.deleteCharAt(insertDataSql.length() - 1);
            insertDataSql.append(") VALUES (");
            for (int i = 0; i < columns.size(); i++) {
                insertDataSql.append("?,");
            }
            insertDataSql.deleteCharAt(insertDataSql.length() - 1);
            insertDataSql.append(")");
            PreparedStatement insertDataStmt = conn.prepareStatement(insertDataSql.toString());
            for (Row row : table) {
                for (int i = 0; i < columns.size(); i++) {
                    // Guard against null cells, which would otherwise throw on toString().
                    Object value = row.get(columns.get(i).getName());
                    insertDataStmt.setObject(i + 1, value == null ? null : value.toString().replace("\\\\", ""));
                }
                insertDataStmt.executeUpdate();
            }
        }
        db.close();
    }
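    // Collects the column names of every table in an Access file into columnList,
    // printing them for inspection.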
    @SneakyThrows
    private static void read_data(String filepath) {
        Database db = DatabaseBuilder.open(new File(filepath));
        for (String tableName : db.getTableNames()) {
            Table table = db.getTable(tableName);
            List<Column> columns = (List<Column>) table.getColumns();
            List<String> temp = new ArrayList<>();
            for (Column column : columns) {
                String v = column.getName().replace(":", "");
                // print, not printf: a column name containing '%' would break a format string
                System.out.print(v + "\t");
                temp.add(v);
            }
            columnList.add(temp);
            System.out.println("\n");
        }
        // saveTxT(filepath);
        // createTable(filepath);
        // insertData(filepath);
    }
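    // Dumps each table of an Access file to a .txt file, one row per line
    // (only reachable through the commented-out call above).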
    @SneakyThrows
    private static void saveTxT(String filepath) {
        Database db = DatabaseBuilder.open(new File(filepath));
        for (String tableName : db.getTableNames()) {
            Table table = db.getTable(tableName);
            String fileName = tableName.replace("/", "_");
            File outputFile = new File("src/main/resources/data/" + fileName + ".txt");
            FileWriter fw = new FileWriter(outputFile);
            BufferedWriter bw = new BufferedWriter(fw);
            for (Row row : table) {
                String rowData = row.toString();
                bw.write(rowData);
                bw.newLine();
            }
            bw.close();
            fw.close();
        }
        db.close();
    }
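    // Creates one MySQL table per source table, mirroring its columns; the
    // per-table counterpart of createConTable, kept from an earlier approach.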
    @SneakyThrows
    private static void createTable(String filepath) {
        Database db = DatabaseBuilder.open(new File(filepath));
        for (String tableName : db.getTableNames()) {
            Table table = db.getTable(tableName);
            String fileName = tableName.replace("/", "_");
            List<Column> columns = (List<Column>) table.getColumns();
            StringBuilder createTableSql = new StringBuilder("CREATE TABLE IF NOT EXISTS " + fileName);
            createTableSql.append(" (");
            List<String> temp = new ArrayList<>();
            for (Column column : columns) {
                String v = column.getName().replace(":", "");
                temp.add(v);
                createTableSql.append("`" + v + "`").append(" TEXT, ");
            }
            columnList.add(temp);
            createTableSql.delete(createTableSql.length() - 2, createTableSql.length());
            createTableSql.append(")");
            PreparedStatement createTableStmt = conn.prepareStatement(createTableSql.toString());
            System.out.println(createTableSql.toString());
            createTableStmt.executeUpdate();
        }
        db.close();
    }
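    // Inserts each source table's rows into its same-named MySQL table; the
    // per-table counterpart of insertTotalData.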
    @SneakyThrows
    private static void insertData(String filepath) {
        Database db = DatabaseBuilder.open(new File(filepath));
        for (String tableName : db.getTableNames()) {
            Table table = db.getTable(tableName);
            String fileName = tableName.replace("/", "_");
            List<Column> columns = (List<Column>) table.getColumns();
            StringBuilder insertDataSql = new StringBuilder("INSERT INTO ");
            insertDataSql.append(fileName).append(" (");
            for (Column column : columns) {
                String v = column.getName().replace(":", "");
                insertDataSql.append("`" + v + "`").append(",");
            }
            insertDataSql.deleteCharAt(insertDataSql.length() - 1);
            insertDataSql.append(") VALUES (");
            for (int i = 0; i < columns.size(); i++) {
                insertDataSql.append("?,");
            }
            insertDataSql.deleteCharAt(insertDataSql.length() - 1);
            insertDataSql.append(")");
            PreparedStatement insertDataStmt = conn.prepareStatement(insertDataSql.toString());
            for (Row row : table) {
                for (int i = 0; i < columns.size(); i++) {
                    insertDataStmt.setObject(i + 1, row.get(columns.get(i).getName()));
                }
                insertDataStmt.executeUpdate();
            }
        }
        db.close();
    }
}
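After a run, TOTAL_DATA holds every row from the four source files under the canonical column names; the Spark job in the next section cleans that table. The inserts go row by row through executeUpdate, which is adequate at this data volume; JDBC batching (addBatch/executeBatch) would be the usual speed-up for larger inputs.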
Cleaning the merged table, exporting it, and analyzing it
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StringType
object DataClean {
  // Derive the region from a free-text field; anything mentioning neither
  // 天津 nor 北京 is assumed to be 河北.
  def inferArea(area: Column): Column = {
    when(area.contains("天津"), "天津")
      .when(area.contains("北京"), "北京")
      .otherwise("河北")
  }
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val spark = SparkSession.builder
      .appName("DataClean")
      .master("local[*]")
      .getOrCreate()
    val mysqlHost = "localhost"
    val mysqlPort = "3307"
    val mysqlDatabase = "sparktest"
    val mysqlTable = "total_data"
    val mysqlUser = "root"
    val mysqlPassword = "123456"
    val jdbcUrl = s"jdbc:mysql://$mysqlHost:$mysqlPort/$mysqlDatabase?useSSL=false"
    var df: DataFrame = spark.read
      .format("jdbc")
      .option("url", jdbcUrl)
      .option("dbtable", mysqlTable)
      .option("user", mysqlUser)
      .option("password", mysqlPassword)
      .load()
    // Strip stray double quotes from every string column.
    val stringColumns = df.schema.fields.filter(_.dataType == StringType).map(_.name)
    df = stringColumns.foldLeft(df)((accDF, colName) =>
      accDF.withColumn(colName, regexp_replace(col(colName), "\"", ""))
    )
    // Comparing a Column to null with !== never evaluates to true in Spark SQL;
    // isNotNull expresses the intended "use the value if present" logic.
    df = df.withColumn("年份", when(df("成果分布年份").isNotNull, df("成果分布年份")).otherwise(2015))
    df = df.withColumn("地域", when(df("省市").isNotNull, df("省市")).otherwise(inferArea(df("完成单位"))))
    df = df.dropDuplicates(Seq("地域", "成果名称")).repartition(1)
    df.write
      .format("csv")
      .option("header", "true")
      .option("encoding", "GBK")
      .save("src/main/resources/data/out")
    spark.stop()
  }
}
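Note that this job writes the cleaned data to a CSV file, while the analysis script below reads a res_data table from MySQL. A minimal sketch of writing the cleaned DataFrame back over JDBC instead, assuming the same jdbcUrl, user, and password as the load above; the table name res_data matches the Python query below, and overwrite mode is a choice made here, not something the assignment fixes:

import org.apache.spark.sql.DataFrame

// Persist the cleaned DataFrame to MySQL so the analysis step can read it.
def saveToMysql(df: DataFrame, jdbcUrl: String, user: String, password: String): Unit = {
  df.write
    .format("jdbc")
    .option("url", jdbcUrl)
    .option("dbtable", "res_data")
    .option("user", user)
    .option("password", password)
    .mode("overwrite")
    .save()
}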
import pandas as pd
import matplotlib.pyplot as plt
import pymysql
from matplotlib import rcParams
if __name__ == '__main__':
    mydb = pymysql.connect(
        host="localhost",
        port=3307,
        user="root",
        password="123456",
        database="sparktest"
    )
    query = "SELECT * FROM res_data"
    df = pd.read_sql(query, mydb)
    grouped = df.groupby('地域').size()
    rcParams['font.family'] = 'SimHei'  # a font that can render the Chinese labels
    # Draw the pie chart of the regional distribution
    plt.figure(figsize=(10, 7))
    grouped.plot(kind='pie', autopct='%1.1f%%', startangle=140)
    plt.title('地域分布饼状图')
    plt.ylabel('')
    plt.axis('equal')  # keep the pie chart circular
    plt.show()
    grouped = df.groupby('应用行业名称').size()
    grouped = grouped[grouped > 50]
    # Draw the bar chart of industries with more than 50 achievements
    plt.figure(figsize=(12, 6))
    grouped.plot(kind='bar', color='skyblue')
    plt.title('数量大于50的应用行业分布直方图')
    plt.xlabel('应用行业')
    plt.ylabel('数量')
    plt.xticks(rotation=45, ha='right')  # rotate the x-axis labels
    plt.tight_layout()  # adjust the layout to keep labels from overlapping
    plt.show()
    mydb.close()
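The analysis part of the assignment also asks for automatic classification of the achievements by the known field names, which neither script above implements. A minimal Spark (Scala) sketch of keyword-based classification; the category names and keyword patterns are illustrative assumptions, not part of the assignment data:

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions._

// Derive a coarse 成果分类 column from the industry and title fields.
// The keyword lists below are placeholders to refine against the real data.
def classify(df: DataFrame): DataFrame = {
  val text: Column = concat_ws(" ", col("应用行业名称"), col("成果名称"))
  df.withColumn("成果分类",
    when(text.rlike("医|药|卫生"), "医药卫生")
      .when(text.rlike("农|林|牧|渔"), "农业")
      .when(text.rlike("软件|信息|计算机"), "信息技术")
      .otherwise("其他"))
}

Grouping the classified table by 地域 and 成果分类 then yields the per-region strength comparison the task asks for.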