package simpledb;

import java.io.*;
import java.util.*;
import java.text.ParseException;

/**
 * HeapFile is an implementation of a DbFile that stores a collection
 * of tuples in no particular order.  Tuples are stored on pages, each of 
 * which is a fixed size, and the file is simply a collection of those 
 * pages. HeapFile works closely with HeapPage.  The format of HeapPages 
 * is described in the HeapPage constructor.
 *
 * @see simpledb.HeapPage#HeapPage
 * @author Sam Madden
 */
public class HeapFile implements DbFile {

    /** The on-disk backing store for this heap file. */
    File f = null;

    /** Table id; set by the constructor from the hash of the backing file's absolute path. */
    int tableid = -1;

    // A hint remembering the last page known to have had a free slot
    // (-1 when there is no hint yet).
    int lastEmptyPage = -1;

    /**
     * Constructor.
     * Creates a heap file backed by the given file. The table id is
     * derived from the hash code of the file's absolute path.
     *
     * @param f The file that stores the on-disk backing store for this DbFile.
     */
    public HeapFile(File f) {
        this.f = f;
        this.tableid = f.getAbsoluteFile().hashCode();
    }

    /**
     * Initializer.
     * Builds a new heap file from scratch: removes any existing backing
     * file and writes a single empty page. This has to be called
     * separately from the constructor because only the catalog knows
     * the TupleDesc, and it's not there at constructor time.
     *
     * @throws IOException if the existing file cannot be deleted or the
     *         first page cannot be written
     */
    public void init() throws IOException {
        // A failed delete would leave old pages in place and the "new"
        // page would be appended after them, so fail loudly instead.
        if (f.exists() && !f.delete()) {
            throw new IOException("Could not delete existing heap file: " + f);
        }

        // Make a new empty page to start things off and write it to disk.
        HeapPageId pid = new HeapPageId(tableid, numPages());
        HeapPage page = new HeapPage(pid);
        writePage(page);

        lastEmptyPage = 0;
    }

    /**
     * @return an ID uniquely identifying this HeapFile
     */
    public int id() {
        return tableid;
    }

    /**
     * Reads a page directly from the backing file.
     *
     * @param pid the id of the page to read; must be a HeapPageId
     * @return the page built from the bytes stored at that page's offset
     * @throws NoSuchElementException if the page lies (even partially)
     *         beyond the end of the file, or if any I/O error occurs
     */
    public HeapPage readPage(PageId pid) throws NoSuchElementException {
        HeapPageId id = (HeapPageId) pid;
        int pageSize = bytesPerPage();
        byte[] pageBuf = new byte[pageSize];
        RandomAccessFile rf = null;

        try {
            rf = new RandomAccessFile(f, "r");
            // Widen before multiplying so large files do not overflow int.
            rf.seek((long) id.pageno() * pageSize);
            try {
                // readFully either fills the whole buffer or throws;
                // a plain read() may legally return a short count.
                rf.readFully(pageBuf);
            } catch (EOFException e) {
                throw noSuchPage("Read past end of table.", e);
            }
            return new HeapPage(id, pageBuf);
        } catch (IOException e) {
            throw noSuchPage(e.getMessage(), e);
        } finally {
            closeQuietly(rf);
        }
    }

    /**
     * Writes the given page to the appropriate location in the file.
     *
     * @param page the page to write; must be a HeapPage
     * @throws IOException if the file cannot be opened or written
     */
    public void writePage(Page page) throws IOException {
        HeapPage p = (HeapPage) page;
        byte[] data = p.getPageData();
        RandomAccessFile rf = new RandomAccessFile(f, "rw");
        try {
            // Widen before multiplying so large files do not overflow int.
            rf.seek((long) p.id().pageno() * bytesPerPage());
            rf.write(data);
        } finally {
            rf.close();
        }
    }

    /**
     * Returns the requested page of this HeapFile. This differs from
     * readPage in that readPage is intended for use solely by the
     * BufferPool, whereas this goes *through* the buffer pool and is
     * intended for general random-access use to pages.
     *
     * @param tid the transaction requesting the page
     * @param pageno the page number within this file
     * @param perm the permissions requested on the page
     */
    public HeapPage getPage(TransactionId tid, int pageno, Permissions perm)
        throws NoSuchElementException, TransactionAbortedException,
               DbException {
        HeapPageId pid = new HeapPageId(tableid, pageno);
        return (HeapPage) Database.getBufferPool().getPage(tid, pid, perm);
    }

    /**
     * Returns the number of whole pages in this HeapFile, computed from
     * the current length of the backing file.
     */
    public int numPages() {
        return (int) (f.length() / bytesPerPage());
    }

