// RPC stubs for clients to talk to the lock_server and to cache locks locally.
// See lock_client_cache.h for protocol details.
 
#include "lock_client_cache.h"
#include "lock_server_cache.h"
#include "member.h"
#include "rpc.h"
#include <sstream>
#include <iostream>
#include <stdio.h>


/*
marshall& operator<<(marshall & b, member m) {
	b << m.get_client_id();
	b << m.get_addr();
	return b;
}

umarshall& operator>>(umarshall & b, member &m) {
	int cid;
	sockaddr_in addr;
	b >> cid; 
	b >> addr; 
	m = *(new member(cid, addr));
	return b;
	
}


marshall &
operator<<(marshall &m, std::list<member> v)
{
  m << (unsigned int) v.size();
  for(unsigned int i = 0; i < v.size(); i++)
    m << v.at(i);
  return m;
}
 
 unmarshall &
operator>>(unmarshall &u, std::list<member> &v)
{
  int n;
  u >> n;
  for(int i = 0; i < n; i++){
    member z;
    u >> z;
    v.push_back(z);
  }
  return u;
}

*/




static void *
releasethread(void *x)
{
    lock_client_cache *cc = (lock_client_cache *) x;
    cc->releaser();
    return 0;
}

// Port picked by the most recently constructed lock_client_cache. The
// constructors XOR it into the srand() seed and then overwrite it with the
// newly chosen rlock_port.
int lock_client_cache::last_port = 0;



// Constructs a caching lock client with a randomly generated client id.
//
// xdst is forwarded to the lock_client base class. The constructor:
//   - picks a pseudo-random local port and builds "<host>:<port>" as hostname,
//   - creates the local RPC server plus the consistent-hash ring and the
//     lock_server_cache that share it,
//   - registers the revoke/grant, membership, nr_locks and extent handlers,
//   - initializes the mutexes/condition variable used by this class,
//   - starts the background releaser thread, and
//   - finally calls join() to enter the network.
//
// NOTE(review): request_id / request_id_mutex (used by get_new_id()) are not
// initialized here — confirm they are set up elsewhere.
lock_client_cache::lock_client_cache(sockaddr_in xdst)
    : lock_client(xdst)
{
    // Generate the port for the local RPC server. OR-ing in bit 10 keeps the
    // result at 1024 or above.
    srand(time(NULL)^last_port);
    rlock_port = ((rand()%32000) | (0x1 << 10));

    // gethostname() must run outside assert(): with NDEBUG the asserted
    // expression is not evaluated and hname would stay uninitialized.
    char hname[100];
    int rc = gethostname(hname, sizeof(hname));
    assert(rc == 0);
    (void) rc;
    // A truncated name is not guaranteed to be NUL-terminated.
    hname[sizeof(hname) - 1] = '\0';

    std::ostringstream host;
    host << hname << ":" << rlock_port;
    hostname = host.str();
    last_port = rlock_port;
    et = NULL;

    // Make the RPC server that receives callbacks for this client.
    rpcs *rlsrpc = new rpcs(htons(rlock_port));

    // Generate a random client id.
    // NOTE(review): random() is not seeded with srandom() here; whether it
    // shares state with srand()/rand() is platform-dependent — confirm that
    // ids actually differ between processes.
    cid = random() >> 16;

    ch = new consistent_hash();
    ls = new lock_server_cache(rlsrpc, ch, dst, hostname, cid);

    /* register RPC handlers with rlsrpc */
    rlsrpc->reg(cached_lock_protocol::revoke, this, &lock_client_cache::revoke);
    rlsrpc->reg(cached_lock_protocol::grant, this, &lock_client_cache::grant);

    rlsrpc->reg(metadata_protocol::join_member, this, &lock_client_cache::join_member);
    rlsrpc->reg(metadata_protocol::leave_member, this, &lock_client_cache::remove_member);
    rlsrpc->reg(metadata_protocol::nr_locks, this, &lock_client_cache::nr_locks);
    rlsrpc->reg(extent_protocol::get, this, &lock_client_cache::get);
    rlsrpc->reg(extent_protocol::getattr, this, &lock_client_cache::getattr);

    // Initialize the synchronization primitives for the lock cache and the
    // release queue.
    int r = pthread_mutex_init(&lock_cache_mutex,0);
    assert(r == 0);

    r = pthread_mutex_init(&release_mutex,0);
    assert(r == 0);

    r = pthread_cond_init(&release_cond,0);
    assert(r == 0);

    server_rpc_lst = new rpc_lst();

    // Background thread that runs releaser().
    pthread_t th;
    r = pthread_create(&th, NULL, &releasethread, (void *) this);
    assert (r == 0);
    (void) r;

    // Join the network.
    join();
}


