hera源码剖析:一次任务触发的执行流程

网友投稿 236 2022-12-01

hera源码剖析:一次任务触发的执行流程

文章目录

​​触发任务​​​​work端​​​​master端​​

​​run方法​​

在 ​​hera​​​ 中,任务被触发的方式有多种,比如分析师在前端手动执行触发、定时任务触发、依赖任务触发、重跑任务触发、信号丢失的触发等等。但是不管是哪种触发方式最后的入口都是在 ​​Master#run​​​ 方法(开发中心任务触发接口在 ​​Master#debug​​ )。

这里就讲一下手动执行的任务触发流程

触发任务

在最新版本中,任务手动触发类型分为手动执行、手动恢复、超级恢复三种,具体区别就不再赘述,可以通过 ​​hera​​ 操作文档查看,这里以手动恢复为例

work端

首先看下 ​​work​​ 端的堆栈信息

writeAndFlush:28, NettyChannel (com.dfire.core.netty)writeAndFlush:32, FailFastCluster (com.dfire.core.netty.cluster)buildMessage:100, WorkerHandleWebRequest (com.dfire.core.netty.worker.request)handleWebExecute:29, WorkerHandleWebRequest (com.dfire.core.netty.worker.request)executeJobFromWeb:312, WorkClient (com.dfire.core.netty.worker)execute:409, ScheduleOperatorController (com.dfire.controller)invoke:-1, ScheduleOperatorController$$FastClassBySpringCGLIB$$cddb34c8 (com.dfire.controller)invoke:204, MethodProxy (org.springframework.cglib.proxy)invokeJoinpoint:738, CglibAopProxy$CglibMethodInvocation (org.springframework.aop.framework)proceed:157, ReflectiveMethodInvocation (org.springframework.aop.framework)proceed:85, MethodInvocationProceedingJoinPoint (org.springframework.aop.aspectj)around:72, HeraAspect (com.dfire.config)//省略部分

通过堆栈信息我们可以看到,在 ​​controller​​​ 方法被调用之前会先调用一个通过 ​​AOP​​​ 实现的权限校验的方法​​HeraAspect#around​​​,当权限校验通过后会继续调用​​ScheduleOperatorController#execute​​​ 方法,该方法就是任务执行的入口。再往后就是调用 ​​WorkerHandleWebRequest#handleWebExecute​​​ 和 ​​WorkerHandleWebRequest#buildMessage​​​ 方法来创建 ​​netty​​​ 消息体,最后通过一个快速失败的容错方式(​​FailFastCluster#writeAndFlush​​​)来向 ​​master​​​ 发送一条 ​​netty​​ 消息

下面仔细分析下,​​controller​​ 入口

