// the caching lock server implementation

#include "lock_server_cache.h"
#include <sstream>
#include <stdio.h>
#include <unistd.h>
#include <arpa/inet.h>

// The server runs two threads that periodically sends grant and
// revoke requests to handle lost grants and revokes.

static void *
revokethread(void *x)
{
    lock_server_cache *sc = (lock_server_cache *) x;
    sc->revoker();
    return 0;
}

static void *
grantthread(void *x)
{
    lock_server_cache *sc = (lock_server_cache *) x;
    sc->granter();
    return 0;
}



lock_server_cache::lock_server_cache(rpcs *_rsms, consistent_hash *_ch, sockaddr_in _dst, std::string _hostname, int _cid) 
    : lock_server() 
{

    //printf("creating lock server cache.. ");

    rs = _rsms;
    ch = _ch;
    dst = _dst;
    hostname = _hostname;
    cid = _cid;

    int r;
    
    rs->reg(cached_lock_protocol::acquire, this, &lock_server_cache::acquire);
    rs->reg(cached_lock_protocol::release, this, &lock_server_cache::release);
    rs->reg(cached_lock_protocol::divide_state, this, &lock_server_cache::divide_metadata_state);
    rs->reg(cached_lock_protocol::receive_state, this, &lock_server_cache::receive_metadata_state);
   

    rpc_ticker = 0;
    printf("%d\n",this->rpc_ticker);
    r = pthread_mutex_init(&rpc_ticker_mutex,0);
    assert (r == 0);
    r = pthread_mutex_init(&revoke_mutex,0);
    assert (r == 0);
    r = pthread_cond_init(&revoke_cond,0);
    assert (r == 0);
    r = pthread_mutex_init(&grant_mutex,0);
    assert (r == 0);
    r = pthread_cond_init(&grant_cond,0);
    assert (r == 0);
        
      
    r = pthread_mutex_init(&locks_mutex,NULL);
    assert(r == 0);
    
    //set up stuff to make calls to the client
    pthread_t th;
    r = pthread_create(&th, NULL, &revokethread, (void *) this);
    assert (r == 0);
    r = pthread_create(&th, NULL, &grantthread, (void *) this);
    assert (r == 0); 
       
	//printf("CREATED server! \n");
}

lock_server_cache::lock_server_cache() 
    : lock_server() 
{

    printf("using NULL constructor!\n");

    //rsms = 0;
    
    int r;
    cid = 0;
    ch = NULL;

    
    rpc_ticker = 0;
    printf("%d\n",this->rpc_ticker);
    r = pthread_mutex_init(&rpc_ticker_mutex,0);
    assert (r == 0);
    r = pthread_mutex_init(&revoke_mutex,0);
    assert (r == 0);
    r = pthread_cond_init(&revoke_cond,0);
    assert (r == 0);
    r = pthread_mutex_init(&grant_mutex,0);
    assert (r == 0);
    r = pthread_cond_init(&grant_cond,0);
    assert (r == 0);
        
      
    r = pthread_mutex_init(&locks_mutex,0);
    assert(r == 0);
    
    //set up stuff to make calls to the client
    pthread_t th;
    r = pthread_create(&th, NULL, &revokethread, (void *) this);
    assert (r == 0);
    r = pthread_create(&th, NULL, &grantthread, (void *) this);
    assert (r == 0); 
       
}
void
lock_server_cache::revoker()
{
   // This method should be a continuous loop, that sends revoke
    // messages to lock holders whenever another client wants the
    // same lock
    std::string lockname, owner;
    struct cached_lock *l;
    struct sockaddr_in dst;
   
    int ret, r;
    
    while(1) {
        //wait until there is a request
        pthread_mutex_lock(&revoke_mutex);        
        while(revokes.empty()) {
            pthread_cond_wait(&revoke_cond,&revoke_mutex);
        }
        
        lockname = revokes.front();
        revokes.pop();
        pthread_mutex_unlock(&revoke_mutex);
        
        l = locks[lockname];
        pthread_mutex_lock(&(l->lock_mutex));
        if ((l->taken) && (!(l->requesters.empty())))
            owner = l->owner;
         // assert(l->last_owner.compare(owner) == 0);
	pthread_mutex_unlock(&(l->lock_mutex));
        
        make_sockaddr(owner.c_str(),&dst);
        
      
        if (hostname.compare(owner) == 1) { printf("[%s] Revoke to %s\n \n", hostname.c_str(), owner.c_str()); }
        ret = cl.call(dst, cached_lock_protocol::revoke, get_new_id(), lockname, r);
        

        //if someone is still waiting on the lock, put it back in the
        //queue
        if ((l->taken) && (!(l->requesters.empty()))) {
            pthread_mutex_lock(&revoke_mutex);
            revokes.push(lockname);
            pthread_mutex_unlock(&revoke_mutex);
            sleep(1);
        }
    }
}

