MODFLOW 6  version 6.7.0.dev1
USGS Modular Hydrologic Model
MpiRouter.f90
module mpiroutermodule
  use routerbasemodule
  use kindmodule, only: i4b, lgp
  use profilermodule, only: g_prof
  use stlvecintmodule
  use simvariablesmodule, only: proc_id, nr_procs
  use simstagesmodule, only: stg_to_str, nr_sim_stages
  use virtualdatalistsmodule, only: virtual_model_list, &
                                    virtual_exchange_list
  use virtualbasemodule, only: nr_vdc_element_maps
  use virtualdatacontainermodule
  use virtualexchangemodule, only: virtualexchangetype
  use virtualsolutionmodule
  use mpimessagebuildermodule
  use mpimessagecachemodule
  use mpiworldmodule
  use mpi
  implicit none
  private

  public :: create_mpi_router

  type, public, extends(routerbasetype) :: mpiroutertype
    integer(I4B), dimension(:), pointer :: model_proc_ids
    type(stlvecint) :: senders !< the process ids to receive data from
    type(stlvecint) :: receivers !< the process ids to send data to
    type(vdcptrtype), dimension(:), pointer :: all_models => null() !< all virtual models from the global list
    type(vdcptrtype), dimension(:), pointer :: all_exchanges => null() !< all virtual exchanges from the global list
    type(vdcptrtype), dimension(:), pointer :: rte_models => null() !< the currently active models to be routed
    type(vdcptrtype), dimension(:), pointer :: rte_exchanges => null() !< the currently active exchanges to be routed
    type(mpimessagebuildertype) :: message_builder
    type(mpimessagecachetype) :: msg_cache
    type(mpiworldtype), pointer :: mpi_world => null()
    integer(I4B) :: imon !< the output file unit for the mpi monitor
    logical(LGP) :: enable_monitor !< when true, log diagnostics
    integer(I4B), dimension(:, :), allocatable :: tmr_mpi_wait !< array with timer handles for MPI_Wait calls
  contains
    procedure :: initialize => mr_initialize
    procedure :: route_all => mr_route_all
    procedure :: route_sln => mr_route_sln
    procedure :: finalize => mr_finalize
    procedure :: destroy => mr_destroy
    ! private
    procedure, private :: activate
    procedure, private :: deactivate
    procedure, private :: update_senders
    procedure, private :: update_receivers
    procedure, private :: route_active
    procedure, private :: is_cached
    procedure, private :: compose_messages
    procedure, private :: load_messages
  end type mpiroutertype

contains

  !> Factory method to create MPI router
  !<
  function create_mpi_router() result(router)
    class(routerbasetype), pointer :: router
    ! local
    class(mpiroutertype), pointer :: mpi_router

    allocate (mpi_router)
    router => mpi_router

  end function create_mpi_router
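
  ! Usage sketch (illustrative, not part of this file): a parallel run
  ! obtains the router through this factory and drives it per stage.
  ! The driving code and the stage constant below are assumptions.
  !
  !   class(routerbasetype), pointer :: router
  !   router => create_mpi_router()
  !   call router%initialize()
  !   call router%route_all(stg_bfr_exg_ad) ! e.g. some simulation stage
  !   call router%finalize()
  !   call router%destroy()
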
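  !> @brief Initialize the router: determine the owning
  !! process for every model and exchange in the global lists
  !<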
  subroutine mr_initialize(this)
    use inputoutputmodule, only: getunit
    use constantsmodule, only: linelength
    class(mpiroutertype) :: this
    ! local
    integer :: ierr
    integer(I4B) :: i
    integer(I4B) :: nr_models, nr_exchanges
    class(virtualdatacontainertype), pointer :: vdc
    character(len=LINELENGTH) :: monitor_file

    ! allocate timer handles: global + nr. solutions
    allocate (this%tmr_mpi_wait(nr_sim_stages, this%nr_virt_solutions + 1))
    this%tmr_mpi_wait = -1

    ! routing over all when starting
    this%halo_activated = .false.

    ! to log or not to log
    this%enable_monitor = .false.

    ! initialize the MPI message builder and cache
    call this%message_builder%init()
    call this%msg_cache%init()

    ! get mpi world for our process
    this%mpi_world => get_mpi_world()

    ! init address list
    call this%senders%init()
    call this%receivers%init()

    ! find out where models are
    nr_models = virtual_model_list%Count()
    nr_exchanges = virtual_exchange_list%Count()
    allocate (this%model_proc_ids(nr_models))
    allocate (this%all_models(nr_models))
    allocate (this%all_exchanges(nr_exchanges))

    do i = 1, nr_models
      vdc => get_vdc_from_list(virtual_model_list, i)
      this%all_models(i)%ptr => vdc
      if (vdc%is_local) then
        this%model_proc_ids(i) = proc_id
      else
        this%model_proc_ids(i) = 0
      end if
    end do

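    ! each rank contributed its own id for local models and zero
    ! elsewhere, so summing over all ranks gives every process the
    ! full model-to-rank mapping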
    call mpi_allreduce(mpi_in_place, this%model_proc_ids, nr_models, &
                       mpi_integer, mpi_sum, this%mpi_world%comm, ierr)
    call check_mpi(ierr)

    ! set the process id to the models and exchanges
    do i = 1, nr_models
      vdc => get_vdc_from_list(virtual_model_list, i)
      call vdc%set_orig_rank(this%model_proc_ids(i))
    end do

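    ! an exchange takes the rank of its first model or, when that one
    ! is local, of its second, so it is attributed to the remote side
    ! whenever one of its models lives on another process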
    do i = 1, nr_exchanges
      vdc => get_vdc_from_list(virtual_exchange_list, i)
      this%all_exchanges(i)%ptr => vdc
      select type (vex => vdc)
      class is (virtualexchangetype)
        call vex%set_orig_rank(vex%v_model1%orig_rank)
        if (vex%v_model1%is_local) then
          call vex%set_orig_rank(vex%v_model2%orig_rank)
        end if
      end select
    end do

    ! open log file
    if (this%enable_monitor) then
      this%imon = getunit()
      write (monitor_file, '(a,i0,a)') "mpi.p", proc_id, ".log"
      open (unit=this%imon, file=monitor_file)
      call this%message_builder%set_monitor(this%imon)

      ! write initial info
      write (this%imon, '(a,/)') ">> initialize MPI Router:"
      write (this%imon, '(2x,a,i0)') "process id: ", proc_id
      write (this%imon, '(2x,a,i0)') "nr. of processes: ", nr_procs
      write (this%imon, '(2x,a,i0)') "nr. of models: ", nr_models
      write (this%imon, '(2x,a,i0)') "nr. of exchanges: ", nr_exchanges
      write (this%imon, '(2x,a)') "model id, processor id:"
      do i = 1, nr_models
        write (this%imon, '(4x,2i8)') i, this%model_proc_ids(i)
      end do
      write (this%imon, '(a,/)') "<< initialize done"
    end if

  end subroutine mr_initialize

  !> @brief Activate models and exchanges for routing
  !<
  subroutine activate(this, models, exchanges)
    class(mpiroutertype) :: this
    type(vdcptrtype), dimension(:), pointer :: models
    type(vdcptrtype), dimension(:), pointer :: exchanges

    this%rte_models => models
    this%rte_exchanges => exchanges
    call this%message_builder%attach_data(models, exchanges)

  end subroutine activate

  !> @brief Deactivate data after routing
  !<
  subroutine deactivate(this)
    class(mpiroutertype) :: this

    this%rte_models => null()
    this%rte_exchanges => null()
    call this%message_builder%release_data()

  end subroutine deactivate

  !> @brief This will route all remote data from the
  !! global models and exchanges over MPI, for a
  !< given stage
  subroutine mr_route_all(this, stage)
    use memorymanagermodule, only: mem_print_detailed
    class(mpiroutertype) :: this
    integer(I4B) :: stage

    if (this%enable_monitor) then
      write (this%imon, '(/,2a)') ">> routing all: ", stg_to_str(stage)
    end if

    ! route all
    call this%activate(this%all_models, this%all_exchanges)
    call this%route_active(0, stage)
    call this%deactivate()

    if (this%enable_monitor) then
      write (this%imon, '(a,/)') "<< end routing all"
      !call mem_print_detailed(this%imon)
    end if

  end subroutine mr_route_all

  !> @brief This will route all remote data from models
  !! and exchanges in a particular solution over MPI,
  !< for a given stage
  subroutine mr_route_sln(this, virtual_sol, stage)
    class(mpiroutertype) :: this
    type(virtualsolutiontype) :: virtual_sol
    integer(I4B) :: stage

    if (this%enable_monitor) then
      write (this%imon, '(/,a,i0,2a)') ">> routing solution: ", &
        virtual_sol%solution_id, ", ", stg_to_str(stage)
    end if

    ! route for this solution
    call this%activate(virtual_sol%models, virtual_sol%exchanges)
    call this%route_active(virtual_sol%solution_id, stage)
    call this%deactivate()

    if (this%enable_monitor) then
      write (this%imon, '(a)') "<< end routing solution"
    end if

  end subroutine mr_route_sln

  !> @brief Routes the models and exchanges over MPI,
  !! either constructing the message bodies in a sequence
  !! of communication steps, or by loading from cache
  !< for efficiency.
  subroutine route_active(this, unit, stage)
    use simmodule, only: store_error
    class(mpiroutertype) :: this !< this mpi router
    integer(I4B) :: unit !< the solution id, or equal to 0 when global
    integer(I4B) :: stage !< the stage to route
    ! local
    integer(I4B) :: i
    integer(I4B) :: rnk
    integer :: ierr
    integer(kind=MPI_COUNT_KIND) :: msg_size !< need a longer int here, msg size can be > 2^31
    logical(LGP) :: from_cache
    ! mpi handles
    integer, dimension(:), allocatable :: rcv_req
    integer, dimension(:), allocatable :: snd_req
    integer, dimension(:, :), allocatable :: rcv_stat

    ! message body
    integer, dimension(:), allocatable :: body_rcv_t
    integer, dimension(:), allocatable :: body_snd_t

    ! update address list
    call this%update_senders()
    call this%update_receivers()

    ! allocate body data
    allocate (body_rcv_t(this%senders%size))
    allocate (body_snd_t(this%receivers%size))

    ! allocate handles
    allocate (rcv_req(this%senders%size))
    allocate (snd_req(this%receivers%size))
    allocate (rcv_stat(mpi_status_size, this%senders%size))

    ! always initialize request handles
    rcv_req = mpi_request_null
    snd_req = mpi_request_null

    if (this%enable_monitor) then
      write (this%imon, '(2x,a,*(i3))') "process ids sending data: ", &
        this%senders%get_values()
      write (this%imon, '(2x,a,*(i3))') "process ids receiving data: ", &
        this%receivers%get_values()
    end if

    ! from cache or build
    from_cache = this%is_cached(unit, stage)
    if (.not. from_cache) then
      call this%compose_messages(unit, stage, body_snd_t, body_rcv_t)
    else
      call this%load_messages(unit, stage, body_snd_t, body_rcv_t)
    end if

    if (this%enable_monitor) then
      write (this%imon, '(2x,a)') "== communicating bodies =="
    end if

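    ! note: the body datatypes are built from absolute memory addresses
    ! of the virtual data items, so the transfers below pass mpi_bottom
    ! as the buffer argument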
    ! recv bodies
    do i = 1, this%senders%size
      rnk = this%senders%at(i)
      if (this%enable_monitor) then
        write (this%imon, '(4x,a,i0)') "receiving from process: ", rnk
      end if

      ! call extended type size function (*_x) to avoid overflow for large submodels
      call mpi_type_size_x(body_rcv_t(i), msg_size, ierr)
      if (msg_size > 0) then
        call mpi_irecv(mpi_bottom, 1, body_rcv_t(i), rnk, stage, &
                       this%mpi_world%comm, rcv_req(i), ierr)
        call check_mpi(ierr)
      end if

      if (this%enable_monitor) then
        write (this%imon, '(6x,a,i0)') "message body size: ", msg_size
      end if
    end do

    ! send bodies
    do i = 1, this%receivers%size
      rnk = this%receivers%at(i)
      if (this%enable_monitor) then
        write (this%imon, '(4x,a,i0)') "sending to process: ", rnk
      end if

      ! call extended type size function (*_x) to avoid overflow for large submodels
      call mpi_type_size_x(body_snd_t(i), msg_size, ierr)
      if (msg_size > 0) then
        call mpi_isend(mpi_bottom, 1, body_snd_t(i), rnk, stage, &
                       this%mpi_world%comm, snd_req(i), ierr)
        call check_mpi(ierr)
      end if

      if (this%enable_monitor) then
        write (this%imon, '(6x,a,i0)') "message body size: ", msg_size
      end if
      call flush (this%imon)
    end do

    ! wait for exchange of all messages
    call g_prof%start("MPI_WaitAll ("//trim(stg_to_str(stage))//")", &
                      this%tmr_mpi_wait(stage, unit + 1))
    call mpi_waitall(this%senders%size, rcv_req, rcv_stat, ierr)
    call g_prof%stop(this%tmr_mpi_wait(stage, unit + 1))
    call check_mpi(ierr)

    deallocate (rcv_req, snd_req, rcv_stat)
    deallocate (body_rcv_t, body_snd_t)

  end subroutine route_active

  !> @brief Constructs the message bodies' MPI datatypes.
  !!
  !! Constructs the message bodies' MPI datatypes for a
  !! unit (a solution) and a given stage. This is done in
  !! a sequence of 6 steps (distributed over 3 phases):
  !!
  !! == synchronizing headers (VdcHeaderType) ==
  !!
  !! step 1:
  !! Receive headers from remote addresses requesting data
  !! from virtual data containers (models, exchanges, ...) local to this process
  !! step 2:
  !! Send headers to remote addresses to indicate for which
  !! virtual data containers (models, exchanges, ...) data is requested
  !!
  !! == synchronizing maps (VdcReceiverMapsType) ==
  !!
  !! step 3:
  !! Based on the received headers, receive element maps (which elements are
  !! to be sent from a contiguous array) for outgoing data
  !! step 4:
  !! Send element maps to remote addresses to specify incoming data
  !!
  !! == construct message body data types (VirtualDataContainerType) ==
  !!
  !! step 5:
  !! Construct the message receive body based on the virtual
  !! data items in the virtual data containers, and cache
  !!
  !! step 6:
  !! Construct the message send body, based on received header data
  !! and maps, from the virtual data containers, and cache
  !<
  subroutine compose_messages(this, unit, stage, body_snd_t, body_rcv_t)
    class(mpiroutertype) :: this
    integer(I4B) :: unit
    integer(I4B) :: stage
    integer, dimension(:) :: body_snd_t
    integer, dimension(:) :: body_rcv_t
    ! local
    integer(I4B) :: i, j, k
    integer(I4B) :: rnk
    integer :: ierr
    ! mpi handles
    integer, dimension(:), allocatable :: rcv_req
    integer, dimension(:), allocatable :: snd_req
    integer, dimension(:, :), allocatable :: rcv_stat
    ! message header
    integer(I4B) :: max_headers
    type(vdcheadertype), dimension(:, :), allocatable :: headers
    integer, dimension(:), allocatable :: hdr_rcv_t
    integer, dimension(:), allocatable :: hdr_snd_t
    integer, dimension(:), allocatable :: hdr_rcv_cnt
    ! maps
    type(vdcreceivermapstype), dimension(:, :), allocatable :: rcv_maps
    integer, dimension(:), allocatable :: map_rcv_t
    integer, dimension(:), allocatable :: map_snd_t

    ! allocate handles
    allocate (rcv_req(this%receivers%size))
    allocate (snd_req(this%senders%size))
    allocate (rcv_stat(mpi_status_size, this%receivers%size))

    ! init handles
    rcv_req = mpi_request_null
    snd_req = mpi_request_null

    ! allocate header data
    max_headers = size(this%rte_models) + size(this%rte_exchanges)
    allocate (hdr_rcv_t(this%receivers%size))
    allocate (hdr_snd_t(this%senders%size))
    allocate (headers(max_headers, this%receivers%size))
    allocate (hdr_rcv_cnt(this%receivers%size))

    ! allocate map data
    allocate (map_snd_t(this%senders%size))
    allocate (map_rcv_t(this%receivers%size))
    allocate (rcv_maps(max_headers, this%receivers%size)) ! for every header, we potentially need the maps

    if (this%enable_monitor) then
      write (this%imon, '(2x,a)') "== communicating headers =="
    end if

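    ! note the direction: headers travel opposite to the data they
    ! announce, so they are received from the ranks we send bodies to
    ! (receivers) and sent to the ranks we get bodies from (senders)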
    ! first receive headers for outward data
    do i = 1, this%receivers%size
      rnk = this%receivers%at(i)
      if (this%enable_monitor) then
        write (this%imon, '(4x,a,i0)') "Ireceive header from process: ", rnk
      end if
      call this%message_builder%create_header_rcv(hdr_rcv_t(i))
      call mpi_irecv(headers(:, i), max_headers, hdr_rcv_t(i), rnk, stage, &
                     this%mpi_world%comm, rcv_req(i), ierr)
      call check_mpi(ierr)
    end do

    ! send header for incoming data
    do i = 1, this%senders%size
      rnk = this%senders%at(i)
      if (this%enable_monitor) then
        write (this%imon, '(4x,a,i0)') "send header to process: ", rnk
      end if
      call this%message_builder%create_header_snd(rnk, stage, hdr_snd_t(i))
      call mpi_isend(mpi_bottom, 1, hdr_snd_t(i), rnk, stage, &
                     this%mpi_world%comm, snd_req(i), ierr)
      call check_mpi(ierr)
    end do

    ! wait for exchange of all headers
    call g_prof%start("MPI_WaitAll ("//trim(stg_to_str(stage))//")", &
                      this%tmr_mpi_wait(stage, unit + 1))
    call mpi_waitall(this%receivers%size, rcv_req, rcv_stat, ierr)
    call g_prof%stop(this%tmr_mpi_wait(stage, unit + 1))
    call check_mpi(ierr)

    ! reinit handles
    rcv_req = mpi_request_null
    snd_req = mpi_request_null

    ! after WaitAll we can count incoming headers from statuses
    do i = 1, this%receivers%size
      call mpi_get_count(rcv_stat(:, i), hdr_rcv_t(i), hdr_rcv_cnt(i), ierr)

      if (this%enable_monitor) then
        rnk = this%receivers%at(i)
        write (this%imon, '(4x,a,i0)') "received headers from process: ", rnk
        write (this%imon, '(6x,a)') "expecting data for:"
        do j = 1, hdr_rcv_cnt(i)
          write (this%imon, '(6x,a,i0,a,a)') "id: ", headers(j, i)%id, &
            " type: ", trim(vdc_type_to_str(headers(j, i)%container_type))
          write (this%imon, '(6x,a,99i6)') "map sizes: ", headers(j, i)%map_sizes
        end do
      end if
    end do

    ! clean up types
    do i = 1, this%receivers%size
      call mpi_type_free(hdr_rcv_t(i), ierr)
    end do
    do i = 1, this%senders%size
      call mpi_type_free(hdr_snd_t(i), ierr)
    end do

    if (this%enable_monitor) then
      write (this%imon, '(2x,a)') "== communicating maps =="
    end if

    ! allocate space for receiving maps
    do i = 1, this%receivers%size
      do j = 1, hdr_rcv_cnt(i)
        call rcv_maps(j, i)%create(headers(j, i)%map_sizes)
      end do
    end do

    ! receive maps
    do i = 1, this%receivers%size
      rnk = this%receivers%at(i)
      if (this%enable_monitor) then
        write (this%imon, '(4x,a,i0)') "Ireceive maps from process: ", rnk
      end if

      call this%message_builder%create_map_rcv(rcv_maps(:, i), hdr_rcv_cnt(i), &
                                               map_rcv_t(i))
      call mpi_irecv(mpi_bottom, 1, map_rcv_t(i), rnk, stage, &
                     this%mpi_world%comm, rcv_req(i), ierr)
      call check_mpi(ierr)
    end do

    ! send maps
    do i = 1, this%senders%size
      rnk = this%senders%at(i)
      if (this%enable_monitor) then
        write (this%imon, '(4x,a,i0)') "send map to process: ", rnk
      end if

      call this%message_builder%create_map_snd(rnk, stage, map_snd_t(i))
      call mpi_isend(mpi_bottom, 1, map_snd_t(i), rnk, stage, &
                     this%mpi_world%comm, snd_req(i), ierr)
      call check_mpi(ierr)
    end do

    ! wait on receiving maps
    call g_prof%start("MPI_WaitAll ("//trim(stg_to_str(stage))//")", &
                      this%tmr_mpi_wait(stage, unit + 1))
    call mpi_waitall(this%receivers%size, rcv_req, rcv_stat, ierr)
    call g_prof%stop(this%tmr_mpi_wait(stage, unit + 1))
    call check_mpi(ierr)

    ! print maps
    if (this%enable_monitor) then
      do i = 1, this%receivers%size
        rnk = this%receivers%at(i)
        write (this%imon, '(4x,a,i0)') "received maps from process: ", rnk
        do j = 1, hdr_rcv_cnt(i)
          write (this%imon, '(6x,a,i0,a,a)') "id: ", headers(j, i)%id, &
            " type: ", trim(vdc_type_to_str(headers(j, i)%container_type))
          do k = 1, nr_vdc_element_maps
            write (this%imon, '(8x,i0,a,i0)') k, " nr. elements: ", &
              rcv_maps(j, i)%el_maps(k)%nr_virt_elems
            if (rcv_maps(j, i)%el_maps(k)%nr_virt_elems > 0) then
              write (this%imon, '(8x,*(i6))') &
                rcv_maps(j, i)%el_maps(k)%remote_elem_shift
            end if
          end do
        end do
      end do
    end if

    ! clean up types
    do i = 1, this%receivers%size
      call mpi_type_free(map_rcv_t(i), ierr)
    end do
    do i = 1, this%senders%size
      call mpi_type_free(map_snd_t(i), ierr)
    end do

    if (this%enable_monitor) then
      write (this%imon, '(2x,a)') "== composing message bodies =="
    end if

    ! construct recv bodies and cache
    do i = 1, this%senders%size
      rnk = this%senders%at(i)
      if (this%enable_monitor) then
        write (this%imon, '(4x,a,i0)') "build recv body for process: ", rnk
      end if

      call this%message_builder%create_body_rcv(rnk, stage, body_rcv_t(i))
      call this%msg_cache%put(unit, rnk, stage, mpi_bdy_rcv, body_rcv_t(i))
    end do

    ! construct send bodies and cache
    do i = 1, this%receivers%size
      rnk = this%receivers%at(i)
      if (this%enable_monitor) then
        write (this%imon, '(4x,a,i0)') "build send body for process: ", rnk
      end if

      call this%message_builder%create_body_snd( &
        rnk, stage, headers(1:hdr_rcv_cnt(i), i), &
        rcv_maps(:, i), body_snd_t(i))
      call this%msg_cache%put(unit, rnk, stage, mpi_bdy_snd, body_snd_t(i))
    end do

    ! clean up element maps
    do i = 1, this%receivers%size
      do j = 1, hdr_rcv_cnt(i)
        call rcv_maps(j, i)%destroy()
      end do
    end do

    deallocate (rcv_req, snd_req, rcv_stat)
    deallocate (hdr_rcv_t, hdr_snd_t, hdr_rcv_cnt)
    deallocate (headers)
    deallocate (map_rcv_t, map_snd_t)

  end subroutine compose_messages

  !> @brief Load the message body MPI datatypes from cache
  !<
  subroutine load_messages(this, unit, stage, body_snd_t, body_rcv_t)
    class(mpiroutertype) :: this
    integer(I4B) :: unit
    integer(I4B) :: stage
    integer, dimension(:), allocatable :: body_snd_t
    integer, dimension(:), allocatable :: body_rcv_t
    ! local
    integer(I4B) :: i, rnk

    if (this%enable_monitor) then
      write (this%imon, '(2x,a)') "... running from cache ..."
    end if

    do i = 1, this%receivers%size
      rnk = this%receivers%at(i)
      body_snd_t(i) = this%msg_cache%get(unit, rnk, stage, mpi_bdy_snd)
    end do
    do i = 1, this%senders%size
      rnk = this%senders%at(i)
      body_rcv_t(i) = this%msg_cache%get(unit, rnk, stage, mpi_bdy_rcv)
    end do

  end subroutine load_messages

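  !> @brief Determine the ranks to receive from: the owners
  !! of all remote, active models and exchanges in the
  !! currently routed set
  !<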
  subroutine update_senders(this)
    class(mpiroutertype) :: this
    ! local
    integer(I4B) :: i
    class(virtualdatacontainertype), pointer :: vdc

    call this%senders%clear()

    do i = 1, size(this%rte_models)
      vdc => this%rte_models(i)%ptr
      if (.not. vdc%is_local .and. vdc%is_active) then
        call this%senders%push_back_unique(vdc%orig_rank)
      end if
    end do
    do i = 1, size(this%rte_exchanges)
      vdc => this%rte_exchanges(i)%ptr
      if (.not. vdc%is_local .and. vdc%is_active) then
        call this%senders%push_back_unique(vdc%orig_rank)
      end if
    end do

  end subroutine update_senders

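  !> @brief Determine the ranks to send to; mirrors the
  !! senders when no halo is active, otherwise follows from
  !! the receiver ranks stored on the containers
  !<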
  subroutine update_receivers(this)
    class(mpiroutertype) :: this
    ! local
    integer(I4B) :: i, j
    class(virtualdatacontainertype), pointer :: vdc

    call this%receivers%clear()

    if (.not. this%halo_activated) then
      ! assuming symmetry for now
      do i = 1, this%senders%size
        call this%receivers%push_back(this%senders%at(i))
      end do
    else
      ! get the receivers from the VDCs
      do i = 1, size(this%rte_models)
        vdc => this%rte_models(i)%ptr
        do j = 1, vdc%rcv_ranks%size
          call this%receivers%push_back_unique(vdc%rcv_ranks%at(j))
        end do
      end do
      do i = 1, size(this%rte_exchanges)
        vdc => this%rte_exchanges(i)%ptr
        do j = 1, vdc%rcv_ranks%size
          call this%receivers%push_back_unique(vdc%rcv_ranks%at(j))
        end do
      end do
    end if

  end subroutine update_receivers

  !> @brief Check if this stage is cached
  !<
  function is_cached(this, unit, stage) result(in_cache)
    use simmodule, only: ustop
    class(mpiroutertype) :: this
    integer(I4B) :: unit
    integer(I4B) :: stage
    logical(LGP) :: in_cache
    ! local
    integer(I4B) :: i, rnk
    integer(I4B) :: no_cache_cnt
    integer :: cached_type

    in_cache = .false.
    no_cache_cnt = 0

    do i = 1, this%receivers%size
      rnk = this%receivers%at(i)
      cached_type = this%msg_cache%get(unit, rnk, stage, mpi_bdy_snd)
      if (cached_type == no_cached_value) no_cache_cnt = no_cache_cnt + 1
    end do
    do i = 1, this%senders%size
      rnk = this%senders%at(i)
      cached_type = this%msg_cache%get(unit, rnk, stage, mpi_bdy_rcv)
      if (cached_type == no_cached_value) no_cache_cnt = no_cache_cnt + 1
    end do

    ! it should be all or nothing
    if (no_cache_cnt == this%receivers%size + this%senders%size) then
      in_cache = .false.
    else if (no_cache_cnt == 0) then
      in_cache = .true.
    else
      call ustop("Internal error: MPI message cache corrupt...")
    end if

  end function is_cached

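  !> @brief Clear the cached message datatypes
  !<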
  subroutine mr_finalize(this)
    class(mpiroutertype) :: this

    call this%msg_cache%clear()

  end subroutine mr_finalize

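  !> @brief Clean up all resources held by the router
  !<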
  subroutine mr_destroy(this)
    class(mpiroutertype) :: this

    call this%msg_cache%destroy()

    call this%senders%destroy()
    call this%receivers%destroy()

    deallocate (this%model_proc_ids)
    deallocate (this%all_models)
    deallocate (this%all_exchanges)

    deallocate (this%tmr_mpi_wait)

  end subroutine mr_destroy

end module mpiroutermodule