Skip to main content

hydro_lang/compile/
deploy.rs

1use std::collections::HashMap;
2use std::io::Error;
3use std::marker::PhantomData;
4use std::pin::Pin;
5
6use bytes::{Bytes, BytesMut};
7use futures::{Sink, Stream};
8use proc_macro2::Span;
9use serde::Serialize;
10use serde::de::DeserializeOwned;
11use slotmap::{SecondaryMap, SlotMap, SparseSecondaryMap};
12use stageleft::QuotedWithContext;
13
14use super::built::build_inner;
15use super::compiled::CompiledFlow;
16use super::deploy_provider::{
17    ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, Node, ProcessSpec, RegisterPort,
18};
19use super::ir::HydroRoot;
20use crate::live_collections::stream::{Ordering, Retries};
21use crate::location::dynamic::LocationId;
22use crate::location::external_process::{
23    ExternalBincodeBidi, ExternalBincodeSink, ExternalBincodeStream, ExternalBytesPort,
24};
25use crate::location::{Cluster, External, Location, LocationKey, LocationType, Process};
26use crate::staging_util::Invariant;
27use crate::telemetry::Sidecar;
28
29pub struct DeployFlow<'a, D>
30where
31    D: Deploy<'a>,
32{
33    pub(super) ir: Vec<HydroRoot>,
34
35    pub(super) locations: SlotMap<LocationKey, LocationType>,
36    pub(super) location_names: SecondaryMap<LocationKey, String>,
37
38    /// Deployed instances of each process in the flow
39    pub(super) processes: SparseSecondaryMap<LocationKey, D::Process>,
40    pub(super) clusters: SparseSecondaryMap<LocationKey, D::Cluster>,
41    pub(super) externals: SparseSecondaryMap<LocationKey, D::External>,
42
43    /// Sidecars which may be added to each location (process or cluster, not externals).
44    /// See [`crate::telemetry::Sidecar`].
45    pub(super) sidecars: SparseSecondaryMap<LocationKey, Vec<syn::Expr>>,
46
47    /// Application name used in telemetry.
48    pub(super) flow_name: String,
49
50    pub(super) _phantom: Invariant<'a, D>,
51}
52
53impl<'a, D: Deploy<'a>> DeployFlow<'a, D> {
54    pub fn ir(&self) -> &Vec<HydroRoot> {
55        &self.ir
56    }
57
58    /// Application name used in telemetry.
59    pub fn flow_name(&self) -> &str {
60        &self.flow_name
61    }
62
63    pub fn with_process<P>(
64        mut self,
65        process: &Process<P>,
66        spec: impl IntoProcessSpec<'a, D>,
67    ) -> Self {
68        self.processes.insert(
69            process.key,
70            spec.into_process_spec()
71                .build(process.key, &self.location_names[process.key]),
72        );
73        self
74    }
75
76    /// TODO(mingwei): unstable API
77    #[doc(hidden)]
78    pub fn with_process_erased(
79        mut self,
80        process_loc_key: LocationKey,
81        spec: impl IntoProcessSpec<'a, D>,
82    ) -> Self {
83        assert_eq!(
84            Some(&LocationType::Process),
85            self.locations.get(process_loc_key),
86            "No process with the given `LocationKey` was found."
87        );
88        self.processes.insert(
89            process_loc_key,
90            spec.into_process_spec()
91                .build(process_loc_key, &self.location_names[process_loc_key]),
92        );
93        self
94    }
95
96    pub fn with_remaining_processes<S: IntoProcessSpec<'a, D> + 'a>(
97        mut self,
98        spec: impl Fn() -> S,
99    ) -> Self {
100        for (location_key, &location_type) in self.locations.iter() {
101            if LocationType::Process == location_type {
102                self.processes
103                    .entry(location_key)
104                    .expect("location was removed")
105                    .or_insert_with(|| {
106                        spec()
107                            .into_process_spec()
108                            .build(location_key, &self.location_names[location_key])
109                    });
110            }
111        }
112        self
113    }
114
115    pub fn with_cluster<C>(mut self, cluster: &Cluster<C>, spec: impl ClusterSpec<'a, D>) -> Self {
116        self.clusters.insert(
117            cluster.key,
118            spec.build(cluster.key, &self.location_names[cluster.key]),
119        );
120        self
121    }
122
123    /// TODO(mingwei): unstable API
124    #[doc(hidden)]
125    pub fn with_cluster_erased(
126        mut self,
127        cluster_loc_key: LocationKey,
128        spec: impl ClusterSpec<'a, D>,
129    ) -> Self {
130        assert_eq!(
131            Some(&LocationType::Cluster),
132            self.locations.get(cluster_loc_key),
133            "No cluster with the given `LocationKey` was found."
134        );
135        self.clusters.insert(
136            cluster_loc_key,
137            spec.build(cluster_loc_key, &self.location_names[cluster_loc_key]),
138        );
139        self
140    }
141
142    pub fn with_remaining_clusters<S: ClusterSpec<'a, D> + 'a>(
143        mut self,
144        spec: impl Fn() -> S,
145    ) -> Self {
146        for (location_key, &location_type) in self.locations.iter() {
147            if LocationType::Cluster == location_type {
148                self.clusters
149                    .entry(location_key)
150                    .expect("location was removed")
151                    .or_insert_with(|| {
152                        spec().build(location_key, &self.location_names[location_key])
153                    });
154            }
155        }
156        self
157    }
158
159    pub fn with_external<P>(
160        mut self,
161        external: &External<P>,
162        spec: impl ExternalSpec<'a, D>,
163    ) -> Self {
164        self.externals.insert(
165            external.key,
166            spec.build(external.key, &self.location_names[external.key]),
167        );
168        self
169    }
170
171    pub fn with_remaining_externals<S: ExternalSpec<'a, D> + 'a>(
172        mut self,
173        spec: impl Fn() -> S,
174    ) -> Self {
175        for (location_key, &location_type) in self.locations.iter() {
176            if LocationType::External == location_type {
177                self.externals
178                    .entry(location_key)
179                    .expect("location was removed")
180                    .or_insert_with(|| {
181                        spec().build(location_key, &self.location_names[location_key])
182                    });
183            }
184        }
185        self
186    }
187
188    /// Adds a [`Sidecar`] to all processes and clusters in the flow.
189    pub fn with_sidecar_all(mut self, sidecar: &impl Sidecar) -> Self {
190        for (location_key, &location_type) in self.locations.iter() {
191            if !matches!(location_type, LocationType::Process | LocationType::Cluster) {
192                continue;
193            }
194
195            let location_name = &self.location_names[location_key];
196
197            let sidecar = sidecar.to_expr(
198                self.flow_name(),
199                location_key,
200                location_type,
201                location_name,
202                &quote::format_ident!("{}", super::DFIR_IDENT),
203            );
204            self.sidecars
205                .entry(location_key)
206                .expect("location was removed")
207                .or_default()
208                .push(sidecar);
209        }
210
211        self
212    }
213
214    /// Adds a [`Sidecar`] to the given location.
215    pub fn with_sidecar_internal(
216        mut self,
217        location_key: LocationKey,
218        sidecar: &impl Sidecar,
219    ) -> Self {
220        let location_type = self.locations[location_key];
221        let location_name = &self.location_names[location_key];
222        let sidecar = sidecar.to_expr(
223            self.flow_name(),
224            location_key,
225            location_type,
226            location_name,
227            &quote::format_ident!("{}", super::DFIR_IDENT),
228        );
229        self.sidecars
230            .entry(location_key)
231            .expect("location was removed")
232            .or_default()
233            .push(sidecar);
234        self
235    }
236
237    /// Adds a [`Sidecar`] to a specific process in the flow.
238    pub fn with_sidecar_process(self, process: &Process<()>, sidecar: &impl Sidecar) -> Self {
239        self.with_sidecar_internal(process.key, sidecar)
240    }
241
242    /// Adds a [`Sidecar`] to a specific cluster in the flow.
243    pub fn with_sidecar_cluster(self, cluster: &Cluster<()>, sidecar: &impl Sidecar) -> Self {
244        self.with_sidecar_internal(cluster.key, sidecar)
245    }
246
247    /// Compiles the flow into DFIR ([`dfir_lang::graph::DfirGraph`]) without networking.
248    /// Useful for generating Mermaid diagrams of the DFIR.
249    ///
250    /// (This returned DFIR will not compile due to the networking missing).
251    pub fn preview_compile(&mut self) -> CompiledFlow<'a> {
252        // NOTE: `build_inner` does not actually mutate the IR, but `&mut` is required
253        // only because the shared traversal logic requires it
254        CompiledFlow {
255            dfir: build_inner::<D>(&mut self.ir),
256            extra_stmts: SparseSecondaryMap::new(),
257            sidecars: SparseSecondaryMap::new(),
258            _phantom: PhantomData,
259        }
260    }
261
262    /// Compiles the flow into DFIR ([`dfir_lang::graph::DfirGraph`]) including networking.
263    ///
264    /// (This does not compile the DFIR itself, instead use [`Self::deploy`] to compile & deploy the DFIR).
265    pub fn compile(mut self) -> CompiledFlow<'a>
266    where
267        D: Deploy<'a, InstantiateEnv = ()>,
268    {
269        self.compile_internal(&mut ())
270    }
271
272    /// Same as [`Self::compile`] but does not invalidate `self`, for internal use.
273    ///
274    /// Empties `self.sidecars` and modifies `self.ir`, leaving `self` in a partial state.
275    pub(super) fn compile_internal(&mut self, env: &mut D::InstantiateEnv) -> CompiledFlow<'a> {
276        let mut seen_tees: HashMap<_, _> = HashMap::new();
277        let mut extra_stmts = SparseSecondaryMap::new();
278        for leaf in self.ir.iter_mut() {
279            leaf.compile_network::<D>(
280                &mut extra_stmts,
281                &mut seen_tees,
282                &self.processes,
283                &self.clusters,
284                &self.externals,
285                env,
286            );
287        }
288
289        CompiledFlow {
290            dfir: build_inner::<D>(&mut self.ir),
291            extra_stmts,
292            sidecars: std::mem::take(&mut self.sidecars),
293            _phantom: PhantomData,
294        }
295    }
296
297    /// Creates the variables for cluster IDs and adds them into `extra_stmts`.
298    fn cluster_id_stmts(&self, extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>) {
299        #[expect(
300            clippy::disallowed_methods,
301            reason = "nondeterministic iteration order, will be sorted"
302        )]
303        let mut all_clusters_sorted = self.clusters.keys().collect::<Vec<_>>();
304        all_clusters_sorted.sort();
305
306        for cluster_key in all_clusters_sorted {
307            let self_id_ident = syn::Ident::new(
308                &format!("__hydro_lang_cluster_self_id_{}", cluster_key),
309                Span::call_site(),
310            );
311            let self_id_expr = D::cluster_self_id().splice_untyped();
312            extra_stmts
313                .entry(cluster_key)
314                .expect("location was removed")
315                .or_default()
316                .push(syn::parse_quote! {
317                    let #self_id_ident = &*Box::leak(Box::new(#self_id_expr));
318                });
319
320            let process_cluster_locations = self.location_names.keys().filter(|&location_key| {
321                self.processes.contains_key(location_key)
322                    || self.clusters.contains_key(location_key)
323            });
324            for other_location in process_cluster_locations {
325                let other_id_ident = syn::Ident::new(
326                    &format!("__hydro_lang_cluster_ids_{}", cluster_key),
327                    Span::call_site(),
328                );
329                let other_id_expr = D::cluster_ids(cluster_key).splice_untyped();
330                extra_stmts
331                    .entry(other_location)
332                    .expect("location was removed")
333                    .or_default()
334                    .push(syn::parse_quote! {
335                        let #other_id_ident = #other_id_expr;
336                    });
337            }
338        }
339    }
340
341    /// Compiles and deploys the flow.
342    ///
343    /// Rough outline of steps:
344    /// * Compiles the Hydro into DFIR.
345    /// * Instantiates nodes as configured.
346    /// * Compiles the corresponding DFIR into binaries for nodes as needed.
347    /// * Connects up networking as needed.
348    #[must_use]
349    pub fn deploy(mut self, env: &mut D::InstantiateEnv) -> DeployResult<'a, D> {
350        let CompiledFlow {
351            dfir,
352            mut extra_stmts,
353            mut sidecars,
354            _phantom,
355        } = self.compile_internal(env);
356
357        let mut compiled = dfir;
358        self.cluster_id_stmts(&mut extra_stmts);
359        let mut meta = D::Meta::default();
360
361        let (processes, clusters, externals) = (
362            self.processes
363                .into_iter()
364                .filter(|&(node_key, ref node)| {
365                    if let Some(ir) = compiled.remove(node_key) {
366                        node.instantiate(
367                            env,
368                            &mut meta,
369                            ir,
370                            extra_stmts.remove(node_key).as_deref().unwrap_or_default(),
371                            sidecars.remove(node_key).as_deref().unwrap_or_default(),
372                        );
373                        true
374                    } else {
375                        false
376                    }
377                })
378                .collect::<SparseSecondaryMap<_, _>>(),
379            self.clusters
380                .into_iter()
381                .filter(|&(cluster_key, ref cluster)| {
382                    if let Some(ir) = compiled.remove(cluster_key) {
383                        cluster.instantiate(
384                            env,
385                            &mut meta,
386                            ir,
387                            extra_stmts
388                                .remove(cluster_key)
389                                .as_deref()
390                                .unwrap_or_default(),
391                            sidecars.remove(cluster_key).as_deref().unwrap_or_default(),
392                        );
393                        true
394                    } else {
395                        false
396                    }
397                })
398                .collect::<SparseSecondaryMap<_, _>>(),
399            self.externals
400                .into_iter()
401                .inspect(|&(external_key, ref external)| {
402                    assert!(!extra_stmts.contains_key(external_key));
403                    assert!(!sidecars.contains_key(external_key));
404                    external.instantiate(env, &mut meta, Default::default(), &[], &[]);
405                })
406                .collect::<SparseSecondaryMap<_, _>>(),
407        );
408
409        for location_key in self.locations.keys() {
410            if let Some(node) = processes.get(location_key) {
411                node.update_meta(&meta);
412            } else if let Some(cluster) = clusters.get(location_key) {
413                cluster.update_meta(&meta);
414            } else if let Some(external) = externals.get(location_key) {
415                external.update_meta(&meta);
416            }
417        }
418
419        let mut seen_tees_connect = HashMap::new();
420        for leaf in self.ir.iter_mut() {
421            leaf.connect_network(&mut seen_tees_connect);
422        }
423
424        DeployResult {
425            location_names: self.location_names,
426            processes,
427            clusters,
428            externals,
429        }
430    }
431}
432
433pub struct DeployResult<'a, D: Deploy<'a>> {
434    location_names: SecondaryMap<LocationKey, String>,
435    processes: SparseSecondaryMap<LocationKey, D::Process>,
436    clusters: SparseSecondaryMap<LocationKey, D::Cluster>,
437    externals: SparseSecondaryMap<LocationKey, D::External>,
438}
439
440impl<'a, D: Deploy<'a>> DeployResult<'a, D> {
441    pub fn get_process<P>(&self, p: &Process<P>) -> &D::Process {
442        let LocationId::Process(location_key) = p.id() else {
443            panic!("Process ID expected")
444        };
445        self.processes.get(location_key).unwrap()
446    }
447
448    pub fn get_cluster<C>(&self, c: &Cluster<'a, C>) -> &D::Cluster {
449        let LocationId::Cluster(location_key) = c.id() else {
450            panic!("Cluster ID expected")
451        };
452        self.clusters.get(location_key).unwrap()
453    }
454
455    pub fn get_external<P>(&self, e: &External<P>) -> &D::External {
456        self.externals.get(e.key).unwrap()
457    }
458
459    pub fn get_all_processes(&self) -> impl Iterator<Item = (LocationId, &str, &D::Process)> {
460        self.location_names
461            .iter()
462            .filter_map(|(location_key, location_name)| {
463                self.processes
464                    .get(location_key)
465                    .map(|process| (LocationId::Process(location_key), &**location_name, process))
466            })
467    }
468
469    pub fn get_all_clusters(&self) -> impl Iterator<Item = (LocationId, &str, &D::Cluster)> {
470        self.location_names
471            .iter()
472            .filter_map(|(location_key, location_name)| {
473                self.clusters
474                    .get(location_key)
475                    .map(|cluster| (LocationId::Cluster(location_key), &**location_name, cluster))
476            })
477    }
478
479    #[deprecated(note = "use `connect` instead")]
480    pub async fn connect_bytes<M>(
481        &self,
482        port: ExternalBytesPort<M>,
483    ) -> (
484        Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
485        Pin<Box<dyn Sink<Bytes, Error = Error>>>,
486    ) {
487        self.connect(port).await
488    }
489
490    #[deprecated(note = "use `connect` instead")]
491    pub async fn connect_sink_bytes<M>(
492        &self,
493        port: ExternalBytesPort<M>,
494    ) -> Pin<Box<dyn Sink<Bytes, Error = Error>>> {
495        self.connect(port).await.1
496    }
497
498    pub async fn connect_bincode<
499        InT: Serialize + 'static,
500        OutT: DeserializeOwned + 'static,
501        Many,
502    >(
503        &self,
504        port: ExternalBincodeBidi<InT, OutT, Many>,
505    ) -> (
506        Pin<Box<dyn Stream<Item = OutT>>>,
507        Pin<Box<dyn Sink<InT, Error = Error>>>,
508    ) {
509        self.externals
510            .get(port.process_key)
511            .unwrap()
512            .as_bincode_bidi(port.port_id)
513            .await
514    }
515
516    #[deprecated(note = "use `connect` instead")]
517    pub async fn connect_sink_bincode<T: Serialize + DeserializeOwned + 'static, Many>(
518        &self,
519        port: ExternalBincodeSink<T, Many>,
520    ) -> Pin<Box<dyn Sink<T, Error = Error>>> {
521        self.connect(port).await
522    }
523
524    #[deprecated(note = "use `connect` instead")]
525    pub async fn connect_source_bytes(
526        &self,
527        port: ExternalBytesPort,
528    ) -> Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>> {
529        self.connect(port).await.0
530    }
531
532    #[deprecated(note = "use `connect` instead")]
533    pub async fn connect_source_bincode<
534        T: Serialize + DeserializeOwned + 'static,
535        O: Ordering,
536        R: Retries,
537    >(
538        &self,
539        port: ExternalBincodeStream<T, O, R>,
540    ) -> Pin<Box<dyn Stream<Item = T>>> {
541        self.connect(port).await
542    }
543
544    pub async fn connect<'b, P: ConnectableAsync<&'b Self>>(
545        &'b self,
546        port: P,
547    ) -> <P as ConnectableAsync<&'b Self>>::Output {
548        port.connect(self).await
549    }
550}
551
#[cfg(stageleft_runtime)]
#[cfg(feature = "deploy")]
#[cfg_attr(docsrs, doc(cfg(feature = "deploy")))]
impl DeployResult<'_, crate::deploy::HydroDeploy> {
    /// Get the raw port handle.
    ///
    /// Looks up the external that owns `port` and returns its underlying
    /// `CustomClientPort` for `port.port_id`.
    ///
    /// # Panics
    /// Panics if the port's external is not part of this result.
    pub fn raw_port<M>(
        &self,
        port: ExternalBytesPort<M>,
    ) -> hydro_deploy::custom_service::CustomClientPort {
        let owning_external = self.externals.get(port.process_key).unwrap();
        owning_external.raw_port(port.port_id)
    }
}
567
/// An endpoint that can be connected asynchronously, given a context of type `Ctx`.
///
/// [`DeployResult::connect`] dispatches through this trait, so the type of connection
/// handle it returns is chosen by the port type.
pub trait ConnectableAsync<Ctx> {
    /// The handle(s) produced once the endpoint is connected.
    type Output;

