[Doris Stream Load] Java 代码如何调用 Stream Load

333人浏览 / 0人评论

1. pom引用

<!-- Maven dependencies declared for the Stream Load example. -->
<dependencies>
        <!-- JSON library. NOTE(review): none of the imports in the accompanying Java class
             reference fastjson; presumably intended for parsing the load result. Confirm it is needed. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.44</version>
        </dependency>
        <!-- NOTE(review): the accompanying Java class imports org.apache.http.* and
             org.apache.commons.codec.*; presumably this artifact brings them in transitively.
             The groupId looks company-internal; verify it resolves from your repository,
             otherwise declare the HTTP client library that provides those packages directly. -->
        <dependency>
            <groupId>com.vipkid.commons</groupId>
            <artifactId>commons-httpclient</artifactId>
            <version>2.1.6</version>
        </dependency>
    </dependencies>

2. Java 代码

import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpHeaders;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.FileEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

import java.io.File;
import java.nio.charset.StandardCharsets;

public class StreamLoadApp {
    //doris账号
    static String dorisUser = "xxxxx";
    //doris密码
    static String dorisPassword = "xxxxx";
    //doris导入label, 需要有一定含义, 最好包含库表名
    static String label = "test-load-doris" + System.currentTimeMillis();
    //doris load地址
    static String loadUrl = "http://fe:8030/api/test/user_agg/_stream_load";


    static RequestConfig requestConfig = RequestConfig.custom()
            .setConnectTimeout(300000)
            .setConnectionRequestTimeout(300000)
            .setSocketTimeout(300000).build();

    static HttpClientBuilder httpClientBuilder = HttpClients.custom().setRedirectStrategy(new DefaultRedirectStrategy() {
        @Override
        protected boolean isRedirectable(String method) {
            return true;
        }
    });
    static HttpPut httpPuts = new HttpPut(loadUrl);

    public static void main(String[] args) {
        httpPuts.setConfig(requestConfig);
        //doris必须包含的header
        httpPuts.setHeader(HttpHeaders.EXPECT, "100-continue");
        //doris必须包含的header
        httpPuts.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(dorisUser, dorisPassword));
        //doris必须包含的header, 数据分隔符。 默认是\t
        httpPuts.setHeader("column_separator", ",");
        //doris必须包含的header
        httpPuts.setHeader("label", label);

        loadStringToDoris();
    }

    /**
     * 实时导入文件到doris
     */
    public static void loadFileToDoris() {
        //本地文件路径
        String filePath = "/Users/sangli/Desktop/a.txt";

        File file = new File(filePath);

        FileEntity fileEntity = new FileEntity(file);
        httpPuts.setEntity(fileEntity);

        try (CloseableHttpClient client = httpClientBuilder.build()) {
            try (CloseableHttpResponse response = client.execute(httpPuts)) {
                String loadResult = "";
                if (response.getEntity() != null) {
                    loadResult = EntityUtils.toString(response.getEntity());
                }
                System.out.println(loadResult);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 实时导入字符串数据到doris
     */
    public static void loadStringToDoris() {
        try (CloseableHttpClient client = httpClientBuilder.build()) {

            StringEntity entity = new StringEntity("2020-01-02,124,francis,12\n2020-01-03,1245,francis,11", "UTF-8");
            httpPuts.setEntity(entity);

            try (CloseableHttpResponse response = client.execute(httpPuts)) {
                String loadResult = "";
                if (response.getEntity() != null) {
                    loadResult = EntityUtils.toString(response.getEntity());
                }
                System.out.println(loadResult);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static String basicAuthHeader(String username, String password) {
        final String tobeEncode = username + ":" + password;
        byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
        return "Basic " + new String(encoded);
    }
}

全部评论