// Constructs a caching lock client with a caller-supplied client id.
//
// xdst is forwarded to the lock_client base class; cid becomes this client's
// id. Apart from the id, the setup matches the single-argument constructor:
// pick a pseudo-random local port, build "<host>:<port>" as hostname, create
// the RPC server / consistent-hash ring / lock_server_cache, register the RPC
// handlers, initialize the synchronization primitives, start the releaser
// thread and call join().
//
// NOTE(review): request_id / request_id_mutex (used by get_new_id()) are not
// initialized here — confirm they are set up elsewhere.
lock_client_cache::lock_client_cache(sockaddr_in xdst, int cid)
    : lock_client(xdst)
{
    // Generate the port for the local RPC server. OR-ing in bit 10 keeps the
    // result at 1024 or above.
    srand(time(NULL)^last_port);
    rlock_port = ((rand()%32000) | (0x1 << 10));

    // gethostname() must run outside assert(): with NDEBUG the asserted
    // expression is not evaluated and hname would stay uninitialized.
    char hname[100];
    int rc = gethostname(hname, sizeof(hname));
    assert(rc == 0);
    (void) rc;
    // A truncated name is not guaranteed to be NUL-terminated.
    hname[sizeof(hname) - 1] = '\0';

    std::ostringstream host;
    host << hname << ":" << rlock_port;
    hostname = host.str();
    last_port = rlock_port;
    et = NULL;

    // Make the RPC server that receives callbacks for this client.
    rpcs *rlsrpc = new rpcs(htons(rlock_port));

    // Use the client id supplied by the caller (the parameter shadows the
    // member, hence this->).
    this->cid = cid;

    ch = new consistent_hash();
    ls = new lock_server_cache(rlsrpc, ch, dst, hostname, cid);

    /* register RPC handlers with rlsrpc */
    rlsrpc->reg(cached_lock_protocol::revoke, this, &lock_client_cache::revoke);
    rlsrpc->reg(cached_lock_protocol::grant, this, &lock_client_cache::grant);

    rlsrpc->reg(metadata_protocol::join_member, this, &lock_client_cache::join_member);
    rlsrpc->reg(metadata_protocol::leave_member, this, &lock_client_cache::remove_member);
    rlsrpc->reg(metadata_protocol::nr_locks, this, &lock_client_cache::nr_locks);
    rlsrpc->reg(extent_protocol::get, this, &lock_client_cache::get);
    rlsrpc->reg(extent_protocol::getattr, this, &lock_client_cache::getattr);

    // Initialize the synchronization primitives for the lock cache and the
    // release queue.
    int r = pthread_mutex_init(&lock_cache_mutex,0);
    assert(r == 0);

    r = pthread_mutex_init(&release_mutex,0);
    assert(r == 0);

    r = pthread_cond_init(&release_cond,0);
    assert(r == 0);

    server_rpc_lst = new rpc_lst();

    // Background thread that runs releaser().
    pthread_t th;
    r = pthread_create(&th, NULL, &releasethread, (void *) this);
    assert (r == 0);
    (void) r;

    // Join the network.
    join();
}

