3#include "../cudaflow.hpp"
16enum class cudaMergeBoundType {
22template<
typename T,
unsigned N>
25 cudaArray<unsigned, N> indices;
29struct cudaMergeRange {
30 unsigned a_begin, a_end, b_begin, b_end;
// Number of A-side elements in this merge range, computed as
// a_end - a_begin. Both fields are unsigned, so this assumes
// a_begin <= a_end; otherwise the subtraction wraps around.
// NOTE(review): the leading "32" appears to be a source-line number fused in
// by the documentation extraction, not C++; kept verbatim to preserve tokens.
32 __device__
unsigned a_count()
const {
return a_end - a_begin; }
// Number of B-side elements in this merge range, computed as
// b_end - b_begin. Both fields are unsigned, so this assumes
// b_begin <= b_end; otherwise the subtraction wraps around.
// NOTE(review): the leading "33" appears to be a fused source-line number
// from the documentation extraction; kept verbatim.
33 __device__
unsigned b_count()
const {
return b_end - b_begin; }
// Total number of elements covered by this range: the A-side count plus
// the B-side count (a_count() + b_count()).
// NOTE(review): the leading "34" appears to be a fused source-line number
// from the documentation extraction; kept verbatim.
34 __device__
unsigned total()
const {
return a_count() + b_count(); }
36 __device__ cudaRange a_range()
const {
37 return cudaRange { a_begin, a_end };
39 __device__ cudaRange b_range()
const {
40 return cudaRange { b_begin, b_end };
43 __device__ cudaMergeRange to_local()
const {
44 return cudaMergeRange { 0, a_count(), a_count(), total() };
48 __device__ cudaMergeRange
partition(
unsigned mp0,
unsigned diag)
const {
49 return cudaMergeRange { a_begin + mp0, a_end, b_begin + diag - mp0, b_end };
53 __device__ cudaMergeRange
partition(
unsigned mp0,
unsigned diag0,
54 unsigned mp1,
unsigned diag1)
const {
55 return cudaMergeRange {
58 b_begin + diag0 - mp0,
63 __device__
bool a_valid()
const {
64 return a_begin < a_end;
66 __device__
bool b_valid()
const {
67 return b_begin < b_end;
73 cudaMergeBoundType bounds = cudaMergeBoundType::LOWER,
74 typename a_keys_it,
typename b_keys_it,
typename comp_t
76__device__
auto cuda_merge_path(
77 a_keys_it a_keys,
unsigned a_count,
78 b_keys_it b_keys,
unsigned b_count,
79 unsigned diag, comp_t comp
82 unsigned beg = (diag > b_count) ? diag - b_count : 0;
83 unsigned end = diag < a_count ? diag : a_count;
86 auto mid = (beg +
end) / 2;
87 auto a_key = a_keys[mid];
88 auto b_key = b_keys[diag - 1 - mid];
89 bool pred = (cudaMergeBoundType::UPPER == bounds) ?
93 if(pred) beg = mid + 1;
100template<cudaMergeBoundType bounds,
typename keys_it,
typename comp_t>
101__device__
auto cuda_merge_path(
102 keys_it keys, cudaMergeRange range,
unsigned diag, comp_t comp
105 return cuda_merge_path<bounds>(
106 keys + range.a_begin, range.a_count(),
107 keys + range.b_begin, range.b_count(),
112template<cudaMergeBoundType bounds,
bool range_check,
typename T,
typename comp_t>
113__device__
bool cuda_merge_predicate(
114 T a_key, T b_key, cudaMergeRange range, comp_t comp
118 if(range_check && !range.a_valid()) {
121 else if(range_check && !range.b_valid()) {
125 p = (cudaMergeBoundType::UPPER == bounds) ? comp(a_key, b_key) :
132inline __device__
auto cuda_compute_merge_range(
133 unsigned a_count,
unsigned b_count,
134 unsigned partition,
unsigned spacing,
135 unsigned mp0,
unsigned mp1
139 auto diag1 =
min(a_count + b_count, diag0 + spacing);
141 return cudaMergeRange { mp0, mp1, diag0 - mp0, diag1 - mp1 };
151template<
unsigned nt,
unsigned vt,
typename T>
152__device__
auto cuda_load_two_streams_reg(
153 const T* a,
unsigned a_count,
const T* b,
unsigned b_count,
unsigned tid
158 cuda_strided_iterate<nt, vt>([&](
auto i,
auto index) {
159 const T* p = (index >= a_count) ? b : a;
161 }, tid, a_count + b_count);
167template<
unsigned nt,
unsigned vt,
typename T,
typename a_it,
typename b_it>
172> load_two_streams_reg(a_it a,
unsigned a_count, b_it b,
unsigned b_count,
unsigned tid) {
175 cuda_strided_iterate<nt, vt>([&](
auto i,
auto index) {
176 x[i] = (index < a_count) ? a[index] : b[index];
177 }, tid, a_count + b_count);
182template<
unsigned nt,
unsigned vt,
typename A,
typename B,
typename T,
unsigned S>
183__device__
void cuda_load_two_streams_shared(A a,
unsigned a_count,
184 B b,
unsigned b_count,
unsigned tid, T (&shared)[S],
bool sync =
true
187 auto x = cuda_load_two_streams_reg<nt, vt, T>(a, a_count, b, b_count, tid);
188 cuda_reg_to_shared_strided<nt>(x, tid, shared, sync);
192template<
unsigned nt,
unsigned vt,
typename T>
193__device__
auto cuda_gather_two_streams_strided(
const T* a,
194 unsigned a_count,
const T* b,
unsigned b_count, cudaArray<unsigned, vt> indices,
197 ptrdiff_t b_offset = b - a - a_count;
198 auto count = a_count + b_count;
201 cuda_strided_iterate<nt, vt>([&](
auto i,
auto j) {
202 ptrdiff_t gather = indices[i];
203 if(gather >= a_count) gather += b_offset;
211template<
unsigned nt,
unsigned vt,
typename T,
typename a_it,
typename b_it>
216> cuda_gather_two_streams_strided(a_it a,
217 unsigned a_count, b_it b,
unsigned b_count, cudaArray<unsigned, vt> indices,
unsigned tid) {
221 cuda_strided_iterate<nt, vt>([&](
auto i,
auto j) {
222 x[i] = (indices[i] < a_count) ? a[indices[i]] : b[indices[i]];
223 }, tid, a_count + b_count);
229template<
unsigned nt,
unsigned vt,
typename a_it,
typename b_it,
typename c_it>
230__device__
void cuda_transfer_two_streams_strided(
231 a_it a,
unsigned a_count, b_it b,
unsigned b_count,
232 cudaArray<unsigned, vt> indices,
unsigned tid, c_it c
236 auto x = cuda_gather_two_streams_strided<nt, vt, T>(
237 a, a_count, b, b_count, indices, tid
240 cuda_reg_to_mem_strided<nt>(x, tid, a_count + b_count, c);
251template<cudaMergeBoundType bounds,
unsigned vt,
typename T,
typename comp_t>
252__device__
auto cuda_serial_merge(
253 const T* keys_shared, cudaMergeRange range, comp_t comp,
bool sync =
true
256 auto a_key = keys_shared[range.a_begin];
257 auto b_key = keys_shared[range.b_begin];
259 cudaMergePair<T, vt> merge_pair;
260 cuda_iterate<vt>([&](
auto i) {
261 bool p = cuda_merge_predicate<bounds, true>(a_key, b_key, range, comp);
262 auto index = p ? range.a_begin : range.b_begin;
264 merge_pair.keys[i] = p ? a_key : b_key;
265 merge_pair.indices[i] = index;
267 T c_key = keys_shared[++index];
268 if(p) a_key = c_key, range.a_begin = index;
269 else b_key = c_key, range.b_begin = index;
272 if(sync) __syncthreads();
281template<cudaMergeBoundType bounds,
282 unsigned nt,
unsigned vt,
283 typename a_it,
typename b_it,
typename T,
typename comp_t,
unsigned S
285__device__
auto block_merge_from_mem(
286 a_it a, b_it b, cudaMergeRange range_mem,
unsigned tid, comp_t comp, T (&keys_shared)[S]
289 static_assert(S >= nt * vt + 1,
290 "block_merge_from_mem requires temporary storage of at "
291 "least nt * vt + 1 items");
294 cuda_load_two_streams_shared<nt, vt>(
295 a + range_mem.a_begin, range_mem.a_count(),
296 b + range_mem.b_begin, range_mem.b_count(),
297 tid, keys_shared,
true
301 auto range_local = range_mem.to_local();
302 auto diag = vt * tid;
303 auto mp = cuda_merge_path<bounds>(keys_shared, range_local, diag, comp);
308 auto merged = cuda_serial_merge<bounds, vt>(
309 keys_shared, range_local.partition(mp, diag), comp
316template<cudaMergeBoundType bounds,
317 typename P,
typename a_keys_it,
typename b_keys_it,
typename comp_t
319void cuda_merge_path_partitions(
321 a_keys_it a,
unsigned a_count,
322 b_keys_it b,
unsigned b_count,
330 unsigned num_partitions = (a_count + b_count + spacing - 1) / spacing + 1;
332 const unsigned nt = 128;
333 const unsigned vt = 1;
334 const unsigned nv = nt * vt;
336 unsigned B = (num_partitions + nv - 1) / nv;
338 cuda_kernel<<<B, nt, 0, p.stream()>>>([=]__device__(
auto tid,
auto bid) {
339 auto range = cuda_get_tile(bid, nt * vt, num_partitions);
340 cuda_strided_iterate<nt, vt>([=](
auto,
auto j) {
341 auto index = range.begin + j;
342 auto diag =
min(spacing * index, a_count + b_count);
343 buf[index] = cuda_merge_path<bounds>(a, a_count, b, b_count, diag, comp);
344 }, tid, range.count());
375 typename a_keys_it,
typename a_vals_it,
376 typename b_keys_it,
typename b_vals_it,
377 typename c_keys_it,
typename c_vals_it,
382 a_keys_it a_keys, a_vals_it a_vals,
unsigned a_count,
383 b_keys_it b_keys, b_vals_it b_vals,
unsigned b_count,
384 c_keys_it c_keys, c_vals_it c_vals,
393 auto buf =
static_cast<unsigned*
>(ptr);
397 cuda_merge_path_partitions<cudaMergeBoundType::LOWER>(
398 p, a_keys, a_count, b_keys, b_count, E::nv, comp, buf
401 unsigned B = (a_count + b_count + E::nv - 1)/ E::nv;
404 cuda_kernel<<<B, E::nt, 0, p.stream()>>>([=] __device__ (
auto tid,
auto bid) {
408 unsigned indices[E::nv];
412 auto mp0 = buf[bid + 0];
413 auto mp1 = buf[bid + 1];
414 auto range = cuda_compute_merge_range(a_count, b_count, bid, E::nv, mp0, mp1);
416 auto merge = block_merge_from_mem<cudaMergeBoundType::LOWER, E::nt, E::vt>(
417 a_keys, b_keys, range, tid, comp, shared.keys
420 auto dest_offset = E::nv * bid;
421 cuda_reg_to_mem_thread<E::nt>(
422 merge.keys, tid, range.total(), c_keys + dest_offset, shared.keys
427 auto indices = cuda_reg_thread_to_strided<E::nt>(
428 merge.indices, tid, shared.indices
432 cuda_transfer_two_streams_strided<E::nt>(
433 a_vals + range.a_begin, range.a_count(),
434 b_vals + range.b_begin, range.b_count(), indices, tid,
462 unsigned sz = (a_count + b_count + E::nv - 1) / E::nv + 1;
463 return sz*
sizeof(unsigned);
555 typename a_keys_it,
typename a_vals_it,
556 typename b_keys_it,
typename b_vals_it,
557 typename c_keys_it,
typename c_vals_it,
562 a_keys_it a_keys_first, a_keys_it a_keys_last, a_vals_it a_vals_first,
563 b_keys_it b_keys_first, b_keys_it b_keys_last, b_vals_it b_vals_first,
564 c_keys_it c_keys_first, c_vals_it c_vals_first, C comp,
571 if(a_count + b_count == 0) {
575 detail::cuda_merge_loop(p,
576 a_keys_first, a_vals_first, a_count,
577 b_keys_first, b_vals_first, b_count,
578 c_keys_first, c_vals_first, comp,
627 typename a_keys_it,
typename b_keys_it,
typename c_keys_it,
typename C
631 a_keys_it a_keys_first, a_keys_it a_keys_last,
632 b_keys_it b_keys_first, b_keys_it b_keys_last,
633 c_keys_it c_keys_first,
639 a_keys_first, a_keys_last, (
const cudaEmpty*)
nullptr,
640 b_keys_first, b_keys_last, (
const cudaEmpty*)
nullptr,
641 c_keys_first, (cudaEmpty*)
nullptr, comp,
651template<
typename A,
typename B,
typename C,
typename Comp>
653 A a_first, A a_last, B b_first, B b_last, C c_first, Comp comp
657 cap.
merge(a_first, a_last, b_first, b_last, c_first, comp);
662template<
typename A,
typename B,
typename C,
typename Comp>
664 cudaTask task, A a_first, A a_last, B b_first, B b_last, C c_first, Comp comp
668 cap.
merge(a_first, a_last, b_first, b_last, c_first, comp);
674 typename a_keys_it,
typename a_vals_it,
675 typename b_keys_it,
typename b_vals_it,
676 typename c_keys_it,
typename c_vals_it,
680 a_keys_it a_keys_first, a_keys_it a_keys_last, a_vals_it a_vals_first,
681 b_keys_it b_keys_first, b_keys_it b_keys_last, b_vals_it b_vals_first,
682 c_keys_it c_keys_first, c_vals_it c_vals_first, C comp
687 a_keys_first, a_keys_last, a_vals_first,
688 b_keys_first, b_keys_last, b_vals_first,
689 c_keys_first, c_vals_first,
697 typename a_keys_it,
typename a_vals_it,
698 typename b_keys_it,
typename b_vals_it,
699 typename c_keys_it,
typename c_vals_it,
704 a_keys_it a_keys_first, a_keys_it a_keys_last, a_vals_it a_vals_first,
705 b_keys_it b_keys_first, b_keys_it b_keys_last, b_vals_it b_vals_first,
706 c_keys_it c_keys_first, c_vals_it c_vals_first, C comp
711 a_keys_first, a_keys_last, a_vals_first,
712 b_keys_first, b_keys_last, b_vals_first,
713 c_keys_first, c_vals_first,
724template<
typename A,
typename B,
typename C,
typename Comp>
726 A a_first, A a_last, B b_first, B b_last, C c_first, Comp comp
729 auto bufsz = cuda_merge_buffer_size<cudaDefaultExecutionPolicy>(
733 return on([=, buf=MoC{cudaDeviceVector<std::byte>(bufsz)}]
734 (cudaStream_t stream)
mutable {
736 a_first, a_last, b_first, b_last, c_first, comp, buf.get().data()
742template<
typename A,
typename B,
typename C,
typename Comp>
744 cudaTask task, A a_first, A a_last, B b_first, B b_last, C c_first, Comp comp
747 auto bufsz = cuda_merge_buffer_size<cudaDefaultExecutionPolicy>(
751 on(task, [=, buf=MoC{cudaDeviceVector<std::byte>(bufsz)}]
752 (cudaStream_t stream)
mutable {
754 a_first, a_last, b_first, b_last, c_first, comp, buf.get().data()
761 typename a_keys_it,
typename a_vals_it,
762 typename b_keys_it,
typename b_vals_it,
763 typename c_keys_it,
typename c_vals_it,
767 a_keys_it a_keys_first, a_keys_it a_keys_last, a_vals_it a_vals_first,
768 b_keys_it b_keys_first, b_keys_it b_keys_last, b_vals_it b_vals_first,
769 c_keys_it c_keys_first, c_vals_it c_vals_first, C comp
772 auto bufsz = cuda_merge_buffer_size<cudaDefaultExecutionPolicy>(
777 return on([=, buf=MoC{cudaDeviceVector<std::byte>(bufsz)}]
778 (cudaStream_t stream)
mutable {
780 a_keys_first, a_keys_last, a_vals_first,
781 b_keys_first, b_keys_last, b_vals_first,
782 c_keys_first, c_vals_first,
791 typename a_keys_it,
typename a_vals_it,
792 typename b_keys_it,
typename b_vals_it,
793 typename c_keys_it,
typename c_vals_it,
798 a_keys_it a_keys_first, a_keys_it a_keys_last, a_vals_it a_vals_first,
799 b_keys_it b_keys_first, b_keys_it b_keys_last, b_vals_it b_vals_first,
800 c_keys_it c_keys_first, c_vals_it c_vals_first, C comp
803 auto bufsz = cuda_merge_buffer_size<cudaDefaultExecutionPolicy>(
808 on(task, [=, buf=MoC{cudaDeviceVector<std::byte>(bufsz)}]
809 (cudaStream_t stream)
mutable {
811 a_keys_first, a_keys_last, a_vals_first,
812 b_keys_first, b_keys_last, b_vals_first,
813 c_keys_first, c_vals_first,
class to define execution policy for CUDA standard algorithms
Definition cuda_execution_policy.hpp:29
class to create a cudaFlow graph using stream capture
Definition cuda_capturer.hpp:57
cudaTask merge_by_key(a_keys_it a_keys_first, a_keys_it a_keys_last, a_vals_it a_vals_first, b_keys_it b_keys_first, b_keys_it b_keys_last, b_vals_it b_vals_first, c_keys_it c_keys_first, c_vals_it c_vals_first, C comp)
captures kernels that perform parallel key-value merge
Definition merge.hpp:766
OPT & make_optimizer(ArgsT &&... args)
selects a different optimization algorithm
Definition cuda_capturer.hpp:1312
cudaTask merge(A a_first, A a_last, B b_first, B b_last, C c_first, Comp comp)
captures kernels that perform parallel merge on two sorted arrays
Definition merge.hpp:725
cudaTask on(C &&callable)
captures sequential CUDA operations from the given callable
Definition cuda_capturer.hpp:1105
cudaTask capture(C &&callable)
constructs a subflow graph through tf::cudaFlowCapturer
Definition cudaflow.hpp:1582
cudaTask merge_by_key(a_keys_it a_keys_first, a_keys_it a_keys_last, a_vals_it a_vals_first, b_keys_it b_keys_first, b_keys_it b_keys_last, b_vals_it b_vals_first, c_keys_it c_keys_first, c_vals_it c_vals_first, C comp)
creates a task to perform parallel key-value merge
Definition merge.hpp:679
cudaTask merge(A a_first, A a_last, B b_first, B b_last, C c_first, Comp comp)
creates a task to perform parallel merge on two sorted arrays
Definition merge.hpp:652
class to capture a linear CUDA graph using a sequential stream
Definition cuda_optimizer.hpp:182
class to create a task handle over an internal node of a cudaFlow graph
Definition cuda_task.hpp:65
taskflow namespace
Definition small_vector.hpp:27
void cuda_merge(P &&p, a_keys_it a_keys_first, a_keys_it a_keys_last, b_keys_it b_keys_first, b_keys_it b_keys_last, c_keys_it c_keys_first, C comp, void *buf)
performs asynchronous key-only merge over a range of keys
Definition merge.hpp:629
unsigned cuda_merge_buffer_size(unsigned a_count, unsigned b_count)
queries the buffer size in bytes needed to call merge kernels
Definition merge.hpp:460
void cuda_merge_by_key(P &&p, a_keys_it a_keys_first, a_keys_it a_keys_last, a_vals_it a_vals_first, b_keys_it b_keys_first, b_keys_it b_keys_last, b_vals_it b_vals_first, c_keys_it c_keys_first, c_vals_it c_vals_first, C comp, void *buf)
performs asynchronous key-value merge over a range of keys and values
Definition merge.hpp:560