package simpledb;

import java.io.*;
import java.util.*;

/**
 * ExHashFile is an implementation of a DbFile that stores a collection
 * of tuples in an extendible hash table.
 */

public class ExHashFile implements DbFile {
    /*
     * Directory layout (stored in directoryFile as two-int tuples):
     *   slot 0     : (-1, globalLevel)
     *   slot i + 1 : (data page number, local level) for directory index i
     */
    private static final String DIRECTORY_SUFFIX = ".directory";
    // Not referenced in this class; kept so the declaration set is unchanged.
    private static final String DATA_SUFFIX = ".data";
    /**
     * Largest level a bucket may have before a split is refused; keeps
     * {@code 1 << level} inside the positive int range.
     */
    private static final int MAX_LEVEL = 30;

    private final int key;
    private final HeapFile dataFile;
    private final HeapFile directoryFile;
    private final int tableid;
    private final TupleDesc dirTd;

    /**
     * Constructor
     * create a new ExHashFile that stores pages in the specified pool.
     *
     * The bucket pages live in a heap file at {@code hf}; the directory lives
     * in a second heap file at {@code hf + ".directory"}, which is registered
     * with the catalog here.
     *
     * @param hf A reference to a base file that stores the file on disk -- your code may
     *  use additional files besides this one to store hash file state.
     * @param key the field which index is keyed on
     */
    public ExHashFile(File hf, int key) {
        this.key = key;
        dataFile = new HeapFile(new File(hf.getPath()));
        directoryFile = new HeapFile(new File(hf.getPath() + DIRECTORY_SUFFIX));
        tableid = hf.getAbsoluteFile().hashCode();

        // Every directory tuple is a pair of ints.
        Type[] typeAr = { Type.INT_TYPE, Type.INT_TYPE };
        dirTd = new TupleDesc(typeAr);
        Database.getCatalog().addTable(directoryFile, dirTd);
    }

    /**
     * This method lets you initialize state for the hash file which
     * you cannot initialize in the constructor.
     *
     * Initializes both underlying heap files, then writes the initial
     * directory: global level 0 and a single entry mapping directory index 0
     * to data page 0 at local level 0.
     *
     * NOTE(review): the directory is rewritten unconditionally, so calling
     * this on a file that already holds a larger directory resets it --
     * confirm that init() is only used on fresh files.
     *
     * @throws IOException if the underlying files cannot be read/written
     */
    public void init() throws IOException {
        dataFile.init();
        directoryFile.init();
        try {
            // Use one transaction for both writes so that two separate,
            // never-completed transactions do not contend for the directory.
            TransactionId tid = new TransactionId();
            setGlobalLevel(tid, 0);
            directoryFile.setTuple(tid, 1, directoryEntry(0, 0));
        } catch (DbException e) {
            throw new RuntimeException("DbException on directory init", e);
        } catch (TransactionAbortedException e) {
            throw new RuntimeException("TransactionAbortedException on directory init", e);
        }
    }

    /**
     * @return a table id for this table.
     */
    public int id() {
        return tableid;
    }

    /**
     * Returns a Page from the file; delegates to the data heap file.
     */
    public Page readPage(PageId pid) throws NoSuchElementException {
        return dataFile.readPage(pid);
    }

    /**
     * Push the specified page to disk.
     * This page must have been previously read from this file via a call to
     * readPage.
     *
     * @throws IOException if the write fails
     */
    public void writePage(Page page) throws IOException {
        dataFile.writePage(page);
    }

    /**
     * Adds the specified tuple to the file on behalf of transaction.
     * This method will acquire a lock on the affected pages of the file, and
     * may block until the lock can be acquired.
     *
     * The tuple's key hash selects a directory entry; the tuple is added to
     * the page that entry maps to. A DbException raised by the attempt is
     * treated as "bucket full": the bucket is split and the insert retried.
     *
     * @param tid The transaction performing the update
     * @param t The tuple to add.  This tuple will be updated to reflect that
     *          it is now stored in this file.
     * @return always {@code null} (NOTE(review): the modified pages are not
     *         reported to the caller -- confirm callers do not rely on them)
     * @throws DbException if the tuple cannot be added, including when its
     *         bucket can no longer be split
     * @throws IOException if the needed file can't be read/written
     */
    public Vector<Page> addTuple(TransactionId tid, Tuple t)
        throws DbException, IOException, TransactionAbortedException,
               UnsupportedOperationException {
        int hash = t.getField(key).hashCode();
        int dirIndex = 0;

        while (true) {
            try {
                // Recomputed on every attempt: a split may have raised the
                // global level and therefore changed the directory index.
                dirIndex = bucketIndex(hash, getGlobalLevel(tid));
                HeapPage p = dataFile.getPage(tid, getPageMap(tid, dirIndex),
                                              Permissions.READ_WRITE);
                p.addTuple(t);
                p.markDirty(true, tid);
                return null;
            } catch (DbException e) {
                // rehash() throws once the bucket is at MAX_LEVEL, which
                // ends this loop instead of splitting forever.
                rehash(tid, dirIndex);
            }
        }
    }

