Fixes a race condition when more than one thread tries to access the grid file (thanks Patrick)
This commit is contained in:
		
							parent
							
								
									73ab331506
								
							
						
					
					
						commit
						e176115273
					
				
							
								
								
									
										47
									
								
								DataStructures/HashTable.h
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										47
									
								
								DataStructures/HashTable.h
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,47 @@ | |||||||
|  | /*
 | ||||||
|  |     open source routing machine | ||||||
|  |     Copyright (C) Dennis Luxen, others 2010 | ||||||
|  | 
 | ||||||
|  | This program is free software; you can redistribute it and/or modify | ||||||
|  | it under the terms of the GNU AFFERO General Public License as published by | ||||||
|  | the Free Software Foundation; either version 3 of the License, or | ||||||
|  | any later version. | ||||||
|  | 
 | ||||||
|  | This program is distributed in the hope that it will be useful, | ||||||
|  | but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||||
|  | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||||||
|  | GNU General Public License for more details. | ||||||
|  | 
 | ||||||
|  | You should have received a copy of the GNU Affero General Public License | ||||||
|  | along with this program; if not, write to the Free Software | ||||||
|  | Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA | ||||||
|  | or see http://www.gnu.org/licenses/agpl.txt.
 | ||||||
|  | 
 | ||||||
|  |   Created on: 18.11.2010 | ||||||
|  |   Author: dennis | ||||||
|  | */ | ||||||
|  | 
 | ||||||
|  | #ifndef HASHTABLE_H_ | ||||||
|  | #define HASHTABLE_H_ | ||||||
|  | 
 | ||||||
|  | #include <google/sparse_hash_map> | ||||||
|  | 
 | ||||||
|  | template<typename keyT, typename valueT> | ||||||
|  | class HashTable { | ||||||
|  | 	typedef google::sparse_hash_map<keyT, valueT> MyHashTable; | ||||||
|  | public: | ||||||
|  | 	HashTable() { } | ||||||
|  | 	HashTable(const unsigned size) { | ||||||
|  | 		table.resize(size); | ||||||
|  | 	} | ||||||
|  | 	void Add(const keyT& key, const valueT& value){ | ||||||
|  | 		table[key] = value; | ||||||
|  | 	} | ||||||
|  | 	valueT Find(const keyT& key) { | ||||||
|  | 		return table.find(key)->second; | ||||||
|  | 	} | ||||||
|  | private: | ||||||
|  | 	MyHashTable table; | ||||||
|  | }; | ||||||
|  | 
 | ||||||
|  | #endif /* HASHTABLE_H_ */ | ||||||
| @ -28,6 +28,8 @@ or see http://www.gnu.org/licenses/agpl.txt. | |||||||
| #include <cmath> | #include <cmath> | ||||||
| #include <vector> | #include <vector> | ||||||
| #include <stxxl.h> | #include <stxxl.h> | ||||||
|  | 
 | ||||||
|  | #include <boost/thread.hpp> | ||||||
| #include <google/dense_hash_map> | #include <google/dense_hash_map> | ||||||
| 
 | 
 | ||||||
