Feature/optimize traffic convertor (#41)

* feat: Optimize output of wayid2nodeids format, use delta format to comparess data

Issues: https://github.com/Telenav/osrm-backend/issues/31

* feat: Implement logic to compress/decompress file to snappy.

issues: https://github.com/Telenav/osrm-backend/issues/31

* feat: Modify osrm speed table generator to support snappy compression format.

issues: https://github.com/Telenav/osrm-backend/issues/31

* feat: Fix bug during conversion

* feat: Adjust traffic updater's logic to improve performance.

* feat: Adjust traffic updater's logic to improve performance.

issues: https://github.com/Telenav/osrm-backend/pull/39

* feat: Refine the code for osrm_traffic_updater.

issues: https://github.com/Telenav/osrm-backend/issues/31

* fix: fix dead lock in the code.

* fix: optimize performance with new architecture.
issues: https://github.com/Telenav/osrm-backend/issues/31

* fix: revert way id generator to original solution

* fix: Use string to pass between different components
issues: https://github.com/Telenav/osrm-backend/issues/31

* fix: update unit test for latest changes
issues: https://github.com/Telenav/osrm-backend/issues/31

* fix: remove useless printf

* fix: update unit test for osrm_traffic_updater.go

* fix: fix the misunderstanding with requirement.  Traffic server generates -wayid indicate for traffic flow on reverse direction.
issues: https://github.com/Telenav/osrm-backend/issues/31
This commit is contained in:
Xun(Perry) Liu 2019-07-16 18:11:38 -07:00 committed by Jay
parent 5726d46284
commit a3eb24a0fc
18 changed files with 642 additions and 247 deletions

View File

@ -0,0 +1,77 @@
package main
import (
"context"
"fmt"
"strconv"
"time"
"github.com/Telenav/osrm-backend/traffic_updater/go/gen-go/proxy"
"github.com/apache/thrift/lib/go/thrift"
)
func getTrafficFlow(ip string, port int, m map[int64]int, c chan<- bool) {
var transport thrift.TTransport
var err error
startTime := time.Now()
// make socket
targetServer := ip + ":" + strconv.Itoa(port)
fmt.Println("connect traffic proxy " + targetServer)
transport, err = thrift.NewTSocket(targetServer)
if err != nil {
fmt.Println("Error opening socket:", err)
c <- false
return
}
// Buffering
transport, err = thrift.NewTFramedTransportFactoryMaxLength(thrift.NewTTransportFactory(), 1024*1024*1024).GetTransport(transport)
if err != nil {
fmt.Println("Error get transport:", err)
c <- false
return
}
defer transport.Close()
if err := transport.Open(); err != nil {
fmt.Println("Error opening transport:", err)
c <- false
return
}
// protocol encoder&decoder
protocol := thrift.NewTCompactProtocolFactory().GetProtocol(transport)
// create proxy client
client := proxy.NewProxyServiceClient(thrift.NewTStandardClient(protocol, protocol))
// get flows
fmt.Println("getting flows")
var defaultCtx = context.Background()
var flows []*proxy.Flow
flows, err = client.GetAllFlows(defaultCtx)
if err != nil {
fmt.Println("get flows failed:", err)
c <- false
return
}
fmt.Printf("got flows count: %d\n", len(flows))
endTime := time.Now()
fmt.Printf("Processing time for getting traffic flow takes %f seconds\n", endTime.Sub(startTime).Seconds())
flows2map(flows, m)
endTime2 := time.Now()
fmt.Printf("Processing time for building traffic map takes %f seconds\n", endTime2.Sub(endTime).Seconds())
c <- true
return
}
func flows2map(flows []*proxy.Flow, m map[int64]int) {
for _, flow := range flows {
wayid := flow.WayId
m[wayid] = int(flow.Speed)
}
}

View File

@ -1,114 +0,0 @@
package main
import (
"bufio"
"fmt"
"log"
"os"
"strconv"
"strings"
"time"
)
// todo:
// Write data into more compressed format(parquet)
// Statistic to avoid unmatched element
// Multiple go routine for convert()
func generateSpeedTable(wayid2speed map[uint64]int, way2nodeidsPath string, target string) {
startTime := time.Now()
// format is: wayid, nodeid, nodeid, nodeid...
source := make(chan string)
// format is fromid, toid, speed
sink := make(chan string)
go load(way2nodeidsPath, source)
go convert(source, sink, wayid2speed)
write(target, sink)
endTime := time.Now()
fmt.Printf("Processing time for generate speed table takes %f seconds\n", endTime.Sub(startTime).Seconds())
}
func load(mappingPath string, source chan<- string) {
defer close(source)
f, err := os.Open(mappingPath)
defer f.Close()
if err != nil {
log.Fatal(err)
fmt.Printf("Open idsmapping file of %v failed.\n", mappingPath)
return
}
fmt.Printf("Open idsmapping file of %s succeed.\n", mappingPath)
scanner := bufio.NewScanner(f)
for scanner.Scan() {
source <- (scanner.Text())
}
}
func convert(source <-chan string, sink chan<- string, wayid2speed map[uint64]int) {
var err error
defer close(sink)
for str := range source {
elements := strings.Split(str, ",")
if len(elements) < 3 {
fmt.Printf("Invalid string %s in wayid2nodeids mapping file\n", str)
continue
}
var wayid uint64
if wayid, err = strconv.ParseUint(elements[0], 10, 64); err != nil {
fmt.Printf("#Error during decoding wayid, row = %v\n", elements)
continue
}
if speed, ok := wayid2speed[wayid]; ok {
var nodes []string = elements[1:]
for i := 0; (i + 1) < len(nodes); i++ {
var n1, n2 uint64
if n1, err = strconv.ParseUint(nodes[i], 10, 64); err != nil {
fmt.Printf("#Error during decoding nodeid, row = %v\n", elements)
continue
}
if n2, err = strconv.ParseUint(nodes[i+1], 10, 64); err != nil {
fmt.Printf("#Error during decoding nodeid, row = %v\n", elements)
continue
}
var s string
if speed >= 0 {
s = fmt.Sprintf("%d,%d,%d\n", n1, n2, speed)
} else {
s = fmt.Sprintf("%d,%d,%d\n", n2, n1, -speed)
}
sink <- s
}
}
}
}
func write(targetPath string, sink chan string) {
outfile, err := os.OpenFile(targetPath, os.O_RDWR|os.O_CREATE, 0755)
defer outfile.Close()
defer outfile.Sync()
if err != nil {
log.Fatal(err)
fmt.Printf("Open output file of %s failed.\n", targetPath)
return
}
fmt.Printf("Open output file of %s succeed.\n", targetPath)
w := bufio.NewWriter(outfile)
defer w.Flush()
for str := range sink {
_, err := w.WriteString(str)
if err != nil {
log.Fatal(err)
return
}
}
}

View File

@ -1,72 +0,0 @@
package main
import (
"encoding/csv"
"fmt"
"io"
"io/ioutil"
"log"
"os"
"strconv"
"strings"
"testing"
)
func TestGenerateSpeedTable(t *testing.T) {
wayid2speed := make(map[uint64]int)
loadMockTraffic("./testdata/mock-traffic.csv", wayid2speed)
generateSpeedTable(wayid2speed, "./testdata/id-mapping.csv", "./testdata/target.csv")
b1, err1 := ioutil.ReadFile("./testdata/target.csv")
if err1 != nil {
fmt.Print(err1)
}
str1 := string(b1)
b2, err2 := ioutil.ReadFile("./testdata/expect.csv")
if err2 != nil {
fmt.Print(err2)
}
str2 := string(b2)
if strings.Compare(str1, str2) != 0 {
t.Error("TestGenerateSpeedTable failed\n")
}
}
func loadMockTraffic(trafficPath string, wayid2speed map[uint64]int) {
// load mock traffic file
mockfile, err := os.Open(trafficPath)
defer mockfile.Close()
if err != nil {
log.Fatal(err)
fmt.Printf("Open pbf file of %v failed.\n", trafficPath)
return
}
fmt.Printf("Open pbf file of %s succeed.\n", trafficPath)
csvr := csv.NewReader(mockfile)
for {
row, err := csvr.Read()
if err != nil {
if err == io.EOF {
err = nil
break
} else {
fmt.Printf("Error during decoding mock traffic, row = %v\n", err)
return
}
}
var wayid uint64
var speed int64
if wayid, err = strconv.ParseUint(row[0], 10, 64); err != nil {
fmt.Printf("#Error during decoding wayid, row = %v\n", row)
}
if speed, err = strconv.ParseInt(row[1], 10, 32); err != nil {
fmt.Printf("#Error during decoding speed, row = %v\n", row)
}
wayid2speed[wayid] = (int)(speed)
}
}

View File

@ -1,13 +1,8 @@
package main
import (
"context"
"flag"
"fmt"
"strconv"
"github.com/Telenav/osrm-backend/traffic_updater/go/gen-go/proxy"
"github.com/apache/thrift/lib/go/thrift"
)
var flags struct {
@ -26,58 +21,42 @@ func init() {
flag.BoolVar(&flags.highPrecision, "d", false, "use high precision speeds, i.e. decimal")
}
func flows2map(flows []*proxy.Flow, m map[uint64]int) {
for _, flow := range flows {
wayid := (uint64)(flow.WayId)
m[wayid] = int(flow.Speed)
}
}
const TASKNUM = 128
const CACHEDOBJECTS = 1000000
func main() {
flag.Parse()
var transport thrift.TTransport
var err error
isFlowDoneChan := make(chan bool, 1)
wayid2speed := make(map[int64]int)
go getTrafficFlow(flags.ip, flags.port, wayid2speed, isFlowDoneChan)
// make socket
targetServer := flags.ip + ":" + strconv.Itoa(flags.port)
fmt.Println("connect traffic proxy " + targetServer)
transport, err = thrift.NewTSocket(targetServer)
if err != nil {
fmt.Println("Error opening socket:", err)
return
var sources [TASKNUM]chan string
for i := range sources {
sources[i] = make(chan string, CACHEDOBJECTS)
}
go loadWay2NodeidsTable(flags.mappingFile, sources)
isFlowDone := wait4PreConditions(isFlowDoneChan)
if isFlowDone {
dumpSpeedTable4Customize(wayid2speed, sources, flags.csvFile)
}
}
// Buffering
transport, err = thrift.NewTFramedTransportFactoryMaxLength(thrift.NewTTransportFactory(), 1024*1024*1024).GetTransport(transport)
if err != nil {
fmt.Println("Error get transport:", err)
return
func wait4PreConditions(flowChan <-chan bool) (bool) {
var isFlowDone bool
loop:
for {
select {
case f := <- flowChan :
if !f {
fmt.Printf("[ERROR] Communication with traffic server failed.\n")
break loop
} else {
isFlowDone = true
break loop
}
defer transport.Close()
if err := transport.Open(); err != nil {
fmt.Println("Error opening transport:", err)
return
}
// protocol encoder&decoder
protocol := thrift.NewTCompactProtocolFactory().GetProtocol(transport)
// create proxy client
client := proxy.NewProxyServiceClient(thrift.NewTStandardClient(protocol, protocol))
// get flows
fmt.Println("getting flows")
var defaultCtx = context.Background()
flows, err := client.GetAllFlows(defaultCtx)
if err != nil {
fmt.Println("get flows failed:", err)
return
}
fmt.Printf("got flows count: %d\n", len(flows))
wayid2speed := make(map[uint64]int)
flows2map(flows, wayid2speed)
generateSpeedTable(wayid2speed, flags.mappingFile, flags.csvFile)
return isFlowDone
}

View File

@ -0,0 +1,35 @@
package main
import (
"testing"
)
func setChan2True(c chan<- bool) {
c <- true
}
func setChan2False(c chan<- bool) {
c <- false
}
func TestWait4AllPreconditions1(t *testing.T) {
c1 := make(chan bool, 1)
go setChan2True(c1)
b1 := wait4PreConditions(c1)
if !b1 {
t.Error("Testwait4AllPreconditions1 failed.\n")
}
}
func TestWait4AllPreconditions2(t *testing.T) {
c2 := make(chan bool, 1)
go setChan2False(c2)
b2 := wait4PreConditions(c2)
if b2 {
t.Error("Testwait4AllPreconditions1 failed.\n")
}
}

View File

@ -0,0 +1,127 @@
package main
import (
"os"
"log"
"fmt"
"bufio"
"sync"
"time"
"strings"
"strconv"
)
var tasksWg sync.WaitGroup
var dumpFinishedWg sync.WaitGroup
func dumpSpeedTable4Customize(wayid2speed map[int64]int, sources [TASKNUM]chan string, outputPath string) {
startTime := time.Now()
if len(wayid2speed) == 0 {
return
}
sink := make(chan string)
startTasks(wayid2speed, sources, sink)
startDump(outputPath, sink)
wait4AllTasksFinished(sink)
endTime := time.Now()
fmt.Printf("Processing time for dumpSpeedTable4Customize takes %f seconds\n", endTime.Sub(startTime).Seconds())
}
func startTasks(wayid2speed map[int64]int, sources [TASKNUM]chan string, sink chan<- string) {
tasksWg.Add(TASKNUM)
for i := 0; i < TASKNUM; i++ {
go task(wayid2speed, sources[i], sink)
}
}
func startDump(outputPath string, sink <-chan string) {
dumpFinishedWg.Add(1)
go write(outputPath, sink)
}
func wait4AllTasksFinished(sink chan string) {
tasksWg.Wait()
close(sink)
dumpFinishedWg.Wait()
}
func task(wayid2speed map[int64]int, source <-chan string, sink chan<- string) {
var err error
for str := range source {
elements := strings.Split(str, ",")
if len(elements) < 3 {
continue
}
var wayid uint64
if wayid, err = strconv.ParseUint(elements[0], 10, 64); err != nil {
fmt.Printf("#Error during decoding wayid, row = %v\n", elements)
continue
}
speedFwd, okFwd := wayid2speed[(int64)(wayid)]
speedBwd, okBwd := wayid2speed[(int64)(-wayid)]
if okFwd || okBwd {
var nodes []string = elements[1:]
for i := 0; (i + 1) < len(nodes); i++ {
var n1, n2 uint64
if n1, err = strconv.ParseUint(nodes[i], 10, 64); err != nil {
fmt.Printf("#Error during decoding nodeid, row = %v\n", elements)
continue
}
if n2, err = strconv.ParseUint(nodes[i+1], 10, 64); err != nil {
fmt.Printf("#Error during decoding nodeid, row = %v\n", elements)
continue
}
if okFwd {
sink <- generateSingleRecord(n1, n2, speedFwd, true)
}
if okBwd {
sink <- generateSingleRecord(n1, n2, speedBwd, false)
}
}
}
}
tasksWg.Done()
}
// format
// if dir = true, means traffic for forward, generate: from, to, speed
// if dir = false, means this speed is for backward flow, generate: to, from, speed
func generateSingleRecord(from, to uint64, speed int, dir bool) (string){
if dir {
return fmt.Sprintf("%d,%d,%d\n", from, to, speed)
} else {
return fmt.Sprintf("%d,%d,%d\n", to, from, speed)
}
}
func write(targetPath string, sink <-chan string) {
outfile, err := os.OpenFile(targetPath, os.O_RDWR|os.O_CREATE, 0755)
defer outfile.Close()
defer outfile.Sync()
if err != nil {
log.Fatal(err)
fmt.Printf("Open output file of %s failed.\n", targetPath)
return
}
fmt.Printf("Open output file of %s succeed.\n", targetPath)
w := bufio.NewWriter(outfile)
defer w.Flush()
for str := range sink {
_, err := w.WriteString(str)
if err != nil {
log.Fatal(err)
return
}
}
dumpFinishedWg.Done()
}

View File

@ -0,0 +1,169 @@
package main
import (
"testing"
"os"
"log"
"fmt"
"strconv"
"io"
"encoding/csv"
"io/ioutil"
"strings"
"reflect"
"github.com/Telenav/osrm-backend/traffic_updater/go/gen-go/proxy"
)
func TestSpeedTableDumper1(t *testing.T) {
// load result into sources
var sources [TASKNUM]chan string
for i := range sources {
sources[i] = make(chan string, 10000)
}
go loadWay2NodeidsTable("./testdata/id-mapping.csv.snappy", sources)
// construct mock traffic
var flows []*proxy.Flow
flows = loadMockTraffic("./testdata/mock-traffic.csv", flows)
wayid2speed := make(map[int64]int)
flows2map(flows, wayid2speed)
dumpSpeedTable4Customize(wayid2speed, sources, "./testdata/target.csv")
compareFileContentUnstable("./testdata/target.csv", "./testdata/expect.csv", t)
}
func TestGenerateSingleRecord1(t *testing.T) {
str := generateSingleRecord(12345, 54321, 33, true)
if strings.Compare(str, "12345,54321,33\n") != 0 {
t.Error("Test GenerateSingleRecord failed.\n")
}
}
func TestGenerateSingleRecord2(t *testing.T) {
str := generateSingleRecord(12345, 54321, 33, false)
if strings.Compare(str, "54321,12345,33\n") != 0 {
t.Error("Test GenerateSingleRecord failed.\n")
}
}
func loadMockTraffic(trafficPath string, flows []*proxy.Flow) []*proxy.Flow {
// load mock traffic file
mockfile, err := os.Open(trafficPath)
defer mockfile.Close()
if err != nil {
log.Fatal(err)
fmt.Printf("Open pbf file of %v failed.\n", trafficPath)
return nil
}
fmt.Printf("Open pbf file of %s succeed.\n", trafficPath)
csvr := csv.NewReader(mockfile)
for {
row, err := csvr.Read()
if err != nil {
if err == io.EOF {
err = nil
break
} else {
fmt.Printf("Error during decoding mock traffic, err = %v\n", err)
return nil
}
}
var wayid int64
var speed int64
if wayid, err = strconv.ParseInt(row[0], 10, 64); err != nil {
fmt.Printf("#Error during decoding wayid, row = %v\n", row)
}
if speed, err = strconv.ParseInt(row[1], 10, 32); err != nil {
fmt.Printf("#Error during decoding speed, row = %v\n", row)
}
var flow proxy.Flow
flow.WayId = wayid
flow.Speed = (float64)(speed)
flows = append(flows, &flow)
}
return flows
}
type tNodePair struct {
f, t uint64
}
func loadSpeedCsv(f string, m map[tNodePair]int) {
// load mock traffic file
mockfile, err := os.Open(f)
defer mockfile.Close()
if err != nil {
log.Fatal(err)
fmt.Printf("Open file of %v failed.\n", f)
return
}
fmt.Printf("Open file of %s succeed.\n", f)
csvr := csv.NewReader(mockfile)
for {
row, err := csvr.Read()
if err != nil {
if err == io.EOF {
err = nil
break
} else {
fmt.Printf("Error during decoding file %s, err = %v\n", f, err)
return
}
}
var from, to uint64
var speed int
if from, err = strconv.ParseUint(row[0], 10, 64); err != nil {
fmt.Printf("#Error during decoding, row = %v\n", row)
return
}
if to, err = strconv.ParseUint(row[1], 10, 64); err != nil {
fmt.Printf("#Error during decoding, row = %v\n", row)
return
}
if speed, err = strconv.Atoi(row[2]); err != nil {
fmt.Printf("#Error during decoding, row = %v\n", row)
return
}
m[tNodePair{from, to}] = speed
}
}
func compareFileContentStable(f1, f2 string, t *testing.T) {
b1, err1 := ioutil.ReadFile(f1)
if err1 != nil {
fmt.Print(err1)
}
str1 := string(b1)
b2, err2 := ioutil.ReadFile(f2)
if err2 != nil {
fmt.Print(err2)
}
str2 := string(b2)
if strings.Compare(str1, str2) != 0 {
t.Error("Compare file content failed\n")
}
}
func compareFileContentUnstable(f1, f2 string, t *testing.T) {
r1 := make(map[tNodePair]int)
loadSpeedCsv(f1, r1)
r2 := make(map[tNodePair]int)
loadSpeedCsv(f2, r2)
eq := reflect.DeepEqual(r1, r2)
if !eq {
t.Error("TestLoadWay2Nodeids failed to generate correct map\n")
}
}

View File

@ -1,6 +1,7 @@
84760891102,19496208102,81
84762609102,244183320001101,87
244183320001101,84762607102,87
244183320001101,84762609102,87
84762607102,244183320001101,87
84760849102,84760850102,47
84760858102,84760846102,59
847608599102,84760858102,59
84762609102,244183320001101,87
244183320001101,84762607102,87

1 84760891102 19496208102 81
2 84762609102 244183320001101 244183320001101 84762609102 87
3 244183320001101 84762607102 84762607102 244183320001101 87
4 84760849102 84760850102 47
5 84760858102 84760846102 59
6 847608599102 84762609102 84760858102 244183320001101 59 87
7 244183320001101 84762607102 87

View File

@ -0,0 +1,4 @@
24418325,84760891102,-65264683000
7,1718000,244098557391999,-244098557393999
11,-1760000,1000
1,-3000,12000
1 24418325,84760891102,-65264683000
2 7,1718000,244098557391999,-244098557393999
3 11,-1760000,1000
4 1,-3000,12000

View File

@ -1,4 +1,4 @@
24418325,84760891102,19496208102
24418332,84762609102,244183320001101,84762607102
24418343,84760849102,84760850102
24418344,84760846102,84760858102,847608599102
24418344,84760846102,84760858102
1 24418325,84760891102,19496208102
2 24418332,84762609102,244183320001101,84762607102
3 24418343,84760849102,84760850102
4 24418344,84760846102,84760858102,847608599102 24418344,84760846102,84760858102

Binary file not shown.

View File

@ -1,4 +1,5 @@
24418325,81
-24418332,87
24418332,87
24418343,47
24418344,-59
-24418344,59
1 24418325 81
2 -24418332 87
3 24418332 87
4 24418343 47
5 24418344 -24418344 -59 59

View File

@ -1,6 +1,7 @@
84760891102,19496208102,81
84762609102,244183320001101,87
244183320001101,84762607102,87
84760849102,84760850102,47
84760858102,84760846102,59
847608599102,84760858102,59
84762609102,244183320001101,87
84760849102,84760850102,47
84760891102,19496208102,81
244183320001101,84762609102,87
244183320001101,84762607102,87
84762607102,244183320001101,87

1 84760891102 84760858102 19496208102 84760846102 81 59
84760891102 19496208102 81
84762609102 244183320001101 87
244183320001101 84762607102 87
84760849102 84760850102 47
1 84760858102 84760858102 84760846102 84760846102 59 59
2 847608599102 84762609102 84760858102 244183320001101 59 87
3 84760849102 84760850102 47
4 84760891102 19496208102 81
5 244183320001101 84762609102 87
6 244183320001101 84762607102 87
7 84762607102 244183320001101 87

View File

@ -0,0 +1,58 @@
package main
import (
"bufio"
"fmt"
"log"
"os"
"time"
"github.com/golang/snappy"
)
func loadWay2NodeidsTable(filepath string, sources [TASKNUM]chan string) {
startTime := time.Now()
data := make(chan string)
go load(filepath, data)
convert(data, sources)
endTime := time.Now()
fmt.Printf("Processing time for loadWay2NodeidsTable takes %f seconds\n", endTime.Sub(startTime).Seconds())
}
func load(mappingPath string, data chan<- string) {
defer close(data)
f, err := os.Open(mappingPath)
defer f.Close()
if err != nil {
log.Fatal(err)
fmt.Printf("Open idsmapping file of %v failed.\n", mappingPath)
return
}
fmt.Printf("Open idsmapping file of %s succeed.\n", mappingPath)
scanner := bufio.NewScanner(snappy.NewReader(f))
for scanner.Scan() {
data <- (scanner.Text())
}
}
// input data format
// wayid1, n1, n2
// wayid2, n3, n4, n5
func convert(data <-chan string, sources [TASKNUM]chan string) {
for i := range sources {
defer close(sources[i])
}
var count int
for str := range data {
chanIndex := count % TASKNUM
count++
sources[chanIndex] <- str
}
}

View File

@ -0,0 +1,64 @@
package main
import (
"testing"
"reflect"
"sync"
)
func TestLoadWay2Nodeids(t *testing.T) {
// load result into sources
var sources [TASKNUM]chan string
for i := range sources {
//fmt.Printf("&&& current i is %d\n", i)
sources[i] = make(chan string, 10000)
}
go loadWay2NodeidsTable("./testdata/id-mapping.csv.snappy", sources)
allWay2NodesChan := make(chan string, 10000)
var wgs sync.WaitGroup
wgs.Add(TASKNUM)
for i:= 0; i < TASKNUM; i++ {
//fmt.Printf("### current i is %d\n", i)
go mergeChannels(sources[i], allWay2NodesChan, &wgs)
}
wgs.Wait()
// dump result into map
way2nodeids := make(map[string]bool)
var wg sync.WaitGroup
wg.Add(1)
go func() {
for elem := range allWay2NodesChan {
way2nodeids[elem] = true
}
wg.Done()
}()
close(allWay2NodesChan)
wg.Wait()
// test map result
way2nodeidsExpect := make(map[string]bool)
generateMockWay2nodeids(way2nodeidsExpect)
eq := reflect.DeepEqual(way2nodeids, way2nodeidsExpect)
if !eq {
t.Error("TestLoadWay2Nodeids failed to generate correct map\n")
}
}
func mergeChannels(f <-chan string, t chan<- string, w *sync.WaitGroup) {
for elem := range f {
t <- elem
}
w.Done()
}
func generateMockWay2nodeids(way2nodeids map[string]bool) {
way2nodeids["24418325,84760891102,19496208102"] = true
way2nodeids["24418332,84762609102,244183320001101,84762607102"] = true
way2nodeids["24418343,84760849102,84760850102"] = true
way2nodeids["24418344,84760846102,84760858102"] = true
}

View File

@ -0,0 +1,64 @@
package main
import (
"flag"
"strings"
"fmt"
"os"
"io"
"github.com/golang/snappy"
)
const snappySuffix string = ".snappy"
var flags struct {
input string
output string
suffix string
}
func init() {
flag.StringVar(&flags.input, "i", "", "Input file.")
flag.StringVar(&flags.output, "o", "", "Output file.")
}
func main() {
flag.Parse()
inputCompressed := strings.HasSuffix(flags.input, snappySuffix)
outputCompressed := strings.HasSuffix(flags.output, snappySuffix)
if inputCompressed == outputCompressed {
fmt.Printf("Error. Input and output must have one with .snappy suffix and one without.\n")
return
}
fi, err1 := os.Open(flags.input)
if err1 != nil {
fmt.Printf("Open input file failed.\n")
return
}
defer fi.Close()
fo, err2 := os.OpenFile(flags.output, os.O_RDWR|os.O_CREATE, 0755)
if err2 != nil {
fmt.Printf("Open output file failed.\n")
return
}
defer fo.Close()
defer fo.Sync()
buf := make([]byte, 128 * 1024)
if inputCompressed {
_, err := io.CopyBuffer(fo, snappy.NewReader(fi), buf)
if err != nil {
fmt.Printf("Decompression failed\n")
}
} else {
_, err := io.CopyBuffer(snappy.NewWriter(fo), fi, buf)
if err != nil {
fmt.Printf("Compression failed\n")
}
}
}

View File

@ -156,3 +156,4 @@ func main() {
endTime := time.Now()
fmt.Printf("Total processing time for wayid2nodeids-extract takes %f seconds\n", endTime.Sub(startTime).Seconds())
}