APIs
ClimaComms.ClimaComms
— Module
ClimaComms
Abstracts the communications interface for the various CliMA components in order to:
- support different computational backends (CPUs, GPUs)
- enable the use of different backends as transports (MPI, SharedArrays, etc.), and
- transparently support single or double buffering for GPUs, depending on whether the transport has the ability to access GPU memory.
Loading
ClimaComms.@import_required_backends
— Macro
ClimaComms.@import_required_backends
If the desired context is MPI (as determined by ClimaComms.context()), try loading MPI.jl. If the desired device is CUDA (as determined by ClimaComms.device()), try loading CUDA.jl.
ClimaComms.cuda_is_required
— Function
cuda_is_required()
Returns a Bool indicating if CUDA should be loaded, based on ENV["CLIMACOMMS_DEVICE"]. See ClimaComms.device for more information.
cuda_is_required() && using CUDA
ClimaComms.mpi_is_required
— Function
mpi_is_required()
Returns a Bool indicating if MPI should be loaded, based on ENV["CLIMACOMMS_CONTEXT"]. See ClimaComms.context for more information.
mpi_is_required() && using MPI
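As a rough sketch of how the loading helpers fit together, a script might select its backends through the environment and then let the macro load only what is required. The values CPU and SINGLETON come from the descriptions of ClimaComms.device and ClimaComms.context; setting them from inside the script, rather than from the shell, is only an assumption made to keep the example self-contained.

```julia
# Choose the backends before ClimaComms inspects the environment.
# (Assumption: in practice these are usually exported in the shell instead.)
ENV["CLIMACOMMS_DEVICE"] = "CPU"
ENV["CLIMACOMMS_CONTEXT"] = "SINGLETON"

import ClimaComms

# Loads MPI.jl and/or CUDA.jl only if the settings above require them.
ClimaComms.@import_required_backends

# With a CPU device and a singleton context, neither backend is needed.
needs_cuda = ClimaComms.cuda_is_required()
needs_mpi = ClimaComms.mpi_is_required()
println("CUDA required: ", needs_cuda, ", MPI required: ", needs_mpi)
```

Placing the macro near the top of a driver script keeps the script itself free of hard dependencies on MPI.jl or CUDA.jl.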
Devices
ClimaComms.AbstractDevice
— Type
AbstractDevice
The base type for a device.
ClimaComms.AbstractCPUDevice
— Type
AbstractCPUDevice()
Abstract device type for single-threaded and multi-threaded CPU runs.
ClimaComms.CPUSingleThreaded
— Type
CPUSingleThreaded()
Use the CPU with a single thread.
ClimaComms.CPUMultiThreaded
— Type
CPUMultiThreaded()
Use the CPU with multiple threads.
ClimaComms.CUDADevice
— Type
CUDADevice()
Use an NVIDIA GPU accelerator.
ClimaComms.device
— Function
ClimaComms.device()
Determine the device to use depending on the CLIMACOMMS_DEVICE environment variable.
Allowed values:
- CPU, single-threaded or multi-threaded depending on the number of threads;
- CPUSingleThreaded,
- CPUMultiThreaded,
- CUDA.
The default is CPU.
ClimaComms.device_functional
— Function
ClimaComms.device_functional(device)
Return true when the device is correctly set up.
ClimaComms.array_type
— Function
ClimaComms.array_type(::AbstractDevice)
The base array type used by the specified device (currently Array or CuArray).
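The device functions above can be combined to write allocation code that does not hard-code the array type. The following is a minimal sketch that requests a single-threaded CPU device through the environment; the variable names are illustrative only.

```julia
import ClimaComms

# "CPUSingleThreaded" is one of the allowed values of CLIMACOMMS_DEVICE.
ENV["CLIMACOMMS_DEVICE"] = "CPUSingleThreaded"
device = ClimaComms.device()

# Allocate storage with the array type that matches the device
# (Array on CPU devices, CuArray on CUDA devices).
ArrayType = ClimaComms.array_type(device)
x = ArrayType{Float64}(undef, 4)
fill!(x, 1.0)

println(typeof(device), " is functional: ", ClimaComms.device_functional(device))
```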
ClimaComms.allowscalar
— Function
allowscalar(f, ::AbstractDevice, args...; kwargs...)
Device-flexible version of CUDA.@allowscalar.
Lowers to
f(args...)
for CPU devices and
CUDA.@allowscalar f(args...)
for CUDA devices.
Because f is the first argument, this is conveniently written as a closure with do-block syntax:
allowscalar(device) do
    f()
end
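For instance, reading a single element of a device array is a scalar operation that CUDA.jl normally disallows. The sketch below runs on a CPU device, where the call simply evaluates the closure; the array contents are made up for illustration.

```julia
import ClimaComms

device = ClimaComms.CPUSingleThreaded()
x = ClimaComms.array_type(device)([1.0, 2.0, 3.0])

# On a CUDADevice the same call would wrap the closure in CUDA.@allowscalar.
first_value = ClimaComms.allowscalar(device) do
    x[1]
end
println(first_value)
```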
ClimaComms.@time
— Macro
@time device expr
Device-flexible @time.
Lowers to
@time expr
for CPU devices and
CUDA.@time expr
for CUDA devices.
ClimaComms.@elapsed
— Macro
@elapsed device expr
Device-flexible @elapsed.
Lowers to
@elapsed expr
for CPU devices and
CUDA.@elapsed expr
for CUDA devices.
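A short sketch of device-flexible timing, assuming a CPU device so that it runs without CUDA.jl; the workload (a sum of squares) is arbitrary.

```julia
import ClimaComms

device = ClimaComms.CPUSingleThreaded()
x = rand(10_000)

# Lowers to Base.@elapsed here; on a CUDADevice it would lower to CUDA.@elapsed.
seconds = ClimaComms.@elapsed device sum(abs2, x)
println("sum(abs2, x) took ", seconds, " s")
```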
ClimaComms.@assert
— Macro
@assert device cond [text]
Device-flexible @assert.
Lowers to
@assert cond [text]
for CPU devices and
CUDA.@cuassert cond [text]
for CUDA devices.
ClimaComms.@sync
— Macro
@sync device expr
Device-flexible @sync.
Lowers to
@sync expr
for CPU devices and
CUDA.@sync expr
for CUDA devices.
An example use-case of this might be:
BenchmarkTools.@benchmark begin
    if ClimaComms.device() isa ClimaComms.CUDADevice
        CUDA.@sync begin
            launch_cuda_kernels_or_spawn_tasks!(...)
        end
    elseif ClimaComms.device() isa ClimaComms.CPUMultiThreaded
        Base.@sync begin
            launch_cuda_kernels_or_spawn_tasks!(...)
        end
    end
end
If the CPU version of the above example does not leverage spawned tasks (which require using Base.@sync or Threads.wait to synchronize), then you may want to simply use @cuda_sync.
ClimaComms.@cuda_sync
— Macro
@cuda_sync device expr
Device-flexible CUDA.@sync.
Lowers to
expr
for CPU devices and
CUDA.@sync expr
for CUDA devices.
Adapt.adapt_structure
— Method
Adapt.adapt_structure(::Type{<:AbstractArray}, device::AbstractDevice)
Adapt a given device to a device associated with the given array type.
Example
Adapt.adapt_structure(Array, ClimaComms.CUDADevice()) -> ClimaComms.CPUSingleThreaded()
By default, adapting to Array creates a CPUSingleThreaded device, and there is currently no way to convert to a CPUMultiThreaded device.
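The caveat about CPUMultiThreaded can be checked without a GPU. The sketch below assumes that Adapt.jl is available in the active environment and that, as the note above suggests, any device adapted to Array becomes a CPUSingleThreaded device.

```julia
import ClimaComms
import Adapt

# Start from a multi-threaded CPU device and adapt it to Array.
adapted = Adapt.adapt_structure(Array, ClimaComms.CPUMultiThreaded())

# Expected (per the note above): the threading information is not preserved.
println(adapted)
```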
ClimaComms.@threaded
— Macro
@threaded [device] [coarsen=...] [block_size=...] for ... end
Device-flexible generalization of Threads.@threads, which distributes the iterations of a for-loop across multiple threads, with the option to control thread coarsening and GPU kernel configuration. Coarsening makes each thread evaluate more than one iteration of the loop, which can improve performance by reducing the runtime overhead of launching additional threads (though too much coarsening worsens performance because it reduces parallelization). The device is either inferred by calling ClimaComms.device(), or it can be specified manually, with the following device-dependent behavior:
- When device is a CPUSingleThreaded(), the loop is evaluated as-is. This avoids the runtime overhead of calling Threads.@threads with a single thread, and, when the device type is statically inferrable, it also avoids compilation overhead.
- When device is a CPUMultiThreaded(), the loop is passed to Threads.@threads. This supports three different kinds of "schedulers" for determining how many iterations of the loop to evaluate in each thread:
  - (default) a "dynamic" scheduler that changes the number of iterations as new threads are launched,
  - a "static" scheduler that evaluates a fixed number of iterations per thread, and
  - a "greedy" scheduler that uses a small number of threads, continuously evaluating iterations in each thread until the loop is completed (only available as of Julia 1.11).
  Setting coarsen to :dynamic or :greedy launches threads with those schedulers. Setting it to :static or an integer value launches threads with static scheduling (using :static is similar to using 1, but slightly more performant). To read more about multi-threading, see the documentation for Threads.@threads.
- When device is a CUDADevice(), the loop is compiled with CUDA.@cuda and run with CUDA.@sync. Since CUDA launches all threads at the same time, only static scheduling can be used. Setting coarsen to any symbol causes each thread to evaluate a single iteration (default), and setting it to an integer value causes each thread to evaluate that number of iterations (the default is similar to using 1, but slightly more performant). If the total number of iterations in the loop is extremely large, the specified coarsening may require more threads than can be simultaneously launched on the GPU, in which case the amount of coarsening is automatically increased.
  The optional argument block_size is also available for manually specifying the size of each block on a GPU. The default value of :auto sets the number of threads in each block to the largest possible value that permits a high GPU "occupancy" (the number of active thread warps in each multiprocessor executing the kernel). An integer can be used instead of :auto to override this default value. If the specified value exceeds the total number of threads, it is automatically decreased to avoid idle threads.
Any iterator with methods for firstindex, length, and getindex can be used in a @threaded loop. All lazy iterators from Base and Base.Iterators, such as zip, enumerate, Iterators.product, and generator expressions, are also compatible with @threaded. (Although these iterators do not define methods for getindex, they are automatically modified by threadable to support getindex.) Using multiple iterators with @threaded is equivalent to looping over a single Iterators.product, with the innermost iterator of the loop appearing first in the product, and the outermost iterator appearing last.
NOTE: When a value in the body of the loop has a type that cannot be inferred by the compiler, an InvalidIRError will be thrown during compilation for a CUDADevice(). In particular, global variables are not inferrable, so @threaded must be wrapped in a function whenever it is used in the REPL:
julia> a = CUDA.CuArray{Int}(undef, 100); b = similar(a);
julia> threaded_copyto!(a, b) = ClimaComms.@threaded for i in axes(a, 1)
           a[i] = b[i]
       end
threaded_copyto! (generic function with 1 method)
julia> threaded_copyto!(a, b)
julia> ClimaComms.@threaded for i in axes(a, 1)
           a[i] = b[i]
       end
ERROR: InvalidIRError: ...
Moreover, type variables are not inferrable across function boundaries, so types used in a threaded loop cannot be precomputed before the loop:
julia> threaded_add_epsilon!(a) = ClimaComms.@threaded for i in axes(a, 1)
           FT = eltype(a)
           a[i] += eps(FT)
       end
threaded_add_epsilon! (generic function with 1 method)
julia> threaded_add_epsilon!(a)
julia> function threaded_add_epsilon!(a)
           FT = eltype(a)
           ClimaComms.@threaded for i in axes(a, 1)
               a[i] += eps(FT)
           end
       end
threaded_add_epsilon! (generic function with 1 method)
julia> threaded_add_epsilon!(a)
ERROR: InvalidIRError: ...
To fix other kinds of inference issues on GPUs, especially ones brought about by indexing into iterators with nonuniform element types, see UnrolledUtilities.jl.
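The REPL examples above need a GPU. As a CPU-only sketch of the same pattern, the function below wraps a @threaded loop so that every value used in the loop body is a function argument. The function name threaded_square! is hypothetical, and the device is passed explicitly instead of being inferred from ClimaComms.device().

```julia
import ClimaComms

# Wrapping the loop in a function keeps all values in the loop body inferrable.
function threaded_square!(out, x, device)
    ClimaComms.@threaded device for i in eachindex(x)
        out[i] = x[i]^2
    end
    return out
end

device = ClimaComms.CPUSingleThreaded()  # the loop is evaluated as-is
x = collect(1.0:8.0)
out = similar(x)
threaded_square!(out, x, device)
println(out)
```

Passing a CPUMultiThreaded() or CUDADevice() instead should leave the function body unchanged, which is the point of the device-flexible macro.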
ClimaComms.threaded
— Function
threaded(f, device, itrs...; kwargs...)
Functional form of @threaded. If there are n iterators and f is a function of n arguments, the threaded function is similar to
@threaded device [kwargs...] for xₙ in itrs[n], ..., x₂ in itrs[2], x₁ in itrs[1]
    f(x₁, x₂, ..., xₙ)
end
On single-threaded CPU devices, the @threaded macro inlines the for-loop without any intermediate function calls, so that it has a lower latency than the threaded function. On other devices, the only difference between the macro and the function is that keyword argument symbols like :dynamic and :auto must be wrapped in Vals for the function.
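A minimal sketch of the functional form with two iterators, assuming a single-threaded CPU device. Following the description above, the first iterator supplies the first argument of the closure and acts as the innermost loop variable.

```julia
import ClimaComms

device = ClimaComms.CPUSingleThreaded()
out = zeros(Int, 3, 2)

# i comes from the first iterator (innermost), j from the second (outermost).
ClimaComms.threaded(device, axes(out, 1), axes(out, 2)) do i, j
    out[i, j] = 10i + j
end
println(out)
```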
ClimaComms.threadable
— Function
threadable(device, itr)
Modifies an iterator to ensure that it can be used in a @threaded loop. This will typically return itr or a ThreadableWrapper of itr.
ClimaComms.ThreadableWrapper
— Type
ThreadableWrapper
Wrapper for an iterator from Base or Iterators that can be used in @threaded, with methods for firstindex, length, and getindex. The getindex method only supports linear indices between firstindex and firstindex + length - 1. For the ThreadableWrapper of Iterators.product, getindex converts each linear index to a Cartesian index using regular integer division on CPUs and Base.multiplicativeinverse on GPUs.
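To illustrate the two index conversions mentioned above, the following plain-Julia sketch maps a linear index into a product of two ranges to a Cartesian pair, once with regular integer division and once with a precomputed multiplicative inverse. The helper cartesian_index is hypothetical and is not the ClimaComms implementation; it only demonstrates the arithmetic.

```julia
# Hypothetical helpers for a product of two iterators, where the first
# iterator (of length len1) varies fastest.

# Regular integer division.
function cartesian_index(n, len1)
    outer, inner = divrem(n - 1, len1)  # zero-based positions
    return (inner + 1, outer + 1)
end

# Same result, dividing by a precomputed multiplicative inverse, which
# replaces the integer division with a multiplication and a shift.
function cartesian_index(n, len1, len1_inverse)
    outer = div(n - 1, len1_inverse)
    inner = (n - 1) - outer * len1
    return (inner + 1, outer + 1)
end

len1, len2 = 3, 2
len1_inverse = Base.multiplicativeinverse(len1)
reference = vec(collect(Iterators.product(1:len1, 1:len2)))

# Both conversions agree with the iteration order of Iterators.product.
for n in 1:(len1 * len2)
    @assert cartesian_index(n, len1) == reference[n]
    @assert cartesian_index(n, len1, len1_inverse) == reference[n]
end
```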
Contexts
ClimaComms.AbstractCommsContext
— Type
AbstractCommsContext
The base type for a communications context. Each backend defines a concrete subtype of this.
ClimaComms.SingletonCommsContext
— Type
SingletonCommsContext(device=device())
A singleton communications context, used for single-process runs. ClimaComms.AbstractCPUDevice and ClimaComms.CUDADevice device options are currently supported.
ClimaComms.MPICommsContext
— Type
MPICommsContext()
MPICommsContext(device)
MPICommsContext(device, comm)
An MPI communications context, used for distributed runs. AbstractCPUDevice and CUDADevice device options are currently supported.
ClimaComms.AbstractGraphContext
— Type
AbstractGraphContext
A context for communicating between processes in a graph.
ClimaComms.context
— Function
ClimaComms.context(device=device())
Construct a default communication context.
By default, it checks whether MPI environment variables are set, which indicates that it is running inside an MPI environment; if so, it returns an MPICommsContext; otherwise, it returns a SingletonCommsContext.
This behavior can be overridden by setting the CLIMACOMMS_CONTEXT environment variable to either MPI or SINGLETON.
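A small sketch of the override, forcing a singleton context so that the example runs without MPI. Setting the variable from inside the script is an assumption made for self-containment.

```julia
import ClimaComms

# Force a single-process context regardless of any MPI launcher variables.
ENV["CLIMACOMMS_CONTEXT"] = "SINGLETON"

context = ClimaComms.context(ClimaComms.CPUSingleThreaded())
println(typeof(context))
```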
ClimaComms.graph_context
— Function
graph_context(context::AbstractCommsContext,
              sendarray, sendlengths, sendpids,
              recvarray, recvlengths, recvpids)
Construct a communication context for exchanging neighbor data via a graph.
Arguments:
- context: the communication context on which to construct the graph context
- sendarray: array containing data to send
- sendlengths: list of lengths of data to send to each process ID
- sendpids: list of processor IDs to send to
- recvarray: array to receive data into
- recvlengths: list of lengths of data to receive from each process ID
- recvpids: list of processor IDs to receive from
This should return an AbstractGraphContext object.
Adapt.adapt_structure
— Method
Adapt.adapt_structure(::Type{<:AbstractArray}, context::AbstractCommsContext)
Adapt a given context to a context with a device associated with the given array type.
Example
Adapt.adapt_structure(Array, ClimaComms.context(ClimaComms.CUDADevice())) -> ClimaComms.context(ClimaComms.CPUSingleThreaded())
By default, adapting to Array creates a CPUSingleThreaded device, and there is currently no way to convert to a CPUMultiThreaded device.
Logging
ClimaComms.OnlyRootLogger
— Function
OnlyRootLogger()
OnlyRootLogger(ctx::AbstractCommsContext)
Return a logger that silences non-root processes.
If no context is passed, obtain the default context via context.
ClimaComms.MPILogger
— Function
MPILogger(context::AbstractCommsContext)
MPILogger(iostream, context)
Add a rank prefix before log messages.
Outputs to stdout if no IOStream is given.
ClimaComms.FileLogger
— Function
FileLogger(context, log_dir; log_stdout = true, min_level = Logging.Info)
Log MPI ranks to different files within the log_dir.
The minimum logging level is set using min_level. If log_stdout = true, root process logs will be sent to stdout as well.
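A logger returned by these functions can be installed with the standard Logging tools. The sketch below uses OnlyRootLogger with a singleton context, so the single process is the root and its messages are printed; using Logging.with_logger rather than Logging.global_logger is just one possible choice.

```julia
import ClimaComms
import Logging

context = ClimaComms.SingletonCommsContext(ClimaComms.CPUSingleThreaded())
logger = ClimaComms.OnlyRootLogger(context)

# Messages logged inside the block go through the root-only logger.
Logging.with_logger(logger) do
    @info "Starting run" nprocs = ClimaComms.nprocs(context)
end
```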
Context operations
ClimaComms.init
— Function
(pid, nprocs) = init(ctx::AbstractCommsContext)
Perform any necessary initialization for the specified backend. Return a tuple of the processor ID and the number of participating processors.
ClimaComms.mypid
— Function
mypid(ctx::AbstractCommsContext)
Return the processor ID.
ClimaComms.iamroot
— Function
iamroot(ctx::AbstractCommsContext)
Return true if the calling processor is the root processor.
ClimaComms.nprocs
— Function
nprocs(ctx::AbstractCommsContext)
Return the number of participating processors.
ClimaComms.abort
— Function
abort(ctx::CC, status::Int) where {CC <: AbstractCommsContext}
Terminate the caller and all participating processors with the specified status.
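The context operations above are typically called once at the start of a run. A minimal sketch with a singleton context, where only one process participates:

```julia
import ClimaComms

context = ClimaComms.SingletonCommsContext(ClimaComms.CPUSingleThreaded())

# init returns the processor ID and the number of participating processors.
pid, nprocs = ClimaComms.init(context)

if ClimaComms.iamroot(context)
    println("process ", pid, " of ", nprocs, " is the root")
end
```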
Collective operations
ClimaComms.barrier
— Function
barrier(ctx::CC) where {CC <: AbstractCommsContext}
Perform a global synchronization across all participating processors.
ClimaComms.reduce
— Function
reduce(ctx::CC, val, op) where {CC <: AbstractCommsContext}
Perform a reduction across all participating processors, using op as the reduction operator and val as this rank's reduction value. Return the result to the first processor only.
ClimaComms.reduce!
— Function
reduce!(ctx::CC, sendbuf, recvbuf, op)
reduce!(ctx::CC, sendrecvbuf, op)
Performs elementwise reduction using the operator op on the buffer sendbuf, storing the result in the recvbuf of the root process. If only one sendrecvbuf buffer is provided, then the operation is performed in-place.
ClimaComms.allreduce
— Function
allreduce(ctx::CC, sendbuf, op)
Performs elementwise reduction using the operator op on the buffer sendbuf, allocating a new array for the result. sendbuf can also be a scalar, in which case the result will be a value of the same type.
ClimaComms.allreduce!
— Function
allreduce!(ctx::CC, sendbuf, recvbuf, op)
allreduce!(ctx::CC, sendrecvbuf, op)
Performs elementwise reduction using the operator op on the buffer sendbuf, storing the result in the recvbuf of all processes in the group. Allreduce! is equivalent to a Reduce! operation followed by a Bcast!, but can lead to better performance. If only one sendrecvbuf buffer is provided, then the operation is performed in-place.
ClimaComms.bcast
— Function
bcast(ctx::AbstractCommsContext, object)
Broadcast object from the root process to all other processes. The value of object on non-root processes is ignored.
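The collective operations can be tried without MPI by using a singleton context, in which case each result equals the local input. The values and the settings tuple below are made up for illustration; under an MPICommsContext the same calls would combine data across ranks.

```julia
import ClimaComms

context = ClimaComms.SingletonCommsContext(ClimaComms.CPUSingleThreaded())
ClimaComms.init(context)

local_value = 3.0

# Sum over all processes; the result is available on every process.
total = ClimaComms.allreduce(context, local_value, +)

# Maximum over all processes; the result is only meaningful on the root.
root_max = ClimaComms.reduce(context, local_value, max)

# Send the root's settings to every process.
settings = ClimaComms.bcast(context, (dt = 0.1, nsteps = 10))

# Wait until every process reaches this point.
ClimaComms.barrier(context)
println((total, root_max, settings))
```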
Graph exchange
ClimaComms.start
— Function
start(ctx::AbstractGraphContext)
Initiate graph data exchange.
ClimaComms.progress
— Function
progress(ctx::AbstractGraphContext)
Drive communication. Call after start to ensure that communication proceeds asynchronously.
ClimaComms.finish
— Function
finish(ctx::AbstractGraphContext)
Complete the communications step begun by start(). After this returns, data received from all neighbors will be available in the stage areas of each neighbor's receive buffer.