void
lock_server_cache::granter()
{

    // This method should be a continuous loop, waiting for locks
    // to be released and then sending grant messages to those who
    // are waiting for it.

    std::string lockname, owner, last_owner;
    cached_lock *l;
    struct sockaddr_in dst;
    
    int ret, r;
    
    while(1) {
        pthread_mutex_lock(&grant_mutex);
        while (grants.empty()) {
		
            pthread_cond_wait(&grant_cond,&grant_mutex);
		
        }
        lockname = grants.front();
        grants.pop();
        pthread_mutex_unlock(&grant_mutex);
        
        l = locks[lockname];
	assert(pthread_mutex_lock(&l->lock_mutex) == 0);
        assert(l->taken);
        owner = l->owner;
        last_owner = l->last_owner;
	l->last_owner = owner;
        make_sockaddr(owner.c_str(),&dst);      
       
        if (hostname.compare(owner)!= 0) {printf("[%s] Grant to %s ; last owner is %s \n \n",hostname.c_str(), owner.c_str(), last_owner.c_str()); }     
	assert(pthread_mutex_unlock(&l->lock_mutex) == 0);   

        ret = cl.call(dst, cached_lock_protocol::grant, get_new_id(), lockname, last_owner,  r, rpcc::to(1000));
        
    }                
}

lock_protocol::status
lock_server_cache::acquire(std::string hostname, int rpc_id, std::string lockname, std::string &r) 
{
    //printf("[%s] lock server acquire lock: %s for %s\n",this->hostname.c_str(),lockname.c_str(),hostname.c_str());
    r = "";
    pthread_mutex_lock(&rpc_lst_mutex);
    rpc_lst *rl;
    if(hostname_rpc_lst.count(hostname) > 0) {
        rl = hostname_rpc_lst[hostname];
        pthread_mutex_unlock(&rpc_lst_mutex);
        lock_protocol::status st;
        if (rl->find(rpc_id, st)){
	    rl->find(rpc_id,r);
	  // printf("[%s] rpc resolved already, last owner was %s \n",hostname.c_str(), r.c_str());
            return st;
        }
    } else {
        rl = new rpc_lst();
        hostname_rpc_lst[hostname] = rl;
	
        pthread_mutex_unlock(&rpc_lst_mutex);
    }
    
    struct cached_lock *l;
    pthread_mutex_lock(&locks_mutex);
    if(locks.count(lockname) > 0) {
        l = locks[lockname];
        pthread_mutex_lock(&(l->lock_mutex));
        pthread_mutex_unlock(&locks_mutex);
        if (l->taken) {
            if (l->owner != hostname) {
                for (unsigned i = 0; i < l->requesters.size(); i++) {
                    // if this host is already waiting on lock, then just return
                    if (l->requesters[i] == hostname) {
                        pthread_mutex_unlock(&(l->lock_mutex));
                        rl->put_result(rpc_id, lock_protocol::RETRY,"");
                        return lock_protocol::RETRY;
                    }
                }
              //  printf("Adding host %s to list of requesters for lock %s\n",hostname.c_str(),lockname.c_str());
                l->requesters.push_back(hostname);            
                pthread_mutex_unlock(&(l->lock_mutex));
                
                pthread_mutex_lock(&revoke_mutex);
                revokes.push(lockname);
                pthread_cond_signal(&revoke_cond);
                pthread_mutex_unlock(&revoke_mutex);
                           
                rl->put_result(rpc_id, lock_protocol::RETRY,"");
                return lock_protocol::RETRY;
            } else {
		//printf("client requesting lock that it has\n");
		r = hostname;
                pthread_mutex_unlock(&(l->lock_mutex));
                rl->put_result(rpc_id, lock_protocol::OK, hostname);
		
                return lock_protocol::OK;
            }
        } else {
            //printf("granting lock %s to host %s\n",lockname.c_str(),hostname.c_str());
            r = l->last_owner;
	    l->last_owner = hostname; 
            l->taken = true;
            l->owner = hostname;
            l->times_acquired++;
	    locks[lockname] = l;  //+
            pthread_mutex_unlock(&(l->lock_mutex));
            rl->put_result(rpc_id, lock_protocol::OK, r);
	  
            return lock_protocol::OK;
        }
    } else {
        //new lock
        l = new cached_lock(hostname);
	if (lockname.compare("1") != 0) {
		//not root, thus the requester of the lock is the owner of the data--> new data cause it is creating it
		l->last_owner = hostname;
	}
        r = l->last_owner;
	l->last_owner = hostname;
	locks[lockname] = l;
        pthread_mutex_unlock(&locks_mutex);
        rl->put_result(rpc_id, lock_protocol::OK, r);
        //printf("granting new lock %s to host %s\n",lockname.c_str(),hostname.c_str());
        return lock_protocol::OK;
    }
}
            
