风控系统(risk-admittance)业务框架
流程介绍
根据系统流程介绍可知,系统核心是由多条流程组成,下图为流程类的类图
classDiagram
Flow <|.. BaseFlow : implements
Flow: + void main(bizNo, params)
Flow: + void access(access, type, reqParam)
Flow: + void access(beanName, reqParam)
BaseFlow <|-- CompanyFlow : Inheritance
BaseFlow <|-- GuaranteeFlow : Inheritance
BaseFlow <|-- MerchFlow : Inheritance
BaseFlow <|-- PersonalFlow : Inheritance
BaseFlow <|-- PreFlow : Inheritance
BaseFlow <|-- ReleFlow : Inheritance
BaseFlow: +ProcessHandler start
BaseFlow: +ProcessHandler terminate
BaseFlow: +HandlerContextPipeLine pipeLine
BaseFlow: +Map<String, Map<String, ConditionFunc>> accessMap
BaseFlow: + void main(bizNo, params)
BaseFlow: + void access(access, type, reqParam)
BaseFlow: + void access(beanName, reqParam)
BaseFlow: + abstract void init()
BaseFlow: + void afterPropertiesSet()
CompanyFlow:+void init()
GuaranteeFlow:+void init()
MerchFlow:+void init()
PersonalFlow:+void init()
PreFlow:+void init()
ReleFlow:+void init()
Flow
- main: 从starthandler开始执行整个流程
- access(access, type, reqParam): 从mq接入流程内
- access:接入节点对象
- type:是pass|cancel|reject
- reqParam:进件大对象
- access(beanName, reqParam): 从测试接口接入流程内,只测某个或某些处理器
- access:开始节点beanName
- reqParam:进件大对象
BaseFlow
- start: 开始处理器
- terminate: 结束处理器
- pipeLine: 流程中的所有功能处理器都是由pipeLine进行串联的
- accessMap: 所有接入节点的集合,以接入节点对象的beanName为key,ConditionFunc对象为value的map结构体
- main: 实现接口
- access(access, type, reqParam): 实现接口
- access(beanName, reqParam): 实现接口
- init(): 抽象方法,子类实现
- afterPropertiesSet: 初始化HandlerContextPipeLine
CompanyFlow和所有子流程
功能处理器
每个流程是由多个功能处理器组成
特性介绍
我们通过两个纬度对每个功能处理器赋予特性:
- 流程节点 | 接入节点
public enum HandlerType {
/**
* 异步接收外部系统mq,将其接入到流程的节点定义为接入节点
*/
ACCESS("接入节点", 1),
/**
* 每个flow初始化时定义的handler节点
*/
PROCESS("流程节点", 2);
}
2. 静态特性 | 动态特性
- 静态特性:该功能处理器在初始化时已知前驱节点和后驱节点
- 动态特性:该功能处理器根据运行过程中的数据判断是否有后驱节点和后驱节点的beanName
由以上两种特性可衍生出三种节点类型
public enum FeatureType {
/**
- 根据初始化的顺序已经指定前驱节点和后驱节点
*/
NORMAL("普通节点", 1),
/**
- 该节点为结束节点,有两种节点会是结束节点
-
- 请求外部系统,等待异步接收mq消息返回
-
- 最后一个节点
*/
OVER("结束节点", 2),
/**
- 根据运行时的数据判断该节点为以下三种状态
-
- 该节点为结束节点
-
- 该节点下个节点为初始化默认的后驱节点
-
- 该节点的后驱节点为计算后的节点
*/
CONDITION("条件节点", 3);
}
一个流程中的所有节点,主要分为流程节点和接入节点,其次根据每个节点的动静特性分为普通节点/结束节点/条件节点
但是无论是流程节点还是接入节点,都会被封装为DefaultHandlerContext对象
public class DefaultHandlerContext<T> extends AbstractHandlerContext {
/**
* 空节点,next节点等于该节点,表示当前节点为结束节点
*/
public static final DefaultHandlerContext NULL_OBJECT = new DefaultHandlerContext();
/**
* 业务处理handler
*/
private ProcessHandler<T> handler;
/**
* handler的beanName
*/
private String beanName;
...
}
public abstract class AbstractHandlerContext<T> implements ProcessHandler<T> {
/**
* 默认为普通节点
*/
protected FeatureType featureType = FeatureType.NORMAL;
/**
* 默认为流程节点
*/
protected HandlerType handlerType = HandlerType.PROCESS;
/**
* 当前节点的前驱节点
*/
protected DefaultHandlerContext prev;
/**
* 当前节点的后驱节点
*/
protected DefaultHandlerContext next;
/**
* 当前节点为条件节点时的执行语句,以判断当前节点是否是结束节点,如果不是,获取下个节点的handler beanName
*/
protected ConditionFunc<T> conditionFun;
...
}
流程节点介绍
DefaultHandlerContext中的handler类图如下:
classDiagram
ProcessHandler <|.. AbstractProcessHandler : implements
ProcessHandler: + String process(reqParam)
ProcessHandler: + String getBeanName()
ProcessHandler: + interface Internal<T>
AbstractProcessHandler <|-- inTransitOrder : Inheritance
AbstractProcessHandler <|-- fraudQuery : Inheritance
AbstractProcessHandler <|-- fraudReceiver : Inheritance
AbstractProcessHandler <|-- 其它子类功能处理器 : Inheritance
AbstractProcessHandler: +String beanName
AbstractProcessHandler: +Internal<T> internal
AbstractProcessHandler: + T process(reqParam)
AbstractProcessHandler: + void doBefore(reqParam)
AbstractProcessHandler: + void doAfter(reqParam, result)
AbstractProcessHandler: + T abstract doProcess(reqParam)
AbstractProcessHandler: + Invoker<T> getInvoker()
AbstractProcessHandler: + final class InternalImpl implements Internal<T>
inTransitOrder:+Invoker invoker
inTransitOrder:+Void doProcess(ReqParam reqParam)
fraudQuery:+Invoker invoker
fraudQuery:+Void doProcess(ReqParam reqParam)
fraudReceiver:+Invoker invoker
fraudReceiver:+Void doProcess(ReqParam reqParam)
其它子类功能处理器:+Invoker invoker
其它子类功能处理器:+Void doProcess(ReqParam reqParam)
ProcessHandler
- process: 处理业务方法
- getBeanName: 获取该handler beanName
- interface Internal: 内部接口 收集该handler处理器的返回值和需要更新的表数据等
AbstractProcessHandler
-
process 处理业务组合方法
public T process(ReqParam reqParam) {
doBefore(reqParam);
T result = doProcess(reqParam);
doAfter(reqParam, result);
internal.intern(reqParam, result);
return result;
}
-
doBefore: 调用业务处理器的前置方法, 默认实现
-
doProcess: 抽象方法,供子类处理器实现,真正业务处理方法
-
doAfter: 调用业务处理器的后置方法,默认实现
-
InternalImpl: 实现接口Internal,完成对业务处理运行时的数据收集,可供前端展示和问题定位分析使用
子类功能处理器(inTransitOrder、fraudQuery、fraudReceiver等)
- invoker: 处理非业务的数据处理执行器,如根据返回值判断该节点是否需要操作数据库、操作哪些表、返回值的存储等
- doProcess: 处理该功能处理器的业务方法,如在途订单:调用第三方接口,获取数据以判断是否含有在途订单等业务逻辑
流程内的节点可以是普通节点、结束节点、条件节点;
举例分析:
- 结束节点:
- fraudQuery:反欺诈异步请求节点,流程在执行该节点后,线程已结束,等待策略mq消息返回
- 普通节点:
- quotaMatchOfProList:多金融方案进行匹配完后,默认next节点就是discriOver调度节点
- 条件节点:
- prePaymentQuery:额度缓存判断处理器,额度缓存存在则下个节点是quotaMatchOfProList(多金融方案匹配节点),不存在则异步请求基础风控策略,当前节点为结束节点
- fraudReceiver:反欺诈策略接收处理器,反欺诈通过则下个节点是prePaymentQuery(额度缓存处理器节点),反欺诈拒绝则下个节点为preMqSendV3(发送mq消息)
接入节点介绍
Access类图如下:
classDiagram
FlowAccess <|.. BaseAccess : implements
FlowAccess: + String getBeanName()
BaseAccess <|-- FraudAccess : Inheritance
BaseAccess <|-- KnoxAccess : Inheritance
BaseAccess <|-- AuditAccess : Inheritance
BaseAccess <|-- 其它子类接入流程处理器 : Inheritance
BaseAccess: +String beanName
BaseAccess: + void pass(reqParam)
BaseAccess: + void reject(reqParam)
BaseAccess: + void cancel(reqParam)
FlowAccess:
- getBeanName: 获取该Access beanName
BaseAccess
- getBeanName: 获取该Access beanName
- pass: 通过(基础风控通过、电审通过、提额通过、担保通过等)
- reject: 拒绝(基础风控拒绝、电审拒绝、提额拒绝、担保拒绝等)
- cancel: 取消(提额拒绝、担保拒绝等)
子类接入流程处理器
- 复写父类的pass/reject/cancel方法
执行流程分析:
以前置三期流程为例
类图如下:
classDiagram
Flow接口 <|.. BaseFlow基础实现类 : 实现
Flow接口: + String getBeanName()
BaseFlow基础实现类 <|-- PreFLow : 继承
BaseFlow基础实现类: +String beanName
BaseFlow基础实现类: + void pass(reqParam)
BaseFlow基础实现类: + void reject(reqParam)
BaseFlow基础实现类: + void cancel(reqParam)
PreFLow --* HandlerContextPipeLine : Contains 1:1
HandlerContextPipeLine --> "many" DefaultHandlerContext流程内节点 : Contains 1:many
AbstractHandlerContext <|-- DefaultHandlerContext流程内节点 : 继承
DefaultHandlerContext流程内节点 "1" --> "1..*" ProcessHandler实现类处理器: handler属性
AbstractProcessHandler基础实现类 <|-- ProcessHandler实现类处理器: 继承
ProcessHandler业务处理器接口 <|.. AbstractProcessHandler基础实现类 : 实现
ProcessHandler业务处理器接口 <|.. AbstractHandlerContext : 实现
AbstractHandlerContext <|-- DefaultHandlerContext接入节点 : 继承
FlowAccess接入接口 <|.. BaseAccess基础实现类 : 实现
FlowAccess接入接口: + String getBeanName()
BaseAccess基础实现类 <|-- 接入节点实现类 : 继承
BaseAccess基础实现类: +String beanName
BaseAccess基础实现类: + void pass(reqParam)
BaseAccess基础实现类: + void reject(reqParam)
BaseAccess基础实现类: + void cancel(reqParam)
DefaultHandlerContext接入节点 --* 接入节点实现类 : 包含access BeanName和ConditionFunc执行器
流程图:参考系统流程.md文件
所用到的功能处理器归纳如下:
beanName |
描述 |
流程/接入节点 |
静态/动态特性 |
beanName |
描述 |
流程/接入节点 |
静态/动态特性 |
start |
开始节点 |
流程节点 |
普通节点 |
inTransitOrder |
判断在途订单节点 |
流程节点 |
条件节点 |
fraudQuery |
反欺诈异步请求节点 |
流程节点 |
结束节点 |
fraudAccess |
反欺诈mq接收节点 |
接入节点 |
条件节点 |
fraudReceiver |
反欺诈接收节点 |
流程节点 |
条件节点 |
KnoxAccess |
基础风控策略接入节点 |
接入节点 |
条件节点 |
prePaymentQuery |
额度缓存节点 |
流程节点 |
条件节点 |
AuditAccess |
电审策略接入节点 |
接入节点 |
条件节点 |
quotaMatch |
金融方案匹配节点 |
流程节点 |
条件节点 |
QuotaAccess |
提额接入节点 |
接入节点 |
条件节点 |
quotaMatchOfProList |
多金融方案匹配节点 |
流程节点 |
普通节点 |
GuaAccess |
担保接入节点 |
接入节点 |
条件节点 |
discriOver |
调度节点 |
流程节点 |
条件节点 |
GuaReuseAccess |
担保复用接入节点 |
接入节点 |
条件节点 |
xQuery |
X策略异步请求节点 |
流程节点 |
结束节点 |
DownPayAccess |
高首付接入节点 |
接入节点 |
条件节点 |
xReceiver |
X策略接收节点 |
流程节点 |
普通节点 |
xPolicyAccess |
x策略mq接收节点 |
接入节点 |
条件节点 |
recommendBankQuery |
银行推荐策略异步请求节点 |
流程节点 |
结束节点 |
recommendBankAccess |
银行推荐策略mq接收节点 |
接入节点 |
条件节点 |
recommendBankReceiver |
银行推荐策略接收节点 |
流程节点 |
条件节点 |
guaranteeQuery |
可担保策略异步请求节点 |
流程节点 |
结束节点 |
guaPolicyAccess |
可担保策略mq接收节点 |
接入节点 |
条件节点 |
guaranteeReceive |
可担保策略接收节点 |
流程节点 |
条件节点 |
preMqSend |
前置发送mq消息节点 |
流程节点 |
条件节点 |
preMqSendV3 |
前置三期发送mq消息节点 |
流程节点 |
普通节点 |
terminate |
结束处理器 |
流程节点 |
结束节点 |
初始化流程
BaseFlow#
public void afterPropertiesSet() throws Exception {
pipeLine = new HandlerContextPipeLine(new DefaultHandlerContext(start), new DefaultHandlerContext(terminate, FeatureType.OVER));
init();
}
HandlerContextPipeLine#
// 当前flow流程的所有DefaultHandlerContext
private Map<String, DefaultHandlerContext> contextMap = new HashMap<String, DefaultHandlerContext>() {{
// 包含NULL_OBJECT
put(null, DefaultHandlerContext.NULL_OBJECT);
}};
/**
* HandlerContextPipeLine 构造函数
* @param head
* @param end
*/
public HandlerContextPipeLine(DefaultHandlerContext head, DefaultHandlerContext end) {
this.head = head;
this.end = end;
contextMap.put(head.getBeanName(), head);
contextMap.put(end.getBeanName(), end);
head.setNext(end);
end.setPrev(head);
}
- 先初始化流程中的pipeLine,每个HandlerContextPipeLine初始化都有start和terminate节点
- pipeLine中的每个节点都有前驱节点和后驱节点属性
- contextMap:该流程中所包含的所有功能处理器
preflow#
/**
* 流程节点初始化
*/
@Override
@SuppressWarnings("unchecked")
public void init() {
pipeLine
.add(new DefaultHandlerContext<>(HandlerUtil.getHandler("inTransitOrder"), inTransitFun))
.add(new DefaultHandlerContext<>(HandlerUtil.getHandler("fraudQuery"), FeatureType.OVER))
.add(new DefaultHandlerContext<>(HandlerUtil.getHandler("fraudReceiver"), fraudReceiverFun))
.add(new DefaultHandlerContext<>(HandlerUtil.getHandler("prePaymentQuery"), prePaymentFun))
.add(new DefaultHandlerContext<>(HandlerUtil.getHandler("quotaMatch"), quotaMatchFun))
.add(new DefaultHandlerContext(HandlerUtil.getHandler("quotaMatchOfProList")))
.add(new DefaultHandlerContext<>(HandlerUtil.getHandler("discriOver"), discrFun))
.add(new DefaultHandlerContext<>(HandlerUtil.getHandler("xQuery"), FeatureType.OVER))
.add(new DefaultHandlerContext<>(HandlerUtil.getHandler("xReceiver")))
.add(new DefaultHandlerContext<>(HandlerUtil.getHandler("recommendBankQuery"), recommendBankQryFun))
.add(new DefaultHandlerContext<>(HandlerUtil.getHandler("recommendBankReceiver"), recommendBankRecFun))
.add(new DefaultHandlerContext(HandlerUtil.getHandler("guaranteeQuery"), FeatureType.OVER))
.add(new DefaultHandlerContext<>(HandlerUtil.getHandler("guaranteeReceive"), guaFun))
.add(new DefaultHandlerContext<>(HandlerUtil.getHandler("preMqSend"), preMqSendFun))
.add(new DefaultHandlerContext(HandlerUtil.getHandler("preMqSendV3")));
}
HandlerContextPipeLine#
/**
-
新增节点时: 当前节点是普通节点时,设置当前节点的next节点为end
-
--------- 上游节点是普通节点时,设置当前节点为上游节点的next节点
-
-
@param handlerContext
-
@return
*/
public HandlerContextPipeLine add(DefaultHandlerContext handlerContext) {
DefaultHandlerContext preContext = end.getPrev();
end.setPrev(handlerContext);
if (handlerContext.getFeatureType() == FeatureType.NORMAL) {
handlerContext.setNext(end);
}
handlerContext.setPrev(preContext);
if (preContext.getFeatureType() == FeatureType.NORMAL) {
preContext.setNext(handlerContext);
}
contextMap.put(handlerContext.getBeanName(), handlerContext);
return this;
}
DefaultHandlerContext#
public DefaultHandlerContext(ProcessHandler handler, FeatureType type) {
this(handler);
this.featureType = type;
if (featureType == FeatureType.OVER) {
next = NULL_OBJECT;
}
}
> - 当前节点为普通节点时,会初始化前驱节点和后驱节点
> - 当前节点为条件节点时,初始化不会设置next节点
> - 当前节点为结束节点时,初始化next节点为NULL_OBJECT
---
#### 进件流程
初始化结束后,进件执行流程如下:
根据进件选择执行具体流程
RiskExecutor#
public void execute(String bizNo, RiskAdmittanceQueryParams params) {
Flow flow = getFlow(params);
flow.main(bizNo, params);
}
执行flow的main方法:
BaseFlow#
/**
* 从starthandler开始流程
*
* @param bizNo
* @param params
*/
@Override
public void main(String bizNo, RiskAdmittanceQueryParams params) {
pipeLine.start(getDataUtil().initReqParam(bizNo, params), getDataUtil());
}
执行HandlerContextPipeLine的start方法:
HandlerContextPipeLine#
/**
* 流程的开始节点
*
* @param reqParam
* @param dataUtil
*/
public void start(ReqParam reqParam, DataUtil dataUtil) {
head.execute(reqParam, dataUtil, handlerFun);
}
执行AbstractHandlerContext的execute方法:
AbstractHandlerContext#
void execute(ReqParam reqParam, DataUtil dataUtil, Function<String, DefaultHandlerContext> handlerFun) {
log.info(" === bizNo: {} - execute {} handler start", reqParam.getBizNo(), this.getBeanName());
DefaultHandlerContext temp;
if (FeatureType.CONDITION == featureType) {
temp = handlerFun.apply(conditionFun.invoke(reqParam, this.process(reqParam)));
} else {
this.process(reqParam);
temp = next;
}
// 对数据进行落库操作
updateData(reqParam, dataUtil);
// 判断该handler处理完是否终结
if (temp == NULL_OBJECT) {
log.info("=== bizNo: {} - execute {} handler end,当前流程已结束,数据:{}", reqParam.getBizNo(), this.getBeanName(), JSON.toJSONString(reqParam));
return;
}
log.info("=== bizNo: {} - execute {} handler end, 当前handler处理后的数据:{},下个handlerName:{}", reqParam.getBizNo(), this.getBeanName(), JSON.toJSONString(reqParam.getCurrentInfo()), temp.getBeanName());
temp.execute(reqParam, dataUtil, handlerFun);
}
> - 判断当前节点是否是条件节点
> - 是,执行当前节点process方法,将返回值传入条件对象ConditFunc中,获取next节点
> - 否,执行当前节点process方法,获取当前节点的默认next节点
> - 根据当前节点的HandlerInfo数据进行落库操作
> - 判断下个节点是否是NULL_OBJECT
> - 是,当前线程结束,打印数据
> - 否,执行下个节点的execute方法
---
#### 接入流程
> 接入节点通常都是条件节点
以反欺诈接入节点为例:
@Component
@Slf4j
@Data
public class FraudAccess extends BaseAccess {
}
@Access
@Data
@Slf4j
public abstract class BaseAccess implements FlowAccess {
...
/**
- 通过
-
- @param reqParam
*/
public void pass(ReqParam reqParam) {
log.info("进入接入{}类内,调用pass方法", this.getBeanName());
// 额度匹配
reqParam.getFlow().access(this, "pass", reqParam);
}
...
}
> FraudAccess是空实现,直接调用父类方法
> 通过reqParam可知当前接入节点所接入的流程,从而调用BaseFlow中的access方法
BaseFlow#
Map<String, Map<String, ConditionFunc>> accessMap = new HashMap<String, Map<String, ConditionFunc>>() {{
// 反欺诈
put("fraudAccess",
new HashMap<String, ConditionFunc>() {{
put("pass", (reqParam, o) -> "fraudReceiver");
}});
...
}};
/**
* 从mq接入流程内
*
* @param access
* @param reqParam
* @param type
*/
@Override
public void access(FlowAccess access, String type, ReqParam reqParam) {
log.info("access:{}, type:{} 接入流程内", access.getBeanName(), type);
String funcKey = access.getBeanName();
ConditionFunc func = null;
if (accessMap.containsKey(funcKey)) {
func = accessMap.get(funcKey).get(type);
} else if (access instanceof MatchQuotaAccess) {
func = accessMap.get("matchAccess").get(type);
}
Assert.notNull(func, access.getBeanName() + "func should not null");
pipeLine.access(funcKey, func, reqParam, getDataUtil());
}
...
}
> accessMap: 流程初始化时,以access beanName为key,ConditionFunc(确认下个节点的beanName)对象为value的map数据
> access: 通过access beanName获取条件对象ConditionFunc, 调用HandlerContextPipeLine#access方法
HandlerContextPipeLine#
/**
* mq接收数据接入流程
*
* @param beanName access name
* @param func 要执行的func
* @param reqParam
* @param dataUtil
*/
public void access(String beanName, ConditionFunc<ReqParam> func, ReqParam reqParam, DataUtil dataUtil) {
log.info("mq接收数据节点接入HandlerContextPipeLine内,beanName:{}", beanName);
DefaultHandlerContext<ReqParam> handlerContext = new DefaultHandlerContext<>(func, beanName);
handlerContext.execute(reqParam, dataUtil, handlerFun);
}
> 1. 通过beanName和ConditionFunc对象初始化DefaultHandlerContext对象
> 2. 执行AbstractContext.execute方法,由上述进件流程可知,因1创建的DefaultHandlerContext是条件节点,执行ConditionFunc执行器获取下个节点(流程中的节点),从而进入流程节点
ConditionFunc执行器接口定义:
@FunctionalInterface
public interface ConditionFunc {
String invoke(ReqParam reqParam, T t);
}
---
总结:该篇主要介绍系统业务架构的思想
1. 首先根据业务性质区分功能处理器特性
2. 用HandlerContextPipeLine将流程内的节点进行串接
3. 每个节点设置prev(前驱节点)和next(后驱节点),以达到系统灵活的扩展性