[Doris Stream Load 超时问题排查2] - stream load源码排查定位- timeout by txn manager

33人浏览 / 0人评论

上一篇可以知道, fe将stream load的请求转发给be了, 并讲了fe的针对stream load的处理过程. 下面我们将分析be是如何处理stream load的.

1. 确认be处理stream load的代码位置

如果你比较熟悉be的代码结构, 可以轻易找到在http_service.cpp中注册了处理stream load请求的处理类. http_service.cpp中注册stream load的核心代码如下: 通古这段代码我们知道: StreamLoadAction.cpp这个类主要负责处理stream load.

2. 那超时时间是怎么设置的呢?

在StreamLoadAction.cpp泪中我们看到有很多方法, 我们看到on_header()这个方法时, 可以猜想这个方法应该是处理http header的方法, 我们打开看看, 代码如下: .

通过这个方法我们可以看到有预先设置的很多http header的key名字, 比如: HTTP_DB_KEY, HTTP_TABLE_KEY等. 是否有可能超时时间的key也是通过这个设置的呢? 我们找到这些定义这些key的地方: 查看全部key的定义我们终于找到了可能的那个超时http header: HTTP_TIMEOUT = "timeout“, 接下来我们看看这个变量在什么地方被使用了, 我们得到了如下代码:

这里是重点, 我们看到3个红框, 我分别讲解下

  • 红框1: 从http header中把key=timeout的值取出来, 放在ctx变量中. 这个也是be封装的一个上下文变量.
  • 红框2: ctx设置完成后, 调用 begin_txn()方法, 开始一个导入事物. 这个方法会调用fe的RPC接口, 告诉fe需要开始一个stream load导入任务的事物.
  • 红框3: 开始接受和临时保存stream load上传的文件. 具体过程这里不详细展开.

重点在于红框2中, begin_txn()方法是如何调用fe的, 发送了什么请求参数给fe?

3. be请求fe, 开始一个事物begin_txn()

通过上一步的讲解, 我们查看 begin_txn() 方法的具体实现, 核心代码如下:
通古查看代码, 我们看到:

#ifndef BE_TEST

这个代码是一个宏定义, 表示当不是be测试时代码将生效, 这段代码中核心变量我讲解下:

  • ctx: 指第二步中封装的请求上下文信息, 包含超时数据
  • request: RPC接口的请求对象.
  • master_addr: 这个表示fe master的ip地址. 每个事物都是fe的master管理.
  • result: 调用RPC接口loadTxnBegin()的结果对象. 如果是be测试, 我们可以得知这个值恒等于k_stream_load_begin_result

这里我们知道了be调用fe的RPC接口, 将超时参数传递给fe了, fe怎么处理的呢?

4. Fe如何处理 loadTxnBegin()的RPC调用

如果不知道fe中是如何实现RPC接口的, 我们可以全局搜索下loadTxnBegin()方法的定义: 当然, 这里thrift代码我是事先生成的, 不然RPC部分的代码会找不到. 如果你没有生成, 就直接全局搜索这个方法名, 看看什么地方做了具体实现. 如果你幸运的话, 你会找到如下:

找到这里, 我们猜测下FrontendServiceImpl.java应该是Fe的RPC服务的实现类.

我们跟随这个截图中 loadTxnBeginImpl( )方法, 看看这个方法的核心实现: 这里我们看到了, 单独设置了超时时间, 如果超时时间没有设置, 则使用默认的: Config.stream_load_default_timeout_second,

看到了曙光

, 我们沿着这个方法loadTxnBeginImpl()方法中 timeoutSecond 一直找, 会经过: loadTxnBeginImpl() -> Catalog.getCurrentGlobalTransactionMgr().beginTransaction() -> DatabaseTransactionMgr.beginTransaction() -> new TransactionState()构造函数;

看到这里我们看看TransactionState构造函数, 会将超时时间赋予timeoutMs这个变量, 我们查看这个变量的引用可以得到以下代码: 查看红框中引用了超时变量的方法代码如下:

 // return true if txn is running but timeout
    public boolean isTimeout(long currentMillis) {
        return transactionStatus == TransactionStatus.PREPARE && currentMillis - prepareTime > timeoutMs;
    }

通过查看注释和代码逻辑, 我们可以得知这个应该是我们需要找的超时判断的方法. 我们查看这个方法的调用方法, 可以依次得到: transactionState.isTimeout() <-- DatabaseTransactionMgr.getTimeoutTxns() <-- DatabaseTransactionMgr.removeExpiredAndTimeoutTxns(); 找到这里我们终于看到了与超时报错一样的文案: “timeout by txn manager” . 具体报错展示详见之前的一片文章: [Doris Stream Load 超时问题排查1] - stream load源码排查定位- 504 Gateway Time-out

这里超时问题也就查到根本原因了, 用户用户发送stream load请求时, 可以通过设置http header的方法(key=timeout)修改stream load导入超时时间. 如果不设置则等于: Config.stream_load_default_timeout_second, 默认300秒.

全部评论