Further refactorings

This commit is contained in:
Dennis Luxen 2012-11-12 10:59:23 +01:00
parent ac15c787cc
commit 90b07cc6b0

View File

@ -39,7 +39,7 @@ or see http://www.gnu.org/licenses/agpl.txt.
class PBFParser : public BaseParser<ExtractorCallbacks, _Node, _RawRestrictionContainer, _Way> { class PBFParser : public BaseParser<ExtractorCallbacks, _Node, _RawRestrictionContainer, _Way> {
typedef BaseParser<ExtractorCallbacks, _Node, _RawRestrictionContainer, _Way> super; // typedef BaseParser<ExtractorCallbacks, _Node, _RawRestrictionContainer, _Way> super;
enum EntityType { enum EntityType {
TypeNode = 1, TypeNode = 1,
@ -78,13 +78,10 @@ public:
std::cerr << fileName << ": File not found." << std::endl; std::cerr << fileName << ": File not found." << std::endl;
} }
#ifndef NDEBUG
blockCount = 0; blockCount = 0;
groupCount = 0; groupCount = 0;
#endif
//Dummy initialization
// wayCallback = NULL;
// nodeCallback = NULL;
// restrictionCallback = NULL;
} }
void RegisterCallbacks(ExtractorCallbacks * em) { void RegisterCallbacks(ExtractorCallbacks * em) {
@ -107,12 +104,11 @@ public:
google::protobuf::ShutdownProtobufLibrary(); google::protobuf::ShutdownProtobufLibrary();
#ifndef NDEBUG #ifndef NDEBUG
std::cout << "[info] blocks: " << blockCount << std::endl; DEBUG("parsed " << blockCount << " blocks from pbf with " << groupCount << " groups");
std::cout << "[info] groups: " << groupCount << std::endl;
#endif #endif
} }
bool Init() { inline bool Init() {
_ThreadData initData; _ThreadData initData;
/** read Header */ /** read Header */
if(!readPBFBlobHeader(input, &initData)) { if(!readPBFBlobHeader(input, &initData)) {
@ -125,12 +121,12 @@ public:
return false; return false;
} }
for(int i = 0; i < initData.PBFHeaderBlock.required_features_size(); ++i) { for(int i = 0, featureSize = initData.PBFHeaderBlock.required_features_size(); i < featureSize; ++i) {
const std::string& feature = initData.PBFHeaderBlock.required_features( i ); const std::string& feature = initData.PBFHeaderBlock.required_features( i );
bool supported = false; bool supported = false;
if ( feature == "OsmSchema-V0.6" ) if ( "OsmSchema-V0.6" == feature )
supported = true; supported = true;
else if ( feature == "DenseNodes" ) else if ( "DenseNodes" == feature )
supported = true; supported = true;
if ( !supported ) { if ( !supported ) {
@ -144,7 +140,7 @@ public:
return true; return true;
} }
void ReadData() { inline void ReadData() {
bool keepRunning = true; bool keepRunning = true;
do { do {
_ThreadData *threadData = new _ThreadData(); _ThreadData *threadData = new _ThreadData();
@ -159,19 +155,19 @@ public:
} while(keepRunning); } while(keepRunning);
} }
void ParseData() { inline void ParseData() {
while (1) { while (1) {
_ThreadData *threadData; _ThreadData *threadData;
threadDataQueue->wait_and_pop(threadData); threadDataQueue->wait_and_pop(threadData);
if (threadData == NULL) { if (threadData == NULL) {
cout << "Parse Data Thread Finished" << endl; INFO("Parse Data Thread Finished");
threadDataQueue->push(NULL); // Signal end of data for other threads threadDataQueue->push(NULL); // Signal end of data for other threads
break; break;
} }
loadBlock(threadData); loadBlock(threadData);
for(int i = 0; i < threadData->PBFprimitiveBlock.primitivegroup_size(); i++) { for(int i = 0, groupSize = threadData->PBFprimitiveBlock.primitivegroup_size(); i < groupSize; ++i) {
threadData->currentGroupID = i; threadData->currentGroupID = i;
loadGroup(threadData); loadGroup(threadData);
@ -190,7 +186,7 @@ public:
} }
} }
bool Parse() { inline bool Parse() {
// Start the read and parse threads // Start the read and parse threads
boost::thread readThread(boost::bind(&PBFParser::ReadData, this)); boost::thread readThread(boost::bind(&PBFParser::ReadData, this));
@ -206,35 +202,38 @@ public:
private: private:
void parseDenseNode(_ThreadData * threadData) { inline void parseDenseNode(_ThreadData * threadData) {
const OSMPBF::DenseNodes& dense = threadData->PBFprimitiveBlock.primitivegroup( threadData->currentGroupID ).dense(); const OSMPBF::DenseNodes& dense = threadData->PBFprimitiveBlock.primitivegroup( threadData->currentGroupID ).dense();
int denseTagIndex = 0; int denseTagIndex = 0;
int m_lastDenseID = 0; int m_lastDenseID = 0;
int m_lastDenseLatitude = 0; int m_lastDenseLatitude = 0;
int m_lastDenseLongitude = 0; int m_lastDenseLongitude = 0;
for(int i = 0; i < dense.id_size(); i++) { ImportNode n;
//TODO: abbruchschwellenwert initialisieren
for(int i = 0, idSize = dense.id_size(); i < idSize; ++i) {
n.Clear();
m_lastDenseID += dense.id( i ); m_lastDenseID += dense.id( i );
m_lastDenseLatitude += dense.lat( i ); m_lastDenseLatitude += dense.lat( i );
m_lastDenseLongitude += dense.lon( i ); m_lastDenseLongitude += dense.lon( i );
ImportNode n;
n.id = m_lastDenseID; n.id = m_lastDenseID;
n.lat = 100000*( ( double ) m_lastDenseLatitude * threadData->PBFprimitiveBlock.granularity() +threadData-> PBFprimitiveBlock.lat_offset() ) / NANO; n.lat = 100000*( ( double ) m_lastDenseLatitude * threadData->PBFprimitiveBlock.granularity() +threadData-> PBFprimitiveBlock.lat_offset() ) / NANO;
n.lon = 100000*( ( double ) m_lastDenseLongitude * threadData->PBFprimitiveBlock.granularity() + threadData->PBFprimitiveBlock.lon_offset() ) / NANO; n.lon = 100000*( ( double ) m_lastDenseLongitude * threadData->PBFprimitiveBlock.granularity() + threadData->PBFprimitiveBlock.lon_offset() ) / NANO;
while (denseTagIndex < dense.keys_vals_size()) { while (denseTagIndex < dense.keys_vals_size()) {
int tagValue = dense.keys_vals( denseTagIndex ); const int tagValue = dense.keys_vals( denseTagIndex );
if(tagValue == 0) { if(tagValue == 0) {
denseTagIndex++; ++denseTagIndex;
break; break;
} }
int keyValue = dense.keys_vals ( denseTagIndex+1 ); const int keyValue = dense.keys_vals ( denseTagIndex+1 );
std::string key = threadData->PBFprimitiveBlock.stringtable().s(tagValue).data(); const std::string & key = threadData->PBFprimitiveBlock.stringtable().s(tagValue).data();
std::string value = threadData->PBFprimitiveBlock.stringtable().s(keyValue).data(); const std::string & value = threadData->PBFprimitiveBlock.stringtable().s(keyValue).data();
n.keyVals.Add(key, value); n.keyVals.Add(key, value);
denseTagIndex += 2; denseTagIndex += 2;
} }
//TODO: store in container and process in bulk
/** Pass the unpacked node to the LUA call back **/ /** Pass the unpacked node to the LUA call back **/
try { try {
luabind::call_function<int>( luabind::call_function<int>(
@ -255,24 +254,26 @@ private:
} }
} }
void parseNode(_ThreadData * ) { inline void parseNode(_ThreadData * ) {
ERR("Parsing of simple nodes not supported. PBF should use dense nodes"); ERR("Parsing of simple nodes not supported. PBF should use dense nodes");
// _Node n;
// if(!(*nodeCallback)(n))
// std::cerr << "[PBFParser] simple node not parsed" << std::endl;
} }
void parseRelation(_ThreadData * threadData) { inline void parseRelation(_ThreadData * threadData) {
//TODO: leave early, if relatio is not a restriction
//TODO: reuse rawRestriction container
const OSMPBF::PrimitiveGroup& group = threadData->PBFprimitiveBlock.primitivegroup( threadData->currentGroupID ); const OSMPBF::PrimitiveGroup& group = threadData->PBFprimitiveBlock.primitivegroup( threadData->currentGroupID );
for(int i = 0; i < group.relations_size(); i++ ) { for(int i = 0; i < group.relations_size(); ++i ) {
const OSMPBF::Relation& inputRelation = threadData->PBFprimitiveBlock.primitivegroup( threadData->currentGroupID ).relations(i); const OSMPBF::Relation& inputRelation = threadData->PBFprimitiveBlock.primitivegroup( threadData->currentGroupID ).relations(i);
bool isRestriction = false; bool isRestriction = false;
bool isOnlyRestriction = false; bool isOnlyRestriction = false;
for(int k = 0; k < inputRelation.keys_size(); k++) { for(int k = 0, endOfKeys = inputRelation.keys_size(); k < endOfKeys; ++k) {
const std::string key = threadData->PBFprimitiveBlock.stringtable().s(inputRelation.keys(k)); const std::string & key = threadData->PBFprimitiveBlock.stringtable().s(inputRelation.keys(k));
const std::string val = threadData->PBFprimitiveBlock.stringtable().s(inputRelation.vals(k)); const std::string & val = threadData->PBFprimitiveBlock.stringtable().s(inputRelation.vals(k));
if ("type" == key && "restriction" == val) { if ("type" == key) {
if( "restriction" == val)
isRestriction = true; isRestriction = true;
else
break;
} }
if ("restriction" == key) { if ("restriction" == key) {
if(val.find("only_") == 0) if(val.find("only_") == 0)
@ -283,8 +284,8 @@ private:
if(isRestriction) { if(isRestriction) {
long long lastRef = 0; long long lastRef = 0;
_RawRestrictionContainer currentRestrictionContainer(isOnlyRestriction); _RawRestrictionContainer currentRestrictionContainer(isOnlyRestriction);
for(int rolesIndex = 0; rolesIndex < inputRelation.roles_sid_size(); rolesIndex++) { for(int rolesIndex = 0; rolesIndex < inputRelation.roles_sid_size(); ++rolesIndex) {
string role(threadData->PBFprimitiveBlock.stringtable().s( inputRelation.roles_sid( rolesIndex ) ).data()); std::string role(threadData->PBFprimitiveBlock.stringtable().s( inputRelation.roles_sid( rolesIndex ) ).data());
lastRef += inputRelation.memids(rolesIndex); lastRef += inputRelation.memids(rolesIndex);
if(false == ("from" == role || "to" == role || "via" == role)) { if(false == ("from" == role || "to" == role || "via" == role)) {
@ -320,7 +321,7 @@ private:
break; break;
default: //should not happen default: //should not happen
cout << "unknown"; //cout << "unknown";
assert(false); assert(false);
break; break;
} }
@ -336,11 +337,12 @@ private:
} }
} }
void parseWay(_ThreadData * threadData) { inline void parseWay(_ThreadData * threadData) {
if( threadData->PBFprimitiveBlock.primitivegroup( threadData->currentGroupID ).ways_size() > 0) { if( threadData->PBFprimitiveBlock.primitivegroup( threadData->currentGroupID ).ways_size() > 0) {
for(int i = 0; i < threadData->PBFprimitiveBlock.primitivegroup( threadData->currentGroupID ).ways_size(); i++) {
const OSMPBF::Way& inputWay = threadData->PBFprimitiveBlock.primitivegroup( threadData->currentGroupID ).ways( i );
_Way w; _Way w;
for(int i = 0; i < threadData->PBFprimitiveBlock.primitivegroup( threadData->currentGroupID ).ways_size(); ++i) {
w.Clear();
const OSMPBF::Way& inputWay = threadData->PBFprimitiveBlock.primitivegroup( threadData->currentGroupID ).ways( i );
w.id = inputWay.id(); w.id = inputWay.id();
unsigned pathNode(0); unsigned pathNode(0);
for(int i = 0; i < inputWay.refs_size(); ++i) { for(int i = 0; i < inputWay.refs_size(); ++i) {
@ -349,11 +351,13 @@ private:
} }
assert(inputWay.keys_size() == inputWay.vals_size()); assert(inputWay.keys_size() == inputWay.vals_size());
for(int i = 0; i < inputWay.keys_size(); ++i) { for(int i = 0; i < inputWay.keys_size(); ++i) {
const std::string key = threadData->PBFprimitiveBlock.stringtable().s(inputWay.keys(i)); const std::string & key = threadData->PBFprimitiveBlock.stringtable().s(inputWay.keys(i));
const std::string val = threadData->PBFprimitiveBlock.stringtable().s(inputWay.vals(i)); const std::string & val = threadData->PBFprimitiveBlock.stringtable().s(inputWay.vals(i));
w.keyVals.Add(key, val); w.keyVals.Add(key, val);
} }
//TODO: store in container and process in bulk
/** Pass the unpacked way to the LUA call back **/ /** Pass the unpacked way to the LUA call back **/
try { try {
luabind::call_function<int>( luabind::call_function<int>(
@ -366,19 +370,21 @@ private:
std::cerr << "[PBFParser] way not parsed" << std::endl; std::cerr << "[PBFParser] way not parsed" << std::endl;
} }
} catch (const luabind::error &er) { } catch (const luabind::error &er) {
cerr << er.what() << endl;
lua_State* Ler=er.state(); lua_State* Ler=er.state();
report_errors(Ler, -1); report_errors(Ler, -1);
ERR(er.what());
} }
catch (...) { catch (...) {
cerr<<"Unknown error!"<<endl; ERR("Unknown error!");
} }
} }
} }
} }
void loadGroup(_ThreadData * threadData) { inline void loadGroup(_ThreadData * threadData) {
groupCount++; #ifndef NDEBUG
++groupCount;
#endif
const OSMPBF::PrimitiveGroup& group = threadData->PBFprimitiveBlock.primitivegroup( threadData->currentGroupID ); const OSMPBF::PrimitiveGroup& group = threadData->PBFprimitiveBlock.primitivegroup( threadData->currentGroupID );
threadData->entityTypeIndicator = 0; threadData->entityTypeIndicator = 0;
@ -398,20 +404,22 @@ private:
assert( threadData->entityTypeIndicator != 0 ); assert( threadData->entityTypeIndicator != 0 );
} }
void loadBlock(_ThreadData * threadData) { inline void loadBlock(_ThreadData * threadData) {
#ifndef NDEBUG
++blockCount; ++blockCount;
#endif
threadData->currentGroupID = 0; threadData->currentGroupID = 0;
threadData->currentEntityID = 0; threadData->currentEntityID = 0;
} }
/* Reverses Network Byte Order into something usable */ /* Reverses Network Byte Order into something usable */
inline unsigned swapEndian(unsigned x) { inline unsigned swapEndian(unsigned x) const {
if(getMachineEndianness() == LittleEndian) if(getMachineEndianness() == LittleEndian)
return ( (x>>24) | ((x<<8) & 0x00FF0000) | ((x>>8) & 0x0000FF00) | (x<<24) ); return ( (x>>24) | ((x<<8) & 0x00FF0000) | ((x>>8) & 0x0000FF00) | (x<<24) );
return x; return x;
} }
bool readPBFBlobHeader(std::fstream& stream, _ThreadData * threadData) { inline bool readPBFBlobHeader(std::fstream& stream, _ThreadData * threadData) {
int size(0); int size(0);
stream.read((char *)&size, sizeof(int)); stream.read((char *)&size, sizeof(int));
size = swapEndian(size); size = swapEndian(size);
@ -424,17 +432,14 @@ private:
char *data = new char[size]; char *data = new char[size];
stream.read(data, size*sizeof(data[0])); stream.read(data, size*sizeof(data[0]));
if ( !(threadData->PBFBlobHeader).ParseFromArray( data, size ) ){ bool dataSuccessfullyParsed = (threadData->PBFBlobHeader).ParseFromArray( data, size);
delete[] data; delete[] data;
return false; return dataSuccessfullyParsed;
}
delete[] data;
return true;
} }
bool unpackZLIB(std::fstream &, _ThreadData * threadData) { inline bool unpackZLIB(std::fstream &, _ThreadData * threadData) {
unsigned rawSize = threadData->PBFBlob.raw_size(); unsigned rawSize = threadData->PBFBlob.raw_size();
char* unpackedDataArray = (char*)malloc(rawSize); char* unpackedDataArray = new char[rawSize];
z_stream compressedDataStream; z_stream compressedDataStream;
compressedDataStream.next_in = ( unsigned char* ) threadData->PBFBlob.zlib_data().data(); compressedDataStream.next_in = ( unsigned char* ) threadData->PBFBlob.zlib_data().data();
compressedDataStream.avail_in = threadData->PBFBlob.zlib_data().size(); compressedDataStream.avail_in = threadData->PBFBlob.zlib_data().size();
@ -446,7 +451,7 @@ private:
int ret = inflateInit( &compressedDataStream ); int ret = inflateInit( &compressedDataStream );
if ( ret != Z_OK ) { if ( ret != Z_OK ) {
std::cerr << "[error] failed to init zlib stream" << std::endl; std::cerr << "[error] failed to init zlib stream" << std::endl;
free(unpackedDataArray); delete[] unpackedDataArray;
return false; return false;
} }
@ -454,45 +459,43 @@ private:
if ( ret != Z_STREAM_END ) { if ( ret != Z_STREAM_END ) {
std::cerr << "[error] failed to inflate zlib stream" << std::endl; std::cerr << "[error] failed to inflate zlib stream" << std::endl;
std::cerr << "[error] Error type: " << ret << std::endl; std::cerr << "[error] Error type: " << ret << std::endl;
free(unpackedDataArray); delete[] unpackedDataArray;
return false; return false;
} }
ret = inflateEnd( &compressedDataStream ); ret = inflateEnd( &compressedDataStream );
if ( ret != Z_OK ) { if ( ret != Z_OK ) {
std::cerr << "[error] failed to deinit zlib stream" << std::endl; std::cerr << "[error] failed to deinit zlib stream" << std::endl;
free(unpackedDataArray); delete[] unpackedDataArray;
return false; return false;
} }
threadData->charBuffer.clear(); threadData->charBuffer.resize(rawSize); threadData->charBuffer.clear(); threadData->charBuffer.resize(rawSize);
for(unsigned i = 0; i < rawSize; i++) { std::copy(unpackedDataArray, unpackedDataArray + rawSize, threadData->charBuffer.begin());
threadData->charBuffer[i] = unpackedDataArray[i]; delete[] unpackedDataArray;
}
free(unpackedDataArray);
return true; return true;
} }
bool unpackLZMA(std::fstream &, _ThreadData * ) { inline bool unpackLZMA(std::fstream &, _ThreadData * ) const {
return false; return false;
} }
bool readBlob(std::fstream& stream, _ThreadData * threadData) { inline bool readBlob(std::fstream& stream, _ThreadData * threadData) {
if(stream.eof()) if(stream.eof())
return false; return false;
int size = threadData->PBFBlobHeader.datasize(); const int size = threadData->PBFBlobHeader.datasize();
if ( size < 0 || size > MAX_BLOB_SIZE ) { if ( size < 0 || size > MAX_BLOB_SIZE ) {
std::cerr << "[error] invalid Blob size:" << size << std::endl; std::cerr << "[error] invalid Blob size:" << size << std::endl;
return false; return false;
} }
char* data = (char*)malloc(size); char* data = new char[size];
stream.read(data, sizeof(data[0])*size); stream.read(data, sizeof(data[0])*size);
if ( !threadData->PBFBlob.ParseFromArray( data, size ) ) { if ( !threadData->PBFBlob.ParseFromArray( data, size ) ) {
std::cerr << "[error] failed to parse blob" << std::endl; std::cerr << "[error] failed to parse blob" << std::endl;
free(data); delete[] data;
return false; return false;
} }
@ -500,30 +503,28 @@ private:
const std::string& data = threadData->PBFBlob.raw(); const std::string& data = threadData->PBFBlob.raw();
threadData->charBuffer.clear(); threadData->charBuffer.clear();
threadData->charBuffer.resize( data.size() ); threadData->charBuffer.resize( data.size() );
for ( unsigned i = 0; i < data.size(); i++ ) { std::copy(data.begin(), data.end(), threadData->charBuffer.begin());
threadData->charBuffer[i] = data[i];
}
} else if ( threadData->PBFBlob.has_zlib_data() ) { } else if ( threadData->PBFBlob.has_zlib_data() ) {
if ( !unpackZLIB(stream, threadData) ) { if ( !unpackZLIB(stream, threadData) ) {
std::cerr << "[error] zlib data encountered that could not be unpacked" << std::endl; std::cerr << "[error] zlib data encountered that could not be unpacked" << std::endl;
free(data); delete[] data;
return false; return false;
} }
} else if ( threadData->PBFBlob.has_lzma_data() ) { } else if ( threadData->PBFBlob.has_lzma_data() ) {
if ( !unpackLZMA(stream, threadData) ) if ( !unpackLZMA(stream, threadData) )
std::cerr << "[error] lzma data encountered that could not be unpacked" << std::endl; std::cerr << "[error] lzma data encountered that could not be unpacked" << std::endl;
free(data); delete[] data;
return false; return false;
} else { } else {
std::cerr << "[error] Blob contains no data" << std::endl; std::cerr << "[error] Blob contains no data" << std::endl;
free(data); delete[] data;
return false; return false;
} }
free(data); delete[] data;
return true; return true;
} }
bool readNextBlock(std::fstream& stream, _ThreadData * threadData) { inline bool readNextBlock(std::fstream& stream, _ThreadData * threadData) {
if(stream.eof()) { if(stream.eof()) {
return false; return false;
} }
@ -547,10 +548,11 @@ private:
return true; return true;
} }
Endianness getMachineEndianness() const { //TODO: gets called multiple times, save information instead of recomputing.
inline Endianness getMachineEndianness() const {
int i(1); int i(1);
char *p = (char *) &i; char *p = (char *) &i;
if (p[0] == 1) if (1 == p[0])
return LittleEndian; return LittleEndian;
return BigEndian; return BigEndian;
} }
@ -559,9 +561,11 @@ private:
static const int MAX_BLOB_HEADER_SIZE = 64 * 1024; static const int MAX_BLOB_HEADER_SIZE = 64 * 1024;
static const int MAX_BLOB_SIZE = 32 * 1024 * 1024; static const int MAX_BLOB_SIZE = 32 * 1024 * 1024;
#ifndef NDEBUG
/* counting the number of read blocks and groups */ /* counting the number of read blocks and groups */
unsigned groupCount; unsigned groupCount;
unsigned blockCount; unsigned blockCount;
#endif
ExtractorCallbacks * externalMemory; ExtractorCallbacks * externalMemory;
/* the input stream to parse */ /* the input stream to parse */