    /**
     * Adds the specified tuple to the table under the specified
     * TransactionId. A new page is allocated when no existing page has
     * a free slot. The chosen page is marked dirty before the insert.
     *
     * @return always null (the page is marked dirty here rather than
     *         being handed back to the caller)
     * @throws DbException
     * @throws IOException
     */
    public Vector<Page> addTuple(TransactionId tid, Tuple t)
        throws DbException, IOException, TransactionAbortedException {

        // Try to find a page with room.
        HeapPage page = findEmptyPage(tid, Permissions.READ_WRITE);

        if (page == null) {
            // Didn't find one; make a new page and look again (the
            // allocation updates the hint to point at the new page).
            allocateNewPage();
            page = findEmptyPage(tid, Permissions.READ_WRITE);
        }

        page.markDirty(true, tid);
        page.addTuple(t);

        // The signature returns a Vector<Page> because callers and test
        // cases expect it; this implementation has nothing to return.
        return null;
    }

    /**
     * Deletes the specified tuple from the table, under the specified
     * TransactionId. The tuple's page is fetched through the buffer
     * pool, marked dirty, and remembered as the free-slot hint.
     *
     * @return always null
     */
    public Page deleteTuple(TransactionId tid, Tuple t)
        throws DbException, TransactionAbortedException {
        HeapPageId pid = (HeapPageId) t.getRecordID().pageid();
        assert(pid.tableid() == tableid);

        HeapPage page = (HeapPage)
            Database.getBufferPool().getPage(tid, pid, Permissions.READ_WRITE);
        page.markDirty(true, tid);
        page.deleteTuple(t);

        // This page was just asked to give up a tuple, so try it first
        // on the next insert.
        lastEmptyPage = pid.pageno();

        return null;
    }

    /**
     * Returns an iterator over all tuples in this file, over all pages.
     */
    public DbFileIterator iterator(TransactionId tid) {
        return new HeapFileIterator(tid);
    }

    /**
     * Returns the number of slots in each page of the HeapFile, i.e.
     * the page size divided by the tuple size from the catalog.
     */
    public int slotsPerPage() {
        TupleDesc td = Database.getCatalog().getTupleDesc(tableid);
        return BufferPool.PAGE_SIZE / td.getSize();
    }

    /**
     * Returns the number of bytes in the header of a page in the HeapFile:
     * one 4-byte word per 32 slots, plus one extra word.
     * Assumes sizeof(int) == 4.
     */
    private int headerBytes() {
        return ((slotsPerPage() / 32) + 1) * 4;
    }

    /**
     * Returns the number of bytes on a page, including the number of bytes
     * in the header.
     */
    public int bytesPerPage() {
        return BufferPool.PAGE_SIZE + headerBytes();
    }

    /**
     * Find a page with an empty slot, or return null if none exists.
     * The hint page is tried first; otherwise pages are scanned from
     * the end of the file towards the start, and the hint is updated
     * to the page that is found.
     */
    private HeapPage findEmptyPage(TransactionId tid, Permissions perm)
        throws DbException, IOException, TransactionAbortedException {
        int hint = lastEmptyPage;
        int alreadyChecked = -1;

        // Try the hint first.
        if ((hint != -1) && (hint < numPages())) {
            HeapPage hinted = (HeapPage) Database.getBufferPool()
                .getPage(tid, new HeapPageId(tableid, hint), perm);
            if (hinted.getNumEmptySlots() != 0) {
                return hinted;
            }
            alreadyChecked = hint;
        }

        // Hint didn't work. Fall back on scanning, starting at the end
        // in the hope that the last page might not be full.
        for (int i = numPages() - 1; i >= 0; i--) {
            if (i == alreadyChecked) {
                // Just found to be full above; no need to fetch it again.
                continue;
            }

            HeapPage candidate = (HeapPage) Database.getBufferPool()
                .getPage(tid, new HeapPageId(tableid, i), perm);
            if (candidate.getNumEmptySlots() != 0) {
                // Update the hint to this page.
                lastEmptyPage = i;
                return candidate;
            }
        }

        // Nope, none found. Indicate failure.
        return null;
    }

    /**
     * Allocate a new page in the HeapFile. This is used internally
     * when the heap file fills up, and can also be used by callers to
     * ensure that enough pages are available for random access.
     *
     * @return the id of the newly written page
     * @throws IOException if the page cannot be written
     */
    public HeapPageId allocateNewPage()
        throws IOException {
        HeapPageId pid = new HeapPageId(tableid, numPages());
        HeapPage page = new HeapPage(pid);

        // Write the new empty page straight to disk, extending the file.
        writePage(page);

        // Point the hint at the new page so that the next search for
        // free space tries it first.
        lastEmptyPage = pid.pageno();

        return pid;
    }