void
lock_client_cache::releaser()
{
    // This method should be a continuous loop, waiting to be notified of
    // freed locks that have been revoked by the server, so that it can
    // send a release RPC.
    std::string lockname;
    struct cached_lock *l;
    member lock_manager;
    int r, ret;
    std::list<member> member_lst;
    
    while(1) {
        pthread_mutex_lock(&release_mutex);
        while(release_queue.empty()) {
            pthread_cond_wait(&release_cond,&release_mutex);
        } 
        lockname = release_queue.front();
        release_queue.pop();
        pthread_mutex_unlock(&release_mutex);
        //printf("[%s] Found lock %s\n",hostname.c_str(),lockname.c_str());
        l = lock_cache[lockname];
        
        if (l->state == cached_lock::RELEASING) {
       //     printf("[%s] make release call for lock %s\n",hostname.c_str(),lockname.c_str());
            
            //TODO fix client calls
            lock_manager = ch->lookup(lockname);
            ret = cl.call(lock_manager.get_addr(), cached_lock_protocol::release, 
                          hostname, get_new_id(), lockname, r, rpcc::to(1000));
            int r;
            while((ret == -1)||(ret == lock_protocol::NOTMANAGER)) {
                ret = cl.call(dst, metadata_protocol::update, member_lst);
                //call metadataserver
                if (ret == lock_protocol::OK) {
                    ch->set_membership(member_lst);
                    lock_manager = ch->lookup(lockname);
                    ret = cl.call(lock_manager.get_addr(), cached_lock_protocol::release, 
                                  hostname, get_new_id(), lockname, r, rpcc::to(1000));
                } else {
                    ret = -1;
                }
            }
            
            pthread_mutex_lock(&(l->lock_mutex));
            l->state = cached_lock::NONE;
            l->last_owner = "dummy";
            pthread_cond_broadcast(&(l->lock_cond));
            pthread_mutex_unlock(&(l->lock_mutex));
        }
    }
    
}

// Acquires the named lock and, when an extent cache (et) is attached, fetches
// the extent associated with it.
//
// Delegates the actual locking to acquire(name, last_owner). If the reported
// previous owner differs from this client, et->getextent(previous owner, name)
// is called; an empty previous owner is logged as "extent server". The cached
// lock's last_owner is then set to this client. Returns the status of the
// underlying acquire.
lock_protocol::status
lock_client_cache::acquire(std::string name) {
    std::string previous_owner;
    lock_protocol::status status = acquire(name, previous_owner);

    cached_lock * l = lock_cache[name];

    // Lock/unlock outside assert(): with NDEBUG an asserted expression is not
    // evaluated, which would silently drop the locking altogether.
    int rc = pthread_mutex_lock(&(l->lock_mutex));
    assert(rc == 0);

    // When this client was already the last owner there is nothing to fetch.
    if (et != NULL && previous_owner.compare(hostname) != 0) {
        if (previous_owner.size() != 0) {
            printf("[%s] Get extent for lock %s from %s. \n \n", hostname.c_str(),name.c_str(), previous_owner.c_str());
        } else {
            printf("[%s] Get extent for lock %s from extent server. \n \n", hostname.c_str(),name.c_str());
        }
        et->getextent(previous_owner, name);
    }

    l->last_owner = hostname;
    rc = pthread_mutex_unlock(&(l->lock_mutex));
    assert(rc == 0);
    (void) rc;
    return status;
}


