package anastore.client;

import anastore.net.ReceiveChannel;
import anastore.net.SendChannel;
import anastore.proto.HandshakeReq;
import anastore.store.VersionStore;
import anastore.util.BlockData;
import anastore.util.BoundSet;
import anastore.util.Log;
import anastore.util.NotImplementedError;
import anastore.util.ProtocolInterruptedException;
import anastore.util.ProtocolViolationException;

import java.io.File;
import java.io.IOException;
import java.util.Map;

/**
 * A block store, supporting read-only and read-write serializable
 * transactions.
 *
 * <p>Instances are obtained through the static {@code connect}
 * factories, which complete a handshake with the remote block manager
 * before returning.
 */
public class BlockStore
{
    private static final Log log = new Log();

    /**
     * Freshness bound used for read-only transactions when neither
     * the caller of {@code connect} nor the caller of
     * {@code beginRO} supplies one.
     */
    public static final double DEFAULT_DEFAULT_FRESHNESS = 2.0;

    private final SendChannel _chan;
    private final VersionStore<BlockData> _cache;
    private final double _defaultFreshness;

    private final DeprecationThread _depThread;

    // Handed to the deprecation thread and to every read-only
    // snapshot.  This initializer runs before the constructor body,
    // so the Expirer is created while _cache is still unassigned; the
    // handler must not fire until the constructor has set _cache.
    private final BoundSet _pins = new BoundSet(new Expirer());

    /**
     * Reacts to changes in the lower bound of {@code _pins} by asking
     * the cache to expire up to {@code lowerBound - 1}.
     */
    private class Expirer implements BoundSet.BoundChangeHandler
    {
        public void onLowerBoundChange(long lowerBound)
        {
            try {
                _cache.expire(lowerBound-1);
            } catch (IOException e) {
                // The handler signature leaves no way to propagate
                // the failure, so it is reported and otherwise
                // ignored.
                e.printStackTrace();
            }
        }
    }

    /**
     * Opens the cache in {@code dir}, sends a handshake request
     * describing the cache's unbounded summary, starts the
     * deprecation thread on the reply channel and blocks until that
     * thread reports that the handshake reply has arrived.
     *
     * @param chan A channel connected to a remote block manager
     * @param dir The directory in which to store the cache
     * @param defaultFreshness Freshness used by {@link #beginRO()}
     * @param timeSource The time source passed to the deprecation
     * thread
     * @throws IOException if the cache cannot be opened or queried
     * @throws ProtocolViolationException if sending the handshake
     * request fails
     * @throws ProtocolInterruptedException if the calling thread is
     * interrupted while waiting for the handshake reply; the thread's
     * interrupt status is set again before this is thrown
     */
    private BlockStore(SendChannel chan, File dir, double defaultFreshness,
                       TimeSource timeSource)
        throws IOException
    {
        _chan = chan;
        _cache = new VersionStore<BlockData>(dir, BlockData.FACTORY,
                                             new BlockRetriever(_chan));
        _defaultFreshness = defaultFreshness;

        Map<Long, Long> unbounded = _cache.getUnboundedSummary();

        log.println("Beginning handshake");
        ReceiveChannel rc;
        try {
            rc = _chan.send(new HandshakeReq(unbounded));
        } catch (IOException e) {
            throw new ProtocolViolationException("Network error", e);
        }
        _depThread = new DeprecationThread(_chan, rc, _cache,
                                           timeSource, _pins);
        _depThread.start();
        log.println("Waiting for handshake reply");
        try {
            _depThread.waitForHandshakeRep();
        } catch (InterruptedException e) {
            // Restore the interrupt status so that code further up
            // the stack can still observe the interruption after it
            // has been translated into a protocol exception.
            // NOTE(review): the deprecation thread has already been
            // started and is left running on this path; confirm
            // whether it should be shut down here.
            Thread.currentThread().interrupt();
            throw new ProtocolInterruptedException("Waiting for handshake", e);
        }
        log.println("Received handshake reply");
    }

