#include "kamping/communicator.hpp"
#include "kamping/named_parameters_detail/status_parameters.hpp"
#include "kamping/p2p/isend.hpp"
#include "kamping/p2p/recv.hpp"
#include "kamping/plugin/plugin_helpers.hpp"
namespace kamping::plugin {

namespace reproducible_reduce {

/// Encapsulates a single intermediate result (value) and its index.
template <typename T>
struct MessageBufferEntry {
    /// Global index according to reduction order.
    size_t index;
    /// Intermediate value during calculation.
    T value;
};

/// Maximum number of intermediate results carried by a single message.
constexpr uint8_t MAX_MESSAGE_LENGTH = 4;
/// MPI tag used for MessageBuffer communication.
constexpr int MESSAGEBUFFER_MPI_TAG = 0xb586772;
/// Responsible for storing and communicating intermediate results between PEs.
template <typename T, typename Communicator>
class MessageBuffer {
public:
    // Elided type definitions; the underlying buffer is declared with
    //     internal::BufferModifiability::modifiable,
    //     internal::BufferOwnership::owning,
    //     internal::BufferType::out_buffer,
    //     internal::BufferAllocation::lib_allocated,
    // ...

    /// Construct a new message buffer utilizing the given communicator comm.
    MessageBuffer(Communicator const& comm)
        : // ...
          _send_buffer_clear(true)
        // ...
    {
        _outbox.reserve(MAX_MESSAGE_LENGTH + 1);
        _buffer.reserve(MAX_MESSAGE_LENGTH + 1);
    }

    /// Receive a message from another PE and store its contents.
    void receive(int const source_rank) {
        // ... recv of the incoming entries into _buffer with
        //         tag(MESSAGEBUFFER_MPI_TAG),
        // ...
        for (auto const entry: _buffer) {
            // ...
        }
    }

    /// Asynchronously send locally stored intermediate results.
    void flush(void) {
        if (!_target_rank.has_value() || _outbox.empty())
            return;
        // ...
        _request = std::make_unique<ResultType>(send());
        // ...
        _target_rank.reset();
        _send_buffer_clear = false;
    }

    /// Wait until the message dispatched by flush() is actually sent and clear any stored values.
    void wait(void) {
        if (_send_buffer_clear) {
            return;
        }
        // ...
        _send_buffer_clear = true;
    }

    /// Store an intermediate result inside the message buffer for eventual transmission to its destination.
    void put(int const target_rank, size_t const index, T const value) {
        bool const outbox_full = _outbox.size() >= MAX_MESSAGE_LENGTH;
        // ...
        KASSERT(_outbox.size() < _outbox.capacity());
        KASSERT(_outbox.capacity() > 0);
        // ...
        _outbox.push_back(entry);
        // ...
        if (_outbox.size() >= MAX_MESSAGE_LENGTH) {
            // ...
        }
    }

    /// Get the intermediate result with the specified index from source_rank.
    T const get(int const source_rank, size_t const index) {
        auto const entry = _inbox.find(index);
        // ...
        if (entry != _inbox.end()) {
            // ...
            value = entry->second;
            // ...
        }
        // ...
        auto const new_entry = _inbox.find(index);
        // ...
    }

private:
    std::array<MessageBufferEntry<T>, MAX_MESSAGE_LENGTH> _entries;
    std::map<uint64_t, T>                                 _inbox;
    std::optional<int>                                    _target_rank;
    std::vector<MessageBufferEntry<T>>                    _outbox;
    std::vector<MessageBufferEntry<T>>                    _buffer;
    std::unique_ptr<ResultType>                           _request;
    size_t                                                _awaited_numbers;
    size_t                                                _sent_messages;
    size_t                                                _sent_elements;
    bool                                                  _send_buffer_clear;
};
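// Example (not part of the header): a minimal sketch of how the MessageBuffer API above
// could be exercised between two ranks. Only the member signatures shown above are taken
// from the source; the header path and the exact synchronization behavior of flush()/get()
// are assumptions of this sketch, not statements about the library.
//
// #include <kamping/communicator.hpp>
// #include <kamping/environment.hpp>
// #include <kamping/plugin/reproducible_reduce.hpp>  // assumed header path
//
// int main(int argc, char** argv) {
//     kamping::Environment<> env(argc, argv);
//     kamping::Communicator<> comm;
//     using Buffer = kamping::plugin::reproducible_reduce::MessageBuffer<double, kamping::Communicator<>>;
//     Buffer buffer(comm);
//
//     if (comm.size() < 2) return 0;
//     if (comm.rank() == 0) {
//         buffer.put(/*target_rank=*/1, /*index=*/42, /*value=*/3.5); // queue an intermediate result
//         buffer.flush();                                             // dispatch it asynchronously
//         buffer.wait();                                              // block until it is actually sent
//     } else if (comm.rank() == 1) {
//         double const v = buffer.get(/*source_rank=*/0, /*index=*/42); // assumed: receives until index 42 arrives
//         (void)v;
//     }
//     return 0;
// }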
inline auto tree_parent(size_t const i) {
    // ...
}

inline auto tree_subtree_size(size_t const i) {
    auto const largest_child_index{i | (i - 1)};
    return largest_child_index + 1 - i;
}
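// Example (not part of the header): tree_subtree_size(i) evaluates to the lowest set bit
// of i, i.e. the number of consecutive global indices i, i+1, ... covered by the subtree
// rooted at i. A standalone check of the formula shown above:
//
// #include <cassert>
// #include <cstddef>
//
// inline std::size_t example_tree_subtree_size(std::size_t const i) {
//     auto const largest_child_index{i | (i - 1)};
//     return largest_child_index + 1 - i;
// }
//
// int main() {
//     assert(example_tree_subtree_size(12) == 4); // 12 = 0b1100 covers indices 12..15
//     assert(example_tree_subtree_size(7) == 1);  // odd indices are leaves
//     assert(example_tree_subtree_size(8) == 8);  // 8 = 0b1000 covers indices 8..15
//     return 0;
// }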
// Map a global index to a rank: start_indices maps the first index of each rank's region
// to that rank.
inline auto tree_rank_from_index_map(std::map<size_t, size_t> const& start_indices, size_t const index) {
    auto it = start_indices.upper_bound(index);
    KASSERT(it != start_indices.begin());
    // ...
}
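// Example (not part of the header): the lookup pattern above, completed under the
// assumption that the function returns the rank belonging to the greatest start index
// that is <= the queried index (only the upper_bound call and the assertion are visible
// in the source above).
//
// #include <cassert>
// #include <cstddef>
// #include <iterator>
// #include <map>
//
// inline std::size_t example_rank_from_index(
//     std::map<std::size_t, std::size_t> const& start_indices, std::size_t const index) {
//     auto it = start_indices.upper_bound(index); // first region starting strictly after index
//     return std::prev(it)->second;               // the region before it contains index
// }
//
// int main() {
//     // Three ranks owning regions [0,3), [3,7), [7,12).
//     std::map<std::size_t, std::size_t> const start_indices{{0, 0}, {3, 1}, {7, 2}};
//     assert(example_rank_from_index(start_indices, 0) == 0);
//     assert(example_rank_from_index(start_indices, 5) == 1);
//     assert(example_rank_from_index(start_indices, 11) == 2);
//     return 0;
// }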
// Determine the indices inside [region_begin, region_end) that start a maximal subtree
// whose parent lies before the local region, i.e. whose results must be communicated to
// another rank.
inline auto tree_rank_intersecting_elements(size_t const region_begin, size_t const region_end) {
    std::vector<size_t> result;
    // ...
    size_t const region_size = region_end - region_begin;
    // ...
    if (region_begin == 0 || region_size == 0) {
        return result;
    }
    // ...
    size_t index{region_begin};
    while (index < region_end) {
        // ...
        KASSERT(tree_parent(index) < region_begin);
        // ...
        result.push_back(index);
        index += tree_subtree_size(index);
    }
    return result;
}
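// Example (not part of the header): the loop above decomposes a region into maximal
// aligned subtrees. Reproducing just that loop (without the assertions and the special
// case for region_begin == 0) for the region [5, 12):
//
// #include <cassert>
// #include <cstddef>
// #include <vector>
//
// int main() {
//     auto const subtree_size = [](std::size_t i) { return (i | (i - 1)) + 1 - i; };
//     std::size_t const region_begin = 5, region_end = 12;
//     std::vector<std::size_t> starts;
//     for (std::size_t index = region_begin; index < region_end; index += subtree_size(index)) {
//         starts.push_back(index);
//     }
//     // Index 5 is a leaf, 6 covers {6,7}, and 8 covers {8,...,15} (reaching past the region).
//     assert((starts == std::vector<std::size_t>{5, 6, 8}));
//     return 0;
// }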
// Integer base-2 logarithm helper.
inline auto log2l(size_t const value) {
    // ...
    unsigned int target_value = 0;
    // ...
}

inline size_t subtree_height(size_t const index) {
    // ...
    return log2l(tree_subtree_size(index));
}

inline size_t tree_height(size_t const global_size) {
    if (global_size == 0) {
        // ...
    }
    // ...
    unsigned int result = log2l(global_size);
    // ...
    if (global_size > (1UL << result)) {
        // ...
    }
    // ...
}
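// Example (not part of the header): the structure above suggests that tree_height(n)
// rounds log2(n) up to the next integer (the second branch fires exactly when n is not a
// power of two). This is an assumption, since the branch bodies are not shown. A
// standalone formulation of that rounding using C++20 <bit>:
//
// #include <bit>
// #include <cassert>
// #include <cstddef>
//
// inline std::size_t example_tree_height(std::size_t const n) {
//     if (n <= 1) return 0;
//     return static_cast<std::size_t>(std::bit_width(n - 1)); // ceil(log2(n))
// }
//
// int main() {
//     assert(example_tree_height(8) == 3);  // exact power of two
//     assert(example_tree_height(9) == 4);  // rounded up
//     assert(example_tree_height(12) == 4);
//     return 0;
// }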
/// Communicator that can reproducibly reduce an array of a fixed size according to a
/// binary tree scheme.
template <typename T, typename Communicator>
class ReproducibleCommunicator {
public:
    /// Create a new reproducible communicator.
    ReproducibleCommunicator(Communicator const& comm, std::map<size_t, size_t> const start_indices,
                             size_t const region_begin, size_t const region_size)
        : // ... (initialization of _comm, _start_indices and the region bounds)
          _origin_rank{_global_size == 0 ? 0UL : tree_rank_from_index_map(_start_indices, 0)},
          _rank_intersecting_elements(tree_rank_intersecting_elements(_region_begin, _region_end)),
          _reduce_buffer(_region_size),
          _message_buffer(_comm) {}
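// Example (not part of the header): the constructor arguments describe a fixed block
// distribution of the global array. For three ranks contributing 3, 4 and 5 elements, the
// start-index map and the per-rank regions would look as follows; deriving the map with an
// exclusive prefix sum is an illustration here, not necessarily how the library builds it.
//
// #include <cassert>
// #include <cstddef>
// #include <map>
// #include <vector>
//
// int main() {
//     std::vector<std::size_t> const send_counts{3, 4, 5};
//     std::map<std::size_t, std::size_t> start_indices; // region start index -> rank
//     std::size_t offset = 0;
//     for (std::size_t rank = 0; rank < send_counts.size(); ++rank) {
//         start_indices.emplace(offset, rank);
//         offset += send_counts[rank];
//     }
//     assert((start_indices == std::map<std::size_t, std::size_t>{{0, 0}, {3, 1}, {7, 2}}));
//     // Rank 1, for instance, would be constructed with region_begin = 3 and region_size = 4,
//     // and the global array has offset == 12 elements in total.
//     return 0;
// }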
    /// Reproducible reduction according to the pre-initialized scheme. Requires a send
    /// buffer (send_buf) and a reduction operation (op), cf. the parameter selection below.
    template <typename... Args>
    T const reproducible_reduce(Args... args) {
        // ... (named-parameter checks not shown)
        auto send_buf =
            internal::select_parameter_type<internal::ParameterType::send_buf>(args...).construct_buffer_or_rebind();
        // ... (send_value_type is derived from the send buffer's value type)
        KASSERT(
            send_buf.size() == _region_size,
            "send_buf must have the same size as specified during creation of the reproducible communicator. "
                << "Is " << send_buf.size() << " but should be " << _region_size << " on rank " << _comm.rank()
        );
        static_assert(
            std::is_same_v<std::remove_const_t<send_value_type>, T>,
            "send type must be equal to the type used during Communicator initiation"
        );
        // ...
        auto& operation_param = internal::select_parameter_type<internal::ParameterType::op>(args...);
        // ...
    }
private:
    // Drives the reduction over all rank-intersecting elements: each subtree rooted at
    // such an element is reduced locally and its value is handed to the message buffer
    // for the rank owning the parent index. (The member's name is not visible here; the
    // return type and parameter names follow _perform_reduce below.)
    template <typename Func>
    T const /* ... */ (T const* buffer, Func&& op) {
        for (auto const index: _rank_intersecting_elements) {
            // Dispatch buffered messages before starting a long-running subtree reduction.
            if (tree_subtree_size(index) > 16) {
                _message_buffer.flush();
            }
            // ...
            auto const target_rank = tree_rank_from_index_map(_start_indices, tree_parent(index));
            T const    value       = _perform_reduce(index, buffer, op);
            // ... (value is forwarded towards target_rank via _message_buffer)
        }
        _message_buffer.flush();
        _message_buffer.wait();
        // ...
        if (_comm.rank() == _origin_rank) {
            // ... (the origin rank assembles and distributes the final result)
        }
        // ...
    }
    template <typename Func>
    T const _perform_reduce(size_t const index, T const* buffer, Func&& op) {
        // Odd indices are leaves: their value is returned directly.
        if ((index & 1) == 1) {
            return buffer[index - _region_begin];
        }

        // Last global index and height of the subtree rooted at index.
        size_t const max_x = (index == 0) ? _global_size - 1
                                          : std::min(_global_size - 1, index + tree_subtree_size(index) - 1);
        size_t const max_y = (index == 0) ? tree_height(_global_size) : subtree_height(index);

        KASSERT(max_y < 64, "Unreasonably large max_y");

        size_t const largest_local_index = std::min(max_x, _region_end - 1);
        auto const   n_local_elements    = largest_local_index + 1 - index;

        size_t   elements_in_buffer = n_local_elements;
        T*       destination_buffer = _reduce_buffer.data();
        T const* source_buffer      = static_cast<T const*>(buffer + (index - _region_begin));

        // Reduce the subtree level by level; each level halves the number of elements.
        for (size_t y = 1; y <= max_y; y += 1) {
            size_t const stride           = 1UL << (y - 1);
            size_t       elements_written = 0;

            // Pairwise reduction of neighboring elements on this level.
            for (size_t x = 0; x + 2 <= elements_in_buffer; x += 2) {
                T const a = source_buffer[x];
                T const b = source_buffer[x + 1];
                destination_buffer[elements_written++] = op(a, b);
            }

            size_t const remaining_elements = elements_in_buffer - 2 * elements_written;
            KASSERT(remaining_elements <= 1);

            if (remaining_elements == 1) {
                auto const indexA = index + (elements_in_buffer - 1) * stride;
                auto const indexB = indexA + stride;

                T const elementA = source_buffer[elements_in_buffer - 1];
                if (indexB > max_x) {
                    // No partner index exists; carry the element to the next level.
                    destination_buffer[elements_written++] = elementA;
                } else {
                    // The partner element is not local; fetch it from the message buffer.
                    auto const source_rank = tree_rank_from_index_map(_start_indices, indexB);
                    T elementB = _message_buffer.get(asserting_cast<int>(source_rank), indexB);
                    destination_buffer[elements_written++] = op(elementA, elementB);
                }
            }

            source_buffer      = destination_buffer;
            elements_in_buffer = elements_written;
        }

        KASSERT(elements_in_buffer == 1);
        return destination_buffer[0];
    }
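// Example (not part of the header): the level-by-level loop above always combines
// elements in the same, fixed bracketing, which is what makes floating-point reductions
// reproducible regardless of how the array is distributed. A standalone, purely local
// version of that bracketing:
//
// #include <cassert>
// #include <cstddef>
// #include <vector>
//
// template <typename T, typename Op>
// T fixed_bracketing_reduce(std::vector<T> values, Op op) {
//     // Repeatedly combine neighbors (((v0+v1)+(v2+v3))+...) until one value remains.
//     while (values.size() > 1) {
//         std::vector<T> next;
//         for (std::size_t x = 0; x + 2 <= values.size(); x += 2) {
//             next.push_back(op(values[x], values[x + 1]));
//         }
//         if (values.size() % 2 == 1) {
//             next.push_back(values.back()); // the odd element is carried to the next level
//         }
//         values = std::move(next);
//     }
//     return values.front();
// }
//
// int main() {
//     std::vector<double> const v{0.1, 0.2, 0.3, 0.4, 0.5};
//     auto const plus = [](double a, double b) { return a + b; };
//     double const result   = fixed_bracketing_reduce(v, plus);
//     double const expected = ((v[0] + v[1]) + (v[2] + v[3])) + v[4];
//     assert(result == expected); // identical operation order, hence bit-identical
//     return 0;
// }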
    std::map<size_t, size_t> const _start_indices;
    size_t const                   _region_begin, _region_size, _region_end, _global_size;
    size_t const                   _origin_rank;
    Communicator const&            _comm;
    std::vector<size_t> const      _rank_intersecting_elements;
    std::vector<T>                 _reduce_buffer;
    MessageBuffer<T, Communicator> _message_buffer;
};
// ...

/// Reproducible reduction of distributed arrays.
// (The plugin class derives from the CRTP plugin helper in plugin_helpers.hpp; its exact
// declaration is not shown here.)
template <typename Comm, template <typename...> typename DefaultContainerType>
class /* ... */ {
public:
    /// Create a communicator with a fixed distribution of a global array that can perform
    /// reductions in the ...
    template <typename T, typename... Args>
    auto make_reproducible_comm(Args... args) {
        using namespace kamping;
        // ...
        // Optional recv_displs parameter (library-allocated by default):
        //     internal::select_parameter_type_or_default<internal::ParameterType::recv_displs, default_recv_displs_type>(
        // ...
        static_assert(
            std::is_same_v<std::remove_const_t<recv_displs_type>, int>,
            "Recv displs must be of type int"
        );
        // ...
        auto const& send_counts =
            internal::select_parameter_type<internal::ParameterType::send_counts>(args...)
            // ...
        auto comm = this->to_communicator();
        // ...
        // Both parameters must be identical on every rank:
        //     "send_counts value for rank " << i << " is not uniform across the cluster",
        //     "recv_displs value for rank " << i << " is not uniform across the cluster",
        // ...
        for (size_t p = 0; p < comm.size(); ++p) {
            // ...
        }
        // ... (iteration over the start-index map; 'it' is its iterator)
        auto const next = std::next(it);
        // ...
        auto const rank = it->second;
        // ...
        // Adjacent regions must be contiguous:
        //     "Region of rank " << rank << " ends at index " << region_end << ", but next region of rank "
        // ...
        // Construct the ReproducibleCommunicator from the validated distribution:
        //     this->to_communicator(),
        // ...
    }
};
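// Example (not part of the header): end-to-end usage sketch of the plugin. The plugin
// class name (ReproducibleReducePlugin), the header path, and the exact set of accepted
// named parameters are assumptions based on the fragments above, not confirmed by this
// extract.
//
// #include <vector>
//
// #include <kamping/communicator.hpp>
// #include <kamping/environment.hpp>
// #include <kamping/named_parameters.hpp>
// #include <kamping/plugin/reproducible_reduce.hpp>  // assumed header path
//
// int main(int argc, char** argv) {
//     using namespace kamping;
//     Environment<> env(argc, argv);
//
//     // Attach the plugin to the communicator (assumed plugin class name).
//     Communicator<std::vector, plugin::ReproducibleReducePlugin> comm;
//
//     // Every rank must pass the same send_counts; they fix the block distribution of the
//     // global array once and for all.
//     std::vector<int> const counts(comm.size(), 4);
//     auto repro_comm = comm.make_reproducible_comm<double>(send_counts(counts));
//
//     // Local slice of the global array (4 elements per rank here).
//     std::vector<double> const local(4, 0.25 * static_cast<double>(comm.rank() + 1));
//
//     // The reduction follows the fixed binary-tree bracketing set up above.
//     double const sum = repro_comm.reproducible_reduce(send_buf(local), op(ops::plus<>()));
//     (void)sum;
//     return 0;
// }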