RequestMapping(value = "/manual", method = RequestMethod.GET) @ResponseBody @ApiOperation("手动执行接口") public JsonResponse execute(@JsonSerialize(using = ToStringSerializer.class) @ApiParam(value = "版本id", required = true) Long actionId , @ApiParam(value = "触发类型,2手动执行,3手动恢复,6超级恢复", required = true) Integer triggerType, @RequestParam(required = false) @ApiParam(value = "任务执行组", required = false) String execUser) throws InterruptedException, ExecutionException, HeraException, TimeoutException { //省略部分校验代码 String configs = heraJob.getConfigs(); //新建hera_action_history 对象,并向mysql插入执行记录 HeraJobHistory actionHistory = HeraJobHistory.builder().build(); actionHistory.setJobId(heraAction.getJobId()); actionHistory.setActionId(heraAction.getId()); actionHistory.setTriggerType(triggerTypeEnum.getId()); actionHistory.setOperator(heraJob.getOwner()); actionHistory.setIllustrate(execUser); actionHistory.setStatus(StatusEnum.RUNNING.toString()); actionHistory.setStatisticEndTime(heraAction.getStatisticEndTime()); actionHistory.setHostGroupId(heraAction.getHostGroupId()); heraJobHistoryService.insert(actionHistory); heraAction.setScript(heraJob.getScript()); heraAction.setHistoryId(actionHistory.getId()); heraAction.setConfigs(configs); heraAction.setAuto(heraJob.getAuto()); heraAction.setHostGroupId(heraJob.getHostGroupId()); heraJobActionService.update(heraAction); //向master 发送任务执行的请求 workClient.executeJobFromWeb(JobExecuteKind.ExecuteKind.ManualKind, actionHistory.getId()); String ownerId = getOwnerId(); if (ownerId == null) { ownerId = "0"; } //添加操作记录 addJobRecord(heraJob.getId(), String.valueOf(actionId), RecordTypeEnum.Execute, execUser, ownerId); return new JsonResponse(true, String.valueOf(actionId)); }

这部分的代码很简单,主要分为三部分 1.创建 ​​​hera_action_history​​​ 对象,向 ​​mysql​​​ 插入任务的执行记录 2.通过 ​​​netty​​​ 向 ​​master​​​ 发送任务执行的消息 3.添加任务执行记录

需要我们关注的主要是第二部分,通过上面的堆栈信息继续往下看

public void executeJobFromWeb(ExecuteKind kind, Long id) throws ExecutionException, InterruptedException, TimeoutException { RpcWebResponse.WebResponse response = WorkerHandleWebRequest.handleWebExecute(workContext, kind, id).get(HeraGlobalEnv.getRequestTimeout(), TimeUnit.SECONDS); if (response.getStatus() == ResponseStatus.Status.ERROR) { ErrorLog.error(response.getErrorText()); } }

这部分代码调用了 ​​WorkerHandleWebRequest.handleWebExecute​​​ 并返回一个​​future​​​,通过 ​​future.get​​ 来阻塞我们的请求,继续往下看

public static Future handleWebExecute(final WorkContext workContext, ExecuteKind kind, Long id) { return buildMessage(WebRequest.newBuilder() .setRid(AtomicIncrease.getAndIncrement()) .setOperate(WebOperate.ExecuteJob) .setEk(kind) .setId(String.valueOf(id)) .build(), workContext, "[执行]-任务超出" + HeraGlobalEnv.getRequestTimeout() + "秒未得到master消息返回:" + id); } private static Future buildMessage(WebRequest request, WorkContext workContext, String errorMsg) { CountDownLatch latch = new CountDownLatch(1); WorkResponseListener responseListener = new WorkResponseListener(request, false, latch, null); workContext.getHandler().addListener(responseListener); Future future = workContext.getWorkWebThreadPool().submit(() -> { latch.await(HeraGlobalEnv.getRequestTimeout(), TimeUnit.SECONDS); if (!responseListener.getReceiveResult()) { ErrorLog.error(errorMsg); } workContext.getHandler().removeListener(responseListener); return responseListener.getWebResponse(); }); try { workContext.getServerChannel().writeAndFlush(SocketMessage.newBuilder() .setKind(SocketMessage.Kind.WEB_REQUEST) .setBody(request.toByteString()) .build()); SocketLog.info("1.WorkerHandleWebRequest: send web request to master requestId ={}", request.getRid()); } catch (RemotingException e) { workContext.getHandler().removeListener(responseListener); ErrorLog.error("1.WorkerHandleWebRequest: send web request to master exception requestId =" + request.getRid(), e); } return future; }

在 ​​handleWebExecute​​​ 方法中,新建了一个 WebRequest 对象,需要注意的是该对象的 ​​operator​​​ 参数为 ​​WebOperate.ExecuteJob​​​,​​id​​​ 为​​hera_action_history​​​ 记录的 ​​id​​​。 然后在 ​​​buildMessage​​​ 方法中有三个比较关键的代码 1.​​​CountDownLatch latch = new CountDownLatch(1);​​​ 该锁会在一个线程池的异步操作中等待,并且会在​​WorkResponseListener​​​ 中被释放。 2.​​​WorkResponseListener responseListener = new WorkResponseListener(request, false, latch, null);​​

public class WorkResponseListener extends ResponseListenerAdapter { private RpcWebRequest.WebRequest request; private volatile Boolean receiveResult; private CountDownLatch latch; private RpcWebResponse.WebResponse webResponse; @Override public void onWebResponse(RpcWebResponse.WebResponse response) { if (request.getRid() == response.getRid()) { try { webResponse = response; receiveResult = true; } catch (Exception e) { ErrorLog.error("work release exception {}", e); } finally { latch.countDown(); } } }}

在 ​​onWebResponse​​​ 方法中,当发现​​request.getRid() == response.getRid()​​​时会释放锁,并标志 ​​receiveResult​​​ 为 ​​true​​​ 3.调用 ​​workContext.getServerChannel().writeAndFlush​​ 方法来向master发送任务执行的消息,在上篇​​hera源码剖析:项目启动之分布式锁​​ 已经说过 ​​workContext​​ 是什么时候设置的 ​​serverChannel​​

master端

​​master​​​ 接收所有 ​​netty​​​ 消息的处理类为 ​​MasterHandler​​​,也就是说上面​​work​​​ 发送的执行任务请求最终会在​​MasterHandler#channelRead​​被调用

@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { SocketMessage socketMessage = (SocketMessage) msg; Channel channel = ctx.channel(); switch (socketMessage.getKind()) { //省略部分代码 case WEB_REQUEST: final WebRequest webRequest = WebRequest.newBuilder().mergeFrom(socketMessage.getBody()).build(); switch (webRequest.getOperate()) { case ExecuteJob: completionService.submit(() -> new ChannelResponse(FailBackCluster.wrap(channel), MasterHandlerWebResponse.handleWebExecute(masterContext, webRequest))); break; //省略部分代码 } //省略部分代码 } }

​​MasterHandler​​​ 直接把 ​​work​​​ 的任务执行请求异步分发给 ​​MasterHandlerWebResponse.handleWebExecute​​​ 来处理,并且返回了一个失败重试封装的 ​​channel​​

public static WebResponse handleWebExecute(MasterContext context, WebRequest request) { if (request.getEk() == ExecuteKind.ManualKind || request.getEk() == ExecuteKind.ScheduleKind) { Long historyId = Long.parseLong(request.getId()); HeraJobHistory heraJobHistory = context.getHeraJobHistoryService().findById(historyId); HeraJobHistoryVo history = BeanConvertUtils.convert(heraJobHistory); context.getMaster().run(history, context.getHeraJobService().findById(history.getJobId())); WebResponse webResponse = WebResponse.newBuilder() .setRid(request.getRid()) .setOperate(WebOperate.ExecuteJob) .setStatus(Status.OK) .build(); TaskLog.info("MasterHandlerWebResponse: send web execute response, actionId = {} ", history.getJobId()); return webResponse; } else if (request.getEk() == ExecuteKind.DebugKind) { Long debugId = Long.parseLong(request.getId()); HeraDebugHistoryVo debugHistory = context.getHeraDebugHistoryService().findById(debugId); TaskLog.info("2-1.MasterHandlerWebResponse: receive web debug response, debugId = " + debugId); context.getMaster().debug(debugHistory); WebResponse webResponse = WebResponse.newBuilder() .setRid(request.getRid()) .setOperate(WebOperate.ExecuteJob) .setStatus(Status.OK) .build(); TaskLog.info("2-2.MasterHandlerWebResponse : send web debug response, debugId = {}", debugId); return webResponse; } return WebResponse.newBuilder() .setRid(request.getRid()) .setErrorText("未识别的操作类型" + request.getEk()) .setStatus(Status.ERROR) .build(); }

在这里主要是根据​​request.getEk()​​​ 来判断是开发中心的任务执行还是调度中心的任务执行。在我们手动恢复时,该值为:​​ExecuteKind.ManualKind​​​,直接看 ​​if​​ 部分代码。

首先根据​​hera_action_history​​ 的 ​​id​​ 来查询在 ​​work​​ 端插入的那条记录调用​​master#run​​ 方法创建​​webResponse​​ 对象,返回执行任务 ​​ok​​ 的标志

run方法

public void run(HeraJobHistoryVo heraJobHistory, HeraJob heraJob) { Long actionId = heraJobHistory.getActionId(); //重复job检测 //1:检测任务是否已经在队列或者正在执行 if (checkJobExists(heraJobHistory, false)) { return; } HeraAction heraAction = masterContext.getHeraJobActionService().findById(actionId); Set areaList = areaList(heraJob.getAreaId()); //2:非执行区域任务直接设置为成功 if (!areaList.contains(HeraGlobalEnv.getArea()) && !areaList.contains(Constants.ALL_AREA)) { ScheduleLog.info("非{}区域任务,直接设置为成功:{}", HeraGlobalEnv.getArea(), heraJob.getId()); heraAction.setLastResult(heraAction.getStatus()); heraAction.setStatus(StatusEnum.SUCCESS.toString()); heraAction.setHistoryId(heraJobHistory.getId()); heraAction.setReadyDependency("{}"); String host = "localhost"; heraAction.setHost(host); Date endTime = new Date(); heraAction.setStatisticStartTime(endTime); heraAction.setStatisticEndTime(endTime); masterContext.getHeraJobActionService().update(heraAction); heraJobHistory.getLog().append("非" + HeraGlobalEnv.getArea() + "区域任务,直接设置为成功"); heraJobHistory.setStatusEnum(StatusEnum.SUCCESS); heraJobHistory.setEndTime(endTime); heraJobHistory.setStartTime(endTime); heraJobHistory.setExecuteHost(host); masterContext.getHeraJobHistoryService().update(BeanConvertUtils.convert(heraJobHistory)); HeraJobSuccessEvent successEvent = new HeraJobSuccessEvent(actionId, heraJobHistory.getTriggerType(), heraJobHistory); masterContext.getDispatcher().forwardEvent(successEvent); return; } //3.先在数据库中set一些执行任务所需的必须值 然后再加入任务队列 heraAction.setLastResult(heraAction.getStatus()); heraAction.setStatus(StatusEnum.RUNNING.toString()); heraAction.setHistoryId(heraJobHistory.getId()); heraAction.setStatisticStartTime(new Date()); heraAction.setStatisticEndTime(null); masterContext.getHeraJobActionService().update(heraAction); heraJobHistory.getLog().append(ActionUtil.getTodayString() + " 进入任务队列"); masterContext.getHeraJobHistoryService().update(BeanConvertUtils.convert(heraJobHistory)); boolean isFixed; int priorityLevel = 3; Map configs = StringUtil.convertStringToMap(heraAction.getConfigs()); String priorityLevelValue = configs.get("run.priority.level"); if (priorityLevelValue != null) { priorityLevel = Integer.parseInt(priorityLevelValue); } String areaFixed = HeraGlobalEnv.getArea() + Constants.POINT + Constants.HERA_EMR_FIXED; if (configs.containsKey(Constants.HERA_EMR_FIXED) || configs.containsKey(areaFixed)) { isFixed = Boolean.parseBoolean(configs.get(areaFixed)) || Boolean.parseBoolean(configs.get(Constants.HERA_EMR_FIXED)); } else { isFixed = Boolean.parseBoolean(getInheritVal(heraAction.getGroupId(), areaFixed, Constants.HERA_EMR_FIXED)); } Integer endMinute = masterContext.getHeraJobService().findMustEndMinute(heraAction.getJobId()); //4.组装JobElement JobElement element = JobElement.builder() .jobId(heraJobHistory.getActionId()) .hostGroupId(heraJobHistory.getHostGroupId()) .priorityLevel(priorityLevel) .historyId(heraJobHistory.getId()) .fixedEmr(isFixed) .owner(heraJob.getOwner()) .costMinute(endMinute) .build(); try { element.setTriggerType(heraJobHistory.getTriggerType()); HeraAction cacheAction = heraActionMap.get(element.getJobId()); if (cacheAction != null) { cacheAction.setStatus(StatusEnum.RUNNING.toString()); } //5.放入任务扫描队列 switch (heraJobHistory.getTriggerType()) { case MANUAL: masterContext.getManualQueue().put(element); break; case AUTO_RERUN: masterContext.getRerunQueue().put(element); break; case MANUAL_RECOVER: case SCHEDULE: masterContext.getScheduleQueue().put(element); break; case SUPER_RECOVER: masterContext.getSuperRecovery().put(element); break; default: ErrorLog.error("不支持的调度类型:{},id:{}", heraJobHistory.getTriggerType().toName(), heraJobHistory.getActionId()); break; } } catch (InterruptedException e) { ErrorLog.error("添加任务" + element.getJobId() + "失败", e); } }

​​run​​​ 方法的主要功能是将要执行的任务根据类型放到不同的队列。 由于代码较多分段分析

​​checkJobExists​​ 方法检测任务是否已经在队列或者正在执行,如果是允许重复执行任务或者任务重跑触发的任务不会进行检测对于非执行区域任务直接设置为成功并且广播通知下游任务,该参数由application.yml中的​​hera.area​​​ 配置决定。另外,如果区域设置为​​all​​,则所有区域都能执行。在数据库中​​set​​ 一些执行任务所需的必须值 然后再加入任务队列组装​​JobElement​​​,该对象最终会被放到执行队列中。主要参数有:​​costMinute(任务的预计最大执行分钟数)、jobId(任务的执行实例id)、hostGroupId(任务的执行机器组)、priorityLevel(任务的有限级别)、historyId(该任务对应的执行记录 id)、fixedEmr 是否在固定集群执行、owner任务的创建人所在组​​将任务根据不同的触发类型,放入不同的任务扫描队列,等待​​master​​ 的扫描线程扫描

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:记一次 spark rdd 写数据到 hbase 报 NPE 的问题排查
下一篇:springboot集成camunda的实现示例
相关文章

 发表评论

暂时没有评论,来抢沙发吧~