    /**
     * Construct a block store that communicates to the remote block
     * manager on the other end of the channel.  It will use the
     * default default freshness of 2 seconds for read-only
     * transactions that don't specify freshness.
     *
     * @param chan A channel connected to a remote block manager
     * @param dir The directory in which to store the cache
     * @throws IOException if the cache cannot be opened or the
     * handshake fails
     */
    public static BlockStore connect(SendChannel chan, File dir)
        throws IOException
    {
        return connect(chan, dir,
                       DEFAULT_DEFAULT_FRESHNESS, TimeSource.WALL_CLOCK);
    }

    /**
     * Construct a block store that communicates to the remote block
     * manager on the other end of the channel.
     *
     * @param chan A channel connected to a remote block manager
     * @param dir The directory in which to store the cache
     * @param defaultFreshness The number of seconds to use as a
     * freshness requirement for read-only transactions that don't
     * specify freshness.
     * @throws IOException if the cache cannot be opened or the
     * handshake fails
     */
    public static BlockStore connect(SendChannel chan, File dir,
                                     double defaultFreshness)
        throws IOException
    {
        return connect(chan, dir, defaultFreshness, TimeSource.WALL_CLOCK);
    }

    /**
     * Construct a block store that communicates to the remote block
     * manager on the other end of the channel.
     *
     * @param chan A channel connected to a remote block manager
     * @param dir The directory in which to store the cache
     * @param defaultFreshness The time difference to use as a
     * freshness requirement for read-only transactions that don't
     * specify freshness.
     * @param timeSource The time source to use for checking
     * freshness.  The units used by this time source must match the
     * units used for defaultFreshness.
     * @throws IOException if the cache cannot be opened or the
     * handshake fails
     */
    public static BlockStore connect(SendChannel chan, File dir,
                                     double defaultFreshness,
                                     TimeSource timeSource)
        throws IOException
    {
        return new BlockStore(chan, dir, defaultFreshness, timeSource);
    }

    /**
     * Begin a read-write transaction.  Read-write transactions are
     * guaranteed to operate on the latest version of the data, but
     * can abort at essentially any time.
     *
     * @return a new read-write snapshot backed by this store's
     * channel, cache and deprecation thread
     */
    public RWSnapshot beginRW()
    {
        return new RWSnapshot(_chan, _cache, _depThread);
    }

    /**
     * Begin a read-only transaction that uses the default freshness
     * requirement.
     *
     * @return a new read-only snapshot
     * @throws IllegalArgumentException if the default freshness this
     * store was connected with is negative or NaN
     */
    public ROSnapshot beginRO()
    {
        return beginRO(_defaultFreshness);
    }

    /**
     * Begin a read-only transaction.  Read-only transactions may
     * operate on stale, locally cached data; however, they are
     * guaranteed to observe the effects of all local read-write
     * transactions that committed prior to the read-only transaction
     * beginning.
     *
     * @param freshness Specifies a bound, in seconds, on the
     * staleness of the data that this transaction operates on
     * @return a new read-only snapshot at the cache's current least
     * upper bound
     * @throws IllegalArgumentException if freshness is negative or
     * NaN
     */
    public ROSnapshot beginRO(double freshness)
        throws IllegalArgumentException
    {
        // Written as a negated ">=" so that NaN, for which every
        // ordered comparison is false, is rejected together with
        // negative values instead of slipping through.
        if (!(freshness >= 0)) {
            throw new IllegalArgumentException
                ("Freshness must be non-negative");
        }
        _depThread.ensureFreshness(freshness);
        // XXX The cache LUB is the upper bound on the timestamp of
        // the RO snapshot, but we could start it as far back as one
        // past the timestamp of our last committed write (constrained
        // by the freshness)
        return new ROSnapshot(_cache, _cache.getLeastUpperBound(),
                              _pins);
    }
}
