package anastore.store;

import anastore.util.*;

import java.io.IOException;
import java.io.File;
import java.util.*;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentHashMap;

/**
 * A multi-version store of fixed sized blocks.  This aggressively
 * caches data both in memory and on disk.  It can operate both as a
 * cache and as the ultimate repository for data, depending on whether
 * an instance is given a handler for cache misses.
 */
public class VersionStore<D extends Storable>
{
    private static final boolean LOG = false;
    private static final Log log = new Log(LOG);

    private final Disk _disk;
    private final MissHandler<D> _mh;
    private final StorableFactory<D> _sf;

    private final BatonBarrier _glub = new BatonBarrier();
    private volatile long _lowWaterMark = -1;

    /**
     * A set of versions of a single datum.  This contains information
     * about all of the versions of the block that are available in
     * the store, regardless of whether they are on disk or in memory.
     */
    private class VersionSet
    {
        private final long _id;
        /**
         * All of the available versions.  The map is from the lower
         * bound of the time range of each version to the object
         * representing that version.
         *<p>
         * Access to this map must be synchronized.
         */
        private final SortedMap<Long, Version<D>> _versions =
            new TreeMap<Long, Version<D>>();
        /**
         * The deprecation handlers to call when the current unbounded
         * version in this set is deprecated.
         *<p>
         * Access to this must be protected by the lock on _versions.
         */
        private final List<DeprecationHandler> _dhs =
            new ArrayList<DeprecationHandler>();
        /**
         * The slot on disk that stores this version.
         */
        private final Disk.Slot _slot;
        /**
         * When there is a cache miss, it is necessary to prevent
         * other concurrent cache misses from trying to retrieve the
         * same block, so we use a coarse-grained lock to prevent
         * this.
         *<p>
         * This lock comes before the lock on _versions in the lock
         * ordering.
         */
        private final Object _missLock = new Object();

        /**
         * Construct a new version set containing no data and with
         * initially no disk representation
         */
        public VersionSet(long id)
        {
            this(_disk.newSlot(id));
        }

        /**
         * Construct a version set from an existing slot.  If the slot
         * contains data, it will be loaded into the version set.
         */
        public VersionSet(Disk.Slot slot)
        {
            _id = slot.getID();
            _slot = slot;
            Disk.BlockHandle bh = slot.getLatest();
            if (bh != null) {
                Version<D> v = new Version<D>(_sf, bh);
                _versions.put(bh.getRange().getLowerBound(), v);
            }
        }

        /**
         * If this version set contains a version that contains
         * timestamp, return it.  Otherwise, call on the miss handler
         * and store its result.  Otherwise, throw NoSuchDatum. If the
         * version returned is unbounded, add the deprecation handler
         * dh.
         */
        public Version<D> get(long timestamp,
                              DeprecationHandler dh)
            throws IOException, NoSuchDatum
        {
            Version<D> v = getLocal(timestamp, dh);
            if (v != null)
                return v;

            // Cache miss.
            if (_mh == null)
                throw new NoSuchDatum(_id, timestamp);
            // Retrieve the block from the miss handler.  Note that we
            // have to hold some lock while we do this to prevent
            // another thread from trying to retrieve the same datum.
            // Ideally, we would lock the range from the upper bound
            // of the version preceding timestamp to the lower bound
            // of the version following timestamp, but that's hard, so
            // we use a general miss lock that at least doesn't
            // prevent reads of existing data.
            //
            // XXX This could become a point of contention that
            // could decrease our network parallelism.
            synchronized (_missLock) {
                // The version could have been retrieved in the mean
                // time, so we check again just in case
                v = getLocal(timestamp, dh);
                if (v != null)
                    return v;

                // We actually missed
                if (LOG)
                    log.println("Handling miss for " + _id + "@" + timestamp);
                Pair<TimeRange, D> res = _mh.get(_id, timestamp);
                if (!res.first.contains(timestamp, Long.MAX_VALUE))
                    throw new IndexOutOfBoundsException
                        ("Miss handler returned range " + res.first +
                         ", which does not contain requested timestamp " +
                         timestamp);
                TimeRange range = res.first;
                D datum = res.second;
                // Write to disk
                Disk.BlockHandle bh = _slot.add(range, datum.getData());
                // Construct the version
                v = new Version<D>(_sf, bh, datum);
                synchronized (_versions) {
                    _versions.put(range.getLowerBound(), v);
                    if ((dh != null) &&
                        (!res.first.hasUpperBound())) {
                        _dhs.add(dh);
                    }
                }
            }
            return v;
        }

