Skip to main content

hydro_lang/compile/
embedded.rs

1//! "Embedded" deployment backend for Hydro.
2//!
3//! Instead of compiling each location into a standalone binary, this backend generates
4//! a Rust source file containing one function per location. Each function returns a
5//! `dfir_rs::scheduled::graph::Dfir` that can be manually driven by the caller.
6//!
7//! This is useful when you want full control over where and how the projected DFIR
8//! code runs (e.g. embedding it into an existing application).
9//!
10//! # Limitations
11//!
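//! Networking is **not** supported. All `Deploy` networking trait methods will panic
//! if called, as will requests for cluster IDs, cluster membership, or external ports.
//! Only local computations are supported: data must either be embedded in the Hydro
//! program or supplied by the caller through embedded inputs, which become `Stream`
//! parameters of the generated functions.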
15
16use std::future::Future;
17use std::io::Error;
18use std::pin::Pin;
19
20use bytes::{Bytes, BytesMut};
21use dfir_lang::diagnostic::Diagnostics;
22use dfir_lang::graph::DfirGraph;
23use futures::{Sink, Stream};
24use proc_macro2::Span;
25use quote::quote;
26use serde::Serialize;
27use serde::de::DeserializeOwned;
28use stageleft::{QuotedWithContext, q};
29
30use super::deploy_provider::{ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort};
31use crate::compile::builder::ExternalPortId;
32use crate::location::dynamic::LocationId;
33use crate::location::member_id::TaglessMemberId;
34use crate::location::{LocationKey, MembershipEvent, NetworkHint};
35
/// Marker type for the embedded deployment backend.
///
/// This enum has no variants and is never constructed; it is only named at the type level
/// as the `Deploy` implementation (e.g. `DeployFlow<'_, EmbeddedDeploy>`). All networking
/// methods panic — this backend only supports local computation.
#[derive(Debug)]
pub enum EmbeddedDeploy {}

/// A trivial node type for embedded deployment, used for processes, clusters, and externals
/// alike.
///
/// It only records the user-provided name of the function generated for its location.
#[derive(Clone, Debug)]
pub struct EmbeddedNode {
    /// The function name to use in the generated code for this location.
    ///
    /// It is spliced into the generated source as a function name, so it must be a valid,
    /// non-keyword Rust identifier.
    pub fn_name: String,
}
47
48impl Node for EmbeddedNode {
49    type Port = ();
50    type Meta = ();
51    type InstantiateEnv = EmbeddedInstantiateEnv;
52
53    fn next_port(&self) -> Self::Port {}
54
55    fn update_meta(&self, _meta: &Self::Meta) {}
56
57    fn instantiate(
58        &self,
59        _env: &mut Self::InstantiateEnv,
60        _meta: &mut Self::Meta,
61        _graph: DfirGraph,
62        _extra_stmts: &[syn::Stmt],
63        _sidecars: &[syn::Expr],
64    ) {
65        // No-op: embedded mode doesn't instantiate nodes at deploy time.
66    }
67}
68
69impl<'a> RegisterPort<'a, EmbeddedDeploy> for EmbeddedNode {
70    fn register(&self, _external_port_id: ExternalPortId, _port: Self::Port) {
71        panic!("EmbeddedDeploy does not support external ports");
72    }
73
74    #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
75    fn as_bytes_bidi(
76        &self,
77        _external_port_id: ExternalPortId,
78    ) -> impl Future<
79        Output = super::deploy_provider::DynSourceSink<Result<BytesMut, Error>, Bytes, Error>,
80    > + 'a {
81        async { panic!("EmbeddedDeploy does not support external ports") }
82    }
83
84    #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
85    fn as_bincode_bidi<InT, OutT>(
86        &self,
87        _external_port_id: ExternalPortId,
88    ) -> impl Future<Output = super::deploy_provider::DynSourceSink<OutT, InT, Error>> + 'a
89    where
90        InT: Serialize + 'static,
91        OutT: DeserializeOwned + 'static,
92    {
93        async { panic!("EmbeddedDeploy does not support external ports") }
94    }
95
96    #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
97    fn as_bincode_sink<T>(
98        &self,
99        _external_port_id: ExternalPortId,
100    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a
101    where
102        T: Serialize + 'static,
103    {
104        async { panic!("EmbeddedDeploy does not support external ports") }
105    }
106
107    #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
108    fn as_bincode_source<T>(
109        &self,
110        _external_port_id: ExternalPortId,
111    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
112    where
113        T: DeserializeOwned + 'static,
114    {
115        async { panic!("EmbeddedDeploy does not support external ports") }
116    }
117}
118
119impl<S: Into<String>> ProcessSpec<'_, EmbeddedDeploy> for S {
120    fn build(self, _location_key: LocationKey, _name_hint: &str) -> EmbeddedNode {
121        EmbeddedNode {
122            fn_name: self.into(),
123        }
124    }
125}
126
127impl<S: Into<String>> ClusterSpec<'_, EmbeddedDeploy> for S {
128    fn build(self, _location_key: LocationKey, _name_hint: &str) -> EmbeddedNode {
129        EmbeddedNode {
130            fn_name: self.into(),
131        }
132    }
133}
134
135impl<S: Into<String>> ExternalSpec<'_, EmbeddedDeploy> for S {
136    fn build(self, _location_key: LocationKey, _name_hint: &str) -> EmbeddedNode {
137        EmbeddedNode {
138            fn_name: self.into(),
139        }
140    }
141}
142
/// Collected embedded input registrations.
///
/// During compilation, each `EmbeddedInput` IR node registers its ident and element type
/// here via `Deploy::register_embedded_input`, which simply appends to `inputs`.
/// `generate_embedded` then sorts the entries by ident name and turns each one into an
/// `impl Stream<Item = <element type>> + Unpin + 'a` parameter. The same parameter list is
/// added to *every* generated function, not just the one whose location uses the input.
#[derive(Default)]
pub struct EmbeddedInstantiateEnv {
    /// (ident name, element type) pairs collected during compilation, in registration order.
    pub inputs: Vec<(syn::Ident, syn::Type)>,
}
153
154impl<'a> Deploy<'a> for EmbeddedDeploy {
155    type Meta = ();
156    type InstantiateEnv = EmbeddedInstantiateEnv;
157
158    type Process = EmbeddedNode;
159    type Cluster = EmbeddedNode;
160    type External = EmbeddedNode;
161
162    fn o2o_sink_source(
163        _p1: &Self::Process,
164        _p1_port: &(),
165        _p2: &Self::Process,
166        _p2_port: &(),
167    ) -> (syn::Expr, syn::Expr) {
168        panic!("EmbeddedDeploy does not support networking (o2o)")
169    }
170
171    fn o2o_connect(
172        _p1: &Self::Process,
173        _p1_port: &(),
174        _p2: &Self::Process,
175        _p2_port: &(),
176    ) -> Box<dyn FnOnce()> {
177        panic!("EmbeddedDeploy does not support networking (o2o)")
178    }
179
180    fn o2m_sink_source(
181        _p1: &Self::Process,
182        _p1_port: &(),
183        _c2: &Self::Cluster,
184        _c2_port: &(),
185    ) -> (syn::Expr, syn::Expr) {
186        panic!("EmbeddedDeploy does not support networking (o2m)")
187    }
188
189    fn o2m_connect(
190        _p1: &Self::Process,
191        _p1_port: &(),
192        _c2: &Self::Cluster,
193        _c2_port: &(),
194    ) -> Box<dyn FnOnce()> {
195        panic!("EmbeddedDeploy does not support networking (o2m)")
196    }
197
198    fn m2o_sink_source(
199        _c1: &Self::Cluster,
200        _c1_port: &(),
201        _p2: &Self::Process,
202        _p2_port: &(),
203    ) -> (syn::Expr, syn::Expr) {
204        panic!("EmbeddedDeploy does not support networking (m2o)")
205    }
206
207    fn m2o_connect(
208        _c1: &Self::Cluster,
209        _c1_port: &(),
210        _p2: &Self::Process,
211        _p2_port: &(),
212    ) -> Box<dyn FnOnce()> {
213        panic!("EmbeddedDeploy does not support networking (m2o)")
214    }
215
216    fn m2m_sink_source(
217        _c1: &Self::Cluster,
218        _c1_port: &(),
219        _c2: &Self::Cluster,
220        _c2_port: &(),
221    ) -> (syn::Expr, syn::Expr) {
222        panic!("EmbeddedDeploy does not support networking (m2m)")
223    }
224
225    fn m2m_connect(
226        _c1: &Self::Cluster,
227        _c1_port: &(),
228        _c2: &Self::Cluster,
229        _c2_port: &(),
230    ) -> Box<dyn FnOnce()> {
231        panic!("EmbeddedDeploy does not support networking (m2m)")
232    }
233
234    fn e2o_many_source(
235        _extra_stmts: &mut Vec<syn::Stmt>,
236        _p2: &Self::Process,
237        _p2_port: &(),
238        _codec_type: &syn::Type,
239        _shared_handle: String,
240    ) -> syn::Expr {
241        panic!("EmbeddedDeploy does not support networking (e2o)")
242    }
243
244    fn e2o_many_sink(_shared_handle: String) -> syn::Expr {
245        panic!("EmbeddedDeploy does not support networking (e2o)")
246    }
247
248    fn e2o_source(
249        _extra_stmts: &mut Vec<syn::Stmt>,
250        _p1: &Self::External,
251        _p1_port: &(),
252        _p2: &Self::Process,
253        _p2_port: &(),
254        _codec_type: &syn::Type,
255        _shared_handle: String,
256    ) -> syn::Expr {
257        panic!("EmbeddedDeploy does not support networking (e2o)")
258    }
259
260    fn e2o_connect(
261        _p1: &Self::External,
262        _p1_port: &(),
263        _p2: &Self::Process,
264        _p2_port: &(),
265        _many: bool,
266        _server_hint: NetworkHint,
267    ) -> Box<dyn FnOnce()> {
268        panic!("EmbeddedDeploy does not support networking (e2o)")
269    }
270
271    fn o2e_sink(
272        _p1: &Self::Process,
273        _p1_port: &(),
274        _p2: &Self::External,
275        _p2_port: &(),
276        _shared_handle: String,
277    ) -> syn::Expr {
278        panic!("EmbeddedDeploy does not support networking (o2e)")
279    }
280
281    #[expect(
282        unreachable_code,
283        reason = "panic before q! which is only for return type"
284    )]
285    fn cluster_ids(
286        _of_cluster: LocationKey,
287    ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
288        panic!("EmbeddedDeploy does not support cluster IDs");
289        q!(unreachable!("EmbeddedDeploy does not support cluster IDs"))
290    }
291
292    #[expect(
293        unreachable_code,
294        reason = "panic before q! which is only for return type"
295    )]
296    fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
297        panic!("EmbeddedDeploy does not support cluster self ID");
298        q!(unreachable!(
299            "EmbeddedDeploy does not support cluster self ID"
300        ))
301    }
302
303    #[expect(
304        unreachable_code,
305        reason = "panic before q! which is only for return type"
306    )]
307    fn cluster_membership_stream(
308        _location_id: &LocationId,
309    ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
310    {
311        panic!("EmbeddedDeploy does not support cluster membership streams");
312        q!(unreachable!(
313            "EmbeddedDeploy does not support cluster membership streams"
314        ))
315    }
316
317    fn register_embedded_input(
318        env: &mut Self::InstantiateEnv,
319        ident: &syn::Ident,
320        element_type: &syn::Type,
321    ) {
322        env.inputs.push((ident.clone(), element_type.clone()));
323    }
324}
325
326impl super::deploy::DeployFlow<'_, EmbeddedDeploy> {
327    /// Generates a `syn::File` containing one function per location in the flow.
328    ///
329    /// Each generated function has the signature:
330    /// ```ignore
331    /// pub fn <fn_name>() -> dfir_rs::scheduled::graph::Dfir<'static>
332    /// ```
333    /// where `fn_name` is the `String` passed to `with_process` / `with_cluster`.
334    ///
335    /// The returned `Dfir` can be manually executed by the caller.
336    ///
337    /// # Arguments
338    ///
339    /// * `crate_name` — the name of the crate containing the Hydro program (used for stageleft
340    ///   re-exports). Hyphens will be replaced with underscores.
341    ///
342    /// # Usage
343    ///
344    /// Typically called from a `build.rs` in a wrapper crate:
345    /// ```ignore
346    /// // build.rs
347    /// let deploy = flow.with_process(&process, "my_fn".to_string());
348    /// let code = deploy.generate_embedded("my_hydro_crate");
349    /// let out_dir = std::env::var("OUT_DIR").unwrap();
350    /// std::fs::write(format!("{out_dir}/embedded.rs"), prettyplease::unparse(&code)).unwrap();
351    /// ```
352    ///
353    /// Then in `lib.rs`:
354    /// ```ignore
355    /// include!(concat!(env!("OUT_DIR"), "/embedded.rs"));
356    /// ```
357    pub fn generate_embedded(mut self, crate_name: &str) -> syn::File {
358        let mut env = EmbeddedInstantiateEnv::default();
359        let compiled = self.compile_internal(&mut env);
360
361        // Sort inputs by name for deterministic output.
362        let mut inputs = env.inputs;
363        inputs.sort_by(|a, b| a.0.to_string().cmp(&b.0.to_string()));
364
365        let root = crate::staging_util::get_this_crate();
366        let orig_crate_name = quote::format_ident!("{}", crate_name.replace('-', "_"));
367
368        let mut functions: Vec<syn::Item> = Vec::new();
369
370        // Sort location keys for deterministic output.
371        let mut location_keys: Vec<_> = compiled.all_dfir().keys().collect();
372        location_keys.sort();
373
374        // Build the input parameters for each generated function.
375        let input_params: Vec<proc_macro2::TokenStream> = inputs
376            .iter()
377            .map(|(ident, element_type)| {
378                quote! { #ident: impl __root_dfir_rs::futures::Stream<Item = #element_type> + Unpin + 'a }
379            })
380            .collect();
381
382        for location_key in location_keys {
383            let graph = &compiled.all_dfir()[location_key];
384
385            // Get the user-provided function name from the node.
386            let fn_name = self
387                .processes
388                .get(location_key)
389                .map(|n| &n.fn_name)
390                .or_else(|| self.clusters.get(location_key).map(|n| &n.fn_name))
391                .or_else(|| self.externals.get(location_key).map(|n| &n.fn_name))
392                .expect("location key not found in any node map");
393
394            let fn_ident = syn::Ident::new(fn_name, Span::call_site());
395
396            let mut diagnostics = Diagnostics::new();
397            let dfir_tokens = graph
398                .as_code(&quote! { __root_dfir_rs }, true, quote!(), &mut diagnostics)
399                .expect("DFIR code generation failed with diagnostics.");
400
401            let func: syn::Item = syn::parse_quote! {
402                #[allow(unused, non_snake_case, clippy::suspicious_else_formatting)]
403                pub fn #fn_ident<'a>(#(#input_params),*) -> #root::runtime_support::dfir_rs::scheduled::graph::Dfir<'a> {
404                    #dfir_tokens
405                }
406            };
407            functions.push(func);
408        }
409
410        syn::parse_quote! {
411            use #orig_crate_name::__staged::__deps::*;
412            use #root::prelude::*;
413            use #root::runtime_support::dfir_rs as __root_dfir_rs;
414            pub use #orig_crate_name::__staged;
415
416            #( #functions )*
417        }
418    }
419}