    /**
     * Removes the specified tuple from the file on behalf of the specified
     * transaction.
     * This method will acquire a lock on the affected pages of the file, and
     * may block until the lock can be acquired.
     *
     * NOTE(review): not implemented yet -- the call currently does nothing
     * and returns {@code null}.
     *
     * @throws DbException if the tuple cannot be deleted or is not a member
     *   of the file
     */
    public Page deleteTuple(TransactionId tid, Tuple t)
        throws DbException, TransactionAbortedException {
        // some code goes here
        return null;
    }

    /**
     * get the specified tuples from the file based on its keyfield value
     * on behalf of the specified transaction.
     * This method will acquire a lock on the affected pages of the file, and
     * may block until the lock can be acquired.
     */
    public DbFileIterator indexIterator(TransactionId tid, Field kfd) {
        return new ExHashFileIndexIterator(tid, kfd);
    }

    /**
     * Returns an iterator over the whole file; delegates to the data heap
     * file.
     */
    public DbFileIterator iterator(TransactionId tid) {
        return dataFile.iterator(tid);
    }

    /**
     * Maps a hash code to a directory index at the given level by keeping the
     * low {@code level} bits. Unlike {@code hash % (1 << level)} this never
     * yields a negative index for a negative hash code; for non-negative
     * hash codes both forms agree.
     */
    private static int bucketIndex(int hash, int level) {
        return hash & ((1 << level) - 1);
    }

    /** Builds a directory tuple holding the two given int values. */
    private Tuple directoryEntry(int first, int second) {
        Tuple t = new Tuple(dirTd);
        t.setField(0, new IntField(Integer.valueOf(first)));
        t.setField(1, new IntField(Integer.valueOf(second)));
        return t;
    }

    /** Reads the global level from directory slot 0. */
    private int getGlobalLevel(TransactionId tid)
        throws DbException, IOException, TransactionAbortedException {
        Tuple t = directoryFile.getTuple(tid, 0);
        IntField f = (IntField) t.getField(1);
        return f.value();
    }

    /** Writes the global level to directory slot 0 (first field is -1). */
    private void setGlobalLevel(TransactionId tid, int l)
        throws DbException, IOException, TransactionAbortedException {
        directoryFile.setTuple(tid, 0, directoryEntry(-1, l));
    }

    /** Returns the data page number that directory index pgno maps to. */
    private int getPageMap(TransactionId tid, int pgno)
        throws DbException, IOException, TransactionAbortedException {
        Tuple t = directoryFile.getTuple(tid, pgno + 1);
        IntField f = (IntField) t.getField(0);
        return f.value();
    }

    /**
     * Points directory index pgno at data page val, keeping the level
     * currently stored in that entry.
     */
    private void setPageMap(TransactionId tid, int pgno, int val)
        throws DbException, IOException, TransactionAbortedException {
        int pageLevel = getPageLevel(tid, pgno);
        directoryFile.setTuple(tid, pgno + 1, directoryEntry(val, pageLevel));
    }

    /** Returns the local level stored in directory index pgno. */
    private int getPageLevel(TransactionId tid, int pgno)
        throws DbException, IOException, TransactionAbortedException {
        Tuple t = directoryFile.getTuple(tid, pgno + 1);
        IntField f = (IntField) t.getField(1);
        return f.value();
    }

    /**
     * Sets the local level of directory index pgno to val, keeping the page
     * mapping currently stored in that entry.
     */
    private void setPageLevel(TransactionId tid, int pgno, int val)
        throws DbException, IOException, TransactionAbortedException {
        int pageMap = getPageMap(tid, pgno);
        directoryFile.setTuple(tid, pgno + 1, directoryEntry(pageMap, val));
    }

