23 integer(I4B),
dimension(:),
pointer :: model_proc_ids
26 type(
vdcptrtype),
dimension(:),
pointer :: all_models => null()
27 type(
vdcptrtype),
dimension(:),
pointer :: all_exchanges => null()
28 type(
vdcptrtype),
dimension(:),
pointer :: rte_models => null()
29 type(
vdcptrtype),
dimension(:),
pointer :: rte_exchanges => null()
34 logical(LGP) :: enable_monitor
72 integer(I4B) :: nr_models, nr_exchanges
74 character(len=LINELENGTH) :: monitor_file
77 this%halo_activated = .false.
80 this%enable_monitor = .false.
83 call this%message_builder%init()
84 call this%msg_cache%init()
90 call this%senders%init()
91 call this%receivers%init()
96 allocate (this%model_proc_ids(nr_models))
97 allocate (this%all_models(nr_models))
98 allocate (this%all_exchanges(nr_exchanges))
102 this%all_models(i)%ptr => vdc
103 if (vdc%is_local)
then
104 this%model_proc_ids(i) =
proc_id
106 this%model_proc_ids(i) = 0
110 call mpi_allreduce(mpi_in_place, this%model_proc_ids, nr_models, &
111 mpi_integer, mpi_sum, this%mpi_world%comm, ierr)
117 call vdc%set_orig_rank(this%model_proc_ids(i))
120 do i = 1, nr_exchanges
122 this%all_exchanges(i)%ptr => vdc
123 select type (vex => vdc)
125 call vex%set_orig_rank(vex%v_model1%orig_rank)
126 if (vex%v_model1%is_local)
then
127 call vex%set_orig_rank(vex%v_model2%orig_rank)
133 if (this%enable_monitor)
then
135 write (monitor_file,
'(a,i0,a)')
"mpi.p",
proc_id,
".log"
136 open (unit=this%imon, file=monitor_file)
137 call this%message_builder%set_monitor(this%imon)
140 write (this%imon,
'(a,/)')
">> initialize MPI Router:"
141 write (this%imon,
'(2x,a,i0)')
"process id: ",
proc_id
142 write (this%imon,
'(2x,a,i0)')
"nr. of processes: ",
nr_procs
143 write (this%imon,
'(2x,a,i0)')
"nr. of models: ", nr_models
144 write (this%imon,
'(2x,a,i0)')
"nr. of exchanges: ", nr_exchanges
145 write (this%imon,
'(2x,2a)')
"model id, processor id:"
147 write (this%imon,
'(4x,2i8)') i, this%model_proc_ids(i)
149 write (this%imon,
'(a,/)')
"<< initialize done"
158 type(
vdcptrtype),
dimension(:),
pointer :: models
159 type(
vdcptrtype),
dimension(:),
pointer :: exchanges
161 this%rte_models => models
162 this%rte_exchanges => exchanges
163 call this%message_builder%attach_data(models, exchanges)
172 this%rte_models => null()
173 this%rte_exchanges => null()
174 call this%message_builder%release_data()
184 integer(I4B) :: stage
186 if (this%enable_monitor)
then
187 write (this%imon,
'(/,2a)')
">> routing all: ",
stg_to_str(stage)
191 call this%activate(this%all_models, this%all_exchanges)
192 call this%route_active(0, stage)
193 call this%deactivate()
195 if (this%enable_monitor)
then
196 write (this%imon,
'(a,/)')
"<< end routing all"
208 integer(I4B) :: stage
210 if (this%enable_monitor)
then
211 write (this%imon,
'(/,a,i0,2a)')
">> routing solution: ", &
212 virtual_sol%solution_id,
", ",
stg_to_str(stage)
216 call this%activate(virtual_sol%models, virtual_sol%exchanges)
217 call this%route_active(virtual_sol%solution_id, stage)
218 call this%deactivate()
220 if (this%enable_monitor)
then
221 write (this%imon,
'(a)')
"<< end routing solution"
234 integer(I4B) :: stage
238 integer :: ierr, msg_size
239 logical(LGP) :: from_cache
241 integer,
dimension(:),
allocatable :: rcv_req
242 integer,
dimension(:),
allocatable :: snd_req
243 integer,
dimension(:, :),
allocatable :: rcv_stat
246 integer,
dimension(:),
allocatable :: body_rcv_t
247 integer,
dimension(:),
allocatable :: body_snd_t
250 call this%update_senders()
251 call this%update_receivers()
254 allocate (body_rcv_t(this%senders%size))
255 allocate (body_snd_t(this%receivers%size))
258 allocate (rcv_req(this%senders%size))
259 allocate (snd_req(this%receivers%size))
260 allocate (rcv_stat(mpi_status_size, this%senders%size))
263 rcv_req = mpi_request_null
264 snd_req = mpi_request_null
266 if (this%enable_monitor)
then
267 write (this%imon,
'(2x,a,*(i3))')
"process ids sending data: ", &
268 this%senders%get_values()
269 write (this%imon,
'(2x,a,*(i3))')
"process ids receiving data: ", &
270 this%receivers%get_values()
274 from_cache = this%is_cached(unit, stage)
275 if (.not. from_cache)
then
276 call this%compose_messages(unit, stage, body_snd_t, body_rcv_t)
278 call this%load_messages(unit, stage, body_snd_t, body_rcv_t)
281 if (this%enable_monitor)
then
282 write (this%imon,
'(2x,a)')
"== communicating bodies =="
286 do i = 1, this%senders%size
287 rnk = this%senders%at(i)
288 if (this%enable_monitor)
then
289 write (this%imon,
'(4x,a,i0)')
"receiving from process: ", rnk
292 call mpi_type_size(body_rcv_t(i), msg_size, ierr)
293 if (msg_size > 0)
then
294 call mpi_irecv(mpi_bottom, 1, body_rcv_t(i), rnk, stage, &
295 this%mpi_world%comm, rcv_req(i), ierr)
299 if (this%enable_monitor)
then
300 write (this%imon,
'(6x,a,i0)')
"message body size: ", msg_size
305 do i = 1, this%receivers%size
306 rnk = this%receivers%at(i)
307 if (this%enable_monitor)
then
308 write (this%imon,
'(4x,a,i0)')
"sending to process: ", rnk
311 call mpi_type_size(body_snd_t(i), msg_size, ierr)
312 if (msg_size > 0)
then
313 call mpi_isend(mpi_bottom, 1, body_snd_t(i), rnk, stage, &
314 this%mpi_world%comm, snd_req(i), ierr)
318 if (this%enable_monitor)
then
319 write (this%imon,
'(6x,a,i0)')
"message body size: ", msg_size
321 call flush (this%imon)
325 call mpi_waitall(this%senders%size, rcv_req, rcv_stat, ierr)
328 deallocate (rcv_req, snd_req, rcv_stat)
329 deallocate (body_rcv_t, body_snd_t)
369 integer(I4B) :: stage
370 integer,
dimension(:) :: body_snd_t
371 integer,
dimension(:) :: body_rcv_t
373 integer(I4B) :: i, j, k
377 integer,
dimension(:),
allocatable :: rcv_req
378 integer,
dimension(:),
allocatable :: snd_req
379 integer,
dimension(:, :),
allocatable :: rcv_stat
381 integer(I4B) :: max_headers
383 integer,
dimension(:),
allocatable :: hdr_rcv_t
384 integer,
dimension(:),
allocatable :: hdr_snd_t
385 integer,
dimension(:),
allocatable :: hdr_rcv_cnt
388 integer,
dimension(:),
allocatable :: map_rcv_t
389 integer,
dimension(:),
allocatable :: map_snd_t
392 allocate (rcv_req(this%receivers%size))
393 allocate (snd_req(this%senders%size))
394 allocate (rcv_stat(mpi_status_size, this%receivers%size))
397 rcv_req = mpi_request_null
398 snd_req = mpi_request_null
401 max_headers =
size(this%rte_models) +
size(this%rte_exchanges)
402 allocate (hdr_rcv_t(this%receivers%size))
403 allocate (hdr_snd_t(this%senders%size))
404 allocate (headers(max_headers, this%receivers%size))
405 allocate (hdr_rcv_cnt(this%receivers%size))
408 allocate (map_snd_t(this%senders%size))
409 allocate (map_rcv_t(this%receivers%size))
410 allocate (rcv_maps(max_headers, this%receivers%size))
412 if (this%enable_monitor)
then
413 write (this%imon,
'(2x,a)')
"== communicating headers =="
417 do i = 1, this%receivers%size
418 rnk = this%receivers%at(i)
419 if (this%enable_monitor)
then
420 write (this%imon,
'(4x,a,i0)')
"Ireceive header from process: ", rnk
422 call this%message_builder%create_header_rcv(hdr_rcv_t(i))
423 call mpi_irecv(headers(:, i), max_headers, hdr_rcv_t(i), rnk, stage, &
424 this%mpi_world%comm, rcv_req(i), ierr)
429 do i = 1, this%senders%size
430 rnk = this%senders%at(i)
431 if (this%enable_monitor)
then
432 write (this%imon,
'(4x,a,i0)')
"send header to process: ", rnk
434 call this%message_builder%create_header_snd(rnk, stage, hdr_snd_t(i))
435 call mpi_isend(mpi_bottom, 1, hdr_snd_t(i), rnk, stage, &
436 this%mpi_world%comm, snd_req(i), ierr)
441 call mpi_waitall(this%receivers%size, rcv_req, rcv_stat, ierr)
445 rcv_req = mpi_request_null
446 snd_req = mpi_request_null
449 do i = 1, this%receivers%size
450 call mpi_get_count(rcv_stat(:, i), hdr_rcv_t(i), hdr_rcv_cnt(i), ierr)
452 if (this%enable_monitor)
then
453 rnk = this%senders%at(i)
454 write (this%imon,
'(4x,a,i0)')
"received headers from process: ", rnk
455 write (this%imon,
'(6x,a)')
"expecting data for:"
456 do j = 1, hdr_rcv_cnt(i)
457 write (this%imon,
'(6x,a,i0,a,a)')
"id: ", headers(j, i)%id, &
459 write (this%imon,
'(6x,a,99i6)')
"map sizes: ", headers(j, i)%map_sizes
465 do i = 1, this%receivers%size
466 call mpi_type_free(hdr_rcv_t(i), ierr)
468 do i = 1, this%senders%size
469 call mpi_type_free(hdr_snd_t(i), ierr)
472 if (this%enable_monitor)
then
473 write (this%imon,
'(2x,a)')
"== communicating maps =="
477 do i = 1, this%receivers%size
478 do j = 1, hdr_rcv_cnt(i)
479 call rcv_maps(j, i)%create(headers(j, i)%map_sizes)
484 do i = 1, this%receivers%size
485 rnk = this%receivers%at(i)
486 if (this%enable_monitor)
then
487 write (this%imon,
'(4x,a,i0)')
"Ireceive maps from process: ", rnk
490 call this%message_builder%create_map_rcv(rcv_maps(:, i), hdr_rcv_cnt(i), &
492 call mpi_irecv(mpi_bottom, 1, map_rcv_t(i), rnk, stage, &
493 this%mpi_world%comm, rcv_req(i), ierr)
498 do i = 1, this%senders%size
499 rnk = this%senders%at(i)
500 if (this%enable_monitor)
then
501 write (this%imon,
'(4x,a,i0)')
"send map to process: ", rnk
504 call this%message_builder%create_map_snd(rnk, stage, map_snd_t(i))
505 call mpi_isend(mpi_bottom, 1, map_snd_t(i), rnk, stage, &
506 this%mpi_world%comm, snd_req(i), ierr)
511 call mpi_waitall(this%receivers%size, rcv_req, rcv_stat, ierr)
515 if (this%enable_monitor)
then
516 do i = 1, this%receivers%size
517 rnk = this%receivers%at(i)
518 write (this%imon,
'(4x,a,i0)')
"received maps from process: ", rnk
519 do j = 1, hdr_rcv_cnt(i)
520 write (this%imon,
'(6x,a,i0,a,a)')
"id: ", headers(j, i)%id, &
523 write (this%imon,
'(8x,i0, a,i0)') k,
" nr. elements: ", &
524 rcv_maps(j, i)%el_maps(k)%nr_virt_elems
525 if (rcv_maps(j, i)%el_maps(k)%nr_virt_elems > 0)
then
526 write (this%imon,
'(8x,*(i6))') &
527 rcv_maps(j, i)%el_maps(k)%remote_elem_shift
535 do i = 1, this%receivers%size
536 call mpi_type_free(map_rcv_t(i), ierr)
538 do i = 1, this%senders%size
539 call mpi_type_free(map_snd_t(i), ierr)
542 if (this%enable_monitor)
then
543 write (this%imon,
'(2x,a)')
"== composing message bodies =="
547 do i = 1, this%senders%size
548 rnk = this%senders%at(i)
549 if (this%enable_monitor)
then
550 write (this%imon,
'(4x,a,i0)')
"build recv body for process: ", rnk
553 call this%message_builder%create_body_rcv(rnk, stage, body_rcv_t(i))
554 call this%msg_cache%put(unit, rnk, stage,
mpi_bdy_rcv, body_rcv_t(i))
558 do i = 1, this%receivers%size
559 rnk = this%receivers%at(i)
560 if (this%enable_monitor)
then
561 write (this%imon,
'(4x,a,i0)')
"build send body for process: ", rnk
564 call this%message_builder%create_body_snd( &
565 rnk, stage, headers(1:hdr_rcv_cnt(i), i), &
566 rcv_maps(:, i), body_snd_t(i))
567 call this%msg_cache%put(unit, rnk, stage,
mpi_bdy_snd, body_snd_t(i))
571 do i = 1, this%receivers%size
572 do j = 1, hdr_rcv_cnt(i)
573 call rcv_maps(j, i)%destroy()
577 deallocate (rcv_req, snd_req, rcv_stat)
578 deallocate (hdr_rcv_t, hdr_snd_t, hdr_rcv_cnt)
580 deallocate (map_rcv_t, map_snd_t)
581 deallocate (rcv_maps)
590 integer(I4B) :: stage
591 integer,
dimension(:),
allocatable :: body_snd_t
592 integer,
dimension(:),
allocatable :: body_rcv_t
594 integer(I4B) :: i, rnk
596 if (this%enable_monitor)
then
597 write (this%imon,
'(2x,a)')
"... running from cache ..."
600 do i = 1, this%receivers%size
601 rnk = this%receivers%at(i)
602 body_snd_t(i) = this%msg_cache%get(unit, rnk, stage,
mpi_bdy_snd)
604 do i = 1, this%senders%size
605 rnk = this%senders%at(i)
606 body_rcv_t(i) = this%msg_cache%get(unit, rnk, stage,
mpi_bdy_rcv)
617 call this%senders%clear()
619 do i = 1,
size(this%rte_models)
620 vdc => this%rte_models(i)%ptr
621 if (.not. vdc%is_local .and. vdc%is_active)
then
622 call this%senders%push_back_unique(vdc%orig_rank)
625 do i = 1,
size(this%rte_exchanges)
626 vdc => this%rte_exchanges(i)%ptr
627 if (.not. vdc%is_local .and. vdc%is_active)
then
628 call this%senders%push_back_unique(vdc%orig_rank)
640 call this%receivers%clear()
642 if (.not. this%halo_activated)
then
644 do i = 1, this%senders%size
645 call this%receivers%push_back(this%senders%at(i))
649 do i = 1,
size(this%rte_models)
650 vdc => this%rte_models(i)%ptr
651 do j = 1, vdc%rcv_ranks%size
652 call this%receivers%push_back_unique(vdc%rcv_ranks%at(j))
655 do i = 1,
size(this%rte_exchanges)
656 vdc => this%rte_exchanges(i)%ptr
657 do j = 1, vdc%rcv_ranks%size
658 call this%receivers%push_back_unique(vdc%rcv_ranks%at(j))
671 integer(I4B) :: stage
672 logical(LGP) :: in_cache
674 integer(I4B) :: i, rnk
675 integer(I4B) :: no_cache_cnt
676 integer :: cached_type
681 do i = 1, this%receivers%size
682 rnk = this%receivers%at(i)
683 cached_type = this%msg_cache%get(unit, rnk, stage,
mpi_bdy_snd)
684 if (cached_type == no_cached_value) no_cache_cnt = no_cache_cnt + 1
686 do i = 1, this%senders%size
687 rnk = this%senders%at(i)
688 cached_type = this%msg_cache%get(unit, rnk, stage,
mpi_bdy_rcv)
689 if (cached_type == no_cached_value) no_cache_cnt = no_cache_cnt + 1
693 if (no_cache_cnt == this%receivers%size + this%senders%size)
then
695 else if (no_cache_cnt == 0)
then
698 call ustop(
"Internal error: MPI message cache corrupt...")
706 call this%msg_cache%destroy()
708 call this%senders%destroy()
709 call this%receivers%destroy()
711 deallocate (this%model_proc_ids)
712 deallocate (this%all_models)
713 deallocate (this%all_exchanges)
This module contains simulation constants.
integer(i4b), parameter linelength
maximum length of a standard line
This module defines variable data types.
subroutine, public mem_print_detailed(iout)
integer(i4b), parameter, public mpi_bdy_snd
sending data (body) to ranks
integer(i4b), parameter, public mpi_bdy_rcv
receiving data (body) from ranks
subroutine mr_route_sln(this, virtual_sol, stage)
This will route all remote data from models and exchanges in a particular solution over MPI,...
subroutine route_active(this, unit, stage)
Routes the models and exchanges over MPI, either constructing the message bodies in a sequence of com...
subroutine compose_messages(this, unit, stage, body_snd_t, body_rcv_t)
Constructs the message bodies' MPI datatypes.
subroutine load_messages(this, unit, stage, body_snd_t, body_rcv_t)
Load the message body MPI datatypes from cache.
class(routerbasetype) function, pointer, public create_mpi_router()
Factory method to create MPI router.
logical(lgp) function is_cached(this, unit, stage)
Check if this stage is cached.
subroutine mr_route_all(this, stage)
This will route all remote data from the global models and exchanges over MPI, for a.
subroutine activate(this, models, exchanges)
Activate models and exchanges for routing.
subroutine deactivate(this)
Deactivate data after routing.
subroutine update_receivers(this)
subroutine update_senders(this)
subroutine mr_destroy(this)
subroutine mr_initialize(this)
type(mpiworldtype) function, pointer, public get_mpi_world()
subroutine, public check_mpi(mpi_error_code)
Check the MPI error code, report, and.
This module contains simulation methods.
subroutine, public ustop(stopmess, ioutlocal)
Stop the simulation.
subroutine, public store_error(msg, terminate)
Store an error message.
character(len=24) function, public stg_to_str(stage)
Converts a stage to its string representation.
This module contains simulation variables.
integer(i4b), parameter, public nr_vdc_element_maps
character(len=24) function, public vdc_type_to_str(cntr_type)
@ Converts a virtual container type to its string representation
class(virtualdatacontainertype) function, pointer, public get_vdc_from_list(list, idx)
type(listtype), public virtual_model_list
type(listtype), public virtual_exchange_list
Facility to cache the constructed MPI datatypes. This will avoid having to construct them over and ov...
Wrapper for virtual data containers.
Container (list) of virtual data items.
The Virtual Exchange is based on two Virtual Models and is therefore not always strictly local or rem...
This bundles all virtual data for a particular solution.