c语言sscanf函数的用法是什么
236
2022-12-01
hera源码剖析:一次任务触发的执行流程
文章目录
触发任务work端master端
run方法
在 hera 中,任务被触发的方式有多种,比如分析师在前端手动执行触发、定时任务触发、依赖任务触发、重跑任务触发、信号丢失的触发等等。但是不管是哪种触发方式最后的入口都是在 Master#run 方法(开发中心任务触发接口在 Master#debug )。
这里就讲一下手动执行的任务触发流程
触发任务
在最新版本中,任务手动触发类型分为手动执行、手动恢复、超级恢复三种,具体区别就不再赘述,可以通过 hera 操作文档查看,这里以手动恢复为例
work端
首先看下 work 端的堆栈信息
writeAndFlush:28, NettyChannel (com.dfire.core.netty)writeAndFlush:32, FailFastCluster (com.dfire.core.netty.cluster)buildMessage:100, WorkerHandleWebRequest (com.dfire.core.netty.worker.request)handleWebExecute:29, WorkerHandleWebRequest (com.dfire.core.netty.worker.request)executeJobFromWeb:312, WorkClient (com.dfire.core.netty.worker)execute:409, ScheduleOperatorController (com.dfire.controller)invoke:-1, ScheduleOperatorController$$FastClassBySpringCGLIB$$cddb34c8 (com.dfire.controller)invoke:204, MethodProxy (org.springframework.cglib.proxy)invokeJoinpoint:738, CglibAopProxy$CglibMethodInvocation (org.springframework.aop.framework)proceed:157, ReflectiveMethodInvocation (org.springframework.aop.framework)proceed:85, MethodInvocationProceedingJoinPoint (org.springframework.aop.aspectj)around:72, HeraAspect (com.dfire.config)//省略部分
通过堆栈信息我们可以看到,在 controller 方法被调用之前会先调用一个通过 AOP 实现的权限校验的方法HeraAspect#around,当权限校验通过后会继续调用ScheduleOperatorController#execute 方法,该方法就是任务执行的入口。再往后就是调用 WorkerHandleWebRequest#handleWebExecute 和 WorkerHandleWebRequest#buildMessage 方法来创建 netty 消息体,最后通过一个快速失败的容错方式(FailFastCluster#writeAndFlush)来向 master 发送一条 netty 消息
下面仔细分析下,controller 入口
RequestMapping(value = "/manual", method = RequestMethod.GET) @ResponseBody @ApiOperation("手动执行接口") public JsonResponse execute(@JsonSerialize(using = ToStringSerializer.class) @ApiParam(value = "版本id", required = true) Long actionId , @ApiParam(value = "触发类型,2手动执行,3手动恢复,6超级恢复", required = true) Integer triggerType, @RequestParam(required = false) @ApiParam(value = "任务执行组", required = false) String execUser) throws InterruptedException, ExecutionException, HeraException, TimeoutException { //省略部分校验代码 String configs = heraJob.getConfigs(); //新建hera_action_history 对象,并向mysql插入执行记录 HeraJobHistory actionHistory = HeraJobHistory.builder().build(); actionHistory.setJobId(heraAction.getJobId()); actionHistory.setActionId(heraAction.getId()); actionHistory.setTriggerType(triggerTypeEnum.getId()); actionHistory.setOperator(heraJob.getOwner()); actionHistory.setIllustrate(execUser); actionHistory.setStatus(StatusEnum.RUNNING.toString()); actionHistory.setStatisticEndTime(heraAction.getStatisticEndTime()); actionHistory.setHostGroupId(heraAction.getHostGroupId()); heraJobHistoryService.insert(actionHistory); heraAction.setScript(heraJob.getScript()); heraAction.setHistoryId(actionHistory.getId()); heraAction.setConfigs(configs); heraAction.setAuto(heraJob.getAuto()); heraAction.setHostGroupId(heraJob.getHostGroupId()); heraJobActionService.update(heraAction); //向master 发送任务执行的请求 workClient.executeJobFromWeb(JobExecuteKind.ExecuteKind.ManualKind, actionHistory.getId()); String ownerId = getOwnerId(); if (ownerId == null) { ownerId = "0"; } //添加操作记录 addJobRecord(heraJob.getId(), String.valueOf(actionId), RecordTypeEnum.Execute, execUser, ownerId); return new JsonResponse(true, String.valueOf(actionId)); }
这部分的代码很简单,主要分为三部分 1.创建 hera_action_history 对象,向 mysql 插入任务的执行记录 2.通过 netty 向 master 发送任务执行的消息 3.添加任务执行记录
需要我们关注的主要是第二部分,通过上面的堆栈信息继续往下看
public void executeJobFromWeb(ExecuteKind kind, Long id) throws ExecutionException, InterruptedException, TimeoutException { RpcWebResponse.WebResponse response = WorkerHandleWebRequest.handleWebExecute(workContext, kind, id).get(HeraGlobalEnv.getRequestTimeout(), TimeUnit.SECONDS); if (response.getStatus() == ResponseStatus.Status.ERROR) { ErrorLog.error(response.getErrorText()); } }
这部分代码调用了 WorkerHandleWebRequest.handleWebExecute 并返回一个future,通过 future.get 来阻塞我们的请求,继续往下看
public static Future
在 handleWebExecute 方法中,新建了一个 WebRequest 对象,需要注意的是该对象的 operator 参数为 WebOperate.ExecuteJob,id 为hera_action_history 记录的 id。 然后在 buildMessage 方法中有三个比较关键的代码 1.CountDownLatch latch = new CountDownLatch(1); 该锁会在一个线程池的异步操作中等待,并且会在WorkResponseListener 中被释放。 2.WorkResponseListener responseListener = new WorkResponseListener(request, false, latch, null);
public class WorkResponseListener extends ResponseListenerAdapter { private RpcWebRequest.WebRequest request; private volatile Boolean receiveResult; private CountDownLatch latch; private RpcWebResponse.WebResponse webResponse; @Override public void onWebResponse(RpcWebResponse.WebResponse response) { if (request.getRid() == response.getRid()) { try { webResponse = response; receiveResult = true; } catch (Exception e) { ErrorLog.error("work release exception {}", e); } finally { latch.countDown(); } } }}
在 onWebResponse 方法中,当发现request.getRid() == response.getRid()时会释放锁,并标志 receiveResult 为 true 3.调用 workContext.getServerChannel().writeAndFlush 方法来向master发送任务执行的消息,在上篇hera源码剖析:项目启动之分布式锁 已经说过 workContext 是什么时候设置的 serverChannel
master端
master 接收所有 netty 消息的处理类为 MasterHandler,也就是说上面work 发送的执行任务请求最终会在MasterHandler#channelRead被调用
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { SocketMessage socketMessage = (SocketMessage) msg; Channel channel = ctx.channel(); switch (socketMessage.getKind()) { //省略部分代码 case WEB_REQUEST: final WebRequest webRequest = WebRequest.newBuilder().mergeFrom(socketMessage.getBody()).build(); switch (webRequest.getOperate()) { case ExecuteJob: completionService.submit(() -> new ChannelResponse(FailBackCluster.wrap(channel), MasterHandlerWebResponse.handleWebExecute(masterContext, webRequest))); break; //省略部分代码 } //省略部分代码 } }
MasterHandler 直接把 work 的任务执行请求异步分发给 MasterHandlerWebResponse.handleWebExecute 来处理,并且返回了一个失败重试封装的 channel
public static WebResponse handleWebExecute(MasterContext context, WebRequest request) { if (request.getEk() == ExecuteKind.ManualKind || request.getEk() == ExecuteKind.ScheduleKind) { Long historyId = Long.parseLong(request.getId()); HeraJobHistory heraJobHistory = context.getHeraJobHistoryService().findById(historyId); HeraJobHistoryVo history = BeanConvertUtils.convert(heraJobHistory); context.getMaster().run(history, context.getHeraJobService().findById(history.getJobId())); WebResponse webResponse = WebResponse.newBuilder() .setRid(request.getRid()) .setOperate(WebOperate.ExecuteJob) .setStatus(Status.OK) .build(); TaskLog.info("MasterHandlerWebResponse: send web execute response, actionId = {} ", history.getJobId()); return webResponse; } else if (request.getEk() == ExecuteKind.DebugKind) { Long debugId = Long.parseLong(request.getId()); HeraDebugHistoryVo debugHistory = context.getHeraDebugHistoryService().findById(debugId); TaskLog.info("2-1.MasterHandlerWebResponse: receive web debug response, debugId = " + debugId); context.getMaster().debug(debugHistory); WebResponse webResponse = WebResponse.newBuilder() .setRid(request.getRid()) .setOperate(WebOperate.ExecuteJob) .setStatus(Status.OK) .build(); TaskLog.info("2-2.MasterHandlerWebResponse : send web debug response, debugId = {}", debugId); return webResponse; } return WebResponse.newBuilder() .setRid(request.getRid()) .setErrorText("未识别的操作类型" + request.getEk()) .setStatus(Status.ERROR) .build(); }
在这里主要是根据request.getEk() 来判断是开发中心的任务执行还是调度中心的任务执行。在我们手动恢复时,该值为:ExecuteKind.ManualKind,直接看 if 部分代码。
首先根据hera_action_history 的 id 来查询在 work 端插入的那条记录调用master#run 方法创建webResponse 对象,返回执行任务 ok 的标志
run方法
public void run(HeraJobHistoryVo heraJobHistory, HeraJob heraJob) { Long actionId = heraJobHistory.getActionId(); //重复job检测 //1:检测任务是否已经在队列或者正在执行 if (checkJobExists(heraJobHistory, false)) { return; } HeraAction heraAction = masterContext.getHeraJobActionService().findById(actionId); Set
run 方法的主要功能是将要执行的任务根据类型放到不同的队列。 由于代码较多分段分析
checkJobExists 方法检测任务是否已经在队列或者正在执行,如果是允许重复执行任务或者任务重跑触发的任务不会进行检测对于非执行区域任务直接设置为成功并且广播通知下游任务,该参数由application.yml中的hera.area 配置决定。另外,如果区域设置为all,则所有区域都能执行。在数据库中set 一些执行任务所需的必须值 然后再加入任务队列组装JobElement,该对象最终会被放到执行队列中。主要参数有:costMinute(任务的预计最大执行分钟数)、jobId(任务的执行实例id)、hostGroupId(任务的执行机器组)、priorityLevel(任务的有限级别)、historyId(该任务对应的执行记录 id)、fixedEmr 是否在固定集群执行、owner任务的创建人所在组将任务根据不同的触发类型,放入不同的任务扫描队列,等待master 的扫描线程扫描
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~