package anastore.client;


import anastore.net.Message;
import anastore.net.ReceiveChannel;
import anastore.net.SendChannel;
import anastore.proto.CommitRep;
import anastore.proto.CommitReq;
import anastore.proto.ConflictRep;
import anastore.store.DeprecationHandler;
import anastore.store.NoSuchDatum;
import anastore.store.Version;
import anastore.store.VersionStore;
import anastore.util.BlockData;
import anastore.util.Log;
import anastore.util.Pair;
import anastore.util.ProtocolInterruptedException;
import anastore.util.ProtocolViolationException;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;

/**
 * A read-write snapshot.
 *<p>
 * Reads are served from a shared version cache; writes are buffered
 * in a private side-store until {@link #commit()} sends them to the
 * server.  All mutable state of a snapshot is guarded by the
 * snapshot's own monitor.
 */
public class RWSnapshot extends Snapshot
{
    private static final boolean LOG = false;
    private static final Log log = new Log(LOG);

    private final SendChannel _chan;
    private final VersionStore<BlockData> _cache;
    private final DeprecationThread _depThread;

    private final DeprecationHandler _aborter = new Aborter();
    private final static Random RANDOM = new Random();
    private final RWState _state = new RWState();
    /**
     * The thread currently executing {@link #commit()}, or null if
     * no commit is in flight.
     */
    private Thread _committing = null;

    /**
     * The existing blocks that have been read from.  This maps from
     * the block's ID to the lower bound of the range of the version
     * that was read.
     */
    private final Map<Long, Long> _r = new HashMap<Long, Long>();
    /**
     * The side-store of blocks that have been written to, including
     * newly created blocks.  This maps from the block's ID to a
     * writable array-backed byte buffer of the block's contents.
     *<p>
     * Note that these byte buffers should never be returned directly
     * from a method.  Instead, a duplicate should be returned so that
     * the original buffer's position and limit cannot be changed.
     *<p>
     * All of the ID's in this map must appear in either _r or _n.
     */
    private final Map<Long, ByteBuffer> _w =
        new HashMap<Long, ByteBuffer>();
    /**
     * The set of ID's of new blocks.  All of these blocks must appear
     * in _w and none may appear in _r.
     */
    private final Set<Long> _n = new HashSet<Long>();

    /**
     * A deprecation handler that aborts the current transaction when
     * there is a deprecation.
     */
    private class Aborter implements DeprecationHandler
    {
        public void onDeprecation(Set<Long> deps, long upperBound)
        {
            synchronized (RWSnapshot.this) {
                if (_state.get() == RWState.COMMITTED)
                    // We don't care any more.  This can happen, for
                    // example, when we get deprecations for blocks
                    // that we read after we've already committed
                    return;
                abort();
            }
        }

        @Override
        public String toString()
        {
            return RWSnapshot.this.toString() + ".Aborter";
        }
    }

    /**
     * Construct a read-write snapshot backed by the given cache.  It
     * will use the given channel to communicate commit messages to
     * the server.
     *
     * @param chan the channel on which commit requests are sent
     * @param cache the version store that blocks are read from and
     * that committed blocks are written back to
     * @param depThread the deprecation thread whose physical upper
     * bound is updated after a successful commit
     */
    RWSnapshot(SendChannel chan, VersionStore<BlockData> cache,
               DeprecationThread depThread)
    {
        _chan = chan;
        _cache = cache;
        _depThread = depThread;
        if (LOG)
            log.println("Started " + this);
    }

