24 integer(I4B),
dimension(:),
pointer :: model_proc_ids
27 type(
vdcptrtype),
dimension(:),
pointer :: all_models => null()
28 type(
vdcptrtype),
dimension(:),
pointer :: all_exchanges => null()
29 type(
vdcptrtype),
dimension(:),
pointer :: rte_models => null()
30 type(
vdcptrtype),
dimension(:),
pointer :: rte_exchanges => null()
35 logical(LGP) :: enable_monitor
36 integer(I4B),
dimension(:, :),
allocatable :: tmr_mpi_wait
75 integer(I4B) :: nr_models, nr_exchanges
77 character(len=LINELENGTH) :: monitor_file
80 allocate (this%tmr_mpi_wait(
nr_sim_stages, this%nr_virt_solutions + 1))
81 this%tmr_mpi_wait = -1
84 this%halo_activated = .false.
87 this%enable_monitor = .false.
90 call this%message_builder%init()
91 call this%msg_cache%init()
97 call this%senders%init()
98 call this%receivers%init()
103 allocate (this%model_proc_ids(nr_models))
104 allocate (this%all_models(nr_models))
105 allocate (this%all_exchanges(nr_exchanges))
109 this%all_models(i)%ptr => vdc
110 if (vdc%is_local)
then
111 this%model_proc_ids(i) =
proc_id
113 this%model_proc_ids(i) = 0
117 call mpi_allreduce(mpi_in_place, this%model_proc_ids, nr_models, &
118 mpi_integer, mpi_sum, this%mpi_world%comm, ierr)
124 call vdc%set_orig_rank(this%model_proc_ids(i))
127 do i = 1, nr_exchanges
129 this%all_exchanges(i)%ptr => vdc
130 select type (vex => vdc)
132 call vex%set_orig_rank(vex%v_model1%orig_rank)
133 if (vex%v_model1%is_local)
then
134 call vex%set_orig_rank(vex%v_model2%orig_rank)
140 if (this%enable_monitor)
then
142 write (monitor_file,
'(a,i0,a)')
"mpi.p",
proc_id,
".log"
143 open (unit=this%imon, file=monitor_file)
144 call this%message_builder%set_monitor(this%imon)
147 write (this%imon,
'(a,/)')
">> initialize MPI Router:"
148 write (this%imon,
'(2x,a,i0)')
"process id: ",
proc_id
149 write (this%imon,
'(2x,a,i0)')
"nr. of processes: ",
nr_procs
150 write (this%imon,
'(2x,a,i0)')
"nr. of models: ", nr_models
151 write (this%imon,
'(2x,a,i0)')
"nr. of exchanges: ", nr_exchanges
152 write (this%imon,
'(2x,2a)')
"model id, processor id:"
154 write (this%imon,
'(4x,2i8)') i, this%model_proc_ids(i)
156 write (this%imon,
'(a,/)')
"<< initialize done"
165 type(
vdcptrtype),
dimension(:),
pointer :: models
166 type(
vdcptrtype),
dimension(:),
pointer :: exchanges
168 this%rte_models => models
169 this%rte_exchanges => exchanges
170 call this%message_builder%attach_data(models, exchanges)
179 this%rte_models => null()
180 this%rte_exchanges => null()
181 call this%message_builder%release_data()
191 integer(I4B) :: stage
193 if (this%enable_monitor)
then
194 write (this%imon,
'(/,2a)')
">> routing all: ",
stg_to_str(stage)
198 call this%activate(this%all_models, this%all_exchanges)
199 call this%route_active(0, stage)
200 call this%deactivate()
202 if (this%enable_monitor)
then
203 write (this%imon,
'(a,/)')
"<< end routing all"
215 integer(I4B) :: stage
217 if (this%enable_monitor)
then
218 write (this%imon,
'(/,a,i0,2a)')
">> routing solution: ", &
219 virtual_sol%solution_id,
", ",
stg_to_str(stage)
223 call this%activate(virtual_sol%models, virtual_sol%exchanges)
224 call this%route_active(virtual_sol%solution_id, stage)
225 call this%deactivate()
227 if (this%enable_monitor)
then
228 write (this%imon,
'(a)')
"<< end routing solution"
241 integer(I4B) :: stage
246 integer(kind=MPI_COUNT_KIND) :: msg_size
247 logical(LGP) :: from_cache
249 integer,
dimension(:),
allocatable :: rcv_req
250 integer,
dimension(:),
allocatable :: snd_req
251 integer,
dimension(:, :),
allocatable :: rcv_stat
254 integer,
dimension(:),
allocatable :: body_rcv_t
255 integer,
dimension(:),
allocatable :: body_snd_t
258 call this%update_senders()
259 call this%update_receivers()
262 allocate (body_rcv_t(this%senders%size))
263 allocate (body_snd_t(this%receivers%size))
266 allocate (rcv_req(this%senders%size))
267 allocate (snd_req(this%receivers%size))
268 allocate (rcv_stat(mpi_status_size, this%senders%size))
271 rcv_req = mpi_request_null
272 snd_req = mpi_request_null
274 if (this%enable_monitor)
then
275 write (this%imon,
'(2x,a,*(i3))')
"process ids sending data: ", &
276 this%senders%get_values()
277 write (this%imon,
'(2x,a,*(i3))')
"process ids receiving data: ", &
278 this%receivers%get_values()
282 from_cache = this%is_cached(unit, stage)
283 if (.not. from_cache)
then
284 call this%compose_messages(unit, stage, body_snd_t, body_rcv_t)
286 call this%load_messages(unit, stage, body_snd_t, body_rcv_t)
289 if (this%enable_monitor)
then
290 write (this%imon,
'(2x,a)')
"== communicating bodies =="
294 do i = 1, this%senders%size
295 rnk = this%senders%at(i)
296 if (this%enable_monitor)
then
297 write (this%imon,
'(4x,a,i0)')
"receiving from process: ", rnk
301 call mpi_type_size_x(body_rcv_t(i), msg_size, ierr)
302 if (msg_size > 0)
then
303 call mpi_irecv(mpi_bottom, 1, body_rcv_t(i), rnk, stage, &
304 this%mpi_world%comm, rcv_req(i), ierr)
308 if (this%enable_monitor)
then
309 write (this%imon,
'(6x,a,i0)')
"message body size: ", msg_size
314 do i = 1, this%receivers%size
315 rnk = this%receivers%at(i)
316 if (this%enable_monitor)
then
317 write (this%imon,
'(4x,a,i0)')
"sending to process: ", rnk
321 call mpi_type_size_x(body_snd_t(i), msg_size, ierr)
322 if (msg_size > 0)
then
323 call mpi_isend(mpi_bottom, 1, body_snd_t(i), rnk, stage, &
324 this%mpi_world%comm, snd_req(i), ierr)
328 if (this%enable_monitor)
then
329 write (this%imon,
'(6x,a,i0)')
"message body size: ", msg_size
331 call flush (this%imon)
336 this%tmr_mpi_wait(stage, unit + 1))
337 call mpi_waitall(this%senders%size, rcv_req, rcv_stat, ierr)
338 call g_prof%stop(this%tmr_mpi_wait(stage, unit + 1))
341 deallocate (rcv_req, snd_req, rcv_stat)
342 deallocate (body_rcv_t, body_snd_t)
382 integer(I4B) :: stage
383 integer,
dimension(:) :: body_snd_t
384 integer,
dimension(:) :: body_rcv_t
386 integer(I4B) :: i, j, k
390 integer,
dimension(:),
allocatable :: rcv_req
391 integer,
dimension(:),
allocatable :: snd_req
392 integer,
dimension(:, :),
allocatable :: rcv_stat
394 integer(I4B) :: max_headers
396 integer,
dimension(:),
allocatable :: hdr_rcv_t
397 integer,
dimension(:),
allocatable :: hdr_snd_t
398 integer,
dimension(:),
allocatable :: hdr_rcv_cnt
401 integer,
dimension(:),
allocatable :: map_rcv_t
402 integer,
dimension(:),
allocatable :: map_snd_t
405 allocate (rcv_req(this%receivers%size))
406 allocate (snd_req(this%senders%size))
407 allocate (rcv_stat(mpi_status_size, this%receivers%size))
410 rcv_req = mpi_request_null
411 snd_req = mpi_request_null
414 max_headers =
size(this%rte_models) +
size(this%rte_exchanges)
415 allocate (hdr_rcv_t(this%receivers%size))
416 allocate (hdr_snd_t(this%senders%size))
417 allocate (headers(max_headers, this%receivers%size))
418 allocate (hdr_rcv_cnt(this%receivers%size))
421 allocate (map_snd_t(this%senders%size))
422 allocate (map_rcv_t(this%receivers%size))
423 allocate (rcv_maps(max_headers, this%receivers%size))
425 if (this%enable_monitor)
then
426 write (this%imon,
'(2x,a)')
"== communicating headers =="
430 do i = 1, this%receivers%size
431 rnk = this%receivers%at(i)
432 if (this%enable_monitor)
then
433 write (this%imon,
'(4x,a,i0)')
"Ireceive header from process: ", rnk
435 call this%message_builder%create_header_rcv(hdr_rcv_t(i))
436 call mpi_irecv(headers(:, i), max_headers, hdr_rcv_t(i), rnk, stage, &
437 this%mpi_world%comm, rcv_req(i), ierr)
442 do i = 1, this%senders%size
443 rnk = this%senders%at(i)
444 if (this%enable_monitor)
then
445 write (this%imon,
'(4x,a,i0)')
"send header to process: ", rnk
447 call this%message_builder%create_header_snd(rnk, stage, hdr_snd_t(i))
448 call mpi_isend(mpi_bottom, 1, hdr_snd_t(i), rnk, stage, &
449 this%mpi_world%comm, snd_req(i), ierr)
455 this%tmr_mpi_wait(stage, unit + 1))
456 call mpi_waitall(this%receivers%size, rcv_req, rcv_stat, ierr)
457 call g_prof%stop(this%tmr_mpi_wait(stage, unit + 1))
461 rcv_req = mpi_request_null
462 snd_req = mpi_request_null
465 do i = 1, this%receivers%size
466 call mpi_get_count(rcv_stat(:, i), hdr_rcv_t(i), hdr_rcv_cnt(i), ierr)
468 if (this%enable_monitor)
then
469 rnk = this%senders%at(i)
470 write (this%imon,
'(4x,a,i0)')
"received headers from process: ", rnk
471 write (this%imon,
'(6x,a)')
"expecting data for:"
472 do j = 1, hdr_rcv_cnt(i)
473 write (this%imon,
'(6x,a,i0,a,a)')
"id: ", headers(j, i)%id, &
475 write (this%imon,
'(6x,a,99i6)')
"map sizes: ", headers(j, i)%map_sizes
481 do i = 1, this%receivers%size
482 call mpi_type_free(hdr_rcv_t(i), ierr)
484 do i = 1, this%senders%size
485 call mpi_type_free(hdr_snd_t(i), ierr)
488 if (this%enable_monitor)
then
489 write (this%imon,
'(2x,a)')
"== communicating maps =="
493 do i = 1, this%receivers%size
494 do j = 1, hdr_rcv_cnt(i)
495 call rcv_maps(j, i)%create(headers(j, i)%map_sizes)
500 do i = 1, this%receivers%size
501 rnk = this%receivers%at(i)
502 if (this%enable_monitor)
then
503 write (this%imon,
'(4x,a,i0)')
"Ireceive maps from process: ", rnk
506 call this%message_builder%create_map_rcv(rcv_maps(:, i), hdr_rcv_cnt(i), &
508 call mpi_irecv(mpi_bottom, 1, map_rcv_t(i), rnk, stage, &
509 this%mpi_world%comm, rcv_req(i), ierr)
514 do i = 1, this%senders%size
515 rnk = this%senders%at(i)
516 if (this%enable_monitor)
then
517 write (this%imon,
'(4x,a,i0)')
"send map to process: ", rnk
520 call this%message_builder%create_map_snd(rnk, stage, map_snd_t(i))
521 call mpi_isend(mpi_bottom, 1, map_snd_t(i), rnk, stage, &
522 this%mpi_world%comm, snd_req(i), ierr)
528 this%tmr_mpi_wait(stage, unit + 1))
529 call mpi_waitall(this%receivers%size, rcv_req, rcv_stat, ierr)
530 call g_prof%stop(this%tmr_mpi_wait(stage, unit + 1))
534 if (this%enable_monitor)
then
535 do i = 1, this%receivers%size
536 rnk = this%receivers%at(i)
537 write (this%imon,
'(4x,a,i0)')
"received maps from process: ", rnk
538 do j = 1, hdr_rcv_cnt(i)
539 write (this%imon,
'(6x,a,i0,a,a)')
"id: ", headers(j, i)%id, &
542 write (this%imon,
'(8x,i0, a,i0)') k,
" nr. elements: ", &
543 rcv_maps(j, i)%el_maps(k)%nr_virt_elems
544 if (rcv_maps(j, i)%el_maps(k)%nr_virt_elems > 0)
then
545 write (this%imon,
'(8x,*(i6))') &
546 rcv_maps(j, i)%el_maps(k)%remote_elem_shift
554 do i = 1, this%receivers%size
555 call mpi_type_free(map_rcv_t(i), ierr)
557 do i = 1, this%senders%size
558 call mpi_type_free(map_snd_t(i), ierr)
561 if (this%enable_monitor)
then
562 write (this%imon,
'(2x,a)')
"== composing message bodies =="
566 do i = 1, this%senders%size
567 rnk = this%senders%at(i)
568 if (this%enable_monitor)
then
569 write (this%imon,
'(4x,a,i0)')
"build recv body for process: ", rnk
572 call this%message_builder%create_body_rcv(rnk, stage, body_rcv_t(i))
573 call this%msg_cache%put(unit, rnk, stage,
mpi_bdy_rcv, body_rcv_t(i))
577 do i = 1, this%receivers%size
578 rnk = this%receivers%at(i)
579 if (this%enable_monitor)
then
580 write (this%imon,
'(4x,a,i0)')
"build send body for process: ", rnk
583 call this%message_builder%create_body_snd( &
584 rnk, stage, headers(1:hdr_rcv_cnt(i), i), &
585 rcv_maps(:, i), body_snd_t(i))
586 call this%msg_cache%put(unit, rnk, stage,
mpi_bdy_snd, body_snd_t(i))
590 do i = 1, this%receivers%size
591 do j = 1, hdr_rcv_cnt(i)
592 call rcv_maps(j, i)%destroy()
596 deallocate (rcv_req, snd_req, rcv_stat)
597 deallocate (hdr_rcv_t, hdr_snd_t, hdr_rcv_cnt)
599 deallocate (map_rcv_t, map_snd_t)
600 deallocate (rcv_maps)
609 integer(I4B) :: stage
610 integer,
dimension(:),
allocatable :: body_snd_t
611 integer,
dimension(:),
allocatable :: body_rcv_t
613 integer(I4B) :: i, rnk
615 if (this%enable_monitor)
then
616 write (this%imon,
'(2x,a)')
"... running from cache ..."
619 do i = 1, this%receivers%size
620 rnk = this%receivers%at(i)
621 body_snd_t(i) = this%msg_cache%get(unit, rnk, stage,
mpi_bdy_snd)
623 do i = 1, this%senders%size
624 rnk = this%senders%at(i)
625 body_rcv_t(i) = this%msg_cache%get(unit, rnk, stage,
mpi_bdy_rcv)
636 call this%senders%clear()
638 do i = 1,
size(this%rte_models)
639 vdc => this%rte_models(i)%ptr
640 if (.not. vdc%is_local .and. vdc%is_active)
then
641 call this%senders%push_back_unique(vdc%orig_rank)
644 do i = 1,
size(this%rte_exchanges)
645 vdc => this%rte_exchanges(i)%ptr
646 if (.not. vdc%is_local .and. vdc%is_active)
then
647 call this%senders%push_back_unique(vdc%orig_rank)
659 call this%receivers%clear()
661 if (.not. this%halo_activated)
then
663 do i = 1, this%senders%size
664 call this%receivers%push_back(this%senders%at(i))
668 do i = 1,
size(this%rte_models)
669 vdc => this%rte_models(i)%ptr
670 do j = 1, vdc%rcv_ranks%size
671 call this%receivers%push_back_unique(vdc%rcv_ranks%at(j))
674 do i = 1,
size(this%rte_exchanges)
675 vdc => this%rte_exchanges(i)%ptr
676 do j = 1, vdc%rcv_ranks%size
677 call this%receivers%push_back_unique(vdc%rcv_ranks%at(j))
690 integer(I4B) :: stage
691 logical(LGP) :: in_cache
693 integer(I4B) :: i, rnk
694 integer(I4B) :: no_cache_cnt
695 integer :: cached_type
700 do i = 1, this%receivers%size
701 rnk = this%receivers%at(i)
702 cached_type = this%msg_cache%get(unit, rnk, stage,
mpi_bdy_snd)
703 if (cached_type == no_cached_value) no_cache_cnt = no_cache_cnt + 1
705 do i = 1, this%senders%size
706 rnk = this%senders%at(i)
707 cached_type = this%msg_cache%get(unit, rnk, stage,
mpi_bdy_rcv)
708 if (cached_type == no_cached_value) no_cache_cnt = no_cache_cnt + 1
712 if (no_cache_cnt == this%receivers%size + this%senders%size)
then
714 else if (no_cache_cnt == 0)
then
717 call ustop(
"Internal error: MPI message cache corrupt...")
725 call this%msg_cache%clear()
732 call this%msg_cache%destroy()
734 call this%senders%destroy()
735 call this%receivers%destroy()
737 deallocate (this%model_proc_ids)
738 deallocate (this%all_models)
739 deallocate (this%all_exchanges)
741 deallocate (this%tmr_mpi_wait)
This module contains simulation constants.
integer(i4b), parameter linelength
maximum length of a standard line
This module defines variable data types.
subroutine, public mem_print_detailed(iout)
integer(i4b), parameter, public mpi_bdy_snd
sending data (body) to ranks
integer(i4b), parameter, public mpi_bdy_rcv
receiving data (body) from ranks
subroutine mr_route_sln(this, virtual_sol, stage)
This will route all remote data from models and exchanges in a particular solution over MPI,...
subroutine route_active(this, unit, stage)
Routes the models and exchanges over MPI, either constructing the message bodies in a sequence of com...
subroutine compose_messages(this, unit, stage, body_snd_t, body_rcv_t)
Constructs the message bodies' MPI datatypes.
subroutine load_messages(this, unit, stage, body_snd_t, body_rcv_t)
Load the message body MPI datatypes from cache.
class(routerbasetype) function, pointer, public create_mpi_router()
Factory method to create MPI router.
logical(lgp) function is_cached(this, unit, stage)
Check if this stage is cached.
subroutine mr_route_all(this, stage)
This will route all remote data from the global models and exchanges over MPI, for a.
subroutine activate(this, models, exchanges)
Activate models and exchanges for routing.
subroutine mr_finalize(this)
subroutine deactivate(this)
Deactivate data after routing.
subroutine update_receivers(this)
subroutine update_senders(this)
subroutine mr_destroy(this)
subroutine mr_initialize(this)
type(mpiworldtype) function, pointer, public get_mpi_world()
subroutine, public check_mpi(mpi_error_code)
Check the MPI error code, report, and.
type(profilertype), public g_prof
the global timer object (to reduce trivial lines of code)
This module contains simulation methods.
subroutine, public ustop(stopmess, ioutlocal)
Stop the simulation.
subroutine, public store_error(msg, terminate)
Store an error message.
integer(i4b), parameter, public nr_sim_stages
before exchange formulate (per solution)
character(len=24) function, public stg_to_str(stage)
Converts a stage to its string representation.
This module contains simulation variables.
integer(i4b), parameter, public nr_vdc_element_maps
character(len=24) function, public vdc_type_to_str(cntr_type)
@ Converts a virtual container type to its string representation
class(virtualdatacontainertype) function, pointer, public get_vdc_from_list(list, idx)
type(listtype), public virtual_model_list
type(listtype), public virtual_exchange_list
Facility to cache the constructed MPI datatypes. This will avoid having to construct them over and ov...
Wrapper for virtual data containers.
Container (list) of virtual data items.
The Virtual Exchange is based on two Virtual Models and is therefore not always strictly local or rem...
This bundles all virtual data for a particular solution.