1use std::collections::HashMap;
2use std::io::Error;
3use std::marker::PhantomData;
4use std::pin::Pin;
5
6use bytes::{Bytes, BytesMut};
7use futures::{Sink, Stream};
8use proc_macro2::Span;
9use serde::Serialize;
10use serde::de::DeserializeOwned;
11use slotmap::{SecondaryMap, SlotMap, SparseSecondaryMap};
12use stageleft::QuotedWithContext;
13
14use super::built::build_inner;
15use super::compiled::CompiledFlow;
16use super::deploy_provider::{
17 ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, Node, ProcessSpec, RegisterPort,
18};
19use super::ir::HydroRoot;
20use crate::live_collections::stream::{Ordering, Retries};
21use crate::location::dynamic::LocationId;
22use crate::location::external_process::{
23 ExternalBincodeBidi, ExternalBincodeSink, ExternalBincodeStream, ExternalBytesPort,
24};
25use crate::location::{Cluster, External, Location, LocationKey, LocationType, Process};
26use crate::staging_util::Invariant;
27use crate::telemetry::Sidecar;
28
29pub struct DeployFlow<'a, D>
30where
31 D: Deploy<'a>,
32{
33 pub(super) ir: Vec<HydroRoot>,
34
35 pub(super) locations: SlotMap<LocationKey, LocationType>,
36 pub(super) location_names: SecondaryMap<LocationKey, String>,
37
38 pub(super) processes: SparseSecondaryMap<LocationKey, D::Process>,
40 pub(super) clusters: SparseSecondaryMap<LocationKey, D::Cluster>,
41 pub(super) externals: SparseSecondaryMap<LocationKey, D::External>,
42
43 pub(super) sidecars: SparseSecondaryMap<LocationKey, Vec<syn::Expr>>,
46
47 pub(super) flow_name: String,
49
50 pub(super) _phantom: Invariant<'a, D>,
51}
52
53impl<'a, D: Deploy<'a>> DeployFlow<'a, D> {
54 pub fn ir(&self) -> &Vec<HydroRoot> {
55 &self.ir
56 }
57
58 pub fn flow_name(&self) -> &str {
60 &self.flow_name
61 }
62
63 pub fn with_process<P>(
64 mut self,
65 process: &Process<P>,
66 spec: impl IntoProcessSpec<'a, D>,
67 ) -> Self {
68 self.processes.insert(
69 process.key,
70 spec.into_process_spec()
71 .build(process.key, &self.location_names[process.key]),
72 );
73 self
74 }
75
76 #[doc(hidden)]
78 pub fn with_process_erased(
79 mut self,
80 process_loc_key: LocationKey,
81 spec: impl IntoProcessSpec<'a, D>,
82 ) -> Self {
83 assert_eq!(
84 Some(&LocationType::Process),
85 self.locations.get(process_loc_key),
86 "No process with the given `LocationKey` was found."
87 );
88 self.processes.insert(
89 process_loc_key,
90 spec.into_process_spec()
91 .build(process_loc_key, &self.location_names[process_loc_key]),
92 );
93 self
94 }
95
96 pub fn with_remaining_processes<S: IntoProcessSpec<'a, D> + 'a>(
97 mut self,
98 spec: impl Fn() -> S,
99 ) -> Self {
100 for (location_key, &location_type) in self.locations.iter() {
101 if LocationType::Process == location_type {
102 self.processes
103 .entry(location_key)
104 .expect("location was removed")
105 .or_insert_with(|| {
106 spec()
107 .into_process_spec()
108 .build(location_key, &self.location_names[location_key])
109 });
110 }
111 }
112 self
113 }
114
115 pub fn with_cluster<C>(mut self, cluster: &Cluster<C>, spec: impl ClusterSpec<'a, D>) -> Self {
116 self.clusters.insert(
117 cluster.key,
118 spec.build(cluster.key, &self.location_names[cluster.key]),
119 );
120 self
121 }
122
123 #[doc(hidden)]
125 pub fn with_cluster_erased(
126 mut self,
127 cluster_loc_key: LocationKey,
128 spec: impl ClusterSpec<'a, D>,
129 ) -> Self {
130 assert_eq!(
131 Some(&LocationType::Cluster),
132 self.locations.get(cluster_loc_key),
133 "No cluster with the given `LocationKey` was found."
134 );
135 self.clusters.insert(
136 cluster_loc_key,
137 spec.build(cluster_loc_key, &self.location_names[cluster_loc_key]),
138 );
139 self
140 }
141
142 pub fn with_remaining_clusters<S: ClusterSpec<'a, D> + 'a>(
143 mut self,
144 spec: impl Fn() -> S,
145 ) -> Self {
146 for (location_key, &location_type) in self.locations.iter() {
147 if LocationType::Cluster == location_type {
148 self.clusters
149 .entry(location_key)
150 .expect("location was removed")
151 .or_insert_with(|| {
152 spec().build(location_key, &self.location_names[location_key])
153 });
154 }
155 }
156 self
157 }
158
159 pub fn with_external<P>(
160 mut self,
161 external: &External<P>,
162 spec: impl ExternalSpec<'a, D>,
163 ) -> Self {
164 self.externals.insert(
165 external.key,
166 spec.build(external.key, &self.location_names[external.key]),
167 );
168 self
169 }
170
171 pub fn with_remaining_externals<S: ExternalSpec<'a, D> + 'a>(
172 mut self,
173 spec: impl Fn() -> S,
174 ) -> Self {
175 for (location_key, &location_type) in self.locations.iter() {
176 if LocationType::External == location_type {
177 self.externals
178 .entry(location_key)
179 .expect("location was removed")
180 .or_insert_with(|| {
181 spec().build(location_key, &self.location_names[location_key])
182 });
183 }
184 }
185 self
186 }
187
188 pub fn with_sidecar_all(mut self, sidecar: &impl Sidecar) -> Self {
190 for (location_key, &location_type) in self.locations.iter() {
191 if !matches!(location_type, LocationType::Process | LocationType::Cluster) {
192 continue;
193 }
194
195 let location_name = &self.location_names[location_key];
196
197 let sidecar = sidecar.to_expr(
198 self.flow_name(),
199 location_key,
200 location_type,
201 location_name,
202 "e::format_ident!("{}", super::DFIR_IDENT),
203 );
204 self.sidecars
205 .entry(location_key)
206 .expect("location was removed")
207 .or_default()
208 .push(sidecar);
209 }
210
211 self
212 }
213
214 pub fn with_sidecar_internal(
216 mut self,
217 location_key: LocationKey,
218 sidecar: &impl Sidecar,
219 ) -> Self {
220 let location_type = self.locations[location_key];
221 let location_name = &self.location_names[location_key];
222 let sidecar = sidecar.to_expr(
223 self.flow_name(),
224 location_key,
225 location_type,
226 location_name,
227 "e::format_ident!("{}", super::DFIR_IDENT),
228 );
229 self.sidecars
230 .entry(location_key)
231 .expect("location was removed")
232 .or_default()
233 .push(sidecar);
234 self
235 }
236
237 pub fn with_sidecar_process(self, process: &Process<()>, sidecar: &impl Sidecar) -> Self {
239 self.with_sidecar_internal(process.key, sidecar)
240 }
241
242 pub fn with_sidecar_cluster(self, cluster: &Cluster<()>, sidecar: &impl Sidecar) -> Self {
244 self.with_sidecar_internal(cluster.key, sidecar)
245 }
246
247 pub fn preview_compile(&mut self) -> CompiledFlow<'a> {
252 CompiledFlow {
255 dfir: build_inner::<D>(&mut self.ir),
256 extra_stmts: SparseSecondaryMap::new(),
257 sidecars: SparseSecondaryMap::new(),
258 _phantom: PhantomData,
259 }
260 }
261
262 pub fn compile(mut self) -> CompiledFlow<'a>
266 where
267 D: Deploy<'a, InstantiateEnv = ()>,
268 {
269 self.compile_internal(&mut ())
270 }
271
272 pub(super) fn compile_internal(&mut self, env: &mut D::InstantiateEnv) -> CompiledFlow<'a> {
276 let mut seen_tees: HashMap<_, _> = HashMap::new();
277 let mut extra_stmts = SparseSecondaryMap::new();
278 for leaf in self.ir.iter_mut() {
279 leaf.compile_network::<D>(
280 &mut extra_stmts,
281 &mut seen_tees,
282 &self.processes,
283 &self.clusters,
284 &self.externals,
285 env,
286 );
287 }
288
289 CompiledFlow {
290 dfir: build_inner::<D>(&mut self.ir),
291 extra_stmts,
292 sidecars: std::mem::take(&mut self.sidecars),
293 _phantom: PhantomData,
294 }
295 }
296
297 fn cluster_id_stmts(&self, extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>) {
299 #[expect(
300 clippy::disallowed_methods,
301 reason = "nondeterministic iteration order, will be sorted"
302 )]
303 let mut all_clusters_sorted = self.clusters.keys().collect::<Vec<_>>();
304 all_clusters_sorted.sort();
305
306 for cluster_key in all_clusters_sorted {
307 let self_id_ident = syn::Ident::new(
308 &format!("__hydro_lang_cluster_self_id_{}", cluster_key),
309 Span::call_site(),
310 );
311 let self_id_expr = D::cluster_self_id().splice_untyped();
312 extra_stmts
313 .entry(cluster_key)
314 .expect("location was removed")
315 .or_default()
316 .push(syn::parse_quote! {
317 let #self_id_ident = &*Box::leak(Box::new(#self_id_expr));
318 });
319
320 let process_cluster_locations = self.location_names.keys().filter(|&location_key| {
321 self.processes.contains_key(location_key)
322 || self.clusters.contains_key(location_key)
323 });
324 for other_location in process_cluster_locations {
325 let other_id_ident = syn::Ident::new(
326 &format!("__hydro_lang_cluster_ids_{}", cluster_key),
327 Span::call_site(),
328 );
329 let other_id_expr = D::cluster_ids(cluster_key).splice_untyped();
330 extra_stmts
331 .entry(other_location)
332 .expect("location was removed")
333 .or_default()
334 .push(syn::parse_quote! {
335 let #other_id_ident = #other_id_expr;
336 });
337 }
338 }
339 }
340
341 #[must_use]
349 pub fn deploy(mut self, env: &mut D::InstantiateEnv) -> DeployResult<'a, D> {
350 let CompiledFlow {
351 dfir,
352 mut extra_stmts,
353 mut sidecars,
354 _phantom,
355 } = self.compile_internal(env);
356
357 let mut compiled = dfir;
358 self.cluster_id_stmts(&mut extra_stmts);
359 let mut meta = D::Meta::default();
360
361 let (processes, clusters, externals) = (
362 self.processes
363 .into_iter()
364 .filter(|&(node_key, ref node)| {
365 if let Some(ir) = compiled.remove(node_key) {
366 node.instantiate(
367 env,
368 &mut meta,
369 ir,
370 extra_stmts.remove(node_key).as_deref().unwrap_or_default(),
371 sidecars.remove(node_key).as_deref().unwrap_or_default(),
372 );
373 true
374 } else {
375 false
376 }
377 })
378 .collect::<SparseSecondaryMap<_, _>>(),
379 self.clusters
380 .into_iter()
381 .filter(|&(cluster_key, ref cluster)| {
382 if let Some(ir) = compiled.remove(cluster_key) {
383 cluster.instantiate(
384 env,
385 &mut meta,
386 ir,
387 extra_stmts
388 .remove(cluster_key)
389 .as_deref()
390 .unwrap_or_default(),
391 sidecars.remove(cluster_key).as_deref().unwrap_or_default(),
392 );
393 true
394 } else {
395 false
396 }
397 })
398 .collect::<SparseSecondaryMap<_, _>>(),
399 self.externals
400 .into_iter()
401 .inspect(|&(external_key, ref external)| {
402 assert!(!extra_stmts.contains_key(external_key));
403 assert!(!sidecars.contains_key(external_key));
404 external.instantiate(env, &mut meta, Default::default(), &[], &[]);
405 })
406 .collect::<SparseSecondaryMap<_, _>>(),
407 );
408
409 for location_key in self.locations.keys() {
410 if let Some(node) = processes.get(location_key) {
411 node.update_meta(&meta);
412 } else if let Some(cluster) = clusters.get(location_key) {
413 cluster.update_meta(&meta);
414 } else if let Some(external) = externals.get(location_key) {
415 external.update_meta(&meta);
416 }
417 }
418
419 let mut seen_tees_connect = HashMap::new();
420 for leaf in self.ir.iter_mut() {
421 leaf.connect_network(&mut seen_tees_connect);
422 }
423
424 DeployResult {
425 location_names: self.location_names,
426 processes,
427 clusters,
428 externals,
429 }
430 }
431}
432
433pub struct DeployResult<'a, D: Deploy<'a>> {
434 location_names: SecondaryMap<LocationKey, String>,
435 processes: SparseSecondaryMap<LocationKey, D::Process>,
436 clusters: SparseSecondaryMap<LocationKey, D::Cluster>,
437 externals: SparseSecondaryMap<LocationKey, D::External>,
438}
439
440impl<'a, D: Deploy<'a>> DeployResult<'a, D> {
441 pub fn get_process<P>(&self, p: &Process<P>) -> &D::Process {
442 let LocationId::Process(location_key) = p.id() else {
443 panic!("Process ID expected")
444 };
445 self.processes.get(location_key).unwrap()
446 }
447
448 pub fn get_cluster<C>(&self, c: &Cluster<'a, C>) -> &D::Cluster {
449 let LocationId::Cluster(location_key) = c.id() else {
450 panic!("Cluster ID expected")
451 };
452 self.clusters.get(location_key).unwrap()
453 }
454
455 pub fn get_external<P>(&self, e: &External<P>) -> &D::External {
456 self.externals.get(e.key).unwrap()
457 }
458
459 pub fn get_all_processes(&self) -> impl Iterator<Item = (LocationId, &str, &D::Process)> {
460 self.location_names
461 .iter()
462 .filter_map(|(location_key, location_name)| {
463 self.processes
464 .get(location_key)
465 .map(|process| (LocationId::Process(location_key), &**location_name, process))
466 })
467 }
468
469 pub fn get_all_clusters(&self) -> impl Iterator<Item = (LocationId, &str, &D::Cluster)> {
470 self.location_names
471 .iter()
472 .filter_map(|(location_key, location_name)| {
473 self.clusters
474 .get(location_key)
475 .map(|cluster| (LocationId::Cluster(location_key), &**location_name, cluster))
476 })
477 }
478
479 #[deprecated(note = "use `connect` instead")]
480 pub async fn connect_bytes<M>(
481 &self,
482 port: ExternalBytesPort<M>,
483 ) -> (
484 Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
485 Pin<Box<dyn Sink<Bytes, Error = Error>>>,
486 ) {
487 self.connect(port).await
488 }
489
490 #[deprecated(note = "use `connect` instead")]
491 pub async fn connect_sink_bytes<M>(
492 &self,
493 port: ExternalBytesPort<M>,
494 ) -> Pin<Box<dyn Sink<Bytes, Error = Error>>> {
495 self.connect(port).await.1
496 }
497
498 pub async fn connect_bincode<
499 InT: Serialize + 'static,
500 OutT: DeserializeOwned + 'static,
501 Many,
502 >(
503 &self,
504 port: ExternalBincodeBidi<InT, OutT, Many>,
505 ) -> (
506 Pin<Box<dyn Stream<Item = OutT>>>,
507 Pin<Box<dyn Sink<InT, Error = Error>>>,
508 ) {
509 self.externals
510 .get(port.process_key)
511 .unwrap()
512 .as_bincode_bidi(port.port_id)
513 .await
514 }
515
516 #[deprecated(note = "use `connect` instead")]
517 pub async fn connect_sink_bincode<T: Serialize + DeserializeOwned + 'static, Many>(
518 &self,
519 port: ExternalBincodeSink<T, Many>,
520 ) -> Pin<Box<dyn Sink<T, Error = Error>>> {
521 self.connect(port).await
522 }
523
524 #[deprecated(note = "use `connect` instead")]
525 pub async fn connect_source_bytes(
526 &self,
527 port: ExternalBytesPort,
528 ) -> Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>> {
529 self.connect(port).await.0
530 }
531
532 #[deprecated(note = "use `connect` instead")]
533 pub async fn connect_source_bincode<
534 T: Serialize + DeserializeOwned + 'static,
535 O: Ordering,
536 R: Retries,
537 >(
538 &self,
539 port: ExternalBincodeStream<T, O, R>,
540 ) -> Pin<Box<dyn Stream<Item = T>>> {
541 self.connect(port).await
542 }
543
544 pub async fn connect<'b, P: ConnectableAsync<&'b Self>>(
545 &'b self,
546 port: P,
547 ) -> <P as ConnectableAsync<&'b Self>>::Output {
548 port.connect(self).await
549 }
550}
551
#[cfg(stageleft_runtime)]
#[cfg(feature = "deploy")]
#[cfg_attr(docsrs, doc(cfg(feature = "deploy")))]
impl DeployResult<'_, crate::deploy::HydroDeploy> {
    /// Returns the raw client port of the external node that owns `port`.
    ///
    /// # Panics
    ///
    /// Panics if no external node was deployed for the port's process key.
    pub fn raw_port<M>(
        &self,
        port: ExternalBytesPort<M>,
    ) -> hydro_deploy::custom_service::CustomClientPort {
        let external = self.externals.get(port.process_key).unwrap();
        external.raw_port(port.port_id)
    }
}
567
/// A value that can be connected asynchronously, given a context of type `Ctx`.
///
/// In this file it is implemented for external port handles, with a
/// `&DeployResult` as the context.
pub trait ConnectableAsync<Ctx> {
    /// What a successful connection yields.
    type Output;

