[Doris Stream Load] 如何在Java代码中调用Stream Load
1. pom依赖
<!-- Maven dependencies for the Stream Load sample. -->
<dependencies>
<!-- fastjson: JSON library; not referenced by the sample class shown in this post. -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.44</version>
</dependency>
<!-- NOTE(review): the sample imports org.apache.http.* and org.apache.commons.codec.*;
     presumably this artifact provides them transitively. Confirm it is resolvable from
     your repository, or depend on Apache HttpClient 4.x and commons-codec directly. -->
<dependency>
<groupId>com.vipkid.commons</groupId>
<artifactId>commons-httpclient</artifactId>
<version>2.1.6</version>
</dependency>
</dependencies>
2. Java代码
import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpHeaders;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.FileEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import java.io.File;
import java.nio.charset.StandardCharsets;
/**
 * Minimal example of importing data into Doris through the HTTP Stream Load endpoint.
 *
 * <p>Two entry points are provided: {@link #loadFileToDoris()} uploads a local file and
 * {@link #loadStringToDoris()} uploads an in-memory CSV string. Both send an HTTP PUT to
 * {@link #loadUrl} and print the response body to standard output.
 *
 * <p>All state is held in static fields and a single {@link HttpPut} instance is reused,
 * so this class is not safe for concurrent use.
 */
public class StreamLoadApp {
    // Doris account
    static String dorisUser = "xxxxx";
    // Doris password
    static String dorisPassword = "xxxxx";
    // Label of the load job; should be meaningful, ideally containing the database/table name.
    // NOTE: computed once at class initialization, so every request sent by this JVM reuses it.
    static String label = "test-load-doris" + System.currentTimeMillis();
    // Stream Load URL: http://<fe_host>:<http_port>/api/<db>/<table>/_stream_load
    static String loadUrl = "http://fe:8030/api/test/user_agg/_stream_load";

    // Connect, connection-request and socket timeouts, all 300000 ms.
    static RequestConfig requestConfig = RequestConfig.custom()
            .setConnectTimeout(300000)
            .setConnectionRequestTimeout(300000)
            .setSocketTimeout(300000)
            .build();

    // The default strategy does not follow redirects for PUT; this override treats every
    // method as redirectable so a redirect answer to the PUT is followed automatically.
    static HttpClientBuilder httpClientBuilder = HttpClients.custom()
            .setRedirectStrategy(new DefaultRedirectStrategy() {
                @Override
                protected boolean isRedirectable(String method) {
                    return true;
                }
            });

    // Request object shared by both load methods; its URI is fixed to loadUrl at class init.
    static HttpPut httpPuts = new HttpPut(loadUrl);

    /**
     * Runs the string-import example.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        loadStringToDoris();
    }

    /**
     * Imports the content of a local file into Doris.
     *
     * <p>The response body is printed to standard output; failures are reported on
     * standard error and not rethrown.
     */
    public static void loadFileToDoris() {
        // Local file path
        String filePath = "/Users/sangli/Desktop/a.txt";
        prepareRequest();
        httpPuts.setEntity(new FileEntity(new File(filePath)));
        executeLoad();
    }

    /**
     * Imports a small in-memory CSV payload into Doris.
     *
     * <p>The response body is printed to standard output; failures are reported on
     * standard error and not rethrown.
     */
    public static void loadStringToDoris() {
        prepareRequest();
        httpPuts.setEntity(new StringEntity(
                "2020-01-02,124,francis,12\n2020-01-03,1245,francis,11", StandardCharsets.UTF_8));
        executeLoad();
    }

    /**
     * Applies the timeout configuration and the Stream Load headers to the shared request.
     *
     * <p>Called by every load method so that each of them works on its own, not only when
     * invoked through {@link #main(String[])}. {@code setHeader} overwrites an existing
     * header of the same name, so calling this repeatedly is harmless.
     */
    private static void prepareRequest() {
        httpPuts.setConfig(requestConfig);
        // Required by Doris Stream Load.
        httpPuts.setHeader(HttpHeaders.EXPECT, "100-continue");
        // Required: HTTP basic authentication.
        httpPuts.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(dorisUser, dorisPassword));
        // Optional: column separator of the payload; the default is \t.
        httpPuts.setHeader("column_separator", ",");
        // Optional but recommended: label identifying this load job.
        httpPuts.setHeader("label", label);
    }

    /**
     * Sends the shared request and prints the response body (empty string when the
     * response carries no entity).
     */
    private static void executeLoad() {
        try (CloseableHttpClient client = httpClientBuilder.build();
             CloseableHttpResponse response = client.execute(httpPuts)) {
            String loadResult = "";
            if (response.getEntity() != null) {
                // UTF-8 is used only when the response does not declare a charset itself.
                loadResult = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
            }
            System.out.println(loadResult);
        } catch (Exception e) {
            // Sample code: report with context instead of propagating.
            System.err.println("Stream Load request to " + httpPuts.getURI() + " failed");
            e.printStackTrace();
        }
    }

    /**
     * Builds the value of an HTTP {@code Authorization} header for basic authentication.
     *
     * @param username account name
     * @param password account password
     * @return {@code "Basic "} followed by the Base64 encoding of {@code username:password}
     */
    private static String basicAuthHeader(String username, String password) {
        final String tobeEncode = username + ":" + password;
        byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
        // Decode with an explicit charset instead of the platform default.
        return "Basic " + new String(encoded, StandardCharsets.UTF_8);
    }
}
全部评论