diff --git a/compiler/rustc_ast_lowering/src/expr.rs b/compiler/rustc_ast_lowering/src/expr.rs
index 2241b4cd8e5..9990d526bf7 100644
--- a/compiler/rustc_ast_lowering/src/expr.rs
+++ b/compiler/rustc_ast_lowering/src/expr.rs
@@ -1,3 +1,5 @@
+use std::assert_matches::assert_matches;
+
 use super::errors::{
     AsyncCoroutinesNotSupported, AwaitOnlyInAsyncFnAndBlocks, BaseExpressionDoubleDot,
     ClosureCannotBeStatic, CoroutineTooManyParameters,
@@ -1027,6 +1029,12 @@ impl<'hir> LoweringContext<'_, 'hir> {
     ) -> hir::ExprKind<'hir> {
         let (binder_clause, generic_params) = self.lower_closure_binder(binder);
 
+        assert_matches!(
+            coroutine_kind,
+            CoroutineKind::Async { .. },
+            "only async closures are supported currently"
+        );
+
         let body = self.with_new_scopes(fn_decl_span, |this| {
             let inner_decl =
                 FnDecl { inputs: decl.inputs.clone(), output: FnRetTy::Default(fn_decl_span) };
diff --git a/compiler/rustc_borrowck/src/diagnostics/region_name.rs b/compiler/rustc_borrowck/src/diagnostics/region_name.rs
index fc52306e10d..c91b8eaab05 100644
--- a/compiler/rustc_borrowck/src/diagnostics/region_name.rs
+++ b/compiler/rustc_borrowck/src/diagnostics/region_name.rs
@@ -324,9 +324,13 @@ impl<'tcx> MirBorrowckCtxt<'_, 'tcx> {
                 ty::BoundRegionKind::BrEnv => {
                     let def_ty = self.regioncx.universal_regions().defining_ty;
 
-                    let DefiningTy::Closure(_, args) = def_ty else {
-                        // Can't have BrEnv in functions, constants or coroutines.
-                        bug!("BrEnv outside of closure.");
+                    let closure_kind = match def_ty {
+                        DefiningTy::Closure(_, args) => args.as_closure().kind(),
+                        DefiningTy::CoroutineClosure(_, args) => args.as_coroutine_closure().kind(),
+                        _ => {
+                            // Can't have BrEnv in functions, constants or coroutines.
+                            bug!("BrEnv outside of closure.");
+                        }
                     };
                     let hir::ExprKind::Closure(&hir::Closure { fn_decl_span, .. }) =
                         tcx.hir().expect_expr(self.mir_hir_id()).kind
@@ -334,21 +338,18 @@ impl<'tcx> MirBorrowckCtxt<'_, 'tcx> {
                         bug!("Closure is not defined by a closure expr");
                     };
                     let region_name = self.synthesize_region_name();
-
-                    let closure_kind_ty = args.as_closure().kind_ty();
-                    let note = match closure_kind_ty.to_opt_closure_kind() {
-                        Some(ty::ClosureKind::Fn) => {
+                    let note = match closure_kind {
+                        ty::ClosureKind::Fn => {
                             "closure implements `Fn`, so references to captured variables \
                              can't escape the closure"
                         }
-                        Some(ty::ClosureKind::FnMut) => {
+                        ty::ClosureKind::FnMut => {
                             "closure implements `FnMut`, so references to captured variables \
                              can't escape the closure"
                         }
-                        Some(ty::ClosureKind::FnOnce) => {
+                        ty::ClosureKind::FnOnce => {
                             bug!("BrEnv in a `FnOnce` closure");
                         }
-                        None => bug!("Closure kind not inferred in borrow check"),
                     };
 
                     Some(RegionName {
diff --git a/compiler/rustc_borrowck/src/lib.rs b/compiler/rustc_borrowck/src/lib.rs
index eaf9fc45837..bb64571889b 100644
--- a/compiler/rustc_borrowck/src/lib.rs
+++ b/compiler/rustc_borrowck/src/lib.rs
@@ -3,6 +3,7 @@
 #![allow(internal_features)]
 #![feature(rustdoc_internals)]
 #![doc(rust_logo)]
+#![feature(assert_matches)]
 #![feature(associated_type_bounds)]
 #![feature(box_patterns)]
 #![feature(let_chains)]
diff --git a/compiler/rustc_borrowck/src/type_check/input_output.rs b/compiler/rustc_borrowck/src/type_check/input_output.rs
index 59518f68ab1..a3e5088ee09 100644
--- a/compiler/rustc_borrowck/src/type_check/input_output.rs
+++ b/compiler/rustc_borrowck/src/type_check/input_output.rs
@@ -7,13 +7,18 @@
 //! `RETURN_PLACE` the MIR arguments) are always fully normalized (and
 //! contain revealed `impl Trait` values).
 
+use std::assert_matches::assert_matches;
+
 use itertools::Itertools;
-use rustc_infer::infer::BoundRegionConversionTime;
+use rustc_hir as hir;
+use rustc_infer::infer::type_variable::{TypeVariableOrigin, TypeVariableOriginKind};
+use rustc_infer::infer::{BoundRegionConversionTime, RegionVariableOrigin};
 use rustc_middle::mir::*;
 use rustc_middle::ty::{self, Ty};
 use rustc_span::Span;
 
-use crate::universal_regions::UniversalRegions;
+use crate::renumber::RegionCtxt;
+use crate::universal_regions::{DefiningTy, UniversalRegions};
 
 use super::{Locations, TypeChecker};
 
@@ -23,9 +28,11 @@ impl<'a, 'tcx> TypeChecker<'a, 'tcx> {
     #[instrument(skip(self, body), level = "debug")]
     pub(super) fn check_signature_annotation(&mut self, body: &Body<'tcx>) {
         let mir_def_id = body.source.def_id().expect_local();
+
         if !self.tcx().is_closure_or_coroutine(mir_def_id.to_def_id()) {
             return;
         }
+
         let user_provided_poly_sig = self.tcx().closure_user_provided_sig(mir_def_id);
 
         // Instantiate the canonicalized variables from user-provided signature
@@ -34,12 +41,69 @@ impl<'a, 'tcx> TypeChecker<'a, 'tcx> {
         // so that they represent the view from "inside" the closure.
         let user_provided_sig = self
             .instantiate_canonical_with_fresh_inference_vars(body.span, &user_provided_poly_sig);
-        let user_provided_sig = self.infcx.instantiate_binder_with_fresh_vars(
+        let mut user_provided_sig = self.infcx.instantiate_binder_with_fresh_vars(
             body.span,
             BoundRegionConversionTime::FnCall,
             user_provided_sig,
         );
 
+        // FIXME(async_closures): We must apply the same transformation to our
+        // signature here as we do during closure checking.
+        if let DefiningTy::CoroutineClosure(_, args) =
+            self.borrowck_context.universal_regions.defining_ty
+        {
+            assert_matches!(
+                self.tcx().coroutine_kind(self.tcx().coroutine_for_closure(mir_def_id)),
+                Some(hir::CoroutineKind::Desugared(
+                    hir::CoroutineDesugaring::Async,
+                    hir::CoroutineSource::Closure
+                )),
+                "this needs to be modified if we're lowering non-async closures"
+            );
+            let args = args.as_coroutine_closure();
+            let tupled_upvars_ty = ty::CoroutineClosureSignature::tupled_upvars_by_closure_kind(
+                self.tcx(),
+                args.kind(),
+                Ty::new_tup(self.tcx(), user_provided_sig.inputs()),
+                args.tupled_upvars_ty(),
+                args.coroutine_captures_by_ref_ty(),
+                self.infcx.next_region_var(RegionVariableOrigin::MiscVariable(body.span), || {
+                    RegionCtxt::Unknown
+                }),
+            );
+
+            let next_ty_var = || {
+                self.infcx.next_ty_var(TypeVariableOrigin {
+                    span: body.span,
+                    kind: TypeVariableOriginKind::MiscVariable,
+                })
+            };
+            let output_ty = Ty::new_coroutine(
+                self.tcx(),
+                self.tcx().coroutine_for_closure(mir_def_id),
+                ty::CoroutineArgs::new(
+                    self.tcx(),
+                    ty::CoroutineArgsParts {
+                        parent_args: args.parent_args(),
+                        resume_ty: next_ty_var(),
+                        yield_ty: next_ty_var(),
+                        witness: next_ty_var(),
+                        return_ty: user_provided_sig.output(),
+                        tupled_upvars_ty: tupled_upvars_ty,
+                    },
+                )
+                .args,
+            );
+
+            user_provided_sig = self.tcx().mk_fn_sig(
+                user_provided_sig.inputs().iter().copied(),
+                output_ty,
+                user_provided_sig.c_variadic,
+                user_provided_sig.unsafety,
+                user_provided_sig.abi,
+            );
+        }
+
         let is_coroutine_with_implicit_resume_ty = self.tcx().is_coroutine(mir_def_id.to_def_id())
             && user_provided_sig.inputs().is_empty();
 
diff --git a/compiler/rustc_borrowck/src/type_check/mod.rs b/compiler/rustc_borrowck/src/type_check/mod.rs
index 90a9844fda9..cdb187e0776 100644
--- a/compiler/rustc_borrowck/src/type_check/mod.rs
+++ b/compiler/rustc_borrowck/src/type_check/mod.rs
@@ -2773,15 +2773,14 @@ impl<'a, 'tcx> TypeChecker<'a, 'tcx> {
         let typeck_root_args = ty::GenericArgs::identity_for_item(tcx, typeck_root_def_id);
 
         let parent_args = match tcx.def_kind(def_id) {
+            // We don't want to dispatch on 3 different kind of closures here, so take
+            // advantage of the fact that the `parent_args` is the same length as the
+            // `typeck_root_args`.
             DefKind::Closure => {
-                // FIXME(async_closures): It's kind of icky to access HIR here.
-                match tcx.hir_node_by_def_id(def_id).expect_closure().kind {
-                    hir::ClosureKind::Closure => args.as_closure().parent_args(),
-                    hir::ClosureKind::Coroutine(_) => args.as_coroutine().parent_args(),
-                    hir::ClosureKind::CoroutineClosure(_) => {
-                        args.as_coroutine_closure().parent_args()
-                    }
-                }
+                // FIXME(async_closures): It may be useful to add a debug assert here
+                // to actually call `type_of` and check the `parent_args` are the same
+                // length as the `typeck_root_args`.
+                &args[..typeck_root_args.len()]
             }
             DefKind::InlineConst => args.as_inline_const().parent_args(),
             other => bug!("unexpected item {:?}", other),
diff --git a/compiler/rustc_borrowck/src/universal_regions.rs b/compiler/rustc_borrowck/src/universal_regions.rs
index ae8a135f090..9c266d123b3 100644
--- a/compiler/rustc_borrowck/src/universal_regions.rs
+++ b/compiler/rustc_borrowck/src/universal_regions.rs
@@ -97,6 +97,10 @@ pub enum DefiningTy<'tcx> {
     /// `ClosureArgs::coroutine_return_ty`.
     Coroutine(DefId, GenericArgsRef<'tcx>),
 
+    /// The MIR is a special kind of closure that returns coroutines.
+    /// TODO: describe how to make the sig...
+    CoroutineClosure(DefId, GenericArgsRef<'tcx>),
+
     /// The MIR is a fn item with the given `DefId` and args. The signature
     /// of the function can be bound then with the `fn_sig` query.
     FnDef(DefId, GenericArgsRef<'tcx>),
@@ -119,6 +123,7 @@ impl<'tcx> DefiningTy<'tcx> {
     pub fn upvar_tys(self) -> &'tcx ty::List<Ty<'tcx>> {
         match self {
             DefiningTy::Closure(_, args) => args.as_closure().upvar_tys(),
+            DefiningTy::CoroutineClosure(_, args) => args.as_coroutine_closure().upvar_tys(),
             DefiningTy::Coroutine(_, args) => args.as_coroutine().upvar_tys(),
             DefiningTy::FnDef(..) | DefiningTy::Const(..) | DefiningTy::InlineConst(..) => {
                 ty::List::empty()
@@ -131,7 +136,9 @@ impl<'tcx> DefiningTy<'tcx> {
     /// user's code.
     pub fn implicit_inputs(self) -> usize {
         match self {
-            DefiningTy::Closure(..) | DefiningTy::Coroutine(..) => 1,
+            DefiningTy::Closure(..)
+            | DefiningTy::CoroutineClosure(..)
+            | DefiningTy::Coroutine(..) => 1,
             DefiningTy::FnDef(..) | DefiningTy::Const(..) | DefiningTy::InlineConst(..) => 0,
         }
     }
@@ -147,6 +154,7 @@ impl<'tcx> DefiningTy<'tcx> {
     pub fn def_id(&self) -> DefId {
         match *self {
             DefiningTy::Closure(def_id, ..)
+            | DefiningTy::CoroutineClosure(def_id, ..)
             | DefiningTy::Coroutine(def_id, ..)
             | DefiningTy::FnDef(def_id, ..)
             | DefiningTy::Const(def_id, ..)
@@ -355,6 +363,9 @@ impl<'tcx> UniversalRegions<'tcx> {
                     err.note(format!("late-bound region is {:?}", self.to_region_vid(r)));
                 });
             }
+            DefiningTy::CoroutineClosure(..) => {
+                todo!()
+            }
             DefiningTy::Coroutine(def_id, args) => {
                 let v = with_no_trimmed_paths!(
                     args[tcx.generics_of(def_id).parent_count..]
@@ -568,6 +579,9 @@ impl<'cx, 'tcx> UniversalRegionsBuilder<'cx, 'tcx> {
                 match *defining_ty.kind() {
                     ty::Closure(def_id, args) => DefiningTy::Closure(def_id, args),
                     ty::Coroutine(def_id, args) => DefiningTy::Coroutine(def_id, args),
+                    ty::CoroutineClosure(def_id, args) => {
+                        DefiningTy::CoroutineClosure(def_id, args)
+                    }
                     ty::FnDef(def_id, args) => DefiningTy::FnDef(def_id, args),
                     _ => span_bug!(
                         tcx.def_span(self.mir_def),
@@ -623,6 +637,7 @@ impl<'cx, 'tcx> UniversalRegionsBuilder<'cx, 'tcx> {
         let identity_args = GenericArgs::identity_for_item(tcx, typeck_root_def_id);
         let fr_args = match defining_ty {
             DefiningTy::Closure(_, args)
+            | DefiningTy::CoroutineClosure(_, args)
             | DefiningTy::Coroutine(_, args)
             | DefiningTy::InlineConst(_, args) => {
                 // In the case of closures, we rely on the fact that
@@ -702,6 +717,47 @@ impl<'cx, 'tcx> UniversalRegionsBuilder<'cx, 'tcx> {
                 ty::Binder::dummy(inputs_and_output)
             }
 
+            DefiningTy::CoroutineClosure(def_id, args) => {
+                assert_eq!(self.mir_def.to_def_id(), def_id);
+                let closure_sig = args.as_coroutine_closure().coroutine_closure_sig();
+                let bound_vars = tcx.mk_bound_variable_kinds_from_iter(
+                    closure_sig
+                        .bound_vars()
+                        .iter()
+                        .chain(iter::once(ty::BoundVariableKind::Region(ty::BrEnv))),
+                );
+                let br = ty::BoundRegion {
+                    var: ty::BoundVar::from_usize(bound_vars.len() - 1),
+                    kind: ty::BrEnv,
+                };
+                let env_region = ty::Region::new_bound(tcx, ty::INNERMOST, br);
+                let closure_kind = args.as_coroutine_closure().kind();
+
+                let closure_ty = tcx.closure_env_ty(
+                    Ty::new_coroutine_closure(tcx, def_id, args),
+                    closure_kind,
+                    env_region,
+                );
+
+                let inputs = closure_sig.skip_binder().tupled_inputs_ty.tuple_fields();
+                let output = closure_sig.skip_binder().to_coroutine_given_kind_and_upvars(
+                    tcx,
+                    args.as_coroutine_closure().parent_args(),
+                    tcx.coroutine_for_closure(def_id),
+                    closure_kind,
+                    env_region,
+                    args.as_coroutine_closure().tupled_upvars_ty(),
+                    args.as_coroutine_closure().coroutine_captures_by_ref_ty(),
+                );
+
+                ty::Binder::bind_with_vars(
+                    tcx.mk_type_list_from_iter(
+                        iter::once(closure_ty).chain(inputs).chain(iter::once(output)),
+                    ),
+                    bound_vars,
+                )
+            }
+
             DefiningTy::FnDef(def_id, _) => {
                 let sig = tcx.fn_sig(def_id).instantiate_identity();
                 let sig = indices.fold_to_region_vids(tcx, sig);
diff --git a/compiler/rustc_const_eval/src/transform/validate.rs b/compiler/rustc_const_eval/src/transform/validate.rs
index d3230a2455d..c4542aaa7b2 100644
--- a/compiler/rustc_const_eval/src/transform/validate.rs
+++ b/compiler/rustc_const_eval/src/transform/validate.rs
@@ -58,6 +58,7 @@ impl<'tcx> MirPass<'tcx> for Validator {
             let body_abi = match body_ty.kind() {
                 ty::FnDef(..) => body_ty.fn_sig(tcx).abi(),
                 ty::Closure(..) => Abi::RustCall,
+                ty::CoroutineClosure(..) => Abi::RustCall,
                 ty::Coroutine(..) => Abi::Rust,
                 _ => {
                     span_bug!(body.span, "unexpected body ty: {:?} phase {:?}", body_ty, mir_phase)
diff --git a/compiler/rustc_hir/src/lang_items.rs b/compiler/rustc_hir/src/lang_items.rs
index 8e327c029df..9c9e72d6e65 100644
--- a/compiler/rustc_hir/src/lang_items.rs
+++ b/compiler/rustc_hir/src/lang_items.rs
@@ -209,6 +209,7 @@ language_item_table! {
     AsyncFn,                 sym::async_fn,            async_fn_trait,             Target::Trait,          GenericRequirement::Exact(1);
     AsyncFnMut,              sym::async_fn_mut,        async_fn_mut_trait,         Target::Trait,          GenericRequirement::Exact(1);
     AsyncFnOnce,             sym::async_fn_once,       async_fn_once_trait,        Target::Trait,          GenericRequirement::Exact(1);
+    AsyncFnKindHelper,       sym::async_fn_kind_helper,async_fn_kind_helper,       Target::Trait,          GenericRequirement::Exact(1);
 
     FnOnceOutput,            sym::fn_once_output,      fn_once_output,             Target::AssocTy,        GenericRequirement::None;
 
diff --git a/compiler/rustc_hir_analysis/src/collect.rs b/compiler/rustc_hir_analysis/src/collect.rs
index 8d862d5eb71..8fdd3cfe3cf 100644
--- a/compiler/rustc_hir_analysis/src/collect.rs
+++ b/compiler/rustc_hir_analysis/src/collect.rs
@@ -81,6 +81,7 @@ pub fn provide(providers: &mut Providers) {
         impl_trait_ref,
         impl_polarity,
         coroutine_kind,
+        coroutine_for_closure,
         collect_mod_item_types,
         is_type_alias_impl_trait,
         ..*providers
@@ -1531,6 +1532,36 @@ fn coroutine_kind(tcx: TyCtxt<'_>, def_id: LocalDefId) -> Option<hir::CoroutineK
     }
 }
 
+fn coroutine_for_closure(tcx: TyCtxt<'_>, def_id: LocalDefId) -> DefId {
+    let Node::Expr(&hir::Expr {
+        kind:
+            hir::ExprKind::Closure(&rustc_hir::Closure {
+                kind: hir::ClosureKind::CoroutineClosure(_),
+                body,
+                ..
+            }),
+        ..
+    }) = tcx.hir_node_by_def_id(def_id)
+    else {
+        bug!()
+    };
+
+    let &hir::Expr {
+        kind:
+            hir::ExprKind::Closure(&rustc_hir::Closure {
+                def_id,
+                kind: hir::ClosureKind::Coroutine(_),
+                ..
+            }),
+        ..
+    } = tcx.hir().body(body).value
+    else {
+        bug!()
+    };
+
+    def_id.to_def_id()
+}
+
 fn is_type_alias_impl_trait<'tcx>(tcx: TyCtxt<'tcx>, def_id: LocalDefId) -> bool {
     match tcx.hir_node_by_def_id(def_id) {
         Node::Item(hir::Item { kind: hir::ItemKind::OpaqueTy(opaque), .. }) => {
diff --git a/compiler/rustc_hir_typeck/src/callee.rs b/compiler/rustc_hir_typeck/src/callee.rs
index 80650dcb53c..1858b2770cd 100644
--- a/compiler/rustc_hir_typeck/src/callee.rs
+++ b/compiler/rustc_hir_typeck/src/callee.rs
@@ -141,32 +141,67 @@ impl<'a, 'tcx> FnCtxt<'a, 'tcx> {
                 return Some(CallStep::Builtin(adjusted_ty));
             }
 
-            ty::Closure(def_id, args) => {
+            // Check whether this is a call to a closure where we
+            // haven't yet decided on whether the closure is fn vs
+            // fnmut vs fnonce. If so, we have to defer further processing.
+            ty::Closure(def_id, args) if self.closure_kind(adjusted_ty).is_none() => {
                 let def_id = def_id.expect_local();
+                let closure_sig = args.as_closure().sig();
+                let closure_sig = self.instantiate_binder_with_fresh_vars(
+                    call_expr.span,
+                    infer::FnCall,
+                    closure_sig,
+                );
+                let adjustments = self.adjust_steps(autoderef);
+                self.record_deferred_call_resolution(
+                    def_id,
+                    DeferredCallResolution {
+                        call_expr,
+                        callee_expr,
+                        closure_ty: adjusted_ty,
+                        adjustments,
+                        fn_sig: closure_sig,
+                    },
+                );
+                return Some(CallStep::DeferredClosure(def_id, closure_sig));
+            }
 
-                // Check whether this is a call to a closure where we
-                // haven't yet decided on whether the closure is fn vs
-                // fnmut vs fnonce. If so, we have to defer further processing.
-                if self.closure_kind(adjusted_ty).is_none() {
-                    let closure_sig = args.as_closure().sig();
-                    let closure_sig = self.instantiate_binder_with_fresh_vars(
-                        call_expr.span,
-                        infer::FnCall,
-                        closure_sig,
-                    );
-                    let adjustments = self.adjust_steps(autoderef);
-                    self.record_deferred_call_resolution(
-                        def_id,
-                        DeferredCallResolution {
-                            call_expr,
-                            callee_expr,
-                            closure_ty: adjusted_ty,
-                            adjustments,
-                            fn_sig: closure_sig,
-                        },
-                    );
-                    return Some(CallStep::DeferredClosure(def_id, closure_sig));
-                }
+            ty::CoroutineClosure(def_id, args) if self.closure_kind(adjusted_ty).is_none() => {
+                let def_id = def_id.expect_local();
+                let closure_args = args.as_coroutine_closure();
+                let coroutine_closure_sig = self.instantiate_binder_with_fresh_vars(
+                    call_expr.span,
+                    infer::FnCall,
+                    closure_args.coroutine_closure_sig(),
+                );
+                let tupled_upvars_ty = self.next_ty_var(TypeVariableOrigin {
+                    kind: TypeVariableOriginKind::TypeInference,
+                    span: callee_expr.span,
+                });
+                let call_sig = self.tcx.mk_fn_sig(
+                    [coroutine_closure_sig.tupled_inputs_ty],
+                    coroutine_closure_sig.to_coroutine(
+                        self.tcx,
+                        closure_args.parent_args(),
+                        self.tcx.coroutine_for_closure(def_id),
+                        tupled_upvars_ty,
+                    ),
+                    coroutine_closure_sig.c_variadic,
+                    coroutine_closure_sig.unsafety,
+                    coroutine_closure_sig.abi,
+                );
+                let adjustments = self.adjust_steps(autoderef);
+                self.record_deferred_call_resolution(
+                    def_id,
+                    DeferredCallResolution {
+                        call_expr,
+                        callee_expr,
+                        closure_ty: adjusted_ty,
+                        adjustments,
+                        fn_sig: call_sig,
+                    },
+                );
+                return Some(CallStep::DeferredClosure(def_id, call_sig));
             }
 
             // Hack: we know that there are traits implementing Fn for &F
@@ -935,7 +970,7 @@ impl<'a, 'tcx> DeferredCallResolution<'tcx> {
                 span_bug!(
                     self.call_expr.span,
                     "Expected to find a suitable `Fn`/`FnMut`/`FnOnce` implementation for `{}`",
-                    self.adjusted_ty
+                    self.closure_ty
                 )
             }
         }
diff --git a/compiler/rustc_hir_typeck/src/closure.rs b/compiler/rustc_hir_typeck/src/closure.rs
index 18dff5188af..1d024efdd49 100644
--- a/compiler/rustc_hir_typeck/src/closure.rs
+++ b/compiler/rustc_hir_typeck/src/closure.rs
@@ -63,7 +63,7 @@ impl<'a, 'tcx> FnCtxt<'a, 'tcx> {
             None => (None, None),
         };
 
-        let ClosureSignatures { bound_sig, liberated_sig } =
+        let ClosureSignatures { bound_sig, mut liberated_sig } =
             self.sig_of_closure(expr_def_id, closure.fn_decl, closure.kind, expected_sig);
 
         debug!(?bound_sig, ?liberated_sig);
@@ -125,7 +125,7 @@ impl<'a, 'tcx> FnCtxt<'a, 'tcx> {
                     hir::CoroutineKind::Desugared(hir::CoroutineDesugaring::Gen, _)
                     | hir::CoroutineKind::Coroutine(_) => {
                         let yield_ty = self.next_ty_var(TypeVariableOrigin {
-                            kind: TypeVariableOriginKind::TypeInference,
+                            kind: TypeVariableOriginKind::ClosureSynthetic,
                             span: expr_span,
                         });
                         self.require_type_is_sized(yield_ty, expr_span, traits::SizedYieldType);
@@ -137,7 +137,7 @@ impl<'a, 'tcx> FnCtxt<'a, 'tcx> {
                     // not a problem.
                     hir::CoroutineKind::Desugared(hir::CoroutineDesugaring::AsyncGen, _) => {
                         let yield_ty = self.next_ty_var(TypeVariableOrigin {
-                            kind: TypeVariableOriginKind::TypeInference,
+                            kind: TypeVariableOriginKind::ClosureSynthetic,
                             span: expr_span,
                         });
                         self.require_type_is_sized(yield_ty, expr_span, traits::SizedYieldType);
@@ -166,8 +166,8 @@ impl<'a, 'tcx> FnCtxt<'a, 'tcx> {
                 let resume_ty = liberated_sig.inputs().get(0).copied().unwrap_or(tcx.types.unit);
 
                 let interior = self.next_ty_var(TypeVariableOrigin {
-                    kind: TypeVariableOriginKind::MiscVariable,
-                    span: body.value.span,
+                    kind: TypeVariableOriginKind::ClosureSynthetic,
+                    span: expr_span,
                 });
                 self.deferred_coroutine_interiors.borrow_mut().push((
                     expr_def_id,
@@ -192,7 +192,82 @@ impl<'a, 'tcx> FnCtxt<'a, 'tcx> {
                     Some(CoroutineTypes { resume_ty, yield_ty }),
                 )
             }
-            hir::ClosureKind::CoroutineClosure(_) => todo!(),
+            hir::ClosureKind::CoroutineClosure(kind) => {
+                let (bound_return_ty, bound_yield_ty) = match kind {
+                    hir::CoroutineDesugaring::Async => {
+                        (bound_sig.skip_binder().output(), tcx.types.unit)
+                    }
+                    hir::CoroutineDesugaring::Gen | hir::CoroutineDesugaring::AsyncGen => {
+                        todo!("`gen` and `async gen` closures not supported yet")
+                    }
+                };
+                let resume_ty = self.next_ty_var(TypeVariableOrigin {
+                    kind: TypeVariableOriginKind::ClosureSynthetic,
+                    span: expr_span,
+                });
+                let interior = self.next_ty_var(TypeVariableOrigin {
+                    kind: TypeVariableOriginKind::ClosureSynthetic,
+                    span: expr_span,
+                });
+                let closure_kind_ty = self.next_ty_var(TypeVariableOrigin {
+                    // FIXME(eddyb) distinguish closure kind inference variables from the rest.
+                    kind: TypeVariableOriginKind::ClosureSynthetic,
+                    span: expr_span,
+                });
+                let coroutine_captures_by_ref_ty = self.next_ty_var(TypeVariableOrigin {
+                    kind: TypeVariableOriginKind::ClosureSynthetic,
+                    span: expr_span,
+                });
+                let closure_args = ty::CoroutineClosureArgs::new(
+                    tcx,
+                    ty::CoroutineClosureArgsParts {
+                        parent_args,
+                        closure_kind_ty,
+                        signature_parts_ty: Ty::new_fn_ptr(
+                            tcx,
+                            bound_sig.map_bound(|sig| {
+                                tcx.mk_fn_sig(
+                                    [
+                                        resume_ty,
+                                        Ty::new_tup_from_iter(tcx, sig.inputs().iter().copied()),
+                                    ],
+                                    Ty::new_tup(tcx, &[bound_yield_ty, bound_return_ty]),
+                                    sig.c_variadic,
+                                    sig.unsafety,
+                                    sig.abi,
+                                )
+                            }),
+                        ),
+                        tupled_upvars_ty,
+                        coroutine_captures_by_ref_ty,
+                        coroutine_witness_ty: interior,
+                    },
+                );
+
+                let coroutine_upvars_ty = self.next_ty_var(TypeVariableOrigin {
+                    kind: TypeVariableOriginKind::ClosureSynthetic,
+                    span: expr_span,
+                });
+                liberated_sig = tcx.mk_fn_sig(
+                    liberated_sig.inputs().iter().copied(),
+                    tcx.liberate_late_bound_regions(
+                        expr_def_id.to_def_id(),
+                        closure_args.coroutine_closure_sig().map_bound(|sig| {
+                            sig.to_coroutine(
+                                tcx,
+                                parent_args,
+                                tcx.coroutine_for_closure(expr_def_id),
+                                coroutine_upvars_ty,
+                            )
+                        }),
+                    ),
+                    liberated_sig.c_variadic,
+                    liberated_sig.unsafety,
+                    liberated_sig.abi,
+                );
+
+                (Ty::new_coroutine_closure(tcx, expr_def_id.to_def_id(), closure_args.args), None)
+            }
         };
 
         check_fn(
diff --git a/compiler/rustc_hir_typeck/src/upvar.rs b/compiler/rustc_hir_typeck/src/upvar.rs
index 55ffac8d92f..b087d6d9e57 100644
--- a/compiler/rustc_hir_typeck/src/upvar.rs
+++ b/compiler/rustc_hir_typeck/src/upvar.rs
@@ -174,6 +174,9 @@ impl<'a, 'tcx> FnCtxt<'a, 'tcx> {
             ty::Closure(def_id, args) => {
                 (def_id, UpvarArgs::Closure(args), self.closure_kind(ty).is_none())
             }
+            ty::CoroutineClosure(def_id, args) => {
+                (def_id, UpvarArgs::CoroutineClosure(args), self.closure_kind(ty).is_none())
+            }
             ty::Coroutine(def_id, args) => (def_id, UpvarArgs::Coroutine(args), false),
             ty::Error(_) => {
                 // #51714: skip analysis when we have already encountered type errors
@@ -333,6 +336,65 @@ impl<'a, 'tcx> FnCtxt<'a, 'tcx> {
             }
         }
 
+        if let UpvarArgs::CoroutineClosure(args) = args {
+            let closure_env_region: ty::Region<'_> = ty::Region::new_bound(
+                self.tcx,
+                ty::INNERMOST,
+                ty::BoundRegion {
+                    var: ty::BoundVar::from_usize(0),
+                    kind: ty::BoundRegionKind::BrEnv,
+                },
+            );
+            let tupled_upvars_ty_for_borrow = Ty::new_tup_from_iter(
+                self.tcx,
+                self.typeck_results
+                    .borrow()
+                    .closure_min_captures_flattened(
+                        self.tcx.coroutine_for_closure(closure_def_id).expect_local(),
+                    )
+                    // Skip the captures that are just moving the closure's args
+                    // into the coroutine. These are always by move.
+                    .skip(
+                        args.as_coroutine_closure()
+                            .coroutine_closure_sig()
+                            .skip_binder()
+                            .tupled_inputs_ty
+                            .tuple_fields()
+                            .len(),
+                    )
+                    .map(|captured_place| {
+                        let upvar_ty = captured_place.place.ty();
+                        let capture = captured_place.info.capture_kind;
+                        apply_capture_kind_on_capture_ty(
+                            self.tcx,
+                            upvar_ty,
+                            capture,
+                            Some(closure_env_region),
+                        )
+                    }),
+            );
+            let coroutine_captures_by_ref_ty = Ty::new_fn_ptr(
+                self.tcx,
+                ty::Binder::bind_with_vars(
+                    self.tcx.mk_fn_sig(
+                        [],
+                        tupled_upvars_ty_for_borrow,
+                        false,
+                        hir::Unsafety::Normal,
+                        rustc_target::spec::abi::Abi::Rust,
+                    ),
+                    self.tcx.mk_bound_variable_kinds(&[ty::BoundVariableKind::Region(
+                        ty::BoundRegionKind::BrEnv,
+                    )]),
+                ),
+            );
+            self.demand_eqtype(
+                span,
+                args.as_coroutine_closure().coroutine_captures_by_ref_ty(),
+                coroutine_captures_by_ref_ty,
+            );
+        }
+
         self.log_closure_min_capture_info(closure_def_id, span);
 
         // Now that we've analyzed the closure, we know how each
@@ -602,7 +664,7 @@ impl<'a, 'tcx> FnCtxt<'a, 'tcx> {
             };
 
             // Go through each entry in the current list of min_captures
-            // - if ancestor is found, update it's capture kind to account for current place's
+            // - if ancestor is found, update its capture kind to account for current place's
             // capture information.
             //
             // - if descendant is found, remove it from the list, and update the current place's
diff --git a/compiler/rustc_middle/src/middle/lang_items.rs b/compiler/rustc_middle/src/middle/lang_items.rs
index f92c72c8a58..a4e193ba2c9 100644
--- a/compiler/rustc_middle/src/middle/lang_items.rs
+++ b/compiler/rustc_middle/src/middle/lang_items.rs
@@ -23,7 +23,7 @@ impl<'tcx> TyCtxt<'tcx> {
         })
     }
 
-    /// Given a [`DefId`] of a [`Fn`], [`FnMut`] or [`FnOnce`] traits,
+    /// Given a [`DefId`] of one of the [`Fn`], [`FnMut`] or [`FnOnce`] traits,
     /// returns a corresponding [`ty::ClosureKind`].
     /// For any other [`DefId`] return `None`.
     pub fn fn_trait_kind_from_def_id(self, id: DefId) -> Option<ty::ClosureKind> {
@@ -36,6 +36,19 @@ impl<'tcx> TyCtxt<'tcx> {
         }
     }
 
+    /// Given a [`DefId`] of one of the `AsyncFn`, `AsyncFnMut` or `AsyncFnOnce` traits,
+    /// returns a corresponding [`ty::ClosureKind`].
+    /// For any other [`DefId`] return `None`.
+    pub fn async_fn_trait_kind_from_def_id(self, id: DefId) -> Option<ty::ClosureKind> {
+        let items = self.lang_items();
+        match Some(id) {
+            x if x == items.async_fn_trait() => Some(ty::ClosureKind::Fn),
+            x if x == items.async_fn_mut_trait() => Some(ty::ClosureKind::FnMut),
+            x if x == items.async_fn_once_trait() => Some(ty::ClosureKind::FnOnce),
+            _ => None,
+        }
+    }
+
     /// Given a [`ty::ClosureKind`], get the [`DefId`] of its corresponding `Fn`-family
     /// trait, if it is defined.
     pub fn fn_trait_kind_to_def_id(self, kind: ty::ClosureKind) -> Option<DefId> {
diff --git a/compiler/rustc_middle/src/query/mod.rs b/compiler/rustc_middle/src/query/mod.rs
index 2438f826441..f9ab32b16f5 100644
--- a/compiler/rustc_middle/src/query/mod.rs
+++ b/compiler/rustc_middle/src/query/mod.rs
@@ -755,6 +755,11 @@ rustc_queries! {
         separate_provide_extern
     }
 
+    query coroutine_for_closure(def_id: DefId) -> DefId {
+        desc { |_tcx| "TODO" }
+        separate_provide_extern
+    }
+
     /// Gets a map with the variance of every item; use `variances_of` instead.
     query crate_variances(_: ()) -> &'tcx ty::CrateVariancesMap<'tcx> {
         arena_cache
diff --git a/compiler/rustc_middle/src/traits/select.rs b/compiler/rustc_middle/src/traits/select.rs
index 64f4af08e12..4e11575cf98 100644
--- a/compiler/rustc_middle/src/traits/select.rs
+++ b/compiler/rustc_middle/src/traits/select.rs
@@ -134,6 +134,13 @@ pub enum SelectionCandidate<'tcx> {
         is_const: bool,
     },
 
+    /// Implementation of an `AsyncFn`-family trait by one of the anonymous types
+    /// generated for an `async ||` expression.
+    AsyncClosureCandidate,
+
+    // TODO:
+    AsyncFnKindHelperCandidate,
+
     /// Implementation of a `Coroutine` trait by one of the anonymous types
     /// generated for a coroutine.
     CoroutineCandidate,
diff --git a/compiler/rustc_middle/src/ty/mod.rs b/compiler/rustc_middle/src/ty/mod.rs
index 8e51db82f95..4348d01eff5 100644
--- a/compiler/rustc_middle/src/ty/mod.rs
+++ b/compiler/rustc_middle/src/ty/mod.rs
@@ -105,9 +105,10 @@ pub use self::region::{
 pub use self::rvalue_scopes::RvalueScopes;
 pub use self::sty::{
     AliasTy, Article, Binder, BoundTy, BoundTyKind, BoundVariableKind, CanonicalPolyFnSig,
-    ClosureArgs, ClosureArgsParts, CoroutineArgs, CoroutineArgsParts, FnSig, GenSig,
-    InlineConstArgs, InlineConstArgsParts, ParamConst, ParamTy, PolyFnSig, TyKind, TypeAndMut,
-    UpvarArgs, VarianceDiagInfo,
+    ClosureArgs, ClosureArgsParts, CoroutineArgs, CoroutineArgsParts, CoroutineClosureArgs,
+    CoroutineClosureArgsParts, CoroutineClosureSignature, FnSig, GenSig, InlineConstArgs,
+    InlineConstArgsParts, ParamConst, ParamTy, PolyFnSig, TyKind, TypeAndMut, UpvarArgs,
+    VarianceDiagInfo,
 };
 pub use self::trait_def::TraitDef;
 pub use self::typeck_results::{
diff --git a/compiler/rustc_middle/src/ty/sty.rs b/compiler/rustc_middle/src/ty/sty.rs
index c047f6a0521..e2a2e24f06d 100644
--- a/compiler/rustc_middle/src/ty/sty.rs
+++ b/compiler/rustc_middle/src/ty/sty.rs
@@ -36,6 +36,7 @@ use rustc_type_ir::TyKind as IrTyKind;
 use rustc_type_ir::TyKind::*;
 use rustc_type_ir::TypeAndMut as IrTypeAndMut;
 
+use super::fold::FnMutDelegate;
 use super::GenericParamDefKind;
 
 // Re-export and re-parameterize some `I = TyCtxt<'tcx>` types here
@@ -351,6 +352,27 @@ impl<'tcx> CoroutineClosureArgs<'tcx> {
         self.split().signature_parts_ty
     }
 
+    pub fn coroutine_closure_sig(self) -> ty::Binder<'tcx, CoroutineClosureSignature<'tcx>> {
+        let interior = self.coroutine_witness_ty();
+        let ty::FnPtr(sig) = self.signature_parts_ty().kind() else { bug!() };
+        sig.map_bound(|sig| {
+            let [resume_ty, tupled_inputs_ty] = *sig.inputs() else {
+                bug!();
+            };
+            let [yield_ty, return_ty] = **sig.output().tuple_fields() else { bug!() };
+            CoroutineClosureSignature {
+                interior,
+                tupled_inputs_ty,
+                resume_ty,
+                yield_ty,
+                return_ty,
+                c_variadic: sig.c_variadic,
+                unsafety: sig.unsafety,
+                abi: sig.abi,
+            }
+        })
+    }
+
     pub fn coroutine_captures_by_ref_ty(self) -> Ty<'tcx> {
         self.split().coroutine_captures_by_ref_ty
     }
@@ -360,6 +382,103 @@ impl<'tcx> CoroutineClosureArgs<'tcx> {
     }
 }
 
+#[derive(Copy, Clone, PartialEq, Eq, Debug, TypeFoldable, TypeVisitable)]
+pub struct CoroutineClosureSignature<'tcx> {
+    pub interior: Ty<'tcx>,
+    pub tupled_inputs_ty: Ty<'tcx>,
+    pub resume_ty: Ty<'tcx>,
+    pub yield_ty: Ty<'tcx>,
+    pub return_ty: Ty<'tcx>,
+    pub c_variadic: bool,
+    pub unsafety: hir::Unsafety,
+    pub abi: abi::Abi,
+}
+
+impl<'tcx> CoroutineClosureSignature<'tcx> {
+    pub fn to_coroutine(
+        self,
+        tcx: TyCtxt<'tcx>,
+        parent_args: &'tcx [GenericArg<'tcx>],
+        coroutine_def_id: DefId,
+        tupled_upvars_ty: Ty<'tcx>,
+    ) -> Ty<'tcx> {
+        let coroutine_args = ty::CoroutineArgs::new(
+            tcx,
+            ty::CoroutineArgsParts {
+                parent_args,
+                resume_ty: self.resume_ty,
+                yield_ty: self.yield_ty,
+                return_ty: self.return_ty,
+                witness: self.interior,
+                tupled_upvars_ty,
+            },
+        );
+
+        Ty::new_coroutine(tcx, coroutine_def_id, coroutine_args.args)
+    }
+
+    pub fn to_coroutine_given_kind_and_upvars(
+        self,
+        tcx: TyCtxt<'tcx>,
+        parent_args: &'tcx [GenericArg<'tcx>],
+        coroutine_def_id: DefId,
+        closure_kind: ty::ClosureKind,
+        env_region: ty::Region<'tcx>,
+        closure_tupled_upvars_ty: Ty<'tcx>,
+        coroutine_captures_by_ref_ty: Ty<'tcx>,
+    ) -> Ty<'tcx> {
+        let tupled_upvars_ty = Self::tupled_upvars_by_closure_kind(
+            tcx,
+            closure_kind,
+            self.tupled_inputs_ty,
+            closure_tupled_upvars_ty,
+            coroutine_captures_by_ref_ty,
+            env_region,
+        );
+
+        self.to_coroutine(tcx, parent_args, coroutine_def_id, tupled_upvars_ty)
+    }
+
+    /// Given a closure kind, compute the tupled upvars that the given coroutine would return.
+    pub fn tupled_upvars_by_closure_kind(
+        tcx: TyCtxt<'tcx>,
+        kind: ty::ClosureKind,
+        tupled_inputs_ty: Ty<'tcx>,
+        closure_tupled_upvars_ty: Ty<'tcx>,
+        coroutine_captures_by_ref_ty: Ty<'tcx>,
+        env_region: ty::Region<'tcx>,
+    ) -> Ty<'tcx> {
+        match kind {
+            ty::ClosureKind::Fn | ty::ClosureKind::FnMut => {
+                let ty::FnPtr(sig) = *coroutine_captures_by_ref_ty.kind() else {
+                    bug!();
+                };
+                let coroutine_captures_by_ref_ty = tcx.replace_escaping_bound_vars_uncached(
+                    sig.output().skip_binder(),
+                    FnMutDelegate {
+                        consts: &mut |c, t| ty::Const::new_bound(tcx, ty::INNERMOST, c, t),
+                        types: &mut |t| Ty::new_bound(tcx, ty::INNERMOST, t),
+                        regions: &mut |_| env_region,
+                    },
+                );
+                Ty::new_tup_from_iter(
+                    tcx,
+                    tupled_inputs_ty
+                        .tuple_fields()
+                        .iter()
+                        .chain(coroutine_captures_by_ref_ty.tuple_fields()),
+                )
+            }
+            ty::ClosureKind::FnOnce => Ty::new_tup_from_iter(
+                tcx,
+                tupled_inputs_ty
+                    .tuple_fields()
+                    .iter()
+                    .chain(closure_tupled_upvars_ty.tuple_fields()),
+            ),
+        }
+    }
+}
 /// Similar to `ClosureArgs`; see the above documentation for more.
 #[derive(Copy, Clone, PartialEq, Eq, Debug, TypeFoldable, TypeVisitable)]
 pub struct CoroutineArgs<'tcx> {
@@ -1495,7 +1614,7 @@ impl<'tcx> Ty<'tcx> {
     ) -> Ty<'tcx> {
         debug_assert_eq!(
             closure_args.len(),
-            tcx.generics_of(tcx.typeck_root_def_id(def_id)).count() + 3,
+            tcx.generics_of(tcx.typeck_root_def_id(def_id)).count() + 5,
             "closure constructed with incorrect substitutions"
         );
         Ty::new(tcx, CoroutineClosure(def_id, closure_args))
@@ -1835,6 +1954,11 @@ impl<'tcx> Ty<'tcx> {
         matches!(self.kind(), Coroutine(..))
     }
 
+    #[inline]
+    pub fn is_coroutine_closure(self) -> bool {
+        matches!(self.kind(), CoroutineClosure(..))
+    }
+
     #[inline]
     pub fn is_integral(self) -> bool {
         matches!(self.kind(), Infer(IntVar(_)) | Int(_) | Uint(_))
@@ -2144,7 +2268,7 @@ impl<'tcx> Ty<'tcx> {
 
             // "Bound" types appear in canonical queries when the
             // closure type is not yet known
-            Bound(..) | Infer(_) => None,
+            Bound(..) | Param(_) | Infer(_) => None,
 
             Error(_) => Some(ty::ClosureKind::Fn),
 
diff --git a/compiler/rustc_mir_build/src/build/mod.rs b/compiler/rustc_mir_build/src/build/mod.rs
index 9bcfe9fbc33..9a2f2ceced1 100644
--- a/compiler/rustc_mir_build/src/build/mod.rs
+++ b/compiler/rustc_mir_build/src/build/mod.rs
@@ -822,6 +822,7 @@ impl<'a, 'tcx> Builder<'a, 'tcx> {
         let upvar_args = match closure_ty.kind() {
             ty::Closure(_, args) => ty::UpvarArgs::Closure(args),
             ty::Coroutine(_, args) => ty::UpvarArgs::Coroutine(args),
+            ty::CoroutineClosure(_, args) => ty::UpvarArgs::CoroutineClosure(args),
             _ => return,
         };
 
diff --git a/compiler/rustc_mir_build/src/thir/cx/expr.rs b/compiler/rustc_mir_build/src/thir/cx/expr.rs
index 22094c112fc..35adcc33480 100644
--- a/compiler/rustc_mir_build/src/thir/cx/expr.rs
+++ b/compiler/rustc_mir_build/src/thir/cx/expr.rs
@@ -556,6 +556,9 @@ impl<'tcx> Cx<'tcx> {
                     ty::Coroutine(def_id, args) => {
                         (def_id, UpvarArgs::Coroutine(args), Some(tcx.coroutine_movability(def_id)))
                     }
+                    ty::CoroutineClosure(def_id, args) => {
+                        (def_id, UpvarArgs::CoroutineClosure(args), None)
+                    }
                     _ => {
                         span_bug!(expr.span, "closure expr w/o closure type: {:?}", closure_ty);
                     }
diff --git a/compiler/rustc_mir_build/src/thir/cx/mod.rs b/compiler/rustc_mir_build/src/thir/cx/mod.rs
index 5d0bb3954cc..f4f591d15b9 100644
--- a/compiler/rustc_mir_build/src/thir/cx/mod.rs
+++ b/compiler/rustc_mir_build/src/thir/cx/mod.rs
@@ -127,10 +127,24 @@ impl<'tcx> Cx<'tcx> {
             ty::Coroutine(..) => {
                 Param { ty: closure_ty, pat: None, ty_span: None, self_kind: None, hir_id: None }
             }
-            ty::Closure(_, closure_args) => {
+            ty::Closure(_, args) => {
                 let closure_env_ty = self.tcx.closure_env_ty(
                     closure_ty,
-                    closure_args.as_closure().kind(),
+                    args.as_closure().kind(),
+                    self.tcx.lifetimes.re_erased,
+                );
+                Param {
+                    ty: closure_env_ty,
+                    pat: None,
+                    ty_span: None,
+                    self_kind: None,
+                    hir_id: None,
+                }
+            }
+            ty::CoroutineClosure(_, args) => {
+                let closure_env_ty = self.tcx.closure_env_ty(
+                    closure_ty,
+                    args.as_coroutine_closure().kind(),
                     self.tcx.lifetimes.re_erased,
                 );
                 Param {
diff --git a/compiler/rustc_mir_dataflow/src/value_analysis.rs b/compiler/rustc_mir_dataflow/src/value_analysis.rs
index 2802f5491be..a85e53ff241 100644
--- a/compiler/rustc_mir_dataflow/src/value_analysis.rs
+++ b/compiler/rustc_mir_dataflow/src/value_analysis.rs
@@ -1149,6 +1149,9 @@ pub fn iter_fields<'tcx>(
         ty::Closure(_, args) => {
             iter_fields(args.as_closure().tupled_upvars_ty(), tcx, param_env, f);
         }
+        ty::CoroutineClosure(_, args) => {
+            iter_fields(args.as_coroutine_closure().tupled_upvars_ty(), tcx, param_env, f);
+        }
         _ => (),
     }
 }
diff --git a/compiler/rustc_mir_transform/src/const_prop_lint.rs b/compiler/rustc_mir_transform/src/const_prop_lint.rs
index aa22b8c7c58..f2448ee3d44 100644
--- a/compiler/rustc_mir_transform/src/const_prop_lint.rs
+++ b/compiler/rustc_mir_transform/src/const_prop_lint.rs
@@ -607,7 +607,8 @@ impl<'mir, 'tcx> ConstPropagator<'mir, 'tcx> {
                     AggregateKind::Array(_)
                     | AggregateKind::Tuple
                     | AggregateKind::Closure(_, _)
-                    | AggregateKind::Coroutine(_, _) => VariantIdx::new(0),
+                    | AggregateKind::Coroutine(_, _)
+                    | AggregateKind::CoroutineClosure(_, _) => VariantIdx::new(0),
                 },
             },
 
diff --git a/compiler/rustc_mir_transform/src/coverage/spans/from_mir.rs b/compiler/rustc_mir_transform/src/coverage/spans/from_mir.rs
index 5b4d58836b4..01fae7c0bec 100644
--- a/compiler/rustc_mir_transform/src/coverage/spans/from_mir.rs
+++ b/compiler/rustc_mir_transform/src/coverage/spans/from_mir.rs
@@ -156,7 +156,9 @@ fn bcb_to_initial_coverage_spans<'a, 'tcx>(
 fn is_closure_or_coroutine(statement: &Statement<'_>) -> bool {
     match statement.kind {
         StatementKind::Assign(box (_, Rvalue::Aggregate(box ref agg_kind, _))) => match agg_kind {
-            AggregateKind::Closure(_, _) | AggregateKind::Coroutine(_, _) => true,
+            AggregateKind::Closure(_, _)
+            | AggregateKind::Coroutine(_, _)
+            | AggregateKind::CoroutineClosure(..) => true,
             _ => false,
         },
         _ => false,
diff --git a/compiler/rustc_span/src/symbol.rs b/compiler/rustc_span/src/symbol.rs
index dbfc89c2d49..587eb1c7cc5 100644
--- a/compiler/rustc_span/src/symbol.rs
+++ b/compiler/rustc_span/src/symbol.rs
@@ -167,6 +167,9 @@ symbols! {
         Break,
         C,
         CStr,
+        CallFuture,
+        CallMutFuture,
+        CallOnceFuture,
         Capture,
         Center,
         Cleanup,
@@ -420,6 +423,7 @@ symbols! {
         async_closure,
         async_fn,
         async_fn_in_trait,
+        async_fn_kind_helper,
         async_fn_mut,
         async_fn_once,
         async_fn_track_caller,
diff --git a/compiler/rustc_trait_selection/src/solve/assembly/mod.rs b/compiler/rustc_trait_selection/src/solve/assembly/mod.rs
index e1c68039e79..8451fbcc434 100644
--- a/compiler/rustc_trait_selection/src/solve/assembly/mod.rs
+++ b/compiler/rustc_trait_selection/src/solve/assembly/mod.rs
@@ -182,6 +182,20 @@ pub(super) trait GoalKind<'tcx>:
         kind: ty::ClosureKind,
     ) -> QueryResult<'tcx>;
 
+    /// An async closure is known to implement the `AsyncFn<A>` family of traits
+    /// where `A` is given by the signature of the type.
+    fn consider_builtin_async_fn_trait_candidates(
+        ecx: &mut EvalCtxt<'_, 'tcx>,
+        goal: Goal<'tcx, Self>,
+        kind: ty::ClosureKind,
+    ) -> QueryResult<'tcx>;
+
+    /// TODO:
+    fn consider_builtin_async_fn_kind_helper_candidate(
+        ecx: &mut EvalCtxt<'_, 'tcx>,
+        goal: Goal<'tcx, Self>,
+    ) -> QueryResult<'tcx>;
+
     /// `Tuple` is implemented if the `Self` type is a tuple.
     fn consider_builtin_tuple_candidate(
         ecx: &mut EvalCtxt<'_, 'tcx>,
@@ -461,6 +475,10 @@ impl<'tcx> EvalCtxt<'_, 'tcx> {
             G::consider_builtin_fn_ptr_trait_candidate(self, goal)
         } else if let Some(kind) = self.tcx().fn_trait_kind_from_def_id(trait_def_id) {
             G::consider_builtin_fn_trait_candidates(self, goal, kind)
+        } else if let Some(kind) = self.tcx().async_fn_trait_kind_from_def_id(trait_def_id) {
+            G::consider_builtin_async_fn_trait_candidates(self, goal, kind)
+        } else if lang_items.async_fn_kind_helper() == Some(trait_def_id) {
+            G::consider_builtin_async_fn_kind_helper_candidate(self, goal)
         } else if lang_items.tuple_trait() == Some(trait_def_id) {
             G::consider_builtin_tuple_candidate(self, goal)
         } else if lang_items.pointee_trait() == Some(trait_def_id) {
diff --git a/compiler/rustc_trait_selection/src/solve/assembly/structural_traits.rs b/compiler/rustc_trait_selection/src/solve/assembly/structural_traits.rs
index 3c571e1d96f..c35134c78eb 100644
--- a/compiler/rustc_trait_selection/src/solve/assembly/structural_traits.rs
+++ b/compiler/rustc_trait_selection/src/solve/assembly/structural_traits.rs
@@ -1,11 +1,12 @@
 //! Code which is used by built-in goals that match "structurally", such a auto
 //! traits, `Copy`/`Clone`.
 use rustc_data_structures::fx::FxHashMap;
+use rustc_hir::LangItem;
 use rustc_hir::{def_id::DefId, Movability, Mutability};
 use rustc_infer::traits::query::NoSolution;
 use rustc_middle::traits::solve::Goal;
 use rustc_middle::ty::{
-    self, Ty, TyCtxt, TypeFoldable, TypeFolder, TypeSuperFoldable, TypeVisitableExt,
+    self, ToPredicate, Ty, TyCtxt, TypeFoldable, TypeFolder, TypeSuperFoldable, TypeVisitableExt,
 };
 
 use crate::solve::EvalCtxt;
@@ -306,6 +307,114 @@ pub(in crate::solve) fn extract_tupled_inputs_and_output_from_callable<'tcx>(
     }
 }
 
+// Returns a binder of the tupled inputs types, output type, and coroutine type
+// from a builtin async closure type.
+pub(in crate::solve) fn extract_tupled_inputs_and_output_from_async_callable<'tcx>(
+    tcx: TyCtxt<'tcx>,
+    self_ty: Ty<'tcx>,
+    goal_kind: ty::ClosureKind,
+    env_region: ty::Region<'tcx>,
+) -> Result<(ty::Binder<'tcx, (Ty<'tcx>, Ty<'tcx>, Ty<'tcx>)>, Vec<ty::Predicate<'tcx>>), NoSolution>
+{
+    match *self_ty.kind() {
+        ty::CoroutineClosure(def_id, args) => {
+            let args = args.as_coroutine_closure();
+            let kind_ty = args.kind_ty();
+
+            if let Some(closure_kind) = kind_ty.to_opt_closure_kind() {
+                if !closure_kind.extends(goal_kind) {
+                    return Err(NoSolution);
+                }
+                Ok((
+                    args.coroutine_closure_sig().map_bound(|sig| {
+                        let coroutine_ty = sig.to_coroutine_given_kind_and_upvars(
+                            tcx,
+                            args.parent_args(),
+                            tcx.coroutine_for_closure(def_id),
+                            goal_kind,
+                            env_region,
+                            args.tupled_upvars_ty(),
+                            args.coroutine_captures_by_ref_ty(),
+                        );
+                        (sig.tupled_inputs_ty, sig.return_ty, coroutine_ty)
+                    }),
+                    vec![],
+                ))
+            } else {
+                let helper_trait_def_id = tcx.require_lang_item(LangItem::AsyncFnKindHelper, None);
+                // FIXME(async_closures): Make this into a lang item.
+                let upvars_projection_def_id = tcx
+                    .associated_items(helper_trait_def_id)
+                    .in_definition_order()
+                    .next()
+                    .unwrap()
+                    .def_id;
+                Ok((
+                    args.coroutine_closure_sig().map_bound(|sig| {
+                        let tupled_upvars_ty = Ty::new_projection(
+                            tcx,
+                            upvars_projection_def_id,
+                            [
+                                ty::GenericArg::from(kind_ty),
+                                Ty::from_closure_kind(tcx, goal_kind).into(),
+                                env_region.into(),
+                                sig.tupled_inputs_ty.into(),
+                                args.tupled_upvars_ty().into(),
+                                args.coroutine_captures_by_ref_ty().into(),
+                            ],
+                        );
+                        let coroutine_ty = sig.to_coroutine(
+                            tcx,
+                            args.parent_args(),
+                            tcx.coroutine_for_closure(def_id),
+                            tupled_upvars_ty,
+                        );
+                        (sig.tupled_inputs_ty, sig.return_ty, coroutine_ty)
+                    }),
+                    vec![
+                        ty::TraitRef::new(
+                            tcx,
+                            helper_trait_def_id,
+                            [kind_ty, Ty::from_closure_kind(tcx, goal_kind)],
+                        )
+                        .to_predicate(tcx),
+                    ],
+                ))
+            }
+        }
+
+        ty::FnDef(..) | ty::FnPtr(..) | ty::Closure(..) => Err(NoSolution),
+
+        ty::Bool
+        | ty::Char
+        | ty::Int(_)
+        | ty::Uint(_)
+        | ty::Float(_)
+        | ty::Adt(_, _)
+        | ty::Foreign(_)
+        | ty::Str
+        | ty::Array(_, _)
+        | ty::Slice(_)
+        | ty::RawPtr(_)
+        | ty::Ref(_, _, _)
+        | ty::Dynamic(_, _, _)
+        | ty::Coroutine(_, _)
+        | ty::CoroutineWitness(..)
+        | ty::Never
+        | ty::Tuple(_)
+        | ty::Alias(_, _)
+        | ty::Param(_)
+        | ty::Placeholder(..)
+        | ty::Infer(ty::IntVar(_) | ty::FloatVar(_))
+        | ty::Error(_) => Err(NoSolution),
+
+        ty::Bound(..)
+        | ty::Infer(ty::TyVar(_) | ty::FreshTy(_) | ty::FreshIntTy(_) | ty::FreshFloatTy(_)) => {
+            bug!("unexpected type `{self_ty}`")
+        }
+    }
+}
+
 /// Assemble a list of predicates that would be present on a theoretical
 /// user impl for an object type. These predicates must be checked any time
 /// we assemble a built-in object candidate for an object type, since they
diff --git a/compiler/rustc_trait_selection/src/solve/normalizes_to/mod.rs b/compiler/rustc_trait_selection/src/solve/normalizes_to/mod.rs
index e1c6f67f05e..47ba549022d 100644
--- a/compiler/rustc_trait_selection/src/solve/normalizes_to/mod.rs
+++ b/compiler/rustc_trait_selection/src/solve/normalizes_to/mod.rs
@@ -366,6 +366,119 @@ impl<'tcx> assembly::GoalKind<'tcx> for NormalizesTo<'tcx> {
         Self::consider_implied_clause(ecx, goal, pred, [goal.with(tcx, output_is_sized_pred)])
     }
 
+    fn consider_builtin_async_fn_trait_candidates(
+        ecx: &mut EvalCtxt<'_, 'tcx>,
+        goal: Goal<'tcx, Self>,
+        goal_kind: ty::ClosureKind,
+    ) -> QueryResult<'tcx> {
+        let tcx = ecx.tcx();
+
+        let env_region = match goal_kind {
+            ty::ClosureKind::Fn | ty::ClosureKind::FnMut => goal.predicate.alias.args.region_at(2),
+            // Doesn't matter what this region is
+            ty::ClosureKind::FnOnce => tcx.lifetimes.re_static,
+        };
+        let (tupled_inputs_and_output_and_coroutine, nested_preds) =
+            structural_traits::extract_tupled_inputs_and_output_from_async_callable(
+                tcx,
+                goal.predicate.self_ty(),
+                goal_kind,
+                env_region,
+            )?;
+        let output_is_sized_pred =
+            tupled_inputs_and_output_and_coroutine.map_bound(|(_, output, _)| {
+                ty::TraitRef::from_lang_item(tcx, LangItem::Sized, DUMMY_SP, [output])
+            });
+
+        let pred = tupled_inputs_and_output_and_coroutine
+            .map_bound(|(inputs, output, coroutine)| {
+                let (projection_ty, term) = match tcx.item_name(goal.predicate.def_id()) {
+                    sym::CallOnceFuture => (
+                        ty::AliasTy::new(
+                            tcx,
+                            goal.predicate.def_id(),
+                            [goal.predicate.self_ty(), inputs],
+                        ),
+                        coroutine.into(),
+                    ),
+                    sym::CallMutFuture | sym::CallFuture => (
+                        ty::AliasTy::new(
+                            tcx,
+                            goal.predicate.def_id(),
+                            [
+                                ty::GenericArg::from(goal.predicate.self_ty()),
+                                inputs.into(),
+                                env_region.into(),
+                            ],
+                        ),
+                        coroutine.into(),
+                    ),
+                    sym::Output => (
+                        ty::AliasTy::new(
+                            tcx,
+                            goal.predicate.def_id(),
+                            [ty::GenericArg::from(goal.predicate.self_ty()), inputs.into()],
+                        ),
+                        output.into(),
+                    ),
+                    name => bug!("no such associated type: {name}"),
+                };
+                ty::ProjectionPredicate { projection_ty, term }
+            })
+            .to_predicate(tcx);
+
+        // A built-in `AsyncFn` impl only holds if the output is sized.
+        // (FIXME: technically we only need to check this if the type is a fn ptr...)
+        Self::consider_implied_clause(
+            ecx,
+            goal,
+            pred,
+            [goal.with(tcx, output_is_sized_pred)]
+                .into_iter()
+                .chain(nested_preds.into_iter().map(|pred| goal.with(tcx, pred))),
+        )
+    }
+
+    fn consider_builtin_async_fn_kind_helper_candidate(
+        ecx: &mut EvalCtxt<'_, 'tcx>,
+        goal: Goal<'tcx, Self>,
+    ) -> QueryResult<'tcx> {
+        let [
+            closure_fn_kind_ty,
+            goal_kind_ty,
+            borrow_region,
+            tupled_inputs_ty,
+            tupled_upvars_ty,
+            coroutine_captures_by_ref_ty,
+        ] = **goal.predicate.alias.args
+        else {
+            bug!();
+        };
+
+        let Some(closure_kind) = closure_fn_kind_ty.expect_ty().to_opt_closure_kind() else {
+            // We don't need to worry about the self type being an infer var.
+            return Err(NoSolution);
+        };
+        let Some(goal_kind) = goal_kind_ty.expect_ty().to_opt_closure_kind() else {
+            return Err(NoSolution);
+        };
+        if !closure_kind.extends(goal_kind) {
+            return Err(NoSolution);
+        }
+
+        let upvars_ty = ty::CoroutineClosureSignature::tupled_upvars_by_closure_kind(
+            ecx.tcx(),
+            goal_kind,
+            tupled_inputs_ty.expect_ty(),
+            tupled_upvars_ty.expect_ty(),
+            coroutine_captures_by_ref_ty.expect_ty(),
+            borrow_region.expect_region(),
+        );
+
+        ecx.eq(goal.param_env, goal.predicate.term.ty().unwrap(), upvars_ty)?;
+        ecx.evaluate_added_goals_and_make_canonical_response(Certainty::Yes)
+    }
+
     fn consider_builtin_tuple_candidate(
         _ecx: &mut EvalCtxt<'_, 'tcx>,
         goal: Goal<'tcx, Self>,
diff --git a/compiler/rustc_trait_selection/src/solve/trait_goals.rs b/compiler/rustc_trait_selection/src/solve/trait_goals.rs
index efaad47b6dd..fd09a6b671d 100644
--- a/compiler/rustc_trait_selection/src/solve/trait_goals.rs
+++ b/compiler/rustc_trait_selection/src/solve/trait_goals.rs
@@ -303,6 +303,66 @@ impl<'tcx> assembly::GoalKind<'tcx> for TraitPredicate<'tcx> {
         Self::consider_implied_clause(ecx, goal, pred, [goal.with(tcx, output_is_sized_pred)])
     }
 
+    fn consider_builtin_async_fn_trait_candidates(
+        ecx: &mut EvalCtxt<'_, 'tcx>,
+        goal: Goal<'tcx, Self>,
+        goal_kind: ty::ClosureKind,
+    ) -> QueryResult<'tcx> {
+        if goal.predicate.polarity != ty::ImplPolarity::Positive {
+            return Err(NoSolution);
+        }
+
+        let tcx = ecx.tcx();
+        let (tupled_inputs_and_output_and_coroutine, nested_preds) =
+            structural_traits::extract_tupled_inputs_and_output_from_async_callable(
+                tcx,
+                goal.predicate.self_ty(),
+                goal_kind,
+                // This region doesn't matter because we're throwing away the coroutine type
+                tcx.lifetimes.re_static,
+            )?;
+        let output_is_sized_pred =
+            tupled_inputs_and_output_and_coroutine.map_bound(|(_, output, _)| {
+                ty::TraitRef::from_lang_item(tcx, LangItem::Sized, DUMMY_SP, [output])
+            });
+
+        let pred = tupled_inputs_and_output_and_coroutine
+            .map_bound(|(inputs, _, _)| {
+                ty::TraitRef::new(tcx, goal.predicate.def_id(), [goal.predicate.self_ty(), inputs])
+            })
+            .to_predicate(tcx);
+        // A built-in `AsyncFn` impl only holds if the output is sized.
+        // (FIXME: technically we only need to check this if the type is a fn ptr...)
+        Self::consider_implied_clause(
+            ecx,
+            goal,
+            pred,
+            [goal.with(tcx, output_is_sized_pred)]
+                .into_iter()
+                .chain(nested_preds.into_iter().map(|pred| goal.with(tcx, pred))),
+        )
+    }
+
+    fn consider_builtin_async_fn_kind_helper_candidate(
+        ecx: &mut EvalCtxt<'_, 'tcx>,
+        goal: Goal<'tcx, Self>,
+    ) -> QueryResult<'tcx> {
+        let [closure_fn_kind_ty, goal_kind_ty] = **goal.predicate.trait_ref.args else {
+            bug!();
+        };
+
+        let Some(closure_kind) = closure_fn_kind_ty.expect_ty().to_opt_closure_kind() else {
+            // We don't need to worry about the self type being an infer var.
+            return Err(NoSolution);
+        };
+        let goal_kind = goal_kind_ty.expect_ty().to_opt_closure_kind().unwrap();
+        if closure_kind.extends(goal_kind) {
+            ecx.evaluate_added_goals_and_make_canonical_response(Certainty::Yes)
+        } else {
+            Err(NoSolution)
+        }
+    }
+
     fn consider_builtin_tuple_candidate(
         ecx: &mut EvalCtxt<'_, 'tcx>,
         goal: Goal<'tcx, Self>,
diff --git a/compiler/rustc_trait_selection/src/traits/project.rs b/compiler/rustc_trait_selection/src/traits/project.rs
index a960befcd4b..648f14beaa7 100644
--- a/compiler/rustc_trait_selection/src/traits/project.rs
+++ b/compiler/rustc_trait_selection/src/traits/project.rs
@@ -1833,10 +1833,28 @@ fn assemble_candidates_from_impls<'cx, 'tcx>(
                     lang_items.fn_trait(),
                     lang_items.fn_mut_trait(),
                     lang_items.fn_once_trait(),
+                    lang_items.async_fn_trait(),
+                    lang_items.async_fn_mut_trait(),
+                    lang_items.async_fn_once_trait(),
                 ].contains(&Some(trait_ref.def_id))
                 {
                     true
-                }else if lang_items.discriminant_kind_trait() == Some(trait_ref.def_id) {
+                } else if lang_items.async_fn_kind_helper() == Some(trait_ref.def_id) {
+                    // FIXME(async_closures): Validity constraints here could be cleaned up.
+                    if obligation.predicate.args.type_at(0).is_ty_var()
+                        || obligation.predicate.args.type_at(4).is_ty_var()
+                        || obligation.predicate.args.type_at(5).is_ty_var()
+                    {
+                        candidate_set.mark_ambiguous();
+                        true
+                    } else if obligation.predicate.args.type_at(0).to_opt_closure_kind().is_some()
+                        && obligation.predicate.args.type_at(1).to_opt_closure_kind().is_some()
+                    {
+                        true
+                    } else {
+                        false
+                    }
+                } else if lang_items.discriminant_kind_trait() == Some(trait_ref.def_id) {
                     match self_ty.kind() {
                         ty::Bool
                         | ty::Char
@@ -2061,6 +2079,10 @@ fn confirm_select_candidate<'cx, 'tcx>(
                 } else {
                     confirm_fn_pointer_candidate(selcx, obligation, data)
                 }
+            } else if selcx.tcx().async_fn_trait_kind_from_def_id(trait_def_id).is_some() {
+                confirm_async_closure_candidate(selcx, obligation, data)
+            } else if lang_items.async_fn_kind_helper() == Some(trait_def_id) {
+                confirm_async_fn_kind_helper_candidate(selcx, obligation, data)
             } else {
                 confirm_builtin_candidate(selcx, obligation, data)
             }
@@ -2421,6 +2443,164 @@ fn confirm_callable_candidate<'cx, 'tcx>(
     confirm_param_env_candidate(selcx, obligation, predicate, true)
 }
 
+fn confirm_async_closure_candidate<'cx, 'tcx>(
+    selcx: &mut SelectionContext<'cx, 'tcx>,
+    obligation: &ProjectionTyObligation<'tcx>,
+    mut nested: Vec<PredicateObligation<'tcx>>,
+) -> Progress<'tcx> {
+    let self_ty = selcx.infcx.shallow_resolve(obligation.predicate.self_ty());
+    let ty::CoroutineClosure(def_id, args) = *self_ty.kind() else {
+        unreachable!(
+            "expected coroutine-closure self type for coroutine-closure candidate, found {self_ty}"
+        )
+    };
+    let args = args.as_coroutine_closure();
+    let kind_ty = args.kind_ty();
+
+    let tcx = selcx.tcx();
+    let goal_kind =
+        tcx.async_fn_trait_kind_from_def_id(obligation.predicate.trait_def_id(tcx)).unwrap();
+
+    let helper_trait_def_id = tcx.require_lang_item(LangItem::AsyncFnKindHelper, None);
+    nested.push(obligation.with(
+        tcx,
+        ty::TraitRef::new(
+            tcx,
+            helper_trait_def_id,
+            [kind_ty, Ty::from_closure_kind(tcx, goal_kind)],
+        ),
+    ));
+
+    let env_region = match goal_kind {
+        ty::ClosureKind::Fn | ty::ClosureKind::FnMut => obligation.predicate.args.region_at(2),
+        ty::ClosureKind::FnOnce => tcx.lifetimes.re_static,
+    };
+
+    // FIXME(async_closures): Make this into a lang item.
+    let upvars_projection_def_id =
+        tcx.associated_items(helper_trait_def_id).in_definition_order().next().unwrap().def_id;
+
+    // FIXME(async_closures): Confirmation is kind of a mess here. Ideally,
+    // we'd short-circuit when we know that the goal_kind >= closure_kind, and not
+    // register a nested predicate or create a new projection ty here. But I'm too
+    // lazy to make this more efficient atm, and we can always tweak it later,
+    // since all this does is make the solver do more work.
+    //
+    // The code duplication due to the different length args is kind of weird, too.
+    let poly_cache_entry = args.coroutine_closure_sig().map_bound(|sig| {
+        let (projection_ty, term) = match tcx.item_name(obligation.predicate.def_id) {
+            sym::CallOnceFuture => {
+                let tupled_upvars_ty = Ty::new_projection(
+                    tcx,
+                    upvars_projection_def_id,
+                    [
+                        ty::GenericArg::from(kind_ty),
+                        Ty::from_closure_kind(tcx, goal_kind).into(),
+                        env_region.into(),
+                        sig.tupled_inputs_ty.into(),
+                        args.tupled_upvars_ty().into(),
+                        args.coroutine_captures_by_ref_ty().into(),
+                    ],
+                );
+                let coroutine_ty = sig.to_coroutine(
+                    tcx,
+                    args.parent_args(),
+                    tcx.coroutine_for_closure(def_id),
+                    tupled_upvars_ty,
+                );
+                (
+                    ty::AliasTy::new(
+                        tcx,
+                        obligation.predicate.def_id,
+                        [self_ty, sig.tupled_inputs_ty],
+                    ),
+                    coroutine_ty.into(),
+                )
+            }
+            sym::CallMutFuture | sym::CallFuture => {
+                let tupled_upvars_ty = Ty::new_projection(
+                    tcx,
+                    upvars_projection_def_id,
+                    [
+                        ty::GenericArg::from(kind_ty),
+                        Ty::from_closure_kind(tcx, goal_kind).into(),
+                        env_region.into(),
+                        sig.tupled_inputs_ty.into(),
+                        args.tupled_upvars_ty().into(),
+                        args.coroutine_captures_by_ref_ty().into(),
+                    ],
+                );
+                let coroutine_ty = sig.to_coroutine(
+                    tcx,
+                    args.parent_args(),
+                    tcx.coroutine_for_closure(def_id),
+                    tupled_upvars_ty,
+                );
+                (
+                    ty::AliasTy::new(
+                        tcx,
+                        obligation.predicate.def_id,
+                        [
+                            ty::GenericArg::from(self_ty),
+                            sig.tupled_inputs_ty.into(),
+                            env_region.into(),
+                        ],
+                    ),
+                    coroutine_ty.into(),
+                )
+            }
+            sym::Output => (
+                ty::AliasTy::new(tcx, obligation.predicate.def_id, [self_ty, sig.tupled_inputs_ty]),
+                sig.return_ty.into(),
+            ),
+            name => bug!("no such associated type: {name}"),
+        };
+        ty::ProjectionPredicate { projection_ty, term }
+    });
+
+    confirm_param_env_candidate(selcx, obligation, poly_cache_entry, true)
+        .with_addl_obligations(nested)
+}
+
+fn confirm_async_fn_kind_helper_candidate<'cx, 'tcx>(
+    selcx: &mut SelectionContext<'cx, 'tcx>,
+    obligation: &ProjectionTyObligation<'tcx>,
+    nested: Vec<PredicateObligation<'tcx>>,
+) -> Progress<'tcx> {
+    let [
+        // We already checked that the goal_kind >= closure_kind
+        _closure_kind_ty,
+        goal_kind_ty,
+        borrow_region,
+        tupled_inputs_ty,
+        tupled_upvars_ty,
+        coroutine_captures_by_ref_ty,
+    ] = **obligation.predicate.args
+    else {
+        bug!();
+    };
+
+    let predicate = ty::ProjectionPredicate {
+        projection_ty: ty::AliasTy::new(
+            selcx.tcx(),
+            obligation.predicate.def_id,
+            obligation.predicate.args,
+        ),
+        term: ty::CoroutineClosureSignature::tupled_upvars_by_closure_kind(
+            selcx.tcx(),
+            goal_kind_ty.expect_ty().to_opt_closure_kind().unwrap(),
+            tupled_inputs_ty.expect_ty(),
+            tupled_upvars_ty.expect_ty(),
+            coroutine_captures_by_ref_ty.expect_ty(),
+            borrow_region.expect_region(),
+        )
+        .into(),
+    };
+
+    confirm_param_env_candidate(selcx, obligation, ty::Binder::dummy(predicate), false)
+        .with_addl_obligations(nested)
+}
+
 fn confirm_param_env_candidate<'cx, 'tcx>(
     selcx: &mut SelectionContext<'cx, 'tcx>,
     obligation: &ProjectionTyObligation<'tcx>,
diff --git a/compiler/rustc_trait_selection/src/traits/select/candidate_assembly.rs b/compiler/rustc_trait_selection/src/traits/select/candidate_assembly.rs
index b354ebf111f..75aedd5cd22 100644
--- a/compiler/rustc_trait_selection/src/traits/select/candidate_assembly.rs
+++ b/compiler/rustc_trait_selection/src/traits/select/candidate_assembly.rs
@@ -117,9 +117,12 @@ impl<'cx, 'tcx> SelectionContext<'cx, 'tcx> {
                     self.assemble_iterator_candidates(obligation, &mut candidates);
                 } else if lang_items.async_iterator_trait() == Some(def_id) {
                     self.assemble_async_iterator_candidates(obligation, &mut candidates);
+                } else if lang_items.async_fn_kind_helper() == Some(def_id) {
+                    self.assemble_async_fn_kind_helper_candidates(obligation, &mut candidates);
                 }
 
                 self.assemble_closure_candidates(obligation, &mut candidates);
+                self.assemble_async_closure_candidates(obligation, &mut candidates);
                 self.assemble_fn_pointer_candidates(obligation, &mut candidates);
                 self.assemble_candidates_from_impls(obligation, &mut candidates);
                 self.assemble_candidates_from_object_ty(obligation, &mut candidates);
@@ -335,6 +338,49 @@ impl<'cx, 'tcx> SelectionContext<'cx, 'tcx> {
         }
     }
 
+    fn assemble_async_closure_candidates(
+        &mut self,
+        obligation: &PolyTraitObligation<'tcx>,
+        candidates: &mut SelectionCandidateSet<'tcx>,
+    ) {
+        let Some(goal_kind) =
+            self.tcx().async_fn_trait_kind_from_def_id(obligation.predicate.def_id())
+        else {
+            return;
+        };
+
+        match *obligation.self_ty().skip_binder().kind() {
+            ty::CoroutineClosure(_, args) => {
+                if let Some(closure_kind) =
+                    args.as_coroutine_closure().kind_ty().to_opt_closure_kind()
+                    && !closure_kind.extends(goal_kind)
+                {
+                    return;
+                }
+                candidates.vec.push(AsyncClosureCandidate);
+            }
+            ty::Infer(ty::TyVar(_)) => {
+                candidates.ambiguous = true;
+            }
+            _ => {}
+        }
+    }
+
+    fn assemble_async_fn_kind_helper_candidates(
+        &mut self,
+        obligation: &PolyTraitObligation<'tcx>,
+        candidates: &mut SelectionCandidateSet<'tcx>,
+    ) {
+        if let Some(closure_kind) = obligation.self_ty().skip_binder().to_opt_closure_kind()
+            && let Some(goal_kind) =
+                obligation.predicate.skip_binder().trait_ref.args.type_at(1).to_opt_closure_kind()
+        {
+            if closure_kind.extends(goal_kind) {
+                candidates.vec.push(AsyncFnKindHelperCandidate);
+            }
+        }
+    }
+
     /// Implements one of the `Fn()` family for a fn pointer.
     fn assemble_fn_pointer_candidates(
         &mut self,
diff --git a/compiler/rustc_trait_selection/src/traits/select/confirmation.rs b/compiler/rustc_trait_selection/src/traits/select/confirmation.rs
index 74f388e53a3..79336a9d4e8 100644
--- a/compiler/rustc_trait_selection/src/traits/select/confirmation.rs
+++ b/compiler/rustc_trait_selection/src/traits/select/confirmation.rs
@@ -83,6 +83,13 @@ impl<'cx, 'tcx> SelectionContext<'cx, 'tcx> {
                 ImplSource::Builtin(BuiltinImplSource::Misc, vtable_closure)
             }
 
+            AsyncClosureCandidate => {
+                let vtable_closure = self.confirm_async_closure_candidate(obligation)?;
+                ImplSource::Builtin(BuiltinImplSource::Misc, vtable_closure)
+            }
+
+            AsyncFnKindHelperCandidate => ImplSource::Builtin(BuiltinImplSource::Misc, vec![]),
+
             CoroutineCandidate => {
                 let vtable_coroutine = self.confirm_coroutine_candidate(obligation)?;
                 ImplSource::Builtin(BuiltinImplSource::Misc, vtable_coroutine)
@@ -869,6 +876,49 @@ impl<'cx, 'tcx> SelectionContext<'cx, 'tcx> {
         Ok(nested)
     }
 
+    #[instrument(skip(self), level = "debug")]
+    fn confirm_async_closure_candidate(
+        &mut self,
+        obligation: &PolyTraitObligation<'tcx>,
+    ) -> Result<Vec<PredicateObligation<'tcx>>, SelectionError<'tcx>> {
+        // Okay to skip binder because the args on closure types never
+        // touch bound regions, they just capture the in-scope
+        // type/region parameters.
+        let self_ty = self.infcx.shallow_resolve(obligation.self_ty().skip_binder());
+        let ty::CoroutineClosure(closure_def_id, args) = *self_ty.kind() else {
+            bug!("async closure candidate for non-coroutine-closure {:?}", obligation);
+        };
+
+        let trait_ref = args.as_coroutine_closure().coroutine_closure_sig().map_bound(|sig| {
+            ty::TraitRef::new(
+                self.tcx(),
+                obligation.predicate.def_id(),
+                [self_ty, sig.tupled_inputs_ty],
+            )
+        });
+
+        let mut nested = self.confirm_poly_trait_refs(obligation, trait_ref)?;
+
+        let goal_kind =
+            self.tcx().async_fn_trait_kind_from_def_id(obligation.predicate.def_id()).unwrap();
+        nested.push(obligation.with(
+            self.tcx(),
+            ty::TraitRef::from_lang_item(
+                self.tcx(),
+                LangItem::AsyncFnKindHelper,
+                obligation.cause.span,
+                [
+                    args.as_coroutine_closure().kind_ty(),
+                    Ty::from_closure_kind(self.tcx(), goal_kind),
+                ],
+            ),
+        ));
+
+        debug!(?closure_def_id, ?trait_ref, ?nested, "confirm closure candidate obligations");
+
+        Ok(nested)
+    }
+
     /// In the case of closure types and fn pointers,
     /// we currently treat the input type parameters on the trait as
     /// outputs. This means that when we have a match we have only
diff --git a/compiler/rustc_trait_selection/src/traits/select/mod.rs b/compiler/rustc_trait_selection/src/traits/select/mod.rs
index e79277b89c4..7f41c73b72f 100644
--- a/compiler/rustc_trait_selection/src/traits/select/mod.rs
+++ b/compiler/rustc_trait_selection/src/traits/select/mod.rs
@@ -1864,6 +1864,8 @@ impl<'tcx> SelectionContext<'_, 'tcx> {
                 ImplCandidate(..)
                 | AutoImplCandidate
                 | ClosureCandidate { .. }
+                | AsyncClosureCandidate
+                | AsyncFnKindHelperCandidate
                 | CoroutineCandidate
                 | FutureCandidate
                 | IteratorCandidate
@@ -1894,6 +1896,8 @@ impl<'tcx> SelectionContext<'_, 'tcx> {
                 ImplCandidate(_)
                 | AutoImplCandidate
                 | ClosureCandidate { .. }
+                | AsyncClosureCandidate
+                | AsyncFnKindHelperCandidate
                 | CoroutineCandidate
                 | FutureCandidate
                 | IteratorCandidate
@@ -1930,6 +1934,8 @@ impl<'tcx> SelectionContext<'_, 'tcx> {
                 ImplCandidate(..)
                 | AutoImplCandidate
                 | ClosureCandidate { .. }
+                | AsyncClosureCandidate
+                | AsyncFnKindHelperCandidate
                 | CoroutineCandidate
                 | FutureCandidate
                 | IteratorCandidate
@@ -1946,6 +1952,8 @@ impl<'tcx> SelectionContext<'_, 'tcx> {
                 ImplCandidate(..)
                 | AutoImplCandidate
                 | ClosureCandidate { .. }
+                | AsyncClosureCandidate
+                | AsyncFnKindHelperCandidate
                 | CoroutineCandidate
                 | FutureCandidate
                 | IteratorCandidate
@@ -2054,6 +2062,8 @@ impl<'tcx> SelectionContext<'_, 'tcx> {
             (
                 ImplCandidate(_)
                 | ClosureCandidate { .. }
+                | AsyncClosureCandidate
+                | AsyncFnKindHelperCandidate
                 | CoroutineCandidate
                 | FutureCandidate
                 | IteratorCandidate
@@ -2066,6 +2076,8 @@ impl<'tcx> SelectionContext<'_, 'tcx> {
                 | TraitAliasCandidate,
                 ImplCandidate(_)
                 | ClosureCandidate { .. }
+                | AsyncClosureCandidate
+                | AsyncFnKindHelperCandidate
                 | CoroutineCandidate
                 | FutureCandidate
                 | IteratorCandidate
diff --git a/compiler/rustc_ty_utils/src/abi.rs b/compiler/rustc_ty_utils/src/abi.rs
index af26616831f..8d1bdec6684 100644
--- a/compiler/rustc_ty_utils/src/abi.rs
+++ b/compiler/rustc_ty_utils/src/abi.rs
@@ -101,6 +101,42 @@ fn fn_sig_for_fn_abi<'tcx>(
                 bound_vars,
             )
         }
+        ty::CoroutineClosure(def_id, args) => {
+            let sig = args.as_coroutine_closure().coroutine_closure_sig();
+            let bound_vars = tcx.mk_bound_variable_kinds_from_iter(
+                sig.bound_vars().iter().chain(iter::once(ty::BoundVariableKind::Region(ty::BrEnv))),
+            );
+            let br = ty::BoundRegion {
+                var: ty::BoundVar::from_usize(bound_vars.len() - 1),
+                kind: ty::BoundRegionKind::BrEnv,
+            };
+            let env_region = ty::Region::new_bound(tcx, ty::INNERMOST, br);
+            let env_ty = tcx.closure_env_ty(
+                Ty::new_coroutine_closure(tcx, def_id, args),
+                args.as_coroutine_closure().kind(),
+                env_region,
+            );
+
+            let sig = sig.skip_binder();
+            ty::Binder::bind_with_vars(
+                tcx.mk_fn_sig(
+                    iter::once(env_ty).chain([sig.tupled_inputs_ty]),
+                    sig.to_coroutine_given_kind_and_upvars(
+                        tcx,
+                        args.as_coroutine_closure().parent_args(),
+                        tcx.coroutine_for_closure(def_id),
+                        args.as_coroutine_closure().kind(),
+                        env_region,
+                        args.as_coroutine_closure().tupled_upvars_ty(),
+                        args.as_coroutine_closure().coroutine_captures_by_ref_ty(),
+                    ),
+                    sig.c_variadic,
+                    sig.unsafety,
+                    sig.abi,
+                ),
+                bound_vars,
+            )
+        }
         ty::Coroutine(did, args) => {
             let coroutine_kind = tcx.coroutine_kind(did).unwrap();
             let sig = args.as_coroutine().sig();
diff --git a/compiler/rustc_ty_utils/src/instance.rs b/compiler/rustc_ty_utils/src/instance.rs
index a5a2c53c732..50d27ed4ced 100644
--- a/compiler/rustc_ty_utils/src/instance.rs
+++ b/compiler/rustc_ty_utils/src/instance.rs
@@ -38,6 +38,7 @@ fn resolve_instance<'tcx>(
                 debug!(" => nontrivial drop glue");
                 match *ty.kind() {
                     ty::Closure(..)
+                    | ty::CoroutineClosure(..)
                     | ty::Coroutine(..)
                     | ty::Tuple(..)
                     | ty::Adt(..)
@@ -282,6 +283,16 @@ fn resolve_associated_item<'tcx>(
                         tcx.item_name(trait_item_id)
                     ),
                 }
+            } else if tcx.async_fn_trait_kind_from_def_id(trait_ref.def_id).is_some() {
+                match *rcvr_args.type_at(0).kind() {
+                    ty::CoroutineClosure(closure_def_id, args) => {
+                        Some(Instance::new(closure_def_id, args))
+                    }
+                    _ => bug!(
+                        "no built-in definition for `{trait_ref}::{}` for non-lending-closure type",
+                        tcx.item_name(trait_item_id)
+                    ),
+                }
             } else {
                 Instance::try_resolve_item_for_coroutine(tcx, trait_item_id, trait_id, rcvr_args)
             }
diff --git a/library/core/src/ops/async_function.rs b/library/core/src/ops/async_function.rs
index 965873f163e..b11d5643990 100644
--- a/library/core/src/ops/async_function.rs
+++ b/library/core/src/ops/async_function.rs
@@ -106,3 +106,11 @@ mod impls {
         }
     }
 }
+
+mod internal_implementation_detail {
+    // TODO: needs a detailed explanation
+    #[cfg_attr(not(bootstrap), lang = "async_fn_kind_helper")]
+    trait AsyncFnKindHelper<GoalKind> {
+        type Assoc<'closure_env, Inputs, Upvars, BorrowedUpvarsAsFnPtr>;
+    }
+}