Feature/traffic updater statistic (#44)

* feat: implement statistic for osrm traffic dumper.
issues: https://github.com/Telenav/osrm-backend/issues/43

* feat: Implement statistic for loading traffic map.
issues: https://github.com/Telenav/osrm-backend/issues/43

* fix: Experiment to use channel to collect statistic information.
issues: https://github.com/Telenav/osrm-backend/issues/43
This commit is contained in:
Xun(Perry) Liu 2019-07-18 18:32:33 -07:00 committed by Jay
parent bb147f369b
commit c613b1737c
7 changed files with 149 additions and 11 deletions

View File

@ -0,0 +1,70 @@
package main
import (
"fmt"
)
type dumperStatisticItems struct {
wayCnt uint64
nodeCnt uint64
fwdRecordCnt uint64
bwdRecordCnt uint64
wayMatchedCnt uint64
nodeMatchedCnt uint64
}
type dumperStatistic struct {
c chan dumperStatisticItems
sum dumperStatisticItems
init bool
close bool
}
func (d* dumperStatistic) Init(n int) {
d.c = make(chan dumperStatisticItems, n)
d.init = true
}
func (d* dumperStatistic) Close() {
if !d.init {
return
}
close(d.c)
for item := range d.c {
d.sum.wayCnt += item.wayCnt
d.sum.nodeCnt += item.nodeCnt
d.sum.fwdRecordCnt += item.fwdRecordCnt
d.sum.bwdRecordCnt += item.bwdRecordCnt
d.sum.wayMatchedCnt += item.wayMatchedCnt
d.sum.nodeMatchedCnt += item.nodeMatchedCnt
}
d.close = true
}
func (d* dumperStatistic) Sum() (dumperStatisticItems) {
return d.sum
}
func (d* dumperStatistic) Update(wayCnt uint64, nodeCnt uint64, fwdRecordCnt uint64,
bwdRecordCnt uint64, wayMatchedCnt uint64, nodeMatchedCnt uint64) {
if !d.init {
fmt.Printf("dumperStatistic->Update() failed, please call Init() first otherwise will block all functions. \n")
return
}
d.c <- (dumperStatisticItems{wayCnt, nodeCnt, fwdRecordCnt, bwdRecordCnt, wayMatchedCnt, nodeMatchedCnt})
}
func (d* dumperStatistic) Output() {
if !d.close {
fmt.Printf("Close() hasn't been called, no statistic collected.\n")
return
}
fmt.Printf("Statistic: \n")
fmt.Printf("Load %d way from data with %d nodes.\n", d.sum.wayCnt, d.sum.nodeCnt)
fmt.Printf("%d way with %d nodes matched with traffic record.\n",
d.sum.wayMatchedCnt, d.sum.nodeMatchedCnt)
fmt.Printf("Generate %d records in final result with %d of them from forward traffic and %d from backword.\n",
d.sum.fwdRecordCnt+ d.sum.bwdRecordCnt, d.sum.fwdRecordCnt, d.sum.bwdRecordCnt)
}

View File

@ -0,0 +1,32 @@
package main
import (
"testing"
"sync"
)
func TestDumperStatistic(t *testing.T) {
var d dumperStatistic
var wg sync.WaitGroup
wg.Add(10)
d.Init(10)
for i := 0; i < 10; i++ {
go accumulateDumper(&d, &wg)
}
wg.Wait()
d.Close()
sum:= d.Sum()
if (sum.wayCnt != 10) || (sum.nodeCnt != 20) || (sum.fwdRecordCnt != 30) || (sum.bwdRecordCnt != 40) || (sum.wayMatchedCnt != 50) || (sum.nodeMatchedCnt != 60) {
t.Error("TestDumperStatistic failed.\n")
}
}
func accumulateDumper(d *dumperStatistic, wg *sync.WaitGroup) {
d.Update(1,2,3,4,5,6)
wg.Done()
}

View File

@ -70,8 +70,17 @@ func getTrafficFlow(ip string, port int, m map[int64]int, c chan<- bool) {
} }
func flows2map(flows []*proxy.Flow, m map[int64]int) { func flows2map(flows []*proxy.Flow, m map[int64]int) {
var fwdCnt, bwdCnt uint64
for _, flow := range flows { for _, flow := range flows {
wayid := flow.WayId wayid := flow.WayId
m[wayid] = int(flow.Speed) m[wayid] = int(flow.Speed)
if wayid > 0 {
fwdCnt++
} else {
bwdCnt++
}
} }
fmt.Printf("Load map[wayid] to speed with %d items, %d forward and %d backward.\n", (fwdCnt+bwdCnt), fwdCnt, bwdCnt)
} }

