package simpledb;

import java.util.*;
import java.lang.*;

/**
 * Abstract implementation of the DeadlockDetector interface. This
 * implements the functions for tracking lock requests, adding them to
 * internal lists that concrete deadlock detectors can use, and adds
 * functions for aborting a transaction.
 *
 * @author <a href="mailto:drkp@mit.edu">Dan R. K. Ports</a>
 */
public abstract class AbstractDeadlockDetector
    implements DeadlockDetector {

    private static final boolean MURDEROUS = false;
    private static final boolean DEBUG = false;

    protected class LockRequest {
        public final PageId pid;
        public final TransactionId tid;
        public final boolean exclusive;
        public final long requestTime;

        public LockRequest(PageId pid, TransactionId tid,
                           boolean exclusive, long requestTime) {
            this.pid = pid;
            this.tid = tid;
            this.exclusive = exclusive;
            this.requestTime = requestTime;
        }
    }
    protected Map<PageId, List<LockRequest>> lockRequestsByPid;
    protected Map<TransactionId, List<LockRequest>>
        lockRequestsByTid;
    protected Map<TransactionId, Thread> transactionThreads;
    protected LockManager lockManager;
    protected Set<TransactionId> abortedTransactions;
    
    public AbstractDeadlockDetector(LockManager lm) {
        lockManager = lm;
        lockRequestsByPid = new HashMap();
        lockRequestsByTid = new HashMap();
        transactionThreads = new HashMap();
        abortedTransactions = new HashSet();
    }

    /**
     * Called when a thread starts waiting for a lock. Adds the lock
     * request to the request list
     */
    public synchronized void waitingForLock(PageId pid,
                                            TransactionId tid,
                                            boolean exclusive) {
        LockRequest req =
            new LockRequest(pid, tid, exclusive,
                            System.currentTimeMillis());
        List<LockRequest> reqs;

        transactionThreads.put(tid, Thread.currentThread());

        reqs = lockRequestsByPid.get(pid);
        if (reqs == null) {
            reqs = new ArrayList<LockRequest>();
            lockRequestsByPid.put(pid, reqs);
        }
        reqs.add(req);

        reqs = lockRequestsByTid.get(tid);
        if (reqs == null) {
            reqs = new ArrayList<LockRequest>();
            lockRequestsByTid.put(tid, reqs);
        }
        reqs.add(req);
    }

    /**
     * Called when a thread acquires a lock. Removes the request from
     * the request list.
     */
    public synchronized void lockAcquired(PageId pid,
                                          TransactionId tid) {
        removeLockRequest(pid, tid);
    }

    /**
     * Called when a transaction has been completed. Removes its
     * pending lock requests.
     */
    public synchronized void transactionComplete(TransactionId tid,
                                                 boolean commit) {
        List<LockRequest> reqs = new ArrayList(lockRequestsByTid.get(tid));
        for (LockRequest req : reqs) {
            removeLockRequest(req.pid, tid);
        }
        transactionThreads.remove(tid);
    }

    /**
     * Returns true if the transaction has been aborted.
     */
    public synchronized boolean isTransactionAborted(TransactionId tid) {
        return abortedTransactions.contains(tid);
    }

    /**
     * Break a deadlock. Called when a deadlock has been
     * identified. 'req' is one of the requests thatre deadlocking.
     * Some transaction will be aborted to resolve the deadlock.
     */ 
    protected synchronized void breakDeadlock(LockRequest req) {
        // Break deadlock by killing the thread that's waiting for the
        // lock.
        if (DEBUG) {
            System.err.println("Breaking deadlock on " + req.pid + " by aborting " + req.tid);
            System.err.println("Request is for exclusive: " + req.exclusive);
            System.err.println("exclusive holder: " + lockManager.exclusiveHolder(req.pid));
            for (TransactionId tid : lockManager.sharedHolders(req.pid)) {
                System.err.println("Shared holder: " + tid);
            }
        }


        if (MURDEROUS) {
            if (lockManager.exclusiveHolder(req.pid) != null) {
                abortTransaction(lockManager.exclusiveHolder(req.pid));
            }
            for (TransactionId tid : lockManager.sharedHolders(req.pid)) {
                if (tid != req.tid) {
                    abortTransaction(tid);
                }
            }
        } else {
            abortTransaction(req.tid);
        }
    }

    /**
     * Abort a transaction by marking the transaction as aborted and
     * interrupting its thread.
     */
    protected synchronized void abortTransaction(TransactionId tid) {
        abortedTransactions.add(tid);
        Thread t = transactionThreads.get(tid);
        t.interrupt();
    }

    private synchronized void removeLockRequest(PageId pid,
                                                TransactionId tid) {
        List<LockRequest> reqs = lockRequestsByPid.get(pid);

        for (Iterator<LockRequest> it = reqs.iterator();
             it.hasNext();) {
            LockRequest req = it.next();
            if ((req.pid == pid) &&
                (req.tid == tid)) {
                it.remove();
            }
        }
        
        reqs = lockRequestsByTid.get(tid);
        for (Iterator<LockRequest> it = reqs.iterator();
             it.hasNext();) {
            LockRequest req = it.next();
            if ((req.pid == pid) &&
                (req.tid == tid)) {
                it.remove();
            }
        }
    }
}