lock_protocol::status
lock_server_cache::release(std::string hostname, int rpc_id, std::string lockname, int &)
{
    //printf("lock server release lock: %s for %s\n",lockname.c_str(),hostname.c_str());
    //check if the rpc has been received already
    pthread_mutex_lock(&rpc_lst_mutex);
    rpc_lst *rl;
    if(hostname_rpc_lst.count(hostname) > 0) {
        rl = hostname_rpc_lst[hostname];
      
        lock_protocol::status st;
        if (rl->find(rpc_id, st)) {
            pthread_mutex_unlock(&rpc_lst_mutex);
            return st;
        }
	 pthread_mutex_unlock(&rpc_lst_mutex);
    } else {
        rl = new rpc_lst();
        hostname_rpc_lst[hostname] = rl;
        pthread_mutex_unlock(&rpc_lst_mutex);
    }
   
    if (locks.count(lockname) > 0) {
        struct cached_lock *l = locks[lockname];
        pthread_mutex_lock(&(l->lock_mutex));
        if ((l->taken) && (l->owner == hostname)){
            if (l->requesters.empty()) {
                l->taken = false;
		l->last_owner = hostname;
                l->owner = "";
                pthread_mutex_unlock(&(l->lock_mutex));
            } else {
                l->taken = true;
                l->owner = l->requesters.front();
                l->requesters.pop_front();
                pthread_mutex_unlock(&(l->lock_mutex));
                
                pthread_mutex_lock(&grant_mutex);
                grants.push(lockname);
                pthread_cond_signal(&grant_cond);
                pthread_mutex_unlock(&grant_mutex);                
            }
        }
        
    }
    rl->put_result(rpc_id,lock_protocol::OK,"");
    return lock_protocol::OK;
    
}


lock_protocol::status
lock_server_cache::stat(std::string name, int &r)
{
    if (locks.count(name) > 0) {
        r = (locks[name])->times_acquired;
        return lock_protocol::OK;
    } else {
        return lock_protocol::NOENT;
    }
}

int
lock_server_cache::get_new_id() {
    pthread_mutex_lock(&rpc_ticker_mutex);
    int ret = rpc_ticker++;
   
    pthread_mutex_unlock(&rpc_ticker_mutex);
    return cid * 1000 + ret;
}

std::string
lock_server_cache::marshal_state() {
    assert(0);
    return "";
}

void
lock_server_cache::unmarshal_state(std::string
) {
    assert(0);
}


