Hive 数据写入 StarRocks：使用 CSV 格式的 Stream Load
注意：字符串中的转义字符（如 \n）直接拼接即可；而要输出字面的反斜杠（非转义字符），需要写成 \\（例如 NULL 标记 \N 在代码中写作 "\\N"）。
public class GcyDataTrans {
// StarRocks connection settings (placeholder values). They are never reassigned, so they are
// declared final to make them true constants.
// NOTE(review): the user name and password are hard-coded in source; consider loading them from
// configuration or the environment instead.
private static final String STARROCKS_HOST = "IP";
private static final String STARROCKS_HTTP_PORT = "8030";
private static final String STARROCKS_DB = "test";
// NOTE(review): not referenced by the visible code; the target table comes from the command line.
private static final String STARROCKS_TABLE = "test";
private static final String STARROCKS_USER = "test";
private static final String STARROCKS_PASSWORD = "test12345";
// Stream Load endpoint format: http://<host>:<http_port>/api/<db>/<table>/_stream_load
/**
 * Entry point: copies a Hive table into a StarRocks table.
 *
 * <p>Usage: {@code GcyDataTrans <hiveTable> <srDb.srTable>}. The first argument is passed to
 * {@code spark.read().table(...)}. From the second argument only the table part (the segment
 * after the first dot, or the whole argument if it contains no dot) is used. The target
 * database is always {@code STARROCKS_DB}.
 *
 * @param args command-line arguments; at least two are required
 */
public static void main(String[] args) {
    if (args.length < 2) {
        System.out.println("参数异常");
        return;
    }
    String hTableName = args[0];
    String srTableName = parseTableName(args[1]);
    if (srTableName.isEmpty()) {
        // e.g. "db." or "" - no usable table name instead of an ArrayIndexOutOfBoundsException
        System.out.println("参数异常");
        return;
    }
    dealData(hTableName, srTableName);
}

/**
 * Extracts the table name from a {@code db.table} argument.
 *
 * @param qualifiedName the raw argument, e.g. {@code ods.staff}
 * @return the second dot-separated segment; the whole argument if it has no dot; or an empty
 *     string when no table segment can be determined
 */
private static String parseTableName(String qualifiedName) {
    String[] parts = qualifiedName.split("\\.");
    if (parts.length >= 2) {
        return parts[1];
    }
    // A bare name (no dot) is taken as the table itself; "db." style input yields no table.
    return qualifiedName.contains(".") ? "" : qualifiedName;
}
/** Flush threshold (in characters) for one Stream Load request from a single partition. */
private static final int MAX_BATCH_CHARS = 8 * 1024 * 1024;

/**
 * Reads every row of a Hive table and loads it into a StarRocks table via Stream Load.
 *
 * <p>Each Spark partition renders its rows as tab-separated, newline-terminated lines and sends
 * them in batches of roughly {@link #MAX_BATCH_CHARS} characters, followed by one final batch
 * for any remainder. No request is sent for an empty remainder.
 *
 * <p>NOTE(review): no Stream Load label is set, so if Spark retries a failed task, batches that
 * were already loaded are sent again (possible duplicate rows).
 *
 * @param hTableName Hive table to read, as accepted by {@code spark.read().table(...)}
 * @param srTableName target StarRocks table name
 */
public static void dealData(String hTableName, String srTableName) {
    SparkSession spark = SparkSession.builder().appName("gcySync").master("yarn").enableHiveSupport().getOrCreate();
    try {
        Dataset<Row> table = spark.read().table(hTableName);
        table.show(10);
        // foreachPartition is an explicit action that processes every partition exactly once per
        // attempt, unlike driving side effects through mapPartitions(...).show(n).
        table.foreachPartition(new org.apache.spark.api.java.function.ForeachPartitionFunction<Row>() {
            @Override
            public void call(Iterator<Row> rows) throws Exception {
                StringBuilder batch = new StringBuilder();
                while (rows.hasNext()) {
                    batch.append(toTsvLine(rows.next())).append('\n');
                    if (batch.length() > MAX_BATCH_CHARS) {
                        sendData(batch, srTableName);
                        batch.setLength(0);
                    }
                }
                System.out.println("send data to sr size:" + batch.length());
                if (batch.length() > 0) {
                    sendData(batch, srTableName);
                }
            }
        });
    } finally {
        spark.stop();
    }
}

/**
 * Renders one row as a tab-separated line without a trailing newline.
 *
 * <p>Null and empty values are written as the literal two characters {@code \N}. Other values use
 * {@code toString()}, so non-string columns (numbers, dates, ...) no longer fail with a
 * ClassCastException.
 * NOTE(review): values containing '\t' or '\n' are not escaped and would break the row layout.
 *
 * @param row the Spark row to render
 * @return the tab-separated representation of {@code row}
 */
private static String toTsvLine(Row row) {
    StringBuilder line = new StringBuilder();
    for (int i = 0; i < row.length(); i++) {
        if (i > 0) {
            line.append('\t');
        }
        Object value = row.get(i);
        String text = value == null ? "" : value.toString();
        // isEmpty() instead of == "" so empty strings are really detected
        line.append(text.isEmpty() ? "\\N" : text);
    }
    return line.toString();
}
/**
 * Matches a Stream Load response whose {@code Status} indicates the data was accepted.
 * "Publish Timeout" means the transaction is committed but may not be visible yet.
 */
private static final java.util.regex.Pattern LOAD_SUCCESS =
        java.util.regex.Pattern.compile("\"Status\"\\s*:\\s*\"(Success|Publish Timeout)\"");

/**
 * Sends one batch of rows to StarRocks with an HTTP PUT Stream Load request.
 *
 * @param content newline-terminated, tab-separated rows to load
 * @param srTableName target table inside {@code STARROCKS_DB}
 * @throws IOException if the HTTP status is not 200 or the response body does not report a
 *     successful load
 * @throws Exception for other HTTP client failures
 */
private static void sendData(StringBuilder content, String srTableName) throws Exception {
    String loadUrl = String.format("http://%s:%s/api/%s/%s/_stream_load",
            STARROCKS_HOST,
            STARROCKS_HTTP_PORT,
            STARROCKS_DB,
            srTableName);
    final HttpClientBuilder httpClientBuilder = HttpClients
            .custom()
            .setRedirectStrategy(new DefaultRedirectStrategy() {
                // Follow redirects for every method, including PUT. Presumably needed because the
                // FE redirects stream-load requests to a BE - confirm against your deployment.
                @Override
                protected boolean isRedirectable(String method) {
                    return true;
                }
            });
    try (CloseableHttpClient client = httpClientBuilder.build()) {
        HttpPut put = new HttpPut(loadUrl);
        put.setHeader(HttpHeaders.EXPECT, "100-continue");
        put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(STARROCKS_USER, STARROCKS_PASSWORD));
        // The optional "label" header would give at-most-once semantics for retried requests:
        // put.setHeader("label", "39c25a5c-7000-496e-a98e-348a264c81de");
        put.setEntity(new StringEntity(content.toString(), "UTF-8"));
        try (CloseableHttpResponse response = client.execute(put)) {
            String loadResult = "";
            if (response.getEntity() != null) {
                // UTF-8 is used only if the response does not declare its own charset.
                loadResult = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
            }
            final int statusCode = response.getStatusLine().getStatusCode();
            // A 200 status only means the HTTP service answered; the load outcome is reported in
            // the "Status" field of the response body, so both must be checked.
            if (statusCode != 200 || !LOAD_SUCCESS.matcher(loadResult).find()) {
                throw new IOException(
                        String.format("Stream load failed, statusCode=%s load result=%s", statusCode, loadResult));
            }
            System.out.println(loadResult);
        }
    }
}
/**
 * Builds an HTTP Basic {@code Authorization} header value.
 *
 * @param username account name
 * @param password account password
 * @return {@code "Basic "} followed by the Base64 encoding of {@code username:password}
 */
private static String basicAuthHeader(String username, String password) {
    final String credentials = username + ":" + password;
    // java.util.Base64 is fully qualified so it cannot be confused with any other imported Base64
    // class; encodeToString also avoids decoding bytes with the platform default charset.
    return "Basic " + java.util.Base64.getEncoder()
            .encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
}
标签:STARROCKS,star,String,list,hive,static,streamload,new,append
From: https://www.cnblogs.com/hbym/p/18196505