diff --git a/traffic_updater/go/osrm_traffic_updater/get_telenav_traffic.go b/traffic_updater/go/osrm_traffic_updater/get_telenav_traffic.go new file mode 100644 index 000000000..183c9fa0b --- /dev/null +++ b/traffic_updater/go/osrm_traffic_updater/get_telenav_traffic.go @@ -0,0 +1,77 @@ +package main + +import ( + "context" + "fmt" + "strconv" + "time" + + "github.com/Telenav/osrm-backend/traffic_updater/go/gen-go/proxy" + "github.com/apache/thrift/lib/go/thrift" +) + +func getTrafficFlow(ip string, port int, m map[int64]int, c chan<- bool) { + var transport thrift.TTransport + var err error + + startTime := time.Now() + + // make socket + targetServer := ip + ":" + strconv.Itoa(port) + fmt.Println("connect traffic proxy " + targetServer) + transport, err = thrift.NewTSocket(targetServer) + if err != nil { + fmt.Println("Error opening socket:", err) + c <- false + return + } + + // Buffering + transport, err = thrift.NewTFramedTransportFactoryMaxLength(thrift.NewTTransportFactory(), 1024*1024*1024).GetTransport(transport) + if err != nil { + fmt.Println("Error get transport:", err) + c <- false + return + } + defer transport.Close() + if err := transport.Open(); err != nil { + fmt.Println("Error opening transport:", err) + c <- false + return + } + + // protocol encoder&decoder + protocol := thrift.NewTCompactProtocolFactory().GetProtocol(transport) + + // create proxy client + client := proxy.NewProxyServiceClient(thrift.NewTStandardClient(protocol, protocol)) + + // get flows + fmt.Println("getting flows") + var defaultCtx = context.Background() + var flows []*proxy.Flow + flows, err = client.GetAllFlows(defaultCtx) + if err != nil { + fmt.Println("get flows failed:", err) + c <- false + return + } + fmt.Printf("got flows count: %d\n", len(flows)) + + endTime := time.Now() + fmt.Printf("Processing time for getting traffic flow takes %f seconds\n", endTime.Sub(startTime).Seconds()) + + flows2map(flows, m) + endTime2 := time.Now() + fmt.Printf("Processing time for building traffic map takes %f seconds\n", endTime2.Sub(endTime).Seconds()) + + c <- true + return +} + +func flows2map(flows []*proxy.Flow, m map[int64]int) { + for _, flow := range flows { + wayid := flow.WayId + m[wayid] = int(flow.Speed) + } +} diff --git a/traffic_updater/go/osrm_traffic_updater/osrm_speedtable_generator.go b/traffic_updater/go/osrm_traffic_updater/osrm_speedtable_generator.go deleted file mode 100644 index 3dc5d54c5..000000000 --- a/traffic_updater/go/osrm_traffic_updater/osrm_speedtable_generator.go +++ /dev/null @@ -1,114 +0,0 @@ -package main - -import ( - "bufio" - "fmt" - "log" - "os" - "strconv" - "strings" - "time" -) - -// todo: -// Write data into more compressed format(parquet) -// Statistic to avoid unmatched element -// Multiple go routine for convert() -func generateSpeedTable(wayid2speed map[uint64]int, way2nodeidsPath string, target string) { - startTime := time.Now() - - // format is: wayid, nodeid, nodeid, nodeid... - source := make(chan string) - // format is fromid, toid, speed - sink := make(chan string) - - go load(way2nodeidsPath, source) - go convert(source, sink, wayid2speed) - write(target, sink) - - endTime := time.Now() - fmt.Printf("Processing time for generate speed table takes %f seconds\n", endTime.Sub(startTime).Seconds()) -} - -func load(mappingPath string, source chan<- string) { - defer close(source) - - f, err := os.Open(mappingPath) - defer f.Close() - if err != nil { - log.Fatal(err) - fmt.Printf("Open idsmapping file of %v failed.\n", mappingPath) - return - } - fmt.Printf("Open idsmapping file of %s succeed.\n", mappingPath) - - scanner := bufio.NewScanner(f) - for scanner.Scan() { - source <- (scanner.Text()) - } -} - -func convert(source <-chan string, sink chan<- string, wayid2speed map[uint64]int) { - var err error - defer close(sink) - - for str := range source { - elements := strings.Split(str, ",") - if len(elements) < 3 { - fmt.Printf("Invalid string %s in wayid2nodeids mapping file\n", str) - continue - } - - var wayid uint64 - if wayid, err = strconv.ParseUint(elements[0], 10, 64); err != nil { - fmt.Printf("#Error during decoding wayid, row = %v\n", elements) - continue - } - - if speed, ok := wayid2speed[wayid]; ok { - var nodes []string = elements[1:] - for i := 0; (i + 1) < len(nodes); i++ { - var n1, n2 uint64 - if n1, err = strconv.ParseUint(nodes[i], 10, 64); err != nil { - fmt.Printf("#Error during decoding nodeid, row = %v\n", elements) - continue - } - if n2, err = strconv.ParseUint(nodes[i+1], 10, 64); err != nil { - fmt.Printf("#Error during decoding nodeid, row = %v\n", elements) - continue - } - - var s string - if speed >= 0 { - s = fmt.Sprintf("%d,%d,%d\n", n1, n2, speed) - } else { - s = fmt.Sprintf("%d,%d,%d\n", n2, n1, -speed) - } - - sink <- s - } - } - } -} - -func write(targetPath string, sink chan string) { - outfile, err := os.OpenFile(targetPath, os.O_RDWR|os.O_CREATE, 0755) - defer outfile.Close() - defer outfile.Sync() - if err != nil { - log.Fatal(err) - fmt.Printf("Open output file of %s failed.\n", targetPath) - return - } - fmt.Printf("Open output file of %s succeed.\n", targetPath) - - w := bufio.NewWriter(outfile) - defer w.Flush() - for str := range sink { - _, err := w.WriteString(str) - if err != nil { - log.Fatal(err) - return - } - } -} diff --git a/traffic_updater/go/osrm_traffic_updater/osrm_speedtable_generator_test.go b/traffic_updater/go/osrm_traffic_updater/osrm_speedtable_generator_test.go deleted file mode 100644 index 58712c804..000000000 --- a/traffic_updater/go/osrm_traffic_updater/osrm_speedtable_generator_test.go +++ /dev/null @@ -1,72 +0,0 @@ -package main - -import ( - "encoding/csv" - "fmt" - "io" - "io/ioutil" - "log" - "os" - "strconv" - "strings" - "testing" -) - -func TestGenerateSpeedTable(t *testing.T) { - wayid2speed := make(map[uint64]int) - loadMockTraffic("./testdata/mock-traffic.csv", wayid2speed) - generateSpeedTable(wayid2speed, "./testdata/id-mapping.csv", "./testdata/target.csv") - - b1, err1 := ioutil.ReadFile("./testdata/target.csv") - if err1 != nil { - fmt.Print(err1) - } - str1 := string(b1) - - b2, err2 := ioutil.ReadFile("./testdata/expect.csv") - if err2 != nil { - fmt.Print(err2) - } - str2 := string(b2) - - if strings.Compare(str1, str2) != 0 { - t.Error("TestGenerateSpeedTable failed\n") - } -} - -func loadMockTraffic(trafficPath string, wayid2speed map[uint64]int) { - // load mock traffic file - mockfile, err := os.Open(trafficPath) - defer mockfile.Close() - if err != nil { - log.Fatal(err) - fmt.Printf("Open pbf file of %v failed.\n", trafficPath) - return - } - fmt.Printf("Open pbf file of %s succeed.\n", trafficPath) - - csvr := csv.NewReader(mockfile) - for { - row, err := csvr.Read() - if err != nil { - if err == io.EOF { - err = nil - break - } else { - fmt.Printf("Error during decoding mock traffic, row = %v\n", err) - return - } - } - - var wayid uint64 - var speed int64 - if wayid, err = strconv.ParseUint(row[0], 10, 64); err != nil { - fmt.Printf("#Error during decoding wayid, row = %v\n", row) - } - if speed, err = strconv.ParseInt(row[1], 10, 32); err != nil { - fmt.Printf("#Error during decoding speed, row = %v\n", row) - } - - wayid2speed[wayid] = (int)(speed) - } -} diff --git a/traffic_updater/go/osrm_traffic_updater/osrm_traffic_updater.go b/traffic_updater/go/osrm_traffic_updater/osrm_traffic_updater.go index 4d9d084ba..33e2cd203 100644 --- a/traffic_updater/go/osrm_traffic_updater/osrm_traffic_updater.go +++ b/traffic_updater/go/osrm_traffic_updater/osrm_traffic_updater.go @@ -1,13 +1,8 @@ package main import ( - "context" "flag" "fmt" - "strconv" - - "github.com/Telenav/osrm-backend/traffic_updater/go/gen-go/proxy" - "github.com/apache/thrift/lib/go/thrift" ) var flags struct { @@ -26,58 +21,42 @@ func init() { flag.BoolVar(&flags.highPrecision, "d", false, "use high precision speeds, i.e. decimal") } -func flows2map(flows []*proxy.Flow, m map[uint64]int) { - for _, flow := range flows { - wayid := (uint64)(flow.WayId) - m[wayid] = int(flow.Speed) - } -} +const TASKNUM = 128 +const CACHEDOBJECTS = 1000000 func main() { flag.Parse() - var transport thrift.TTransport - var err error + isFlowDoneChan := make(chan bool, 1) + wayid2speed := make(map[int64]int) + go getTrafficFlow(flags.ip, flags.port, wayid2speed, isFlowDoneChan) - // make socket - targetServer := flags.ip + ":" + strconv.Itoa(flags.port) - fmt.Println("connect traffic proxy " + targetServer) - transport, err = thrift.NewTSocket(targetServer) - if err != nil { - fmt.Println("Error opening socket:", err) - return + var sources [TASKNUM]chan string + for i := range sources { + sources[i] = make(chan string, CACHEDOBJECTS) } + go loadWay2NodeidsTable(flags.mappingFile, sources) - // Buffering - transport, err = thrift.NewTFramedTransportFactoryMaxLength(thrift.NewTTransportFactory(), 1024*1024*1024).GetTransport(transport) - if err != nil { - fmt.Println("Error get transport:", err) - return + isFlowDone := wait4PreConditions(isFlowDoneChan) + if isFlowDone { + dumpSpeedTable4Customize(wayid2speed, sources, flags.csvFile) } - defer transport.Close() - if err := transport.Open(); err != nil { - fmt.Println("Error opening transport:", err) - return - } - - // protocol encoder&decoder - protocol := thrift.NewTCompactProtocolFactory().GetProtocol(transport) - - // create proxy client - client := proxy.NewProxyServiceClient(thrift.NewTStandardClient(protocol, protocol)) - - // get flows - fmt.Println("getting flows") - var defaultCtx = context.Background() - flows, err := client.GetAllFlows(defaultCtx) - if err != nil { - fmt.Println("get flows failed:", err) - return - } - fmt.Printf("got flows count: %d\n", len(flows)) - - wayid2speed := make(map[uint64]int) - flows2map(flows, wayid2speed) - - generateSpeedTable(wayid2speed, flags.mappingFile, flags.csvFile) +} + +func wait4PreConditions(flowChan <-chan bool) (bool) { + var isFlowDone bool + loop: + for { + select { + case f := <- flowChan : + if !f { + fmt.Printf("[ERROR] Communication with traffic server failed.\n") + break loop + } else { + isFlowDone = true + break loop + } + } + } + return isFlowDone } diff --git a/traffic_updater/go/osrm_traffic_updater/osrm_traffic_updater_test.go b/traffic_updater/go/osrm_traffic_updater/osrm_traffic_updater_test.go new file mode 100644 index 000000000..a3348aa36 --- /dev/null +++ b/traffic_updater/go/osrm_traffic_updater/osrm_traffic_updater_test.go @@ -0,0 +1,35 @@ +package main + +import ( + "testing" +) + +func setChan2True(c chan<- bool) { + c <- true +} + +func setChan2False(c chan<- bool) { + c <- false +} + +func TestWait4AllPreconditions1(t *testing.T) { + c1 := make(chan bool, 1) + + go setChan2True(c1) + + b1 := wait4PreConditions(c1) + if !b1 { + t.Error("Testwait4AllPreconditions1 failed.\n") + } +} + +func TestWait4AllPreconditions2(t *testing.T) { + c2 := make(chan bool, 1) + + go setChan2False(c2) + + b2 := wait4PreConditions(c2) + if b2 { + t.Error("Testwait4AllPreconditions1 failed.\n") + } +} diff --git a/traffic_updater/go/osrm_traffic_updater/speed_table_dumper.go b/traffic_updater/go/osrm_traffic_updater/speed_table_dumper.go new file mode 100644 index 000000000..0a75b3c95 --- /dev/null +++ b/traffic_updater/go/osrm_traffic_updater/speed_table_dumper.go @@ -0,0 +1,127 @@ +package main + +import ( + "os" + "log" + "fmt" + "bufio" + "sync" + "time" + "strings" + "strconv" +) + +var tasksWg sync.WaitGroup +var dumpFinishedWg sync.WaitGroup + +func dumpSpeedTable4Customize(wayid2speed map[int64]int, sources [TASKNUM]chan string, outputPath string) { + startTime := time.Now() + + if len(wayid2speed) == 0 { + return + } + + sink := make(chan string) + startTasks(wayid2speed, sources, sink) + startDump(outputPath, sink) + wait4AllTasksFinished(sink) + + endTime := time.Now() + fmt.Printf("Processing time for dumpSpeedTable4Customize takes %f seconds\n", endTime.Sub(startTime).Seconds()) +} + +func startTasks(wayid2speed map[int64]int, sources [TASKNUM]chan string, sink chan<- string) { + tasksWg.Add(TASKNUM) + for i := 0; i < TASKNUM; i++ { + go task(wayid2speed, sources[i], sink) + } +} + +func startDump(outputPath string, sink <-chan string) { + dumpFinishedWg.Add(1) + go write(outputPath, sink) +} + +func wait4AllTasksFinished(sink chan string) { + tasksWg.Wait() + close(sink) + dumpFinishedWg.Wait() +} + +func task(wayid2speed map[int64]int, source <-chan string, sink chan<- string) { + var err error + for str := range source { + elements := strings.Split(str, ",") + if len(elements) < 3 { + continue + } + + var wayid uint64 + if wayid, err = strconv.ParseUint(elements[0], 10, 64); err != nil { + fmt.Printf("#Error during decoding wayid, row = %v\n", elements) + continue + } + + speedFwd, okFwd := wayid2speed[(int64)(wayid)] + speedBwd, okBwd := wayid2speed[(int64)(-wayid)] + + if okFwd || okBwd { + var nodes []string = elements[1:] + for i := 0; (i + 1) < len(nodes); i++ { + var n1, n2 uint64 + if n1, err = strconv.ParseUint(nodes[i], 10, 64); err != nil { + fmt.Printf("#Error during decoding nodeid, row = %v\n", elements) + continue + } + if n2, err = strconv.ParseUint(nodes[i+1], 10, 64); err != nil { + fmt.Printf("#Error during decoding nodeid, row = %v\n", elements) + continue + } + if okFwd { + sink <- generateSingleRecord(n1, n2, speedFwd, true) + } + if okBwd { + sink <- generateSingleRecord(n1, n2, speedBwd, false) + } + + } + } + + } + tasksWg.Done() +} + +// format +// if dir = true, means traffic for forward, generate: from, to, speed +// if dir = false, means this speed is for backward flow, generate: to, from, speed +func generateSingleRecord(from, to uint64, speed int, dir bool) (string){ + if dir { + return fmt.Sprintf("%d,%d,%d\n", from, to, speed) + } else { + return fmt.Sprintf("%d,%d,%d\n", to, from, speed) + } +} + +func write(targetPath string, sink <-chan string) { + outfile, err := os.OpenFile(targetPath, os.O_RDWR|os.O_CREATE, 0755) + defer outfile.Close() + defer outfile.Sync() + if err != nil { + log.Fatal(err) + fmt.Printf("Open output file of %s failed.\n", targetPath) + return + } + fmt.Printf("Open output file of %s succeed.\n", targetPath) + + w := bufio.NewWriter(outfile) + defer w.Flush() + for str := range sink { + _, err := w.WriteString(str) + if err != nil { + log.Fatal(err) + return + } + } + + dumpFinishedWg.Done() +} diff --git a/traffic_updater/go/osrm_traffic_updater/speed_table_dumper_test.go b/traffic_updater/go/osrm_traffic_updater/speed_table_dumper_test.go new file mode 100644 index 000000000..48dd6188f --- /dev/null +++ b/traffic_updater/go/osrm_traffic_updater/speed_table_dumper_test.go @@ -0,0 +1,169 @@ +package main + +import ( + "testing" + "os" + "log" + "fmt" + "strconv" + "io" + "encoding/csv" + "io/ioutil" + "strings" + "reflect" + + "github.com/Telenav/osrm-backend/traffic_updater/go/gen-go/proxy" +) + +func TestSpeedTableDumper1(t *testing.T) { + // load result into sources + var sources [TASKNUM]chan string + for i := range sources { + sources[i] = make(chan string, 10000) + } + go loadWay2NodeidsTable("./testdata/id-mapping.csv.snappy", sources) + + // construct mock traffic + var flows []*proxy.Flow + flows = loadMockTraffic("./testdata/mock-traffic.csv", flows) + wayid2speed := make(map[int64]int) + flows2map(flows, wayid2speed) + + dumpSpeedTable4Customize(wayid2speed, sources, "./testdata/target.csv") + + compareFileContentUnstable("./testdata/target.csv", "./testdata/expect.csv", t) +} + +func TestGenerateSingleRecord1(t *testing.T) { + str := generateSingleRecord(12345, 54321, 33, true) + if strings.Compare(str, "12345,54321,33\n") != 0 { + t.Error("Test GenerateSingleRecord failed.\n") + } +} + +func TestGenerateSingleRecord2(t *testing.T) { + str := generateSingleRecord(12345, 54321, 33, false) + if strings.Compare(str, "54321,12345,33\n") != 0 { + t.Error("Test GenerateSingleRecord failed.\n") + } +} + +func loadMockTraffic(trafficPath string, flows []*proxy.Flow) []*proxy.Flow { + // load mock traffic file + mockfile, err := os.Open(trafficPath) + defer mockfile.Close() + if err != nil { + log.Fatal(err) + fmt.Printf("Open pbf file of %v failed.\n", trafficPath) + return nil + } + fmt.Printf("Open pbf file of %s succeed.\n", trafficPath) + + csvr := csv.NewReader(mockfile) + for { + row, err := csvr.Read() + if err != nil { + if err == io.EOF { + err = nil + break + } else { + fmt.Printf("Error during decoding mock traffic, err = %v\n", err) + return nil + } + } + + var wayid int64 + var speed int64 + if wayid, err = strconv.ParseInt(row[0], 10, 64); err != nil { + fmt.Printf("#Error during decoding wayid, row = %v\n", row) + } + if speed, err = strconv.ParseInt(row[1], 10, 32); err != nil { + fmt.Printf("#Error during decoding speed, row = %v\n", row) + } + + var flow proxy.Flow + flow.WayId = wayid + flow.Speed = (float64)(speed) + flows = append(flows, &flow) + } + return flows +} + +type tNodePair struct { + f, t uint64 +} + +func loadSpeedCsv(f string, m map[tNodePair]int) { + // load mock traffic file + mockfile, err := os.Open(f) + defer mockfile.Close() + if err != nil { + log.Fatal(err) + fmt.Printf("Open file of %v failed.\n", f) + return + } + fmt.Printf("Open file of %s succeed.\n", f) + + csvr := csv.NewReader(mockfile) + for { + row, err := csvr.Read() + if err != nil { + if err == io.EOF { + err = nil + break + } else { + fmt.Printf("Error during decoding file %s, err = %v\n", f, err) + return + } + } + + var from, to uint64 + var speed int + if from, err = strconv.ParseUint(row[0], 10, 64); err != nil { + fmt.Printf("#Error during decoding, row = %v\n", row) + return + } + if to, err = strconv.ParseUint(row[1], 10, 64); err != nil { + fmt.Printf("#Error during decoding, row = %v\n", row) + return + } + if speed, err = strconv.Atoi(row[2]); err != nil { + fmt.Printf("#Error during decoding, row = %v\n", row) + return + } + + m[tNodePair{from, to}] = speed + } +} + +func compareFileContentStable(f1, f2 string, t *testing.T) { + b1, err1 := ioutil.ReadFile(f1) + if err1 != nil { + fmt.Print(err1) + } + str1 := string(b1) + + b2, err2 := ioutil.ReadFile(f2) + if err2 != nil { + fmt.Print(err2) + } + str2 := string(b2) + + if strings.Compare(str1, str2) != 0 { + t.Error("Compare file content failed\n") + } +} + +func compareFileContentUnstable(f1, f2 string, t *testing.T) { + r1 := make(map[tNodePair]int) + loadSpeedCsv(f1, r1) + + r2 := make(map[tNodePair]int) + loadSpeedCsv(f2, r2) + + eq := reflect.DeepEqual(r1, r2) + if !eq { + t.Error("TestLoadWay2Nodeids failed to generate correct map\n") + } +} + diff --git a/traffic_updater/go/osrm_traffic_updater/testdata/expect.csv b/traffic_updater/go/osrm_traffic_updater/testdata/expect.csv index 4c95ae579..1c91e2e23 100755 --- a/traffic_updater/go/osrm_traffic_updater/testdata/expect.csv +++ b/traffic_updater/go/osrm_traffic_updater/testdata/expect.csv @@ -1,6 +1,7 @@ 84760891102,19496208102,81 -84762609102,244183320001101,87 -244183320001101,84762607102,87 +244183320001101,84762609102,87 +84762607102,244183320001101,87 84760849102,84760850102,47 84760858102,84760846102,59 -847608599102,84760858102,59 +84762609102,244183320001101,87 +244183320001101,84762607102,87 diff --git a/traffic_updater/go/osrm_traffic_updater/testdata/id-mapping-delta.csv b/traffic_updater/go/osrm_traffic_updater/testdata/id-mapping-delta.csv new file mode 100755 index 000000000..326436f3c --- /dev/null +++ b/traffic_updater/go/osrm_traffic_updater/testdata/id-mapping-delta.csv @@ -0,0 +1,4 @@ +24418325,84760891102,-65264683000 +7,1718000,244098557391999,-244098557393999 +11,-1760000,1000 +1,-3000,12000 \ No newline at end of file diff --git a/traffic_updater/go/osrm_traffic_updater/testdata/id-mapping-delta.csv.snappy b/traffic_updater/go/osrm_traffic_updater/testdata/id-mapping-delta.csv.snappy new file mode 100755 index 000000000..53b828c98 Binary files /dev/null and b/traffic_updater/go/osrm_traffic_updater/testdata/id-mapping-delta.csv.snappy differ diff --git a/traffic_updater/go/osrm_traffic_updater/testdata/id-mapping.csv b/traffic_updater/go/osrm_traffic_updater/testdata/id-mapping.csv index 6d942a1bc..94063b4e7 100755 --- a/traffic_updater/go/osrm_traffic_updater/testdata/id-mapping.csv +++ b/traffic_updater/go/osrm_traffic_updater/testdata/id-mapping.csv @@ -1,4 +1,4 @@ 24418325,84760891102,19496208102 24418332,84762609102,244183320001101,84762607102 24418343,84760849102,84760850102 -24418344,84760846102,84760858102,847608599102 \ No newline at end of file +24418344,84760846102,84760858102 \ No newline at end of file diff --git a/traffic_updater/go/osrm_traffic_updater/testdata/id-mapping.csv.snappy b/traffic_updater/go/osrm_traffic_updater/testdata/id-mapping.csv.snappy new file mode 100755 index 000000000..e33eea19c Binary files /dev/null and b/traffic_updater/go/osrm_traffic_updater/testdata/id-mapping.csv.snappy differ diff --git a/traffic_updater/go/osrm_traffic_updater/testdata/mock-traffic.csv b/traffic_updater/go/osrm_traffic_updater/testdata/mock-traffic.csv index 8a8ce40d4..25bc19235 100755 --- a/traffic_updater/go/osrm_traffic_updater/testdata/mock-traffic.csv +++ b/traffic_updater/go/osrm_traffic_updater/testdata/mock-traffic.csv @@ -1,4 +1,5 @@ 24418325,81 +-24418332,87 24418332,87 24418343,47 -24418344,-59 \ No newline at end of file +-24418344,59 \ No newline at end of file diff --git a/traffic_updater/go/osrm_traffic_updater/testdata/target.csv b/traffic_updater/go/osrm_traffic_updater/testdata/target.csv index 4c95ae579..01fa83ad2 100755 --- a/traffic_updater/go/osrm_traffic_updater/testdata/target.csv +++ b/traffic_updater/go/osrm_traffic_updater/testdata/target.csv @@ -1,6 +1,7 @@ -84760891102,19496208102,81 -84762609102,244183320001101,87 -244183320001101,84762607102,87 -84760849102,84760850102,47 84760858102,84760846102,59 -847608599102,84760858102,59 +84762609102,244183320001101,87 +84760849102,84760850102,47 +84760891102,19496208102,81 +244183320001101,84762609102,87 +244183320001101,84762607102,87 +84762607102,244183320001101,87 diff --git a/traffic_updater/go/osrm_traffic_updater/way2nodeids_loader.go b/traffic_updater/go/osrm_traffic_updater/way2nodeids_loader.go new file mode 100644 index 000000000..8aa87a497 --- /dev/null +++ b/traffic_updater/go/osrm_traffic_updater/way2nodeids_loader.go @@ -0,0 +1,58 @@ +package main + +import ( + "bufio" + "fmt" + "log" + "os" + "time" + "github.com/golang/snappy" +) + +func loadWay2NodeidsTable(filepath string, sources [TASKNUM]chan string) { + startTime := time.Now() + + data := make(chan string) + go load(filepath, data) + convert(data, sources) + + endTime := time.Now() + fmt.Printf("Processing time for loadWay2NodeidsTable takes %f seconds\n", endTime.Sub(startTime).Seconds()) +} + +func load(mappingPath string, data chan<- string) { + defer close(data) + + f, err := os.Open(mappingPath) + defer f.Close() + if err != nil { + log.Fatal(err) + fmt.Printf("Open idsmapping file of %v failed.\n", mappingPath) + return + } + fmt.Printf("Open idsmapping file of %s succeed.\n", mappingPath) + + scanner := bufio.NewScanner(snappy.NewReader(f)) + for scanner.Scan() { + data <- (scanner.Text()) + } +} + + +// input data format +// wayid1, n1, n2 +// wayid2, n3, n4, n5 +func convert(data <-chan string, sources [TASKNUM]chan string) { + for i := range sources { + defer close(sources[i]) + } + + var count int + for str := range data { + chanIndex := count % TASKNUM + count++ + sources[chanIndex] <- str + } +} + + diff --git a/traffic_updater/go/osrm_traffic_updater/way2nodeids_loader_test.go b/traffic_updater/go/osrm_traffic_updater/way2nodeids_loader_test.go new file mode 100644 index 000000000..cdf608c86 --- /dev/null +++ b/traffic_updater/go/osrm_traffic_updater/way2nodeids_loader_test.go @@ -0,0 +1,64 @@ +package main + +import ( + "testing" + "reflect" + "sync" +) + + +func TestLoadWay2Nodeids(t *testing.T) { + // load result into sources + var sources [TASKNUM]chan string + for i := range sources { + //fmt.Printf("&&& current i is %d\n", i) + sources[i] = make(chan string, 10000) + } + go loadWay2NodeidsTable("./testdata/id-mapping.csv.snappy", sources) + + allWay2NodesChan := make(chan string, 10000) + var wgs sync.WaitGroup + wgs.Add(TASKNUM) + for i:= 0; i < TASKNUM; i++ { + //fmt.Printf("### current i is %d\n", i) + go mergeChannels(sources[i], allWay2NodesChan, &wgs) + } + wgs.Wait() + + // dump result into map + way2nodeids := make(map[string]bool) + var wg sync.WaitGroup + wg.Add(1) + go func() { + for elem := range allWay2NodesChan { + way2nodeids[elem] = true + } + wg.Done() + }() + close(allWay2NodesChan) + wg.Wait() + + // test map result + way2nodeidsExpect := make(map[string]bool) + generateMockWay2nodeids(way2nodeidsExpect) + + eq := reflect.DeepEqual(way2nodeids, way2nodeidsExpect) + if !eq { + t.Error("TestLoadWay2Nodeids failed to generate correct map\n") + } +} + +func mergeChannels(f <-chan string, t chan<- string, w *sync.WaitGroup) { + for elem := range f { + t <- elem + } + w.Done() +} + +func generateMockWay2nodeids(way2nodeids map[string]bool) { + way2nodeids["24418325,84760891102,19496208102"] = true + way2nodeids["24418332,84762609102,244183320001101,84762607102"] = true + way2nodeids["24418343,84760849102,84760850102"] = true + way2nodeids["24418344,84760846102,84760858102"] = true +} + diff --git a/traffic_updater/go/snappy_command/snappy_command.go b/traffic_updater/go/snappy_command/snappy_command.go new file mode 100644 index 000000000..0842bb2e0 --- /dev/null +++ b/traffic_updater/go/snappy_command/snappy_command.go @@ -0,0 +1,64 @@ +package main + +import ( + "flag" + "strings" + "fmt" + "os" + "io" + "github.com/golang/snappy" +) + +const snappySuffix string = ".snappy" + +var flags struct { + input string + output string + suffix string +} + +func init() { + flag.StringVar(&flags.input, "i", "", "Input file.") + flag.StringVar(&flags.output, "o", "", "Output file.") +} + +func main() { + flag.Parse() + + inputCompressed := strings.HasSuffix(flags.input, snappySuffix) + outputCompressed := strings.HasSuffix(flags.output, snappySuffix) + + if inputCompressed == outputCompressed { + fmt.Printf("Error. Input and output must have one with .snappy suffix and one without.\n") + return + } + + fi, err1 := os.Open(flags.input) + if err1 != nil { + fmt.Printf("Open input file failed.\n") + return + } + defer fi.Close() + + fo, err2 := os.OpenFile(flags.output, os.O_RDWR|os.O_CREATE, 0755) + if err2 != nil { + fmt.Printf("Open output file failed.\n") + return + } + defer fo.Close() + defer fo.Sync() + + buf := make([]byte, 128 * 1024) + if inputCompressed { + _, err := io.CopyBuffer(fo, snappy.NewReader(fi), buf) + if err != nil { + fmt.Printf("Decompression failed\n") + } + } else { + _, err := io.CopyBuffer(snappy.NewWriter(fo), fi, buf) + if err != nil { + fmt.Printf("Compression failed\n") + } + } + +} diff --git a/traffic_updater/go/wayid2nodeid_extractor/wayid2nodeids_extractor.go b/traffic_updater/go/wayid2nodeid_extractor/wayid2nodeids_extractor.go index ba7a3ceb7..306add1ee 100644 --- a/traffic_updater/go/wayid2nodeid_extractor/wayid2nodeids_extractor.go +++ b/traffic_updater/go/wayid2nodeid_extractor/wayid2nodeids_extractor.go @@ -156,3 +156,4 @@ func main() { endTime := time.Now() fmt.Printf("Total processing time for wayid2nodeids-extract takes %f seconds\n", endTime.Sub(startTime).Seconds()) } +