        /**
         * If this version set contains a version that contains
         * timestamp, return it.  Otherwise, call on the miss handler
         * and store its result.  Otherwise, throw NoSuchDatum.
         */
        public Version<D> get(long timestamp)
            throws IOException, NoSuchDatum
        {
            return get(timestamp, null);
        }

        /**
         * Retrieve the requested version, without going to the miss
         * handler.
         */
        private Version<D> getLocal(long timestamp,
                                    DeprecationHandler dh)
        {
            synchronized (_versions) {
                SortedMap<Long, Version<D>> sub = _versions.headMap(timestamp+1);
                if (!sub.isEmpty()) {
                    Version<D> v = sub.get(sub.lastKey());
                    if (v.getRange().contains(timestamp, _glub.get())) {
                        if ((dh != null) &&
                            (!v.getRange().hasUpperBound())) {
                            _dhs.add(dh);
                        }
                        return v;
                    }
                }
            }
            return null;
        }

        /**
         * If this version set contains an unbounded version store it.
         * Otherwise, call on the miss handler and register its
         * result.  Otherwise, throw NoSuchDatum.
         */
        public Version<D> getLatest(DeprecationHandler dh)
            throws IOException, NoSuchDatum
        {
            Version<D> v = getLatestLocal(dh);
            if (v != null)
                return v;

            // Cache miss.
            if (_mh == null)
                throw new NoSuchDatum(_id);

            synchronized (_missLock) {
                // Check if the version arrived in the mean time
                v = getLatestLocal(dh);
                if (v != null)
                    return v;

                if (LOG)
                    log.println("Handling miss for " + _id + "@latest");
                // We have to synchronize here to prevent a
                // deprecation for this block from arriving before the
                // block gets inserted
                synchronized (_versions) {
                    Pair<TimeRange, D> res = _mh.getLatest(_id);
                    if (res.first.hasUpperBound())
                        throw new IndexOutOfBoundsException
                            ("Miss handler returned range " + res.first +
                             ", which is not unbounded");
                    TimeRange range = res.first;
                    D datum = res.second;
                    // Write to disk
                    Disk.BlockHandle bh = _slot.add(range, datum.getData());
                    // Construct the version
                    v = new Version<D>(_sf, bh, datum);
                    // XXX We used to do this here
                    // synchronized (_versions) {
                    _versions.put(range.getLowerBound(), v);
                    if (dh != null)
                        _dhs.add(dh);
                }
            }
            return v;
        }

        /**
         * Retrieve the unbounded version if it is locally available,
         * without going to the miss handler.  Return null if such a
         * version does not exist.
         */
        public Version<D> getLatestLocal(DeprecationHandler dh)
        {
            synchronized (_versions) {
                if (!_versions.isEmpty()) {
                    Version<D> v = _versions.get(_versions.lastKey());
                    if (!v.getRange().hasUpperBound()) {
                        if (dh != null)
                            _dhs.add(dh);
                        return v;
                    }
                }
            }
            return null;
        }

        public void write(long timestamp, D datum, DeprecationHandler dh,
                          Set<DeprecationHandler> callbacks)
            throws IOException
        {
            TimeRange tr = new TimeRange(timestamp+1);
            synchronized (_versions) {
                Version<D> cur = getLatestLocal(null);
                if (cur != null)
                    deprecate(cur, timestamp, callbacks);
                // Write to disk
                Disk.BlockHandle bh = _slot.add(tr, datum.getData());
                // Construct the version
                Version<D> v = new Version<D>(_sf, bh, datum);
                _versions.put(timestamp+1, v);
                if (dh != null)
                    _dhs.add(dh);
            }
        }

        public void deprecate(long timestamp,
                              Set<DeprecationHandler> callbacks)
        {
            synchronized (_versions) {
                Version<D> unbounded = getLatestLocal(null);
                // Should we ignore this deprecate?  The server is
                // allowed to send up deprecations for blocks we don't
                // have, so this can happen.
                if (unbounded == null)
                    return;
                    // throw new IllegalStateException
                    //     ("Cannot deprecate a version set that does not " +
                    //      "contain an unbounded version");
                deprecate(unbounded, timestamp, callbacks);
                // XXX Flush range change?
            }
        }

