package anastore.client;

import anastore.net.ReceiveChannel;
import anastore.net.SendChannel;
import anastore.store.VersionStore;
import anastore.net.Message;
import anastore.proto.DeprecationRep;
import anastore.proto.HandshakeRep;
import anastore.proto.HiReq;
import anastore.util.BlockData;
import anastore.util.BoundSet;
import anastore.util.Log;
import anastore.util.ProtocolInterruptedException;
import anastore.util.ProtocolViolationException;

import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * A daemon thread that listens for and processes messages arriving on
 * the deprecation channel.
 *
 * <p>Two kinds of message are accepted: a single {@link HandshakeRep},
 * which is handed to whoever calls {@link #waitForHandshakeRep()}, and
 * any number of {@link DeprecationRep}s, which are applied to the cache
 * and then used to advance the physical upper bound (PUB).  Any other
 * message is treated as a protocol violation.
 *
 * <p>Other threads interact with this one through
 * {@link #ensureFreshness(double)}, {@link #updatePhysicalUpperBound()}
 * and {@link #waitForHandshakeRep()}.
 */
class DeprecationThread extends Thread
{
    private static final Log log = new Log();

    private final SendChannel _sendChan;
    private final ReceiveChannel _depChan;
    private final VersionStore<BlockData> _cache;
    private final TimeSource _timeSource;

    /**
     * The physical timestamp of the last GLUB update.  This indicates
     * how stale the cache data is.  To update this, the _pubLock must
     * be held.  When this is updated, the _pubLock must be notified.
     * It is volatile so that it can be read without holding the lock.
     */
    private volatile double _physicalUpperBound = Double.NEGATIVE_INFINITY;
    private final Object _pubLock = new Object();

    /**
     * The GLUB value currently registered in _pins by this thread.
     * Protected by _pubLock.
     */
    private long _lastGLUB = 0;
    private final BoundSet _pins;

    /**
     * A lock that protects the GLUB update process.  This prevents
     * multiple threads from concurrently issuing GLUB update
     * requests.
     */
    private final Object _hiLock = new Object();

    /**
     * Hand-off slot for the handshake reply.  Its capacity of one is
     * what lets run() detect a second HandshakeRep that arrives while
     * the first is still unclaimed.
     */
    private final BlockingQueue<HandshakeRep> _handshake =
        new ArrayBlockingQueue<HandshakeRep>(1);

    /**
     * Construct a deprecation thread that listens for messages on the
     * given channel and performs deprecations in the given cache.
     * Don't forget to start the thread once it has been constructed.
     *
     * @param sendChan channel used to send GLUB update requests
     * @param depChan channel on which deprecation messages arrive
     * @param cache the cache in which deprecations are performed
     * @param timeSource clock used to timestamp PUB updates
     * @param pins set in which the last observed GLUB is registered;
     *        the initial value 0 is added here
     */
    public DeprecationThread(SendChannel sendChan, ReceiveChannel depChan,
                             VersionStore<BlockData> cache,
                             TimeSource timeSource,
                             BoundSet pins)
    {
        super("Deprecation thread");
        setDaemon(true);

        _sendChan = sendChan;
        _depChan = depChan;
        _cache = cache;
        _timeSource = timeSource;
        _pins = pins;
        _pins.add(_lastGLUB);
    }

    /**
     * Loop forever, receiving and processing deprecation messages.
     * The loop only ends when an exception is thrown.
     *
     * @throws ProtocolViolationException on a network error, a second
     *         unclaimed HandshakeRep, or an unexpected message type
     * @throws ProtocolInterruptedException if interrupted while
     *         waiting for a message
     */
    @Override
    public void run()
    {
        while (true) {
            Message msg = receiveNext();
            if (msg instanceof HandshakeRep) {
                handleHandshake((HandshakeRep)msg);
            } else if (msg instanceof DeprecationRep) {
                handleDeprecation((DeprecationRep)msg);
            } else {
                throw new ProtocolViolationException
                    (msg, "Unexpected message on deprecation channel");
            }
        }
    }

    /**
     * Receive the next message from the deprecation channel,
     * converting checked failures into protocol exceptions.
     */
    private Message receiveNext()
    {
        try {
            return _depChan.receiveAsync();
        } catch (IOException e) {
            throw new ProtocolViolationException("Network error", e);
        } catch (InterruptedException e) {
            // Re-assert the interrupt so it is not lost when the
            // checked exception is wrapped in an unchecked one.
            Thread.currentThread().interrupt();
            throw new ProtocolInterruptedException
                ("Waiting for deprecation", e);
        }
    }

    /**
     * Make a handshake reply available to waitForHandshakeRep().
     */
    private void handleHandshake(HandshakeRep rep)
    {
        log.println("Received handshake reply: " + rep);
        // offer() fails only if an earlier reply is still queued
        if (!_handshake.offer(rep))
            throw new ProtocolViolationException
                (rep, "Multiple HandshakeRep's");
    }

    /**
     * Apply a deprecation to the cache (unless it is a "null"
     * deprecation) and then advance the PUB.
     */
    private void handleDeprecation(DeprecationRep rep)
    {
        log.println("Received deprecation: " + rep);
        // A "null" deprecation (no blocks, empty range) is only meant
        // to update the PUB, so the cache is left alone.
        boolean pubOnly =
            rep.blocks.isEmpty() && rep.start == rep.timestamp + 1;
        if (!pubOnly)
            _cache.deprecate(rep.start, rep.timestamp, rep.blocks);
        // Now that the GLUB is up-to-date, update our PUB
        updatePhysicalUpperBound();
    }

    /**
     * Update the physical upper bound of the cache to the current
     * time.  This should be called <i>after</i> any operation that
     * changes the cache's GLUB.
     *
     * <p>The cache's current GLUB is also registered in the pin set
     * in place of the previously registered value.  The PUB never
     * moves backwards; waiters on the PUB are notified only when it
     * actually advances.
     */
    public void updatePhysicalUpperBound()
    {
        synchronized (_pubLock) {
            long glub = _cache.getLeastUpperBound();
            // Add the new pin before dropping the old one so that a
            // pin from this thread is present throughout the swap.
            _pins.add(glub);
            _pins.remove(_lastGLUB);
            _lastGLUB = glub;

            double now = _timeSource.currentTime();
            if (now > _physicalUpperBound) {
                _physicalUpperBound = now;
                log.println("Updated PUB to " + now);
                _pubLock.notifyAll();
            }
        }
    }

    /**
     * Ensure that the physical upper bound of the cache is no more
     * than freshness less than the current time.  If necessary, this
     * will send an update request to the server and block until the
     * PUB has advanced far enough.
     *
     * @param freshness maximum tolerated staleness, in the units of
     *        the time source; must be non-negative
     * @throws ProtocolViolationException if sending the update
     *         request fails
     * @throws ProtocolInterruptedException if interrupted while
     *         waiting for the PUB to advance; the thread's interrupt
     *         status is left set
     */
    public void ensureFreshness(double freshness)
    {
        assert freshness >= 0;

        double now = _timeSource.currentTime();
        // The PUB must reach this value before we may return
        double target = now - freshness;

        // Are we already at least this fresh?
        if (target <= _physicalUpperBound)
            return;

        // We're not fresh enough.  Say 'Hi' to the server so it'll
        // update our GLUB.  _hiLock is held until the PUB has caught
        // up, so concurrent callers queue here instead of each
        // sending their own request.
        synchronized (_hiLock) {
            // Did somebody else update the PUB while we were waiting?
            if (target <= _physicalUpperBound)
                return;

            log.println("Requesting GLUB update..");
            try {
                _sendChan.send(new HiReq());
            } catch (IOException e) {
                throw new ProtocolViolationException(e);
            }

            // Wait for the PUB to update
            log.println("..Need PUB to reach " + target +
                        " from " + _physicalUpperBound);
            synchronized (_pubLock) {
                // NOTE(review): if run() has already terminated with
                // an exception, nothing appears to notify _pubLock
                // again, so this loop would only end by interruption
                // -- confirm whether that is acceptable.
                while (target > _physicalUpperBound) {
                    try {
                        log.println("..Waiting for PUB to change");
                        _pubLock.wait();
                    } catch (InterruptedException e) {
                        // Preserve the interrupt for callers that
                        // catch the unchecked wrapper.
                        Thread.currentThread().interrupt();
                        throw new ProtocolInterruptedException
                            ("Waiting for PUB update", e);
                    }
                }
            }
            log.println("..PUB is now minty fresh " + _physicalUpperBound);
        }
    }

    /**
     * Block until a HandshakeRep has been received on the deprecation
     * channel.  This must be called <i>at most once</i>.
     *
     * @return the handshake reply received by this thread
     * @throws InterruptedException if interrupted while waiting
     */
    public HandshakeRep waitForHandshakeRep()
        throws InterruptedException
    {
        return _handshake.take();
    }
}
