package simpledb.unittest;

import simpledb.*;
import java.util.*;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import junit.framework.JUnit4TestAdapter;

/**
 * JUnit 4 tests for the {@code Join} operator. Every test joins the same two
 * in-memory tuple lists on field 0 of each side and checks the operator
 * against a hand-written expected result.
 */
public class JoinTest {

  /** Number of fields per tuple in the left input. */
  int width1 = 2;
  /** Number of fields per tuple in the right input. */
  int width2 = 3;
  /** Left join input; rebuilt before every test. */
  DbIterator scan1;
  /** Right join input; rebuilt before every test. */
  DbIterator scan2;
  /** Expected rows when joining on left.field0 = right.field0. */
  DbIterator eqJoin;
  /** Expected rows when joining on left.field0 &gt; right.field0. */
  DbIterator gtJoin;

  /**
   * Rebuilds the two inputs and both expected outputs so that each test
   * starts with fresh, unopened iterators.
   */
  @Before public void setUp() throws Exception {
    int joinedWidth = width1 + width2;

    // Left input: four tuples of width 2.
    int[] leftRows = {
        1, 2,
        3, 4,
        5, 6,
        7, 8 };
    this.scan1 = Utility.createTupleList(width1, leftRows);

    // Right input: five tuples of width 3.
    int[] rightRows = {
        1, 2, 3,
        2, 3, 4,
        3, 4, 5,
        4, 5, 6,
        5, 6, 7 };
    this.scan2 = Utility.createTupleList(width2, rightRows);

    // Left rows whose first field also appears as a first field on the right.
    int[] equalRows = {
        1, 2, 1, 2, 3,
        3, 4, 3, 4, 5,
        5, 6, 5, 6, 7 };
    this.eqJoin = Utility.createTupleList(joinedWidth, equalRows);

    // Each left row paired with every right row whose first field is smaller.
    int[] greaterRows = {
        3, 4, 1, 2, 3, // right first fields 1, 2 are < 3
        3, 4, 2, 3, 4,
        5, 6, 1, 2, 3, // right first fields 1, 2, 3, 4 are < 5
        5, 6, 2, 3, 4,
        5, 6, 3, 4, 5,
        5, 6, 4, 5, 6,
        7, 8, 1, 2, 3, // right first fields 1, 2, 3, 4, 5 are < 7
        7, 8, 2, 3, 4,
        7, 8, 3, 4, 5,
        7, 8, 4, 5, 6,
        7, 8, 5, 6, 7 };
    this.gtJoin = Utility.createTupleList(joinedWidth, greaterRows);
  }

  /**
   * Builds a join of {@code scan1} with {@code scan2} that compares field 0
   * of the left tuple against field 0 of the right tuple.
   *
   * @param comparison the operator handed to the join predicate
   * @return a new, unopened join over the two fixture inputs
   */
  private Join joinOnFirstFields(Predicate.Op comparison) {
    JoinPredicate firstFields = new JoinPredicate(0, comparison, 0);
    return new Join(firstFields, scan1, scan2);
  }

  /**
   * Join.getTupleDesc() must equal the descriptor Utility produces for a
   * tuple as wide as both inputs combined.
   */
  @Test public void getTupleDesc() {
    Join join = joinOnFirstFields(Predicate.Op.EQUALS);
    TupleDesc wanted = Utility.getTupleDesc(width1 + width2);
    TupleDesc got = join.getTupleDesc();
    assertTrue(wanted.equals(got));
  }

  /**
   * Join.rewind() must restart the operator: after draining it and
   * rewinding, the next tuple is the first expected equality-join row.
   */
  @Test public void rewind() throws Exception {
    Join join = joinOnFirstFields(Predicate.Op.EQUALS);
    join.open();

    // Drain the operator; running off the end is signalled by the exception.
    try {
      for (;;) {
        join.getNext();
      }
    } catch (NoSuchElementException exhausted) {
      // Deliberately ignored: this is how the drain loop terminates.
    }

    join.rewind();

    eqJoin.open();
    Tuple firstExpected = eqJoin.getNext();
    Tuple firstActual = join.getNext();
    assertTrue(Utility.compareTuples(firstExpected, firstActual));
  }

  /**
   * Join.getNext() with a &gt; predicate: the output is checked against
   * the {@code gtJoin} fixture via Utility.matchAllTuples.
   */
  @Test public void gtJoin() throws Exception {
    Join join = joinOnFirstFields(Predicate.Op.GREATER_THAN);
    join.open();
    gtJoin.open();
    Utility.matchAllTuples(gtJoin, join);
  }

  /**
   * Join.getNext() with an = predicate: the output is checked against
   * the {@code eqJoin} fixture via Utility.matchAllTuples.
   */
  @Test public void eqJoin() throws Exception {
    Join join = joinOnFirstFields(Predicate.Op.EQUALS);
    join.open();
    eqJoin.open();
    Utility.matchAllTuples(eqJoin, join);
  }

  /**
   * Exposes this class as a JUnit 3 style suite by wrapping it in a
   * JUnit4TestAdapter.
   */
  public static junit.framework.Test suite() {
    return new JUnit4TestAdapter(JoinTest.class);
  }
}

