/*
 * Decompiled with CFR 0.152.
 */
package com.bwanms.services.impl;

import com.bwanms.backend.fault.EventTemplateProcessor;
import com.bwanms.backend.fault.alarmstate.AlarmStateProcessor;
import com.bwanms.backend.fault.correlation.CorrelationContext;
import com.bwanms.backend.fault.correlation.CorrelationProcessor;
import com.bwanms.backend.fault.correlation.CorrelationProcessorAdapter;
import com.bwanms.backend.services.EventDAO;
import com.bwanms.backend.services.EventHistoryDAO;
import com.bwanms.be.QueueHelper;
import com.bwanms.be.QueueI;
import com.bwanms.model.fault.Event;
import com.bwanms.services.ServiceException;
import com.bwanms.services.impl.EntityEventProcessor;
import com.bwanms.services.impl.QueueListenerWorker;
import com.bwanms.util.ExecutorUtils;
import com.bwanms.util.NamedThreadFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class EntityEventProcessorImpl2
implements EntityEventProcessor {
    private static final Logger log = Logger.getLogger(EntityEventProcessorImpl2.class);
    private int processorThreads = 5;
    private AlarmQueueListener alarmListener;
    private EventQueueListener eventListener;
    private ExecutorService correlationExecutor;
    private AtomicBoolean run;
    private CorrelationProcessorAdapter correlationAdapter;
    private ArrayBlockingQueue<List<Event>> correlationBatchQueue;

    public EntityEventProcessorImpl2() {
        this.configure();
    }

    private void configure() {
        this.correlationAdapter = new CorrelationProcessorAdapter(new CorrelationProcessor(new CorrelationContext()));
        this.correlationBatchQueue = new ArrayBlockingQueue(10);
        QueueI<Event> alarmQueue = QueueHelper.getQueueI("queue/EntityAlarm", Event.class);
        this.alarmListener = new AlarmQueueListener(alarmQueue, this.correlationBatchQueue);
        QueueI<Event> eventQueue = QueueHelper.getQueueI("queue/EntityEvent", Event.class);
        this.eventListener = new EventQueueListener(eventQueue);
        this.correlationExecutor = Executors.newFixedThreadPool(this.processorThreads, new NamedThreadFactory("EntityEventProcessor"));
        this.run = new AtomicBoolean(true);
    }

    public void start() {
        for (int i = 0; i < this.processorThreads; ++i) {
            this.correlationExecutor.execute(new Runnable(){

                public void run() {
                    while (EntityEventProcessorImpl2.this.run.get()) {
                        List events = (List)EntityEventProcessorImpl2.this.correlationBatchQueue.poll();
                        if (events != null && events.size() > 0) {
                            log.debug((Object)String.format("[%s] corelate %d alarms ...", Thread.currentThread().getName(), events != null ? events.size() : 0));
                            HashMap<Long, ArrayList<Event>> pending = new HashMap<Long, ArrayList<Event>>();
                            for (Event event : events) {
                                Long eqId = event.getEquipmentId();
                                ArrayList<Event> pendingList = (ArrayList<Event>)pending.get(eqId);
                                if (pendingList == null) {
                                    pendingList = new ArrayList<Event>();
                                    pending.put(eqId, pendingList);
                                }
                                pendingList.add(event);
                            }
                            for (Long eqId : pending.keySet()) {
                                ArrayList pendingList = (ArrayList)pending.get(eqId);
                                List<Event> alarms = EntityEventProcessorImpl2.this.correlateAlarms(eqId, pendingList);
                                AlarmStateProcessor.computeAlarmState(eqId, alarms);
                            }
                        }
                        try {
                            Thread.sleep(50L);
                        }
                        catch (InterruptedException e) {
                            EntityEventProcessorImpl2.this.run.set(false);
                        }
                    }
                }
            });
        }
        new Thread((Runnable)this.alarmListener, "ALARMS QUEUE LISTENER THREAD").start();
        new Thread((Runnable)this.eventListener, "EVENTS QUEUE LISTENER THREAD").start();
    }

    public void stop() {
        this.alarmListener.prepareToStop();
        this.eventListener.prepareToStop();
        ExecutorUtils.shutdownAndWait("EntityEventProcessor - workers", this.correlationExecutor);
        this.run.set(false);
        this.correlationExecutor = null;
        this.correlationBatchQueue = null;
    }

    @Override
    public int getProcessorThreads() {
        return this.processorThreads;
    }

    @Override
    public void setProcessorThreads(int processorThreads) {
        this.processorThreads = processorThreads;
    }

    public List<Event> correlateAlarms(Long eqId, List<Event> events) {
        long startTime = System.currentTimeMillis();
        List<Event> openAlarms = this.correlationAdapter.process(eqId, events);
        log.debug((Object)(Thread.currentThread().getName() + " - Corelate alarms took " + (System.currentTimeMillis() - startTime) + " milliseconds"));
        return openAlarms;
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    private static class EventQueueListener
    extends QueueListenerWorker {
        public EventQueueListener(QueueI<Event> queue) {
            super(queue, "EVENT");
        }

        @Override
        protected void templateProcessing(ArrayList<Event> list) {
            try {
                EventTemplateProcessor.getInstance().setProcessingEvents(list).process();
            }
            catch (Exception ex) {
                log.debug((Object)ex.getMessage(), (Throwable)ex);
            }
        }

        @Override
        protected void dbPersistence(ArrayList<Event> list) {
            log.debug((Object)("QueueListener worker EVENT: db persistence start - " + list.size() + " events"));
            try {
                if (list.size() > 0) {
                    LinkedList<Event> reqAckList = new LinkedList<Event>();
                    for (Event event : list) {
                        if (event.isAckRequired()) {
                            reqAckList.add(event);
                            continue;
                        }
                        event.setDeleted((byte)1);
                    }
                    EventHistoryDAO.insert(list);
                    if (reqAckList.size() > 0) {
                        EventDAO.insert(Event.Table.Operational, reqAckList);
                    }
                }
            }
            catch (ServiceException e) {
                e.printStackTrace();
            }
        }
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    private static class AlarmQueueListener
    extends QueueListenerWorker {
        private final ArrayBlockingQueue<List<Event>> batchQueue;

        public AlarmQueueListener(QueueI<Event> queue, ArrayBlockingQueue<List<Event>> batchQueue) {
            super(queue, "ALARM");
            this.batchQueue = batchQueue;
        }

        @Override
        protected void templateProcessing(ArrayList<Event> list) {
            try {
                EventTemplateProcessor.getInstance().setProcessingEvents(list).process();
            }
            catch (Exception ex) {
                log.error((Object)ex.getMessage(), (Throwable)ex);
            }
        }

        @Override
        protected void dbPersistence(ArrayList<Event> list) {
            try {
                if (list.size() > 0) {
                    EventHistoryDAO.insert(list);
                    EventDAO.insert(Event.Table.Operational, list);
                }
            }
            catch (ServiceException e) {
                e.printStackTrace();
            }
        }

        @Override
        protected void passListToCorelate(ArrayList<Event> list) {
            log.debug((Object)("QueueListener worker ALARM: Exchanging queue size " + this.batchQueue.size()));
            if (!this.batchQueue.offer(list)) {
                log.warn((Object)("QueueListener worker ALARM: Exchanging alarms to corelation queue is full " + list.size()));
            } else {
                log.debug((Object)("QueueListener worker ALARM: Passed alarms to corelation queue " + list.size()));
            }
        }
    }
}

