package anastore.store;

import anastore.util.Log;
import anastore.util.Pair;
import anastore.util.TimeRange;

import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.lang.ref.WeakReference;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.*;

/**
 * An on-disk versioned block store.  A disk store consists of a set
 * of slots, each with a unique id.  Each slot stores zero or more
 * blocks with non-overlapping time ranges.  Given a slot, its latest
 * block can be retrieved and new blocks can be created that either
 * replace the latest block or get inserted into some time in the
 * past.  Blocks are retrieved through block handles, which store
 * enough metadata to retrieve the block from disk, but only cache the
 * actual block data.
 *<p>
 * Note that the operations this supports are very limited.  In
 * particular, it does <i>not</i> support lookups.  The only ways to
 * have a handle to a block are to obtain it from the map of blocks
 * read in when the store is loaded, or to have created it.
 *<p>
 * The on-disk representation consists of two components: a primary
 * store and a history.  The primary store stores the latest version
 * of the block in each slot, while the history stores older
 * versions.
 */
class Disk
{
    // Compile-time switch for verbose per-block tracing; every
    // log.println call in this class is guarded by it.
    private static final boolean LOG = false;
    private static final Log log = new Log(LOG);

    // Compile-time switch for file-level events (store creation and
    // loading, log creation and expiry).
    private static final boolean FILELOG = true;
    private static final Log filelog = new Log(FILELOG);

    // Number of consecutive upper-bound values that share a single log
    // file; getLog rounds upper bounds to buckets of this size.
    private static final int LOG_COMPACTION = 10;

    // Directory in which the log files are created.
    private final File _dir;
    // The primary store.  Acquired by the constructor and released
    // by close().
    private final LazyFileChannel _pri;
    // History log files, keyed by the rounded upper bound computed in
    // getLog.  expire, getLog and BlockHandle.moveToLog synchronize on
    // this map before touching it.
    private final SortedMap<Long, LazyFileChannel> _logs =
        new TreeMap<Long, LazyFileChannel>();

    /**
     * A one-element LRU to delay the closing of LazyFileChannel's.
     * It holds the channel whose reference count most recently
     * dropped to zero; that channel's file stays open until another
     * channel takes its place.  Accessed while holding
     * _lastClosedLock.
     */
    private LazyFileChannel _lastClosed = null;
    private final Object _lastClosedLock = new Object();

    private class LazyFileChannel
    {
        private FileChannel _fc = null;
        private final File _path;
        private int _refCount = 0;

        public LazyFileChannel(File path)
        {
            _path = path;
        }

        public synchronized FileChannel acquire()
            throws IOException
        {
            if (_refCount++ == 0) {
                synchronized (_lastClosedLock) {
                    if (_lastClosed == this) {
                        _lastClosed = null;
                        assert _fc != null;
                    } else {
                        if (LOG)
                            log.println("Opening " + _path);
                        RandomAccessFile raf = new RandomAccessFile(_path, "rw");
                        _fc = raf.getChannel();
                    }
                }
            }
            assert _fc != null;
            return _fc;
        }

        public synchronized void release()
            throws IOException
        {
            if (--_refCount == 0) {
                synchronized (_lastClosedLock) {
                    if (_lastClosed != null) {
                        if (LOG)
                            log.println("Closing " + _lastClosed);
                        // Yes, closing the FileChannel closes the file
                        try {
                            _lastClosed._fc.close();
                        } finally {
                            _lastClosed._fc = null;
                        }
                    }
                    _lastClosed = this;
                }
            }
            assert _refCount >= 0;
        }

        public void delete()
        {
            assert _refCount == 0;
            _path.delete();
        }

        public String toString()
        {
            return _path.toString();
        }
    }

    /**
     * The largest bound passed to expire so far, or -1 if expire has
     * not been called.  getLog refuses requests for upper bounds
     * below this value, and BlockHandle.moveToLog drops such blocks
     * instead of writing them.  Protected by _logs lock.
     */
    private long _lowWaterMark = -1;

