From cbd192c52e85c9012693304230f0c3d2c6af15aa Mon Sep 17 00:00:00 2001 From: "Xun(Perry) Liu" Date: Thu, 27 Jun 2019 20:10:54 -0700 Subject: [PATCH] feat: Implement logic to generate way-2-nodes mapping and convert traffic result to nodes-2-speed (#28) * feat: Implement logic to generate way-2-nodes mapping and convert from way-2-speed to nodes-2-speed for OSRM Issue: https://github.com/Telenav/osrm-backend/issues/22 * fix: Comments and typo in traffic generator. * fix: Fix the issue of protocol and refine function names * fix: Remove suffix of "100" for telenav wayids * feat: Add unit test for generatespeedtable * fix: Avoid upload test pbf into git * feat: Handle traffic flow and add related unit test * Update Readme * fix: enable real function (cherry picked from commit 81015b9977847ffe61c7e8793e1cecb229727a07) * docs: fix a typo --- traffic_updater/README.md | 26 ++- .../osrm_speedtable_generator.go | 114 +++++++++++++ .../osrm_speedtable_generator_test.go | 72 ++++++++ .../osrm_traffic_updater.go | 64 ++----- .../osrm_traffic_updater/testdata/expect.csv | 6 + .../testdata/id-mapping.csv | 4 + .../testdata/mock-traffic.csv | 4 + .../osrm_traffic_updater/testdata/target.csv | 6 + .../wayid2nodeids_extractor.go | 158 ++++++++++++++++++ 9 files changed, 396 insertions(+), 58 deletions(-) create mode 100644 traffic_updater/go/osrm_traffic_updater/osrm_speedtable_generator.go create mode 100644 traffic_updater/go/osrm_traffic_updater/osrm_speedtable_generator_test.go create mode 100755 traffic_updater/go/osrm_traffic_updater/testdata/expect.csv create mode 100755 traffic_updater/go/osrm_traffic_updater/testdata/id-mapping.csv create mode 100755 traffic_updater/go/osrm_traffic_updater/testdata/mock-traffic.csv create mode 100755 traffic_updater/go/osrm_traffic_updater/testdata/target.csv create mode 100644 traffic_updater/go/wayid2nodeid_extractor/wayid2nodeids_extractor.go diff --git a/traffic_updater/README.md b/traffic_updater/README.md index 82106fb5a..5d3ce8d46 
100644
--- a/traffic_updater/README.md
+++ b/traffic_updater/README.md
@@ -27,11 +27,29 @@ $ go install github.com/Telenav/osrm-backend/traffic_updater/go/osrm_traffic_upd
 $ ./bin/osrm_traffic_updater -h
 Usage of ./bin/osrm_traffic_updater:
   -c string
-        traffic proxy ip address (default "127.0.0.1")
-  -d    use high precision speeds, i.e. decimal (default true)
+    	traffic proxy ip address (default "127.0.0.1")
+  -d	use high precision speeds, i.e. decimal. (default false)
   -f string
-        OSRM traffic csv file (default "traffic.csv")
+    	OSRM traffic csv file (default "traffic.csv")
+  -m string
+    	OSRM way id to node ids mapping table (default "wayid2nodeids.csv")
   -p int
-        traffic proxy listening port (default 6666)
+    	traffic proxy listening port (default 6666)
+```
+
+# wayid2nodeids_extractor
+Extract wayid to nodeids mapping from PBF
+
+### Usage
+```bash
+$ cd $GOPATH
+$ go install github.com/Telenav/osrm-backend/traffic_updater/go/wayid2nodeid_extractor
+$ ./bin/wayid2nodeid_extractor -h
+Usage of ./bin/wayid2nodeid_extractor:
+  -b	Is pbf generated by telenav internally
+  -i string
+    	Input pbf file.
+  -o string
+    	Output csv file
 ```
 
diff --git a/traffic_updater/go/osrm_traffic_updater/osrm_speedtable_generator.go b/traffic_updater/go/osrm_traffic_updater/osrm_speedtable_generator.go
new file mode 100644
index 000000000..3dc5d54c5
--- /dev/null
+++ b/traffic_updater/go/osrm_traffic_updater/osrm_speedtable_generator.go
@@ -0,0 +1,147 @@
+package main
+
+import (
+	"bufio"
+	"fmt"
+	"log"
+	"os"
+	"strconv"
+	"strings"
+	"time"
+)
+
+// todo:
+// Write data into more compressed format(parquet)
+// Statistic to avoid unmatched element
+// Multiple go routine for convert()
+
+// generateSpeedTable converts per-way speeds into the per-segment CSV that
+// OSRM consumes: one "fromNodeID,toNodeID,speed" line per node pair.
+//
+// wayid2speed maps a way id to its speed; a negative speed is emitted against
+// the way's node order. way2nodeidsPath is the "wayid,nodeid,nodeid,..."
+// mapping CSV and target is the output CSV path.
+//
+// The work runs as a load -> convert -> write pipeline joined by unbuffered
+// channels. write runs on the calling goroutine, so the function returns only
+// after every line has been written. I/O failures end the process via log.Fatal.
+func generateSpeedTable(wayid2speed map[uint64]int, way2nodeidsPath string, target string) {
+	startTime := time.Now()
+
+	// format is: wayid, nodeid, nodeid, nodeid...
+	source := make(chan string)
+	// format is fromid, toid, speed
+	sink := make(chan string)
+
+	go load(way2nodeidsPath, source)
+	go convert(source, sink, wayid2speed)
+	write(target, sink)
+
+	fmt.Printf("Processing time for generate speed table takes %f seconds\n", time.Since(startTime).Seconds())
+}
+
+// load streams the mapping file at mappingPath into source, one line per
+// message, and closes source once the file is exhausted.
+func load(mappingPath string, source chan<- string) {
+	defer close(source)
+
+	f, err := os.Open(mappingPath)
+	if err != nil {
+		// log.Fatalf exits immediately, so the path has to be part of the
+		// fatal message; anything printed after it would never run.
+		log.Fatalf("Open idsmapping file of %v failed: %v", mappingPath, err)
+	}
+	defer f.Close()
+	fmt.Printf("Open idsmapping file of %s succeed.\n", mappingPath)
+
+	scanner := bufio.NewScanner(f)
+	for scanner.Scan() {
+		source <- scanner.Text()
+	}
+	// Scan also stops on read errors and over-long lines; without this check
+	// the speed table would be silently truncated.
+	if err := scanner.Err(); err != nil {
+		log.Fatalf("Read idsmapping file of %v failed: %v", mappingPath, err)
+	}
+}
+
+// convert turns every "wayid,nodeid,nodeid,..." line from source into one
+// "from,to,speed" line per consecutive node pair and sends it to sink. sink is
+// closed once source is drained.
+//
+// Ways missing from wayid2speed are skipped. A non-negative speed keeps the
+// way's node order; a negative speed swaps from/to and emits the absolute
+// value. Malformed rows and node ids are reported and skipped.
+func convert(source <-chan string, sink chan<- string, wayid2speed map[uint64]int) {
+	defer close(sink)
+
+	for str := range source {
+		elements := strings.Split(str, ",")
+		if len(elements) < 3 {
+			fmt.Printf("Invalid string %s in wayid2nodeids mapping file\n", str)
+			continue
+		}
+
+		wayid, err := strconv.ParseUint(elements[0], 10, 64)
+		if err != nil {
+			fmt.Printf("#Error during decoding wayid, row = %v\n", elements)
+			continue
+		}
+
+		speed, ok := wayid2speed[wayid]
+		if !ok {
+			continue
+		}
+
+		nodes := elements[1:]
+		for i := 0; i+1 < len(nodes); i++ {
+			n1, err := strconv.ParseUint(nodes[i], 10, 64)
+			if err != nil {
+				fmt.Printf("#Error during decoding nodeid, row = %v\n", elements)
+				continue
+			}
+			n2, err := strconv.ParseUint(nodes[i+1], 10, 64)
+			if err != nil {
+				fmt.Printf("#Error during decoding nodeid, row = %v\n", elements)
+				continue
+			}
+
+			if speed >= 0 {
+				sink <- fmt.Sprintf("%d,%d,%d\n", n1, n2, speed)
+			} else {
+				sink <- fmt.Sprintf("%d,%d,%d\n", n2, n1, -speed)
+			}
+		}
+	}
+}
+
+// write drains sink into the file at targetPath through a buffered writer and
+// returns once sink is closed and the data has been flushed.
+//
+// The file is truncated on open: the table is regenerated on every run, and
+// without O_TRUNC a shorter table would leave stale rows of the previous run
+// at the end of the file.
+func write(targetPath string, sink <-chan string) {
+	outfile, err := os.OpenFile(targetPath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755)
+	if err != nil {
+		log.Fatalf("Open output file of %s failed: %v", targetPath, err)
+	}
+	defer outfile.Close()
+	fmt.Printf("Open output file of %s succeed.\n", targetPath)
+
+	w := bufio.NewWriter(outfile)
+	for str := range sink {
+		if _, err := w.WriteString(str); err != nil {
+			log.Fatal(err)
+		}
+	}
+	// Flush explicitly so a failed final write is reported, not dropped.
+	if err := w.Flush(); err != nil {
+		log.Fatal(err)
+	}
+	if err := outfile.Sync(); err != nil {
+		// Sync is unsupported on some targets (pipes, ttys); the data has
+		// already been handed to the OS, so only report it.
+		log.Printf("Sync output file of %s failed: %v", targetPath, err)
+	}
+}
diff --git a/traffic_updater/go/osrm_traffic_updater/osrm_speedtable_generator_test.go b/traffic_updater/go/osrm_traffic_updater/osrm_speedtable_generator_test.go
new file mode 100644
index 000000000..58712c804
--- /dev/null
+++ b/traffic_updater/go/osrm_traffic_updater/osrm_speedtable_generator_test.go
@@ -0,0 +1,74 @@
+package main
+
+import (
+	"encoding/csv"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"log"
+	"os"
+	"strconv"
+	"testing"
+)
+
+// TestGenerateSpeedTable runs the whole pipeline on the fixtures in testdata
+// and compares the generated table byte-for-byte with expect.csv.
+func TestGenerateSpeedTable(t *testing.T) {
+	wayid2speed := make(map[uint64]int)
+	loadMockTraffic("./testdata/mock-traffic.csv", wayid2speed)
+	generateSpeedTable(wayid2speed, "./testdata/id-mapping.csv", "./testdata/target.csv")
+
+	got, err := ioutil.ReadFile("./testdata/target.csv")
+	if err != nil {
+		t.Fatalf("read generated table: %v", err)
+	}
+	want, err := ioutil.ReadFile("./testdata/expect.csv")
+	if err != nil {
+		t.Fatalf("read expected table: %v", err)
+	}
+
+	if string(got) != string(want) {
+		t.Errorf("TestGenerateSpeedTable failed\ngot:\n%s\nwant:\n%s", got, want)
+	}
+}
+
+// loadMockTraffic fills wayid2speed from a "wayid,speed" CSV file. Rows that
+// cannot be decoded are reported and skipped.
+func loadMockTraffic(trafficPath string, wayid2speed map[uint64]int) {
+	// load mock traffic file
+	mockfile, err := os.Open(trafficPath)
+	if err != nil {
+		log.Fatalf("Open mock traffic file of %v failed: %v", trafficPath, err)
+	}
+	defer mockfile.Close()
+	fmt.Printf("Open mock traffic file of %s succeed.\n", trafficPath)
+
+	csvr := csv.NewReader(mockfile)
+	for {
+		row, err := csvr.Read()
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			fmt.Printf("Error during decoding mock traffic, row = %v\n", err)
+			return
+		}
+		if len(row) < 2 {
+			fmt.Printf("#Error during decoding mock traffic, row = %v\n", row)
+			continue
+		}
+
+		wayid, err := strconv.ParseUint(row[0], 10, 64)
+		if err != nil {
+			fmt.Printf("#Error during decoding wayid, row = %v\n", row)
+			continue
+		}
+		speed, err := strconv.ParseInt(row[1], 10, 32)
+		if err != nil {
+			fmt.Printf("#Error during decoding speed, row = %v\n", row)
+			continue
+		}
+
+		wayid2speed[wayid] = int(speed)
+	}
+}
diff --git a/traffic_updater/go/osrm_traffic_updater/osrm_traffic_updater.go b/traffic_updater/go/osrm_traffic_updater/osrm_traffic_updater.go
index f6853b1f3..4d9d084ba 100644
--- a/traffic_updater/go/osrm_traffic_updater/osrm_traffic_updater.go
+++ b/traffic_updater/go/osrm_traffic_updater/osrm_traffic_updater.go
@@ -1,13 +1,10 @@
 package main
 
 import (
-	"bufio"
 	"context"
 	"flag"
 	"fmt"
-	"os"
 	"strconv"
-	"time"
 
 	"github.com/Telenav/osrm-backend/traffic_updater/go/gen-go/proxy"
 	"github.com/apache/thrift/lib/go/thrift"
@@ -16,6 +13,7 @@ import (
 var flags struct {
 	port          int
 	ip            string
+	mappingFile   string
 	csvFile       string
 	highPrecision bool
 }
@@ -23,53 +21,16 @@ var flags struct {
 func init() {
 	flag.IntVar(&flags.port, "p", 6666, "traffic proxy listening port")
 	flag.StringVar(&flags.ip, "c", "127.0.0.1", "traffic proxy ip address")
+	flag.StringVar(&flags.mappingFile, "m", "wayid2nodeids.csv", "OSRM way id to node ids mapping table")
 	flag.StringVar(&flags.csvFile, "f", "traffic.csv", "OSRM traffic csv file")
-	flag.BoolVar(&flags.highPrecision, "d", true, "use high precision speeds, i.e. decimal")
+	flag.BoolVar(&flags.highPrecision, "d", false, "use high precision speeds, i.e. decimal")
 }
 
-func dumpFlowsToCsv(csvFile string, flows []*proxy.Flow) {
-
-	if _, err := os.Stat(csvFile); err == nil {
-		// csvFile exists, remove it
-		rmErr := os.Remove(csvFile)
-		if rmErr != nil {
-			fmt.Println(rmErr)
-			return
-		}
+func flows2map(flows []*proxy.Flow, m map[uint64]int) {
+	for _, flow := range flows {
+		wayid := (uint64)(flow.WayId)
+		m[wayid] = int(flow.Speed)
 	}
-
-	f, err := os.OpenFile(csvFile, os.O_RDWR|os.O_CREATE, 0755)
-	if err != nil {
-		fmt.Println(err)
-		return
-	}
-	defer f.Close()
-	writer := bufio.NewWriter(f)
-
-	for i, flow := range flows {
-		var osrmTrafficLine string
-		if flags.highPrecision {
-			osrmTrafficLine = fmt.Sprintf("%d,%d,%f\n", flow.FromId, flow.ToId, flow.Speed)
-		} else {
-			osrmTrafficLine = fmt.Sprintf("%d,%d,%d\n", flow.FromId, flow.ToId, int(flow.Speed))
-		}
-
-		// print first 10 lines for debug
-		if i < 10 {
-			fmt.Printf("[ %d ] %v\n", i, flow)
-			fmt.Printf("[ %d ] %s\n", i, osrmTrafficLine)
-		}
-
-		// write to csv
-		_, err := writer.WriteString(osrmTrafficLine)
-		if err != nil {
-			fmt.Println(err)
-			return
-		}
-	}
-	writer.Flush()
-	f.Sync()
-	fmt.Printf("total wrote to %s count: %d\n", csvFile, len(flows))
 }
 
 func main() {
@@ -106,7 +67,6 @@ func main() {
 	client := proxy.NewProxyServiceClient(thrift.NewTStandardClient(protocol, protocol))
 
 	// get flows
-	startTime := time.Now()
 	fmt.Println("getting flows")
 	var defaultCtx = context.Background()
 	flows, err := client.GetAllFlows(defaultCtx)
@@ -115,13 +75,9 @@ func main() {
 		return
 	}
 	fmt.Printf("got flows count: %d\n", len(flows))
-	afterGotFlowTime := time.Now()
-	fmt.Printf("get flows time used: %f seconds\n", afterGotFlowTime.Sub(startTime).Seconds())
 
-	// dump to csv
-	fmt.Println("dump flows to: " + flags.csvFile)
-	dumpFlowsToCsv(flags.csvFile, flows)
-	endTime := time.Now()
-	fmt.Printf("dump csv time used: %f seconds\n", endTime.Sub(afterGotFlowTime).Seconds())
+	wayid2speed := make(map[uint64]int)
+	flows2map(flows, wayid2speed)
+	generateSpeedTable(wayid2speed, flags.mappingFile, flags.csvFile)
 
 }
diff --git a/traffic_updater/go/osrm_traffic_updater/testdata/expect.csv b/traffic_updater/go/osrm_traffic_updater/testdata/expect.csv
new file mode 100755
index 000000000..4c95ae579
--- /dev/null
+++ b/traffic_updater/go/osrm_traffic_updater/testdata/expect.csv
@@ -0,0 +1,6 @@
+84760891102,19496208102,81
+84762609102,244183320001101,87
+244183320001101,84762607102,87
+84760849102,84760850102,47
+84760858102,84760846102,59
+847608599102,84760858102,59
diff --git a/traffic_updater/go/osrm_traffic_updater/testdata/id-mapping.csv b/traffic_updater/go/osrm_traffic_updater/testdata/id-mapping.csv
new file mode 100755
index 000000000..6d942a1bc
--- /dev/null
+++ b/traffic_updater/go/osrm_traffic_updater/testdata/id-mapping.csv
@@ -0,0 +1,4 @@
+24418325,84760891102,19496208102
+24418332,84762609102,244183320001101,84762607102
+24418343,84760849102,84760850102
+24418344,84760846102,84760858102,847608599102
\ No newline at end of file
diff --git a/traffic_updater/go/osrm_traffic_updater/testdata/mock-traffic.csv b/traffic_updater/go/osrm_traffic_updater/testdata/mock-traffic.csv
new file mode 100755
index 000000000..8a8ce40d4
--- /dev/null
+++ b/traffic_updater/go/osrm_traffic_updater/testdata/mock-traffic.csv
@@ -0,0 +1,4 @@
+24418325,81
+24418332,87
+24418343,47
+24418344,-59
\ No newline at end of file
diff --git a/traffic_updater/go/osrm_traffic_updater/testdata/target.csv b/traffic_updater/go/osrm_traffic_updater/testdata/target.csv
new file mode 100755
index 000000000..4c95ae579
--- /dev/null
+++ b/traffic_updater/go/osrm_traffic_updater/testdata/target.csv
@@ -0,0 +1,6 @@
+84760891102,19496208102,81
+84762609102,244183320001101,87
+244183320001101,84762607102,87
+84760849102,84760850102,47
+84760858102,84760846102,59
+847608599102,84760858102,59
diff --git a/traffic_updater/go/wayid2nodeid_extractor/wayid2nodeids_extractor.go b/traffic_updater/go/wayid2nodeid_extractor/wayid2nodeids_extractor.go
new
file mode 100644
index 000000000..ba7a3ceb7
--- /dev/null
+++ b/traffic_updater/go/wayid2nodeid_extractor/wayid2nodeids_extractor.go
@@ -0,0 +1,183 @@
+package main
+
+import (
+	"bufio"
+	"flag"
+	"fmt"
+	"io"
+	"log"
+	"math/rand"
+	"os"
+	"runtime"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/qedus/osmpbf"
+)
+
+// telenavNavigableWaySuffix is the suffix Telenav PBFs append to the id of
+// every navigable way.
+const telenavNavigableWaySuffix string = "100"
+
+// generateWayid2nodeidsMapping reads the PBF file at input and writes one
+// "wayid,nodeid1,nodeid2,..." line per exported way to the CSV file at
+// output. Any failure terminates the process via log.Fatal.
+func generateWayid2nodeidsMapping(input, output string) {
+	infile, err := os.Open(input)
+	if err != nil {
+		// log.Fatalf exits immediately, so the path has to be part of the
+		// fatal message; anything printed after it would never run.
+		log.Fatalf("Open pbf file of %v failed: %v", input, err)
+	}
+	defer infile.Close()
+	fmt.Printf("Open pbf file of %s succeed.\n", input)
+
+	// O_TRUNC: a rerun that produces a shorter mapping must not leave rows of
+	// the previous run at the end of the file.
+	outfile, err := os.OpenFile(output, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755)
+	if err != nil {
+		log.Fatalf("Open output file of %s failed: %v", output, err)
+	}
+	defer outfile.Close()
+	fmt.Printf("Open output file of %s succeed.\n", output)
+
+	wayid2nodeids(infile, outfile)
+
+	if err := outfile.Sync(); err != nil {
+		// Sync is unsupported on some targets (pipes, ttys); the data has
+		// already been handed to the OS, so only report it.
+		log.Printf("Sync output file of %s failed: %v", output, err)
+	}
+}
+
+// wayid2nodeids decodes the PBF stream from infile and writes the mapping
+// line of every navigable way to outfile. Nodes and relations are ignored.
+func wayid2nodeids(infile io.Reader, outfile io.Writer) {
+	// Init extractor
+	extractor := osmpbf.NewDecoder(infile)
+	extractor.SetBufferSize(osmpbf.MaxBlobSize)
+	if err := extractor.Start(runtime.GOMAXPROCS(-1)); err != nil {
+		log.Fatal(err)
+	}
+
+	// Init loader
+	loader := bufio.NewWriter(outfile)
+
+	// 64-bit counters: the node total of a large extract can exceed what a
+	// uint32 holds.
+	var wc, nc uint64
+	for {
+		v, err := extractor.Decode()
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			log.Fatal(err)
+		}
+
+		switch v := v.(type) {
+		case *osmpbf.Node:
+		case *osmpbf.Way:
+			wayid := strconv.FormatUint((uint64)(v.ID), 10)
+			if !isNavigableWay(wayid) {
+				continue
+			}
+			wayid = trimNavigableWaySuffix(wayid)
+
+			// Transform
+			str := convertWayObj2IdMappingString(v, wayid)
+			//str := convertWayObj2MockSpeed(v, wayid)
+
+			if _, err := loader.WriteString(str); err != nil {
+				log.Fatal(err)
+			}
+
+			wc++
+			nc += uint64(len(v.NodeIDs))
+		case *osmpbf.Relation:
+		default:
+			log.Fatalf("unknown type %T\n", v)
+		}
+	}
+
+	// Flush explicitly so a failed final write is reported, not dropped.
+	if err := loader.Flush(); err != nil {
+		log.Fatal(err)
+	}
+
+	fmt.Printf("Total ways: %d, total nodes: %d\n", wc, nc)
+}
+
+// isNavigableWay reports whether the way with the given id is exported.
+//
+// This optimization is for telenav internal pbf only.
+// User need manually set flags.istelenavpbf = true to enable this.
+// Telenav PBF add "100" for all navigable ways.
+func isNavigableWay(wayid string) bool {
+	if !flags.istelenavpbf {
+		return true
+	}
+	return strings.HasSuffix(wayid, telenavNavigableWaySuffix)
+}
+
+// Remove "100" suffix for telenav ways, other components such as telenav traffic
+// already remove it
+func trimNavigableWaySuffix(wayid string) string {
+	if flags.istelenavpbf {
+		return strings.TrimSuffix(wayid, telenavNavigableWaySuffix)
+	}
+	return wayid
+}
+
+// convertWayObj2IdMappingString formats a way as "wayid,nodeid1,nodeid2,...\n".
+func convertWayObj2IdMappingString(v *osmpbf.Way, wayid string) string {
+	// Append the ids directly instead of rendering the slice with fmt.Sprint
+	// and re-splitting its "[1 2 3]" form.
+	buf := make([]byte, 0, len(wayid)+2+12*len(v.NodeIDs))
+	buf = append(buf, wayid...)
+	buf = append(buf, ',')
+	for i, id := range v.NodeIDs {
+		if i > 0 {
+			buf = append(buf, ',')
+		}
+		buf = strconv.AppendInt(buf, id, 10)
+	}
+	buf = append(buf, '\n')
+	return string(buf)
+}
+
+// convertWayObj2MockSpeed formats a way as "wayid,speed\n" with a random speed
+// in [0, 100). Currently only referenced from a commented-out call.
+func convertWayObj2MockSpeed(v *osmpbf.Way, wayid string) string {
+	// format: wayid,random speed
+	return wayid + "," +
+		strconv.Itoa(rand.Intn(100)) +
+		"\n"
+}
+
+var flags struct {
+	input        string
+	output       string
+	istelenavpbf bool
+}
+
+func init() {
+	flag.StringVar(&flags.input, "i", "", "Input pbf file.")
+	flag.StringVar(&flags.output, "o", "", "Output csv file")
+	flag.BoolVar(&flags.istelenavpbf, "b", false, "Is pbf generated by telenav internally")
+}
+
+func main() {
+	flag.Parse()
+
+	if len(flags.input) == 0 || len(flags.output) == 0 {
+		fmt.Printf("[ERROR]Input or Output file path is empty.\n")
+		// Exit non-zero so callers and scripts can detect the usage error.
+		os.Exit(1)
+	}
+
+	startTime := time.Now()
+	generateWayid2nodeidsMapping(flags.input, flags.output)
+	fmt.Printf("Total processing time for wayid2nodeids-extract takes %f seconds\n", time.Since(startTime).Seconds())
+}