为什么总是要打印结果(print)

· 1414字 · 3分钟

最近在工作上做了点数据开发,与以往不同的是,以前是按批处理的思路来写,这次要改成按实时调用程序的思路写。说这两种思路有差异的人是我的直属领导(工作能力很强的那个),从我这个菜鸟的角度看其实没撒差别,都是先计算基础特征–>然后调用模型–>最后返回结果,只不过以前做的是批量计算,这次改成实时计算,并且有很强的时效限制。基本上做完以后,我倒确实咂摸出来两点差异:

  1. 在一大段脚本或程序之中,时常需要打印每个小步骤的结果,确保将来出问题时能够快速核查具体哪个步骤出了问题。
  2. 需要记录和返回代表正常或异常的状态值,这是为了方便在多个环节相互衔接时,要是其实一个环节出现异常,其他环节好制定应对策略。

下面是一个简单的 R 脚本,每个 item_id 是一条数据的唯一标识,这个脚本实现的内容也很简单,就是先从 table_model_feature 这个表中获取每个 item_id 的基本特征,然后根据导入的模型文件计算结果即 predict() 这步,最后将结果插入到 table_model_score 这个表中,所有步骤都被包装到了 calc_model_score 这个函数中。

library(xgboost)
library(DBI)
library(rJava)
library(RJDBC)

calc_model_score <- function(item_id) {
  print(paste(Sys.time(), ">>>calc model id:", item_id))
  # 连接数据库
  drv <-
    JDBC("oracle.jdbc.driver.OracleDriver", ".../ojdbc6-11.2.0.1.0.jar")
  conn <-
    dbConnect(drv, "jdbc:oracle:thin:@IP:端口:数据库", "账号", "密码") 
  # 导入数据  
  data_test <-
    dbGetQuery(conn, "select * from table_model_feature where item_id = ?", item_id)
  
  #如果没查到数据则返回0
  data_size <- nrow(data_test)
  print(paste(Sys.time(), ">>>result model data rows:", data_size))
  
  if (data_size == 0) {
    return(0)
  } else
  {
    colnames(data_test) <- tolower(colnames(data_test))
    # 加载模型文件
    model <- xgboost::xgb.load('.../xgb_model.model')
    # 计算评分
    score <- predict(model, newdata = as.matrix(data_test))
    result <- cbind(data_test, score)
    
    dbSendUpdate(conn, "delete from table_model_score t where  t.item_id = ?", item_id)
    
    print(paste(Sys.time(), "<<<predict model score:", result$score))
    
    dbWriteTable(conn, "table_model_score", result, overwrite  = FALSE, append = TRUE, force = TRUE)
    dbDisconnect(conn)
    return(1)
  }
}

上面的脚本中有三处添加了 print() 函数,用来打印每个步骤的结果。在 R 环境中,可以直接调用calc_model_score 这个函数,在别的地方就需要先加载上面的 R 脚本,再调用函数。如果一切正常,在返回计算结果的同时,也会打印如下内容。

> source(".../load_model.R")
> calc_model_score(245849955)
[1] "2023-06-02 17:30:09 >>>calc model id: 245849955"
[1] "2023-06-02 17:30:09 >>>result model data rows: 1"
[1] "2023-06-02 17:30:09 <<<predict model score: 0.0270512625575066"

下面是一段 ORACLE 中的 sql 程序脚本,在业务上一个 policy_id 会对应多个 item_id,程序实现的内容是这样的:

  • 开启一个循环(for ... loop
    • 1.其他地方传递过来的数据存在 v_item_base_list 这个表中,根据每个 policy_id 去获取其对应的所有 item_id
    • 2.使用 f_get_model_type 函数获取每一个 item_id 对应的模型类别。
    • 3.如果模型类别不为空,调用 p001_calc_item_id 程序计算每个 item_id 的基础数据。
    • 4.根据判断所得模型类别分别计算各个子模型所需要的特征,当模型类别为 MODEL_TYPE_A 时调用 p002_calc_MODEL_TYPE_A,当模型类别为 MODEL_TYPE_B 时调用 p003_calc_MODEL_TYPE_B
    • 5.从一个序列 SEQUENCE 中获取一个唯一 id,连同每个 policy_iditem_id 以及对应的模型类别 model_type 一起存入到一个表 t_calc_model_type 里,用于将来判断每条数据具体调用哪个模型来计算结果。
  • 结束循环(end loop
  • 返回一个状态值,并且将出现异常时的报错信息存到一个表里。

以上许多步骤也要打印执行结果,只不过在 R 里面打印结果的函数是 print,在 ORACLE 里面是put_line

procedure p004_calc_score(policy_id in number, exec_result out number) is
    v_policy_id  number;
    v_item_id    number;
    v_model_type varchar2(10);
    e_errcode    varchar2(500);
    v_proc_id    number(20);
    v_error_desc varchar2(4000);
    err_num      number(4);
  begin
    v_policy_id := policy_id;
  
    for item in (select nvl(item_id, 0) item_id
                   from v_item_base_list
                  where policy_id = v_policy_id) loop
    
      dbms_output.put_line('item_id:' || item.item_id);
    
      v_model_type := f_get_model_type(item.item_id);
    
      dbms_output.put_line('v_model_type:' || v_model_type);
    
      if v_model_type is not null then
        p001_calc_item_id(v_inparam => '',
                             e_errcode => e_errcode,
                             policy_id => v_policy_id);
      end if;
    
      if v_model_type = 'MODEL_TYPE_A' then
        p002_calc_MODEL_TYPE_A(v_inparam => '',
                                 e_errcode => e_errcode,
                                 item_id   => item.item_id);
        dbms_output.put_line('MODEL_TYPE_A:' || e_errcode);
      elsif v_model_type = 'MODEL_TYPE_B' then
        p003_calc_MODEL_TYPE_B(v_inparam => '',
                                 e_errcode => e_errcode,
                                 item_id   => item.item_id);
        dbms_output.put_line('MODEL_TYPE_B:' || e_errcode);
      end if;
     
      select SEQUENCE.Nextval into v_proc_id from dual;
      delete from t_calc_model_type t where t.policy_id = v_policy_id;
      commit;
      v_item_id := item.item_id;
      insert into t_calc_model_type
        (id, policy_id, item_id, model_type, load_date)
      values
        (v_proc_id, v_policy_id, v_item_id, v_model_type, sysdate);
      commit;
    end loop;
  
    exec_result := 1;
  exception
    when others then
      v_error_desc := substr(sqlerrm, 1, 2000);
      exec_result  := -1;
      err_num      := sqlcode;
      insert into T_OLAP_ERRORS
        (pkg_name, proc_name, error_num, error_msg, record_date)
      values
        ('pkg_name',
         'p004_calc_score',
         err_num,
         v_error_desc,
         sysdate);
      commit;
      raise;
  end;

其中,程序中的序列 SEQUENCE 是在 ORACLE 中创建的。

 /*创建一个序列(sequence),每次新增 id 从序列中取值 Nextval*/
create sequence SEQUENCE
minvalue 1
maxvalue 9999999999999999999999999999
start with 1
increment by 1
cache 20;
R