    /**
     * Set up a store rooted at the given directory whose primary
     * store lives in the given file.  The primary store is acquired
     * here and stays acquired until close() releases it.
     */
    private Disk(File dir, File fpri)
        throws IOException
    {
        this._dir = dir;
        this._pri = new LazyFileChannel(fpri);
        // Pin the primary store open; close() drops this reference
        this._pri.acquire();
    }

    /**
     * Close this store.  Drops the reference on the primary store
     * taken by the constructor and then closes whichever file is
     * still parked in the delayed-close cache, so that no file is
     * left open afterwards.  No log file may be in use when this is
     * called.
     *
     * @throws IOException if closing a file fails
     */
    public void close()
        throws IOException
    {
        try {
            _pri.release();
        } finally {
            // release() only parks the channel in the one-element
            // cache; flush the cache so the file really gets closed.
            synchronized (_lastClosedLock) {
                LazyFileChannel parked = _lastClosed;
                _lastClosed = null;
                if (parked != null && parked._fc != null) {
                    FileChannel fc = parked._fc;
                    parked._fc = null;
                    fc.close();
                }
            }
        }
        synchronized (_logs) {
            for (LazyFileChannel lfc : _logs.values())
                assert lfc._refCount == 0;
        }
    }

    /**
     * Create a new, empty disk store.  The directory (and any
     * missing parents) is created, and the primary store file is
     * created inside it when the store opens it.
     *
     * @param directory the directory to create the store in; must
     * not exist yet
     * @throws IllegalArgumentException if directory already exists
     * @throws IOException if the directory cannot be created or the
     * primary store cannot be opened
     */
    public static Disk create(File directory)
        throws IOException, IllegalArgumentException
    {
        if (!directory.mkdirs()) {
            // mkdirs also returns false when creation fails for
            // other reasons (permissions, a file in the way of a
            // parent, ...), so only blame the caller when the
            // directory is really there.
            if (directory.exists())
                throw new IllegalArgumentException
                    ("Directory " + directory + " already exists");
            throw new IOException
                ("Could not create directory " + directory);
        }

        // Create the primary store
        File fpri = new File(directory, "primary");

        if (FILELOG)
            filelog.println("Creating block store in " + directory);

        return new Disk(directory, fpri);
    }

    /**
     * Load an existing store.  Note that only the blocks in the
     * primary store are read back in.  The history is not read;
     * existing log files are currently left untouched on disk (see
     * the XXX note below).
     *
     * @param directory the directory containing the store
     * @return The loaded Disk, and a map from ID's to slots
     * containing the latest version of each block currently on disk
     * @throws IllegalArgumentException if the directory does not
     * exist or does not contain a primary store
     * @throws IOException if the primary store cannot be opened or
     * scanned; the partially loaded store is closed again first
     */
    public static Pair<Disk, Map<Long, Slot>> load(File directory)
        throws IOException, IllegalArgumentException
    {
        File fpri = new File(directory, "primary");
        if (!fpri.exists())
            throw new IllegalArgumentException
                ("File " + fpri + " does not exist");

        // XXX Delete old logs

        if (FILELOG)
            filelog.println("Loading block store from " + directory);

        Disk disk = new Disk(directory, fpri);
        boolean loaded = false;
        try {
            Map<Long, Slot> slots = disk.getLatestSlots();
            loaded = true;
            return new Pair<Disk, Map<Long, Slot>>(disk, slots);
        } finally {
            if (!loaded) {
                // Don't abandon the reference the constructor took
                // on the primary store when the scan fails.
                try {
                    disk.close();
                } catch (IOException ignored) {
                    // The scan failure is the error worth reporting
                }
            }
        }
    }

