// RPC stubs for clients to talk to extent_server

#include "extent_client.h"
#include "../rpc/rpc.h"
#include <sstream>
#include <iostream>
#include <stdio.h>
#include <unistd.h>
#include <time.h>

// The calls assume that the caller holds a lock on the extent

// Builds a caching client for the extent server reachable at `xdst`.
//
// Allocates the (initially empty) data and attribute caches, the two lists
// that record extents removed locally but not yet removed at the server,
// and initialises the mutex that guards all four containers.
//
// Nothing is fetched here: get_cache_data()/get_cache_attr() pull extent 1
// from the server on their first miss.
extent_client::extent_client(sockaddr_in xdst)
  : dst(xdst)
{
	data_cache = new std::map<extent_protocol::extentid_t, data_unit *>;
	attr_cache = new std::map<extent_protocol::extentid_t, attr_unit *>;

	remove_attr = new std::list<extent_protocol::extentid_t>;
	remove_data = new std::list<extent_protocol::extentid_t>;

	// The call must live outside assert(): when NDEBUG is defined the whole
	// assert expression is discarded and the mutex would never be initialised.
	const int rc = pthread_mutex_init(&cache_mutex, NULL);
	assert(rc == 0);
	(void) rc; // silences the unused-variable warning in NDEBUG builds
}


// Reads a decimal number out of `str`, beginning at index `start` and
// stopping at the first ' ' or at the end of the string.
//
// Every character before the terminator shifts the accumulated value one
// decimal place; characters that are not '0'..'9' contribute a zero digit.
//
// start: index of the first character of the number.
// str:   text to parse.
// after: set to the index one past the position where scanning stopped
//        (i.e. just past the terminating space, or past the end of `str`).
// Returns the parsed value (0 when no characters were consumed).
unsigned long long extent_client::read_nr(unsigned int start, std::string str, unsigned int & after)
{
	unsigned long long value = 0;
	unsigned int pos = start;

	for (; pos < str.size(); ++pos) {
		const char ch = str.at(pos);
		if (ch == ' ') {
			break;
		}
		const int digit = (ch >= '0' && ch <= '9') ? (ch - '0') : 0;
		value = value * 10 + digit;
	}

	// step over the terminator
	after = pos + 1;
	return value;
}


// Debug helper meant to print an extent's attributes.
// The printf is currently commented out, so calling this has no effect.
void print_attr(extent_protocol::attr  buf) {
	//printf("size %u mtime %ld\n", buf.size, buf.mtime );	
}

// Returns a freshly heap-allocated attr whose size, atime, mtime and ctime
// match those of `a`. The object is created with `new` and handed back as a
// raw pointer, so the caller is responsible for deleting it.
extent_protocol::attr * copy(extent_protocol::attr a)
{
	extent_protocol::attr * duplicate = new extent_protocol::attr;

	duplicate->size = a.size;
	duplicate->atime = a.atime;
	duplicate->mtime = a.mtime;
	duplicate->ctime = a.ctime;

	return duplicate;
}


// Reads a length-prefixed name out of `str`.
//
// Precondition: `start` is the index of the first digit of the length.
// The length is parsed with read_nr(), any spaces that follow are skipped,
// and then that many characters are collected as the name.
//
// start: index of the first digit of the length prefix.
// str:   text to parse.
// after: on return, the index one past the character following the name;
//        when the string ends before the name starts it is left at the
//        position reached while skipping spaces.
// Returns the name, or "" when the string ends before the name starts.
// NUL characters inside the name are skipped. str.at() throws
// std::out_of_range when the declared length runs past the end of `str`.
std::string extent_client::read_name(unsigned int start, std::string str, unsigned int & after) 
{
	const unsigned long long length = read_nr(start, str, after);

	// skip the padding between the length and the name itself
	while (after < str.size() && str.at(after) == ' ') {
		++after;
	}
	if (!(after < str.size())) {
		return "";
	}

	std::string result;
	for (unsigned int k = 0; k < length; k++) {
		const char ch = str.at(after + k);
		if (ch != '\0') {
			result.push_back(ch);
		}
	}

	after = after + length + 1;
	return result;
}