// Acquires the named lock, reporting its previous owner through last_owner.
//
// A cache entry is created on first use (state NONE). While the entry is not
// FREE:
//   - in state NONE, an acquire RPC is sent to the lock's manager (looked up
//     on the consistent-hash ring). On -1 or NOTMANAGER the membership is
//     refreshed from dst and the RPC retried against the new manager. If the
//     RPC answers OK the lock is marked LOCKED and the function returns, with
//     last_owner as filled in by the RPC.
//   - otherwise (or if the RPC answered anything else) the caller waits on
//     the lock's condition variable, re-checking at least every ~2 seconds.
// When the entry is found FREE it is taken locally: last_owner receives the
// cached last owner and the entry becomes LOCKED. Always returns OK.
lock_protocol::status
lock_client_cache::acquire(std::string name, std::string &last_owner)
{
    int ret, r;
    cached_lock *l;
    struct timeval now;
    struct timespec next_timeout;
    member lock_manager;
    std::list<member> member_lst;


    pthread_mutex_lock(&lock_cache_mutex);
    if (lock_cache.count(name) == 0){
        // First time this client sees the lock: create an empty cache entry.
        l = new cached_lock();
        r = pthread_cond_init(&(l->lock_cond),0);
        assert(r == 0);
        r = pthread_mutex_init(&(l->lock_mutex),0);
        assert(r == 0);
        (void) r;
        l->state = cached_lock::NONE;
        lock_cache[name] = l;

    } else {
        l = lock_cache[name];
    }

    // Take the per-lock mutex before dropping the cache mutex so that no
    // other thread can release the lock in between.
    pthread_mutex_lock(&(l->lock_mutex));
    pthread_mutex_unlock(&lock_cache_mutex);

    while(l->state != cached_lock::FREE) {
        if (l->state == cached_lock::NONE) {
            //TODO: add consistent hashing
            lock_manager = ch->lookup(name);

            // Use the same 1000 timeout as the retry below and as the release
            // path, so an unreachable manager yields -1 and triggers the
            // membership refresh instead of stalling this first attempt.
            ret = cl.call(lock_manager.get_addr(), cached_lock_protocol::acquire, hostname, get_new_id(), name, last_owner, rpcc::to(1000));

            while((ret == -1)||(ret == lock_protocol::NOTMANAGER)) {
                // Start each refresh from an empty list so entries from an
                // earlier attempt cannot accumulate if the reply is appended.
                member_lst.clear();
                // Ask the metadata server for the current membership.
                ret = cl.call(dst, metadata_protocol::update, member_lst);
                if (ret == lock_protocol::OK) {
                    ch->set_membership(member_lst);
                    lock_manager = ch->lookup(name);
                    ret = cl.call(lock_manager.get_addr(), cached_lock_protocol::acquire, hostname, get_new_id(), name, last_owner, rpcc::to(1000));
                } else {
                    // Force another pass through the retry loop.
                    ret = -1;
                }
            }


            if (ret == lock_protocol::OK) {
                // Granted immediately by the manager.
                l->state = cached_lock::LOCKED;
                l->last_owner = hostname;
                pthread_mutex_unlock(&(l->lock_mutex));
                return lock_protocol::OK;
            }

        }
        // Wait for a state change, but wake up after about two seconds to
        // re-evaluate the state even without a signal.
        gettimeofday(&now, NULL);
        next_timeout.tv_sec = now.tv_sec + 2;
        next_timeout.tv_nsec = 0;
        pthread_cond_timedwait(&(l->lock_cond),&(l->lock_mutex),&next_timeout);
    }

    // The lock is cached and free: hand it to this caller.
    l->state = cached_lock::LOCKED;
    last_owner = l->last_owner;
    l->last_owner = hostname;
    pthread_mutex_unlock(&(l->lock_mutex));
    return lock_protocol::OK;
}

// Gives up local use of the named lock.
//
// If the cached lock has been marked RELEASING, its name is queued for the
// releaser thread. Otherwise the lock is marked FREE with this client as last
// owner and local waiters are woken. Unknown names are ignored. Always
// returns OK.
lock_protocol::status
lock_client_cache::release(std::string name)
{
    std::map<std::string,cached_lock*>::iterator entry = lock_cache.find(name);
    if (entry == lock_cache.end()) {
        return lock_protocol::OK;
    }

    cached_lock *held = entry->second;
    pthread_mutex_lock(&(held->lock_mutex));

    if (held->state != cached_lock::RELEASING) {
        // Keep the lock cached and let the next local acquirer take it.
        held->state = cached_lock::FREE;
        held->last_owner = hostname;
        pthread_cond_broadcast(&(held->lock_cond));
        pthread_mutex_unlock(&(held->lock_mutex));
        return lock_protocol::OK;
    }

    // Drop the per-lock mutex first, then hand the name to the releaser
    // thread through the release queue.
    pthread_mutex_unlock(&(held->lock_mutex));

    pthread_mutex_lock(&release_mutex);
    release_queue.push(name);
    pthread_cond_signal(&release_cond);
    pthread_mutex_unlock(&release_mutex);

    return lock_protocol::OK;
}