    /**
     * Scan the primary store to construct the map of latest blocks.
     * The primary store is read as a sequence of back-to-back
     * blocks, each a header followed by as many data bytes as the
     * header's size field says.  If the same id occurs more than
     * once, the slot for the last occurrence wins.
     *
     * @return a map from slot id to a slot whose latest block and
     * position refer to the block found in the primary store
     * @throws IOException if the primary store cannot be read, ends
     * in the middle of a block header, or contains a block with a
     * negative size
     */
    private Map<Long, Slot> getLatestSlots()
        throws IOException
    {
        Map<Long, Slot> slots = new HashMap<Long, Slot>();

        FileChannel pri = _pri.acquire();
        try {
            long pos = 0;
            long limit = pri.size();
            while (pos < limit) {
                // A partial header cannot be decoded at all
                if (limit - pos < BLOCK_HEADER_SIZE)
                    throw new IOException
                        ("Truncated block header at byte " + pos +
                         " of " + _pri);
                BlockHandle bh = new BlockHandle(_pri, pos);
                // A negative size would stall the scan or send it
                // backwards
                if (bh._size < 0)
                    throw new IOException
                        ("Corrupt block at byte " + pos + " of " + _pri +
                         ": size " + bh._size);
                Slot slot = new Slot(bh._id);
                slot._pos = pos;
                slot._latest = bh;
                slots.put(slot._id, slot);
                // Widen before adding so huge sizes cannot overflow
                pos += (long) bh._size + BLOCK_HEADER_SIZE;
            }
        } finally {
            _pri.release();
        }

        return slots;
    }

    /**
     * Make a fresh slot with the given id that holds no blocks yet.
     * Nothing is written to disk here; the slot gets its place in
     * the primary store when its first block is added.
     *
     * @param id the id the new slot will report from getID
     * @return the new, empty slot
     */
    public Slot newSlot(long id)
    {
        if (LOG) {
            log.println("Creating slot " + id);
        }
        final Slot fresh = new Slot(id);
        return fresh;
    }

    /**
     * Delete every log file whose key is less than the bound, and
     * raise the low water mark to the bound if it is currently
     * lower.  Log keys are the rounded upper bounds computed by
     * getLog.
     *
     * @param bound log files keyed strictly below this value are
     * removed
     */
    public void expire(long bound)
        throws IOException
    {
        if (FILELOG)
            filelog.println("Expiring logs before " + bound);
        synchronized (_logs) {
            // The head view holds exactly the keys below the bound,
            // in ascending order; removing through its iterator
            // removes from _logs itself.
            Iterator<LazyFileChannel> stale =
                _logs.headMap(bound).values().iterator();
            while (stale.hasNext()) {
                LazyFileChannel doomed = stale.next();
                if (FILELOG)
                    filelog.println("Expiring " + doomed);
                doomed.delete();
                stale.remove();
            }
            _lowWaterMark = Math.max(_lowWaterMark, bound);
        }
    }

    /**
     * Retrieve the log that holds blocks with the specified upper
     * bound.  Upper bounds are grouped into buckets of
     * LOG_COMPACTION values and each bucket shares one log, keyed
     * and named by the bucket's rounded-up bound.  The log is
     * registered in _logs on first request; the file itself is only
     * opened when the returned channel is acquired.
     *
     * @throws IllegalArgumentException if the upper bound lies below
     * the low water mark
     */
    private LazyFileChannel getLog(long upperBound)
        throws IOException
    {
        synchronized (_logs) {
            if (upperBound < _lowWaterMark)
                throw new IllegalArgumentException
                    ("Request for log " + upperBound +
                     " before low water mark " + _lowWaterMark);

            // Round to the last value of the bucket containing
            // upperBound
            final long offset = upperBound % LOG_COMPACTION;
            final long key = upperBound + (LOG_COMPACTION-1) - offset;

            LazyFileChannel found = _logs.get(key);
            if (found != null)
                return found;

            File path = new File(_dir, "log-" + key);
            LazyFileChannel created = new LazyFileChannel(path);
            _logs.put(key, created);
            if (FILELOG)
                filelog.println("Created log " + path);
            return created;
        }
    }