// Locates the index just past a three-field header at the front of
// `full_value`: a length-prefixed name (read_name) followed by two numbers
// (read_nr). Runs of spaces before each field are skipped.
//
// Returns that index, or -1 when the string runs out before any one of the
// three fields begins.
int extent_client::get_content_position(std::string full_value) {
	const int kFieldCount = 3;
	unsigned int pos = 0;

	for (int field = 0; field < kFieldCount; ++field) {
		// skip the spaces that precede this field
		while (pos < full_value.size() && full_value.at(pos) == ' ') {
			++pos;
		}
		if (pos >= full_value.size()) {
			return -1;
		}

		if (field == 0) {
			read_name(pos, full_value, pos);
		} else {
			read_nr(pos, full_value, pos);
		}
	}

	return pos;
}

// Reports whether `eid` appears anywhere in the removal list `lst`.
bool gotRemoved(std::list<extent_protocol::extentid_t> * lst, extent_protocol::extentid_t eid) {
	typedef std::list<extent_protocol::extentid_t>::iterator entry_iter;

	for (entry_iter cur = lst->begin(); cur != lst->end(); ++cur) {
		if (*cur == eid) {
			return true;
		}
	}
	return false;
}

// Reads the contents of extent `eid` through the local cache.
//
// eid: extent to read.
// buf: receives the contents on success.
//
// Returns:
//   NOENT  - the extent is on the local removal list, or it is not cached
//            and is not extent 1;
//   OK     - the contents were found in the cache, or (extent 1 only) were
//            fetched from the server and cached as clean;
//   other  - the status of a failed server `get` for extent 1.
//
// cache_mutex is held for the whole call (including the RPC) and is released
// on every return path.
extent_protocol::status extent_client::get_cache_data(extent_protocol::extentid_t eid, std::string & buf) 
{
	pthread_mutex_lock(&cache_mutex);

	if (gotRemoved(remove_data, eid)) {
		printf("get_cache_data: returning with NOENT");
		pthread_mutex_unlock(&cache_mutex);
		return extent_protocol::NOENT;
	}

	std::map<extent_protocol::extentid_t, data_unit *>::iterator it = data_cache->find(eid);

	if (it != data_cache->end()) {
		//it is in the cache
		buf = it->second->buf;
		pthread_mutex_unlock(&cache_mutex);
		return extent_protocol::OK;
	}

	//the data is not in the cache
	printf("CACHE MISS for eid %llu get \n", eid);

	if (eid != 1) {
		// This path used to return without unlocking, which left
		// cache_mutex held and blocked every later cache operation.
		pthread_mutex_unlock(&cache_mutex);
		return extent_protocol::NOENT;
	}

	// Only extent 1 is ever fetched from the server on a miss.
	extent_protocol::status ret = cl.call(dst, extent_protocol::get, eid, buf);
	if (ret != extent_protocol::OK) {
		printf("extent_protocol: returning with Error \n");
		pthread_mutex_unlock(&cache_mutex);
		return ret;
	}

	data_unit * new_data_unit = new data_unit;
	new_data_unit->buf = buf;
	new_data_unit->dirty = false;
	// Deduced make_pair compiles both before and after C++11; the explicit
	// template-argument form does not accept lvalues from C++11 on.
	data_cache->insert(std::make_pair(eid, new_data_unit));

	pthread_mutex_unlock(&cache_mutex);
	return extent_protocol::OK;
}




