// RPC stubs for clients to talk to lock_server, and cache the locks
// see lock_client.cache.h for protocol details.

#include "lock_client_cache.h"
#include "rpc.h"
#include <sstream>
#include <iostream>
#include <stdio.h>
#include <stdexcept>

#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/time.h>
#include <time.h>
#include <pthread.h>

using std::exception;




static void *
releasethread(void *x)
{
  lock_client_cache *cc = (lock_client_cache *) x;
  cc->releaser();
  return 0;
}

int lock_client_cache::last_port = 0;


lock_client_cache::lock_client_cache(sockaddr_in xdst, 
				     class lock_release_user *_lu)
  : lock_client(xdst), lu(_lu)
{
	
  printf("in lock cache client!\n");
 
  srand(time(NULL)^last_port);
  rlock_port = ((rand()%32000) | (0x1 << 10));
  char hname[100];
  assert(gethostname(hname, 100) == 0);
  std::ostringstream host;
  host << hname << ":" << rlock_port;
  hostname = host.str();
  last_port = rlock_port;
  rpcs *rlsrpc = new rpcs(htons(rlock_port));
  /* register RPC handlers with rlsrpc */
  rlsrpc->reg(lock_protocol::grant, this, &lock_client_cache::grant_handler);
  rlsrpc->reg(lock_protocol::revoke, this, &lock_client_cache::revoke_handler);
 
  assert(pthread_mutex_init(&data_lock, NULL) == 0);
  assert(pthread_cond_init(&lock_released_cond, NULL) == 0);
  
  nonce = 0;
  
  cid = rlock_port;
  
  rev_vector = new std::vector<rev_info>;
  rev_index = 0;
  //rev_end = 0;
  rev_list = new std::list<rev_info>;

  pthread_t th;
  int r = pthread_create(&th, NULL, &releasethread, (void *) this);
  assert (r == 0);
}

void lock_client_cache::c(int n){
	//printf("client id %d :  %d \n", cid, n);
}

void lock_client_cache::c(std::string s){
	//printf("client id %d :  %s \n", cid, s.c_str());
}

void
lock_client_cache::releaser()
{
	int r;
  // This method should be a continuous loop, waiting to be notified of
  // freed locks that have been revoked by the server, so that it can
  // send a release RPC.
	
	
	
	while (1) {
	
		//rev_vector is accessed readonly
		int size = rev_vector->size();
		while (rev_index < size) {
			rev_info rv;
			try {
				rv = rev_vector->at(rev_index);
				
				
			} catch (exception& e) {
				if (DEBUG) printf("EXCEPTION CAUGHT!\n");
				break;
			}
			rev_list->push_back(rv);
			rev_index++;
		}
		
		//loop through the list to see what we can revoke
		
		std::list<rev_info>::iterator it = rev_list->begin();
		std::list<rev_info>::iterator  nextit;
		
		while (it != rev_list->end()) {
			
			nextit = ++it; it--;
			
			pthread_mutex_lock(&data_lock);
			//determine if this request was processed
			std::map<std::string, info>::iterator fit = data.find(it->name);
			if (fit!= data.end()) { 
				if (fit->second.nonce <= it->nonce) {
					if ((fit->second.nonce == it->nonce) && (fit->second.type == FREE)) {
						fit->second.type = NONE;
						printf("releasing the lock to the server and flushing\n");
						if (lu != NULL) {
							lu->dorelease(it->name); //flush cache
						}
						cl.call(dst, lock_protocol::release, hostname, it->name, fit->second.nonce, cid, r);
						nextit = rev_list->erase(it);
					} else {
						//do nothing, will come back again and attempt
					}
				} else {
					nextit = rev_list->erase(it);
				}
				
			} else {
				assert(0);
			}
			pthread_mutex_unlock(&data_lock);
			
			it = nextit;
		}
		
		pthread_yield();
		
		
		
	}


}

bool lock_client_cache::isfree(std::string name) {
	std::map<std::string, info>::iterator it = data.find(name);
	return (it->second.type == FREE);
}

bool lock_client_cache::islocked(std::string name) {
	std::map<std::string, info>::iterator it = data.find(name);
	return (it->second.type == LOCKED);
}