//rsm_state_transfer functions
std::string
lock_server_cache::marshal_lock_state(std::map<std::string, cached_lock*> data) {
    //printf("start marshalling state\n");
    marshall rep;
   

    
    //marshall locks
    rep << data.size();
    std::map<std::string,cached_lock*>::iterator li;
    std::string lockname;
    cached_lock *l;
    std::string marshalled_lock;
    for(li = data.begin(); li != data.end(); li++) {
        lockname = li->first;
        l = data[lockname];
        marshalled_lock = l->marshal_state();
        //printf("marshalling records for lock: %s\n",lockname.c_str());
        rep << lockname;
        rep << marshalled_lock;
    }
    
    printf("Join state transfer \n\n");
    //marshall recorded rpcs    
    //  rep << hostname_rpc_lst.size();
    // std::map<std::string,rpc_lst*>::iterator ri;
    // std::string hostname;
    //rpc_lst *rl;
    //std::string marshalled_rpclst;
    //printf("size of hostname table: %d\n",hostname_rpc_lst.size());
    //for(ri = hostname_rpc_lst.begin(); ri != hostname_rpc_lst.end(); ri++) {
    //   hostname = ri->first;
        //printf("marshalling records for host: %s\n",hostname.c_str());
    //    rl = hostname_rpc_lst[hostname];
    //    marshalled_rpclst = rl->marshal_state();
    //    rep << hostname;
    //    rep << marshalled_rpclst;
    //}
    
   // printf("done marshalling\n");
    return rep.str();
}

void
lock_server_cache::unmarshal_lock_state(std::string state, std::map<std::string,cached_lock*> & data) {

    unmarshall rep(state);
   
    data.clear();
    
    //unmarshall locks
    unsigned int size;
    rep >> size;
   // printf("umarsh size is %d \n", size);
    std::string lockname;
    cached_lock *l;
    std::string marshalled_lock;
    for(unsigned int i = 0; i < size; i++) { 
        rep >> lockname;
	//printf("lockname: %s \n", lockname.c_str());
        rep >> marshalled_lock;
        //printf("unmarshalling records for lock: %s\n",lockname.c_str());
        l = new cached_lock("");
        l->unmarshal_state(marshalled_lock);
        data[lockname] = l;
    }
    
    //unmarshall recorded rpcs    
   // rep >> size;
   // std::string hostname;
   // rpc_lst *rl;
   // std::string marshalled_rpclst;
   // for(unsigned int i = 0; i < size; i++) { 
   //     rep >> hostname;
   //     rep >> marshalled_rpclst;
   //     //printf("unmarshalling records for host: %s\n",hostname.c_str());
   //     rl = new rpc_lst(); 
   //     rl->unmarshal_state(marshalled_rpclst);
   //     hostname_rpc_lst[hostname] = rl;
   // }
  
    
}

lock_server_cache::cached_lock::cached_lock(std::string hostname) {
    int r = pthread_mutex_init(&lock_mutex,0);
    assert(r == 0);
    taken = true;
    last_owner = "";//the server
    owner = hostname;
    times_acquired = 1;        
}

std::string
lock_server_cache::cached_lock::marshal_state() {
    pthread_mutex_lock(&lock_mutex);
    marshall rep;
    if (taken) 
        rep << 1;
    else
        rep << 0;
    rep << owner;
    rep << last_owner;
    rep << times_acquired;
    rep << requesters.size();
    
    std::deque<std::string>::iterator i;
    for (i = requesters.begin(); i != requesters.end(); i++) 
        rep << *i;
    pthread_mutex_unlock(&lock_mutex);
    
    return rep.str();
}

void
lock_server_cache::cached_lock::unmarshal_state(std::string state) {
    unmarshall rep(state);
    int t;
    unsigned int size;
    std::string req;
	
  

    pthread_mutex_lock(&lock_mutex);    
    rep >> t;
    if (t) 
        taken = true;
    else 
        taken = false;
    rep >> owner;
    rep >> last_owner;
    rep >> times_acquired;
    rep >> size;
    for (unsigned int i = 0; i < size; i++) {
        rep >> req;
        requesters.push_back(req);
    }
    pthread_mutex_unlock(&lock_mutex);
    
}

member lock_server_cache::myself() {
	//member me;
	member me(hostname, cid);
	return me;
}
    
