KaMPIng 0.1.1
Flexible and (near) zero-overhead C++ bindings for MPI
allgather.hpp
// This file is part of KaMPIng.
//
// Copyright 2022-2024 The KaMPIng Authors
//
// KaMPIng is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later
// version. KaMPIng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
// for more details.
//
// You should have received a copy of the GNU Lesser General Public License along with KaMPIng. If not, see
// <https://www.gnu.org/licenses/>.

#pragma once

#include <numeric>

#include <kassert/kassert.hpp>
#include <mpi.h>

#include "kamping/assertion_levels.hpp"
#include "kamping/collectives/collectives_helpers.hpp"
#include "kamping/comm_helper/is_same_on_all_ranks.hpp"
#include "kamping/communicator.hpp"
#include "kamping/result.hpp"

/// @addtogroup kamping_collectives
/// @{

/// @brief Wrapper for \c MPI_Allgather.
///
/// This wrapper for \c MPI_Allgather collects the same amount of data from each rank on all ranks. It is semantically
/// equivalent to performing a \c gather() followed by a broadcast of the collected data.
///
/// The following parameters are required:
/// - \ref kamping::send_buf() containing the data that is sent to all ranks. This buffer has to be the same size at
/// each rank. See allgatherv() if the amounts differ.
///
/// The following parameters are optional:
/// - \ref kamping::send_count() specifying how many elements are sent. If
/// omitted, the size of the send buffer is used. This parameter is mandatory if \ref kamping::send_type() is given.
///
/// - \ref kamping::send_type() specifying the \c MPI datatype to use as send type. If omitted, the \c MPI datatype is
/// derived automatically based on send_buf's underlying \c value_type.
///
/// - \ref kamping::recv_count() specifying how many elements are received. If
/// omitted, the value of send_count will be used. This parameter is mandatory if \ref kamping::recv_type() is given.
///
/// - \ref kamping::recv_buf() specifying a buffer for the output. Afterwards, this buffer will contain
/// all data from all send buffers. This requires a size of the buffer of at least `recv_count * communicator size`.
///
/// - \ref kamping::recv_type() specifying the \c MPI datatype to use as recv type. If omitted, the \c MPI datatype is
/// derived automatically based on recv_buf's underlying \c value_type.
///
/// In-place allgather is supported by passing send_recv_buf() as parameter. This changes the requirements for the other
/// parameters, see \ref Communicator::allgather_inplace.
///
/// @tparam Args Automatically deduced template parameters.
/// @param args All required and any number of the optional parameters described above.
/// @return Result object wrapping the output parameters to be returned by value.
///
/// @see \ref docs/parameter_handling.md for general information about parameter handling in KaMPIng.
/// <hr>
/// \include{doc} docs/resize_policy.dox
template <
    template <typename...>
    typename DefaultContainerType,
    template <typename, template <typename...> typename>
    typename... Plugins>
template <typename... Args>
    using namespace kamping::internal;
    if constexpr (inplace) {
        return allgather_inplace(std::forward<Args>(args)...);
    } else {
            Args,
        );

        // get the send/recv buffer and types
        auto&& send_buf_param =
            internal::select_parameter_type<internal::ParameterType::send_buf>(args...).construct_buffer_or_rebind();
        auto send_buf = send_buf_param.get();
        using send_value_type = typename std::remove_reference_t<decltype(send_buf_param)>::value_type;

        auto&& recv_buf =
            internal::select_parameter_type_or_default<internal::ParameterType::recv_buf, default_recv_buf_type>(
                std::tuple(),
                args...
            )
        using recv_value_type = typename std::remove_reference_t<decltype(recv_buf)>::value_type;

        auto&& [send_type, recv_type] =
            internal::determine_mpi_datatypes<send_value_type, recv_value_type, decltype(recv_buf)>(args...);
        [[maybe_unused]] constexpr bool send_type_is_input_parameter = !has_to_be_computed<decltype(send_type)>;
        [[maybe_unused]] constexpr bool recv_type_is_input_parameter = !has_to_be_computed<decltype(recv_type)>;

        KASSERT(
            // if the send type is user provided, kamping can no longer deduce the number of elements to send from the
            // size of the send buffer
            send_type_is_input_parameter || is_same_on_all_ranks(send_buf.size()),
            "All PEs have to send the same number of elements. Use allgatherv, if you want to send a different number "
            "of elements.",
        );

        // get the send count
        auto&& send_count =
            internal::select_parameter_type_or_default<internal::ParameterType::send_count, default_send_count_type>(
                std::tuple(),
                args...
            )
                .construct_buffer_or_rebind();
        if constexpr (do_compute_send_count) {
            send_count.underlying() = asserting_cast<int>(send_buf.size());
        }

        // get the receive count
        auto&& recv_count =
            internal::select_parameter_type_or_default<internal::ParameterType::recv_count, default_recv_count_type>(
                std::tuple(),
                args...
            )
                .construct_buffer_or_rebind();
        if constexpr (do_compute_recv_count) {
            recv_count.underlying() = send_count.get_single_element();
        }

        auto compute_required_recv_buf_size = [&]() {
            return asserting_cast<size_t>(recv_count.get_single_element()) * size();
        };
        recv_buf.resize_if_requested(compute_required_recv_buf_size);
        KASSERT(
            // if the recv type is user provided, kamping cannot make any assumptions about the required size of the
            // recv buffer
            "Recv buffer is not large enough to hold all received elements.",
        );

        // error code can be unused if KTHROW is removed at compile time
            send_buf.data(),
            send_count.get_single_element(),
            send_type.get_single_element(),
            recv_buf.data(),
            recv_count.get_single_element(),
            recv_type.get_single_element(),
            this->mpi_communicator()
        );
        this->mpi_error_hook(err, "MPI_Allgather");

        return make_mpi_result<std::tuple<Args...>>(
            std::move(recv_buf),
            std::move(send_count),
            std::move(recv_count),
            std::move(send_type),
            std::move(recv_type)
        );
    }
}

/// @brief Wrapper for the in-place version of \c MPI_Allgather.
///
/// This variant must be called collectively by all ranks in the communicator.
///
/// The following parameters are required:
/// - \ref kamping::send_recv_buf() containing the data that is sent to all ranks. As opposed to the non-inplace
/// version, this buffer is required to already have size `size() * send_recv_count`, and the data contributed by each
/// rank must already be at the correct location in the buffer.
///
/// The following parameters are optional:
/// - \ref kamping::send_recv_count() specifying how many elements are sent and received. If omitted,
/// `send_recv_buf.size() / size()` is used.
///
/// - \ref kamping::send_recv_type() specifying the \c MPI datatype to use as send and recv type. If omitted, the \c MPI
/// datatype is derived automatically based on send_recv_buf's underlying \c value_type.
///
/// @tparam Args Automatically deduced template parameters.
/// @param args All required and any number of the optional parameters described above.
/// @return Result object wrapping the output parameters to be returned by value.
///
/// @see \ref docs/parameter_handling.md for general information about parameter handling in KaMPIng.
/// <hr>
/// \include{doc} docs/resize_policy.dox
template <
    template <typename...>
    typename DefaultContainerType,
    template <typename, template <typename...> typename>
    typename... Plugins>
template <typename... Args>
    using namespace kamping::internal;
        Args,
    );

    // get the send/recv buffer and type
    auto&& buffer =
        internal::select_parameter_type<internal::ParameterType::send_recv_buf>(args...).construct_buffer_or_rebind();
    using value_type = typename std::remove_reference_t<decltype(buffer)>::value_type;

    auto&& type = internal::determine_mpi_send_recv_datatype<value_type, decltype(buffer)>(args...);
    [[maybe_unused]] constexpr bool type_is_input_parameter = !has_to_be_computed<decltype(type)>;

    KASSERT(
        // if the type is user provided, kamping can no longer deduce the number of elements to send from the size
        // of the buffer
        type_is_input_parameter || is_same_on_all_ranks(buffer.size()),
        "All PEs have to send the same number of elements. Use allgatherv, if you want to send a different number of "
        "elements.",
    );

    // get the send/recv count
    auto&& count =
        internal::select_parameter_type_or_default<internal::ParameterType::send_recv_count, default_count_type>(
            std::tuple(),
            args...
        )
            .construct_buffer_or_rebind();
    constexpr bool do_compute_count = internal::has_to_be_computed<decltype(count)>;
    // As opposed to the non-inplace version, this requires that the data contributed by each rank is already at the
    // correct location in the buffer. Therefore the count is inferred as buffer.size() / size() if not given.
    KASSERT(
        (!do_compute_count || buffer.size() % size() == 0lu),
        "There is no send_recv_count given and the number of elements in send_recv_buf is not divisible by the number "
        "of ranks in the communicator.",
    );
    if constexpr (do_compute_count) {
        count.underlying() = asserting_cast<int>(buffer.size() / size());
    }

    auto compute_required_buf_size = [&]() {
        return asserting_cast<size_t>(count.get_single_element()) * size();
    };
    buffer.resize_if_requested(compute_required_buf_size);
    KASSERT(
        // if the type is user provided, kamping cannot make any assumptions about the required size of the buffer
        "Recv buffer is not large enough to hold all received elements.",
    );

    // error code can be unused if KTHROW is removed at compile time
        MPI_IN_PLACE,               // sendbuf
        0,                          // sendcount (ignored)
        MPI_DATATYPE_NULL,          // sendtype (ignored)
        buffer.data(),              // recvbuf
        count.get_single_element(), // recvcount
        type.get_single_element(),  // recvtype
        this->mpi_communicator()    // communicator
    );
    this->mpi_error_hook(err, "MPI_Allgather");

    return make_mpi_result<std::tuple<Args...>>(std::move(buffer), std::move(count), std::move(type));
}

/// @brief Wrapper for \c MPI_Allgatherv.
///
/// This wrapper for \c MPI_Allgatherv collects possibly different amounts of data from each rank on all ranks. It is
/// semantically equivalent to performing a \c gatherv() followed by a broadcast of the collected data.
///
/// The following parameters are required:
/// - \ref kamping::send_buf() containing the data that is sent to all other ranks.
///
/// The following parameters are optional but result in communication overhead if omitted:
/// - \ref kamping::recv_counts() containing the number of elements to receive from each rank. This parameter is
/// mandatory if \ref kamping::recv_type() is given.
///
/// The following parameters are optional:
/// - \ref kamping::send_count() specifying how many elements are sent. If
/// omitted, the size of the send buffer is used. This parameter is mandatory if \ref kamping::send_type() is given.
///
/// - \ref kamping::send_type() specifying the \c MPI datatype to use as send type. If omitted, the \c
/// MPI datatype is derived automatically based on send_buf's underlying \c value_type.
///
/// - \ref kamping::recv_buf() specifying a buffer for the output. Afterwards, this buffer will contain
/// all data from all send buffers. This requires a size of the underlying storage of at least
/// `max(recv_counts[i] + recv_displs[i])` for \c i in `[0, communicator size)`.
///
/// - \ref kamping::recv_displs() containing the offsets of the messages in recv_buf. The `recv_counts[i]`
/// elements starting at `recv_buf[recv_displs[i]]` will be received from rank `i`. If omitted, this is calculated as
/// the exclusive prefix-sum of `recv_counts`.
///
/// - \ref kamping::recv_type() specifying the \c MPI datatype to use as recv type. If omitted, the \c
/// MPI datatype is derived automatically based on recv_buf's underlying \c value_type.
///
/// @tparam Args Automatically deduced template parameters.
/// @param args All required and any number of the optional parameters described above.
/// @return Result object wrapping the output parameters to be returned by value.
///
/// @see \ref docs/parameter_handling.md for general information about parameter handling in KaMPIng.
/// <hr>
/// \include{doc} docs/resize_policy.dox
template <
    template <typename...>
    typename DefaultContainerType,
    template <typename, template <typename...> typename>
    typename... Plugins>
template <typename... Args>
    using namespace kamping::internal;
        Args,
    );

    // get send_buf
    auto&& send_buf =
        internal::select_parameter_type<internal::ParameterType::send_buf>(args...).construct_buffer_or_rebind();
    using send_value_type = typename std::remove_reference_t<decltype(send_buf)>::value_type;

    // get recv_buf
    auto&& recv_buf =
        internal::select_parameter_type_or_default<internal::ParameterType::recv_buf, default_recv_buf_type>(
            std::tuple(),
            args...
        )
    using recv_value_type = typename std::remove_reference_t<decltype(recv_buf)>::value_type;

    // get send/recv types
    auto&& [send_type, recv_type] =
        internal::determine_mpi_datatypes<send_value_type, recv_value_type, decltype(recv_buf)>(args...);

    // get the send count
    auto&& send_count =
        internal::select_parameter_type_or_default<internal::ParameterType::send_count, default_send_count_type>(
            std::tuple(),
            args...
        )
            .construct_buffer_or_rebind();
    if constexpr (do_compute_send_count) {
        send_count.underlying() = asserting_cast<int>(send_buf.size());
    }
    // get the recv counts
    auto&& recv_counts =
        internal::select_parameter_type_or_default<internal::ParameterType::recv_counts, default_recv_counts_type>(
            std::tuple(),
            args...
        )
    using recv_counts_type = typename std::remove_reference_t<decltype(recv_counts)>::value_type;
    static_assert(std::is_same_v<std::remove_const_t<recv_counts_type>, int>, "Recv counts must be of type int");
    // calculate recv_counts if necessary
    KASSERT(
        is_same_on_all_ranks(do_calculate_recv_counts),
        "Receive counts are given on some ranks and have to be computed on others",
    );
    if constexpr (do_calculate_recv_counts) {
        recv_counts.resize_if_requested([&]() { return this->size(); });
        KASSERT(recv_counts.size() >= this->size(), "Recv counts buffer is not large enough.", assert::light);
        this->allgather(
            kamping::send_buf(static_cast<int>(send_count.get_single_element())),
        );
    } else {
        KASSERT(recv_counts.size() >= this->size(), "Recv counts buffer is not large enough.", assert::light);
    }

    // Get recv_displs
    auto&& recv_displs =
        internal::select_parameter_type_or_default<internal::ParameterType::recv_displs, default_recv_displs_type>(
            std::tuple(),
            args...
        )
    using recv_displs_type = typename std::remove_reference_t<decltype(recv_displs)>::value_type;
    static_assert(std::is_same_v<std::remove_const_t<recv_displs_type>, int>, "Recv displs must be of type int");

    // Calculate recv_displs if necessary
    KASSERT(
        is_same_on_all_ranks(do_calculate_recv_displs),
        "Receive displacements are given on some ranks and have to be computed on others",
    );
    if constexpr (do_calculate_recv_displs) {
        recv_displs.resize_if_requested([&]() { return this->size(); });
        KASSERT(recv_displs.size() >= this->size(), "Recv displs buffer is not large enough.", assert::light);
        std::exclusive_scan(recv_counts.data(), recv_counts.data() + this->size(), recv_displs.data(), 0);
    } else {
        KASSERT(recv_displs.size() >= this->size(), "Recv displs buffer is not large enough.", assert::light);
    }

    auto compute_required_recv_buf_size = [&]() {
        return compute_required_recv_buf_size_in_vectorized_communication(recv_counts, recv_displs, this->size());
    };
    recv_buf.resize_if_requested(compute_required_recv_buf_size);
    KASSERT(
        // if the recv type is user provided, kamping cannot make any assumptions about the required size of the recv
        // buffer
        "Recv buffer is not large enough to hold all received elements.",
    );

    // error code can be unused if KTHROW is removed at compile time
        send_buf.data(),                 // sendbuf
        send_count.get_single_element(), // sendcount
        send_type.get_single_element(),  // sendtype
        recv_buf.data(),                 // recvbuf
        recv_counts.data(),              // recvcounts
        recv_displs.data(),              // recvdispls
        recv_type.get_single_element(),  // recvtype
        this->mpi_communicator()         // communicator
    );
    this->mpi_error_hook(err, "MPI_Allgatherv");

    return make_mpi_result<std::tuple<Args...>>(
        std::move(recv_buf),
        std::move(send_count),
        std::move(recv_counts),
        std::move(recv_displs),
        std::move(send_type),
        std::move(recv_type)
    );
}
/// @}
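The size and offset rules stated in the documentation comments above can be checked without MPI. The following is a minimal plain C++17 sketch of that arithmetic only: the default receive displacements as the exclusive prefix sum of the receive counts, the minimum receive buffer size as the maximum of `recv_counts[i] + recv_displs[i]`, and the in-place count as the buffer size divided by the number of ranks. The helper names `default_recv_displs`, `required_recv_buf_size`, and `inplace_count` are hypothetical and are not part of KaMPIng; the sketch follows the documented rules and makes no claim about how the library implements them internally.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <numeric>
#include <vector>

// Hypothetical helper (not a KaMPIng function): default displacements are the
// exclusive prefix sum of the receive counts, so rank i's data starts right
// after the data of ranks 0..i-1.
std::vector<int> default_recv_displs(std::vector<int> const& recv_counts) {
    std::vector<int> displs(recv_counts.size());
    std::exclusive_scan(recv_counts.begin(), recv_counts.end(), displs.begin(), 0);
    return displs;
}

// Hypothetical helper (not a KaMPIng function): the receive buffer must hold at
// least max(recv_counts[i] + recv_displs[i]) elements over all ranks i.
std::size_t required_recv_buf_size(std::vector<int> const& recv_counts, std::vector<int> const& recv_displs) {
    assert(recv_counts.size() == recv_displs.size());
    int required = 0;
    for (std::size_t i = 0; i < recv_counts.size(); ++i) {
        required = std::max(required, recv_counts[i] + recv_displs[i]);
    }
    return static_cast<std::size_t>(required);
}

// Hypothetical helper (not a KaMPIng function): for the in-place variant, the
// per-rank count is buffer_size / comm_size, which is only well defined if the
// buffer size is divisible by the number of ranks.
int inplace_count(std::size_t buffer_size, std::size_t comm_size) {
    assert(comm_size > 0 && buffer_size % comm_size == 0);
    return static_cast<int>(buffer_size / comm_size);
}
```

For three ranks contributing 2, 0, and 3 elements, `default_recv_displs` yields the offsets 0, 2, 2 and `required_recv_buf_size` yields 5. With user-supplied displacements that leave gaps between the blocks, the required size grows accordingly, which is why the rule uses a maximum over `recv_counts[i] + recv_displs[i]` rather than the sum of the counts.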