package simpledb;

import java.util.*;
import java.io.*;
import java.text.ParseException;

/**
 * HeapPage stores pages of HeapFiles and implements the Page interface that
 * is used by BufferPool.
 *
 * @see HeapFile
 * @see BufferPool
 */
public class HeapPage implements Page {

    HeapPageId pid;
    TupleDesc td;
    DataInputStream dis;
    int header[];
    Tuple tuples[];
    int numSlots;   
   
    boolean dirty = false;
    TransactionId dirtier = null;    
   
    /**
     * Constructor.
     * Construct the HeapPage from a set of bytes of data read from disk.  
     * The format of a HeapPage is a set of 32-bit header words indicating 
     * the slots of the page that are in use, plus (BufferPool.PAGE_SIZE/tuple 
     * size) tuple slots, where tuple size is the size of tuples in this 
     * database table
     * (which can be determined via a call to getTupleDesc in Catalog.)
     *
     * The number of 32-bitheader words is equal to:
     * <p>
     * (no. tuple slots / 32) + 1
     * <p>
     * @see Database#getCatalog
     * @see Catalog#getTupleDesc
     * @see BufferPool#PAGE_SIZE
     */
    public HeapPage(HeapPageId id, byte[] data) throws IOException {
	this.pid = id;
	this.td = Database.getCatalog().getTupleDesc(id.tableid());
	this.numSlots = BufferPool.PAGE_SIZE / td.getSize();
	this.dis = new DataInputStream(new ByteArrayInputStream(data));    
	
	// allocate and read the header slots of this page
	header = new int[(numSlots/32)+1];
	for (int i=0; i<header.length; i++)
	    header[i] = dis.readInt();
	
	try{
	    // allocate and read the actual records of this page
	    tuples = new Tuple[numSlots];
	    for (int i=0; i<numSlots; i++)
		tuples[i] = readNextTuple(i);
	}catch(NoSuchElementException e){
	    e.printStackTrace();
	}
	
    }

    /**
     * Alternate constructor. Create an empty HeapPage.
     */
    public HeapPage(HeapPageId id) throws IOException {
	pid = id;
	td = Database.getCatalog().getTupleDesc(id.tableid());
	numSlots = BufferPool.PAGE_SIZE / td.getSize();

	header = new int[(numSlots/32)+1];
        for (int i = 0; i < header.length; i++) {
            header[i] = 0;
        }

        tuples = new Tuple[numSlots];
        for (int i = 0; i < numSlots; i++) {
            tuples[i] = null;
        }
    }
    
    /** Return a view of this page before it was modified 
	-- used by recovery */
    public HeapPage getBeforeImage(){
	// some code goes here
	// only necessary for lab4
	return null;
    }
    
    /**
     * @return the PageId associated to this page.
     */
    public HeapPageId id() {	
	return pid;
    }
    
    /** 
     * Suck up tuples from the source file.
     */
    private Tuple readNextTuple(int slotId) throws NoSuchElementException {
	
	// if associated bit is not set, read forward to the next tuple, and
	// return null.
	if (!getSlot(slotId)) {
	    for (int i=0; i<td.getSize(); i++) {
		try {
		    dis.readByte();
		} catch (IOException e) {
		    throw new NoSuchElementException("error reading empty tuple");
		}
	    }
	    return null;
	}
	
	// read fields in the tuple
	Tuple t = new Tuple(td);
	RecordID rid = new RecordID(pid, slotId);
	t.setRecordID(rid);
	try {
	    for (int j=0; j<td.numFields(); j++) {
		Field f = td.getType(j).parse(dis);
		t.setField(j, f);
	    }	
	} catch (java.text.ParseException e) {
	    e.printStackTrace();
	    throw new NoSuchElementException("parsing error!");
	}	
	
	return t;
    }
    
    /** Return the size of the byte array returned by getPageData() */
    public int getPageSize() {
	return ((numSlots/32)+1) * 4 + BufferPool.PAGE_SIZE;
    }
    
    /**
     * Generates a byte array representing the contents of this page.
     * Used to serialize this page to disk.
     * <p>
     * The invariant here is that it should be possible to pass the byte 
     * array generated by getPageData to the HeapPage constructor and 
     * have it produce anx identical HeapPage object.
     *
     * @see #HeapPage
     * @return A byte array correspond to the bytes of this page.
     */
    public byte[] getPageData() {
	int len = header.length*4 + BufferPool.PAGE_SIZE;
	ByteArrayOutputStream baos = new ByteArrayOutputStream(len);
	DataOutputStream dos = new DataOutputStream(baos);
	
	// create the header of the page
	for (int i=0; i<header.length; i++) {
	    try {
		dos.writeInt(header[i]);
	    } catch (IOException e) {
		// this really shouldn't happen
		e.printStackTrace();
	    }
	}
	
	// create the tuples
	for (int i=0; i<numSlots; i++) {
	    
	    // empty slot
	    if (!getSlot(i)) {
		for (int j=0; j<td.getSize(); j++) {
		    try {
			dos.writeByte(0);
		    } catch (IOException e) {
			e.printStackTrace();
		    }
		}
		continue;
	    }
	    
	    // non-empty slot
	    for (int j=0; j<td.numFields(); j++) {
		Field f = tuples[i].getField(j);
		try {
		    f.serialize(dos);
		} catch (IOException e) {
		    e.printStackTrace();
		}
	    }
	}
	
	// padding
	int zerolen = BufferPool.PAGE_SIZE - numSlots * td.getSize();
	byte[] zeroes = new byte[zerolen];
	try {
	    dos.write(zeroes, 0, zerolen);
	} catch (IOException e) {
	    e.printStackTrace();
	}
	
	try {
	    dos.flush();
	} catch (IOException e) {
	    e.printStackTrace();
	}
	
	return baos.toByteArray();
    }
    
