From c613b1737c29e42a9aff3400c2ca42f1363a2899 Mon Sep 17 00:00:00 2001 From: "Xun(Perry) Liu" Date: Thu, 18 Jul 2019 18:32:33 -0700 Subject: [PATCH] Feature/traffic updater statistic (#44) * feat: implement statistic for osrm traffic dumper. issues: https://github.com/Telenav/osrm-backend/issues/43 * feat: Implement statistic for loading traffic map. issues: https://github.com/Telenav/osrm-backend/issues/43 * fix: Experiment to use channel to collect statistic information. issues: https://github.com/Telenav/osrm-backend/issues/43 --- .../osrm_traffic_updater/dumper_statistic.go | 70 +++++++++++++++++++ .../dumper_statistic_test.go | 32 +++++++++ .../get_telenav_traffic.go | 9 +++ .../osrm_traffic_updater.go | 7 +- .../osrm_traffic_updater_test.go | 2 + .../speed_table_dumper.go | 27 ++++--- .../speed_table_dumper_test.go | 13 +++- 7 files changed, 149 insertions(+), 11 deletions(-) create mode 100644 traffic_updater/go/osrm_traffic_updater/dumper_statistic.go create mode 100644 traffic_updater/go/osrm_traffic_updater/dumper_statistic_test.go diff --git a/traffic_updater/go/osrm_traffic_updater/dumper_statistic.go b/traffic_updater/go/osrm_traffic_updater/dumper_statistic.go new file mode 100644 index 000000000..b59ad43f6 --- /dev/null +++ b/traffic_updater/go/osrm_traffic_updater/dumper_statistic.go @@ -0,0 +1,70 @@ +package main + +import ( + "fmt" +) + +type dumperStatisticItems struct { + wayCnt uint64 + nodeCnt uint64 + fwdRecordCnt uint64 + bwdRecordCnt uint64 + wayMatchedCnt uint64 + nodeMatchedCnt uint64 +} + +type dumperStatistic struct { + c chan dumperStatisticItems + sum dumperStatisticItems + init bool + close bool +} + +func (d* dumperStatistic) Init(n int) { + d.c = make(chan dumperStatisticItems, n) + d.init = true +} + +func (d* dumperStatistic) Close() { + if !d.init { + return + } + close(d.c) + for item := range d.c { + d.sum.wayCnt += item.wayCnt + d.sum.nodeCnt += item.nodeCnt + d.sum.fwdRecordCnt += item.fwdRecordCnt + d.sum.bwdRecordCnt += item.bwdRecordCnt + d.sum.wayMatchedCnt += item.wayMatchedCnt + d.sum.nodeMatchedCnt += item.nodeMatchedCnt + } + d.close = true +} + +func (d* dumperStatistic) Sum() (dumperStatisticItems) { + return d.sum +} + +func (d* dumperStatistic) Update(wayCnt uint64, nodeCnt uint64, fwdRecordCnt uint64, + bwdRecordCnt uint64, wayMatchedCnt uint64, nodeMatchedCnt uint64) { + if !d.init { + fmt.Printf("dumperStatistic->Update() failed, please call Init() first otherwise will block all functions. \n") + return + } + d.c <- (dumperStatisticItems{wayCnt, nodeCnt, fwdRecordCnt, bwdRecordCnt, wayMatchedCnt, nodeMatchedCnt}) +} + +func (d* dumperStatistic) Output() { + if !d.close { + fmt.Printf("Close() hasn't been called, no statistic collected.\n") + return + } + + fmt.Printf("Statistic: \n") + fmt.Printf("Load %d way from data with %d nodes.\n", d.sum.wayCnt, d.sum.nodeCnt) + fmt.Printf("%d way with %d nodes matched with traffic record.\n", + d.sum.wayMatchedCnt, d.sum.nodeMatchedCnt) + fmt.Printf("Generate %d records in final result with %d of them from forward traffic and %d from backword.\n", + d.sum.fwdRecordCnt+ d.sum.bwdRecordCnt, d.sum.fwdRecordCnt, d.sum.bwdRecordCnt) +} + diff --git a/traffic_updater/go/osrm_traffic_updater/dumper_statistic_test.go b/traffic_updater/go/osrm_traffic_updater/dumper_statistic_test.go new file mode 100644 index 000000000..b40635c24 --- /dev/null +++ b/traffic_updater/go/osrm_traffic_updater/dumper_statistic_test.go @@ -0,0 +1,32 @@ +package main + +import ( + "testing" + "sync" +) + +func TestDumperStatistic(t *testing.T) { + var d dumperStatistic + var wg sync.WaitGroup + + wg.Add(10) + d.Init(10) + for i := 0; i < 10; i++ { + go accumulateDumper(&d, &wg) + } + wg.Wait() + d.Close() + + sum:= d.Sum() + + if (sum.wayCnt != 10) || (sum.nodeCnt != 20) || (sum.fwdRecordCnt != 30) || (sum.bwdRecordCnt != 40) || (sum.wayMatchedCnt != 50) || (sum.nodeMatchedCnt != 60) { + t.Error("TestDumperStatistic failed.\n") + } + +} + +func accumulateDumper(d *dumperStatistic, wg *sync.WaitGroup) { + d.Update(1,2,3,4,5,6) + wg.Done() +} + diff --git a/traffic_updater/go/osrm_traffic_updater/get_telenav_traffic.go b/traffic_updater/go/osrm_traffic_updater/get_telenav_traffic.go index 183c9fa0b..09df3ed0c 100644 --- a/traffic_updater/go/osrm_traffic_updater/get_telenav_traffic.go +++ b/traffic_updater/go/osrm_traffic_updater/get_telenav_traffic.go @@ -70,8 +70,17 @@ func getTrafficFlow(ip string, port int, m map[int64]int, c chan<- bool) { } func flows2map(flows []*proxy.Flow, m map[int64]int) { + var fwdCnt, bwdCnt uint64 for _, flow := range flows { wayid := flow.WayId m[wayid] = int(flow.Speed) + + if wayid > 0 { + fwdCnt++ + } else { + bwdCnt++ + } } + + fmt.Printf("Load map[wayid] to speed with %d items, %d forward and %d backward.\n", (fwdCnt+bwdCnt), fwdCnt, bwdCnt) } diff --git a/traffic_updater/go/osrm_traffic_updater/osrm_traffic_updater.go b/traffic_updater/go/osrm_traffic_updater/osrm_traffic_updater.go index 33e2cd203..e03323477 100644 --- a/traffic_updater/go/osrm_traffic_updater/osrm_traffic_updater.go +++ b/traffic_updater/go/osrm_traffic_updater/osrm_traffic_updater.go @@ -22,7 +22,7 @@ func init() { } const TASKNUM = 128 -const CACHEDOBJECTS = 1000000 +const CACHEDOBJECTS = 4000000 func main() { flag.Parse() @@ -39,7 +39,10 @@ func main() { isFlowDone := wait4PreConditions(isFlowDoneChan) if isFlowDone { - dumpSpeedTable4Customize(wayid2speed, sources, flags.csvFile) + var ds dumperStatistic + ds.Init(TASKNUM) + dumpSpeedTable4Customize(wayid2speed, sources, flags.csvFile, &ds) + ds.Output() } } diff --git a/traffic_updater/go/osrm_traffic_updater/osrm_traffic_updater_test.go b/traffic_updater/go/osrm_traffic_updater/osrm_traffic_updater_test.go index a3348aa36..6abe3b0ea 100644 --- a/traffic_updater/go/osrm_traffic_updater/osrm_traffic_updater_test.go +++ b/traffic_updater/go/osrm_traffic_updater/osrm_traffic_updater_test.go @@ -2,6 +2,7 @@ package main import ( "testing" + "fmt" ) func setChan2True(c chan<- bool) { @@ -24,6 +25,7 @@ func TestWait4AllPreconditions1(t *testing.T) { } func TestWait4AllPreconditions2(t *testing.T) { + fmt.Printf("Expect to see: [ERROR] Communication with traffic server failed.\n") c2 := make(chan bool, 1) go setChan2False(c2) diff --git a/traffic_updater/go/osrm_traffic_updater/speed_table_dumper.go b/traffic_updater/go/osrm_traffic_updater/speed_table_dumper.go index 0a75b3c95..e6eaf125d 100644 --- a/traffic_updater/go/osrm_traffic_updater/speed_table_dumper.go +++ b/traffic_updater/go/osrm_traffic_updater/speed_table_dumper.go @@ -14,7 +14,8 @@ import ( var tasksWg sync.WaitGroup var dumpFinishedWg sync.WaitGroup -func dumpSpeedTable4Customize(wayid2speed map[int64]int, sources [TASKNUM]chan string, outputPath string) { +func dumpSpeedTable4Customize(wayid2speed map[int64]int, sources [TASKNUM]chan string, + outputPath string, ds *dumperStatistic) { startTime := time.Now() if len(wayid2speed) == 0 { @@ -22,18 +23,19 @@ func dumpSpeedTable4Customize(wayid2speed map[int64]int, sources [TASKNUM]chan s } sink := make(chan string) - startTasks(wayid2speed, sources, sink) + startTasks(wayid2speed, sources, sink, ds) startDump(outputPath, sink) - wait4AllTasksFinished(sink) + wait4AllTasksFinished(sink, ds) endTime := time.Now() fmt.Printf("Processing time for dumpSpeedTable4Customize takes %f seconds\n", endTime.Sub(startTime).Seconds()) } -func startTasks(wayid2speed map[int64]int, sources [TASKNUM]chan string, sink chan<- string) { +func startTasks(wayid2speed map[int64]int, sources [TASKNUM]chan string, + sink chan<- string, ds *dumperStatistic) { tasksWg.Add(TASKNUM) for i := 0; i < TASKNUM; i++ { - go task(wayid2speed, sources[i], sink) + go task(wayid2speed, sources[i], sink, ds) } } @@ -42,16 +44,20 @@ func startDump(outputPath string, sink <-chan string) { go write(outputPath, sink) } -func wait4AllTasksFinished(sink chan string) { +func wait4AllTasksFinished(sink chan string, ds *dumperStatistic) { tasksWg.Wait() close(sink) + ds.Close() dumpFinishedWg.Wait() } -func task(wayid2speed map[int64]int, source <-chan string, sink chan<- string) { +func task(wayid2speed map[int64]int, source <-chan string, sink chan<- string, ds *dumperStatistic) { + var wayCnt, nodeCnt, fwdRecordCnt, bwdRecordCnt, wayMatched, nodeMatched uint64 var err error for str := range source { elements := strings.Split(str, ",") + wayCnt += 1 + nodeCnt += (uint64)(len(elements) - 1) if len(elements) < 3 { continue } @@ -67,6 +73,8 @@ func task(wayid2speed map[int64]int, source <-chan string, sink chan<- string) { if okFwd || okBwd { var nodes []string = elements[1:] + wayMatched += 1 + nodeMatched += (uint64)(len(nodes)) for i := 0; (i + 1) < len(nodes); i++ { var n1, n2 uint64 if n1, err = strconv.ParseUint(nodes[i], 10, 64); err != nil { @@ -78,16 +86,19 @@ func task(wayid2speed map[int64]int, source <-chan string, sink chan<- string) { continue } if okFwd { + fwdRecordCnt += 1 sink <- generateSingleRecord(n1, n2, speedFwd, true) } if okBwd { + bwdRecordCnt += 1 sink <- generateSingleRecord(n1, n2, speedBwd, false) } } } - } + + ds.Update(wayCnt, nodeCnt, fwdRecordCnt, bwdRecordCnt, wayMatched, nodeMatched) tasksWg.Done() } diff --git a/traffic_updater/go/osrm_traffic_updater/speed_table_dumper_test.go b/traffic_updater/go/osrm_traffic_updater/speed_table_dumper_test.go index 48dd6188f..5d70bb6ab 100644 --- a/traffic_updater/go/osrm_traffic_updater/speed_table_dumper_test.go +++ b/traffic_updater/go/osrm_traffic_updater/speed_table_dumper_test.go @@ -29,9 +29,12 @@ func TestSpeedTableDumper1(t *testing.T) { wayid2speed := make(map[int64]int) flows2map(flows, wayid2speed) - dumpSpeedTable4Customize(wayid2speed, sources, "./testdata/target.csv") + var ds dumperStatistic + ds.Init(TASKNUM) + dumpSpeedTable4Customize(wayid2speed, sources, "./testdata/target.csv", &ds) compareFileContentUnstable("./testdata/target.csv", "./testdata/expect.csv", t) + validateStatistic(&ds, t) } func TestGenerateSingleRecord1(t *testing.T) { @@ -48,6 +51,14 @@ func TestGenerateSingleRecord2(t *testing.T) { } } + +func validateStatistic(ds *dumperStatistic, t *testing.T) { + sum := ds.Sum() + if (sum.wayCnt != 4) || (sum.nodeCnt != 9) || (sum.fwdRecordCnt != 4) || (sum.bwdRecordCnt != 3) || (sum.wayMatchedCnt != 4) || (sum.nodeMatchedCnt != 9) { + t.Error("TestLoadWay2Nodeids failed with incorrect statistic.\n") + } +} + func loadMockTraffic(trafficPath string, flows []*proxy.Flow) []*proxy.Flow { // load mock traffic file mockfile, err := os.Open(trafficPath)