Add traffic records matched statistics and total processing time (#46)

* feat: statistics fwd and bwd traffic matched

* fix: missed line wrap

* feat: statistics osrm_traffic_updater processing time
This commit is contained in:
Jay 2019-07-20 01:22:25 +08:00 committed by Xun(Perry) Liu
parent c613b1737c
commit a247e34ea4
4 changed files with 68 additions and 49 deletions

View File

@ -11,6 +11,8 @@ type dumperStatisticItems struct {
bwdRecordCnt uint64
wayMatchedCnt uint64
nodeMatchedCnt uint64
fwdTrafficMatchedCnt uint64
bwdTrafficMatchedCnt uint64
}
type dumperStatistic struct {
@ -20,12 +22,12 @@ type dumperStatistic struct {
close bool
}
func (d* dumperStatistic) Init(n int) {
func (d *dumperStatistic) Init(n int) {
d.c = make(chan dumperStatisticItems, n)
d.init = true
}
func (d* dumperStatistic) Close() {
func (d *dumperStatistic) Close() {
if !d.init {
return
}
@ -37,24 +39,27 @@ func (d* dumperStatistic) Close() {
d.sum.bwdRecordCnt += item.bwdRecordCnt
d.sum.wayMatchedCnt += item.wayMatchedCnt
d.sum.nodeMatchedCnt += item.nodeMatchedCnt
d.sum.fwdTrafficMatchedCnt += item.fwdTrafficMatchedCnt
d.sum.bwdTrafficMatchedCnt += item.bwdTrafficMatchedCnt
}
d.close = true
}
func (d* dumperStatistic) Sum() (dumperStatisticItems) {
func (d *dumperStatistic) Sum() dumperStatisticItems {
return d.sum
}
func (d* dumperStatistic) Update(wayCnt uint64, nodeCnt uint64, fwdRecordCnt uint64,
bwdRecordCnt uint64, wayMatchedCnt uint64, nodeMatchedCnt uint64) {
func (d *dumperStatistic) Update(wayCnt uint64, nodeCnt uint64, fwdRecordCnt uint64,
bwdRecordCnt uint64, wayMatchedCnt uint64, nodeMatchedCnt uint64,
fwdTrafficMatchedCnt uint64, bwdTrafficMatchedCnt uint64) {
if !d.init {
fmt.Printf("dumperStatistic->Update() failed, please call Init() first otherwise will block all functions. \n")
return
}
d.c <- (dumperStatisticItems{wayCnt, nodeCnt, fwdRecordCnt, bwdRecordCnt, wayMatchedCnt, nodeMatchedCnt})
d.c <- (dumperStatisticItems{wayCnt, nodeCnt, fwdRecordCnt, bwdRecordCnt, wayMatchedCnt, nodeMatchedCnt, fwdTrafficMatchedCnt, bwdTrafficMatchedCnt})
}
func (d* dumperStatistic) Output() {
func (d *dumperStatistic) Output() {
if !d.close {
fmt.Printf("Close() hasn't been called, no statistic collected.\n")
return
@ -64,7 +69,8 @@ func (d* dumperStatistic) Output() {
fmt.Printf("Load %d way from data with %d nodes.\n", d.sum.wayCnt, d.sum.nodeCnt)
fmt.Printf("%d way with %d nodes matched with traffic record.\n",
d.sum.wayMatchedCnt, d.sum.nodeMatchedCnt)
fmt.Printf("%d traffic records(%d forward and %d backward) have been matched.\n",
d.sum.fwdTrafficMatchedCnt+d.sum.bwdTrafficMatchedCnt, d.sum.fwdTrafficMatchedCnt, d.sum.bwdTrafficMatchedCnt)
fmt.Printf("Generate %d records in final result with %d of them from forward traffic and %d from backword.\n",
d.sum.fwdRecordCnt+ d.sum.bwdRecordCnt, d.sum.fwdRecordCnt, d.sum.bwdRecordCnt)
d.sum.fwdRecordCnt+d.sum.bwdRecordCnt, d.sum.fwdRecordCnt, d.sum.bwdRecordCnt)
}

View File

@ -1,8 +1,8 @@
package main
import (
"testing"
"sync"
"testing"
)
func TestDumperStatistic(t *testing.T) {
@ -17,7 +17,7 @@ func TestDumperStatistic(t *testing.T) {
wg.Wait()
d.Close()
sum:= d.Sum()
sum := d.Sum()
if (sum.wayCnt != 10) || (sum.nodeCnt != 20) || (sum.fwdRecordCnt != 30) || (sum.bwdRecordCnt != 40) || (sum.wayMatchedCnt != 50) || (sum.nodeMatchedCnt != 60) {
t.Error("TestDumperStatistic failed.\n")
@ -26,7 +26,6 @@ func TestDumperStatistic(t *testing.T) {
}
func accumulateDumper(d *dumperStatistic, wg *sync.WaitGroup) {
d.Update(1,2,3,4,5,6)
d.Update(1, 2, 3, 4, 5, 6, 7, 8)
wg.Done()
}

View File

@ -3,6 +3,7 @@ package main
import (
"flag"
"fmt"
"time"
)
var flags struct {
@ -27,6 +28,12 @@ const CACHEDOBJECTS = 4000000
func main() {
flag.Parse()
startTime := time.Now()
defer func() {
endTime := time.Now()
fmt.Printf("Total processing time %f seconds\n", endTime.Sub(startTime).Seconds())
}()
isFlowDoneChan := make(chan bool, 1)
wayid2speed := make(map[int64]int)
go getTrafficFlow(flags.ip, flags.port, wayid2speed, isFlowDoneChan)
@ -46,12 +53,12 @@ func main() {
}
}
func wait4PreConditions(flowChan <-chan bool) (bool) {
func wait4PreConditions(flowChan <-chan bool) bool {
var isFlowDone bool
loop:
loop:
for {
select {
case f := <- flowChan :
case f := <-flowChan:
if !f {
fmt.Printf("[ERROR] Communication with traffic server failed.\n")
break loop

View File

@ -1,14 +1,14 @@
package main
import (
"os"
"log"
"fmt"
"bufio"
"fmt"
"log"
"os"
"strconv"
"strings"
"sync"
"time"
"strings"
"strconv"
)
var tasksWg sync.WaitGroup
@ -52,7 +52,7 @@ func wait4AllTasksFinished(sink chan string, ds *dumperStatistic) {
}
func task(wayid2speed map[int64]int, source <-chan string, sink chan<- string, ds *dumperStatistic) {
var wayCnt, nodeCnt, fwdRecordCnt, bwdRecordCnt, wayMatched, nodeMatched uint64
var wayCnt, nodeCnt, fwdRecordCnt, bwdRecordCnt, wayMatched, nodeMatched, fwdTrafficMatched, bwdTrafficMatched uint64
var err error
for str := range source {
elements := strings.Split(str, ",")
@ -75,6 +75,13 @@ func task(wayid2speed map[int64]int, source <-chan string, sink chan<- string, d
var nodes []string = elements[1:]
wayMatched += 1
nodeMatched += (uint64)(len(nodes))
if okFwd {
fwdTrafficMatched += 1
}
if okBwd {
bwdTrafficMatched += 1
}
for i := 0; (i + 1) < len(nodes); i++ {
var n1, n2 uint64
if n1, err = strconv.ParseUint(nodes[i], 10, 64); err != nil {
@ -98,14 +105,14 @@ func task(wayid2speed map[int64]int, source <-chan string, sink chan<- string, d
}
}
ds.Update(wayCnt, nodeCnt, fwdRecordCnt, bwdRecordCnt, wayMatched, nodeMatched)
ds.Update(wayCnt, nodeCnt, fwdRecordCnt, bwdRecordCnt, wayMatched, nodeMatched, fwdTrafficMatched, bwdTrafficMatched)
tasksWg.Done()
}
// format
// if dir = true, means traffic for forward, generate: from, to, speed
// if dir = false, means this speed is for backward flow, generate: to, from, speed
func generateSingleRecord(from, to uint64, speed int, dir bool) (string){
func generateSingleRecord(from, to uint64, speed int, dir bool) string {
if dir {
return fmt.Sprintf("%d,%d,%d\n", from, to, speed)
} else {