    /**
     * Splits the bucket that directory index pgno refers to: raises its
     * local level by one (doubling the directory first when the bucket is
     * already at the global level), allocates a new data page for the upper
     * half of the bucket, re-points the affected directory entries, and
     * moves each tuple whose hash now selects the new half.
     *
     * @throws DbException if the bucket is already at MAX_LEVEL
     */
    private void rehash(TransactionId tid, int pgno)
        throws DbException, IOException, TransactionAbortedException {
        int oldLevel = getPageLevel(tid, pgno);
        if (oldLevel >= MAX_LEVEL) {
            // Happens when more equal-hash keys exist than the bucket page
            // accepts; another split could not separate them.
            throw new DbException("cannot split bucket at directory index "
                                  + pgno + ": level " + oldLevel
                                  + " is the maximum");
        }
        // Canonical (lowest) directory index that refers to this bucket.
        pgno = bucketIndex(pgno, oldLevel);

        // Increment the page level, and the global level if necessary
        if (oldLevel == getGlobalLevel(tid)) {
            incrementGlobalLevel(tid);
        }
        int globalLevel = getGlobalLevel(tid);
        int newLevel = oldLevel + 1;
        int sibling = pgno + (1 << oldLevel);

        HeapPage oldPage = dataFile.getPage(tid, getPageMap(tid, pgno),
                                            Permissions.READ_WRITE);
        HeapPageId newPageId = dataFile.allocateNewPage();
        HeapPage newPage = dataFile.getPage(tid, newPageId.pageno(),
                                            Permissions.READ_WRITE);

        // Visit every directory entry that referred to the old bucket:
        // entries ending in the lower pattern keep the old page, entries
        // ending in the upper pattern are re-pointed to the new page.
        for (int base = 0; base < (1 << globalLevel);
             base += (1 << newLevel)) {
            setPageLevel(tid, base + pgno, newLevel);
            setPageMap(tid, base + sibling, newPageId.pageno());
            setPageLevel(tid, base + sibling, newLevel);
        }

        oldPage.markDirty(true, tid);
        newPage.markDirty(true, tid);

        for (int i = 0; i < dataFile.slotsPerPage(); i++) {
            Tuple t = oldPage.getTuple(i);
            if (t == null) {
                continue;
            }
            int hash = bucketIndex(t.getField(key).hashCode(), newLevel);
            if (hash == pgno) {
                // Tuple hashes to same bucket; don't move it
            } else if (hash == sibling) {
                // Tuple hashes to new bucket
                oldPage.deleteTuple(t);
                newPage.addTuple(t);
            } else {
                throw new RuntimeException("Page hashes to wrong bucket!");
            }
        }
    }

    /**
     * Raises the global level by one and doubles the directory by copying
     * each existing entry i to entry i + 2^oldLevel.
     */
    private void incrementGlobalLevel(TransactionId tid)
        throws DbException, IOException, TransactionAbortedException {
        int oldLevel = getGlobalLevel(tid);
        setGlobalLevel(tid, oldLevel + 1);

        int oldSize = 1 << oldLevel;
        for (int i = 0; i < oldSize; i++) {
            directoryFile.setTuple(tid, i + oldSize + 1,
                                   directoryFile.getTuple(tid, i + 1));
        }
    }

    /**
     * Iterator for an index scan. This takes a field value and
     * returns all tuples with that as their key by loading the
     * appropriate page, obtaining an iterator over the page's
     * contents, and filtering out all tuples that don't have the
     * matching key.
     */
    private class ExHashFileIndexIterator implements DbFileIterator {
        // Iterator over the bucket page; null until open() and after close().
        private Iterator<?> child;
        private final TransactionId tid;
        private final Field kfd;

        public ExHashFileIndexIterator(TransactionId tid, Field kfd) {
            this.tid = tid;
            this.kfd = kfd;
        }

        public TransactionId getTransactionId() {
            return tid;
        }

        /**
         * Looks up the bucket page for the search key and starts iterating
         * over it.
         */
        public void open()
            throws DbException, TransactionAbortedException {
            try {
                int bucket = bucketIndex(kfd.hashCode(), getGlobalLevel(tid));
                HeapPage p = dataFile.getPage(tid, getPageMap(tid, bucket),
                                              Permissions.READ_ONLY);
                child = p.iterator();
            } catch (IOException e) {
                // Keep the cause attached instead of printing it separately.
                throw new RuntimeException("IO exception on index iterator open", e);
            }
        }

        /**
         * Returns the next tuple in the bucket whose key field equals the
         * search key.
         *
         * @throws NoSuchElementException if the iterator is not open, or
         *         when the page iterator's next() reports that nothing is
         *         left
         */
        public Tuple getNext()
            throws TransactionAbortedException, DbException,
                   NoSuchElementException {
            if (child == null) {
                throw new NoSuchElementException("index iterator is not open");
            }
            while (true) {
                Tuple t = (Tuple) child.next();
                if (t.getField(key).equals(kfd)) {
                    return t;
                }
            }
        }

        /** Restarts the scan by looking the bucket up again. */
        public void rewind()
            throws DbException, TransactionAbortedException {
            open();
        }

        /** Drops the page iterator; open() or rewind() starts a new scan. */
        public void close() {
            child = null;
        }
    }
}

