diff --git a/docs/design/graph/osrm-release-deployment-pipeline.mmd b/docs/design/graph/osrm-release-deployment-pipeline.mmd new file mode 100644 index 000000000..750ad7349 --- /dev/null +++ b/docs/design/graph/osrm-release-deployment-pipeline.mmd @@ -0,0 +1,18 @@ + +sequenceDiagram + participant U as User + participant J as Jenkins + participant D as DockerRegistry + participant K as Kubernetes + + U ->>+ J: Build OSRM docker + Note over U,J: include or NOT include mapdata + J ->>- D: Push image to registry + J ->> U: Build done + + U ->> K: Trigger deployment by Kubernetes + Loop e.g. every 5 minutes + K ->> K: Blue/Green deploy based on docker image + note over K: update traffic when container startup + end + diff --git a/docs/design/graph/osrm-release-deployment-pipeline.mmd.png b/docs/design/graph/osrm-release-deployment-pipeline.mmd.png new file mode 100644 index 000000000..f76d3dac2 Binary files /dev/null and b/docs/design/graph/osrm-release-deployment-pipeline.mmd.png differ diff --git a/docs/design/graph/osrm-with-telenav-traffic-architecture.mmd b/docs/design/graph/osrm-with-telenav-traffic-architecture.mmd new file mode 100644 index 000000000..bd1450e24 --- /dev/null +++ b/docs/design/graph/osrm-with-telenav-traffic-architecture.mmd @@ -0,0 +1,24 @@ +%% Call below command to convert .mmd to .png +%% Adjust -w or -H if necessary +%% mmdc -p puppeteer-config.json -i osrm-with-telenav-traffic-architecture.mmd -o osrm-with-telenav-traffic-architecture.mmd.png + +graph LR + +Title[OSRM with Telenav Traffic Architecture] +Title-->User +style Title fill:#FFF,stroke:#FFF +linkStyle 0 stroke:#FFF,stroke-width:0; + +User["User"] -- request --> OSRM_ROUTED["osrm-routed"] + +subgraph OSRM Containers +OSRM_ROUTED +OSRM_Traffic_Updater["OSRMTrafficUpdater"] +style OSRM_Traffic_Updater fill:#acbfff,stroke-dasharray: 5, 5 +end + +OSRM_Traffic_Updater -- RPC --> TrafficProxy["TrafficProxy"] +subgraph Traffic Containers +TrafficProxy +style TrafficProxy fill:#acbfff,stroke-dasharray: 5, 5 +end diff --git a/docs/design/graph/osrm-with-telenav-traffic-architecture.mmd.png b/docs/design/graph/osrm-with-telenav-traffic-architecture.mmd.png new file mode 100644 index 000000000..4364811df Binary files /dev/null and b/docs/design/graph/osrm-with-telenav-traffic-architecture.mmd.png differ diff --git a/docs/design/graph/osrm-with-traffic-startup-flow-chart.mmd b/docs/design/graph/osrm-with-traffic-startup-flow-chart.mmd new file mode 100644 index 000000000..5551361cf --- /dev/null +++ b/docs/design/graph/osrm-with-traffic-startup-flow-chart.mmd @@ -0,0 +1,31 @@ +%% Call below command to convert .mmd to .png +%% Adjust -w or -H if necessary +%% mmdc -p puppeteer-config.json -i osrm-with-traffic-startup-flow-chart.mmd -o osrm-with-traffic-startup-flow-chart.mmd.png + +graph TD + +Title[OSRM with Telenav Traffic Container Startup Flow Chart] +Title-->Start +style Title fill:#FFF,stroke:#FFF +linkStyle 0 stroke:#FFF,stroke-width:0; + +Start("Start") --> PrepareCompiledMapdata["Prepare Compile Mapdata

- NA compile mapdata size based on OSM: about 43 GB

- Way 1: Package compiled mapdata inside the docker image