    /**
     * A slot on disk.  A slot groups the blocks that share one id
     * and remembers where in the primary store its latest block
     * lives.  Only the latest block is directly reachable through
     * the slot.
     */
    public class Slot
    {
        private final long _id;
        // Byte offset of this slot's block in the primary store;
        // meaningful once the slot has a latest block
        private long _pos = 0;
        private BlockHandle _latest = null;

        private Slot(long id)
        {
            _id = id;
        }

        /**
         * Retrieve the ID of this slot, as originally passed to
         * newSlot.
         */
        public long getID()
        {
            return _id;
        }

        /**
         * Get a handle to the latest block of this slot, or null if
         * no block has been added yet.
         */
        public synchronized BlockHandle getLatest()
        {
            return _latest;
        }

        /**
         * Add a block to this slot.
         *<ul>
         * <li>If the slot is empty, the block is appended to the
         * primary store and becomes the latest block.</li>
         * <li>If the range compares greater than the latest block's
         * range, the latest block is moved to the history and the
         * new block is written over its place in the primary
         * store.</li>
         * <li>If the range compares less, the block is written
         * straight to the history and the latest block is left
         * alone.</li>
         *</ul>
         *
         * @return a handle to the newly written block
         * @throws IllegalArgumentException if the size of data
         * differs from the size of the latest block, or the range
         * compares equal to the latest block's range
         */
        public synchronized BlockHandle add(TimeRange range, byte[] data)
            throws IOException, IllegalArgumentException
        {
            final BlockHandle current = _latest;

            if (current == null) {
                // First block of this slot: allocate its space in
                // the primary store by appending
                BlockHandle first = new BlockHandle(_pri, _id, range, data);
                _latest = first;
                _pos = first._pos;
                if (LOG)
                    log.println(this + " assigned to byte " + _pos);
                return first;
            }

            if (current._size != data.length)
                throw new IllegalArgumentException
                    ("Cannot replace " + current._size + " byte block with " +
                     data.length + " byte block");

            final int order = range.compareTo(current._range);
            if (order == 0)
                throw new IllegalArgumentException
                    ("Added block exactly overlaps with latest block");

            if (order < 0) {
                // Older than the latest block: goes to the history
                if (LOG)
                    log.println(this + " adding " + range + " to past");
                LazyFileChannel past = getLog(range.getUpperBound());
                return new BlockHandle(past, _id, range, data);
            }

            // Newer than the latest block: archive the old one and
            // reuse its place in the primary store
            if (LOG)
                log.println(this + " replacing " + current._range +
                            " with " + range);
            current.moveToLog();
            BlockHandle replacement =
                new BlockHandle(_pri, _pos, _id, range, data);
            _latest = replacement;
            return replacement;
        }

        @Override
        public String toString()
        {
            return "Slot<" + _id + ">";
        }
    }

    // Size in bytes of an on-disk block header: three longs (id,
    // lower bound, upper bound) followed by one int (data size).
    private static final int BLOCK_HEADER_SIZE = 8*3 + 4;

    /**
     * A handle to a block stored on disk.  The data in a block is
     * immutable.  The handle remembers which file the block lives in
     * and at what offset, and keeps the data itself only through a
     * weak reference, re-reading it from disk when needed.
     *<p>
     * Blocks are stored in the following format:
     *<table>
     * <tr><td>long</td><td>id</td></tr>
     * <tr><td>long</td><td>lowerBound</td></tr>
     * <tr><td>long</td><td>upperBound (or -1)</td></tr>
     * <tr><td>int</td><td>size</td></tr>
     * <tr><td>byte[size]</td><td>data</td></tr>
     *</table>
     */
    public class BlockHandle
    {
        // Where the block currently lives.  _file is null once the
        // block has been dropped by moveToLog because it was already
        // below the low water mark.
        private LazyFileChannel _file;
        private long _pos;

