DELIMITER $$
CREATE DEFINER=`root`@`%` PROCEDURE `print_create_runwhole_es_sql`()
BEGIN
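    -- For each row in ES_TEST_TEMP, build the text of a Spark Hive-to-Elasticsearch
    -- job config (source rows de-duplicated by business key) and store it in the
    -- print_create_runwhole_es_sql table.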
    DECLARE p_name varchar(100);          -- target ES index/table name (matches es_table_name varchar(100) below)
    DECLARE p_name_mix text;              -- comma-separated, 'a.'-prefixed column list
    DECLARE p_yw_key varchar(50);         -- business key used to de-duplicate source rows
    DECLARE p_ori_table_id varchar(50);   -- source table id
    DECLARE p_ori_table_name varchar(50); -- source table name in the `original` schema
    DECLARE stopflag INT DEFAULT 0;       -- set to 1 by the NOT FOUND handler
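    -- Cursor: one row per target ES table. The derived table dti collapses
    -- data_item_info into a comma-separated, 'a.'-prefixed column list per source
    -- table (GROUP_CONCAT ordered by the numeric value of id).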
    DECLARE cursor_runwhole_es CURSOR FOR
        SELECT temp.es_table_name, dti.column_name, temp.ori_bus_key, temp.ori_table_id, temp.ori_table_name
        FROM ES_TEST_TEMP temp
        JOIN (
            SELECT M.ori_table_id,
                   GROUP_CONCAT(CONCAT('a.', M.column_name) ORDER BY M.seq ASC) column_name
            FROM (
                SELECT CAST(id AS SIGNED) seq, tb_id ori_table_id, column_name
                FROM data_item_info
            ) M
            GROUP BY M.ori_table_id
        ) dti ON dti.ori_table_id = temp.ori_table_id
        ORDER BY temp.ori_table_id ASC;
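    -- CONTINUE handler: set the flag instead of aborting when the cursor is exhausted.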
    DECLARE CONTINUE HANDLER FOR NOT FOUND SET stopflag = 1;

    -- Target table: rebuilt on every run; IF EXISTS keeps the first run from failing.
    DROP TABLE IF EXISTS print_create_runwhole_es_sql;
    CREATE TABLE print_create_runwhole_es_sql
    (
        es_table_name      varchar(100),
        create_runwhole_es TEXT
    );

    OPEN cursor_runwhole_es;
    out_loop: LOOP
        FETCH cursor_runwhole_es INTO p_name, p_name_mix, p_yw_key, p_ori_table_id, p_ori_table_name;
        IF stopflag = 1 THEN
            LEAVE out_loop;
        END IF;
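        -- The generated pre_sql keeps only the latest row per business key:
        -- row_number() over (partition by <ori_bus_key> order by rksfm desc) = 1;
        -- rksfm is presumably the load-timestamp column in the original.* tables.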
        -- Loop body: render one Spark-to-ES job config for the current table.
        INSERT INTO print_create_runwhole_es_sql
        SELECT p_name, CONCAT('spark {
spark.streaming.batchDuration = 5
spark.app.name = \"application_hiveToEs_', p_ori_table_id,'\"
spark.executor.instances =\"5\"
spark.executor.cores=\"8\"
spark.executor.memory=\"20g\"
spark.driver.memory=\"6g\"
spark.driver.cores=\"8\"
}
input {
hive {
pre_sql = \"select ',p_name_mix ,' from (select * ,row_number()over(partition by ',p_yw_key,' order by rksfm desc ) riadd from original.',p_ori_table_name,' ) a where a.riadd =1 \"
result_table_name = \"hive_test_1110\"
}
}
filter{}
output {
com.zdxf.bdwh1.datareverse.output.es.EsOutputRdd1{
es.nodes = \"53.80.10.101,53.80.10.102,53.80.10.103\"
es.port = \"39200\"
es.index.name = \"',p_name,'\"
es.batch.size.entries = 100000
}
stdout {}
}' );
        SET stopflag = 0; -- reset in case a statement inside the loop tripped the NOT FOUND handler
    END LOOP out_loop;
    CLOSE cursor_runwhole_es;
    -- Return the generated configs as the procedure's result set.
    SELECT * FROM print_create_runwhole_es_sql;
END$$
DELIMITER ;
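-- A minimal usage sketch, assuming ES_TEST_TEMP (es_table_name, ori_bus_key,
-- ori_table_id, ori_table_name) and data_item_info (id, tb_id, column_name) are
-- already populated. GROUP_CONCAT truncates silently at group_concat_max_len
-- (1024 bytes by default), so raise it first for wide tables:
SET SESSION group_concat_max_len = 1024 * 1024;
CALL print_create_runwhole_es_sql();
SELECT es_table_name, create_runwhole_es FROM print_create_runwhole_es_sql;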