001    /*
002     * Copyright 2011 The Kuali Foundation.
003     * 
004     * Licensed under the Educational Community License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     * 
008     * http://www.opensource.org/licenses/ecl2.php
009     * 
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    package org.kuali.kfs.sys.batch;
017    
018    
019    import java.net.InetAddress;
020    import java.net.UnknownHostException;
021    import java.text.DateFormat;
022    import java.text.ParseException;
023    import java.text.SimpleDateFormat;
024    import java.util.Date;
025    import java.util.Iterator;
026    import java.util.List;
027    
028    import org.apache.commons.lang.StringUtils;
029    import org.apache.log4j.Appender;
030    import org.apache.log4j.Logger;
031    import org.kuali.kfs.gl.GeneralLedgerConstants;
032    import org.kuali.kfs.sys.KFSConstants;
033    import org.kuali.kfs.sys.batch.service.SchedulerService;
034    import org.kuali.kfs.sys.context.ProxyUtils;
035    import org.kuali.kfs.sys.context.SpringContext;
036    import org.kuali.kfs.sys.service.impl.KfsParameterConstants;
037    import org.kuali.rice.kew.exception.WorkflowException;
038    import org.kuali.rice.kns.UserSession;
039    import org.kuali.rice.kns.service.DateTimeService;
040    import org.kuali.rice.kns.service.ParameterEvaluator;
041    import org.kuali.rice.kns.service.ParameterService;
042    import org.kuali.rice.kns.util.ErrorMap;
043    import org.kuali.rice.kns.util.GlobalVariables;
044    import org.kuali.rice.kns.util.KNSConstants;
045    import org.kuali.rice.kns.util.MessageList;
046    import org.quartz.InterruptableJob;
047    import org.quartz.JobDataMap;
048    import org.quartz.JobExecutionContext;
049    import org.quartz.JobExecutionException;
050    import org.quartz.StatefulJob;
051    import org.quartz.UnableToInterruptJobException;
052    import org.springframework.aop.framework.Advised;
053    import org.springframework.aop.support.AopUtils;
054    import org.springframework.util.StopWatch;
055    
056    public class Job implements StatefulJob, InterruptableJob {
057    
058        public static final String JOB_RUN_START_STEP = "JOB_RUN_START_STEP";
059        public static final String JOB_RUN_END_STEP = "JOB_RUN_END_STEP";
060        public static final String STEP_RUN_PARM_NM = "RUN_IND";    
061        public static final String STEP_RUN_ON_DATE_PARM_NM = "RUN_DATE";
062        public static final String STEP_USER_PARM_NM = "USER";
063        
064        private static final Logger LOG = Logger.getLogger(Job.class);
065        private SchedulerService schedulerService;
066        private ParameterService parameterService;
067        private DateTimeService dateTimeService;
068        private List<Step> steps;
069        private Step currentStep;
070        private Appender ndcAppender;
071        private boolean notRunnable;
072        private transient Thread workerThread;
073    
074        /**
075         * @see org.quartz.Job#execute(org.quartz.JobExecutionContext)
076         */
077        public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
078            workerThread = Thread.currentThread();
079            if (isNotRunnable()) {
080                if (LOG.isInfoEnabled()) {
081                    LOG.info("Skipping job because doNotRun is true: " + jobExecutionContext.getJobDetail().getName());
082                }
083                return;
084            }
085            int startStep = 0;
086            try {
087                startStep = Integer.parseInt(jobExecutionContext.getMergedJobDataMap().getString(JOB_RUN_START_STEP));
088            }
089            catch (NumberFormatException ex) {
090                // not present, do nothing
091            }
092            int endStep = 0;
093            try {
094                endStep = Integer.parseInt(jobExecutionContext.getMergedJobDataMap().getString(JOB_RUN_END_STEP));
095            }
096            catch (NumberFormatException ex) {
097                // not present, do nothing
098            }
099            Date jobRunDate = dateTimeService.getCurrentDate();
100            int currentStepNumber = 0;
101            try {
102                LOG.info("Executing job: " + jobExecutionContext.getJobDetail() + " on machine " + getMachineName() + " scheduler instance id " + jobExecutionContext.getScheduler().getSchedulerInstanceId() + "\n" + jobDataMapToString(jobExecutionContext.getJobDetail().getJobDataMap()));
103                for (Step step : getSteps()) {
104                    currentStepNumber++;
105                    // prevent starting of the next step if the thread has an interrupted status
106                    if (workerThread.isInterrupted()) {
107                        LOG.warn("Aborting Job execution due to manual interruption");
108                        schedulerService.updateStatus(jobExecutionContext.getJobDetail(), SchedulerService.CANCELLED_JOB_STATUS_CODE);
109                        return;
110                    }
111                    if (startStep > 0 && currentStepNumber < startStep) {
112                        if (LOG.isInfoEnabled()) {
113                            LOG.info("Skipping step " + currentStepNumber + " - startStep=" + startStep);
114                        }
115                        continue; // skip to next step
116                    }
117                    else if (endStep > 0 && currentStepNumber > endStep) {
118                        if (LOG.isInfoEnabled()) {
119                            LOG.info("Ending step loop - currentStepNumber=" + currentStepNumber + " - endStep = " + endStep);
120                        }
121                        break;
122                    }
123                    step.setInterrupted(false);
124                    try {
125                        if (!runStep(parameterService, jobExecutionContext.getJobDetail().getFullName(), currentStepNumber, step, jobRunDate)) {
126                            break;
127                        }
128                    }
129                    catch (InterruptedException ex) {
130                        LOG.warn("Stopping after step interruption");
131                        schedulerService.updateStatus(jobExecutionContext.getJobDetail(), SchedulerService.CANCELLED_JOB_STATUS_CODE);
132                        return;
133                    }
134                    if (step.isInterrupted()) {
135                        LOG.warn("attempt to interrupt step failed, step continued to completion");
136                        LOG.warn("cancelling remainder of job due to step interruption");
137                        schedulerService.updateStatus(jobExecutionContext.getJobDetail(), SchedulerService.CANCELLED_JOB_STATUS_CODE);
138                        return;
139                    }
140                }
141            }
142            catch (Exception e) {
143                schedulerService.updateStatus(jobExecutionContext.getJobDetail(), SchedulerService.FAILED_JOB_STATUS_CODE);
144                throw new JobExecutionException("Caught exception in " + jobExecutionContext.getJobDetail().getName(), e, false);
145            }
146            LOG.info("Finished executing job: " + jobExecutionContext.getJobDetail().getName());
147            schedulerService.updateStatus(jobExecutionContext.getJobDetail(), SchedulerService.SUCCEEDED_JOB_STATUS_CODE);
148        }
149    
150        public static boolean runStep(ParameterService parameterService, String jobName, int currentStepNumber, Step step, Date jobRunDate) throws InterruptedException, WorkflowException {
151            
152            boolean continueJob = true;
153            if (GlobalVariables.getUserSession() == null) {
154                LOG.info(new StringBuffer("Started processing step: ").append(currentStepNumber).append("=").append(step.getName()).append(" for user <unknown>"));
155            }
156            else {
157                LOG.info(new StringBuffer("Started processing step: ").append(currentStepNumber).append("=").append(step.getName()).append(" for user ").append(GlobalVariables.getUserSession().getPrincipalName()));
158            }        
159            
160            if (!skipStep(parameterService, step, jobRunDate)) {
161                
162                Step unProxiedStep = (Step) ProxyUtils.getTargetIfProxied(step);
163                Class stepClass = unProxiedStep.getClass();
164                
165                GlobalVariables.setErrorMap(new ErrorMap());
166                GlobalVariables.setMessageList(new MessageList());
167                
168                String stepUserName = KFSConstants.SYSTEM_USER;
169                if (parameterService.parameterExists(stepClass, STEP_USER_PARM_NM)) {
170                    stepUserName = parameterService.getParameterValue(stepClass, STEP_USER_PARM_NM);
171                }
172                if (LOG.isInfoEnabled()) {
173                    LOG.info(new StringBuffer("Creating user session for step: ").append(step.getName()).append("=").append(stepUserName));
174                }
175                GlobalVariables.setUserSession(new UserSession(stepUserName));
176                if (LOG.isInfoEnabled()) {
177                    LOG.info(new StringBuffer("Executing step: ").append(step.getName()).append("=").append(stepClass));
178                }
179                StopWatch stopWatch = new StopWatch();
180                stopWatch.start(jobName);
181                try {
182                    continueJob = step.execute(jobName, jobRunDate);
183                }
184                catch (InterruptedException e) {
185                    LOG.error("Exception occured executing step", e);
186                    throw e;
187                }
188                catch (RuntimeException e) {
189                    LOG.error("Exception occured executing step", e);
190                    throw e;
191                }
192                stopWatch.stop();
193                LOG.info(new StringBuffer("Step ").append(step.getName()).append(" of ").append(jobName).append(" took ").append(stopWatch.getTotalTimeSeconds() / 60.0).append(" minutes to complete").toString());
194                if (!continueJob) {
195                    LOG.info("Stopping job after successful step execution");
196                }
197            }
198            
199            LOG.info(new StringBuffer("Finished processing step ").append(currentStepNumber).append(": ").append(step.getName()));
200            return continueJob;
201        }
202        
203        /**
204         * This method determines whether the Job should not run the Step based on the RUN_IND and RUN_DATE Parameters.
205         * When RUN_IND exists and equals 'Y' it takes priority and does not consult RUN_DATE. 
206         * If RUN_DATE exists, but contains an empty value the step will not be skipped.
207         */
208        protected static boolean skipStep(ParameterService parameterService, Step step, Date jobRunDate) {
209            Step unProxiedStep = (Step) ProxyUtils.getTargetIfProxied(step);
210            Class stepClass = unProxiedStep.getClass();
211            
212            DateTimeService dTService = SpringContext.getBean(DateTimeService.class);
213            String dateFormat = parameterService.getParameterValue(KNSConstants.KNS_NAMESPACE, KNSConstants.DetailTypes.ALL_DETAIL_TYPE, KNSConstants.SystemGroupParameterNames.DATE_TO_STRING_FORMAT_FOR_USER_INTERFACE);
214            
215            //RUN_IND takes priority: when RUN_IND exists and RUN_IND=Y always run the Step
216            //RUN_DATE: when RUN_DATE exists, but the value is empty run the Step
217            
218            boolean runIndExists = parameterService.parameterExists(stepClass, STEP_RUN_PARM_NM);
219            boolean runInd = (runIndExists ? parameterService.getIndicatorParameter(stepClass, STEP_RUN_PARM_NM) : true);
220            
221            boolean runDateExists = parameterService.parameterExists(stepClass, STEP_RUN_ON_DATE_PARM_NM);
222            boolean runDateIsEmpty = (runDateExists ? StringUtils.isEmpty(parameterService.getParameterValue(stepClass, STEP_RUN_ON_DATE_PARM_NM)) : true);
223            boolean runDateContainsTodaysDate = (runDateExists ? parameterService.getParameterValues(stepClass, STEP_RUN_ON_DATE_PARM_NM).contains(dTService.toString(jobRunDate, dateFormat)): true);
224    
225            if (!runInd && !runDateExists) {
226                if (LOG.isInfoEnabled()) {
227                    LOG.info("Skipping step due to system parameter: " + STEP_RUN_PARM_NM +" for "+ stepClass.getName());
228                }            
229                return true;
230            }
231            else if (!runInd && !runDateIsEmpty && !runDateContainsTodaysDate) {
232                if (LOG.isInfoEnabled()) {
233                    LOG.info("Skipping step due to system parameters: " + STEP_RUN_PARM_NM + " and " + STEP_RUN_ON_DATE_PARM_NM +" for "+ stepClass.getName());
234                }
235                return true;
236            }
237            else if (!runIndExists && !runDateIsEmpty && !runDateContainsTodaysDate) {
238                if (LOG.isInfoEnabled()) {
239                    LOG.info("Skipping step due to system parameter: " + STEP_RUN_ON_DATE_PARM_NM +" for "+ stepClass.getName());
240                }
241                return true;
242            }
243            else { //run step
244                return false;
245            }
246        }
247    
248        /**
249         * @throws UnableToInterruptJobException
250         */
251        public void interrupt() throws UnableToInterruptJobException {
252            // ask the step to interrupt
253            if (currentStep != null) {
254                currentStep.interrupt();
255            }
256            // also attempt to interrupt the thread, to cause an InterruptedException if the step ever waits or sleeps
257            workerThread.interrupt();
258        }
259    
260        public void setParameterService(ParameterService parameterService) {
261            this.parameterService = parameterService;
262        }
263    
264        public void setSteps(List<Step> steps) {
265            this.steps = steps;
266        }
267    
268        public Appender getNdcAppender() {
269            return ndcAppender;
270        }
271    
272        public void setNdcAppender(Appender ndcAppender) {
273            this.ndcAppender = ndcAppender;
274        }
275    
276        public void setNotRunnable(boolean notRunnable) {
277            this.notRunnable = notRunnable;
278        }
279    
280        protected boolean isNotRunnable() {
281            return notRunnable;
282        }
283    
284        public ParameterService getParameterService() {
285            return parameterService;
286        }
287    
288        public List<Step> getSteps() {
289            return steps;
290        }
291    
292        public void setSchedulerService(SchedulerService schedulerService) {
293            this.schedulerService = schedulerService;
294        }
295    
296        public void setDateTimeService(DateTimeService dateTimeService) {
297            this.dateTimeService = dateTimeService;
298        }
299        
300        protected String jobDataMapToString(JobDataMap jobDataMap) {
301            StringBuilder buf = new StringBuilder();
302            buf.append("{");
303            Iterator keys = jobDataMap.keySet().iterator();
304            boolean hasNext = keys.hasNext();
305            while (hasNext) {
306                String key = (String) keys.next();
307                Object value = jobDataMap.get(key);
308                buf.append(key).append("=");
309                if (value == jobDataMap) {
310                    buf.append("(this map)");
311                }
312                else {
313                    buf.append(value);
314                }
315                hasNext = keys.hasNext();
316                if (hasNext) {
317                    buf.append(", ");
318                }
319            }
320            buf.append("}");
321            return buf.toString();
322        }
323        
324        protected String getMachineName() {
325            try {
326                return InetAddress.getLocalHost().getHostName();
327            }
328            catch (UnknownHostException e) {
329                return "Unknown";
330            }
331        }
332    }