package simpledb;

import java.lang.*;
import java.util.*;

/**
 * Lock manager. This enforces page-level locking amongst transactions, and
 * creates a DeadlockDetector to detect when transactions are
 * deadlocked.
 *
 * <p>Two monitors are involved:
 * <ul>
 *   <li>the <code>LockManager</code> monitor guards the page-to-lock map;</li>
 *   <li>each <code>LockStatus</code> monitor guards the grant/wait protocol
 *       for one page.</li>
 * </ul>
 * The query methods ({@link #holdsLock}, {@link #exclusiveHolder},
 * {@link #sharedHolders}) deliberately do <em>not</em> take a
 * <code>LockStatus</code> monitor: a waiter calls into the deadlock detector
 * while holding that monitor, so a detector that calls back into these
 * queries must never have to wait for it.
 *
 * @author <a href="mailto:drkp@mit.edu">Dan R. K. Ports</a>
 */
public class LockManager {
    private static final boolean DEBUG = false;

    /**
     * Lock state for a single page: at most one exclusive holder, or any
     * number of shared holders.
     */
    private class LockStatus {
        /**
         * Current exclusive holder, or null. Written only under this object's
         * monitor; volatile so the query methods can read it without that
         * monitor.
         */
        private volatile TransactionId exclusiveHolder;

        /**
         * Current shared holders. Mutated only under this object's monitor;
         * wrapped in a synchronized set so the query methods can read or copy
         * it safely without that monitor.
         */
        private final Set<TransactionId> sharedHolders;

        public LockStatus() {
            exclusiveHolder = null;
            sharedHolders =
                Collections.synchronizedSet(new HashSet<TransactionId>());
        }

        /**
         * Blocks until <code>tid</code> holds this lock exclusively. A
         * transaction that is the only shared holder is upgraded in place.
         *
         * @param tid the requesting transaction
         * @throws TransactionAbortedException if the waiting thread is
         *         interrupted and the deadlock detector reports
         *         <code>tid</code> as aborted
         */
        public synchronized void acquireExclusive(TransactionId tid)
            throws TransactionAbortedException {
            if (sameTransaction(exclusiveHolder, tid)) {
                return;
            }

            while (!canGrantExclusive(tid)) {
                awaitRelease(tid);
            }

            // Publish the exclusive holder before dropping the shared entry
            // so an unsynchronized reader never sees tid holding nothing
            // halfway through an upgrade.
            exclusiveHolder = tid;
            sharedHolders.remove(tid);
        }

        /**
         * Blocks until <code>tid</code> holds this lock in shared mode.
         * Returns immediately if <code>tid</code> already holds the lock in
         * either mode.
         *
         * @param tid the requesting transaction
         * @throws TransactionAbortedException if the waiting thread is
         *         interrupted and the deadlock detector reports
         *         <code>tid</code> as aborted
         */
        public synchronized void acquireShared(TransactionId tid)
            throws TransactionAbortedException {
            if (isHeldBy(tid)) {
                return;
            }

            while (exclusiveHolder != null) {
                awaitRelease(tid);
            }

            sharedHolders.add(tid);
        }

        /**
         * Drops whatever hold <code>tid</code> has on this lock (a no-op on
         * the state if it has none) and wakes all waiters so they can
         * re-check their grant condition.
         *
         * @param tid the releasing transaction
         */
        public synchronized void release(TransactionId tid) {
            if (sameTransaction(exclusiveHolder, tid)) {
                exclusiveHolder = null;
            }
            sharedHolders.remove(tid);

            notifyAll();
        }

        /**
         * Returns true if <code>tid</code> holds this lock in either mode.
         * Safe to call without this object's monitor.
         */
        boolean isHeldBy(TransactionId tid) {
            return sameTransaction(exclusiveHolder, tid)
                || sharedHolders.contains(tid);
        }

        /**
         * Returns the exclusive holder, or null. Safe to call without this
         * object's monitor.
         */
        TransactionId currentExclusiveHolder() {
            return exclusiveHolder;
        }

        /**
         * Returns a point-in-time copy of the shared holders. Safe to call
         * without this object's monitor.
         */
        Set<TransactionId> sharedHoldersSnapshot() {
            // Iterating a synchronizedSet (which copying does) requires
            // holding the set itself as the lock.
            synchronized (sharedHolders) {
                return new HashSet<TransactionId>(sharedHolders);
            }
        }

        /**
         * Grant condition for an exclusive request: nobody holds the lock
         * exclusively, and the shared holders are either empty or exactly
         * {tid} (the upgrade case). Caller must hold this object's monitor.
         */
        private boolean canGrantExclusive(TransactionId tid) {
            if (exclusiveHolder != null) {
                return false;
            }
            return sharedHolders.isEmpty()
                || (sharedHolders.size() == 1 && sharedHolders.contains(tid));
        }

        /**
         * Waits once for a {@link #release} notification. Caller must hold
         * this object's monitor and must re-check its grant condition on
         * return, since wake-ups may be spurious or lost to another waiter.
         *
         * @throws TransactionAbortedException if interrupted while the
         *         deadlock detector reports <code>tid</code> as aborted
         */
        private void awaitRelease(TransactionId tid)
            throws TransactionAbortedException {
            try {
                wait();
            } catch (InterruptedException e) {
                // NOTE(review): the interrupt is presumably how the deadlock
                // detector wakes a waiter it has chosen to abort - confirm
                // against DeadlockDetector. An interrupt for a transaction
                // that is not aborted is ignored and the wait resumes; the
                // interrupt flag is intentionally not restored, because that
                // would make every later wait() fail immediately.
                if (deadlockDetector.isTransactionAborted(tid)) {
                    throw new TransactionAbortedException();
                }
            }
        }
    }