        /**
         * The _versions lock must be held when this is called.
         */
        private void deprecate(Version<D> unbounded, long timestamp,
                               Set<DeprecationHandler> callbacks)
        {
            assert Thread.holdsLock(_versions);
            callbacks.addAll(_dhs);
            _dhs.clear();
            unbounded.deprecate(timestamp);
        }

        public void expire(long bound)
        {
            synchronized (_versions) {
                for (Iterator<Version<D>> it = _versions.values().iterator();
                     it.hasNext();) {
                    Version <D> v = it.next();
                    TimeRange r = v.getRange();

                    if (r.hasUpperBound() && r.getUpperBound() < bound) {
                        it.remove();
                    } else {
                        // No later versions can possibly satisfy the
                        // condition
                        break;
                    }
                }
            }
        }

        @Override
        public String toString()
        {
            StringBuilder b = new StringBuilder("VersionSet<id=" + _id);
            synchronized (_versions) {
                for (Version<D> v : _versions.values()) {
                    b.append(',');
                    v.toShortString(b);
                }
            }
            b.append('>');
            return b.toString();
        }
    }

    /**
     * A map of all of the data in the store.
     */
    private final ConcurrentMap<Long, VersionSet> _store =
        new ConcurrentHashMap<Long, VersionSet>();

    /**
     * Construct a multi-versioned store that stores its data in the
     * given directory.  If the directory already exists, the data
     * already in it will be loaded.  Otherwise, it will be created.
     * The given storable factory will be used to reconstruct stored
     * items from this disk.
     */
    public VersionStore(File directory, StorableFactory<D> sf)
        throws IOException
    {
        this(directory, sf, null);
    }

    /**
     * Construct a multi-versioned cache that stores cached data in
     * the given directory.  If the directory already exists, the data
     * already in it will be loaded.  Otherwise, it will be created.
     * If there is a miss on the cache, the miss handler will be
     * invoked to retrieve it.
     */
    public VersionStore(File directory, StorableFactory<D> sf,
                        MissHandler<D> mh)
        throws IOException
    {
        _sf = sf;
        _mh = mh;

        if (directory.exists()) {
            Pair<Disk, Map<Long, Disk.Slot>> res = Disk.load(directory);
            _disk = res.first;
            // Restore state from disk
            if (LOG)
                log.println("Restoring state from disk..");
            long glub = 0;
            for (Map.Entry<Long, Disk.Slot> entry : res.second.entrySet()) {
                VersionSet vs = new VersionSet(entry.getValue());
                if (_store.put(entry.getKey(), vs) != null)
                    throw new Bug("Loaded more than one version of " +
                                  entry.getKey());
                if (LOG)
                    log.println("..Restored " + vs);
                TimeRange tr = entry.getValue().getLatest().getRange();
                if (tr.hasUpperBound())
                    glub = Math.max(glub, tr.getUpperBound());
                else
                    glub = Math.max(glub, tr.getLowerBound());
            }
            if (glub != 0) {
                _glub.waitFor(0);
                _glub.update(glub);
            }
            if (LOG)
                log.println("..Initial GLUB is " + glub);
        } else {
            _disk = Disk.create(directory);
        }
    }

    /**
     * Retrieve the specified version of the specified block.
     *<p>
     * If the requested block is already in memory, simply return
     * it. Otherwise, search the cache for the version of the
     * requested block with the greatest lower bound less than or
     * equal to timestamp that either has no upper bound or has an
     * upper bound greater than timestamp.  If a block is found,
     * retrieve and return it.  Otherwise, if there is a miss handler,
     * forward the request to it.  Otherwise, throw NoSuchDatum.
     *<p>
     * If a block with no upper bound is found, add the deprecation
     * handler dh for it so that when the block becomes bounded, the
     * deprecation handler will be called.
     *
     * @throws IllegalTimestampException if timestamp is greater than
     * or equal to the global least upper bound or if timestamp is
     * less than the low water mark.
     * @throws NoSuchDatum if the requested block does not exist at
     * the specified time.
     */
    public Version<D> get(long id, long timestamp,
                          DeprecationHandler dh)
        throws IOException, IllegalTimestampException, NoSuchDatum
    {
        if (timestamp > _glub.get())
            throw new IllegalTimestampException
                (timestamp, "Timestamp is beyond the GLUB " + _glub.get());
        if (timestamp < _lowWaterMark)
            throw new IllegalTimestampException
                (timestamp,
                 "Timestamp is below low water mark " + _lowWaterMark);
        VersionSet entry = getVersionSet(id);
        return entry.get(timestamp, dh);
    }