    /// Consumes `self` and connects it using `ctx`, resolving to [`Self::Output`].
    fn connect(self, ctx: Ctx) -> impl Future<Output = Self::Output>;
}
573
574impl<'a, D: Deploy<'a>, M> ConnectableAsync<&DeployResult<'a, D>> for ExternalBytesPort<M> {
575 type Output = (
576 Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
577 Pin<Box<dyn Sink<Bytes, Error = Error>>>,
578 );
579
580 async fn connect(self, ctx: &DeployResult<'a, D>) -> Self::Output {
581 ctx.externals
582 .get(self.process_key)
583 .unwrap()
584 .as_bytes_bidi(self.port_id)
585 .await
586 }
587}
588
589impl<'a, D: Deploy<'a>, T: DeserializeOwned + 'static, O: Ordering, R: Retries>
590 ConnectableAsync<&DeployResult<'a, D>> for ExternalBincodeStream<T, O, R>
591{
592 type Output = Pin<Box<dyn Stream<Item = T>>>;
593
594 async fn connect(self, ctx: &DeployResult<'a, D>) -> Self::Output {
595 ctx.externals
596 .get(self.process_key)
597 .unwrap()
598 .as_bincode_source(self.port_id)
599 .await
600 }
601}
602
603impl<'a, D: Deploy<'a>, T: Serialize + 'static, Many> ConnectableAsync<&DeployResult<'a, D>>
604 for ExternalBincodeSink<T, Many>
605{
606 type Output = Pin<Box<dyn Sink<T, Error = Error>>>;
607
608 async fn connect(self, ctx: &DeployResult<'a, D>) -> Self::Output {
609 ctx.externals
610 .get(self.process_key)
611 .unwrap()
612 .as_bincode_sink(self.port_id)
613 .await
614 }
615}