    /**
     * Retrieve a read-only view of a block.  If this snapshot has
     * already written to the block, the side-store contents are
     * returned; otherwise the latest version is fetched from the
     * cache and recorded in the read set.
     *
     * @param id the ID of the block to read
     * @return a read-only buffer of the block's contents
     * @throws AbortedException if the transaction has been aborted
     * @throws IllegalStateException if the transaction has been
     * committed or is in the process of committing
     * @throws NoSuchDatum if id refers to a datum that does not exist
     * in the current snapshot
     */
    public synchronized ByteBuffer readBlock(long id)
        throws AbortedException, NoSuchDatum, IllegalStateException
    {
        _state.checkActive();

        // Retrieve it from the side-store, in case we've written this
        // particular block.  The side-store never holds null values,
        // so a single lookup suffices.
        ByteBuffer written = _w.get(id);
        if (written != null) {
            if (LOG)
                log.println(this + " Read block " + id + " from side-store");
            return written.asReadOnlyBuffer();
        }
        // Get the latest copy from the cache
        if (LOG)
            log.println(this + " Reading block " + id + " from cache");
        try {
            Version<BlockData> v = _cache.getLatest(id, _aborter);
            BlockData block = v.get();
            _r.put(id, v.getRange().getLowerBound());
            return block.toROByteBuffer();
        } catch (IOException e) {
            throw new ProtocolViolationException(e);
        }
    }

    /**
     * Retrieve a writable block.  The first call for a given block
     * copies the latest cached version into the side-store; later
     * calls return another view of that same side-store copy.
     *
     * @param id the ID of the block to read for writing
     * @return a duplicate of the side-store buffer for the block
     * @throws AbortedException if the transaction has been aborted
     * @throws IllegalStateException if the transaction has been
     * committed or is in the process of committing
     * @throws NoSuchDatum if id refers to a datum that does not exist
     * in the current snapshot
     */
    public synchronized ByteBuffer readWriteBlock(long id)
        throws AbortedException, IllegalStateException, NoSuchDatum
    {
        _state.checkActive();

        // Retrieve it from the side-store
        ByteBuffer written = _w.get(id);
        if (written != null) {
            if (LOG)
                log.println(this + " Read writable block " + id +
                            " from side-store");
            return written.duplicate();
        }
        // Get the latest copy from the cache
        if (LOG)
            log.println(this + " Reading writable block " + id +
                        " from cache");
        try {
            Version<BlockData> v = _cache.getLatest(id, _aborter);
            BlockData block = v.get();
            ByteBuffer buf = block.copyToRWByteBuffer();
            _r.put(id, v.getRange().getLowerBound());
            _w.put(id, buf);
            return buf.duplicate();
        } catch (IOException e) {
            throw new ProtocolViolationException(e);
        }
    }

    /**
     * Create a new writable block of the given size.  The contents of
     * the block will initially be all zeroes.
     *
     * @param size the size of the new block, in bytes
     * @return The identifier of the block, plus the block data
     * @throws AbortedException if the transaction has been aborted
     * @throws IllegalStateException if the transaction has been
     * committed or is in the process of committing
     * @throws IllegalArgumentException if size is negative
     */
    public synchronized Pair<Long, ByteBuffer> newBlock(int size)
        throws AbortedException, IllegalStateException
    {
        _state.checkActive();

        // Allocate before registering the ID.  If the size is
        // invalid this throws while _n and _w are still untouched,
        // so no ID is left in _n without a buffer in _w.
        ByteBuffer buf = ByteBuffer.allocate(size);

        // Pick an ID that is neither a block we have read (new ID's
        // may not appear in _r) nor an already-created new block.
        long id;
        do {
            id = RANDOM.nextLong();
        } while (_r.containsKey(id) || !_n.add(id));
        if (LOG)
            log.println(this + " Creating new block " + id);
        _w.put(id, buf);
        return new Pair<Long, ByteBuffer>(id, buf.duplicate());
    }

    /**
     * Commit all of the changes made to blocks by this snapshot.
     * The monitor is held while the request is sent and while the
     * response is applied, but not while waiting for the response.
     *
     * @throws AbortedException if the transaction has been aborted
     * @throws IllegalStateException if the transaction has already
     * been committed or is in the process of committing
     */
    public void commit()
        throws AbortedException, IllegalStateException
    {
        try {
            ReceiveChannel rc = sendCommitRequest();
            Message msg = awaitCommitResponse(rc);
            applyCommitResponse(msg);
        } finally {
            // Never leave a stale thread reference behind, whatever
            // the outcome.  Only clear it if it is ours: a rejected
            // concurrent commit() must not clobber the reference
            // owned by the thread that is really committing.
            synchronized (this) {
                if (_committing == Thread.currentThread())
                    _committing = null;
            }
        }
    }