// Reads the attributes of extent `eid` through the local cache.
//
// eid: extent to query.
// buf: receives the attributes on success.
//
// Returns:
//   NOENT  - the extent is on the local attribute-removal list, or it is not
//            cached and is not extent 1;
//   OK     - the attributes were found in the cache, or (extent 1 only) were
//            fetched from the server and cached as clean;
//   other  - the status of a failed server `getattr` for extent 1.
//
// cache_mutex is held for the whole call (including the RPC) and is released
// on every return path.
extent_protocol::status extent_client::get_cache_attr(extent_protocol::extentid_t eid, extent_protocol::attr & buf) 
{  
	pthread_mutex_lock(&cache_mutex);

	if (gotRemoved(remove_attr, eid)) {
		printf("returning with Error from get_cache_attr: entry was removed \n");
		pthread_mutex_unlock(&cache_mutex);
		return extent_protocol::NOENT;
	}

	std::map<extent_protocol::extentid_t, attr_unit *>::iterator it = attr_cache->find(eid);

	if (it != attr_cache->end()) {
		//it is in the cache
		buf = it->second->attr;
		pthread_mutex_unlock(&cache_mutex);
		return extent_protocol::OK;
	}

	//the data is not in the cache
	printf("CACHE MISS for eid %llu \n", eid);

	if (eid != 1) {
		// This path used to return without unlocking, which left
		// cache_mutex held and blocked every later cache operation.
		pthread_mutex_unlock(&cache_mutex);
		return extent_protocol::NOENT;
	}

	// Only extent 1 is ever fetched from the server on a miss.
	extent_protocol::status ret = cl.call(dst, extent_protocol::getattr, eid, buf);
	if (ret != extent_protocol::OK) {
		printf("returning with Error from get_cache_attr\n");
		pthread_mutex_unlock(&cache_mutex);
		return ret;
	}

	attr_unit * new_attr_unit = new attr_unit;
	// Plain assignment: going through copy() allocated a temporary attr that
	// was never freed.
	new_attr_unit->attr = buf;
	new_attr_unit->dirty = false;
	// Deduced make_pair compiles both before and after C++11.
	attr_cache->insert(std::make_pair(eid, new_attr_unit));

	pthread_mutex_unlock(&cache_mutex);
	return ret;
}


// Cancels a pending removal: erases every occurrence of `eid` from the
// removal list `lst`.
//
// The list can hold the same id more than once (do_remove appends without
// checking for an existing entry), so stopping at the first match would
// leave a stale entry behind and the extent would still look removed.
//
// Returns true if at least one entry was erased, false if `eid` was absent.
bool undoRemove(std::list<extent_protocol::extentid_t> * lst, extent_protocol::extentid_t eid) 
{
	bool erased = false;
	std::list<extent_protocol::extentid_t>::iterator it = lst->begin();

	while (it != lst->end()) {
		if (*it == eid) {
			// erase() returns the iterator following the erased element
			it = lst->erase(it);
			erased = true;
		} else {
			++it;
		}
	}

	return erased;
}

	
// Stores `buf` as the contents of extent `eid` in the local cache and marks
// both the contents and the attributes dirty so that flush() writes them to
// the server. Any pending local removal of the extent is cancelled.
//
// The cached attributes are created if missing and updated as follows:
//   size  - for ids with bit 0x80000000 set, the number of bytes after the
//           header located by get_content_position(); 0 for other ids, and
//           0 when the header is incomplete or has nothing after it
//           (presumably the bit marks file extents - confirm against the
//           id allocation scheme);
//   atime, ctime, mtime - current time in seconds since the Epoch.
//
// Always returns extent_protocol::OK.
extent_protocol::status extent_client::put_cache_data(extent_protocol::extentid_t eid, std::string  buf) 
{
	pthread_mutex_lock(&cache_mutex);

	// ---- contents ----
	undoRemove(remove_data, eid);

	std::map<extent_protocol::extentid_t, data_unit *>::iterator it = data_cache->find(eid);

	if (it == data_cache->end()) {
		//the data is not in the cache
		data_unit * new_data_unit = new data_unit;
		new_data_unit->buf = buf;
		new_data_unit->dirty = true;
		// Deduced make_pair compiles both before and after C++11.
		data_cache->insert(std::make_pair(eid, new_data_unit));
	} else {
		it->second->buf = buf;
		it->second->dirty = true;
	}

	// ---- attributes ----
	undoRemove(remove_attr, eid);

	attr_unit * unit;
	std::map<extent_protocol::extentid_t, attr_unit *>::iterator it2 = attr_cache->find(eid);

	if (it2 == attr_cache->end()) {
		//create attribute and make it dirty so that we flush it also
		unit = new attr_unit;
		attr_cache->insert(std::make_pair(eid, unit));
	} else {
		unit = it2->second;
	}

	unit->attr.size = 0;
	if (eid & 0x80000000) {
		// get_content_position() yields -1 for an incomplete header and can
		// yield size()+1 when the header is the last thing in the buffer.
		// Subtracting either value unchecked wraps the unsigned size.
		const int content_start = get_content_position(buf);
		if (content_start >= 0 &&
		    static_cast<std::string::size_type>(content_start) <= buf.size()) {
			unit->attr.size = buf.size() - content_start;
		}
	}

	// Seconds since the Epoch. tv_usec, used here before, is only the
	// microsecond fraction of the current second and is not a timestamp.
	const time_t now = time(NULL);
	unit->attr.atime = now;
	unit->attr.ctime = now;
	unit->attr.mtime = now;
	unit->dirty = true;

	pthread_mutex_unlock(&cache_mutex);
	return extent_protocol::OK;
}


