/*
 * Decompiled with CFR 0.152.
 */
package com.bwanms.services.impl;

import com.bwanms.ddi.ConfigurationChangedEvent;
import com.bwanms.ddi.ConfigurationNotSynchronizedException;
import com.bwanms.ddi.DDInterface;
import com.bwanms.ddi.DDRegistry;
import com.bwanms.ddi.EquipmentSynchronizationStrategy;
import com.bwanms.domain.logic.Helpers;
import com.bwanms.mo.ManagedObject;
import com.bwanms.model.Equipment;
import com.bwanms.persistence.Datastore;
import com.bwanms.persistence.RunnableDatastoreDecorator;
import com.bwanms.platform.PlatformFactory;
import com.bwanms.services.impl.ConfigurationChangeProcessor;
import com.bwanms.util.ExecutorUtils;
import com.bwanms.util.NamedThreadFactory;
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import org.apache.log4j.Logger;

public class ConfigurationChangeProcessorImpl
implements ConfigurationChangeProcessor,
MessageListener {
    private static final Logger log = Logger.getLogger(ConfigurationChangeProcessorImpl.class);
    private long eventsProcessed;
    private long totalEventProcessingTime;
    private long eventsErroded;
    private int processorThreads = 5;
    private HashMap<String, Collection<ProcessEventRunnable>> eventQueues;
    private HashMap<String, ExecutorService> processors;
    private Collection<Long> equipmentForReprocess;
    private Thread reprocessor;
    private long reprocessorDelay = 5000L;
    private boolean systemUp = false;
    private boolean shouldStop = false;

    public ConfigurationChangeProcessorImpl() {
        PlatformFactory.getApplicationLayerPlatform().getMessaging().createQueue("queue/ConfigurationChangedTopic");
    }

    public void start() {
        this.equipmentForReprocess = Collections.synchronizedCollection(new HashSet());
        this.eventQueues = new HashMap();
        this.processors = new HashMap();
        this.eventQueues.put(ManagedObject.class.getName(), Collections.synchronizedCollection(new LinkedHashSet()));
        this.processors.put(ManagedObject.class.getName(), new ThreadPoolExecutor(0, this.processorThreads, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("ConfigChange-default")));
        for (DDInterface ddi : DDRegistry.getInstance().getDDInterfaces()) {
            for (List<Class<? extends ManagedObject>> clazzList : ddi.getMOClassesForIndividualProcessing()) {
                StringBuilder concatName = new StringBuilder();
                for (Class<? extends ManagedObject> clazz : clazzList) {
                    concatName.append(clazz.getName() + "-");
                }
                Collection eventQueue = Collections.synchronizedCollection(new LinkedHashSet());
                ExecutorService processor = Executors.newSingleThreadExecutor(new NamedThreadFactory("ConfigChange-" + concatName));
                for (Class<? extends ManagedObject> clazz : clazzList) {
                    this.eventQueues.put(clazz.getName(), eventQueue);
                    this.processors.put(clazz.getName(), processor);
                }
            }
        }
        PlatformFactory.getApplicationLayerPlatform().getMessaging().setQueueListener("queue/ConfigurationChangedTopic", this);
        this.scheduleForReprocess();
        this.shouldStop = false;
        this.reprocessor = new Thread(new Runnable(){

            public void run() {
                log.info((Object)"[Reprocessor] Starting");
                while (!ConfigurationChangeProcessorImpl.this.shouldStop) {
                    ConfigurationChangeProcessorImpl.this.performReprocess();
                    try {
                        Thread.sleep(ConfigurationChangeProcessorImpl.this.reprocessorDelay);
                    }
                    catch (InterruptedException e) {
                        log.debug((Object)e, (Throwable)e);
                    }
                }
                log.info((Object)"[Reprocessor] Stopping");
            }
        });
        if (!this.systemUp) {
            PlatformFactory.getApplicationLayerPlatform().getMessaging().addTopicListener("topic/Platform", new MessageListener(){

                public void onMessage(Message msg) {
                    ConfigurationChangeProcessorImpl.this.systemUp = true;
                    if (ConfigurationChangeProcessorImpl.this.reprocessor != null) {
                        ConfigurationChangeProcessorImpl.this.reprocessor.start();
                    }
                }
            });
        } else {
            this.reprocessor.start();
        }
    }

    private void scheduleForReprocess() {
        new RunnableDatastoreDecorator(new Runnable(){

            public void run() {
                for (DDInterface ddi : DDRegistry.getInstance().getDDInterfaces()) {
                    for (Class<? extends Equipment> clazz : ddi.getEquipmentWithAsyncConfig()) {
                        ConfigurationChangeProcessorImpl.this.equipmentForReprocess.addAll(Datastore.session().createQuery("select e.id from " + clazz.getName() + " as e").list());
                    }
                }
            }
        }).run();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void scheduleForReprocess(long equipmentId) {
        Collection<Long> collection = this.equipmentForReprocess;
        synchronized (collection) {
            this.equipmentForReprocess.add(equipmentId);
            this.equipmentForReprocess.notify();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void performReprocess() {
        Long equipmentId;
        Collection<Long> collection = this.equipmentForReprocess;
        synchronized (collection) {
            if (this.equipmentForReprocess.isEmpty()) {
                try {
                    this.equipmentForReprocess.wait();
                }
                catch (InterruptedException e) {
                    log.debug((Object)e, (Throwable)e);
                }
                return;
            }
            equipmentId = this.equipmentForReprocess.iterator().next();
            this.equipmentForReprocess.remove(equipmentId);
            log.debug((Object)(this.equipmentForReprocess.size() + " scheduled for reprocess "));
        }
        final long finalEquipmentId = equipmentId;
        try {
            new RunnableDatastoreDecorator(new Runnable(){

                public void run() {
                    Equipment eq = (Equipment)Datastore.session().get(Equipment.class, (Serializable)Long.valueOf(finalEquipmentId));
                    if (eq != null && eq.getState() != -3) {
                        Helpers.getConfigurationHelper().reprocessConfiguration(eq);
                    }
                }
            }).run();
        }
        catch (Throwable t) {
            log.error((Object)t, t);
            this.scheduleForReprocess(equipmentId);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stop() {
        this.shouldStop = true;
        Collection<Long> collection = this.equipmentForReprocess;
        synchronized (collection) {
            this.equipmentForReprocess.notify();
        }
        PlatformFactory.getApplicationLayerPlatform().getMessaging().removeQueueListener(this);
        for (ExecutorService executor : this.processors.values()) {
            ExecutorUtils.shutdownAndWait("ConfigurationChange", executor);
        }
        this.eventQueues = null;
        this.processors = null;
        this.equipmentForReprocess = null;
        this.reprocessor = null;
    }

    public void onMessage(Message msg) {
        try {
            ObjectMessage objMsg = (ObjectMessage)msg;
            ConfigurationChangedEvent event = (ConfigurationChangedEvent)objMsg.getObject();
            log.debug((Object)("ConfigurationChangeProcessor: onMessage " + event.getEquipmentId()));
            this.addEventToQueue(event);
        }
        catch (JMSException e) {
            log.debug((Object)"", (Throwable)e);
            throw new RuntimeException(e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void addEventToQueue(ConfigurationChangedEvent event) {
        String key = event.getMoClass().getName();
        Collection<ProcessEventRunnable> eventQueue = this.eventQueues.get(key);
        if (eventQueue == null) {
            key = ManagedObject.class.getName();
            eventQueue = this.eventQueues.get(key);
        }
        Collection<ProcessEventRunnable> collection = eventQueue;
        synchronized (collection) {
            for (ProcessEventRunnable runnable : eventQueue) {
                if (!event.errode(runnable.event)) continue;
                log.debug((Object)"Event erroded");
                ++this.eventsErroded;
                return;
            }
            ProcessEventRunnable newRunnable = new ProcessEventRunnable(event);
            eventQueue.add(newRunnable);
            this.processors.get(key).submit(newRunnable);
            log.debug((Object)"Event added");
        }
    }

    private void processConfigurationChangeEvent(ConfigurationChangedEvent evt) {
        Equipment eq = (Equipment)Datastore.session().get(Equipment.class, (Serializable)new Long(evt.getEquipmentId()));
        if (eq == null) {
            log.debug((Object)(evt.getEquipmentId() + " not found."));
            return;
        }
        EquipmentSynchronizationStrategy syncStrategy = DDRegistry.getInstance().getDDInterfaceByEquipmentClass(eq.getClass().getName()).getSynchronizationStrategy(eq, Helpers.getDDCallback());
        try {
            syncStrategy.configurationChanged(evt);
        }
        catch (ConfigurationNotSynchronizedException e) {
            log.debug((Object)(evt.getEquipmentId() + " needs config download."));
        }
        catch (RuntimeException e) {
            log.error((Object)e, (Throwable)e);
            throw e;
        }
        catch (Throwable t) {
            log.error((Object)t, t);
            throw new RuntimeException(t);
        }
    }

    public int getProcessorThreads() {
        return this.processorThreads;
    }

    public void setProcessorThreads(int processorThreads) {
        this.processorThreads = processorThreads;
    }

    public long getEventsErroded() {
        return this.eventsErroded;
    }

    public long getEventsProcessed() {
        return this.eventsProcessed;
    }

    public long getTotalEventProcessingTime() {
        return this.totalEventProcessingTime;
    }

    public long getAverageProcessingTime() {
        return this.eventsProcessed == 0L ? 0L : this.totalEventProcessingTime / this.eventsProcessed;
    }

    private class ProcessEventRunnable
    implements Runnable {
        ConfigurationChangedEvent event;

        public ProcessEventRunnable(ConfigurationChangedEvent event) {
            this.event = event;
        }

        public void run() {
            long start = System.currentTimeMillis();
            Collection eventQueue = (Collection)ConfigurationChangeProcessorImpl.this.eventQueues.get(this.event.getMoClass().getName());
            if (eventQueue == null) {
                eventQueue = (Collection)ConfigurationChangeProcessorImpl.this.eventQueues.get(ManagedObject.class.getName());
            }
            eventQueue.remove(this);
            try {
                new RunnableDatastoreDecorator(new Runnable(){

                    public void run() {
                        ConfigurationChangeProcessorImpl.this.processConfigurationChangeEvent(ProcessEventRunnable.this.event);
                    }
                }).run();
                ConfigurationChangeProcessorImpl.this.eventsProcessed++;
                ConfigurationChangeProcessorImpl.this.totalEventProcessingTime += System.currentTimeMillis() - start;
            }
            catch (Throwable t) {
                log.error((Object)t, t);
                ConfigurationChangeProcessorImpl.this.scheduleForReprocess(this.event.getEquipmentId());
            }
        }
    }
}