    /**
     * Move to the committing state and send the commit request
     * carrying the read set, the written block contents and the set
     * of new block ID's.
     */
    private synchronized ReceiveChannel sendCommitRequest()
        throws AbortedException, IllegalStateException
    {
        if (LOG)
            log.println(this + " Committing..");
        _state.checkActive();
        _state.set(RWState.COMMITTING);
        _committing = Thread.currentThread();
        Map<Long, byte[]> w = new HashMap<Long, byte[]>();
        for (Map.Entry<Long, ByteBuffer> entry : _w.entrySet())
            w.put(entry.getKey(), entry.getValue().array());
        try {
            return _chan.send(new CommitReq(_r, w, _n));
        } catch (IOException e) {
            throw new ProtocolViolationException("Network error", e);
        }
    }

    /**
     * Wait, without holding the monitor, for the server's response
     * to the commit request or for an interrupt.
     */
    private Message awaitCommitResponse(ReceiveChannel rc)
        throws AbortedException
    {
        try {
            return rc.receiveAsync();
        } catch (InterruptedException e) {
            // The state is guarded by the monitor everywhere else,
            // so inspect it under the monitor here as well.
            synchronized (this) {
                if (_state.get() != RWState.ABORTED)
                    throw new ProtocolInterruptedException("Receiving commit response");
                if (LOG)
                    log.println(this + " ..Commit aborted asynchronously");
            }
            throw new AbortedException();
        } catch (IOException e) {
            throw new ProtocolViolationException("Network error", e);
        }
    }

    /**
     * Act on the server's response to a commit request: abort on a
     * conflict, or write the side-store into the cache at the commit
     * timestamp and move to the committed state.
     */
    private synchronized void applyCommitResponse(Message msg)
        throws AbortedException
    {
        if (msg instanceof ConflictRep) {
            if (LOG)
                log.println(this + " ..Commit aborted by conflict");
            // Clear this before aborting so that abort() never sees
            // a committer to interrupt.
            _committing = null;
            abort();
            throw new AbortedException();
        } else if (msg instanceof CommitRep) {
            CommitRep rep = (CommitRep)msg;
            if (LOG)
                log.println(this + " ..Committing to timestamp " + rep.timestamp);
            // NOTE(review): if abort() ran while we were waiting for
            // the response, _w has already been cleared and nothing
            // is written to the cache here -- confirm that the
            // server never answers CommitRep in that situation.
            Map<Long, BlockData> w = new HashMap<Long, BlockData>(_w.size());
            for (Map.Entry<Long, ByteBuffer> entry : _w.entrySet())
                w.put(entry.getKey(), new BlockData(entry.getValue()));
            try {
                _cache.write(rep.timestamp, w, null, _aborter);
            } catch (IOException e) {
                throw new ProtocolViolationException(e);
            }
            _depThread.updatePhysicalUpperBound();
            _state.set(RWState.COMMITTED);
            if (LOG)
                log.println(this + " ..Committed");
            _r.clear();
            _w.clear();
            _n.clear();
        } else {
            throw new ProtocolViolationException(msg);
        }
    }

    /**
     * Abort all of the modifications made in this snapshot.  The
     * read set, the side-store and the set of new block ID's are
     * discarded.
     *
     * @throws IllegalStateException if the transaction has been
     * committed
     */
    public synchronized void abort()
        throws IllegalStateException
    {
        if (LOG)
            log.println(this + " Aborting");
        if (_state.set(RWState.ABORTED) == RWState.COMMITTING) {
            // XXX The server currently sends conflict messages for
            // write conflicts, so don't bother interrupting the
            // committer.
            //_committing.interrupt();
        }
        _r.clear();
        _w.clear();
        _n.clear();
    }

    @Override
    public String toString()
    {
        return "RWSnapshot<" + Integer.toHexString(hashCode()) + ">";
    }
}