    /**
     * Retrieve the specified version of the specified block.
     *<p>
     * If the requested block is already in memory, simply return
     * it. Otherwise, search the cache for the version of the
     * requested block with the greatest lower bound less than or
     * equal to timestamp that either has no upper bound or has an
     * upper bound greater than timestamp.  If a block is found,
     * retrieve and return it.  Otherwise, if there is a miss handler,
     * forward the request to it.  Otherwise, throw NoSuchDatum.
     *
     * @throws IllegalTimestampException if timestamp is greater than
     * or equal to the global least upper bound or if timestamp is
     * less than the low water mark.
     * @throws NoSuchDatum if the requested block does not exist at
     * the specified time.
     */
    public Version<D> get(long id, long timestamp)
        throws IOException, IllegalTimestampException, NoSuchDatum
    {
        return get(id, timestamp, null);
    }

    /**
     * Retrieve the latest version of the specified block.
     *<p>
     * If the cache contains an unbounded copy of the block, return it
     * (either from memory or from the disk).  Otherwise, if there is
     * a miss handler, forward the request to it.  Otherwise, throw
     * NoSuchDatum.  In any case where a block is returned, the
     * specified deprecation handler is registered with that block so
     * that, when the current version of the block becomes bounded,
     * the deprecation handler will be invoked.
     *
     * @throws NoSuchDatum if the requested block does not exist
     */
    public Version<D> getLatest(long id, DeprecationHandler dh)
        throws IOException, NoSuchDatum
    {
        VersionSet entry = getVersionSet(id);
        return entry.getLatest(dh);
    }

    /**
     * Retrieve the latest version of the specified block.
     *<p>
     * If the cache contains an unbounded copy of the block, return it
     * (either from memory or from the disk).  Otherwise, if there is
     * a miss handler, forward the request to it.  Otherwise, throw
     * NoSuchDatum.
     *<p>
     * Care should be used with this function because it does not
     * register a deprecation handler, so the entry could become out
     * of date without warning.
     *
     * @throws NoSuchDatum if the requested block does not exist
     */
    public Version<D> getLatest(long id)
        throws IOException, NoSuchDatum
    {
        VersionSet entry = getVersionSet(id);
        return entry.getLatest(null);
    }

    /**
     * Perform a write in the cache that terminates the snapshot
     * timestamp and creates the snapshot timestamp+1.
     *<p>
     * Everything in the write set that currently exists as unbounded
     * versions will be given an upper bound of timestamp.  The
     * deprecation handlers for these versions will be invoked, except
     * performingDh (if non-null).  All of the data in the write set
     * will be added to the cache with a lower bound of timestamp+1
     * and no upper bound.  If dh is non-null, it will be registered
     * as the deprecation handler for the newly written versions.  The
     * global least upper bound is updated to timestamp+1.
     *<p>
     * This blocks until the GLUB is equal to timestamp.
     *
     * @param timestamp The timestamp of the write.  The write
     * operation will create version timestamp+1
     * @param write A map from IDs to data to write
     * @param newDh The deprecation handler to register for the newly
     * written blocks.  May be null.
     * @param performingDh The deprecation handler of the transaction
     * performing the write.  Even if this deprecation handler is
     * registered for the blocks in the write set (which, presumably,
     * it will be), it will not be called as a result of the write.
     * May be null.
     *
     * @throws IllegalTimestampException if timestamp is less than the
     * global least upper bound.
     * @throws IllegalStateException if the GLUB surpasses timestamp
     * while this is blocking
     */
    public void write(long timestamp, Map<Long, D> write,
                      DeprecationHandler newDh,
                      DeprecationHandler performingDh)
        throws IOException, IllegalTimestampException, IllegalStateException
    {
        Set<DeprecationHandler> curdhs = new HashSet<DeprecationHandler>();
        _glub.waitFor(timestamp);
        if (LOG)
            log.println("Writing to timestamp " + timestamp + "..");
        try {
            for (Map.Entry<Long, D> w : write.entrySet()) {
                if (LOG)
                    log.println("..Block " + w.getKey() +
                                " <- " + w.getValue());
                VersionSet entry = getVersionSet(w.getKey());
                entry.write(timestamp, w.getValue(), newDh, curdhs);
            }
            for (DeprecationHandler curdh : curdhs) {
                if (curdh == performingDh) {
                    if (LOG)
                        log.println("..Skipping " + curdh);
                } else {
                    if (LOG)
                        log.println("..Invoking " + curdh);
                    curdh.onDeprecation(write.keySet(), timestamp);
                }
            }
        } finally {
            _glub.update(timestamp+1);
        }
    }