- Way 2: Pull compiled mapdata from network when startup the container "] +style PrepareCompiledMapdata fill:#acbfff,stroke-dasharray: 5, 5 + +PrepareCompiledMapdata --> ConnectTrafficProxy["OSRMTrafficUpdate Connect TrafficProxy by RPC"] + +subgraph OSRM Traffic Updater +ConnectTrafficProxy --> PullLatestTraffic["Pull latest traffic(from node, to node, speed) of full region"] +PullLatestTraffic --> WriteToCSV["Write to traffic.csv(as OSRM format)"] +WriteToCSV --> ExitOSRMTrafficUpdater["Exit ExitOSRMTrafficUpdater"] +end + +subgraph OSRM customize +ExitOSRMTrafficUpdater --> OSRMCustomize["Run osrm-customize on mapdata with traffic.csv"] +end + +subgraph OSRM routed +OSRMCustomize --> OSRMRouted["Run osrm-routed based on customized mapdata"] +end + +OSRMRouted --> End("End") \ No newline at end of file diff --git a/docs/design/graph/osrm-with-traffic-startup-flow-chart.mmd.png b/docs/design/graph/osrm-with-traffic-startup-flow-chart.mmd.png new file mode 100644 index 000000000..42cf5cd67 Binary files /dev/null and b/docs/design/graph/osrm-with-traffic-startup-flow-chart.mmd.png differ diff --git a/docs/design/graph/puppeteer-config.json b/docs/design/graph/puppeteer-config.json new file mode 100644 index 000000000..3201af7b7 --- /dev/null +++ b/docs/design/graph/puppeteer-config.json @@ -0,0 +1,3 @@ +{ + "args": ["--no-sandbox"] +} diff --git a/docs/design/osrm-with-telenav-traffic.md b/docs/design/osrm-with-telenav-traffic.md new file mode 100644 index 000000000..fb0f6a8bc --- /dev/null +++ b/docs/design/osrm-with-telenav-traffic.md @@ -0,0 +1,23 @@ +# OSRM with Telenav Traffic Design (Draft) + +## Architecture +![osrm-with-telenav-traffic-architecture](./graph/osrm-with-telenav-traffic-architecture.mmd.png) + +### OSRMTrafficUpdater(need to implement) +- as client +- connect `TrafficProxy` by `RPC` +- convert contents from `RPC` protocol to `OSRM` required `csv` format, then write to file + +### TrafficProxy(need to implement) +- as server +- provide traffic contents by region +- contents include at least `from node, to node, speed`(both `from node` and `to node` are come from original mapdata) +- known issues/questions: + - can not compile `OSM` mapdata to traffic graph? + + +## OSRM with Traffic Startup Flow +![osrm-with-traffic-startup-flow-chart](./graph/osrm-with-traffic-startup-flow-chart.mmd.png) + +## Release and Deployment Pipeline +![osrm-release-deployment-pipeline](./graph/osrm-release-deployment-pipeline.mmd.png) \ No newline at end of file diff --git a/traffic_updater/.gitignore b/traffic_updater/.gitignore new file mode 100644 index 000000000..19f47cebb --- /dev/null +++ b/traffic_updater/.gitignore @@ -0,0 +1,5 @@ + +*.py[cod] + +traffic.csv +*/traffic.csv diff --git a/traffic_updater/README.md b/traffic_updater/README.md new file mode 100644 index 000000000..82106fb5a --- /dev/null +++ b/traffic_updater/README.md @@ -0,0 +1,37 @@ +# OSRM Traffic Updater +The **OSRM Traffic Updater** is designed for pull traffic data from **Traffic Proxy(Telenav)** then dump to OSRM required `traffic.csv`. Refer to [OSRM with Telenav Traffic Design](../docs/design/osrm-with-telenav-traffic.md) and [OSRM Traffic](https://github.com/Project-OSRM/osrm-backend/wiki/Traffic) for more details. +We have implemented both `Python` and `Go` version. Both of them have same function(pull data then dump to csv), but the `Go` implementation is about **23 times** faster than `Python` implementation. So strongly recommended to use `Go` implementation as preference. +- E.g. `6727490` lines traffic of NA region + - `Go` Implementation: about `9 seconds` + - `Python` Implementation: about `210 seconds` + +## RPC Protocol +See [proxy.thrift](proxy.thrift) for details. + +## Python Implementation +The `python` based implementation has been deprecated due to bad performance. See [Deprecated Python Implementation Codes](https://github.com/Telenav/osrm-backend/blob/b4eb73f2d307fd4dbd8b8610bbc2a68c3b6ab1ae/traffic_updater/python/osrm_traffic_updater.py#L57) if you'd like to see code details. + +## Go Implementation +### Requirements +- `go version go1.12.5 linux/amd64` +- `thrift 0.12.0` + - clone `thrift` from `github.com/apache/thrift`, then checkout branch `0.12.0` +- change `thrift` imports in generated codes `gen-go/proxy` + - `git.apache.org/thrift.git/lib/go/thrift` -> `github.com/apache/thrift/lib/go/thrift` + + +### Usage +```bash +$ cd $GOPATH +$ go install github.com/Telenav/osrm-backend/traffic_updater/go/osrm_traffic_updater +$ ./bin/osrm_traffic_updater -h +Usage of ./bin/osrm_traffic_updater: + -c string + traffic proxy ip address (default "127.0.0.1") + -d use high precision speeds, i.e. decimal (default true) + -f string + OSRM traffic csv file (default "traffic.csv") + -p int + traffic proxy listening port (default 6666) +``` + diff --git a/traffic_updater/go/gen-go/proxy/GoUnusedProtection__.go b/traffic_updater/go/gen-go/proxy/GoUnusedProtection__.go new file mode 100644 index 000000000..701886b7f --- /dev/null +++ b/traffic_updater/go/gen-go/proxy/GoUnusedProtection__.go @@ -0,0 +1,7 @@ +// Autogenerated by Thrift Compiler (0.12.0) +// DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + +package proxy + +var GoUnusedProtection__ int; + diff --git a/traffic_updater/go/gen-go/proxy/proxy-consts.go b/traffic_updater/go/gen-go/proxy/proxy-consts.go new file mode 100644 index 000000000..93d2c3c55 --- /dev/null +++ b/traffic_updater/go/gen-go/proxy/proxy-consts.go @@ -0,0 +1,24 @@ +// Autogenerated by Thrift Compiler (0.12.0) +// DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + +package proxy + +import ( + "bytes" + "context" + "reflect" + "fmt" + "github.com/apache/thrift/lib/go/thrift" +) + +// (needed to ensure safety because of naive import list construction.) +var _ = thrift.ZERO +var _ = fmt.Printf +var _ = context.Background +var _ = reflect.DeepEqual +var _ = bytes.Equal + + +func init() { +} + diff --git a/traffic_updater/go/gen-go/proxy/proxy.go b/traffic_updater/go/gen-go/proxy/proxy.go new file mode 100644 index 000000000..b93fc4397 --- /dev/null +++ b/traffic_updater/go/gen-go/proxy/proxy.go @@ -0,0 +1,836 @@ +// Autogenerated by Thrift Compiler (0.12.0) +// DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + +package proxy + +import ( + "bytes" + "context" + "reflect" + "fmt" + "github.com/apache/thrift/lib/go/thrift" +) + +// (needed to ensure safety because of naive import list construction.) +var _ = thrift.ZERO +var _ = fmt.Printf +var _ = context.Background +var _ = reflect.DeepEqual +var _ = bytes.Equal + +// Attributes: +// - FromId +// - ToId +// - WayId +// - Speed +// - TrafficLevel +type Flow struct { + FromId int64 `thrift:"fromId,1,required" db:"fromId" json:"fromId"` + ToId int64 `thrift:"toId,2,required" db:"toId" json:"toId"` + WayId int64 `thrift:"wayId,3,required" db:"wayId" json:"wayId"` + Speed float64 `thrift:"speed,4,required" db:"speed" json:"speed"` + TrafficLevel int32 `thrift:"trafficLevel,5,required" db:"trafficLevel" json:"trafficLevel"` +} + +func NewFlow() *Flow { + return &Flow{} +} + + +func (p *Flow) GetFromId() int64 { + return p.FromId +} + +func (p *Flow) GetToId() int64 { + return p.ToId +} + +func (p *Flow) GetWayId() int64 { + return p.WayId +} + +func (p *Flow) GetSpeed() float64 { + return p.Speed +} + +func (p *Flow) GetTrafficLevel() int32 { + return p.TrafficLevel +} +func (p *Flow) Read(iprot thrift.TProtocol) error { + if _, err := iprot.ReadStructBegin(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err) + } + + var issetFromId bool = false; + var issetToId bool = false; + var issetWayId bool = false; + var issetSpeed bool = false; + var issetTrafficLevel bool = false; + + for { + _, fieldTypeId, fieldId, err := iprot.ReadFieldBegin() + if err != nil { + return thrift.PrependError(fmt.Sprintf("%T field %d read error: ", p, fieldId), err) + } + if fieldTypeId == thrift.STOP { break; } + switch fieldId { + case 1: + if fieldTypeId == thrift.I64 { + if err := p.ReadField1(iprot); err != nil { + return err + } + issetFromId = true + } else { + if err := iprot.Skip(fieldTypeId); err != nil { + return err + } + } + case 2: + if fieldTypeId == thrift.I64 { + if err := p.ReadField2(iprot); err != nil { + return err + } + issetToId = true + } else { + if err := iprot.Skip(fieldTypeId); err != nil { + return err + } + } + case 3: + if fieldTypeId == thrift.I64 { + if err := p.ReadField3(iprot); err != nil { + return err + } + issetWayId = true + } else { + if err := iprot.Skip(fieldTypeId); err != nil { + return err + } + } + case 4: + if fieldTypeId == thrift.DOUBLE { + if err := p.ReadField4(iprot); err != nil { + return err + } + issetSpeed = true + } else { + if err := iprot.Skip(fieldTypeId); err != nil { + return err + } + } + case 5: + if fieldTypeId == thrift.I32 { + if err := p.ReadField5(iprot); err != nil { + return err + } + issetTrafficLevel = true + } else { + if err := iprot.Skip(fieldTypeId); err != nil { + return err + } + } + default: + if err := iprot.Skip(fieldTypeId); err != nil { + return err + } + } + if err := iprot.ReadFieldEnd(); err != nil { + return err + } + } + if err := iprot.ReadStructEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T read struct end error: ", p), err) + } + if !issetFromId{ + return thrift.NewTProtocolExceptionWithType(thrift.INVALID_DATA, fmt.Errorf("Required field FromId is not set")); + } + if !issetToId{ + return thrift.NewTProtocolExceptionWithType(thrift.INVALID_DATA, fmt.Errorf("Required field ToId is not set")); + } + if !issetWayId{ + return thrift.NewTProtocolExceptionWithType(thrift.INVALID_DATA, fmt.Errorf("Required field WayId is not set")); + } + if !issetSpeed{ + return thrift.NewTProtocolExceptionWithType(thrift.INVALID_DATA, fmt.Errorf("Required field Speed is not set")); + } + if !issetTrafficLevel{ + return thrift.NewTProtocolExceptionWithType(thrift.INVALID_DATA, fmt.Errorf("Required field TrafficLevel is not set")); + } + return nil +} + +func (p *Flow) ReadField1(iprot thrift.TProtocol) error { + if v, err := iprot.ReadI64(); err != nil { + return thrift.PrependError("error reading field 1: ", err) +} else { + p.FromId = v +} + return nil +} + +func (p *Flow) ReadField2(iprot thrift.TProtocol) error { + if v, err := iprot.ReadI64(); err != nil { + return thrift.PrependError("error reading field 2: ", err) +} else { + p.ToId = v +} + return nil +} + +func (p *Flow) ReadField3(iprot thrift.TProtocol) error { + if v, err := iprot.ReadI64(); err != nil { + return thrift.PrependError("error reading field 3: ", err) +} else { + p.WayId = v +} + return nil +} + +func (p *Flow) ReadField4(iprot thrift.TProtocol) error { + if v, err := iprot.ReadDouble(); err != nil { + return thrift.PrependError("error reading field 4: ", err) +} else { + p.Speed = v +} + return nil +} + +func (p *Flow) ReadField5(iprot thrift.TProtocol) error { + if v, err := iprot.ReadI32(); err != nil { + return thrift.PrependError("error reading field 5: ", err) +} else { + p.TrafficLevel = v +} + return nil +} + +func (p *Flow) Write(oprot thrift.TProtocol) error { + if err := oprot.WriteStructBegin("Flow"); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) } + if p != nil { + if err := p.writeField1(oprot); err != nil { return err } + if err := p.writeField2(oprot); err != nil { return err } + if err := p.writeField3(oprot); err != nil { return err } + if err := p.writeField4(oprot); err != nil { return err } + if err := p.writeField5(oprot); err != nil { return err } + } + if err := oprot.WriteFieldStop(); err != nil { + return thrift.PrependError("write field stop error: ", err) } + if err := oprot.WriteStructEnd(); err != nil { + return thrift.PrependError("write struct stop error: ", err) } + return nil +} + +func (p *Flow) writeField1(oprot thrift.TProtocol) (err error) { + if err := oprot.WriteFieldBegin("fromId", thrift.I64, 1); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 1:fromId: ", p), err) } + if err := oprot.WriteI64(int64(p.FromId)); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.fromId (1) field write error: ", p), err) } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 1:fromId: ", p), err) } + return err +} + +func (p *Flow) writeField2(oprot thrift.TProtocol) (err error) { + if err := oprot.WriteFieldBegin("toId", thrift.I64, 2); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 2:toId: ", p), err) } + if err := oprot.WriteI64(int64(p.ToId)); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.toId (2) field write error: ", p), err) } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 2:toId: ", p), err) } + return err +} + +func (p *Flow) writeField3(oprot thrift.TProtocol) (err error) { + if err := oprot.WriteFieldBegin("wayId", thrift.I64, 3); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 3:wayId: ", p), err) } + if err := oprot.WriteI64(int64(p.WayId)); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.wayId (3) field write error: ", p), err) } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 3:wayId: ", p), err) } + return err +} + +func (p *Flow) writeField4(oprot thrift.TProtocol) (err error) { + if err := oprot.WriteFieldBegin("speed", thrift.DOUBLE, 4); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 4:speed: ", p), err) } + if err := oprot.WriteDouble(float64(p.Speed)); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.speed (4) field write error: ", p), err) } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 4:speed: ", p), err) } + return err +} + +func (p *Flow) writeField5(oprot thrift.TProtocol) (err error) { + if err := oprot.WriteFieldBegin("trafficLevel", thrift.I32, 5); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 5:trafficLevel: ", p), err) } + if err := oprot.WriteI32(int32(p.TrafficLevel)); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.trafficLevel (5) field write error: ", p), err) } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 5:trafficLevel: ", p), err) } + return err +} + +func (p *Flow) String() string { + if p == nil { + return "" + } + return fmt.Sprintf("Flow(%+v)", *p) +} + +type ProxyService interface { + GetAllFlows(ctx context.Context) (r []*Flow, err error) + // Parameters: + // - WayId + GetFlowById(ctx context.Context, wayId int64) (r *Flow, err error) +} + +type ProxyServiceClient struct { + c thrift.TClient +} + +func NewProxyServiceClientFactory(t thrift.TTransport, f thrift.TProtocolFactory) *ProxyServiceClient { + return &ProxyServiceClient{ + c: thrift.NewTStandardClient(f.GetProtocol(t), f.GetProtocol(t)), + } +} + +func NewProxyServiceClientProtocol(t thrift.TTransport, iprot thrift.TProtocol, oprot thrift.TProtocol) *ProxyServiceClient { + return &ProxyServiceClient{ + c: thrift.NewTStandardClient(iprot, oprot), + } +} + +func NewProxyServiceClient(c thrift.TClient) *ProxyServiceClient { + return &ProxyServiceClient{ + c: c, + } +} + +func (p *ProxyServiceClient) Client_() thrift.TClient { + return p.c +} +func (p *ProxyServiceClient) GetAllFlows(ctx context.Context) (r []*Flow, err error) { + var _args0 ProxyServiceGetAllFlowsArgs + var _result1 ProxyServiceGetAllFlowsResult + if err = p.Client_().Call(ctx, "getAllFlows", &_args0, &_result1); err != nil { + return + } + return _result1.GetSuccess(), nil +} + +// Parameters: +// - WayId +func (p *ProxyServiceClient) GetFlowById(ctx context.Context, wayId int64) (r *Flow, err error) { + var _args2 ProxyServiceGetFlowByIdArgs + _args2.WayId = wayId + var _result3 ProxyServiceGetFlowByIdResult + if err = p.Client_().Call(ctx, "getFlowById", &_args2, &_result3); err != nil { + return + } + return _result3.GetSuccess(), nil +} + +type ProxyServiceProcessor struct { + processorMap map[string]thrift.TProcessorFunction + handler ProxyService +} + +func (p *ProxyServiceProcessor) AddToProcessorMap(key string, processor thrift.TProcessorFunction) { + p.processorMap[key] = processor +} + +func (p *ProxyServiceProcessor) GetProcessorFunction(key string) (processor thrift.TProcessorFunction, ok bool) { + processor, ok = p.processorMap[key] + return processor, ok +} + +func (p *ProxyServiceProcessor) ProcessorMap() map[string]thrift.TProcessorFunction { + return p.processorMap +} + +func NewProxyServiceProcessor(handler ProxyService) *ProxyServiceProcessor { + + self4 := &ProxyServiceProcessor{handler:handler, processorMap:make(map[string]thrift.TProcessorFunction)} + self4.processorMap["getAllFlows"] = &proxyServiceProcessorGetAllFlows{handler:handler} + self4.processorMap["getFlowById"] = &proxyServiceProcessorGetFlowById{handler:handler} +return self4 +} + +func (p *ProxyServiceProcessor) Process(ctx context.Context, iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) { + name, _, seqId, err := iprot.ReadMessageBegin() + if err != nil { return false, err } + if processor, ok := p.GetProcessorFunction(name); ok { + return processor.Process(ctx, seqId, iprot, oprot) + } + iprot.Skip(thrift.STRUCT) + iprot.ReadMessageEnd() + x5 := thrift.NewTApplicationException(thrift.UNKNOWN_METHOD, "Unknown function " + name) + oprot.WriteMessageBegin(name, thrift.EXCEPTION, seqId) + x5.Write(oprot) + oprot.WriteMessageEnd() + oprot.Flush(ctx) + return false, x5 + +} + +type proxyServiceProcessorGetAllFlows struct { + handler ProxyService +} + +func (p *proxyServiceProcessorGetAllFlows) Process(ctx context.Context, seqId int32, iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) { + args := ProxyServiceGetAllFlowsArgs{} + if err = args.Read(iprot); err != nil { + iprot.ReadMessageEnd() + x := thrift.NewTApplicationException(thrift.PROTOCOL_ERROR, err.Error()) + oprot.WriteMessageBegin("getAllFlows", thrift.EXCEPTION, seqId) + x.Write(oprot) + oprot.WriteMessageEnd() + oprot.Flush(ctx) + return false, err + } + + iprot.ReadMessageEnd() + result := ProxyServiceGetAllFlowsResult{} +var retval []*Flow + var err2 error + if retval, err2 = p.handler.GetAllFlows(ctx); err2 != nil { + x := thrift.NewTApplicationException(thrift.INTERNAL_ERROR, "Internal error processing getAllFlows: " + err2.Error()) + oprot.WriteMessageBegin("getAllFlows", thrift.EXCEPTION, seqId) + x.Write(oprot) + oprot.WriteMessageEnd() + oprot.Flush(ctx) + return true, err2 + } else { + result.Success = retval +} + if err2 = oprot.WriteMessageBegin("getAllFlows", thrift.REPLY, seqId); err2 != nil { + err = err2 + } + if err2 = result.Write(oprot); err == nil && err2 != nil { + err = err2 + } + if err2 = oprot.WriteMessageEnd(); err == nil && err2 != nil { + err = err2 + } + if err2 = oprot.Flush(ctx); err == nil && err2 != nil { + err = err2 + } + if err != nil { + return + } + return true, err +} + +type proxyServiceProcessorGetFlowById struct { + handler ProxyService +} + +func (p *proxyServiceProcessorGetFlowById) Process(ctx context.Context, seqId int32, iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) { + args := ProxyServiceGetFlowByIdArgs{} + if err = args.Read(iprot); err != nil { + iprot.ReadMessageEnd() + x := thrift.NewTApplicationException(thrift.PROTOCOL_ERROR, err.Error()) + oprot.WriteMessageBegin("getFlowById", thrift.EXCEPTION, seqId) + x.Write(oprot) + oprot.WriteMessageEnd() + oprot.Flush(ctx) + return false, err + } + + iprot.ReadMessageEnd() + result := ProxyServiceGetFlowByIdResult{} +var retval *Flow + var err2 error + if retval, err2 = p.handler.GetFlowById(ctx, args.WayId); err2 != nil { + x := thrift.NewTApplicationException(thrift.INTERNAL_ERROR, "Internal error processing getFlowById: " + err2.Error()) + oprot.WriteMessageBegin("getFlowById", thrift.EXCEPTION, seqId) + x.Write(oprot) + oprot.WriteMessageEnd() + oprot.Flush(ctx) + return true, err2 + } else { + result.Success = retval +} + if err2 = oprot.WriteMessageBegin("getFlowById", thrift.REPLY, seqId); err2 != nil { + err = err2 + } + if err2 = result.Write(oprot); err == nil && err2 != nil { + err = err2 + } + if err2 = oprot.WriteMessageEnd(); err == nil && err2 != nil { + err = err2 + } + if err2 = oprot.Flush(ctx); err == nil && err2 != nil { + err = err2 + } + if err != nil { + return + } + return true, err +} + + +// HELPER FUNCTIONS AND STRUCTURES + +type ProxyServiceGetAllFlowsArgs struct { +} + +func NewProxyServiceGetAllFlowsArgs() *ProxyServiceGetAllFlowsArgs { + return &ProxyServiceGetAllFlowsArgs{} +} + +func (p *ProxyServiceGetAllFlowsArgs) Read(iprot thrift.TProtocol) error { + if _, err := iprot.ReadStructBegin(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err) + } + + + for { + _, fieldTypeId, fieldId, err := iprot.ReadFieldBegin() + if err != nil { + return thrift.PrependError(fmt.Sprintf("%T field %d read error: ", p, fieldId), err) + } + if fieldTypeId == thrift.STOP { break; } + if err := iprot.Skip(fieldTypeId); err != nil { + return err + } + if err := iprot.ReadFieldEnd(); err != nil { + return err + } + } + if err := iprot.ReadStructEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T read struct end error: ", p), err) + } + return nil +} + +func (p *ProxyServiceGetAllFlowsArgs) Write(oprot thrift.TProtocol) error { + if err := oprot.WriteStructBegin("getAllFlows_args"); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) } + if p != nil { + } + if err := oprot.WriteFieldStop(); err != nil { + return thrift.PrependError("write field stop error: ", err) } + if err := oprot.WriteStructEnd(); err != nil { + return thrift.PrependError("write struct stop error: ", err) } + return nil +} + +func (p *ProxyServiceGetAllFlowsArgs) String() string { + if p == nil { + return "" + } + return fmt.Sprintf("ProxyServiceGetAllFlowsArgs(%+v)", *p) +} + +// Attributes: +// - Success +type ProxyServiceGetAllFlowsResult struct { + Success []*Flow `thrift:"success,0" db:"success" json:"success,omitempty"` +} + +func NewProxyServiceGetAllFlowsResult() *ProxyServiceGetAllFlowsResult { + return &ProxyServiceGetAllFlowsResult{} +} + +var ProxyServiceGetAllFlowsResult_Success_DEFAULT []*Flow + +func (p *ProxyServiceGetAllFlowsResult) GetSuccess() []*Flow { + return p.Success +} +func (p *ProxyServiceGetAllFlowsResult) IsSetSuccess() bool { + return p.Success != nil +} + +func (p *ProxyServiceGetAllFlowsResult) Read(iprot thrift.TProtocol) error { + if _, err := iprot.ReadStructBegin(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err) + } + + + for { + _, fieldTypeId, fieldId, err := iprot.ReadFieldBegin() + if err != nil { + return thrift.PrependError(fmt.Sprintf("%T field %d read error: ", p, fieldId), err) + } + if fieldTypeId == thrift.STOP { break; } + switch fieldId { + case 0: + if fieldTypeId == thrift.LIST { + if err := p.ReadField0(iprot); err != nil { + return err + } + } else { + if err := iprot.Skip(fieldTypeId); err != nil { + return err + } + } + default: + if err := iprot.Skip(fieldTypeId); err != nil { + return err + } + } + if err := iprot.ReadFieldEnd(); err != nil { + return err + } + } + if err := iprot.ReadStructEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T read struct end error: ", p), err) + } + return nil +} + +func (p *ProxyServiceGetAllFlowsResult) ReadField0(iprot thrift.TProtocol) error { + _, size, err := iprot.ReadListBegin() + if err != nil { + return thrift.PrependError("error reading list begin: ", err) + } + tSlice := make([]*Flow, 0, size) + p.Success = tSlice + for i := 0; i < size; i ++ { + _elem6 := &Flow{} + if err := _elem6.Read(iprot); err != nil { + return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", _elem6), err) + } + p.Success = append(p.Success, _elem6) + } + if err := iprot.ReadListEnd(); err != nil { + return thrift.PrependError("error reading list end: ", err) + } + return nil +} + +func (p *ProxyServiceGetAllFlowsResult) Write(oprot thrift.TProtocol) error { + if err := oprot.WriteStructBegin("getAllFlows_result"); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) } + if p != nil { + if err := p.writeField0(oprot); err != nil { return err } + } + if err := oprot.WriteFieldStop(); err != nil { + return thrift.PrependError("write field stop error: ", err) } + if err := oprot.WriteStructEnd(); err != nil { + return thrift.PrependError("write struct stop error: ", err) } + return nil +} + +func (p *ProxyServiceGetAllFlowsResult) writeField0(oprot thrift.TProtocol) (err error) { + if p.IsSetSuccess() { + if err := oprot.WriteFieldBegin("success", thrift.LIST, 0); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 0:success: ", p), err) } + if err := oprot.WriteListBegin(thrift.STRUCT, len(p.Success)); err != nil { + return thrift.PrependError("error writing list begin: ", err) + } + for _, v := range p.Success { + if err := v.Write(oprot); err != nil { + return thrift.PrependError(fmt.Sprintf("%T error writing struct: ", v), err) + } + } + if err := oprot.WriteListEnd(); err != nil { + return thrift.PrependError("error writing list end: ", err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 0:success: ", p), err) } + } + return err +} + +func (p *ProxyServiceGetAllFlowsResult) String() string { + if p == nil { + return "" + } + return fmt.Sprintf("ProxyServiceGetAllFlowsResult(%+v)", *p) +} + +// Attributes: +// - WayId +type ProxyServiceGetFlowByIdArgs struct { + WayId int64 `thrift:"wayId,1" db:"wayId" json:"wayId"` +} + +func NewProxyServiceGetFlowByIdArgs() *ProxyServiceGetFlowByIdArgs { + return &ProxyServiceGetFlowByIdArgs{} +} + + +func (p *ProxyServiceGetFlowByIdArgs) GetWayId() int64 { + return p.WayId +} +func (p *ProxyServiceGetFlowByIdArgs) Read(iprot thrift.TProtocol) error { + if _, err := iprot.ReadStructBegin(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err) + } + + + for { + _, fieldTypeId, fieldId, err := iprot.ReadFieldBegin() + if err != nil { + return thrift.PrependError(fmt.Sprintf("%T field %d read error: ", p, fieldId), err) + } + if fieldTypeId == thrift.STOP { break; } + switch fieldId { + case 1: + if fieldTypeId == thrift.I64 { + if err := p.ReadField1(iprot); err != nil { + return err + } + } else { + if err := iprot.Skip(fieldTypeId); err != nil { + return err + } + } + default: + if err := iprot.Skip(fieldTypeId); err != nil { + return err + } + } + if err := iprot.ReadFieldEnd(); err != nil { + return err + } + } + if err := iprot.ReadStructEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T read struct end error: ", p), err) + } + return nil +} + +func (p *ProxyServiceGetFlowByIdArgs) ReadField1(iprot thrift.TProtocol) error { + if v, err := iprot.ReadI64(); err != nil { + return thrift.PrependError("error reading field 1: ", err) +} else { + p.WayId = v +} + return nil +} + +func (p *ProxyServiceGetFlowByIdArgs) Write(oprot thrift.TProtocol) error { + if err := oprot.WriteStructBegin("getFlowById_args"); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) } + if p != nil { + if err := p.writeField1(oprot); err != nil { return err } + } + if err := oprot.WriteFieldStop(); err != nil { + return thrift.PrependError("write field stop error: ", err) } + if err := oprot.WriteStructEnd(); err != nil { + return thrift.PrependError("write struct stop error: ", err) } + return nil +} + +func (p *ProxyServiceGetFlowByIdArgs) writeField1(oprot thrift.TProtocol) (err error) { + if err := oprot.WriteFieldBegin("wayId", thrift.I64, 1); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 1:wayId: ", p), err) } + if err := oprot.WriteI64(int64(p.WayId)); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.wayId (1) field write error: ", p), err) } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 1:wayId: ", p), err) } + return err +} + +func (p *ProxyServiceGetFlowByIdArgs) String() string { + if p == nil { + return "" + } + return fmt.Sprintf("ProxyServiceGetFlowByIdArgs(%+v)", *p) +} + +// Attributes: +// - Success +type ProxyServiceGetFlowByIdResult struct { + Success *Flow `thrift:"success,0" db:"success" json:"success,omitempty"` +} + +func NewProxyServiceGetFlowByIdResult() *ProxyServiceGetFlowByIdResult { + return &ProxyServiceGetFlowByIdResult{} +} + +var ProxyServiceGetFlowByIdResult_Success_DEFAULT *Flow +func (p *ProxyServiceGetFlowByIdResult) GetSuccess() *Flow { + if !p.IsSetSuccess() { + return ProxyServiceGetFlowByIdResult_Success_DEFAULT + } +return p.Success +} +func (p *ProxyServiceGetFlowByIdResult) IsSetSuccess() bool { + return p.Success != nil +} + +func (p *ProxyServiceGetFlowByIdResult) Read(iprot thrift.TProtocol) error { + if _, err := iprot.ReadStructBegin(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err) + } + + + for { + _, fieldTypeId, fieldId, err := iprot.ReadFieldBegin() + if err != nil { + return thrift.PrependError(fmt.Sprintf("%T field %d read error: ", p, fieldId), err) + } + if fieldTypeId == thrift.STOP { break; } + switch fieldId { + case 0: + if fieldTypeId == thrift.STRUCT { + if err := p.ReadField0(iprot); err != nil { + return err + } + } else { + if err := iprot.Skip(fieldTypeId); err != nil { + return err + } + } + default: + if err := iprot.Skip(fieldTypeId); err != nil { + return err + } + } + if err := iprot.ReadFieldEnd(); err != nil { + return err + } + } + if err := iprot.ReadStructEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T read struct end error: ", p), err) + } + return nil +} + +func (p *ProxyServiceGetFlowByIdResult) ReadField0(iprot thrift.TProtocol) error { + p.Success = &Flow{} + if err := p.Success.Read(iprot); err != nil { + return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Success), err) + } + return nil +} + +func (p *ProxyServiceGetFlowByIdResult) Write(oprot thrift.TProtocol) error { + if err := oprot.WriteStructBegin("getFlowById_result"); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) } + if p != nil { + if err := p.writeField0(oprot); err != nil { return err } + } + if err := oprot.WriteFieldStop(); err != nil { + return thrift.PrependError("write field stop error: ", err) } + if err := oprot.WriteStructEnd(); err != nil { + return thrift.PrependError("write struct stop error: ", err) } + return nil +} + +func (p *ProxyServiceGetFlowByIdResult) writeField0(oprot thrift.TProtocol) (err error) { + if p.IsSetSuccess() { + if err := oprot.WriteFieldBegin("success", thrift.STRUCT, 0); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 0:success: ", p), err) } + if err := p.Success.Write(oprot); err != nil { + return thrift.PrependError(fmt.Sprintf("%T error writing struct: ", p.Success), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 0:success: ", p), err) } + } + return err +} + +func (p *ProxyServiceGetFlowByIdResult) String() string { + if p == nil { + return "" + } + return fmt.Sprintf("ProxyServiceGetFlowByIdResult(%+v)", *p) +} + + diff --git a/traffic_updater/go/gen-go/proxy/proxy_service-remote/proxy_service-remote.go b/traffic_updater/go/gen-go/proxy/proxy_service-remote/proxy_service-remote.go new file mode 100755 index 000000000..b85471185 --- /dev/null +++ b/traffic_updater/go/gen-go/proxy/proxy_service-remote/proxy_service-remote.go @@ -0,0 +1,176 @@ +// Autogenerated by Thrift Compiler (0.12.0) +// DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + +package main + +import ( + "context" + "flag" + "fmt" + "math" + "net" + "net/url" + "os" + "strconv" + "strings" + "github.com/apache/thrift/lib/go/thrift" + "proxy" +) + + +func Usage() { + fmt.Fprintln(os.Stderr, "Usage of ", os.Args[0], " [-h host:port] [-u url] [-f[ramed]] function [arg1 [arg2...]]:") + flag.PrintDefaults() + fmt.Fprintln(os.Stderr, "\nFunctions:") + fmt.Fprintln(os.Stderr, " getAllFlows()") + fmt.Fprintln(os.Stderr, " Flow getFlowById(i64 wayId)") + fmt.Fprintln(os.Stderr) + os.Exit(0) +} + +type httpHeaders map[string]string + +func (h httpHeaders) String() string { + var m map[string]string = h + return fmt.Sprintf("%s", m) +} + +func (h httpHeaders) Set(value string) error { + parts := strings.Split(value, ": ") + if len(parts) != 2 { + return fmt.Errorf("header should be of format 'Key: Value'") + } + h[parts[0]] = parts[1] + return nil +} + +func main() { + flag.Usage = Usage + var host string + var port int + var protocol string + var urlString string + var framed bool + var useHttp bool + headers := make(httpHeaders) + var parsedUrl *url.URL + var trans thrift.TTransport + _ = strconv.Atoi + _ = math.Abs + flag.Usage = Usage + flag.StringVar(&host, "h", "localhost", "Specify host and port") + flag.IntVar(&port, "p", 9090, "Specify port") + flag.StringVar(&protocol, "P", "binary", "Specify the protocol (binary, compact, simplejson, json)") + flag.StringVar(&urlString, "u", "", "Specify the url") + flag.BoolVar(&framed, "framed", false, "Use framed transport") + flag.BoolVar(&useHttp, "http", false, "Use http") + flag.Var(headers, "H", "Headers to set on the http(s) request (e.g. -H \"Key: Value\")") + flag.Parse() + + if len(urlString) > 0 { + var err error + parsedUrl, err = url.Parse(urlString) + if err != nil { + fmt.Fprintln(os.Stderr, "Error parsing URL: ", err) + flag.Usage() + } + host = parsedUrl.Host + useHttp = len(parsedUrl.Scheme) <= 0 || parsedUrl.Scheme == "http" || parsedUrl.Scheme == "https" + } else if useHttp { + _, err := url.Parse(fmt.Sprint("http://", host, ":", port)) + if err != nil { + fmt.Fprintln(os.Stderr, "Error parsing URL: ", err) + flag.Usage() + } + } + + cmd := flag.Arg(0) + var err error + if useHttp { + trans, err = thrift.NewTHttpClient(parsedUrl.String()) + if len(headers) > 0 { + httptrans := trans.(*thrift.THttpClient) + for key, value := range headers { + httptrans.SetHeader(key, value) + } + } + } else { + portStr := fmt.Sprint(port) + if strings.Contains(host, ":") { + host, portStr, err = net.SplitHostPort(host) + if err != nil { + fmt.Fprintln(os.Stderr, "error with host:", err) + os.Exit(1) + } + } + trans, err = thrift.NewTSocket(net.JoinHostPort(host, portStr)) + if err != nil { + fmt.Fprintln(os.Stderr, "error resolving address:", err) + os.Exit(1) + } + if framed { + trans = thrift.NewTFramedTransport(trans) + } + } + if err != nil { + fmt.Fprintln(os.Stderr, "Error creating transport", err) + os.Exit(1) + } + defer trans.Close() + var protocolFactory thrift.TProtocolFactory + switch protocol { + case "compact": + protocolFactory = thrift.NewTCompactProtocolFactory() + break + case "simplejson": + protocolFactory = thrift.NewTSimpleJSONProtocolFactory() + break + case "json": + protocolFactory = thrift.NewTJSONProtocolFactory() + break + case "binary", "": + protocolFactory = thrift.NewTBinaryProtocolFactoryDefault() + break + default: + fmt.Fprintln(os.Stderr, "Invalid protocol specified: ", protocol) + Usage() + os.Exit(1) + } + iprot := protocolFactory.GetProtocol(trans) + oprot := protocolFactory.GetProtocol(trans) + client := proxy.NewProxyServiceClient(thrift.NewTStandardClient(iprot, oprot)) + if err := trans.Open(); err != nil { + fmt.Fprintln(os.Stderr, "Error opening socket to ", host, ":", port, " ", err) + os.Exit(1) + } + + switch cmd { + case "getAllFlows": + if flag.NArg() - 1 != 0 { + fmt.Fprintln(os.Stderr, "GetAllFlows requires 0 args") + flag.Usage() + } + fmt.Print(client.GetAllFlows(context.Background())) + fmt.Print("\n") + break + case "getFlowById": + if flag.NArg() - 1 != 1 { + fmt.Fprintln(os.Stderr, "GetFlowById requires 1 args") + flag.Usage() + } + argvalue0, err7 := (strconv.ParseInt(flag.Arg(1), 10, 64)) + if err7 != nil { + Usage() + return + } + value0 := argvalue0 + fmt.Print(client.GetFlowById(context.Background(), value0)) + fmt.Print("\n") + break + case "": + Usage() + break + default: + fmt.Fprintln(os.Stderr, "Invalid function ", cmd) + } +} diff --git a/traffic_updater/go/osrm_traffic_updater/osrm_traffic_updater.go b/traffic_updater/go/osrm_traffic_updater/osrm_traffic_updater.go new file mode 100644 index 000000000..f6853b1f3 --- /dev/null +++ b/traffic_updater/go/osrm_traffic_updater/osrm_traffic_updater.go @@ -0,0 +1,127 @@ +package main + +import ( + "bufio" + "context" + "flag" + "fmt" + "os" + "strconv" + "time" + + "github.com/Telenav/osrm-backend/traffic_updater/go/gen-go/proxy" + "github.com/apache/thrift/lib/go/thrift" +) + +var flags struct { + port int + ip string + csvFile string + highPrecision bool +} + +func init() { + flag.IntVar(&flags.port, "p", 6666, "traffic proxy listening port") + flag.StringVar(&flags.ip, "c", "127.0.0.1", "traffic proxy ip address") + flag.StringVar(&flags.csvFile, "f", "traffic.csv", "OSRM traffic csv file") + flag.BoolVar(&flags.highPrecision, "d", true, "use high precision speeds, i.e. decimal") +} + +func dumpFlowsToCsv(csvFile string, flows []*proxy.Flow) { + + if _, err := os.Stat(csvFile); err == nil { + // csvFile exists, remove it + rmErr := os.Remove(csvFile) + if rmErr != nil { + fmt.Println(rmErr) + return + } + } + + f, err := os.OpenFile(csvFile, os.O_RDWR|os.O_CREATE, 0755) + if err != nil { + fmt.Println(err) + return + } + defer f.Close() + writer := bufio.NewWriter(f) + + for i, flow := range flows { + var osrmTrafficLine string + if flags.highPrecision { + osrmTrafficLine = fmt.Sprintf("%d,%d,%f\n", flow.FromId, flow.ToId, flow.Speed) + } else { + osrmTrafficLine = fmt.Sprintf("%d,%d,%d\n", flow.FromId, flow.ToId, int(flow.Speed)) + } + + // print first 10 lines for debug + if i < 10 { + fmt.Printf("[ %d ] %v\n", i, flow) + fmt.Printf("[ %d ] %s\n", i, osrmTrafficLine) + } + + // write to csv + _, err := writer.WriteString(osrmTrafficLine) + if err != nil { + fmt.Println(err) + return + } + } + writer.Flush() + f.Sync() + fmt.Printf("total wrote to %s count: %d\n", csvFile, len(flows)) +} + +func main() { + flag.Parse() + + var transport thrift.TTransport + var err error + + // make socket + targetServer := flags.ip + ":" + strconv.Itoa(flags.port) + fmt.Println("connect traffic proxy " + targetServer) + transport, err = thrift.NewTSocket(targetServer) + if err != nil { + fmt.Println("Error opening socket:", err) + return + } + + // Buffering + transport, err = thrift.NewTFramedTransportFactoryMaxLength(thrift.NewTTransportFactory(), 1024*1024*1024).GetTransport(transport) + if err != nil { + fmt.Println("Error get transport:", err) + return + } + defer transport.Close() + if err := transport.Open(); err != nil { + fmt.Println("Error opening transport:", err) + return + } + + // protocol encoder&decoder + protocol := thrift.NewTCompactProtocolFactory().GetProtocol(transport) + + // create proxy client + client := proxy.NewProxyServiceClient(thrift.NewTStandardClient(protocol, protocol)) + + // get flows + startTime := time.Now() + fmt.Println("getting flows") + var defaultCtx = context.Background() + flows, err := client.GetAllFlows(defaultCtx) + if err != nil { + fmt.Println("get flows failed:", err) + return + } + fmt.Printf("got flows count: %d\n", len(flows)) + afterGotFlowTime := time.Now() + fmt.Printf("get flows time used: %f seconds\n", afterGotFlowTime.Sub(startTime).Seconds()) + + // dump to csv + fmt.Println("dump flows to: " + flags.csvFile) + dumpFlowsToCsv(flags.csvFile, flows) + endTime := time.Now() + fmt.Printf("dump csv time used: %f seconds\n", endTime.Sub(afterGotFlowTime).Seconds()) + +} diff --git a/traffic_updater/proxy.thrift b/traffic_updater/proxy.thrift new file mode 100644 index 000000000..6bc259f2c --- /dev/null +++ b/traffic_updater/proxy.thrift @@ -0,0 +1,13 @@ + +struct Flow { + 1: required i64 fromId; + 2: required i64 toId; + 3: required i64 wayId; + 4: required double speed; + 5: required i32 trafficLevel; +} + +service ProxyService { + list getAllFlows() + Flow getFlowById(1:i64 wayId) +}