        private final long _id;
        private final TimeRange _range;
        private final int _size;
        private WeakReference<byte[]> _data;

        /**
         * Construct a block by reading from the given position of a
         * file.  Only the header is read; the data is fetched on
         * demand.
         *
         * @throws IOException if the header cannot be read in full
         */
        private BlockHandle(LazyFileChannel file, long pos)
            throws IOException
        {
            _file = file;
            _pos = pos;

            ByteBuffer header = ByteBuffer.allocate(BLOCK_HEADER_SIZE);
            FileChannel fc = _file.acquire();
            try {
                readFully(fc, header, pos);
            } finally {
                _file.release();
            }
            header.flip();
            _id = header.getLong();
            long lb = header.getLong();
            long ub = header.getLong();
            if (ub == -1)
                _range = new TimeRange(lb);
            else
                _range = new TimeRange(lb, ub);
            _size = header.getInt();
            _data = null;

            if (LOG)
                log.println(this + " read from byte " + pos +
                            " of " + file);

            assert header.remaining() == 0;
        }

        /**
         * Construct a block by appending its data to the given file
         */
        private BlockHandle(LazyFileChannel file,
                            long id, TimeRange range, byte[] data)
            throws IOException
        {
            this(file, -1, id, range, data);
        }

        /**
         * Construct a block, writing its data to the given position
         * of the file.  A position of -1 means "append": the block
         * is written at the current end of the file, with the size
         * lookup and the writes done while synchronized on the
         * channel.
         */
        private BlockHandle(LazyFileChannel file, long pos,
                            long id, TimeRange range, byte[] data)
            throws IOException
        {
            _file = file;
            _id = id;
            _range = range;
            _size = data.length;
            _data = new WeakReference<byte[]>(data);

            ByteBuffer header = encodeHeader();
            ByteBuffer body = ByteBuffer.wrap(data);
            FileChannel fc = _file.acquire();
            try {
                if (pos == -1) {
                    synchronized (fc) {
                        _pos = fc.size();
                        writeFully(fc, header, _pos);
                        writeFully(fc, body, _pos + BLOCK_HEADER_SIZE);
                    }
                } else {
                    _pos = pos;
                    writeFully(fc, header, _pos);
                    writeFully(fc, body, _pos + BLOCK_HEADER_SIZE);
                }
            } finally {
                _file.release();
            }

            if (LOG)
                log.println(this + " created at byte " + _pos +
                            " of " + _file);
        }

        /**
         * Fill the buffer from the channel starting at the given
         * file position.  A single positional read may return fewer
         * bytes than requested, so keep reading until the buffer is
         * full.
         *
         * @throws IOException if the file ends before the buffer is
         * full
         */
        private void readFully(FileChannel fc, ByteBuffer buf, long pos)
            throws IOException
        {
            while (buf.hasRemaining()) {
                int n = fc.read(buf, pos);
                if (n < 0)
                    throw new IOException
                        ("Unexpected end of file at byte " + pos +
                         " of " + _file);
                pos += n;
            }
        }

        /**
         * Write the whole buffer to the channel starting at the
         * given file position.  A single positional write may write
         * fewer bytes than requested, so keep writing until the
         * buffer is drained.
         */
        private void writeFully(FileChannel fc, ByteBuffer buf, long pos)
            throws IOException
        {
            while (buf.hasRemaining())
                pos += fc.write(buf, pos);
        }

        /**
         * Construct a byte buffer containing the header data of this
         * block, positioned at its start and ready to be written.
         * An unbounded range is encoded with an upper bound of -1.
         */
        private ByteBuffer encodeHeader()
        {
            ByteBuffer buf = ByteBuffer.allocate(BLOCK_HEADER_SIZE);
            buf.putLong(_id);
            buf.putLong(_range.getLowerBound());
            if (_range.hasUpperBound())
                buf.putLong(_range.getUpperBound());
            else
                buf.putLong(-1);
            buf.putInt(_size);

            assert buf.remaining() == 0;
            buf.rewind();
            return buf;
        }