bool lock_client_cache::isnone(std::string name) {
	std::map<std::string, info>::iterator it = data.find(name);
	return (it->second.type == NONE);
}
bool lock_client_cache::isacquiring(std::string name) {
	std::map<std::string, info>::iterator it = data.find(name);
	return (it->second.type == ACQUIRING);
}
bool lock_client_cache::exists(std::string name) {
	
	return (data.find(name) != data.end());
} 

lock_protocol::status
lock_client_cache::acquire(std::string name)
{
	int r;
	
	//printf("I am in acquire in lock_client cache\n");
		
	pthread_mutex_lock(&data_lock);
	while ((!exists(name)) || (!isfree(name))) {
		if (!exists(name)) {
			//c("does not exists: status NONE");
			info * inf = new info;
			inf->nonce = 0;
			inf->type = NONE;
			data.insert(std::make_pair<std::string, info>(name, *inf));
			inf->tid = pthread_self(); 
		} 
		
		if (isacquiring(name)) {
			//nothing: keep looping aTTENTION!! to order of the ifs			
			
		} 
		
		if (isnone(name)) { 
			//printf("acquire %s: is none \n", name.c_str());
			nonce++;
			std::map<std::string, info>::iterator it = data.find(name);
			it->second.type = ACQUIRING;
			it->second.nonce = nonce;
			it->second.tid = pthread_self();
			cl.call(dst, lock_protocol::acquire, hostname, name, nonce, cid, r);	
			
		} 			
		
		if (islocked(name)) {
			//nothing: keep looping, need to wait
		}

		/*c("acquire is Waiting...");
		pthread_cond_wait(&lock_released_cond, &data_lock);
		c("done Waiting"); */
		
		struct timespec ts;
		struct timeval timeval1;
  		struct timezone timezone1;
  		gettimeofday(&timeval1, &timezone1 );
		ts.tv_sec= timeval1.tv_sec;
		ts.tv_nsec = timeval1.tv_usec * 1000;
	        ts.tv_nsec += 5000000; //just in case a race occurs = 5ms -- should be woken up by signal though
		pthread_cond_timedwait(&lock_released_cond, &data_lock, &ts);	

		pthread_yield();
	}
	
	
	std::map<std::string, info>::iterator it = data.find(name);
	it->second.type = LOCKED;
	if (DEBUG) c("acquired!");
	it->second.tid = pthread_self();
	//nonce remains the same	
	
	pthread_mutex_unlock(&data_lock);
	
	return lock_protocol::OK;
	
}

lock_protocol::status
lock_client_cache::release(std::string name)
{	
	
	pthread_mutex_lock(&data_lock);
	assert(islocked(name));
	std::map<std::string, info>::iterator it = data.find(name);
	assert(it != data.end());
	it->second.tid = pthread_self();
	it->second.type = FREE;
	pthread_cond_signal(&lock_released_cond);
	pthread_mutex_unlock(&data_lock);
	//if (DEBUG) c("releases..");
	return lock_protocol::OK;
}

lock_protocol::status lock_client_cache::grant_handler(std::string name, int nonce, int & r) {
	//if (DEBUG) c("grant handler called");
	//printf("grant handler: lock %s and nonce %d\n", name.c_str(), nonce);
	//make from none to free in granter	
	std::map<std::string, info>::iterator it = data.find(name);
	if (nonce < it->second.nonce) {
		return lock_protocol::OK;
	} 
	
	if (nonce == it->second.nonce) {
		if (it->second.type == ACQUIRING) {
			it->second.type = FREE;
		}
	} else { // nonce > it->second.nonce
		assert(0); //not possible
	}
		
	
	pthread_cond_signal(&lock_released_cond);
	
	return lock_protocol::OK;
}

lock_protocol::status lock_client_cache::revoke_handler(std::string name, int nonce,  int & r) {
	//if (DEBUG) c("revoke handler called");
	//the check of revoke is done in releaser thread
	//printf("revoke handler: lock %s and nonce %d\n", name.c_str(), nonce);
	rev_info ri;
	ri.nonce = nonce;
	ri.name = name;
	rev_vector->push_back(ri);	
	return lock_protocol::OK;
}