    /// Consumes the endpoint and connects it using `context`.
    fn connect(self, context: Ctx) -> impl Future<Output = Self::Output>;
}
573
574impl<'a, D: Deploy<'a>, M> ConnectableAsync<&DeployResult<'a, D>> for ExternalBytesPort<M> {
575    type Output = (
576        Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
577        Pin<Box<dyn Sink<Bytes, Error = Error>>>,
578    );
579
580    async fn connect(self, ctx: &DeployResult<'a, D>) -> Self::Output {
581        ctx.externals
582            .get(self.process_key)
583            .unwrap()
584            .as_bytes_bidi(self.port_id)
585            .await
586    }
587}
588
589impl<'a, D: Deploy<'a>, T: DeserializeOwned + 'static, O: Ordering, R: Retries>
590    ConnectableAsync<&DeployResult<'a, D>> for ExternalBincodeStream<T, O, R>
591{
592    type Output = Pin<Box<dyn Stream<Item = T>>>;
593
594    async fn connect(self, ctx: &DeployResult<'a, D>) -> Self::Output {
595        ctx.externals
596            .get(self.process_key)
597            .unwrap()
598            .as_bincode_source(self.port_id)
599            .await
600    }
601}
602
603impl<'a, D: Deploy<'a>, T: Serialize + 'static, Many> ConnectableAsync<&DeployResult<'a, D>>
604    for ExternalBincodeSink<T, Many>
605{
606    type Output = Pin<Box<dyn Sink<T, Error = Error>>>;
607
608    async fn connect(self, ctx: &DeployResult<'a, D>) -> Self::Output {
609        ctx.externals
610            .get(self.process_key)
611            .unwrap()
612            .as_bincode_sink(self.port_id)
613            .await
614    }
615}