19#ifndef DUNE_POINT2POINTCOMMUNICATOR_IMPL_HEADER_INCLUDED
20#define DUNE_POINT2POINTCOMMUNICATOR_IMPL_HEADER_INCLUDED
// Tail of a Point2PointCommunicator< MsgBuffer > member; its signature and first
// statements are not part of this excerpt.
// NOTE(review): presumably removeLinkage() (the listing places it at line 30) - confirm.
// Visible effect: forget the cached receive-buffer sizes and mark them as not
// computed, so the size-caching exchange path has to determine them again.
27 template <
class MsgBuffer>
38 _recvBufferSizes.clear();
39 _recvBufferSizesComputed = false ;
// Inverts a linkage map into a flat vector: `dest` is resized to linkage.size()
// and, for every entry, dest[ entry.second ] = entry.first.
// NOTE(review): the signature is not visible here; insertRequest() calls this as
// computeDestinations( linkage, dest ).
// NOTE(review): assumes the mapped values are exactly 0 .. linkage.size()-1;
// any other value would index outside `dest` - confirm against insertRequest().
42 template <
class MsgBuffer>
47 typedef linkage_t::const_iterator const_iterator ;
48 dest.resize( linkage.size() );
// cache end() once; the map is not modified inside the loop
49 const const_iterator linkageEnd = linkage.end ();
50 for( const_iterator i = linkage.begin (); i != linkageEnd; ++i )
52 dest[ (*i).second ] = (*i).first;
// Registers the peers of this process: every rank in `sendLinks` gets a send
// link number and every rank in `recvLinks` gets a receive link number.
//
// \param sendLinks  ranks this process sends to
// \param recvLinks  ranks this process receives from
//
// The own rank and ranks already present in the linkage maps are skipped.
// Afterwards sendDest_ / recvSource_ are rebuilt from the maps so that a link
// number can be translated back into a rank.
// NOTE(review): the declarations of the counters `sendLink` / `recvLink` and any
// statement preceding `me_rank` are not visible in this excerpt.
56 template <
class MsgBuffer>
59 insertRequest(
const std::set< int >& sendLinks,
const std::set< int >& recvLinks )
64 const int me_rank = rank ();
67 typedef std::map< int, int >::iterator iterator ;
68 typedef std::set< int >::const_iterator const_iterator;
// end() of a std::map stays valid across insert(), so caching it before the loops is safe
70 const iterator sendEnd = sendLinkage_.end ();
71 const iterator recvEnd = recvLinkage_.end ();
72 const const_iterator sendLinksEnd = sendLinks.end ();
75 for (const_iterator i = sendLinks.begin (); i != sendLinksEnd; ++i )
// note: this local `rank` shadows the member function rank() used above
77 const int rank = (*i);
// only foreign ranks that are not yet linked receive a new link number
79 if( rank != me_rank && (sendLinkage_.find ( rank ) == sendEnd ) )
81 sendLinkage_.insert( std::make_pair( rank, sendLink++) );
85 const const_iterator recvLinksEnd = recvLinks.end ();
86 for (const_iterator i = recvLinks.begin (); i != recvLinksEnd; ++i )
88 const int rank = (*i);
90 if( rank != me_rank && (recvLinkage_.find ( rank ) == recvEnd ) )
92 recvLinkage_.insert( std::make_pair( rank, recvLink++) );
// link number -> destination rank
98 computeDestinations( sendLinkage_, sendDest_ );
// link number -> source rank
101 computeDestinations( recvLinkage_, recvSource_ );
// Prefix for MPI calls: stores the call's return code in a local `int test`,
// which the assert following each use compares against MPI_SUCCESS.
// Each use declares a variable named `test`, so two uses cannot share one scope.
// NOTE(review): presumably defined empty when asserts are disabled - the
// surrounding preprocessor conditionals are not visible in this excerpt.
113#define MY_INT_TEST int test =
// Carries out one non-blocking exchange (MPI_Isend plus MPI_Iprobe/MPI_Recv or
// MPI_Irecv/MPI_Test) over the links stored in a P2PCommunicator.
// NOTE(review): members `_tag` and `_needToSend` are used by the methods below
// but their declarations are not part of this excerpt.
118 template <
class P2PCommunicator >
119 class NonBlockingExchangeImplementation
121 typedef P2PCommunicator P2PCommunicatorType ;
// held by reference: the communicator must outlive this object
122 const P2PCommunicatorType& _p2pCommunicator;
// number of send / receive links, read from the communicator at construction
124 const int _sendLinks;
125 const int _recvLinks;
// request arrays allocated with new[] (see createSendRequest / createRecvRequest);
// either pointer may be null
128 MPI_Request* _sendRequest;
129 MPI_Request* _recvRequest;
// true when receives are posted up front with buffer sizes taken from the communicator
131 const bool _recvBufferSizesKnown;
// copy constructor is only declared here
// NOTE(review): presumably private and left undefined to forbid copying - confirm
135 NonBlockingExchangeImplementation(
const NonBlockingExchangeImplementation& );
// Allocates one MPI_Request per send link with new[].
// \return pointer to the array, or 0 when there are no send links.
// The destructor releases the array with delete [].
138 MPI_Request* createSendRequest()
const
140 return ( _sendLinks > 0 ) ?
new MPI_Request [ _sendLinks ] : 0;
// Allocates one MPI_Request per receive link with new[], but only when the
// receive buffer sizes are known (only then are receives posted with MPI_Irecv).
// \param recvBufferSizesKnown  whether receives will be posted up front
// \return pointer to the array, or 0 when no receive requests are needed.
145 MPI_Request* createRecvRequest(
const bool recvBufferSizesKnown )
const
147 return ( _recvLinks > 0 && recvBufferSizesKnown ) ?
new MPI_Request [ _recvLinks ] : 0;
// Returns the MPI communicator used for all calls of this exchange; relies on
// P2PCommunicatorType being convertible to MPI_Comm via static_cast.
151 MPI_Comm mpiCommunicator()
const {
return static_cast< MPI_Comm
> (_p2pCommunicator); }
// Types re-exported from the communicator: the pack/unpack callback interface
// and the message buffer class used for every link.
154 typedef typename P2PCommunicatorType :: DataHandleInterface DataHandleInterface;
155 typedef typename P2PCommunicatorType :: MessageBufferType MessageBufferType;
// Constructs an exchange object without sending anything yet.
// \param p2pComm               communicator providing links, peers and MPI_Comm
// \param recvBufferSizesKnown  if true, receive requests are allocated so that
//                              receives can be posted with known sizes
// NOTE(review): a tag parameter and the initializers of `_tag` / `_needToSend`
// appear to be missing from this excerpt (`mytag` is used in the assert below).
157 NonBlockingExchangeImplementation(
const P2PCommunicatorType& p2pComm,
159 const bool recvBufferSizesKnown =
false )
160 : _p2pCommunicator( p2pComm ),
// link counts are declared before the request pointers, so they are already
// initialized when createSendRequest()/createRecvRequest() read them
161 _sendLinks( _p2pCommunicator.sendLinks() ),
162 _recvLinks( _p2pCommunicator.recvLinks() ),
164 _sendRequest( createSendRequest() ),
165 _recvRequest( createRecvRequest( recvBufferSizesKnown ) ),
166 _recvBufferSizesKnown( recvBufferSizesKnown ),
// debug check that the tag equals the communicator's max() of it
// NOTE(review): presumably max() is a reduction over all ranks, i.e. every rank must use the same tag - confirm
172 assert ( mytag == _p2pCommunicator.max( mytag ) );
// Constructs an exchange object and immediately starts sending `sendBuffers`.
// \param p2pComm      communicator providing links, peers and MPI_Comm
// \param sendBuffers  one buffer per send link (checked by assert)
// Receive buffer sizes are treated as unknown, so no receive requests are allocated.
// The sends are started with MPI_Isend and only completed by the MPI_Waitall in
// receiveImpl(); per MPI rules `sendBuffers` must stay alive and unmodified until then.
// NOTE(review): a tag parameter and the initializers of `_tag` / `_needToSend`
// appear to be missing from this excerpt (`mytag` is used in the assert below).
176 NonBlockingExchangeImplementation(
const P2PCommunicatorType& p2pComm,
178 const std::vector< MessageBufferType > & sendBuffers )
179 : _p2pCommunicator( p2pComm ),
180 _sendLinks( _p2pCommunicator.sendLinks() ),
181 _recvLinks( _p2pCommunicator.recvLinks() ),
183 _sendRequest( createSendRequest() ),
184 _recvRequest( createRecvRequest( false ) ),
185 _recvBufferSizesKnown( false ),
// debug check that the tag equals the communicator's max() of it
// NOTE(review): presumably a reduction over all ranks - confirm
191 assert ( mytag == _p2pCommunicator.max( mytag ) );
// exactly one send buffer per link is required
194 assert ( _sendLinks ==
int( sendBuffers.size() ) );
195 sendImpl( sendBuffers );
// Releases the request arrays allocated in the constructors; delete [] on a
// null pointer is a no-op, so missing arrays are fine.
// NOTE(review): the statements between the visible lines are not part of this
// excerpt; whether pending requests are completed or checked first cannot be told here.
201 ~NonBlockingExchangeImplementation()
205 delete [] _sendRequest;
211 delete [] _recvRequest;
// Public entry points without a data handle: send() starts the non-blocking
// sends for the given buffers (one per send link), receive() collects and
// returns one buffer per receive link. Both simply forward to the *Impl methods.
217 void send(
const std::vector< MessageBufferType > & sendBuffers ) { sendImpl( sendBuffers ); }
218 std::vector< MessageBufferType > receive() {
return receiveImpl(); }
// Starts one non-blocking send per send link.
// \param sendBuffers  buffer for link i is sent to rank sendDest[ i ]; must hold
//                     at least _sendLinks entries (indexed without bounds check)
// Completion is not awaited here; the MPI_Waitall happens on the receive side.
225 void sendImpl(
const std::vector< MessageBufferType > & sendBuffers )
228 MPI_Comm comm = mpiCommunicator();
// link number -> destination rank
231 const std::vector< int >& sendDest = _p2pCommunicator.sendDest();
234 for (
int link = 0; link < _sendLinks; ++link)
236 sendLink( sendDest[ link ], _tag, sendBuffers[ link ], _sendRequest[ link ], comm );
// remember that the send phase has been done
240 _needToSend = false ;
// Receives one message per receive link into freshly created buffers
// (one buffer per link) without unpacking through a data handle.
// NOTE(review): the return statement is not visible in this excerpt;
// presumably the filled buffer vector is returned.
244 std::vector< MessageBufferType > receiveImpl ()
247 std::vector< MessageBufferType > recvBuffer( _recvLinks );
248 receiveImpl( recvBuffer );
// Receives the messages of all receive links by polling, then waits for the
// outstanding sends.
// \param recvBuffers  either one buffer per receive link, or exactly one buffer
//                     that is reused for every link (size() == 1)
// \param dataHandle   optional; if non-null, unpack( link, buffer ) is called
//                     right after the message of a link has arrived
// NOTE(review): declaration and increment of `numReceived` are not visible in
// this excerpt; the while loop presumably ends once every link was received.
// NOTE(review): the loop polls with probeAndReceive() (non-blocking MPI_Iprobe)
// and therefore spins until all messages have arrived.
253 void receiveImpl ( std::vector< MessageBufferType >& recvBuffers, DataHandleInterface* dataHandle = 0)
// nothing to receive and nothing to wait for
256 if( (_recvLinks + _sendLinks) == 0 )
return;
259 MPI_Comm comm = mpiCommunicator();
// link number -> source rank
262 const std::vector< int >& recvSource = _p2pCommunicator.recvSource();
// a single buffer means: reuse it for all links (contents are overwritten per message)
265 const bool useFirstStreamOnly = (recvBuffers.size() == 1) ;
// per-link flag, true while the message of that link is still outstanding
268 std::vector< bool > linkNotReceived( _recvLinks,
true );
272 while( numReceived < _recvLinks )
275 for (
int link = 0; link < _recvLinks; ++link )
278 if( linkNotReceived[ link ] )
281 MessageBufferType& recvBuffer = useFirstStreamOnly ? recvBuffers[ 0 ] : recvBuffers[ link ];
// true only if a message from this source was available and has been received
285 if( probeAndReceive( comm, recvSource[ link ], _tag, recvBuffer ) )
288 if( dataHandle ) dataHandle->unpack( link, recvBuffer );
291 linkNotReceived[ link ] = false ;
// complete all non-blocking sends before the send buffers may be reused
304 MY_INT_TEST MPI_Waitall ( _sendLinks, _sendRequest, MPI_STATUSES_IGNORE);
305 assert (test == MPI_SUCCESS);
// Completes receives that were posted beforehand with MPI_Irecv (known buffer
// sizes), unpacks each message as soon as it has arrived, then waits for the sends.
// \param recvBuffers  one buffer per receive link, the ones handed to postReceive()
// \param dataHandle   receives unpack( link, buffer ) for every completed link
// NOTE(review): declaration and increment of `numReceived` are not visible in
// this excerpt.
// NOTE(review): the early return for _recvLinks == 0 also skips the MPI_Waitall
// on the send requests; with send links but no receive links the sends would
// not be completed here - confirm whether that case can occur.
310 void unpackRecvBufferSizeKnown( std::vector< MessageBufferType >& recvBuffers, DataHandleInterface& dataHandle )
313 if( _recvLinks == 0 )
return;
// per-link flag, true while the message of that link is still outstanding
316 std::vector< bool > linkNotReceived( _recvLinks,
true );
320 while( numReceived < _recvLinks )
323 for (
int link = 0; link < _recvLinks; ++link )
326 if( linkNotReceived[ link ] )
// requests exist only if the object was built with recvBufferSizesKnown == true
328 assert( _recvRequest );
// non-blocking completion test of the posted receive
330 if( receivedMessage( _recvRequest[ link ], recvBuffers[ link ] ) )
333 dataHandle.unpack( link, recvBuffers[ link ] );
336 linkNotReceived[ link ] = false ;
// complete all non-blocking sends before the send buffers may be reused
348 MY_INT_TEST MPI_Waitall ( _sendLinks, _sendRequest, MPI_STATUSES_IGNORE);
349 assert (test == MPI_SUCCESS);
// Convenience overload: packs and sends through `dataHandle` using a local,
// temporary receive-buffer vector.
// NOTE(review): if _recvBufferSizesKnown is true the three-argument overload
// posts MPI_Irecv into these local buffers, which go out of scope on return -
// presumably this overload is only used when sizes are unknown; confirm.
354 void send( std::vector< MessageBufferType >& sendBuffers,
355 DataHandleInterface& dataHandle )
357 std::vector< MessageBufferType > recvBuffers;
358 send( sendBuffers, recvBuffers, dataHandle );
// Packs the data of every send link through `dataHandle`, starts the
// non-blocking sends and, if the receive sizes are known, posts the matching
// non-blocking receives.
// \param sendBuffer  one buffer per send link, filled by dataHandle.pack()
// \param recvBuffer  resized to _recvLinks and used as receive target when the
//                    buffer sizes are known; otherwise left untouched here
// \param dataHandle  provides pack( link, buffer )
// NOTE(review): the condition guarding the send loop (presumably on
// `_needToSend`) and the braces are not visible in this excerpt.
362 void send( std::vector< MessageBufferType >& sendBuffer,
363 std::vector< MessageBufferType >& recvBuffer,
364 DataHandleInterface& dataHandle )
369 MPI_Comm comm = mpiCommunicator();
// link number -> destination rank
372 const std::vector< int >& sendDest = _p2pCommunicator.sendDest();
375 for (
int link = 0; link < _sendLinks; ++link)
// fill the buffer for this link, then hand it to MPI
378 dataHandle.pack( link, sendBuffer[ link ] );
381 sendLink( sendDest[ link ], _tag, sendBuffer[ link ], _sendRequest[ link ], comm );
385 _needToSend = false ;
// with known sizes the receives can be posted right away (MPI_Irecv)
389 if( _recvBufferSizesKnown )
392 MPI_Comm comm = mpiCommunicator();
394 recvBuffer.resize( _recvLinks );
397 const std::vector< int >& recvSource = _p2pCommunicator.recvSource();
398 const std::vector< int >& recvBufferSizes = _p2pCommunicator.recvBufferSizes();
401 for (
int link = 0; link < _recvLinks; ++link)
// expected message size in bytes for this link
404 const int bufferSize = recvBufferSizes[ link ];
407 assert( _recvRequest );
// NOTE(review): the address of an array element is never null, so this assert cannot fail
408 assert( &_recvRequest[ link ] );
409 postReceive( recvSource[ link ], _tag, bufferSize, recvBuffer[ link ], _recvRequest[ link ], comm );
// Receive phase for exchanges with unknown buffer sizes: first gives the data
// handle the chance to do local work, then receives all links into a single
// reusable buffer and unpacks each message through the handle.
415 void receive( DataHandleInterface& dataHandle )
// called before waiting for messages
// NOTE(review): presumably meant to overlap computation with communication - confirm
418 dataHandle.localComputation() ;
// a vector of size 1 makes receiveImpl() reuse this buffer for every link
421 std::vector< MessageBufferType > recvBuffer( 1 );
423 receiveImpl( recvBuffer, &dataHandle );
// Complete exchange driven by a data handle: pack + send, then receive + unpack.
// \param dataHandle  provides pack/unpack (and localComputation) callbacks
// Returns immediately when the process has neither send nor receive links.
// NOTE(review): the `else` between the last two statements is not visible in
// this excerpt; presumably receive() is only used when sizes are unknown.
427 void exchange( DataHandleInterface& dataHandle )
429 const int recvLinks = _p2pCommunicator.recvLinks();
431 if( (recvLinks + _sendLinks) == 0 )
return;
// buffers live until the end of this function, i.e. past the MPI_Waitall on the sends
436 std::vector< MessageBufferType > recvBuffers ;
438 std::vector< MessageBufferType > sendBuffers;
444 sendBuffers.resize(_sendLinks);
447 send( sendBuffers, recvBuffers, dataHandle );
// receives were already posted: just complete and unpack them
451 if( _recvBufferSizesKnown )
452 unpackRecvBufferSizeKnown( recvBuffers, dataHandle );
// sizes unknown: probe for each message and receive it
454 receive( dataHandle );
// Starts a non-blocking send (MPI_Isend) of the raw bytes of `msgBuffer`.
// \param dest       destination rank
// \param tag        message tag
// \param msgBuffer  buffer whose buffer() yields (pointer, size in bytes)
// \param request    receives the MPI request handle of this send
// \param comm       MPI communicator
// \return number of bytes handed to MPI
// Per MPI rules the buffer memory must not be changed until the request completed.
458 int sendLink(
const int dest,
const int tag,
459 const MessageBufferType& msgBuffer, MPI_Request& request, MPI_Comm& comm )
462 std::pair< char*, int > buffer = msgBuffer.buffer();
464 MY_INT_TEST MPI_Isend ( buffer.first, buffer.second, MPI_BYTE, dest, tag, comm, &request );
465 assert (test == MPI_SUCCESS);
467 return buffer.second;
// Posts a non-blocking receive (MPI_Irecv) for a message of known size.
// \param source      rank to receive from
// \param tag         message tag
// \param bufferSize  expected message size in bytes
// \param msgBuffer   resized to bufferSize and used as receive target
// \param request     receives the MPI request handle of this receive
// \param comm        MPI communicator
// Per MPI rules msgBuffer must stay alive and must not be resized until the
// request has completed.
470 void postReceive(
const int source,
const int tag,
const int bufferSize,
471 MessageBufferType& msgBuffer, MPI_Request& request, MPI_Comm& comm )
474 msgBuffer.resize( bufferSize );
// later reads start at the beginning of the received data
476 msgBuffer.resetReadPosition();
// fetch pointer/size after the resize so they describe the final storage
479 std::pair< char*, int > buffer = msgBuffer.buffer();
483 MY_INT_TEST MPI_Irecv ( buffer.first, buffer.second, MPI_BYTE, source, tag, comm, & request);
484 assert (test == MPI_SUCCESS);
// Non-blocking check (MPI_Test) whether a previously posted receive has completed.
// \param request  request handle of the posted receive
// \return true if the message has arrived
// NOTE(review): the signature and the MPI_Test call are truncated in this
// excerpt; the second parameter is referred to as `buffer` below, and the
// declarations of `received` / `status` are not visible.
489 bool receivedMessage( MPI_Request& request, MessageBufferType&
503 MPI_Test( &request, &received,
// sanity check: number of bytes actually received vs. size of the posted buffer
// NOTE(review): presumably only executed when `received` is set and in debug builds - guard not visible
514 int checkBufferSize = -1;
515 MPI_Get_count ( & status, MPI_BYTE, &checkBufferSize );
516 if( checkBufferSize !=
int(buffer.size()) )
517 std::cout <<
"Buffer sizes don't match: " << checkBufferSize <<
" " << buffer.size() << std::endl;
518 assert( checkBufferSize ==
int(buffer.size()) );
521 return bool(received);
// Checks without blocking (MPI_Iprobe) whether a message from `source` with
// `tag` is pending; if so, sizes the buffer from the probed byte count and
// receives the message with a blocking MPI_Recv.
// \param comm        MPI communicator
// \param recvBuffer  resized to the message size and filled with the message
// NOTE(review): the `source` / `tag` parameters, the declarations of
// `available`, `status`, `bufferSize`, the check of `available` and the return
// statements are not visible in this excerpt; callers use the result as
// "message was received".
525 bool probeAndReceive( MPI_Comm& comm,
528 MessageBufferType& recvBuffer )
538 MPI_Iprobe( source, tag, comm, &available, &status );
// the probed message must come from the requested rank
544 assert ( source == status.MPI_SOURCE );
// message length in bytes, taken from the probe status
551 MY_INT_TEST MPI_Get_count ( &status, MPI_BYTE, &bufferSize );
552 assert (test == MPI_SUCCESS);
556 recvBuffer.resize( bufferSize );
558 recvBuffer.resetReadPosition();
// fetch pointer/size after the resize so they describe the final storage
561 std::pair< char*, int > buffer = recvBuffer.buffer();
565 MY_INT_TEST MPI_Recv ( buffer.first, buffer.second, MPI_BYTE, status.MPI_SOURCE, tag, comm, & status);
566 assert (test == MPI_SUCCESS);
// Point2PointCommunicator member performing a data-handle driven exchange with
// unknown receive sizes: builds a NonBlockingExchangeImplementation with a
// message tag from getMessageTag() and runs its exchange( handle ).
// NOTE(review): the signature is not visible in this excerpt.
580 template <
class MsgBuffer>
// this path must not be mixed with cached receive-buffer sizes
589 assert( _recvBufferSizes.empty () );
591 NonBlockingExchangeImplementation< ThisType > nonBlockingExchange( *
this, getMessageTag() );
592 nonBlockingExchange.exchange( handle );
// Exchanges ready-made message buffers with the linked peers.
// \param in  one buffer per send link (asserted in the helper's constructor)
// \return the received buffers, one per receive link
// The helper's constructor starts the sends; receiveImpl() collects the
// incoming messages and waits for the sends to complete.
597 template <
class MsgBuffer>
598 inline std::vector< MsgBuffer >
600 exchange(
const std::vector< MessageBufferType > & in )
const
605 NonBlockingExchangeImplementation< ThisType > nonBlockingExchange( *
this, getMessageTag(), in );
606 return nonBlockingExchange.receiveImpl();
// Data-handle driven exchange that remembers the receive-buffer sizes.
// While the sizes are not yet computed, all data is packed, exchanged through
// exchange( buffers ), unpacked, and the size of each received buffer is stored
// in _recvBufferSizes. The helper below is constructed with the "sizes known"
// flag so that receives can be posted up front.
// NOTE(review): the signature is not visible in this excerpt (presumably
// exchangeCached( handle )), and neither is an `else` before the last two
// statements; presumably they only run once the sizes are cached - confirm.
// NOTE(review): cached sizes stay valid only while every later exchange sends
// messages of the same size per link - confirm this precondition with callers.
614 template <
class MsgBuffer>
624 if( ! _recvBufferSizesComputed )
626 const int nSendLinks = sendLinks();
627 std::vector< MsgBuffer > buffers( nSendLinks );
// pack the data for every send link
629 for(
int link=0; link<nSendLinks; ++link )
631 handle.pack( link, buffers[ link ] );
// `buffers` now holds the received messages, one per receive link
634 buffers = exchange( buffers );
635 const int nRecvLinks = recvLinks();
637 for(
int link=0; link<nRecvLinks; ++link )
639 handle.unpack( link, buffers[ link ] );
// remember the message size of every link for later exchanges
642 _recvBufferSizes.resize( nRecvLinks );
643 for(
int link=0; link<nRecvLinks; ++link )
645 _recvBufferSizes[ link ] = buffers[ link ].size();
647 _recvBufferSizesComputed = true ;
// exchange with receives posted up front using the cached sizes
651 NonBlockingExchangeImplementation< ThisType > nonBlockingExchange( *
this, getMessageTag(), _recvBufferSizesComputed );
652 nonBlockingExchange.exchange( handle );
Definition p2pcommunicator.hh:166
Point-2-Point communicator for exchanging messages between processes.
Definition p2pcommunicator.hh:132
virtual std::vector< MessageBufferType > exchange(const std::vector< MessageBufferType > &) const
exchange message buffers with peers defined by inserted linkage
Definition p2pcommunicator_impl.hh:600
void insertRequest(const std::set< int > &sendLinks, const std::set< int > &recvLinks)
insert communication request with a set of ranks to send to and a set of ranks to receive from
Definition p2pcommunicator_impl.hh:59
void removeLinkage()
remove stored linkage
Definition p2pcommunicator_impl.hh:30
The namespace Dune is the main namespace for all Dune code.
Definition CartesianIndexMapper.hpp:10