// Stores `buf` as the attributes of extent `eid` in the local cache and marks
// them dirty so that flush() sends them to the server. A pending local
// removal of the attributes is cancelled.
//
// The cached mtime is overwritten with the current time (seconds since the
// Epoch); all other fields are taken from `buf` unchanged.
//
// Always returns extent_protocol::OK.
extent_protocol::status extent_client::put_cache_attr(extent_protocol::extentid_t eid, extent_protocol::attr  buf) 
{
	pthread_mutex_lock(&cache_mutex);
	undoRemove(remove_attr, eid);

	attr_unit * unit;
	std::map<extent_protocol::extentid_t, attr_unit *>::iterator it = attr_cache->find(eid);

	if (it == attr_cache->end()) {
		//the data is not in the cache
		unit = new attr_unit;
		// Deduced make_pair compiles both before and after C++11.
		attr_cache->insert(std::make_pair(eid, unit));
	} else {
		//it is in the cache
		unit = it->second;
	}

	// Plain assignment: going through copy() allocated a temporary attr that
	// was never freed.
	unit->attr = buf;
	// Seconds since the Epoch. tv_usec, used here before, is only the
	// microsecond fraction of the current second and is not a timestamp.
	unit->attr.mtime = time(NULL);
	unit->dirty = true;

	pthread_mutex_unlock(&cache_mutex);
	return extent_protocol::OK;
}

// Removes extent `eid` locally: records it on both removal lists (so that
// later reads report NOENT and flush() issues the server-side remove) and
// drops any cached contents and attributes.
//
// No RPC is made here. Always returns extent_protocol::OK, whether or not
// the extent was cached.
extent_protocol::status extent_client::do_remove(extent_protocol::extentid_t eid) {

	pthread_mutex_lock(&cache_mutex);

	// Record each id at most once per list, so that a single undoRemove()
	// fully cancels the removal. remove_attr used to be appended to twice.
	if (!gotRemoved(remove_data, eid)) {
		remove_data->push_back(eid);
	}
	if (!gotRemoved(remove_attr, eid)) {
		remove_attr->push_back(eid);
	}

	// The maps hold raw owning pointers: free the unit before dropping its
	// entry, otherwise it is leaked.
	std::map<extent_protocol::extentid_t, data_unit *>::iterator data_it = data_cache->find(eid);
	if (data_it != data_cache->end()) {
		delete data_it->second;
		data_cache->erase(data_it);
	}

	std::map<extent_protocol::extentid_t, attr_unit *>::iterator attr_it = attr_cache->find(eid);
	if (attr_it != attr_cache->end()) {
		delete attr_it->second;
		attr_cache->erase(attr_it);
	}

	pthread_mutex_unlock(&cache_mutex);
	return extent_protocol::OK;
}