// RPC handler: asks this client to give the named lock back.
//
// A request whose rpc_id was already recorded in server_rpc_lst gets the
// recorded status back. Otherwise, if the lock is cached and not in state
// NONE, it is marked RELEASING and the answer is RETRY; a lock that was FREE
// is additionally queued for the releaser thread right away. In all other
// cases the answer is NOENT. The answer is recorded under rpc_id before
// returning.
lock_protocol::status
lock_client_cache::revoke(int rpc_id, std::string name, int &) {
    lock_protocol::status st = lock_protocol::NOENT;
    if (server_rpc_lst->find(rpc_id,st)) {
        return st;
    }

    std::map<std::string,cached_lock*>::iterator entry = lock_cache.find(name);
    if (entry != lock_cache.end()) {
        cached_lock *target = entry->second;
        pthread_mutex_lock(&(target->lock_mutex));

        if (target->state != cached_lock::NONE) {
            st = lock_protocol::RETRY;
            // Remember whether nobody was using the lock before it is
            // switched to RELEASING.
            const bool was_free = (target->state == cached_lock::FREE);
            target->state = cached_lock::RELEASING;
            pthread_mutex_unlock(&(target->lock_mutex));

            if (was_free) {
                // No local holder will call release(), so queue it here.
                pthread_mutex_lock(&release_mutex);
                release_queue.push(name);
                pthread_cond_signal(&release_cond);
                pthread_mutex_unlock(&release_mutex);
            }
        } else {
            pthread_mutex_unlock(&(target->lock_mutex));
        }
    }

    server_rpc_lst->put_result(rpc_id,st,"");
    return st;

}

// RPC handler: informs this client that the named lock has been granted.
//
// A request whose rpc_id was already recorded in server_rpc_lst gets the
// recorded status back (after printing the list). Otherwise, if the lock is
// cached, its last_owner is updated and, unless it is currently LOCKED or
// RELEASING, it becomes FREE and local waiters are woken. OK is recorded
// under rpc_id and returned.
lock_protocol::status
lock_client_cache::grant(int rpc_id, std::string name, std::string last_owner, int &) {
    // Initialized (as revoke() does) so the variable never holds an
    // indeterminate value.
    lock_protocol::status st = lock_protocol::OK;
    if (server_rpc_lst->find(rpc_id,st)) {
        server_rpc_lst->print_list();
        return st;
    }

    if (lock_cache.count(name) > 0) {
        cached_lock *l = lock_cache[name];
        pthread_mutex_lock(&(l->lock_mutex));
        // Recorded regardless of state, so one assignment is enough.
        l->last_owner = last_owner;
        if ((l->state != cached_lock::LOCKED) && (l->state != cached_lock::RELEASING))  {
            l->state = cached_lock::FREE;
            pthread_cond_broadcast(&(l->lock_cond));
        }
        pthread_mutex_unlock(&(l->lock_mutex));

    }
    server_rpc_lst->put_result(rpc_id,lock_protocol::OK,"");
    return lock_protocol::OK;
}

// Returns a fresh request id: the next value of the request_id counter (read
// and incremented under request_id_mutex) offset by cid*1000.
int
lock_client_cache::get_new_id() {
    pthread_mutex_lock(&request_id_mutex);
    const int seq = request_id++;
    pthread_mutex_unlock(&request_id_mutex);

    return cid*1000 + seq;
}

// Builds the member record describing this client (its hostname and cid).
member lock_client_cache::myself() {
	return member(hostname, cid);
}


// Sends the metadata_protocol::stats RPC to dst. Both the reply value and the
// call's return status are discarded.
void lock_client_cache::stats() {
	int discarded_reply;
	cl.call(dst, metadata_protocol::stats, 0, discarded_reply);
}


//JOIN AND LEAVE



