This commit is contained in:
Simeng He 2021-06-25 16:07:28 -04:00
parent a291147a24
commit ab3eaf69ce
2 changed files with 125 additions and 8 deletions

View File

@ -171,7 +171,8 @@ func (srv *Isoping) initVars() {
srv.minCycleRxdiff = 0
srv.nextCycle = 0
srv.now = srv.Ustime()
srv.nextSend = 0
srv.nextSend = srv.now + uint32(srv.usecPerPkt)
srv.numLost = 0
srv.nextTxackIndex = 0
srv.Tx = Packet{}
srv.Rx = Packet{}
@ -194,7 +195,8 @@ func (srv *Isoping) generateInitialPacket() (*bytes.Buffer, error) {
srv.Tx.Usec_per_pkt = uint32(srv.usecPerPkt)
srv.Tx.Clockdiff = 0
if srv.startRtxtime > 0 {
srv.Rx.Clockdiff = srv.startRtxtime - srv.startRxtime
srv.Tx.Clockdiff = srv.startRtxtime - srv.startRxtime
log.Println("SETCLOCKDIFF to", srv.Rx.Clockdiff)
}
srv.Tx.Num_lost = srv.numLost
srv.Tx.First_ack = uint32(srv.nextTxackIndex)
@ -214,11 +216,45 @@ func (srv *Isoping) Start(address ...string) {
}
srv.initVars()
}
func (srv *Isoping) MainTest() {
for {
srv.initTimer()
}
}
func (srv *Isoping) initTimer() {
tv := time.Duration(0)
if DIFF(srv.nextSend, srv.now) < 0 {
log.Printf("Set to %v us\n", tv.Microseconds())
} else {
tv = time.Microsecond * time.Duration(DIFF(srv.nextSend, srv.now))
log.Printf("Set to %v us\n", tv.Microseconds())
}
log.Println(tv)
// emulate the select with timeout of tv.
if srv.RemoteAddr != nil {
deadline := time.Now().Add(time.Duration(tv.Microseconds()) * time.Microsecond)
log.Println("TIMEOUT DURATION : ", deadline)
err := srv.Conn.SetReadDeadline(deadline)
if err != nil {
log.Println(err)
return
}
} else {
// Reset the timeout, we will have no timeout in this case.
err := srv.Conn.SetReadDeadline(time.Time{})
if err != nil {
log.Println(err)
return
}
}
srv.now = srv.Ustime()
}
func (srv *Isoping) MainLoop() {
for {
log.Println("Action")
srv.now = srv.Ustime()
srv.initTimer()
log.Printf("%d - %d = %d\n", srv.now, srv.nextSend, DIFF(srv.now, srv.nextSend))
if srv.RemoteAddr != nil && DIFF(srv.now, srv.nextSend) >= 0 {
// Check if it is a server or not
@ -256,8 +292,10 @@ func (srv *Isoping) MainLoop() {
p := make([]byte, binary.Size(srv.Rx))
got, rxaddr, err := srv.Conn.ReadFromUDP(p)
if err != nil {
log.Println(err)
// log.Println("READ error : ", err)
continue
} else {
log.Println("SUCC")
}
srv.RxAddr = rxaddr
log.Println("AFTER READ")
@ -265,7 +303,7 @@ func (srv *Isoping) MainLoop() {
buffer := bytes.NewBuffer(p)
err = binary.Read(buffer, binary.BigEndian, &srv.Rx)
if err != nil {
log.Println(err)
log.Println("BINARY READ err: ", err)
continue
}
@ -335,7 +373,7 @@ func (srv *Isoping) MainLoop() {
rtt := clockdiff + int32(srv.Rx.Clockdiff)
// Casting issue here may exist
// offset := DIFF(uint32(clockdiff), uint32(rtt/2))
offset := DIFF(uint32(clockdiff), uint32(rtt/2))
if srv.Rx.Clockdiff == 0 {
srv.lastPrint = srv.now - uint32(srv.usecPerPrint) + 1
} else {
@ -354,7 +392,84 @@ func (srv *Isoping) MainLoop() {
srv.latRxVarSum += srv.latRx * srv.latRx
}
okToPrint := !srv.Quiet && DIFF(srv.now, srv.lastPrint) >= srv.usecPerPrint
log.Println("OKTOPRINT : ", okToPrint)
log.Printf("OKTOPRINT : %v, QUIET : %v\n", okToPrint, srv.Quiet)
log.Printf("%v - %v = %v\n", srv.now, srv.lastPrint, DIFF(srv.now, srv.lastPrint))
log.Printf("usecPerPrint : %v", srv.usecPerPrint)
// Print the information.
if okToPrint {
ackinfo := srv.LastAckInfo
msRx := float64((rxdiff + rtt/2) / 1000.0)
min := float64((rtt / 2) / 1000.0)
rxNumLost := int64(srv.Rx.Num_lost)
nextTxId := srv.nextTxId - 1
numLost := int64(srv.numLost)
nextRxId := int64(srv.nextRxId - 1)
log.Printf("%12s %6.1f ms rx (min=%.1f) loss: %v/%v tx %v/%v rx\n",
ackinfo, msRx, min, rxNumLost, nextTxId, numLost, nextRxId)
srv.lastPrint = srv.now
}
if rxdiff < srv.minCycleRxdiff {
srv.minCycleRxdiff = rxdiff
}
if DIFF(srv.now, srv.nextCycle) >= 0 {
if srv.minCycleRxdiff > 0 {
log.Printf("clock skew: sliding start by %v usec\n", srv.minCycleRxdiff)
srv.startRxtime += uint32(srv.minCycleRxdiff)
}
srv.minCycleRxdiff = 0x7fffffff
srv.nextCycle += uint32(srv.minCycleRxdiff)
}
log.Println("SCHEDULING")
srv.Tx.Acks[srv.nextTxackIndex].Id = id
srv.Tx.Acks[srv.nextTxackIndex].Rxtime = rxtime
srv.nextTxackIndex = (srv.nextTxackIndex + 1) % len(srv.Tx.Acks)
first_ack := uint32(srv.Rx.First_ack)
log.Println("FOR START")
for i := uint32(0); i < uint32(len(srv.Rx.Acks)); i++ {
var acki uint32 = (first_ack + i) % uint32(len(srv.Rx.Acks))
var ackid uint32 = srv.Rx.Acks[acki].Id
if ackid == 0 {
continue
}
if DIFF(ackid, srv.nextRxackId) >= 0 {
log.Println("EXPECTED AN ACK")
startTxTime := srv.nextSend - srv.nextTxId*uint32(srv.usecPerPkt)
txtime := startTxTime + ackid*uint32(srv.usecPerPkt)
rrxtime := srv.Rx.Acks[acki].Rxtime
rxtime := rrxtime + uint32(offset)
txdiff := DIFF(rxtime, txtime)
if srv.usecPerPkt <= 0 && len(srv.LastAckInfo) > 0 {
log.Printf("%12s\n", srv.LastAckInfo)
}
if len(srv.LastAckInfo) == 0 {
//populate it
}
srv.nextRxackId = ackid + 1
srv.latTxCount++
srv.latTx = int64(txdiff)
if srv.latTxMin > srv.latTx {
srv.latTxMin = srv.latTx
}
if srv.latTxMax < srv.latTx {
srv.latTxMax = srv.latTx
}
srv.latTxSum += srv.latTx
srv.latTxVarSum += srv.latTx * srv.latTx
}
}
srv.lastRxtime = rxtime
}
}
func (srv *Isoping) printResult() {
log.Printf("\n---\n")
}

View File

@ -93,6 +93,7 @@ func TestMainLoop(t *testing.T) {
server.Start()
defer server.Conn.Close()
server.MainLoop()
// server.MainTest()
}
func TestStartClient(t *testing.T) {
@ -100,4 +101,5 @@ func TestStartClient(t *testing.T) {
client.Start("[::]:4948")
defer client.Conn.Close()
client.MainLoop()
// client.MainTest()
}