        /**
         * Retrieve the ID of the slot containing this block.
         */
        public long getID()
        {
            return _id;
        }

        /**
         * Retrieve the time range associated with this block.  Note
         * that changes to this time range do not automatically get
         * propagated to disk.  Currently, the only way to propagate a
         * time stamp change to disk is to replace a block with a more
         * recent one.
         */
        public TimeRange getRange()
        {
            return _range;
        }

        /**
         * Read the data of this block, from the cached copy if it
         * is still reachable and from disk otherwise.
         *
         * @throws IOException if the data has to be read from disk
         * and that fails, or the block is no longer on disk
         */
        public synchronized byte[] read()
            throws IOException
        {
            byte[] data = _data == null ? null : _data.get();
            if (data == null)
                return readFromDisk().array();
            else
                return data;
        }

        /**
         * Retrieve the data as a ByteBuffer.  This must be called
         * while synchronized on this.
         */
        private ByteBuffer readBuf()
            throws IOException
        {
            byte[] data = _data == null ? null : _data.get();
            if (data == null)
                return readFromDisk();
            else
                return ByteBuffer.wrap(data);
        }

        /**
         * Read the data in from the disk, update _data, and return a
         * ByteBuffer backed by the data array.  This must be called
         * while synchronized on this.
         *
         * @throws IOException if the block was dropped instead of
         * being moved to a log, or the data cannot be read in full
         */
        private ByteBuffer readFromDisk()
            throws IOException
        {
            if (_file == null)
                throw new IOException
                    (this + " has expired and is no longer on disk");

            byte[] data = new byte[_size];
            ByteBuffer buf = ByteBuffer.wrap(data);
            FileChannel fc = _file.acquire();
            try {
                readFully(fc, buf, _pos + BLOCK_HEADER_SIZE);
            } finally {
                _file.release();
            }
            _data = new WeakReference<byte[]>(data);
            if (LOG)
                log.println(this + " read from byte " + _pos +
                            " of " + _file);
            buf.flip();
            return buf;
        }

        /**
         * Migrate this block to the appropriate log.  The block's
         * time range must have an upper bound before this is
         * performed.  If the upper bound is already below the low
         * water mark the block is dropped rather than written.
         *<p>
         * Synchronized so that a concurrent read() never sees the
         * new file paired with the old position.  The handle is only
         * repointed once the copy in the log is completely written.
         */
        private synchronized void moveToLog()
            throws IOException
        {
            assert _file == _pri;
            assert _range.hasUpperBound();

            ByteBuffer header = encodeHeader();
            ByteBuffer body = readBuf();
            synchronized (_logs) {
                if (_range.getUpperBound() < _lowWaterMark) {
                    _file = null;
                    if (LOG)
                        log.println(this + " migrated to the void");
                    return;
                }
                LazyFileChannel target = getLog(_range.getUpperBound());
                long at;
                FileChannel fc = target.acquire();
                try {
                    synchronized (fc) {
                        at = fc.size();
                        writeFully(fc, header, at);
                        writeFully(fc, body, at + BLOCK_HEADER_SIZE);
                    }
                } finally {
                    target.release();
                }
                _file = target;
                _pos = at;
            }
            if (LOG)
                log.println(this + " migrated to byte " + _pos +
                            " of " + _file);
        }

        /**
         * Forcibly clear the cache of this block's data.  Only for
         * testing.  Does nothing if the data was never cached.
         */
        void testForgetData()
        {
            if (_data != null)
                _data.clear();
        }

        @Override
        public String toString()
        {
            return "Block<" + _id + "@" + _range + ">";
        }
    }
}