    /**
     * Static method to generate a byte array corresponding to an empty
     * HeapPage.  
     * Used to add new, empty pages to the file. Passing the results of 
     * this method to the HeapPage constructor will create a HeapPage with 
     * no valid tuples in it.
     *     
     * @param tableid The id of the table that this empty page will belong to.
     * @return The returned ByteArray.
     */
    public static byte[] createEmptyPageData(int tableid) {
	TupleDesc td = Database.getCatalog().getTupleDesc(tableid);
	int hb = (((BufferPool.PAGE_SIZE / td.getSize()) / 32) +1) * 4;
	int len = BufferPool.PAGE_SIZE + hb;
	return new byte[len]; //all 0
    }
    
    /**
     * Delete the specified tuple from the page.
     * @throws DbException if this tuple is not on this page, or tuple slot is
     *         already empty.
     * @param t The tuple to delete
     */
    public void deleteTuple(Tuple t) throws DbException {
        for (int i = 0; i < numSlots; i++) {
            if (getSlot(i) && tuples[i] == t) {
                setSlot(i, false);
                tuples[i] = null;
                return;
            }
        }

        // Didn't find the tuple
        throw new DbException("Tuple not found for deletion");
    }
    
    /**
     * Adds the specified tuple to the page.
     * @throws DbException if the page is full (no empty slots) or tupledesc 
     *         is mismatch.
     * @param t The tuple to add.
     */
    public void addTuple(Tuple t) throws DbException {
        // Check TupleDesc match.
        if (!td.equals(t.getTupleDesc())) {
            throw new DbException("TupleDesc mismatch");
        }

        // Try to find an empty slot
        int slot = -1;
        for (int i = 0; i < numSlots; i++) {
            if (!getSlot(i)) {
                slot = i;
                break;
            }
        }

        // Did we succeed?
        if (slot == -1) {
            // No empty slots!
            throw new DbException("Page is full");
        }

        setTuple(slot, t);
    }
    
    /**
     * Marks this page as dirty/not dirty and record that transaction 
     * that did the dirtying
     */
    public void markDirty(boolean dirty, TransactionId tid) {	
	//Debug.println("HeapPage.markDirty: " + pid.tableid() + ":" + pid.pageno() + " dirty = " + dirty, 1);
	this.dirty = dirty;
	if (dirty) this.dirtier = tid;
    }
    
    /**
     * Returns the tid of the transaction that last dirtied this page, or null if the page is not dirty
     */
    public TransactionId isDirty() {
	if (this.dirty)
	    return this.dirtier;
	else
	    return null;
    }
    
    /**
     * Returns the number of emtpy slots on this page.
     */
    public int getNumEmptySlots() {
	int cnt = 0;
	for(int i=0; i<numSlots; i++)
	    if(!getSlot(i))
		cnt++;
	return cnt;
    }
    
    /**
     * Returns true if associated slot on this page is filled.
     */
    public boolean getSlot(int i) {
	int headerbit = i % 32;
	int headerbyte = (i - headerbit) / 32;
	return (header[headerbyte] & (1 << headerbit)) != 0;
    }
    
    /**
     * Abstraction to fill a slot on this page.
     */
    private void setSlot(int i, boolean value) {
	int headerbit = i % 32;
	int headerbyte = (i - headerbit) / 32;
	
	Debug.println("HeapPage.setSlot: setting slot " + i + " to " + value, 1);
	if(value)
	    header[headerbyte] |= 1 << headerbit;
	else
	    header[headerbyte] &= (0xFFFFFFFF ^ (1 << headerbit));	
    }

    /**
     * Random access to the tuples in this page. Returns the tuple in
     * slot i, or null if the slot is empty.
     */
    public Tuple getTuple(int i) {
        if (getSlot(i)) {
            return tuples[i];
        } else {
            return null;
        }
    }

    /**
     * Random access modification function. Stores tuple t in slot i
     * of this page, possibly overwriting anything that's already
     * there.
     */
    public void setTuple(int i, Tuple t) {
        // Insert the tuple and mark the slot filled
        tuples[i] = t;
        setSlot(i, true);

        // Set the tuple's recordId
	RecordID rid = new RecordID(pid, i);
        t.setRecordID(rid);
    }
    
    /**
     * @return an iterator over all tuples on this page.
     */
    public Iterator iterator() {
	return new HeapPageIterator();
    }

    /**
     * Iterator over all tuples in this page.
     */
    private class HeapPageIterator implements Iterator {
        private int nextSlot;
        
        public HeapPageIterator() {
            nextSlot = findNext();
        }

        public boolean hasNext() {
            return (nextSlot != -1);
        }

        public Tuple next() {
            if (nextSlot == -1) {
                throw new NoSuchElementException();
            }

            Tuple t = tuples[nextSlot];
            nextSlot += 1;
            nextSlot = findNext();
            return t;
        }

        public void remove() {
            throw new UnsupportedOperationException();
        }

        private int findNext() {
            if (nextSlot == -1) {
                return -1;
            }

            for (int i = nextSlot; i < numSlots; i++) {
                if (getSlot(i)) {
                    return i;
                }
            }

            // no more slots
            return -1;
        }
    }
}

