最近在工作上做了点数据开发,与以往不同的是,以前是按批处理的思路来写,这次要改成按实时调用程序的思路写。说这两种思路有差异的人是我的直属领导(工作能力很强的那个),从我这个菜鸟的角度看其实没撒差别,都是先计算基础特征–>然后调用模型–>最后返回结果,只不过以前做的是批量计算,这次改成实时计算,并且有很强的时效限制。基本上做完以后,我倒确实咂摸出来两点差异:
- 在一大段脚本或程序之中,时常需要打印每个小步骤的结果,确保将来出问题时能够快速核查具体哪个步骤出了问题。
- 需要记录和返回代表正常或异常的状态值,这是为了方便在多个环节相互衔接时,要是其实一个环节出现异常,其他环节好制定应对策略。
下面是一个简单的 R 脚本,每个 item_id
是一条数据的唯一标识,这个脚本实现的内容也很简单,就是先从 table_model_feature
这个表中获取每个 item_id
的基本特征,然后根据导入的模型文件计算结果即 predict()
这步,最后将结果插入到 table_model_score
这个表中,所有步骤都被包装到了 calc_model_score
这个函数中。
library(xgboost)
library(DBI)
library(rJava)
library(RJDBC)
calc_model_score <- function(item_id) {
print(paste(Sys.time(), ">>>calc model id:", item_id))
# 连接数据库
drv <-
JDBC("oracle.jdbc.driver.OracleDriver", ".../ojdbc6-11.2.0.1.0.jar")
conn <-
dbConnect(drv, "jdbc:oracle:thin:@IP:端口:数据库", "账号", "密码")
# 导入数据
data_test <-
dbGetQuery(conn, "select * from table_model_feature where item_id = ?", item_id)
#如果没查到数据则返回0
data_size <- nrow(data_test)
print(paste(Sys.time(), ">>>result model data rows:", data_size))
if (data_size == 0) {
return(0)
} else
{
colnames(data_test) <- tolower(colnames(data_test))
# 加载模型文件
model <- xgboost::xgb.load('.../xgb_model.model')
# 计算评分
score <- predict(model, newdata = as.matrix(data_test))
result <- cbind(data_test, score)
dbSendUpdate(conn, "delete from table_model_score t where t.item_id = ?", item_id)
print(paste(Sys.time(), "<<<predict model score:", result$score))
dbWriteTable(conn, "table_model_score", result, overwrite = FALSE, append = TRUE, force = TRUE)
dbDisconnect(conn)
return(1)
}
}
上面的脚本中有三处添加了 print()
函数,用来打印每个步骤的结果。在 R 环境中,可以直接调用calc_model_score
这个函数,在别的地方就需要先加载上面的 R 脚本,再调用函数。如果一切正常,在返回计算结果的同时,也会打印如下内容。
> source(".../load_model.R")
> calc_model_score(245849955)
[1] "2023-06-02 17:30:09 >>>calc model id: 245849955"
[1] "2023-06-02 17:30:09 >>>result model data rows: 1"
[1] "2023-06-02 17:30:09 <<<predict model score: 0.0270512625575066"
下面是一段 ORACLE 中的 sql 程序脚本,在业务上一个 policy_id
会对应多个 item_id
,程序实现的内容是这样的:
- 开启一个循环(
for ... loop
)- 1.其他地方传递过来的数据存在
v_item_base_list
这个表中,根据每个policy_id
去获取其对应的所有item_id
。 - 2.使用
f_get_model_type
函数获取每一个item_id
对应的模型类别。 - 3.如果模型类别不为空,调用
p001_calc_item_id
程序计算每个item_id
的基础数据。 - 4.根据判断所得模型类别分别计算各个子模型所需要的特征,当模型类别为
MODEL_TYPE_A
时调用p002_calc_MODEL_TYPE_A
,当模型类别为MODEL_TYPE_B
时调用p003_calc_MODEL_TYPE_B
。 - 5.从一个序列
SEQUENCE
中获取一个唯一 id,连同每个policy_id
、item_id
以及对应的模型类别model_type
一起存入到一个表t_calc_model_type
里,用于将来判断每条数据具体调用哪个模型来计算结果。
- 1.其他地方传递过来的数据存在
- 结束循环(
end loop
) - 返回一个状态值,并且将出现异常时的报错信息存到一个表里。
以上许多步骤也要打印执行结果,只不过在 R 里面打印结果的函数是 print
,在 ORACLE 里面是put_line
。
procedure p004_calc_score(policy_id in number, exec_result out number) is
v_policy_id number;
v_item_id number;
v_model_type varchar2(10);
e_errcode varchar2(500);
v_proc_id number(20);
v_error_desc varchar2(4000);
err_num number(4);
begin
v_policy_id := policy_id;
for item in (select nvl(item_id, 0) item_id
from v_item_base_list
where policy_id = v_policy_id) loop
dbms_output.put_line('item_id:' || item.item_id);
v_model_type := f_get_model_type(item.item_id);
dbms_output.put_line('v_model_type:' || v_model_type);
if v_model_type is not null then
p001_calc_item_id(v_inparam => '',
e_errcode => e_errcode,
policy_id => v_policy_id);
end if;
if v_model_type = 'MODEL_TYPE_A' then
p002_calc_MODEL_TYPE_A(v_inparam => '',
e_errcode => e_errcode,
item_id => item.item_id);
dbms_output.put_line('MODEL_TYPE_A:' || e_errcode);
elsif v_model_type = 'MODEL_TYPE_B' then
p003_calc_MODEL_TYPE_B(v_inparam => '',
e_errcode => e_errcode,
item_id => item.item_id);
dbms_output.put_line('MODEL_TYPE_B:' || e_errcode);
end if;
select SEQUENCE.Nextval into v_proc_id from dual;
delete from t_calc_model_type t where t.policy_id = v_policy_id;
commit;
v_item_id := item.item_id;
insert into t_calc_model_type
(id, policy_id, item_id, model_type, load_date)
values
(v_proc_id, v_policy_id, v_item_id, v_model_type, sysdate);
commit;
end loop;
exec_result := 1;
exception
when others then
v_error_desc := substr(sqlerrm, 1, 2000);
exec_result := -1;
err_num := sqlcode;
insert into T_OLAP_ERRORS
(pkg_name, proc_name, error_num, error_msg, record_date)
values
('pkg_name',
'p004_calc_score',
err_num,
v_error_desc,
sysdate);
commit;
raise;
end;
其中,程序中的序列 SEQUENCE 是在 ORACLE 中创建的。
/*创建一个序列(sequence),每次新增 id 从序列中取值 Nextval*/
create sequence SEQUENCE
minvalue 1
maxvalue 9999999999999999999999999999
start with 1
increment by 1
cache 20;