From a247e34ea4851c4e996efc6b92d434504ae5e951 Mon Sep 17 00:00:00 2001 From: Jay Date: Sat, 20 Jul 2019 01:22:25 +0800 Subject: [PATCH] Add traffic records matched statistics and total processing time (#46) * feat: statistics fwd and bwd traffic matched * fix: missed line wrap * feat: statistics osrm_traffic_updater processing time --- .../osrm_traffic_updater/dumper_statistic.go | 48 +++++++++++-------- .../dumper_statistic_test.go | 11 ++--- .../osrm_traffic_updater.go | 27 +++++++---- .../speed_table_dumper.go | 31 +++++++----- 4 files changed, 68 insertions(+), 49 deletions(-) diff --git a/traffic_updater/go/osrm_traffic_updater/dumper_statistic.go b/traffic_updater/go/osrm_traffic_updater/dumper_statistic.go index b59ad43f6..a3064a27a 100644 --- a/traffic_updater/go/osrm_traffic_updater/dumper_statistic.go +++ b/traffic_updater/go/osrm_traffic_updater/dumper_statistic.go @@ -5,27 +5,29 @@ import ( ) type dumperStatisticItems struct { - wayCnt uint64 - nodeCnt uint64 - fwdRecordCnt uint64 - bwdRecordCnt uint64 - wayMatchedCnt uint64 - nodeMatchedCnt uint64 + wayCnt uint64 + nodeCnt uint64 + fwdRecordCnt uint64 + bwdRecordCnt uint64 + wayMatchedCnt uint64 + nodeMatchedCnt uint64 + fwdTrafficMatchedCnt uint64 + bwdTrafficMatchedCnt uint64 } type dumperStatistic struct { - c chan dumperStatisticItems - sum dumperStatisticItems - init bool - close bool + c chan dumperStatisticItems + sum dumperStatisticItems + init bool + close bool } -func (d* dumperStatistic) Init(n int) { +func (d *dumperStatistic) Init(n int) { d.c = make(chan dumperStatisticItems, n) d.init = true } -func (d* dumperStatistic) Close() { +func (d *dumperStatistic) Close() { if !d.init { return } @@ -37,24 +39,27 @@ func (d* dumperStatistic) Close() { d.sum.bwdRecordCnt += item.bwdRecordCnt d.sum.wayMatchedCnt += item.wayMatchedCnt d.sum.nodeMatchedCnt += item.nodeMatchedCnt + d.sum.fwdTrafficMatchedCnt += item.fwdTrafficMatchedCnt + d.sum.bwdTrafficMatchedCnt += item.bwdTrafficMatchedCnt } d.close = true } -func (d* dumperStatistic) Sum() (dumperStatisticItems) { +func (d *dumperStatistic) Sum() dumperStatisticItems { return d.sum } -func (d* dumperStatistic) Update(wayCnt uint64, nodeCnt uint64, fwdRecordCnt uint64, - bwdRecordCnt uint64, wayMatchedCnt uint64, nodeMatchedCnt uint64) { +func (d *dumperStatistic) Update(wayCnt uint64, nodeCnt uint64, fwdRecordCnt uint64, + bwdRecordCnt uint64, wayMatchedCnt uint64, nodeMatchedCnt uint64, + fwdTrafficMatchedCnt uint64, bwdTrafficMatchedCnt uint64) { if !d.init { fmt.Printf("dumperStatistic->Update() failed, please call Init() first otherwise will block all functions. \n") return } - d.c <- (dumperStatisticItems{wayCnt, nodeCnt, fwdRecordCnt, bwdRecordCnt, wayMatchedCnt, nodeMatchedCnt}) + d.c <- (dumperStatisticItems{wayCnt, nodeCnt, fwdRecordCnt, bwdRecordCnt, wayMatchedCnt, nodeMatchedCnt, fwdTrafficMatchedCnt, bwdTrafficMatchedCnt}) } -func (d* dumperStatistic) Output() { +func (d *dumperStatistic) Output() { if !d.close { fmt.Printf("Close() hasn't been called, no statistic collected.\n") return @@ -62,9 +67,10 @@ func (d* dumperStatistic) Output() { fmt.Printf("Statistic: \n") fmt.Printf("Load %d way from data with %d nodes.\n", d.sum.wayCnt, d.sum.nodeCnt) - fmt.Printf("%d way with %d nodes matched with traffic record.\n", + fmt.Printf("%d way with %d nodes matched with traffic record.\n", d.sum.wayMatchedCnt, d.sum.nodeMatchedCnt) - fmt.Printf("Generate %d records in final result with %d of them from forward traffic and %d from backword.\n", - d.sum.fwdRecordCnt+ d.sum.bwdRecordCnt, d.sum.fwdRecordCnt, d.sum.bwdRecordCnt) + fmt.Printf("%d traffic records(%d forward and %d backward) have been matched.\n", + d.sum.fwdTrafficMatchedCnt+d.sum.bwdTrafficMatchedCnt, d.sum.fwdTrafficMatchedCnt, d.sum.bwdTrafficMatchedCnt) + fmt.Printf("Generate %d records in final result with %d of them from forward traffic and %d from backword.\n", + d.sum.fwdRecordCnt+d.sum.bwdRecordCnt, d.sum.fwdRecordCnt, d.sum.bwdRecordCnt) } - diff --git a/traffic_updater/go/osrm_traffic_updater/dumper_statistic_test.go b/traffic_updater/go/osrm_traffic_updater/dumper_statistic_test.go index b40635c24..5d97fb5ba 100644 --- a/traffic_updater/go/osrm_traffic_updater/dumper_statistic_test.go +++ b/traffic_updater/go/osrm_traffic_updater/dumper_statistic_test.go @@ -1,8 +1,8 @@ package main import ( - "testing" "sync" + "testing" ) func TestDumperStatistic(t *testing.T) { @@ -17,16 +17,15 @@ func TestDumperStatistic(t *testing.T) { wg.Wait() d.Close() - sum:= d.Sum() + sum := d.Sum() if (sum.wayCnt != 10) || (sum.nodeCnt != 20) || (sum.fwdRecordCnt != 30) || (sum.bwdRecordCnt != 40) || (sum.wayMatchedCnt != 50) || (sum.nodeMatchedCnt != 60) { - t.Error("TestDumperStatistic failed.\n") - } + t.Error("TestDumperStatistic failed.\n") + } } func accumulateDumper(d *dumperStatistic, wg *sync.WaitGroup) { - d.Update(1,2,3,4,5,6) + d.Update(1, 2, 3, 4, 5, 6, 7, 8) wg.Done() } - diff --git a/traffic_updater/go/osrm_traffic_updater/osrm_traffic_updater.go b/traffic_updater/go/osrm_traffic_updater/osrm_traffic_updater.go index e03323477..e4ba4bc80 100644 --- a/traffic_updater/go/osrm_traffic_updater/osrm_traffic_updater.go +++ b/traffic_updater/go/osrm_traffic_updater/osrm_traffic_updater.go @@ -3,6 +3,7 @@ package main import ( "flag" "fmt" + "time" ) var flags struct { @@ -27,6 +28,12 @@ const CACHEDOBJECTS = 4000000 func main() { flag.Parse() + startTime := time.Now() + defer func() { + endTime := time.Now() + fmt.Printf("Total processing time %f seconds\n", endTime.Sub(startTime).Seconds()) + }() + isFlowDoneChan := make(chan bool, 1) wayid2speed := make(map[int64]int) go getTrafficFlow(flags.ip, flags.port, wayid2speed, isFlowDoneChan) @@ -46,19 +53,19 @@ func main() { } } -func wait4PreConditions(flowChan <-chan bool) (bool) { +func wait4PreConditions(flowChan <-chan bool) bool { var isFlowDone bool - loop: +loop: for { select { - case f := <- flowChan : - if !f { - fmt.Printf("[ERROR] Communication with traffic server failed.\n") - break loop - } else { - isFlowDone = true - break loop - } + case f := <-flowChan: + if !f { + fmt.Printf("[ERROR] Communication with traffic server failed.\n") + break loop + } else { + isFlowDone = true + break loop + } } } return isFlowDone diff --git a/traffic_updater/go/osrm_traffic_updater/speed_table_dumper.go b/traffic_updater/go/osrm_traffic_updater/speed_table_dumper.go index e6eaf125d..8145fc1c5 100644 --- a/traffic_updater/go/osrm_traffic_updater/speed_table_dumper.go +++ b/traffic_updater/go/osrm_traffic_updater/speed_table_dumper.go @@ -1,20 +1,20 @@ package main import ( - "os" - "log" - "fmt" "bufio" + "fmt" + "log" + "os" + "strconv" + "strings" "sync" "time" - "strings" - "strconv" ) var tasksWg sync.WaitGroup var dumpFinishedWg sync.WaitGroup -func dumpSpeedTable4Customize(wayid2speed map[int64]int, sources [TASKNUM]chan string, +func dumpSpeedTable4Customize(wayid2speed map[int64]int, sources [TASKNUM]chan string, outputPath string, ds *dumperStatistic) { startTime := time.Now() @@ -31,7 +31,7 @@ func dumpSpeedTable4Customize(wayid2speed map[int64]int, sources [TASKNUM]chan s fmt.Printf("Processing time for dumpSpeedTable4Customize takes %f seconds\n", endTime.Sub(startTime).Seconds()) } -func startTasks(wayid2speed map[int64]int, sources [TASKNUM]chan string, +func startTasks(wayid2speed map[int64]int, sources [TASKNUM]chan string, sink chan<- string, ds *dumperStatistic) { tasksWg.Add(TASKNUM) for i := 0; i < TASKNUM; i++ { @@ -52,7 +52,7 @@ func wait4AllTasksFinished(sink chan string, ds *dumperStatistic) { } func task(wayid2speed map[int64]int, source <-chan string, sink chan<- string, ds *dumperStatistic) { - var wayCnt, nodeCnt, fwdRecordCnt, bwdRecordCnt, wayMatched, nodeMatched uint64 + var wayCnt, nodeCnt, fwdRecordCnt, bwdRecordCnt, wayMatched, nodeMatched, fwdTrafficMatched, bwdTrafficMatched uint64 var err error for str := range source { elements := strings.Split(str, ",") @@ -75,6 +75,13 @@ func task(wayid2speed map[int64]int, source <-chan string, sink chan<- string, d var nodes []string = elements[1:] wayMatched += 1 nodeMatched += (uint64)(len(nodes)) + if okFwd { + fwdTrafficMatched += 1 + } + if okBwd { + bwdTrafficMatched += 1 + } + for i := 0; (i + 1) < len(nodes); i++ { var n1, n2 uint64 if n1, err = strconv.ParseUint(nodes[i], 10, 64); err != nil { @@ -93,19 +100,19 @@ func task(wayid2speed map[int64]int, source <-chan string, sink chan<- string, d bwdRecordCnt += 1 sink <- generateSingleRecord(n1, n2, speedBwd, false) } - + } } } - ds.Update(wayCnt, nodeCnt, fwdRecordCnt, bwdRecordCnt, wayMatched, nodeMatched) - tasksWg.Done() + ds.Update(wayCnt, nodeCnt, fwdRecordCnt, bwdRecordCnt, wayMatched, nodeMatched, fwdTrafficMatched, bwdTrafficMatched) + tasksWg.Done() } // format // if dir = true, means traffic for forward, generate: from, to, speed // if dir = false, means this speed is for backward flow, generate: to, from, speed -func generateSingleRecord(from, to uint64, speed int, dir bool) (string){ +func generateSingleRecord(from, to uint64, speed int, dir bool) string { if dir { return fmt.Sprintf("%d,%d,%d\n", from, to, speed) } else {