/* * inDirectChannel is used in this version */ #include "router.hxx" /* class Event_Packet */ Event_Packet::Event_Packet(const Event_Packet &evt) : Event(evt) { ptype_ = evt.ptype_; next_ = NULL; uid_ = evt.uid_; seqno_ = evt.seqno_; } Event *Event_Packet::eventCopy() { return new Event_Packet(*this); } /* class Entity_Source */ Entity_Source::Entity_Source(char * name, int myid) { /* parameters */ int len = strlen(name); name_ = new char[len+1]; assert(name_); strcpy(name_, name); myid_ = myid; /* state variables */ state_rng_ = new Random; assert(state_rng_); /* channels */ outch_ = new outChannel(this , SRC_TRANSMISSION_DELAY); assert(outch_); /* processes */ new Process((Entity *)this, (void (Entity::*)())emit); } procedure void Entity_Source::emit() { int seqno; Time tm; Event_Packet * evt; seqno = 0; while (1) { tm = state_rng_->exponential(SRC_INTERARRIVAL_RATE); waitFor(tm); evt = new Event_Packet(1, 1); evt->seqno_ = seqno++; fprintf(arvl, "%f \n", now()); //fprintf(stderr, "%f: emit()->%p\n", now(), evt); outch_->write(evt); } } /* class Router */ Entity_Router::Entity_Router(char * name, int myid) { /* parameters */ int len = strlen(name); name_ = new char[len+1]; assert(name_); strcpy(name_, name); myid_ = myid; /* channels */ inch_ = new inDirectChannel(this); assert(inch_); inch_->attach(this, (evtFunc) &scan); // outch = new outChannel(this, 0); assert(outch); /* demultiplexier */ demu_ = new Entity_Classifier(); assert(demu_); /* process */ //new Process((Entity *)this, (void (Entity::*)()) scan); } void Entity_Router::scan(Event * evt) { Event_Packet * pkt; outChannel * outch; pkt = (Event_Packet *) evt; //fprintf(stderr, "%f: scan(%p)\n", now(), evt); assert(demu_); /* check the routing table, choose the outChannel to send the packet to */ outch = demu_->find(pkt); if(outch) outch->write(pkt); else error_quit("must be something wrong!\n"); } /* class Entity_Link */ Entity_Link::Entity_Link(char * name, int myid) { /* parameters */ int len = strlen(name); name_ = new char[len+1]; assert(name_); strcpy(name_, name); myid_ = myid; /* Qeueu */ // qu_ = new DropTail(Q_LIM); qu_ = new REDQueue(Q_LIM); qu_->link_ = this; /* channels */ inch_ = new inChannel(this); assert(inch_); outch_ = new outChannel(this, 1); assert(outch_); intch_in_ = new inChannel(this); assert(intch_in_); intch_out_ = new outChannel(this); assert(intch_out_); intch_out_->mapto(intch_in_); /* process */ new Process((Entity *)this, (void (Entity::*)()) scan); new Process((Entity *) this, (void (Entity::*)()) serve); } procedure void Entity_Link::scan() { Event_Packet ** pkt; waitsOn(inch_); while(1) { waitOn(); /* get the arrival packet and put it into the queue */ pkt = (Event_Packet **) inch_->activeEvents(); qu_->enque(pkt[0]); delete[] pkt; printf("In Link: Packet %i \n", pkt[0]->uid_); /* If the queue is empty right now, send alert message to serve() */ if (qu_->length() == 1) { intch_out_->write(new Event()); } } } procedure void Entity_Link::serve() { double serv_tm; Event_Packet * outpkt; /* wait on the incoming signal */ waitsOn(intch_in_); for(;;) { waitOn(); /* serve the queue */ while (qu_->length() > 0) { outpkt = qu_->deque(); serv_tm = 0.1; waitFor(serv_tm); fprintf(dp, "%f \n", now()); /* send the pkt off */ } } } /* class: Entity_PacketQueue */ void Entity_PacketQueue::remove(Event_Packet* target) { for (Event_Packet** p = &head_; *p != 0; p = &(*p)->next_) { Event_Packet* pkt = *p; if (pkt == target) { *p = pkt->next_; if (tail_ == &pkt->next_) tail_ = p; --len_; return; } } abort(); } /* class: Entity_Queue */ Entity_Queue::Entity_Queue(int lim_) : blocked_(0), unblock_on_resume_(1), pq_(0) /* temporarily NULL */ { qlim_ = lim_; } void Entity_Queue::drop(Event_Packet * p) { p->unreference(); } void Entity_Queue::recv(Event_Packet* p) { enque(p); if (!blocked_) { /* * We're not blocked. Get a packet and send it on. * We perform an extra check because the queue * might drop the packet even if it was * previously empty! (e.g., RED can do this.) */ p = deque(); if (p != 0) { blocked_ = 1; // target_->recv(p, &qh_); } } } void Entity_Queue::resume() { Event_Packet* p = deque(); if (p != 0) { // target_->recv(p, &qh_); } else { if (unblock_on_resume_) blocked_ = 0; else blocked_ = 1; } } void Entity_Queue::reset() { Event_Packet* p; while ((p = deque()) != 0) drop(p); } /* class DropTail */ void DropTail::enque(Event_Packet* p) { q_->enque(p); if (q_->length() >= qlim_) { if (drop_front_) { /* remove from head of queue */ Event_Packet *pp = q_->deque(); drop(pp); } else { q_->remove(p); drop(p); } } } Event_Packet* DropTail::deque() { return q_->deque(); } /* class Entity_Classifier */ Entity_Classifier::Entity_Classifier() : slot_(0), nslot_(0), maxslot_(-1) , shift_(0), mask_(0xffffffff) { } Entity_Classifier::~Entity_Classifier() { if(slot_) delete[] slot_; /* use delete [] slot_ instead ?? */ } void Entity_Classifier::alloc(int slot) { outChannel** old = slot_; int n = nslot_; if (old == 0) nslot_ = 32; while (nslot_ <= slot) nslot_ <<= 1; slot_ = new outChannel*[nslot_]; memset(slot_, 0, nslot_ * sizeof(outChannel*)); if (old) { // there is bug in original ns code for (int i = 0; i < n; ++i) slot_[i] = old[i]; delete[] old; } } void Entity_Classifier::install(int slot, outChannel* p) { if (slot >= nslot_) alloc(slot); slot_[slot] = p; if (slot >= maxslot_) maxslot_ = slot; } void Entity_Classifier::clear(int slot) { slot_[slot] = 0; if (slot == maxslot_) { while (--maxslot_ >= 0 && slot_[maxslot_] == 0) ; } } int Entity_Classifier::getnxt() { int i; for (i=0; i < nslot_; i++) if (slot_[i]==0) return i; i=nslot_; alloc(nslot_); return i; } /* * perform the mapping from packet to outChannel * perform upcall if no mapping */ outChannel * Entity_Classifier::find(Event_Packet* const p) { outChannel * och = NULL; int cl = classify(p); if (cl < 0 || cl >= nslot_ || (och = slot_[cl]) == 0) { /* * Try again. Maybe callback patched up the table. */ // cl = classify(p); // if (cl < 0 || cl >= nslot_ || (och = slot_[cl]) == 0) return (NULL); } return (och); } /* calss REDQueue */ REDQueue::REDQueue(int lim) : Entity_Queue(lim), idle_(0), bcount_(0) { q_ = new Entity_PacketQueue(); // underlying queue pq_ = q_; ug_ = new Random; reset(); // #ifdef notdef print_edp(); print_edv(); // #endif } void REDQueue::reset() { /* * If queue is measured in bytes, scale min/max thresh * by the size of an average packet (which is specified by user). */ if (qib_) { edp_.th_min *= edp_.mean_pktsize; edp_.th_max *= edp_.mean_pktsize; } /* * Compute the "packet time constant" if we know the * link bandwidth. The ptc is the max number of (avg sized) * pkts per second which can be placed on the link. * The link bw is given in bits/sec, so scale mean psize * accordingly. */ // *** if (link_) edp_.ptc = link_->bandwidth() / (8. * edp_.mean_pktsize); edv_.v_ave = 0.0; edv_.v_slope = 0.0; edv_.count = 0; edv_.count_bytes = 0; edv_.old = 0; edv_.v_a = 1 / (edp_.th_max - edp_.th_min); edv_.v_b = - edp_.th_min / (edp_.th_max - edp_.th_min); idle_ = 1; idletime_ = now(); Entity_Queue::reset(); bcount_ = 0; } /* * Compute the average queue size. * The code contains two alternate methods for this, the plain EWMA * and the Holt-Winters method. * nqueued can be bytes or packets */ void REDQueue::run_estimator(int nqueued, int m) { float f, f_sl, f_old; f = edv_.v_ave; f_sl = edv_.v_slope; //#define RED_EWMA //#ifdef RED_EWMA while (--m >= 1) { f_old = f; f *= 1.0 - edp_.q_w; } f_old = f; f *= 1.0 - edp_.q_w; f += edp_.q_w * nqueued; //#endif /* #ifdef RED_HOLT_WINTERS while (--m >= 1) { f_old = f; f += f_sl; f *= 1.0 - edp_.q_w; f_sl *= 1.0 - 0.5 * edp_.q_w; f_sl += 0.5 * edp_.q_w * (f - f_old); } f_old = f; f += f_sl; f *= 1.0 - edp_.q_w; f += edp_.q_w * nqueued; f_sl *= 1.0 - 0.5 * edp_.q_w; f_sl += 0.5 * edp_.q_w * (f - f_old); #endif */ edv_.v_ave = f; edv_.v_slope = f_sl; } /* * Return the next packet in the queue for transmission. */ Event_Packet* REDQueue::deque() { Event_Packet *p; p = q_->deque(); if (p != 0) { idle_ = 0; // bcount_ -= ((hdr_cmn*)p->access(off_cmn_))->size(); } else { idle_ = 1; // deque() may invoked by Queue::reset at init // time (before the scheduler is instantiated). // deal with this case idletime_ = now(); } return (p); } int REDQueue::drop_early(Event_Packet* pkt) { // hdr_cmn* ch = (hdr_cmn*)pkt->access(off_cmn_); double p = edv_.v_a * edv_.v_ave + edv_.v_b; p /= edp_.max_p_inv; edv_.v_prob1 = p; if (edv_.v_prob1 > 1.0) edv_.v_prob1 = 1.0; double count1 = edv_.count; if (edp_.bytes) count1 = (double) (edv_.count_bytes/edp_.mean_pktsize); if (edp_.wait) { if (count1 * p < 1.0) p = 0.0; else if (count1 * p < 2.0) p /= (2 - count1 * p); else p = 1.0; } else { if (count1 * p < 1.0) p /= (1.0 - count1 * p); else p = 1.0; } /* if (edp_.bytes && p < 1.0) { p = p * ch->size() / edp_.mean_pktsize; } */ if (p > 1.0) p = 1.0; edv_.v_prob = p; // drop probability is computed, pick random number and act double u = ug_->uniform(); if (u <= edv_.v_prob) { // DROP or MARK edv_.count = 0; edv_.count_bytes = 0; // hdr_flags* hf = (hdr_flags*)pickPacketForECN(pkt)->access(off_flags_); // if (edp_.setbit && hf->ecn_capable_) { // hf->ecn_to_echo_ = 1; // } else { return (1); // } } return (0); // no DROP/mark } /* * Pick packet for early congestion notification (ECN). This packet is then * marked or dropped. Having a separate function do this is convenient for * supporting derived classes that use the standard RED algorithm to compute * average queue size but use a different algorithm for choosing the packet for * ECN notification. */ Event_Packet* REDQueue::pickPacketForECN(Event_Packet* pkt) { return pkt; /* pick the packet that just arrived */ } /* * Pick packet to drop. Having a separate function do this is convenient for * supporting derived classes that use the standard RED algorithm to compute * average queue size but use a different algorithm for choosing the victim. */ Event_Packet* REDQueue::pickPacketToDrop() { int victim; if (drop_front_) ; // victim = min(1, q_->length()-1); else if (drop_rand_) ; // victim = Random::integer(q_->length()); else /* default is drop_tail_ */ victim = q_->length() - 1; return(q_->lookup(victim)); } /* * Receive a new packet arriving at the queue. * The average queue size is computed. If the average size * exceeds the threshold, then the dropping probability is computed, * and the newly-arriving packet is dropped with that probability. * The packet is also dropped if the maximum queue size is exceeded. * * "Forced" drops mean a packet arrived when the underlying queue was * full or when the average q size exceeded maxthresh. * "Unforced" means a RED random drop. * * For forced drops, either the arriving packet is dropped or one in the * queue is dropped, depending on the setting of drop_tail_. * For unforced drops, the arriving packet is always the victim. */ void REDQueue::enque(Event_Packet* pkt) { /* * if we were idle, we pretend that m packets arrived during * the idle period. m is set to be the ptc times the amount * of time we've been idle for */ int m = 0; if (idle_) { double cur_tm = now(); // Scheduler::instance().clock(); /* To account for the period when the queue was empty. */ idle_ = 0; m = int(edp_.ptc * (cur_tm - idletime_)); } /* * Run the estimator with either 1 new packet arrival, or with * the scaled version above [scaled by m due to idle time] * (bcount_ maintains the byte count in the underlying queue). * If the underlying queue is able to delete packets without * us knowing, then bcount_ will not be maintained properly! */ run_estimator(qib_ ? bcount_ : q_->length(), m + 1); /* * count and count_bytes keeps a tally of arriving traffic * that has not been dropped (i.e. how long, in terms of traffic, * it has been since the last early drop) */ // *** should I add the packet header, as in ns, into Event_Packet *** // hdr_cmn* ch = (hdr_cmn*)pkt->access(off_cmn_); ++edv_.count; // edv_.count_bytes += ch->size(); /* * DROP LOGIC: * q = current q size, ~q = averaged q size * 1> if ~q > maxthresh, this is a FORCED drop * 2> if minthresh < ~q < maxthresh, this may be an UNFORCED drop * 3> if (q+1) > hard q limit, this is a FORCED drop */ register double qavg = edv_.v_ave; int droptype = DTYPE_NONE; int qlen = qib_ ? bcount_ : q_->length(); int qlim = qib_ ? (qlim_ * edp_.mean_pktsize) : qlim_; curq_ = qlen; // helps to trace queue during arrival, if enabled if (qavg >= edp_.th_min && qlen > 1) { if (qavg >= edp_.th_max) { droptype = DTYPE_FORCED; } else if (edv_.old == 0) { // SALLY: would like a comment here edv_.count = 1; // edv_.count_bytes = ch->size(); edv_.old = 1; } else if (drop_early(pkt)) { droptype = DTYPE_UNFORCED; } } else { edv_.v_prob = 0.0; edv_.old = 0; // explain } if (qlen >= qlim) { // see if we've exceeded the queue size droptype = DTYPE_FORCED; } if (droptype == DTYPE_UNFORCED) { /* pick packet for ECN, which is dropping in this case */ Event_Packet *pkt_to_drop = pickPacketForECN(pkt); /* * If the packet picked is different that the one that just arrived, * add it to the queue and remove the chosen packet. */ if (pkt_to_drop != pkt) { q_->enque(pkt); // bcount_ += ch->size(); q_->remove(pkt_to_drop); // bcount_ -= ((hdr_cmn*)pkt_to_drop->access(off_cmn_))->size(); pkt = pkt_to_drop; /* XXX okay because pkt is not needed anymore */ } // deliver to special "edrop" target, if defined /* if (de_drop_ != NULL) de_drop_->recv(pkt); else drop(pkt); */ } else { /* forced drop, or not a drop: first enqueue pkt */ q_->enque(pkt); // bcount_ += ch->size(); /* drop a packet if we were told to */ if (droptype == DTYPE_FORCED) { /* drop random victim or last one */ pkt = pickPacketToDrop(); q_->remove(pkt); // bcount_ -= ((hdr_cmn*)pkt->access(off_cmn_))->size(); drop(pkt); } } return; } /* * Routine called by TracedVar facility when variables change values. * Currently used to trace values of avg queue size, drop probability, * and the instantaneous queue size seen by arriving packets. * Note that the tracing of each var must be enabled in tcl to work. */ /* for debugging help */ void REDQueue::print_edp() { printf("mean_pktsz: %d\n", edp_.mean_pktsize); printf("bytes: %d, wait: %d, setbit: %d\n", edp_.bytes, edp_.wait, edp_.setbit); printf("minth: %f, maxth: %f\n", edp_.th_min, edp_.th_max); printf("max_p_inv: %f, qw: %f, ptc: %f\n", edp_.max_p_inv, edp_.q_w, edp_.ptc); printf("qlim: %d, idletime: %f\n", qlim_, idletime_); printf("=========\n"); } void REDQueue::print_edv() { printf("v_a: %f, v_b: %f\n", edv_.v_a, edv_.v_b); } /* class Entity_MyRoot */ Entity_MyRoot::Entity_MyRoot(int ac, char **av, int id, int np, Time etime) : Root(ac, av, id, np, etime) { src = new Entity_Source *[NUM_SRC]; assert(src); router = new Entity_Router *[NUM_RUT]; assert(router); link = new Entity_Link *[NUM_LNK]; assert(link); } void Entity_MyRoot::init() { Root::init(); /* open the arvl and dp file descriptor */ arvl = fopen("Arvl.dat", "w"); if(!arvl) syserr_quit("can't open Arvl.data"); dp = fopen("Dp.dat", "w"); if(!dp) syserr_quit("can't open Dp.dat"); for(int i=0; imakeIndependent(); } for(int i=0; imakeIndependent(); } for (int i=0; imakeIndependent(); } src[0]->alignTo(router[0]); link[0]->alignTo(router[0]); } void Entity_MyRoot::build() { /* In current version, both NUM_SRC and NUM_RUT are zero */ for(int i=0; ioutch_->mapto(router[i]->inch_); for (int i=0; ioutch->mapto(((Entity_MyRoot *)(rootAt(0)))->link[i]->inch); // router[i]->outch->mapto(link[i]->inch); outChannel * outch = new outChannel(router[i], 0); assert(outch); outch->mapto(link[i]->inch_); /* build routing table Note: there is only one session and its * uid_ is 1 */ router[i]->demu_->install(1 , outch); } } void Entity_MyRoot::wrapup() { Root::wrapup(); for(int i=0; i