void lock_client_cache::join() {
	
	std::list<member> membs;
	
	ls->freeze_state();
	
	assert(cl.call(dst, metadata_protocol::join, myself(), membs) == lock_protocol::OK);
	//printf("e23\n");
	ch->set_membership(membs);
//printf("e24\n");
	std::list<member>::iterator it = membs.begin();
    int r;
	//printf("e3\n");
	while (it != membs.end()) {
		if (it->get_client_id() != cid) {//do not send to yourself
			cl.call(it->get_addr(), metadata_protocol::join_member, myself(), r, rpcc::to(1000));
		}
		it++;
	}
	//printf("e4\n");
	if (ch->lookup_previous(myself()).get_client_id() != cid) {
		ls->get_metadata_state(ch->lookup_previous(myself()));
	}
	//printf("e5\n");
	ls->unfreeze_state();
	
}

void lock_client_cache::leave() {
	
	std::list<member> membs;
	
    ls->freeze_state();
	
	assert(cl.call(dst, metadata_protocol::leave, myself(), membs) == lock_protocol::OK);

	std::list<member>::iterator it = membs.begin();
    int r;
	while (it != membs.end()) {
		cl.call(it->get_addr(), metadata_protocol::leave_member, myself(), r, rpcc::to(1000));
		it++;
	}

	ls->yield_metadata_state();

	// flush_extent();
	
    std::map<std::string,cached_lock*>::iterator li;
    std::string lockname;
    member lock_manager;
    std::list<member> member_lst;
    int ret;
    for(li = lock_cache.begin(); li != lock_cache.end(); li++) {
        lockname = li->first;
        lock_manager = ch->lookup(lockname);
        ret = cl.call(lock_manager.get_addr(), cached_lock_protocol::release, 
                      hostname, get_new_id(), lockname, r, rpcc::to(1000));

        //not sure if this is necessary but just in case some other part of the system is changing
        while((ret == -1)||(ret == lock_protocol::NOTMANAGER)) {
            ret = cl.call(dst, metadata_protocol::update, member_lst);
            //call metadataserver
            if (ret == lock_protocol::OK) {
                ch->set_membership(member_lst);
                lock_manager = ch->lookup(lockname);
                ret = cl.call(lock_manager.get_addr(), cached_lock_protocol::release, 
                              hostname, get_new_id(), lockname, r, rpcc::to(1000));
            } else {
                ret = -1;
            }
        }
    }            
    
	ls->unfreeze_state(); 
	
}



// RPC handler: another member announced that it joined. Adds m to the
// consistent-hash ring while the local lock-server state is frozen, then
// unfreezes it. The int& reply parameter is unused. Always returns OK.
metadata_protocol::status lock_client_cache::join_member(member m, int &) {
	ls->freeze_state();
	ch->add_member(m);
	ls->unfreeze_state();
	return metadata_protocol::OK;
}

// RPC handler (registered for metadata_protocol::leave_member): another
// member announced that it is leaving. Removes m from the consistent-hash
// ring while the local lock-server state is frozen, then unfreezes it. The
// int& reply parameter is unused. Always returns OK.
metadata_protocol::status lock_client_cache::remove_member(member m, int &) {
	ls->freeze_state();
	ch->remove_member(m);
	ls->unfreeze_state();
	return metadata_protocol::OK;
}

//extents
// RPC handler: serves an extent read by forwarding to the attached extent
// cache (et), which fills buf and supplies the returned status.
extent_protocol::status lock_client_cache::get(extent_protocol::extentid_t eid, std::string & buf) {
	// et is NULL after construction; refuse the request instead of
	// dereferencing a null pointer.
	// NOTE(review): assumes extent_protocol::NOENT exists (mirroring
	// lock_protocol::NOENT) — confirm against extent_protocol.
	if (et == NULL) {
		return extent_protocol::NOENT;
	}
	return et->get(eid, buf);
}

// RPC handler: serves an extent attribute read by forwarding to the attached
// extent cache (et), which fills buf and supplies the returned status.
extent_protocol::status lock_client_cache::getattr(extent_protocol::extentid_t eid, extent_protocol::attr & buf) {
	// et is NULL after construction; refuse the request instead of
	// dereferencing a null pointer.
	// NOTE(review): assumes extent_protocol::NOENT exists (mirroring
	// lock_protocol::NOENT) — confirm against extent_protocol.
	if (et == NULL) {
		return extent_protocol::NOENT;
	}
	return et->getattr(eid, buf);
}