    /** Per-page lock state, created lazily. Guarded by this object's monitor. */
    private final Map<PageId, LockStatus> locks;
    private final DeadlockDetector deadlockDetector;

    /**
     * Creates a new <code>LockManager</code> instance.
     */
    public LockManager() {
        locks = new HashMap<PageId, LockStatus>();
        // NOTE(review): 1000 and 50 are presumably a timeout and a polling
        // interval in milliseconds - confirm against TimeoutDeadlockDetector.
        deadlockDetector = new TimeoutDeadlockDetector(this, 1000, 50);
        // Alternative detector:
        // deadlockDetector = new WaitsForDeadlockDetector(this);
    }

    /**
     * Acquires a shared lock on <code>pid</code> for <code>tid</code>,
     * blocking until it can be granted. Returns immediately, without
     * informing the deadlock detector, if <code>tid</code> already holds the
     * lock in either mode.
     *
     * @param pid the page to lock
     * @param tid the requesting transaction
     * @throws TransactionAbortedException if the transaction is aborted while
     *         waiting
     */
    public void acquireShared(PageId pid, TransactionId tid)
        throws TransactionAbortedException {

        LockStatus status = getLock(pid);
        if (status.isHeldBy(tid)) {
            return;
        }

        if (DEBUG) System.err.println("Transaction " + tid + " acquiring shared lock on " + pid);
        deadlockDetector.waitingForLock(pid, tid, false);
        status.acquireShared(tid);
        deadlockDetector.lockAcquired(pid, tid);
        if (DEBUG) System.err.println("Transaction " + tid + " acquired shared lock on " + pid);
    }

    /**
     * Acquires an exclusive lock on <code>pid</code> for <code>tid</code>,
     * blocking until it can be granted. Returns immediately, without
     * informing the deadlock detector, if <code>tid</code> already holds the
     * lock exclusively.
     *
     * @param pid the page to lock
     * @param tid the requesting transaction
     * @throws TransactionAbortedException if the transaction is aborted while
     *         waiting
     */
    public void acquireExclusive(PageId pid, TransactionId tid)
        throws TransactionAbortedException {

        LockStatus status = getLock(pid);
        if (sameTransaction(status.currentExclusiveHolder(), tid)) {
            return;
        }

        if (DEBUG) System.err.println("Transaction " + tid + " acquiring exclusive lock on " + pid);
        deadlockDetector.waitingForLock(pid, tid, true);
        status.acquireExclusive(tid);
        deadlockDetector.lockAcquired(pid, tid);
        if (DEBUG) System.err.println("Transaction " + tid + " acquired exclusive lock on " + pid);
    }

    /**
     * Releases any lock <code>tid</code> holds on <code>pid</code>.
     *
     * @param pid the page to unlock
     * @param tid the releasing transaction
     */
    public void release(PageId pid, TransactionId tid) {
        LockStatus status = getLock(pid);
        if (DEBUG) System.err.println("Transaction " + tid + " releasing lock on " + pid);
        status.release(tid);
    }

    /**
     * Releases every lock held by <code>tid</code>.
     *
     * @param tid the releasing transaction
     */
    public void releaseAll(TransactionId tid) {
        // Copy the lock list under the map's monitor: iterating the HashMap
        // directly would race with getLock() inserting entries on other
        // threads. The releases themselves run outside that monitor so it is
        // never held while a LockStatus monitor is being acquired.
        List<LockStatus> snapshot;
        synchronized (this) {
            snapshot = new ArrayList<LockStatus>(locks.values());
        }

        for (LockStatus status : snapshot) {
            status.release(tid);
        }
    }

    /**
     * Releases all of <code>tid</code>'s locks and then informs the deadlock
     * detector that the transaction has finished.
     *
     * @param tid the finished transaction
     * @param commit passed through to the deadlock detector unchanged
     */
    public void transactionComplete(TransactionId tid,
                                     boolean commit) {
        if (DEBUG) System.err.println("Transaction complete: " + tid + " " + commit);
        // Release locks
        releaseAll(tid);

        // Notify deadlock detector
        deadlockDetector.transactionComplete(tid, commit);
    }

    /**
     * Returns true if tid holds any lock (shared or exclusive) on pid.
     */
    public synchronized boolean holdsLock(PageId pid, TransactionId tid) {
        return getLock(pid).isHeldBy(tid);
    }

    /**
     * Returns the exclusive holder of pid, or null if there is none.
     */
    public synchronized TransactionId exclusiveHolder(PageId pid) {
        return getLock(pid).currentExclusiveHolder();
    }

    /**
     * Returns the set of shared holders of pid (empty if there are none).
     * The result is an unmodifiable point-in-time snapshot; it does not
     * change as locks are later acquired or released, so it is safe to
     * iterate while other threads keep locking.
     */
    public synchronized Set<TransactionId> sharedHolders(PageId pid) {
        return Collections.unmodifiableSet(getLock(pid).sharedHoldersSnapshot());
    }

    /**
     * Returns the lock state for pid, creating it on first use.
     */
    private synchronized LockStatus getLock(PageId pid) {
        LockStatus status = locks.get(pid);

        if (status == null) {
            status = new LockStatus();
            locks.put(pid, status);
        }

        return status;
    }

    /**
     * Null-safe transaction comparison. Uses <code>equals</code> so that the
     * exclusive-holder checks agree with the shared-holder set, which
     * compares by <code>equals</code>/<code>hashCode</code>. Never passes
     * null to <code>equals</code>.
     */
    private static boolean sameTransaction(TransactionId a, TransactionId b) {
        if (a == b) {
            return true;
        }
        return a != null && b != null && a.equals(b);
    }
}