// Flushes everything this client caches about extent `eid` to the server and
// evicts it from the local cache.
//
//  * If the extent was removed locally, the removal records are cleared and
//    a `remove` RPC is sent; its status is returned.
//  * Otherwise dirty contents are written with `put` and dirty attributes
//    with `setattr`. Clean entries are simply dropped. A cache entry is
//    evicted whether or not its RPC succeeds.
//
// Returns OK when nothing failed, otherwise the status of the first failing
// RPC. When `put` fails the attribute entry is left in the cache untouched.
//
// NOTE(review): cache_mutex is held across the RPCs, so other cache
// operations on this client block until the server replies.
extent_protocol::status extent_client::flush(extent_protocol::extentid_t eid) {

	int r = 0; // RPC reply slot; its value is not used

	pthread_mutex_lock(&cache_mutex);

	if (gotRemoved(remove_data, eid)) {
		assert(gotRemoved(remove_attr, eid));
		// The lists may hold the id more than once; clear every record so
		// no stale removal outlives the flush.
		while (undoRemove(remove_attr, eid)) {
		}
		while (undoRemove(remove_data, eid)) {
		}
		assert(data_cache->find(eid) == data_cache->end());
		assert(attr_cache->find(eid) == attr_cache->end());
		extent_protocol::status ret = cl.call(dst, extent_protocol::remove, eid, r);
		pthread_mutex_unlock(&cache_mutex);
		printf("eid removed from extent \n");
		return ret;
	}

	//not removed, maybe update

	std::map<extent_protocol::extentid_t, data_unit *>::iterator it = data_cache->find(eid);

	if (it != data_cache->end()) {
		data_unit * unit = it->second;
		data_cache->erase(it);

		extent_protocol::status ret = extent_protocol::OK;
		if (unit->dirty) {
			ret = cl.call(dst, extent_protocol::put, eid, unit->buf, r);
		}
		// The map held the only pointer to the unit; free it once evicted.
		delete unit;

		if (ret != extent_protocol::OK) {
			printf("flush %llu : ERROR with put \n", eid);
			pthread_mutex_unlock(&cache_mutex);
			return ret;
		}
	}

	std::map<extent_protocol::extentid_t, attr_unit *>::iterator it2 = attr_cache->find(eid);

	if (it2 != attr_cache->end()) {
		attr_unit * unit = it2->second;
		attr_cache->erase(it2);

		extent_protocol::status ret = extent_protocol::OK;
		if (unit->dirty) {
			ret = cl.call(dst, extent_protocol::setattr, eid, unit->attr, r);
			printf("c2\n");
		}
		// The map held the only pointer to the unit; free it once evicted.
		delete unit;

		if (ret != extent_protocol::OK) {
			printf("flush: ERROR with setattr \n");
			pthread_mutex_unlock(&cache_mutex);
			return ret;
		}
	}

	pthread_mutex_unlock(&cache_mutex);
	return extent_protocol::OK;

}

// Public entry point for reading an extent's contents into `buf`.
// Delegates to get_cache_data() and returns its status unchanged.
extent_protocol::status
extent_client::get(extent_protocol::extentid_t eid, std::string &buf)
{
  const extent_protocol::status outcome = get_cache_data(eid, buf);
  return outcome;
}

// Public entry point for reading an extent's attributes into `attr`.
// Delegates to get_cache_attr() and returns its status unchanged.
extent_protocol::status
extent_client::getattr(extent_protocol::extentid_t eid,
		       extent_protocol::attr &attr)
{
  const extent_protocol::status outcome = get_cache_attr(eid, attr);
  return outcome;
}

// Public entry point for updating an extent's attributes.
// Delegates to put_cache_attr() and returns its status unchanged.
extent_protocol::status
extent_client::setattr(extent_protocol::extentid_t eid,
		       extent_protocol::attr &attr)
{
	const extent_protocol::status outcome = put_cache_attr(eid, attr);
	return outcome;
}

// Public entry point for writing an extent's contents.
// Delegates to put_cache_data() and returns its status unchanged.
extent_protocol::status
extent_client::put(extent_protocol::extentid_t eid, std::string buf)
{
  const extent_protocol::status outcome = put_cache_data(eid, buf);
  return outcome;
}

// Public entry point for removing an extent.
// Delegates to do_remove() and returns its status unchanged.
extent_protocol::status
extent_client::remove(extent_protocol::extentid_t eid)
{
	const extent_protocol::status outcome = do_remove(eid);
	return outcome;
}

// Placeholder: prints nothing and always returns 0.
extent_protocol::status
		extent_client::print_map() {
	return 0; 
}