    /**
     * Deprecate versions in order to terminate the snapshot at
     * timestamp and create the snapshot timestamp+1.
     *<p>
     * All entries in the write set are given an upper bound of
     * timestamp.  The deprecation handlers for these are invoked.
     * The global least upper bound is updated to timestamp+1.
     *<p>
     * This blocks until the GLUB is equal to startTS.
     *
     * @throws IllegalTimestampException if startTS is less than the
     * global least upper bound, or if timestamp is less than startTS.
     * @throws IllegalStateException if the GLUB surpasses startTS
     * while this is blocking
     */
    public void deprecate(long startTS, long timestamp, Set<Long> deprecate)
        throws IllegalArgumentException, IllegalStateException
    {
        if (timestamp < startTS)
            throw new IllegalTimestampException
                (timestamp, "Deprecation timestamp must be greater than " +
                 "its start timestamp " + startTS);

        Set<DeprecationHandler> curdhs = new HashSet<DeprecationHandler>();
        _glub.waitFor(startTS);
        if (LOG)
            log.println("Deprecating from " + startTS + " to " + timestamp + "..");
        try {
            for (long id : deprecate) {
                if (LOG)
                    log.println("..Deprecating block " + id);
                VersionSet entry = getVersionSet(id);
                entry.deprecate(timestamp, curdhs);
            }
            for (DeprecationHandler curdh : curdhs) {
                if (LOG)
                    log.println("..Invoking " + curdh);
                curdh.onDeprecation(deprecate, timestamp);
            }
        } finally {
            _glub.update(timestamp+1);
        }
    }

    /**
     * Expire every block with an upper bound less than the bound.
     * Set the low water mark to bound.
     */
    public synchronized void expire(long bound)
        throws IOException
    {
        if (bound < _lowWaterMark)
            return;
        if (bound > _glub.get())
            throw new IllegalTimestampException
                (bound, "Cannot expire past GLUB of " + _glub.get());

        if (LOG)
            log.println("Expiring versions before " + bound);

        _lowWaterMark = bound;

        for (VersionSet vs : _store.values()) {
            vs.expire(bound);
        }

        _disk.expire(bound);
    }

    /**
     * Retrieve the value of the global least upper bound.
     */
    public long getLeastUpperBound()
    {
        return _glub.get();
    }

    /**
     * Construct a summary of all of the unbounded versions in the
     * cache.  The summary maps from id's to lower bounds.  The
     * returned map will not be modified by the cache in the future.
     *<p>
     * Note that this does not perform any synchronization, so it
     * should not be called when other operations may be active.  This
     * is only meant to be used during start-up, so this should not be
     * a problem.
     */
    public Map<Long, Long> getUnboundedSummary()
    {
        Map<Long, Long> res = new HashMap<Long, Long>();

        for (Map.Entry<Long, VersionSet> entry : _store.entrySet()) {
            Version<D> latest = entry.getValue().getLatestLocal(null);
            if (latest != null)
                res.put(entry.getKey(), latest.getRange().getLowerBound());
        }

        return res;
    }

    /**
     * Retrieve the VersionSet for the specified id, creating it if
     * necessary.
     */
    private VersionSet getVersionSet(long id)
    {
        VersionSet v = _store.get(id);
        if (v == null) {
            _store.putIfAbsent(id, new VersionSet(id));
            v = _store.get(id);
        }
        return v;
    }

    /**
     * Retrieve the underlying Disk.  Only for testing.
     */
    Disk testGetDisk()
    {
        return _disk;
    }
}
