001 /*
002 * Copyright 2011 The Kuali Foundation.
003 *
004 * Licensed under the Educational Community License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.opensource.org/licenses/ecl2.php
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016 package org.kuali.kfs.sys.batch;
017
018
019 import java.net.InetAddress;
020 import java.net.UnknownHostException;
021 import java.text.DateFormat;
022 import java.text.ParseException;
023 import java.text.SimpleDateFormat;
024 import java.util.Date;
025 import java.util.Iterator;
026 import java.util.List;
027
028 import org.apache.commons.lang.StringUtils;
029 import org.apache.log4j.Appender;
030 import org.apache.log4j.Logger;
031 import org.kuali.kfs.gl.GeneralLedgerConstants;
032 import org.kuali.kfs.sys.KFSConstants;
033 import org.kuali.kfs.sys.batch.service.SchedulerService;
034 import org.kuali.kfs.sys.context.ProxyUtils;
035 import org.kuali.kfs.sys.context.SpringContext;
036 import org.kuali.kfs.sys.service.impl.KfsParameterConstants;
037 import org.kuali.rice.kew.exception.WorkflowException;
038 import org.kuali.rice.kns.UserSession;
039 import org.kuali.rice.kns.service.DateTimeService;
040 import org.kuali.rice.kns.service.ParameterEvaluator;
041 import org.kuali.rice.kns.service.ParameterService;
042 import org.kuali.rice.kns.util.ErrorMap;
043 import org.kuali.rice.kns.util.GlobalVariables;
044 import org.kuali.rice.kns.util.KNSConstants;
045 import org.kuali.rice.kns.util.MessageList;
046 import org.quartz.InterruptableJob;
047 import org.quartz.JobDataMap;
048 import org.quartz.JobExecutionContext;
049 import org.quartz.JobExecutionException;
050 import org.quartz.StatefulJob;
051 import org.quartz.UnableToInterruptJobException;
052 import org.springframework.aop.framework.Advised;
053 import org.springframework.aop.support.AopUtils;
054 import org.springframework.util.StopWatch;
055
056 public class Job implements StatefulJob, InterruptableJob {
057
058 public static final String JOB_RUN_START_STEP = "JOB_RUN_START_STEP";
059 public static final String JOB_RUN_END_STEP = "JOB_RUN_END_STEP";
060 public static final String STEP_RUN_PARM_NM = "RUN_IND";
061 public static final String STEP_RUN_ON_DATE_PARM_NM = "RUN_DATE";
062 public static final String STEP_USER_PARM_NM = "USER";
063
064 private static final Logger LOG = Logger.getLogger(Job.class);
065 private SchedulerService schedulerService;
066 private ParameterService parameterService;
067 private DateTimeService dateTimeService;
068 private List<Step> steps;
069 private Step currentStep;
070 private Appender ndcAppender;
071 private boolean notRunnable;
072 private transient Thread workerThread;
073
074 /**
075 * @see org.quartz.Job#execute(org.quartz.JobExecutionContext)
076 */
077 public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
078 workerThread = Thread.currentThread();
079 if (isNotRunnable()) {
080 if (LOG.isInfoEnabled()) {
081 LOG.info("Skipping job because doNotRun is true: " + jobExecutionContext.getJobDetail().getName());
082 }
083 return;
084 }
085 int startStep = 0;
086 try {
087 startStep = Integer.parseInt(jobExecutionContext.getMergedJobDataMap().getString(JOB_RUN_START_STEP));
088 }
089 catch (NumberFormatException ex) {
090 // not present, do nothing
091 }
092 int endStep = 0;
093 try {
094 endStep = Integer.parseInt(jobExecutionContext.getMergedJobDataMap().getString(JOB_RUN_END_STEP));
095 }
096 catch (NumberFormatException ex) {
097 // not present, do nothing
098 }
099 Date jobRunDate = dateTimeService.getCurrentDate();
100 int currentStepNumber = 0;
101 try {
102 LOG.info("Executing job: " + jobExecutionContext.getJobDetail() + " on machine " + getMachineName() + " scheduler instance id " + jobExecutionContext.getScheduler().getSchedulerInstanceId() + "\n" + jobDataMapToString(jobExecutionContext.getJobDetail().getJobDataMap()));
103 for (Step step : getSteps()) {
104 currentStepNumber++;
105 // prevent starting of the next step if the thread has an interrupted status
106 if (workerThread.isInterrupted()) {
107 LOG.warn("Aborting Job execution due to manual interruption");
108 schedulerService.updateStatus(jobExecutionContext.getJobDetail(), SchedulerService.CANCELLED_JOB_STATUS_CODE);
109 return;
110 }
111 if (startStep > 0 && currentStepNumber < startStep) {
112 if (LOG.isInfoEnabled()) {
113 LOG.info("Skipping step " + currentStepNumber + " - startStep=" + startStep);
114 }
115 continue; // skip to next step
116 }
117 else if (endStep > 0 && currentStepNumber > endStep) {
118 if (LOG.isInfoEnabled()) {
119 LOG.info("Ending step loop - currentStepNumber=" + currentStepNumber + " - endStep = " + endStep);
120 }
121 break;
122 }
123 step.setInterrupted(false);
124 try {
125 if (!runStep(parameterService, jobExecutionContext.getJobDetail().getFullName(), currentStepNumber, step, jobRunDate)) {
126 break;
127 }
128 }
129 catch (InterruptedException ex) {
130 LOG.warn("Stopping after step interruption");
131 schedulerService.updateStatus(jobExecutionContext.getJobDetail(), SchedulerService.CANCELLED_JOB_STATUS_CODE);
132 return;
133 }
134 if (step.isInterrupted()) {
135 LOG.warn("attempt to interrupt step failed, step continued to completion");
136 LOG.warn("cancelling remainder of job due to step interruption");
137 schedulerService.updateStatus(jobExecutionContext.getJobDetail(), SchedulerService.CANCELLED_JOB_STATUS_CODE);
138 return;
139 }
140 }
141 }
142 catch (Exception e) {
143 schedulerService.updateStatus(jobExecutionContext.getJobDetail(), SchedulerService.FAILED_JOB_STATUS_CODE);
144 throw new JobExecutionException("Caught exception in " + jobExecutionContext.getJobDetail().getName(), e, false);
145 }
146 LOG.info("Finished executing job: " + jobExecutionContext.getJobDetail().getName());
147 schedulerService.updateStatus(jobExecutionContext.getJobDetail(), SchedulerService.SUCCEEDED_JOB_STATUS_CODE);
148 }
149
150 public static boolean runStep(ParameterService parameterService, String jobName, int currentStepNumber, Step step, Date jobRunDate) throws InterruptedException, WorkflowException {
151
152 boolean continueJob = true;
153 if (GlobalVariables.getUserSession() == null) {
154 LOG.info(new StringBuffer("Started processing step: ").append(currentStepNumber).append("=").append(step.getName()).append(" for user <unknown>"));
155 }
156 else {
157 LOG.info(new StringBuffer("Started processing step: ").append(currentStepNumber).append("=").append(step.getName()).append(" for user ").append(GlobalVariables.getUserSession().getPrincipalName()));
158 }
159
160 if (!skipStep(parameterService, step, jobRunDate)) {
161
162 Step unProxiedStep = (Step) ProxyUtils.getTargetIfProxied(step);
163 Class stepClass = unProxiedStep.getClass();
164
165 GlobalVariables.setErrorMap(new ErrorMap());
166 GlobalVariables.setMessageList(new MessageList());
167
168 String stepUserName = KFSConstants.SYSTEM_USER;
169 if (parameterService.parameterExists(stepClass, STEP_USER_PARM_NM)) {
170 stepUserName = parameterService.getParameterValue(stepClass, STEP_USER_PARM_NM);
171 }
172 if (LOG.isInfoEnabled()) {
173 LOG.info(new StringBuffer("Creating user session for step: ").append(step.getName()).append("=").append(stepUserName));
174 }
175 GlobalVariables.setUserSession(new UserSession(stepUserName));
176 if (LOG.isInfoEnabled()) {
177 LOG.info(new StringBuffer("Executing step: ").append(step.getName()).append("=").append(stepClass));
178 }
179 StopWatch stopWatch = new StopWatch();
180 stopWatch.start(jobName);
181 try {
182 continueJob = step.execute(jobName, jobRunDate);
183 }
184 catch (InterruptedException e) {
185 LOG.error("Exception occured executing step", e);
186 throw e;
187 }
188 catch (RuntimeException e) {
189 LOG.error("Exception occured executing step", e);
190 throw e;
191 }
192 stopWatch.stop();
193 LOG.info(new StringBuffer("Step ").append(step.getName()).append(" of ").append(jobName).append(" took ").append(stopWatch.getTotalTimeSeconds() / 60.0).append(" minutes to complete").toString());
194 if (!continueJob) {
195 LOG.info("Stopping job after successful step execution");
196 }
197 }
198
199 LOG.info(new StringBuffer("Finished processing step ").append(currentStepNumber).append(": ").append(step.getName()));
200 return continueJob;
201 }
202
203 /**
204 * This method determines whether the Job should not run the Step based on the RUN_IND and RUN_DATE Parameters.
205 * When RUN_IND exists and equals 'Y' it takes priority and does not consult RUN_DATE.
206 * If RUN_DATE exists, but contains an empty value the step will not be skipped.
207 */
208 protected static boolean skipStep(ParameterService parameterService, Step step, Date jobRunDate) {
209 Step unProxiedStep = (Step) ProxyUtils.getTargetIfProxied(step);
210 Class stepClass = unProxiedStep.getClass();
211
212 DateTimeService dTService = SpringContext.getBean(DateTimeService.class);
213 String dateFormat = parameterService.getParameterValue(KNSConstants.KNS_NAMESPACE, KNSConstants.DetailTypes.ALL_DETAIL_TYPE, KNSConstants.SystemGroupParameterNames.DATE_TO_STRING_FORMAT_FOR_USER_INTERFACE);
214
215 //RUN_IND takes priority: when RUN_IND exists and RUN_IND=Y always run the Step
216 //RUN_DATE: when RUN_DATE exists, but the value is empty run the Step
217
218 boolean runIndExists = parameterService.parameterExists(stepClass, STEP_RUN_PARM_NM);
219 boolean runInd = (runIndExists ? parameterService.getIndicatorParameter(stepClass, STEP_RUN_PARM_NM) : true);
220
221 boolean runDateExists = parameterService.parameterExists(stepClass, STEP_RUN_ON_DATE_PARM_NM);
222 boolean runDateIsEmpty = (runDateExists ? StringUtils.isEmpty(parameterService.getParameterValue(stepClass, STEP_RUN_ON_DATE_PARM_NM)) : true);
223 boolean runDateContainsTodaysDate = (runDateExists ? parameterService.getParameterValues(stepClass, STEP_RUN_ON_DATE_PARM_NM).contains(dTService.toString(jobRunDate, dateFormat)): true);
224
225 if (!runInd && !runDateExists) {
226 if (LOG.isInfoEnabled()) {
227 LOG.info("Skipping step due to system parameter: " + STEP_RUN_PARM_NM +" for "+ stepClass.getName());
228 }
229 return true;
230 }
231 else if (!runInd && !runDateIsEmpty && !runDateContainsTodaysDate) {
232 if (LOG.isInfoEnabled()) {
233 LOG.info("Skipping step due to system parameters: " + STEP_RUN_PARM_NM + " and " + STEP_RUN_ON_DATE_PARM_NM +" for "+ stepClass.getName());
234 }
235 return true;
236 }
237 else if (!runIndExists && !runDateIsEmpty && !runDateContainsTodaysDate) {
238 if (LOG.isInfoEnabled()) {
239 LOG.info("Skipping step due to system parameter: " + STEP_RUN_ON_DATE_PARM_NM +" for "+ stepClass.getName());
240 }
241 return true;
242 }
243 else { //run step
244 return false;
245 }
246 }
247
248 /**
249 * @throws UnableToInterruptJobException
250 */
251 public void interrupt() throws UnableToInterruptJobException {
252 // ask the step to interrupt
253 if (currentStep != null) {
254 currentStep.interrupt();
255 }
256 // also attempt to interrupt the thread, to cause an InterruptedException if the step ever waits or sleeps
257 workerThread.interrupt();
258 }
259
260 public void setParameterService(ParameterService parameterService) {
261 this.parameterService = parameterService;
262 }
263
264 public void setSteps(List<Step> steps) {
265 this.steps = steps;
266 }
267
268 public Appender getNdcAppender() {
269 return ndcAppender;
270 }
271
272 public void setNdcAppender(Appender ndcAppender) {
273 this.ndcAppender = ndcAppender;
274 }
275
276 public void setNotRunnable(boolean notRunnable) {
277 this.notRunnable = notRunnable;
278 }
279
280 protected boolean isNotRunnable() {
281 return notRunnable;
282 }
283
284 public ParameterService getParameterService() {
285 return parameterService;
286 }
287
288 public List<Step> getSteps() {
289 return steps;
290 }
291
292 public void setSchedulerService(SchedulerService schedulerService) {
293 this.schedulerService = schedulerService;
294 }
295
296 public void setDateTimeService(DateTimeService dateTimeService) {
297 this.dateTimeService = dateTimeService;
298 }
299
300 protected String jobDataMapToString(JobDataMap jobDataMap) {
301 StringBuilder buf = new StringBuilder();
302 buf.append("{");
303 Iterator keys = jobDataMap.keySet().iterator();
304 boolean hasNext = keys.hasNext();
305 while (hasNext) {
306 String key = (String) keys.next();
307 Object value = jobDataMap.get(key);
308 buf.append(key).append("=");
309 if (value == jobDataMap) {
310 buf.append("(this map)");
311 }
312 else {
313 buf.append(value);
314 }
315 hasNext = keys.hasNext();
316 if (hasNext) {
317 buf.append(", ");
318 }
319 }
320 buf.append("}");
321 return buf.toString();
322 }
323
324 protected String getMachineName() {
325 try {
326 return InetAddress.getLocalHost().getHostName();
327 }
328 catch (UnknownHostException e) {
329 return "Unknown";
330 }
331 }
332 }