| #include "ExtractorStructs.h" | #include "ExtractorStructs.h" | ||||||
| @ -146,30 +148,56 @@ static void getListOfIndexesForEdgeAndGridSize(_Coordinate& start, _Coordinate& | |||||||
| 
 | 
 | ||||||
| template<bool WriteAccess = false> | template<bool WriteAccess = false> | ||||||
| class NNGrid { | class NNGrid { | ||||||
|  | 	struct _ThreadData{ | ||||||
|  | 		_ThreadData(const char * iif) { | ||||||
|  | 			stream.open(iif, std::ios::in | std::ios::binary); | ||||||
|  | 		} | ||||||
|  | 		~_ThreadData() { | ||||||
|  | 			stream.close(); | ||||||
|  | 		} | ||||||
|  | 		ifstream stream; | ||||||
|  | 	}; | ||||||
| 
 | 
 | ||||||
|  | 	struct ThreadLookupTable{ | ||||||
|  | 		HashTable<unsigned, unsigned> table; | ||||||
|  | 	}; | ||||||
| public: | public: | ||||||
|  | 	ThreadLookupTable threadLookup; | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
| 	NNGrid() { ramIndexTable.resize((1024*1024), UINT_MAX); if( WriteAccess) { entries = new stxxl::vector<GridEdgeData>(); }} | 	NNGrid() { ramIndexTable.resize((1024*1024), UINT_MAX); if( WriteAccess) { entries = new stxxl::vector<GridEdgeData>(); }} | ||||||
| 
 | 
 | ||||||
| 	NNGrid(const char* rif, const char* iif) { | 	NNGrid(const char* rif, const char* iif) { | ||||||
| 		ramIndexTable.resize((1024*1024), UINT_MAX); | 		ramIndexTable.resize((1024*1024), UINT_MAX); | ||||||
|         indexInFile.open(iif, std::ios::in | std::ios::binary); | //		indexInFile.open(iif, std::ios::in | std::ios::binary);
 | ||||||
| 		ramInFile.open(rif, std::ios::in | std::ios::binary); | 		ramInFile.open(rif, std::ios::in | std::ios::binary); | ||||||
|  | 
 | ||||||
|  | 		cout << "opening grid file for " << omp_get_num_procs() << " threads" << endl; | ||||||
|  | 
 | ||||||
|  | 		for(int i = 0; i< omp_get_num_procs(); i++) { | ||||||
|  | 			indexFileStreams.push_back(new _ThreadData(iif)); | ||||||
|  | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	~NNGrid() { | 	~NNGrid() { | ||||||
| 		if(ramInFile.is_open()) ramInFile.close(); | 		if(ramInFile.is_open()) ramInFile.close(); | ||||||
|         if(indexInFile.is_open()) indexInFile.close(); | //		if(indexInFile.is_open()) indexInFile.close();
 | ||||||
| 
 | 
 | ||||||
| 		if (WriteAccess) | 		if (WriteAccess) | ||||||
| 		{ | 		{ | ||||||
| 			delete entries; | 			delete entries; | ||||||
| 		} | 		} | ||||||
|  | 
 | ||||||
|  | #pragma omp parallel for | ||||||
|  | 		for(int i = 0; i< indexFileStreams.size(); i++) { | ||||||
|  | 			delete indexFileStreams[i]; | ||||||
|  | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	void OpenIndexFiles() | 	void OpenIndexFiles() | ||||||
| 	{ | 	{ | ||||||
| 		assert(ramInFile.is_open()); | 		assert(ramInFile.is_open()); | ||||||
|         assert(indexInFile.is_open()); | //		assert(indexInFile.is_open());
 | ||||||
| 
 | 
 | ||||||
| 		for(int i = 0; i < 1024*1024; i++) | 		for(int i = 0; i < 1024*1024; i++) | ||||||
| 		{ | 		{ | ||||||
| @ -484,8 +512,11 @@ private: | |||||||
| 
 | 
 | ||||||
| 	void GetContentsOfFileBucket(const unsigned fileIndex, std::vector<_Edge>& result) | 	void GetContentsOfFileBucket(const unsigned fileIndex, std::vector<_Edge>& result) | ||||||
| 	{ | 	{ | ||||||
|  | //		cout << "thread: " << boost::this_thread::get_id() << ", hash: " << boost_thread_id_hash(boost::this_thread::get_id()) << ", id: " << threadLookup.table.Find(boost_thread_id_hash(boost::this_thread::get_id())) << endl;
 | ||||||
|  | 		unsigned threadID = threadLookup.table.Find(boost_thread_id_hash(boost::this_thread::get_id())); | ||||||
| 		unsigned ramIndex = getRAMIndexFromFileIndex(fileIndex); | 		unsigned ramIndex = getRAMIndexFromFileIndex(fileIndex); | ||||||
| 		unsigned startIndexInFile = ramIndexTable[ramIndex]; | 		unsigned startIndexInFile = ramIndexTable[ramIndex]; | ||||||
|  | //		ifstream indexInFile( indexFileStreams[threadID]->stream );
 | ||||||
| 		if(startIndexInFile == UINT_MAX){ | 		if(startIndexInFile == UINT_MAX){ | ||||||
| 			return; | 			return; | ||||||
| 		} | 		} | ||||||
| @ -495,7 +526,8 @@ private: | |||||||
| 		google::dense_hash_map< unsigned, unsigned > * cellMap = new google::dense_hash_map< unsigned, unsigned >(1024); | 		google::dense_hash_map< unsigned, unsigned > * cellMap = new google::dense_hash_map< unsigned, unsigned >(1024); | ||||||
| 		cellMap->set_empty_key(UINT_MAX); | 		cellMap->set_empty_key(UINT_MAX); | ||||||
| 
 | 
 | ||||||
|         indexInFile.seekg(startIndexInFile); | 
 | ||||||
|  | 		indexFileStreams[threadID]->stream.seekg(startIndexInFile); | ||||||
| 
 | 
 | ||||||
| 		unsigned lineBase = ramIndex/1024; | 		unsigned lineBase = ramIndex/1024; | ||||||
| 		lineBase = lineBase*32*32768; | 		lineBase = lineBase*32*32768; | ||||||
| @ -516,7 +548,7 @@ private: | |||||||
| 		unsigned numOfElementsInCell = 0; | 		unsigned numOfElementsInCell = 0; | ||||||
| 		for(int i = 0; i < 32*32; i++) | 		for(int i = 0; i < 32*32; i++) | ||||||
| 		{ | 		{ | ||||||
|             indexInFile.read((char *)&cellIndex[i], sizeof(unsigned)); | 			indexFileStreams[threadID]->stream.read((char *)&cellIndex[i], sizeof(unsigned)); | ||||||
| 			numOfElementsInCell += cellIndex[i]; | 			numOfElementsInCell += cellIndex[i]; | ||||||
| 		} | 		} | ||||||
| 		assert(cellMap->find(fileIndex) != cellMap->end()); | 		assert(cellMap->find(fileIndex) != cellMap->end()); | ||||||
| @ -526,18 +558,18 @@ private: | |||||||
| 			return; | 			return; | ||||||
| 		} | 		} | ||||||
| 		unsigned position = cellIndex[cellMap->find(fileIndex)->second] + 32*32*sizeof(unsigned) ; | 		unsigned position = cellIndex[cellMap->find(fileIndex)->second] + 32*32*sizeof(unsigned) ; | ||||||
|         indexInFile.seekg(position); | 		indexFileStreams[threadID]->stream.seekg(position); | ||||||
| 		unsigned numberOfEdgesInFileBucket = 0; | 		unsigned numberOfEdgesInFileBucket = 0; | ||||||
| 		NodeID start, target; int slat, slon, tlat, tlon; | 		NodeID start, target; int slat, slon, tlat, tlon; | ||||||
| 		do{ | 		do{ | ||||||
|             indexInFile.read((char *)&(start), sizeof(NodeID)); | 			indexFileStreams[threadID]->stream.read((char *)&(start), sizeof(NodeID)); | ||||||
|             if(start == UINT_MAX || indexInFile.eof()) | 			if(start == UINT_MAX || indexFileStreams[threadID]->stream.eof()) | ||||||
| 				break; | 				break; | ||||||
|             indexInFile.read((char *)&(target), sizeof(NodeID)); | 			indexFileStreams[threadID]->stream.read((char *)&(target), sizeof(NodeID)); | ||||||
|             indexInFile.read((char *)&(slat), sizeof(int)); | 			indexFileStreams[threadID]->stream.read((char *)&(slat), sizeof(int)); | ||||||
|             indexInFile.read((char *)&(slon), sizeof(int)); | 			indexFileStreams[threadID]->stream.read((char *)&(slon), sizeof(int)); | ||||||
|             indexInFile.read((char *)&(tlat), sizeof(int)); | 			indexFileStreams[threadID]->stream.read((char *)&(tlat), sizeof(int)); | ||||||
|             indexInFile.read((char *)&(tlon), sizeof(int)); | 			indexFileStreams[threadID]->stream.read((char *)&(tlon), sizeof(int)); | ||||||
| 
 | 
 | ||||||
| 			_Edge e(start, target); | 			_Edge e(start, target); | ||||||
| 			e.startCoord.lat = slat; | 			e.startCoord.lat = slat; | ||||||
| @ -591,8 +623,9 @@ private: | |||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	ofstream indexOutFile; | 	ofstream indexOutFile; | ||||||
|     ifstream indexInFile; | //	ifstream indexInFile;
 | ||||||
| 	ifstream ramInFile; | 	ifstream ramInFile; | ||||||
|  | 	std::vector < _ThreadData* > indexFileStreams; | ||||||
| 	stxxl::vector<GridEdgeData> * entries; | 	stxxl::vector<GridEdgeData> * entries; | ||||||
| 	std::vector<unsigned> ramIndexTable; //4 MB for first level index in RAM
 | 	std::vector<unsigned> ramIndexTable; //4 MB for first level index in RAM
 | ||||||
| }; | }; | ||||||
|  | |||||||
| @ -61,6 +61,10 @@ public: | |||||||
| 	inline bool FindRoutingStarts(const _Coordinate start, const _Coordinate target, PhantomNodes * phantomNodes) { | 	inline bool FindRoutingStarts(const _Coordinate start, const _Coordinate target, PhantomNodes * phantomNodes) { | ||||||
| 	    g->FindRoutingStarts(start, target, phantomNodes); | 	    g->FindRoutingStarts(start, target, phantomNodes); | ||||||
| 	} | 	} | ||||||
|  | 
 | ||||||
|  | 	inline void RegisterThread(const unsigned k, const unsigned v) { | ||||||
|  | 		g->threadLookup.table.Add(k, v); | ||||||
|  | 	} | ||||||
| private: | private: | ||||||
| 	vector<_Coordinate> * int2ExtNodeMap; | 	vector<_Coordinate> * int2ExtNodeMap; | ||||||
| 	ReadOnlyGrid * g; | 	ReadOnlyGrid * g; | ||||||
|  | |||||||
| @ -37,4 +37,12 @@ inline double get_timestamp() | |||||||
| double y2lat(double a) { return 180/M_PI * (2 * atan(exp(a*M_PI/180)) - M_PI/2); } | double y2lat(double a) { return 180/M_PI * (2 * atan(exp(a*M_PI/180)) - M_PI/2); } | ||||||
| double lat2y(double a) { return 180/M_PI * log(tan(M_PI/4+a*(M_PI/180)/2)); } | double lat2y(double a) { return 180/M_PI * log(tan(M_PI/4+a*(M_PI/180)/2)); } | ||||||
| 
 | 
 | ||||||
|  | inline unsigned boost_thread_id_hash(boost::thread::id const& id) | ||||||
|  | { | ||||||
|  | 	std::stringstream ostr; | ||||||
|  | 	ostr << id; | ||||||
|  | 	std::tr1::hash<std::string> h; | ||||||
|  | 	return h(ostr.str()); | ||||||
|  | } | ||||||
|  | 
 | ||||||
| #endif /* TIMEUTIL_H_ */ | #endif /* TIMEUTIL_H_ */ | ||||||
|  | |||||||
| @ -70,6 +70,7 @@ public: | |||||||
| 		for (std::size_t i = 0; i < thread_pool_size_; ++i) | 		for (std::size_t i = 0; i < thread_pool_size_; ++i) | ||||||
| 		{ | 		{ | ||||||
| 			boost::shared_ptr<boost::thread> thread(new boost::thread(boost::bind(&boost::asio::io_service::run, &io_service_))); | 			boost::shared_ptr<boost::thread> thread(new boost::thread(boost::bind(&boost::asio::io_service::run, &io_service_))); | ||||||
|  | 			sEngine->RegisterThread( boost_thread_id_hash( thread->get_id() ), threads.size()); | ||||||
| 			threads.push_back(thread); | 			threads.push_back(thread); | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
|  | |||||||
| @ -29,6 +29,8 @@ or see http://www.gnu.org/licenses/agpl.txt. | |||||||
| #include <cstring> | #include <cstring> | ||||||
| #include <string> | #include <string> | ||||||
| 
 | 
 | ||||||
|  | #include <boost/thread.hpp> | ||||||
|  | 
 | ||||||
| using namespace std; | using namespace std; | ||||||
| 
 | 
 | ||||||
| #define VERBOSE(x) x | #define VERBOSE(x) x | ||||||
| @ -45,6 +47,7 @@ typedef unsigned int EdgeWeight; | |||||||
| static const NodeID SPECIAL_NODEID = UINT_MAX; | static const NodeID SPECIAL_NODEID = UINT_MAX; | ||||||
| static const EdgeID SPECIAL_EDGEID = UINT_MAX; | static const EdgeID SPECIAL_EDGEID = UINT_MAX; | ||||||
| 
 | 
 | ||||||
|  | #include "DataStructures/HashTable.h" | ||||||
| #include "DataStructures/NodeCoords.h" | #include "DataStructures/NodeCoords.h" | ||||||
| typedef NodeCoords<NodeID> NodeInfo; | typedef NodeCoords<NodeID> NodeInfo; | ||||||
| #include "DataStructures/Util.h" | #include "DataStructures/Util.h" | ||||||
|  | |||||||
		Loading…
	
		Reference in New Issue
	
	Block a user