    /**
     * Returns the ith tuple in the HeapFile, loading the appropriate
     * page as necessary. Returns null if no such tuple exists; in
     * particular, null is returned for a negative index or an index
     * that falls beyond the last page of the file.
     */
    public Tuple getTuple(TransactionId tid, int i)
        throws DbException, IOException, TransactionAbortedException {
        if (i < 0) {
            return null;
        }

        int slots = slotsPerPage();
        int pageNum = i / slots;
        if (pageNum >= numPages()) {
            // Honour the documented contract rather than asking the
            // buffer pool for a page that is not in the file.
            return null;
        }

        HeapPage page = getPage(tid, pageNum, Permissions.READ_ONLY);
        return page.getTuple(i % slots);
    }

    /**
     * Stores tuple t as the ith tuple in the HeapFile, loading the
     * appropriate page as necessary, and overwriting anything else
     * that might happen to be in that slot. Pages are allocated until
     * the target page exists.
     */
    public void setTuple(TransactionId tid, int i, Tuple t)
        throws DbException, IOException, TransactionAbortedException {
        int slots = slotsPerPage();
        int pageNum = i / slots;
        int pageOffset = i % slots;

        // Each allocation grows the file by one page.
        while (pageNum >= numPages()) {
            allocateNewPage();
        }

        HeapPage page = getPage(tid, pageNum, Permissions.READ_WRITE);
        page.markDirty(true, tid);
        page.setTuple(pageOffset, t);
    }

    /**
     * Builds a NoSuchElementException carrying the given message and
     * remembering the underlying failure as its cause.
     */
    private static NoSuchElementException noSuchPage(String message,
                                                     Throwable cause) {
        NoSuchElementException ex = new NoSuchElementException(message);
        ex.initCause(cause);
        return ex;
    }

    /**
     * Closes the given resource if it is non-null, ignoring any failure.
     * Only used on read paths, where a failed close loses no data.
     */
    private static void closeQuietly(Closeable c) {
        if (c == null) {
            return;
        }
        try {
            c.close();
        } catch (IOException ignored) {
            // Best effort: nothing useful can be done at this point.
        }
    }

    /**
     * Iterator over all tuples in all pages in this table.
     */
    private class HeapFileIterator implements DbFileIterator {
        /*
         * The strategy for iterating is to keep 'page' as the next
         * page in the file that has tuples available, and 'pid' its
         * id. pageIter is an iterator over the tuples in that
         * page. When pageIter runs out of items, findNextPage moves
         * to the next page. All three are null when the iterator is
         * closed, not yet opened, or exhausted.
         */
        private final TransactionId tid;
        private Page page;
        private HeapPageId pid;
        private Iterator<?> pageIter;

        public HeapFileIterator(TransactionId tid) {
            this.tid = tid;
        }

        /** Returns the transaction this iterator reads under. */
        public TransactionId getTransactionId() {
            return tid;
        }

        /** Opens the iterator, positioning it at the first tuple. */
        public void open() throws
            DbException, TransactionAbortedException {
            rewind();
        }

        /**
         * Returns the next tuple and advances, moving on to the next
         * non-empty page when the current one is used up.
         *
         * @throws NoSuchElementException if there are no more tuples
         *         or the iterator is not open
         */
        public Tuple getNext() throws TransactionAbortedException,
                                      DbException, NoSuchElementException
        {
            if (pageIter == null) {
                throw new NoSuchElementException();
            }

            Tuple next = (Tuple) pageIter.next();

            // Eagerly move to the next page so that pageIter is either
            // null or always has an element ready.
            if (!pageIter.hasNext()) {
                findNextPage();
            }

            return next;
        }

        /** Resets the iterator to the first tuple of the file. */
        public void rewind() throws DbException,
                                    TransactionAbortedException {
            reset();
            findNextPage();
        }

        /** Returns the TupleDesc registered in the catalog for this table. */
        public TupleDesc getTupleDesc() {
            return Database.getCatalog().getTupleDesc(tableid);
        }

        /** Closes the iterator; getNext throws until it is reopened. */
        public void close() {
            reset();
        }

        /** Clears the current position. */
        private void reset() {
            page = null;
            pid = null;
            pageIter = null;
        }

        /**
         * Advances to the first page after the current one (or from
         * page 0 when there is no current page) that has at least one
         * tuple. Clears the position if no such page exists.
         */
        private void findNextPage() throws DbException,
                                           TransactionAbortedException {
            int start = (pid == null) ? 0 : pid.pageno() + 1;

            for (int pageNo = start; pageNo < numPages(); pageNo++) {
                // Load the page
                pid = new HeapPageId(tableid, pageNo);
                page = Database.getBufferPool().getPage(tid, pid,
                                                        Permissions.READ_ONLY);
                pageIter = page.iterator();

                if (pageIter.hasNext()) {
                    // Page has tuples; use it
                    return;
                }
                // No tuples on this page, keep trying.
            }

            // Ran out of pages
            reset();
        }
    }
}