//precondition : locks_mutex and rpc_list mutex are held
void lock_server_cache::get_metadata_state(member m) {	
	if (DEBUG) {
		//printf("entering get metadata state (cid %d)\n", cid);
	}

	std::map<std::string,cached_lock*> data;
	std::string state;
	//printf("client %d asks for divide state to client %d \n", cid, m.get_client_id());
	assert(cl.call(m.get_addr(), cached_lock_protocol::divide_state, myself(), state, rpcc::to(1000)) == lock_protocol::OK);
	//printf("get1\n");
	//printf("state has size %d and is %s \n", state.size(), state.c_str());
	unmarshal_lock_state(state, data);
	//printf("get2\n");
	std::map<std::string,cached_lock*>::iterator it = data.begin();
	//printf("get22\n");
	//printf("data size() is %d \n", data.size());
	
	while (it != data.end()) {
	//	printf("get 23\n");
//		printf("it->first is %s \n", it->first.c_str());
		locks[it->first] = it->second;
		it++;
	}
//	printf("get3\n");
	if (DEBUG) {
//		printf("leaving get metadata state \n");
	}

}

metadata_protocol::status lock_server_cache::divide_metadata_state(member m, std::string & state) {	

	if (DEBUG) {
//		printf("entering divide metadata state (client %d )\n", this->cid);
	}
//	printf("c201\n");
	assert(pthread_mutex_lock(&locks_mutex) == 0);
//	printf("c20\n");
	ch->add_member(m);
//	printf("c22\n");
	std::map<std::string,cached_lock*> data;
//	printf("c23\n"); 
	std::map<std::string,cached_lock*>::iterator it = locks.begin();
//	printf("c1\n");
	member me = myself();

	while (it != locks.end()) {
		if (ch->lookup(it->first).equals(m)) {
			data[it->first] = it->second;
            //	it = locks.erase(it);
		}
		it++;
	}
//	printf("c2\n");
  	for (it = data.begin(); it != data.end(); it++) {
        	locks.erase(it->first);
    	}
//	printf("c3\n");
	state = lock_server_cache::marshal_lock_state(data); 
//	printf("c4\n");

	if (DEBUG) {
//		printf("leaving divide metadata state \n");
	}


	assert(pthread_mutex_unlock(&locks_mutex)== 0);
	return lock_protocol::OK;

	

}


//precondition : locks_mutex and rpc_list mutex are held
void lock_server_cache::yield_metadata_state() {

	if (DEBUG) {
//		printf("entering yield metadata state \n");
	}

    std::string state;
	state = marshal_lock_state(locks); 
	std::list<member> membs;

	int ret = -1;
	int r;
	while (ret == -1) {
		ret = cl.call(ch->lookup_previous(myself()).get_addr(), cached_lock_protocol::receive_state, state, r, rpcc::to(1000));	
//		printf("client does not answer to receive metadata state\n");
		assert(cl.call(dst, metadata_protocol::update, membs) == metadata_protocol::OK);
		ch->set_membership(membs);
		
	}
	
	if (DEBUG) {
//		printf("leaving yield metadata state \n");
	}

}



metadata_protocol::status lock_server_cache::receive_metadata_state(std::string state, int &) {

	assert(pthread_mutex_lock(&locks_mutex)== 0);
	if (DEBUG) {
//		printf("entering receive metadata state \n");
	}
	std::map<std::string,cached_lock*>  data;

	unmarshal_lock_state(state, data); 

	std::map<std::string,cached_lock*>::iterator it = data.begin();
      
	while (it != data.end()) {
		if (!ch->lookup(it->first).equals(myself())) {
			locks[it->first] = it->second;
		}
		it++;
	}
	if (DEBUG) {
//		printf("leaving receive metadata state \n");
	}

	assert(pthread_mutex_lock(&locks_mutex)== 0);
	return lock_protocol::OK;

}

void lock_server_cache::freeze_state() {
	assert(pthread_mutex_lock(&locks_mutex)== 0);

}
void lock_server_cache::unfreeze_state() {
	assert(pthread_mutex_unlock(&locks_mutex)== 0);
}

int lock_server_cache::nr_locks() {
	return locks.size();
}

