feat: Implement logic to generate way-2-nodes mapping and convert traffic result to nodes-2-speed (#28)

* feat: Implement logic to generate way-2-nodes mapping and convert from way-2-speed to nodes-2-speed for OSRM

Issue: https://github.com/Telenav/osrm-backend/issues/22

* fix:Comments and typo in traffic generator.

* fix: Fix the issue of protocal and refine function names

* fix: Remove suffix of "100" for telenav wayids

* feat: Add unit test for generatespeedtable

* fix: Avoid upload test pbf into git

* feat: Handle traffic flow and add related unit test

* Update Readme

* fix: enable real function

(cherry picked from commit 81015b9977847ffe61c7e8793e1cecb229727a07)

* docs: fix a typo
This commit is contained in:
Xun(Perry) Liu 2019-06-27 20:10:54 -07:00 committed by Jay
parent f0ef92c87a
commit cbd192c52e
9 changed files with 396 additions and 58 deletions

View File

@ -28,10 +28,28 @@ $ ./bin/osrm_traffic_updater -h
Usage of ./bin/osrm_traffic_updater:
-c string
traffic proxy ip address (default "127.0.0.1")
-d use high precision speeds, i.e. decimal (default true)
-d use high precision speeds, i.e. decimal. (default false)
-f string
OSRM traffic csv file (default "traffic.csv")
-m string
OSRM way id to node ids mapping table (default "wayid2nodeids.csv")
-p int
traffic proxy listening port (default 6666)
```
# wayid2nodeids_extractor
Extract wayid to nodeids mapping from PBF
### Usage
```bash
$ cd $GOPATH
$ go install github.com/Telenav/osrm-backend/traffic_updater/go/wayid2nodeid_extractor
$ ./bin/wayid2nodeid_extractor -h
Usage of ./bin/wayid2nodeid_extractor:
-b Is pbf generated by telenav internally
-i string
Input pbf file.
-o string
Output csv file
```

View File

@ -0,0 +1,114 @@
package main
import (
"bufio"
"fmt"
"log"
"os"
"strconv"
"strings"
"time"
)
// todo:
// Write data into more compressed format(parquet)
// Statistic to avoid unmatched element
// Multiple go routine for convert()
func generateSpeedTable(wayid2speed map[uint64]int, way2nodeidsPath string, target string) {
startTime := time.Now()
// format is: wayid, nodeid, nodeid, nodeid...
source := make(chan string)
// format is fromid, toid, speed
sink := make(chan string)
go load(way2nodeidsPath, source)
go convert(source, sink, wayid2speed)
write(target, sink)
endTime := time.Now()
fmt.Printf("Processing time for generate speed table takes %f seconds\n", endTime.Sub(startTime).Seconds())
}
func load(mappingPath string, source chan<- string) {
defer close(source)
f, err := os.Open(mappingPath)
defer f.Close()
if err != nil {
log.Fatal(err)
fmt.Printf("Open idsmapping file of %v failed.\n", mappingPath)
return
}
fmt.Printf("Open idsmapping file of %s succeed.\n", mappingPath)
scanner := bufio.NewScanner(f)
for scanner.Scan() {
source <- (scanner.Text())
}
}
func convert(source <-chan string, sink chan<- string, wayid2speed map[uint64]int) {
var err error
defer close(sink)
for str := range source {
elements := strings.Split(str, ",")
if len(elements) < 3 {
fmt.Printf("Invalid string %s in wayid2nodeids mapping file\n", str)
continue
}
var wayid uint64
if wayid, err = strconv.ParseUint(elements[0], 10, 64); err != nil {
fmt.Printf("#Error during decoding wayid, row = %v\n", elements)
continue
}
if speed, ok := wayid2speed[wayid]; ok {
var nodes []string = elements[1:]
for i := 0; (i + 1) < len(nodes); i++ {
var n1, n2 uint64
if n1, err = strconv.ParseUint(nodes[i], 10, 64); err != nil {
fmt.Printf("#Error during decoding nodeid, row = %v\n", elements)
continue
}
if n2, err = strconv.ParseUint(nodes[i+1], 10, 64); err != nil {
fmt.Printf("#Error during decoding nodeid, row = %v\n", elements)
continue
}
var s string
if speed >= 0 {
s = fmt.Sprintf("%d,%d,%d\n", n1, n2, speed)
} else {
s = fmt.Sprintf("%d,%d,%d\n", n2, n1, -speed)
}
sink <- s
}
}
}
}
func write(targetPath string, sink chan string) {
outfile, err := os.OpenFile(targetPath, os.O_RDWR|os.O_CREATE, 0755)
defer outfile.Close()
defer outfile.Sync()
if err != nil {
log.Fatal(err)
fmt.Printf("Open output file of %s failed.\n", targetPath)
return
}
fmt.Printf("Open output file of %s succeed.\n", targetPath)
w := bufio.NewWriter(outfile)
defer w.Flush()
for str := range sink {
_, err := w.WriteString(str)
if err != nil {
log.Fatal(err)
return
}
}
}

View File

@ -0,0 +1,72 @@
package main
import (
"encoding/csv"
"fmt"
"io"
"io/ioutil"
"log"
"os"
"strconv"
"strings"
"testing"
)
func TestGenerateSpeedTable(t *testing.T) {
wayid2speed := make(map[uint64]int)
loadMockTraffic("./testdata/mock-traffic.csv", wayid2speed)
generateSpeedTable(wayid2speed, "./testdata/id-mapping.csv", "./testdata/target.csv")
b1, err1 := ioutil.ReadFile("./testdata/target.csv")
if err1 != nil {
fmt.Print(err1)
}
str1 := string(b1)
b2, err2 := ioutil.ReadFile("./testdata/expect.csv")
if err2 != nil {
fmt.Print(err2)
}
str2 := string(b2)
if strings.Compare(str1, str2) != 0 {
t.Error("TestGenerateSpeedTable failed\n")
}
}
func loadMockTraffic(trafficPath string, wayid2speed map[uint64]int) {
// load mock traffic file
mockfile, err := os.Open(trafficPath)
defer mockfile.Close()
if err != nil {
log.Fatal(err)
fmt.Printf("Open pbf file of %v failed.\n", trafficPath)
return
}
fmt.Printf("Open pbf file of %s succeed.\n", trafficPath)
csvr := csv.NewReader(mockfile)
for {
row, err := csvr.Read()
if err != nil {
if err == io.EOF {
err = nil
break
} else {
fmt.Printf("Error during decoding mock traffic, row = %v\n", err)
return
}
}
var wayid uint64
var speed int64
if wayid, err = strconv.ParseUint(row[0], 10, 64); err != nil {
fmt.Printf("#Error during decoding wayid, row = %v\n", row)
}
if speed, err = strconv.ParseInt(row[1], 10, 32); err != nil {
fmt.Printf("#Error during decoding speed, row = %v\n", row)
}
wayid2speed[wayid] = (int)(speed)
}
}

View File

@ -1,13 +1,10 @@
package main
import (
"bufio"
"context"
"flag"
"fmt"
"os"
"strconv"
"time"
"github.com/Telenav/osrm-backend/traffic_updater/go/gen-go/proxy"
"github.com/apache/thrift/lib/go/thrift"
@ -16,6 +13,7 @@ import (
var flags struct {
port int
ip string
mappingFile string
csvFile string
highPrecision bool
}
@ -23,55 +21,18 @@ var flags struct {
func init() {
flag.IntVar(&flags.port, "p", 6666, "traffic proxy listening port")
flag.StringVar(&flags.ip, "c", "127.0.0.1", "traffic proxy ip address")
flag.StringVar(&flags.mappingFile, "m", "wayid2nodeids.csv", "OSRM way id to node ids mapping table")
flag.StringVar(&flags.csvFile, "f", "traffic.csv", "OSRM traffic csv file")
flag.BoolVar(&flags.highPrecision, "d", true, "use high precision speeds, i.e. decimal")
flag.BoolVar(&flags.highPrecision, "d", false, "use high precision speeds, i.e. decimal")
}
func dumpFlowsToCsv(csvFile string, flows []*proxy.Flow) {
if _, err := os.Stat(csvFile); err == nil {
// csvFile exists, remove it
rmErr := os.Remove(csvFile)
if rmErr != nil {
fmt.Println(rmErr)
return
func flows2map(flows []*proxy.Flow, m map[uint64]int) {
for _, flow := range flows {
wayid := (uint64)(flow.WayId)
m[wayid] = int(flow.Speed)
}
}
f, err := os.OpenFile(csvFile, os.O_RDWR|os.O_CREATE, 0755)
if err != nil {
fmt.Println(err)
return
}
defer f.Close()
writer := bufio.NewWriter(f)
for i, flow := range flows {
var osrmTrafficLine string
if flags.highPrecision {
osrmTrafficLine = fmt.Sprintf("%d,%d,%f\n", flow.FromId, flow.ToId, flow.Speed)
} else {
osrmTrafficLine = fmt.Sprintf("%d,%d,%d\n", flow.FromId, flow.ToId, int(flow.Speed))
}
// print first 10 lines for debug
if i < 10 {
fmt.Printf("[ %d ] %v\n", i, flow)
fmt.Printf("[ %d ] %s\n", i, osrmTrafficLine)
}
// write to csv
_, err := writer.WriteString(osrmTrafficLine)
if err != nil {
fmt.Println(err)
return
}
}
writer.Flush()
f.Sync()
fmt.Printf("total wrote to %s count: %d\n", csvFile, len(flows))
}
func main() {
flag.Parse()
@ -106,7 +67,6 @@ func main() {
client := proxy.NewProxyServiceClient(thrift.NewTStandardClient(protocol, protocol))
// get flows
startTime := time.Now()
fmt.Println("getting flows")
var defaultCtx = context.Background()
flows, err := client.GetAllFlows(defaultCtx)
@ -115,13 +75,9 @@ func main() {
return
}
fmt.Printf("got flows count: %d\n", len(flows))
afterGotFlowTime := time.Now()
fmt.Printf("get flows time used: %f seconds\n", afterGotFlowTime.Sub(startTime).Seconds())
// dump to csv
fmt.Println("dump flows to: " + flags.csvFile)
dumpFlowsToCsv(flags.csvFile, flows)
endTime := time.Now()
fmt.Printf("dump csv time used: %f seconds\n", endTime.Sub(afterGotFlowTime).Seconds())
wayid2speed := make(map[uint64]int)
flows2map(flows, wayid2speed)
generateSpeedTable(wayid2speed, flags.mappingFile, flags.csvFile)
}

View File

@ -0,0 +1,6 @@
84760891102,19496208102,81
84762609102,244183320001101,87
244183320001101,84762607102,87
84760849102,84760850102,47
84760858102,84760846102,59
847608599102,84760858102,59
1 84760891102 19496208102 81
2 84762609102 244183320001101 87
3 244183320001101 84762607102 87
4 84760849102 84760850102 47
5 84760858102 84760846102 59
6 847608599102 84760858102 59

View File

@ -0,0 +1,4 @@
24418325,84760891102,19496208102
24418332,84762609102,244183320001101,84762607102
24418343,84760849102,84760850102
24418344,84760846102,84760858102,847608599102
1 24418325,84760891102,19496208102
2 24418332,84762609102,244183320001101,84762607102
3 24418343,84760849102,84760850102
4 24418344,84760846102,84760858102,847608599102

View File

@ -0,0 +1,4 @@
24418325,81
24418332,87
24418343,47
24418344,-59
1 24418325 81
2 24418332 87
3 24418343 47
4 24418344 -59

View File

@ -0,0 +1,6 @@
84760891102,19496208102,81
84762609102,244183320001101,87
244183320001101,84762607102,87
84760849102,84760850102,47
84760858102,84760846102,59
847608599102,84760858102,59
1 84760891102 19496208102 81
2 84762609102 244183320001101 87
3 244183320001101 84762607102 87
4 84760849102 84760850102 47
5 84760858102 84760846102 59
6 847608599102 84760858102 59

View File

@ -0,0 +1,158 @@
package main
import (
"bufio"
"flag"
"fmt"
"io"
"log"
"math/rand"
"os"
"runtime"
"strconv"
"strings"
"time"
"github.com/qedus/osmpbf"
)
const telenavNavigableWaySuffix string = "100"
func generateWayid2nodeidsMapping(input, output string) {
infile, err := os.Open(input)
defer infile.Close()
if err != nil {
log.Fatal(err)
fmt.Printf("Open pbf file of %v failed.\n", input)
return
}
fmt.Printf("Open pbf file of %s succeed.\n", input)
outfile, err := os.OpenFile(output, os.O_RDWR|os.O_CREATE, 0755)
defer outfile.Close()
defer outfile.Sync()
if err != nil {
log.Fatal(err)
fmt.Printf("Open output file of %s failed.\n", output)
return
}
fmt.Printf("Open output file of %s succeed.\n", output)
wayid2nodeids(infile, outfile)
}
func wayid2nodeids(infile io.Reader, outfile io.Writer) {
// Init extractor
extractor := osmpbf.NewDecoder(infile)
extractor.SetBufferSize(osmpbf.MaxBlobSize)
err := extractor.Start(runtime.GOMAXPROCS(-1))
if err != nil {
log.Fatal(err)
return
}
// Init loader
loader := bufio.NewWriter(outfile)
defer loader.Flush()
var wc, nc uint32
for {
if v, err := extractor.Decode(); err == io.EOF {
break
} else if err != nil {
log.Fatal(err)
} else {
switch v := v.(type) {
case *osmpbf.Node:
case *osmpbf.Way:
wayid := strconv.FormatUint((uint64)(v.ID), 10)
if !isNavigableWay(wayid) {
continue
}
wayid = trimNavigableWaySuffix(wayid)
// Transform
str := convertWayObj2IdMappingString(v, wayid)
//str := convertWayObj2MockSpeed(v, wayid)
_, err := loader.WriteString(str)
if err != nil {
log.Fatal(err)
return
}
wc++
nc += (uint32)(len(v.NodeIDs))
case *osmpbf.Relation:
default:
log.Fatalf("unknown type %T\n", v)
}
}
}
fmt.Printf("Total ways: %d, total nodes: %d\n", wc, nc)
}
// This optimization is for telenav internal pbf only
// User need manully set flags.istelenavpbf = true to enable this
// Telenav PBF add "100" for all navigable ways
func isNavigableWay(wayid string) bool {
if flags.istelenavpbf {
if strings.HasSuffix(wayid, telenavNavigableWaySuffix) {
return true
} else {
return false
}
} else {
return true
}
}
// Remove "100" suffix for telenav ways, other components such as telenav traffic
// already remove it
func trimNavigableWaySuffix(wayid string) string {
if flags.istelenavpbf {
return strings.TrimSuffix(wayid, telenavNavigableWaySuffix)
}
return wayid
}
func convertWayObj2IdMappingString(v *osmpbf.Way, wayid string) string {
// format: wayid,nodeid1,nodeid2, ...
return wayid + "," +
strings.Trim(strings.Join(strings.Fields(fmt.Sprint(v.NodeIDs)), ","), "[]") +
"\n"
}
func convertWayObj2MockSpeed(v *osmpbf.Way, wayid string) string {
// format: wayid,random speed
return wayid + "," +
strconv.Itoa(rand.Intn(100)) +
"\n"
}
var flags struct {
input string
output string
istelenavpbf bool
}
func init() {
flag.StringVar(&flags.input, "i", "", "Input pbf file.")
flag.StringVar(&flags.output, "o", "", "Output csv file")
flag.BoolVar(&flags.istelenavpbf, "b", false, "Is pbf generated by telenav internally")
}
func main() {
flag.Parse()
if len(flags.input) == 0 || len(flags.output) == 0 {
fmt.Printf("[ERROR]Input or Output file path is empty.\n")
return
}
startTime := time.Now()
generateWayid2nodeidsMapping(flags.input, flags.output)
endTime := time.Now()
fmt.Printf("Total processing time for wayid2nodeids-extract takes %f seconds\n", endTime.Sub(startTime).Seconds())
}