001 /* 002 * Copyright 2011 The Kuali Foundation. 003 * 004 * Licensed under the Educational Community License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.opensource.org/licenses/ecl2.php 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 package org.kuali.kfs.sys.batch; 017 018 019 import java.net.InetAddress; 020 import java.net.UnknownHostException; 021 import java.text.DateFormat; 022 import java.text.ParseException; 023 import java.text.SimpleDateFormat; 024 import java.util.Date; 025 import java.util.Iterator; 026 import java.util.List; 027 028 import org.apache.commons.lang.StringUtils; 029 import org.apache.log4j.Appender; 030 import org.apache.log4j.Logger; 031 import org.kuali.kfs.gl.GeneralLedgerConstants; 032 import org.kuali.kfs.sys.KFSConstants; 033 import org.kuali.kfs.sys.batch.service.SchedulerService; 034 import org.kuali.kfs.sys.context.ProxyUtils; 035 import org.kuali.kfs.sys.context.SpringContext; 036 import org.kuali.kfs.sys.service.impl.KfsParameterConstants; 037 import org.kuali.rice.kew.exception.WorkflowException; 038 import org.kuali.rice.kns.UserSession; 039 import org.kuali.rice.kns.service.DateTimeService; 040 import org.kuali.rice.kns.service.ParameterEvaluator; 041 import org.kuali.rice.kns.service.ParameterService; 042 import org.kuali.rice.kns.util.ErrorMap; 043 import org.kuali.rice.kns.util.GlobalVariables; 044 import org.kuali.rice.kns.util.KNSConstants; 045 import org.kuali.rice.kns.util.MessageList; 046 import org.quartz.InterruptableJob; 047 import org.quartz.JobDataMap; 048 import org.quartz.JobExecutionContext; 049 import org.quartz.JobExecutionException; 050 import org.quartz.StatefulJob; 051 import org.quartz.UnableToInterruptJobException; 052 import org.springframework.aop.framework.Advised; 053 import org.springframework.aop.support.AopUtils; 054 import org.springframework.util.StopWatch; 055 056 public class Job implements StatefulJob, InterruptableJob { 057 058 public static final String JOB_RUN_START_STEP = "JOB_RUN_START_STEP"; 059 public static final String JOB_RUN_END_STEP = "JOB_RUN_END_STEP"; 060 public static final String STEP_RUN_PARM_NM = "RUN_IND"; 061 public static final String STEP_RUN_ON_DATE_PARM_NM = "RUN_DATE"; 062 public static final String STEP_USER_PARM_NM = "USER"; 063 064 private static final Logger LOG = Logger.getLogger(Job.class); 065 private SchedulerService schedulerService; 066 private ParameterService parameterService; 067 private DateTimeService dateTimeService; 068 private List<Step> steps; 069 private Step currentStep; 070 private Appender ndcAppender; 071 private boolean notRunnable; 072 private transient Thread workerThread; 073 074 /** 075 * @see org.quartz.Job#execute(org.quartz.JobExecutionContext) 076 */ 077 public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { 078 workerThread = Thread.currentThread(); 079 if (isNotRunnable()) { 080 if (LOG.isInfoEnabled()) { 081 LOG.info("Skipping job because doNotRun is true: " + jobExecutionContext.getJobDetail().getName()); 082 } 083 return; 084 } 085 int startStep = 0; 086 try { 087 startStep = Integer.parseInt(jobExecutionContext.getMergedJobDataMap().getString(JOB_RUN_START_STEP)); 088 } 089 catch (NumberFormatException ex) { 090 // not present, do nothing 091 } 092 int endStep = 0; 093 try { 094 endStep = Integer.parseInt(jobExecutionContext.getMergedJobDataMap().getString(JOB_RUN_END_STEP)); 095 } 096 catch (NumberFormatException ex) { 097 // not present, do nothing 098 } 099 Date jobRunDate = dateTimeService.getCurrentDate(); 100 int currentStepNumber = 0; 101 try { 102 LOG.info("Executing job: " + jobExecutionContext.getJobDetail() + " on machine " + getMachineName() + " scheduler instance id " + jobExecutionContext.getScheduler().getSchedulerInstanceId() + "\n" + jobDataMapToString(jobExecutionContext.getJobDetail().getJobDataMap())); 103 for (Step step : getSteps()) { 104 currentStepNumber++; 105 // prevent starting of the next step if the thread has an interrupted status 106 if (workerThread.isInterrupted()) { 107 LOG.warn("Aborting Job execution due to manual interruption"); 108 schedulerService.updateStatus(jobExecutionContext.getJobDetail(), SchedulerService.CANCELLED_JOB_STATUS_CODE); 109 return; 110 } 111 if (startStep > 0 && currentStepNumber < startStep) { 112 if (LOG.isInfoEnabled()) { 113 LOG.info("Skipping step " + currentStepNumber + " - startStep=" + startStep); 114 } 115 continue; // skip to next step 116 } 117 else if (endStep > 0 && currentStepNumber > endStep) { 118 if (LOG.isInfoEnabled()) { 119 LOG.info("Ending step loop - currentStepNumber=" + currentStepNumber + " - endStep = " + endStep); 120 } 121 break; 122 } 123 step.setInterrupted(false); 124 try { 125 if (!runStep(parameterService, jobExecutionContext.getJobDetail().getFullName(), currentStepNumber, step, jobRunDate)) { 126 break; 127 } 128 } 129 catch (InterruptedException ex) { 130 LOG.warn("Stopping after step interruption"); 131 schedulerService.updateStatus(jobExecutionContext.getJobDetail(), SchedulerService.CANCELLED_JOB_STATUS_CODE); 132 return; 133 } 134 if (step.isInterrupted()) { 135 LOG.warn("attempt to interrupt step failed, step continued to completion"); 136 LOG.warn("cancelling remainder of job due to step interruption"); 137 schedulerService.updateStatus(jobExecutionContext.getJobDetail(), SchedulerService.CANCELLED_JOB_STATUS_CODE); 138 return; 139 } 140 } 141 } 142 catch (Exception e) { 143 schedulerService.updateStatus(jobExecutionContext.getJobDetail(), SchedulerService.FAILED_JOB_STATUS_CODE); 144 throw new JobExecutionException("Caught exception in " + jobExecutionContext.getJobDetail().getName(), e, false); 145 } 146 LOG.info("Finished executing job: " + jobExecutionContext.getJobDetail().getName()); 147 schedulerService.updateStatus(jobExecutionContext.getJobDetail(), SchedulerService.SUCCEEDED_JOB_STATUS_CODE); 148 } 149 150 public static boolean runStep(ParameterService parameterService, String jobName, int currentStepNumber, Step step, Date jobRunDate) throws InterruptedException, WorkflowException { 151 152 boolean continueJob = true; 153 if (GlobalVariables.getUserSession() == null) { 154 LOG.info(new StringBuffer("Started processing step: ").append(currentStepNumber).append("=").append(step.getName()).append(" for user <unknown>")); 155 } 156 else { 157 LOG.info(new StringBuffer("Started processing step: ").append(currentStepNumber).append("=").append(step.getName()).append(" for user ").append(GlobalVariables.getUserSession().getPrincipalName())); 158 } 159 160 if (!skipStep(parameterService, step, jobRunDate)) { 161 162 Step unProxiedStep = (Step) ProxyUtils.getTargetIfProxied(step); 163 Class stepClass = unProxiedStep.getClass(); 164 165 GlobalVariables.setErrorMap(new ErrorMap()); 166 GlobalVariables.setMessageList(new MessageList()); 167 168 String stepUserName = KFSConstants.SYSTEM_USER; 169 if (parameterService.parameterExists(stepClass, STEP_USER_PARM_NM)) { 170 stepUserName = parameterService.getParameterValue(stepClass, STEP_USER_PARM_NM); 171 } 172 if (LOG.isInfoEnabled()) { 173 LOG.info(new StringBuffer("Creating user session for step: ").append(step.getName()).append("=").append(stepUserName)); 174 } 175 GlobalVariables.setUserSession(new UserSession(stepUserName)); 176 if (LOG.isInfoEnabled()) { 177 LOG.info(new StringBuffer("Executing step: ").append(step.getName()).append("=").append(stepClass)); 178 } 179 StopWatch stopWatch = new StopWatch(); 180 stopWatch.start(jobName); 181 try { 182 continueJob = step.execute(jobName, jobRunDate); 183 } 184 catch (InterruptedException e) { 185 LOG.error("Exception occured executing step", e); 186 throw e; 187 } 188 catch (RuntimeException e) { 189 LOG.error("Exception occured executing step", e); 190 throw e; 191 } 192 stopWatch.stop(); 193 LOG.info(new StringBuffer("Step ").append(step.getName()).append(" of ").append(jobName).append(" took ").append(stopWatch.getTotalTimeSeconds() / 60.0).append(" minutes to complete").toString()); 194 if (!continueJob) { 195 LOG.info("Stopping job after successful step execution"); 196 } 197 } 198 199 LOG.info(new StringBuffer("Finished processing step ").append(currentStepNumber).append(": ").append(step.getName())); 200 return continueJob; 201 } 202 203 /** 204 * This method determines whether the Job should not run the Step based on the RUN_IND and RUN_DATE Parameters. 205 * When RUN_IND exists and equals 'Y' it takes priority and does not consult RUN_DATE. 206 * If RUN_DATE exists, but contains an empty value the step will not be skipped. 207 */ 208 protected static boolean skipStep(ParameterService parameterService, Step step, Date jobRunDate) { 209 Step unProxiedStep = (Step) ProxyUtils.getTargetIfProxied(step); 210 Class stepClass = unProxiedStep.getClass(); 211 212 DateTimeService dTService = SpringContext.getBean(DateTimeService.class); 213 String dateFormat = parameterService.getParameterValue(KNSConstants.KNS_NAMESPACE, KNSConstants.DetailTypes.ALL_DETAIL_TYPE, KNSConstants.SystemGroupParameterNames.DATE_TO_STRING_FORMAT_FOR_USER_INTERFACE); 214 215 //RUN_IND takes priority: when RUN_IND exists and RUN_IND=Y always run the Step 216 //RUN_DATE: when RUN_DATE exists, but the value is empty run the Step 217 218 boolean runIndExists = parameterService.parameterExists(stepClass, STEP_RUN_PARM_NM); 219 boolean runInd = (runIndExists ? parameterService.getIndicatorParameter(stepClass, STEP_RUN_PARM_NM) : true); 220 221 boolean runDateExists = parameterService.parameterExists(stepClass, STEP_RUN_ON_DATE_PARM_NM); 222 boolean runDateIsEmpty = (runDateExists ? StringUtils.isEmpty(parameterService.getParameterValue(stepClass, STEP_RUN_ON_DATE_PARM_NM)) : true); 223 boolean runDateContainsTodaysDate = (runDateExists ? parameterService.getParameterValues(stepClass, STEP_RUN_ON_DATE_PARM_NM).contains(dTService.toString(jobRunDate, dateFormat)): true); 224 225 if (!runInd && !runDateExists) { 226 if (LOG.isInfoEnabled()) { 227 LOG.info("Skipping step due to system parameter: " + STEP_RUN_PARM_NM +" for "+ stepClass.getName()); 228 } 229 return true; 230 } 231 else if (!runInd && !runDateIsEmpty && !runDateContainsTodaysDate) { 232 if (LOG.isInfoEnabled()) { 233 LOG.info("Skipping step due to system parameters: " + STEP_RUN_PARM_NM + " and " + STEP_RUN_ON_DATE_PARM_NM +" for "+ stepClass.getName()); 234 } 235 return true; 236 } 237 else if (!runIndExists && !runDateIsEmpty && !runDateContainsTodaysDate) { 238 if (LOG.isInfoEnabled()) { 239 LOG.info("Skipping step due to system parameter: " + STEP_RUN_ON_DATE_PARM_NM +" for "+ stepClass.getName()); 240 } 241 return true; 242 } 243 else { //run step 244 return false; 245 } 246 } 247 248 /** 249 * @throws UnableToInterruptJobException 250 */ 251 public void interrupt() throws UnableToInterruptJobException { 252 // ask the step to interrupt 253 if (currentStep != null) { 254 currentStep.interrupt(); 255 } 256 // also attempt to interrupt the thread, to cause an InterruptedException if the step ever waits or sleeps 257 workerThread.interrupt(); 258 } 259 260 public void setParameterService(ParameterService parameterService) { 261 this.parameterService = parameterService; 262 } 263 264 public void setSteps(List<Step> steps) { 265 this.steps = steps; 266 } 267 268 public Appender getNdcAppender() { 269 return ndcAppender; 270 } 271 272 public void setNdcAppender(Appender ndcAppender) { 273 this.ndcAppender = ndcAppender; 274 } 275 276 public void setNotRunnable(boolean notRunnable) { 277 this.notRunnable = notRunnable; 278 } 279 280 protected boolean isNotRunnable() { 281 return notRunnable; 282 } 283 284 public ParameterService getParameterService() { 285 return parameterService; 286 } 287 288 public List<Step> getSteps() { 289 return steps; 290 } 291 292 public void setSchedulerService(SchedulerService schedulerService) { 293 this.schedulerService = schedulerService; 294 } 295 296 public void setDateTimeService(DateTimeService dateTimeService) { 297 this.dateTimeService = dateTimeService; 298 } 299 300 protected String jobDataMapToString(JobDataMap jobDataMap) { 301 StringBuilder buf = new StringBuilder(); 302 buf.append("{"); 303 Iterator keys = jobDataMap.keySet().iterator(); 304 boolean hasNext = keys.hasNext(); 305 while (hasNext) { 306 String key = (String) keys.next(); 307 Object value = jobDataMap.get(key); 308 buf.append(key).append("="); 309 if (value == jobDataMap) { 310 buf.append("(this map)"); 311 } 312 else { 313 buf.append(value); 314 } 315 hasNext = keys.hasNext(); 316 if (hasNext) { 317 buf.append(", "); 318 } 319 } 320 buf.append("}"); 321 return buf.toString(); 322 } 323 324 protected String getMachineName() { 325 try { 326 return InetAddress.getLocalHost().getHostName(); 327 } 328 catch (UnknownHostException e) { 329 return "Unknown"; 330 } 331 } 332 }