View File

@ -22,7 +22,7 @@ func init() {
} }
const TASKNUM = 128 const TASKNUM = 128
const CACHEDOBJECTS = 1000000 const CACHEDOBJECTS = 4000000
func main() { func main() {
flag.Parse() flag.Parse()
@ -39,7 +39,10 @@ func main() {
isFlowDone := wait4PreConditions(isFlowDoneChan) isFlowDone := wait4PreConditions(isFlowDoneChan)
if isFlowDone { if isFlowDone {
dumpSpeedTable4Customize(wayid2speed, sources, flags.csvFile) var ds dumperStatistic
ds.Init(TASKNUM)
dumpSpeedTable4Customize(wayid2speed, sources, flags.csvFile, &ds)
ds.Output()
} }
} }

View File

@ -2,6 +2,7 @@ package main
import ( import (
"testing" "testing"
"fmt"
) )
func setChan2True(c chan<- bool) { func setChan2True(c chan<- bool) {
@ -24,6 +25,7 @@ func TestWait4AllPreconditions1(t *testing.T) {
} }
func TestWait4AllPreconditions2(t *testing.T) { func TestWait4AllPreconditions2(t *testing.T) {
fmt.Printf("Expect to see: [ERROR] Communication with traffic server failed.\n")
c2 := make(chan bool, 1) c2 := make(chan bool, 1)
go setChan2False(c2) go setChan2False(c2)

View File

@ -14,7 +14,8 @@ import (
var tasksWg sync.WaitGroup var tasksWg sync.WaitGroup
var dumpFinishedWg sync.WaitGroup var dumpFinishedWg sync.WaitGroup
func dumpSpeedTable4Customize(wayid2speed map[int64]int, sources [TASKNUM]chan string, outputPath string) { func dumpSpeedTable4Customize(wayid2speed map[int64]int, sources [TASKNUM]chan string,
outputPath string, ds *dumperStatistic) {
startTime := time.Now() startTime := time.Now()
if len(wayid2speed) == 0 { if len(wayid2speed) == 0 {
@ -22,18 +23,19 @@ func dumpSpeedTable4Customize(wayid2speed map[int64]int, sources [TASKNUM]chan s
} }
sink := make(chan string) sink := make(chan string)
startTasks(wayid2speed, sources, sink) startTasks(wayid2speed, sources, sink, ds)
startDump(outputPath, sink) startDump(outputPath, sink)
wait4AllTasksFinished(sink) wait4AllTasksFinished(sink, ds)
endTime := time.Now() endTime := time.Now()
fmt.Printf("Processing time for dumpSpeedTable4Customize takes %f seconds\n", endTime.Sub(startTime).Seconds()) fmt.Printf("Processing time for dumpSpeedTable4Customize takes %f seconds\n", endTime.Sub(startTime).Seconds())
} }
func startTasks(wayid2speed map[int64]int, sources [TASKNUM]chan string, sink chan<- string) { func startTasks(wayid2speed map[int64]int, sources [TASKNUM]chan string,
sink chan<- string, ds *dumperStatistic) {
tasksWg.Add(TASKNUM) tasksWg.Add(TASKNUM)
for i := 0; i < TASKNUM; i++ { for i := 0; i < TASKNUM; i++ {
go task(wayid2speed, sources[i], sink) go task(wayid2speed, sources[i], sink, ds)
} }
} }
@ -42,16 +44,20 @@ func startDump(outputPath string, sink <-chan string) {
go write(outputPath, sink) go write(outputPath, sink)
} }
func wait4AllTasksFinished(sink chan string) { func wait4AllTasksFinished(sink chan string, ds *dumperStatistic) {
tasksWg.Wait() tasksWg.Wait()
close(sink) close(sink)
ds.Close()
dumpFinishedWg.Wait() dumpFinishedWg.Wait()
} }
func task(wayid2speed map[int64]int, source <-chan string, sink chan<- string) { func task(wayid2speed map[int64]int, source <-chan string, sink chan<- string, ds *dumperStatistic) {
var wayCnt, nodeCnt, fwdRecordCnt, bwdRecordCnt, wayMatched, nodeMatched uint64
var err error var err error
for str := range source { for str := range source {
elements := strings.Split(str, ",") elements := strings.Split(str, ",")
wayCnt += 1
nodeCnt += (uint64)(len(elements) - 1)
if len(elements) < 3 { if len(elements) < 3 {
continue continue
} }
@ -67,6 +73,8 @@ func task(wayid2speed map[int64]int, source <-chan string, sink chan<- string) {
if okFwd || okBwd { if okFwd || okBwd {
var nodes []string = elements[1:] var nodes []string = elements[1:]
wayMatched += 1
nodeMatched += (uint64)(len(nodes))
for i := 0; (i + 1) < len(nodes); i++ { for i := 0; (i + 1) < len(nodes); i++ {
var n1, n2 uint64 var n1, n2 uint64
if n1, err = strconv.ParseUint(nodes[i], 10, 64); err != nil { if n1, err = strconv.ParseUint(nodes[i], 10, 64); err != nil {
@ -78,16 +86,19 @@ func task(wayid2speed map[int64]int, source <-chan string, sink chan<- string) {
continue continue
} }
if okFwd { if okFwd {
fwdRecordCnt += 1
sink <- generateSingleRecord(n1, n2, speedFwd, true) sink <- generateSingleRecord(n1, n2, speedFwd, true)
} }
if okBwd { if okBwd {
bwdRecordCnt += 1
sink <- generateSingleRecord(n1, n2, speedBwd, false) sink <- generateSingleRecord(n1, n2, speedBwd, false)
} }
} }
} }
} }
ds.Update(wayCnt, nodeCnt, fwdRecordCnt, bwdRecordCnt, wayMatched, nodeMatched)
tasksWg.Done() tasksWg.Done()
} }

View File

@ -29,9 +29,12 @@ func TestSpeedTableDumper1(t *testing.T) {
wayid2speed := make(map[int64]int) wayid2speed := make(map[int64]int)
flows2map(flows, wayid2speed) flows2map(flows, wayid2speed)
dumpSpeedTable4Customize(wayid2speed, sources, "./testdata/target.csv") var ds dumperStatistic
ds.Init(TASKNUM)
dumpSpeedTable4Customize(wayid2speed, sources, "./testdata/target.csv", &ds)
compareFileContentUnstable("./testdata/target.csv", "./testdata/expect.csv", t) compareFileContentUnstable("./testdata/target.csv", "./testdata/expect.csv", t)
validateStatistic(&ds, t)
} }
func TestGenerateSingleRecord1(t *testing.T) { func TestGenerateSingleRecord1(t *testing.T) {
@ -48,6 +51,14 @@ func TestGenerateSingleRecord2(t *testing.T) {
} }
} }
func validateStatistic(ds *dumperStatistic, t *testing.T) {
sum := ds.Sum()
if (sum.wayCnt != 4) || (sum.nodeCnt != 9) || (sum.fwdRecordCnt != 4) || (sum.bwdRecordCnt != 3) || (sum.wayMatchedCnt != 4) || (sum.nodeMatchedCnt != 9) {
t.Error("TestLoadWay2Nodeids failed with incorrect statistic.\n")
}
}
func loadMockTraffic(trafficPath string, flows []*proxy.Flow) []*proxy.Flow { func loadMockTraffic(trafficPath string, flows []*proxy.Flow) []*proxy.Flow {
// load mock traffic file // load mock traffic file
mockfile, err := os.Open(trafficPath) mockfile, err := os.Open(trafficPath)