Skip to main content

hydro_lang/location/
mod.rs

1//! Type definitions for distributed locations, which specify where pieces of a Hydro
2//! program will be executed.
3//!
4//! Hydro is a **global**, **distributed** programming model. This means that the data
5//! and computation in a Hydro program can be spread across multiple machines, data
6//! centers, and even continents. To achieve this, Hydro uses the concept of
7//! **locations** to keep track of _where_ data is located and computation is executed.
8//!
9//! Each live collection type (in [`crate::live_collections`]) has a type parameter `L`
10//! which will always be a type that implements the [`Location`] trait (e.g. [`Process`]
11//! and [`Cluster`]). To create distributed programs, Hydro provides a variety of APIs
12//! to allow live collections to be _moved_ between locations via network send/receive.
13//!
14//! See [the Hydro docs](https://hydro.run/docs/hydro/reference/locations/) for more information.
15
16use std::fmt::Debug;
17use std::marker::PhantomData;
18use std::num::ParseIntError;
19use std::time::Duration;
20
21use bytes::{Bytes, BytesMut};
22use futures::stream::Stream as FuturesStream;
23use proc_macro2::Span;
24use quote::quote;
25use serde::de::DeserializeOwned;
26use serde::{Deserialize, Serialize};
27use slotmap::{Key, new_key_type};
28use stageleft::runtime_support::{FreeVariableWithContextWithProps, QuoteTokens};
29use stageleft::{QuotedWithContext, q, quote_type};
30use syn::parse_quote;
31use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
32
33use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot, HydroSource};
34use crate::forward_handle::ForwardRef;
35#[cfg(stageleft_runtime)]
36use crate::forward_handle::{CycleCollection, ForwardHandle};
37use crate::live_collections::boundedness::{Bounded, Unbounded};
38use crate::live_collections::keyed_stream::KeyedStream;
39use crate::live_collections::singleton::Singleton;
40use crate::live_collections::stream::{
41    ExactlyOnce, NoOrder, Ordering, Retries, Stream, TotalOrder,
42};
43use crate::location::dynamic::LocationId;
44use crate::location::external_process::{
45    ExternalBincodeBidi, ExternalBincodeSink, ExternalBytesPort, Many, NotMany,
46};
47use crate::nondet::NonDet;
48#[cfg(feature = "sim")]
49use crate::sim::SimSender;
50use crate::staging_util::get_this_crate;
51
52pub mod dynamic;
53
54#[expect(missing_docs, reason = "TODO")]
55pub mod external_process;
56pub use external_process::External;
57
58#[expect(missing_docs, reason = "TODO")]
59pub mod process;
60pub use process::Process;
61
62#[expect(missing_docs, reason = "TODO")]
63pub mod cluster;
64pub use cluster::Cluster;
65
66#[expect(missing_docs, reason = "TODO")]
67pub mod member_id;
68pub use member_id::{MemberId, TaglessMemberId};
69
70#[expect(missing_docs, reason = "TODO")]
71pub mod tick;
72pub use tick::{Atomic, NoTick, Tick};
73
74#[expect(missing_docs, reason = "TODO")]
75#[derive(PartialEq, Eq, Clone, Debug, Hash, Serialize, Deserialize)]
76pub enum MembershipEvent {
77    Joined,
78    Left,
79}
80
81#[expect(missing_docs, reason = "TODO")]
82#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
83pub enum NetworkHint {
84    Auto,
85    TcpPort(Option<u16>),
86}
87
88pub(crate) fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
89    assert_eq!(Location::id(l1), Location::id(l2), "locations do not match");
90}
91
92#[stageleft::export(LocationKey)]
93new_key_type! {
94    /// A unique identifier for a clock tick.
95    pub struct LocationKey;
96}
97
98impl std::fmt::Display for LocationKey {
99    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100        write!(f, "loc{:?}", self.data()) // `"loc1v1"``
101    }
102}
103
104/// This is used for the ECS membership stream.
105/// TODO(mingwei): Make this more robust?
106impl std::str::FromStr for LocationKey {
107    type Err = Option<ParseIntError>;
108
109    fn from_str(s: &str) -> Result<Self, Self::Err> {
110        let nvn = s.strip_prefix("loc").ok_or(None)?;
111        let (idx, ver) = nvn.split_once("v").ok_or(None)?;
112        let idx: u64 = idx.parse()?;
113        let ver: u64 = ver.parse()?;
114        Ok(slotmap::KeyData::from_ffi((ver << 32) | idx).into())
115    }
116}
117
118impl LocationKey {
119    /// TODO(minwgei): Remove this and avoid magic key for simulator external.
120    /// The first location key, used by the simulator as the default external location.
121    pub const FIRST: Self = Self(slotmap::KeyData::from_ffi(0x0000000100000001)); // `1v1`
122
123    /// A key for testing with index 1.
124    #[cfg(test)]
125    pub const TEST_KEY_1: Self = Self(slotmap::KeyData::from_ffi(0x000000ff00000001)); // `1v255`
126
127    /// A key for testing with index 2.
128    #[cfg(test)]
129    pub const TEST_KEY_2: Self = Self(slotmap::KeyData::from_ffi(0x000000ff00000002)); // `2v255`
130}
131
132/// This is used within `q!` code in docker and ECS.
133impl<Ctx> FreeVariableWithContextWithProps<Ctx, ()> for LocationKey {
134    type O = LocationKey;
135
136    fn to_tokens(self, _ctx: &Ctx) -> (QuoteTokens, ())
137    where
138        Self: Sized,
139    {
140        let root = get_this_crate();
141        let n = Key::data(&self).as_ffi();
142        (
143            QuoteTokens {
144                prelude: None,
145                expr: Some(quote! {
146                    #root::location::LocationKey::from(#root::runtime_support::slotmap::KeyData::from_ffi(#n))
147                }),
148            },
149            (),
150        )
151    }
152}
153
154/// A simple enum for the type of a root location.
155#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, Serialize)]
156pub enum LocationType {
157    /// A process (single node).
158    Process,
159    /// A cluster (multiple nodes).
160    Cluster,
161    /// An external client.
162    External,
163}
164
165/// A location where data can be materialized and computation can be executed.
166///
167/// Hydro is a **global**, **distributed** programming model. This means that the data
168/// and computation in a Hydro program can be spread across multiple machines, data
169/// centers, and even continents. To achieve this, Hydro uses the concept of
170/// **locations** to keep track of _where_ data is located and computation is executed.
171///
172/// Each live collection type (in [`crate::live_collections`]) has a type parameter `L`
173/// which will always be a type that implements the [`Location`] trait (e.g. [`Process`]
174/// and [`Cluster`]). To create distributed programs, Hydro provides a variety of APIs
175/// to allow live collections to be _moved_ between locations via network send/receive.
176///
177/// See [the Hydro docs](https://hydro.run/docs/hydro/reference/locations/) for more information.
178#[expect(
179    private_bounds,
180    reason = "only internal Hydro code can define location types"
181)]
182pub trait Location<'a>: dynamic::DynLocation {
183    /// The root location type for this location.
184    ///
185    /// For top-level locations like [`Process`] and [`Cluster`], this is `Self`.
186    /// For nested locations like [`Tick`], this is the root location that contains it.
187    type Root: Location<'a>;
188
189    /// Returns the root location for this location.
190    ///
191    /// For top-level locations like [`Process`] and [`Cluster`], this returns `self`.
192    /// For nested locations like [`Tick`], this returns the root location that contains it.
193    fn root(&self) -> Self::Root;
194
195    /// Attempts to create a new [`Tick`] clock domain at this location.
196    ///
197    /// Returns `Some(Tick)` if this is a top-level location (like [`Process`] or [`Cluster`]),
198    /// or `None` if this location is already inside a tick (nested ticks are not supported).
199    ///
200    /// Prefer using [`Location::tick`] when you know the location is top-level.
201    fn try_tick(&self) -> Option<Tick<Self>> {
202        if Self::is_top_level() {
203            let next_id = self.flow_state().borrow_mut().next_clock_id;
204            self.flow_state().borrow_mut().next_clock_id += 1;
205            Some(Tick {
206                id: next_id,
207                l: self.clone(),
208            })
209        } else {
210            None
211        }
212    }
213
214    /// Returns the unique identifier for this location.
215    fn id(&self) -> LocationId {
216        dynamic::DynLocation::id(self)
217    }
218
219    /// Creates a new [`Tick`] clock domain at this location.
220    ///
221    /// A tick represents a logical clock that can be used to batch streaming data
222    /// into discrete time steps. This is useful for implementing iterative algorithms
223    /// or for synchronizing data across multiple streams.
224    ///
225    /// # Example
226    /// ```rust
227    /// # #[cfg(feature = "deploy")] {
228    /// # use hydro_lang::prelude::*;
229    /// # use futures::StreamExt;
230    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
231    /// let tick = process.tick();
232    /// let inside_tick = process
233    ///     .source_iter(q!(vec![1, 2, 3, 4]))
234    ///     .batch(&tick, nondet!(/** test */));
235    /// inside_tick.all_ticks()
236    /// # }, |mut stream| async move {
237    /// // 1, 2, 3, 4
238    /// # for w in vec![1, 2, 3, 4] {
239    /// #     assert_eq!(stream.next().await.unwrap(), w);
240    /// # }
241    /// # }));
242    /// # }
243    /// ```
244    fn tick(&self) -> Tick<Self>
245    where
246        Self: NoTick,
247    {
248        let next_id = self.flow_state().borrow_mut().next_clock_id;
249        self.flow_state().borrow_mut().next_clock_id += 1;
250        Tick {
251            id: next_id,
252            l: self.clone(),
253        }
254    }
255
256    /// Creates an unbounded stream that continuously emits unit values `()`.
257    ///
258    /// This is useful for driving computations that need to run continuously,
259    /// such as polling or heartbeat mechanisms.
260    ///
261    /// # Example
262    /// ```rust
263    /// # #[cfg(feature = "deploy")] {
264    /// # use hydro_lang::prelude::*;
265    /// # use futures::StreamExt;
266    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
267    /// let tick = process.tick();
268    /// process.spin()
269    ///     .batch(&tick, nondet!(/** test */))
270    ///     .map(q!(|_| 42))
271    ///     .all_ticks()
272    /// # }, |mut stream| async move {
273    /// // 42, 42, 42, ...
274    /// # assert_eq!(stream.next().await.unwrap(), 42);
275    /// # assert_eq!(stream.next().await.unwrap(), 42);
276    /// # assert_eq!(stream.next().await.unwrap(), 42);
277    /// # }));
278    /// # }
279    /// ```
280    fn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
281    where
282        Self: Sized + NoTick,
283    {
284        Stream::new(
285            self.clone(),
286            HydroNode::Source {
287                source: HydroSource::Spin(),
288                metadata: self.new_node_metadata(Stream::<
289                    (),
290                    Self,
291                    Unbounded,
292                    TotalOrder,
293                    ExactlyOnce,
294                >::collection_kind()),
295            },
296        )
297    }
298
299    /// Creates a stream from an async [`FuturesStream`].
300    ///
301    /// This is useful for integrating with external async data sources,
302    /// such as network connections or file readers.
303    ///
304    /// # Example
305    /// ```rust
306    /// # #[cfg(feature = "deploy")] {
307    /// # use hydro_lang::prelude::*;
308    /// # use futures::StreamExt;
309    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
310    /// process.source_stream(q!(futures::stream::iter(vec![1, 2, 3])))
311    /// # }, |mut stream| async move {
312    /// // 1, 2, 3
313    /// # for w in vec![1, 2, 3] {
314    /// #     assert_eq!(stream.next().await.unwrap(), w);
315    /// # }
316    /// # }));
317    /// # }
318    /// ```
319    fn source_stream<T, E>(
320        &self,
321        e: impl QuotedWithContext<'a, E, Self>,
322    ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
323    where
324        E: FuturesStream<Item = T> + Unpin,
325        Self: Sized + NoTick,
326    {
327        let e = e.splice_untyped_ctx(self);
328
329        Stream::new(
330            self.clone(),
331            HydroNode::Source {
332                source: HydroSource::Stream(e.into()),
333                metadata: self.new_node_metadata(Stream::<
334                    T,
335                    Self,
336                    Unbounded,
337                    TotalOrder,
338                    ExactlyOnce,
339                >::collection_kind()),
340            },
341        )
342    }
343
344    /// Creates a bounded stream from an iterator.
345    ///
346    /// The iterator is evaluated once at runtime, and all elements are emitted
347    /// in order. This is useful for creating streams from static data or
348    /// for testing.
349    ///
350    /// # Example
351    /// ```rust
352    /// # #[cfg(feature = "deploy")] {
353    /// # use hydro_lang::prelude::*;
354    /// # use futures::StreamExt;
355    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
356    /// process.source_iter(q!(vec![1, 2, 3, 4]))
357    /// # }, |mut stream| async move {
358    /// // 1, 2, 3, 4
359    /// # for w in vec![1, 2, 3, 4] {
360    /// #     assert_eq!(stream.next().await.unwrap(), w);
361    /// # }
362    /// # }));
363    /// # }
364    /// ```
365    fn source_iter<T, E>(
366        &self,
367        e: impl QuotedWithContext<'a, E, Self>,
368    ) -> Stream<T, Self, Bounded, TotalOrder, ExactlyOnce>
369    where
370        E: IntoIterator<Item = T>,
371        Self: Sized + NoTick,
372    {
373        let e = e.splice_typed_ctx(self);
374
375        Stream::new(
376            self.clone(),
377            HydroNode::Source {
378                source: HydroSource::Iter(e.into()),
379                metadata: self.new_node_metadata(
380                    Stream::<T, Self, Bounded, TotalOrder, ExactlyOnce>::collection_kind(),
381                ),
382            },
383        )
384    }
385
386    /// Creates a stream of membership events for a cluster.
387    ///
388    /// This stream emits [`MembershipEvent::Joined`] when a cluster member joins
389    /// and [`MembershipEvent::Left`] when a cluster member leaves. The stream is
390    /// keyed by the [`MemberId`] of the cluster member.
391    ///
392    /// This is useful for implementing protocols that need to track cluster membership,
393    /// such as broadcasting to all members or detecting failures.
394    ///
395    /// # Example
396    /// ```rust
397    /// # #[cfg(feature = "deploy")] {
398    /// # use hydro_lang::prelude::*;
399    /// # use futures::StreamExt;
400    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
401    /// let p1 = flow.process::<()>();
402    /// let workers: Cluster<()> = flow.cluster::<()>();
403    /// # // do nothing on each worker
404    /// # workers.source_iter(q!(vec![])).for_each(q!(|_: ()| {}));
405    /// let cluster_members = p1.source_cluster_members(&workers);
406    /// # cluster_members.entries().send(&p2, TCP.fail_stop().bincode())
407    /// // if there are 4 members in the cluster, we would see a join event for each
408    /// // { MemberId::<Worker>(0): [MembershipEvent::Join], MemberId::<Worker>(2): [MembershipEvent::Join], ... }
409    /// # }, |mut stream| async move {
410    /// # let mut results = Vec::new();
411    /// # for w in 0..4 {
412    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
413    /// # }
414    /// # results.sort();
415    /// # assert_eq!(results, vec!["(MemberId::<()>(0), Joined)", "(MemberId::<()>(1), Joined)", "(MemberId::<()>(2), Joined)", "(MemberId::<()>(3), Joined)"]);
416    /// # }));
417    /// # }
418    /// ```
419    fn source_cluster_members<C: 'a>(
420        &self,
421        cluster: &Cluster<'a, C>,
422    ) -> KeyedStream<MemberId<C>, MembershipEvent, Self, Unbounded>
423    where
424        Self: Sized + NoTick,
425    {
426        Stream::new(
427            self.clone(),
428            HydroNode::Source {
429                source: HydroSource::ClusterMembers(cluster.id()),
430                metadata: self.new_node_metadata(Stream::<
431                    (TaglessMemberId, MembershipEvent),
432                    Self,
433                    Unbounded,
434                    TotalOrder,
435                    ExactlyOnce,
436                >::collection_kind()),
437            },
438        )
439        .map(q!(|(k, v)| (MemberId::from_tagless(k), v)))
440        .into_keyed()
441    }
442
443    /// Creates a one-way connection from an external process to receive raw bytes.
444    ///
445    /// Returns a port handle for the external process to connect to, and a stream
446    /// of received byte buffers.
447    ///
448    /// For bidirectional communication or typed data, see [`Location::bind_single_client`]
449    /// or [`Location::source_external_bincode`].
450    fn source_external_bytes<L>(
451        &self,
452        from: &External<L>,
453    ) -> (
454        ExternalBytesPort,
455        Stream<BytesMut, Self, Unbounded, TotalOrder, ExactlyOnce>,
456    )
457    where
458        Self: Sized + NoTick,
459    {
460        let (port, stream, sink) =
461            self.bind_single_client::<_, Bytes, LengthDelimitedCodec>(from, NetworkHint::Auto);
462
463        sink.complete(self.source_iter(q!([])));
464
465        (port, stream)
466    }
467
468    /// Creates a one-way connection from an external process to receive bincode-serialized data.
469    ///
470    /// Returns a sink handle for the external process to send data to, and a stream
471    /// of received values.
472    ///
473    /// For bidirectional communication, see [`Location::bind_single_client_bincode`].
474    #[expect(clippy::type_complexity, reason = "stream markers")]
475    fn source_external_bincode<L, T, O: Ordering, R: Retries>(
476        &self,
477        from: &External<L>,
478    ) -> (
479        ExternalBincodeSink<T, NotMany, O, R>,
480        Stream<T, Self, Unbounded, O, R>,
481    )
482    where
483        Self: Sized + NoTick,
484        T: Serialize + DeserializeOwned,
485    {
486        let (port, stream, sink) = self.bind_single_client_bincode::<_, T, ()>(from);
487        sink.complete(self.source_iter(q!([])));
488
489        (
490            ExternalBincodeSink {
491                process_key: from.key,
492                port_id: port.port_id,
493                _phantom: PhantomData,
494            },
495            stream.weaken_ordering().weaken_retries(),
496        )
497    }
498
499    /// Sets up a simulated input port on this location for testing.
500    ///
501    /// Returns a handle to send messages to the location as well as a stream
502    /// of received messages. This is only available when the `sim` feature is enabled.
503    #[cfg(feature = "sim")]
504    #[expect(clippy::type_complexity, reason = "stream markers")]
505    fn sim_input<T, O: Ordering, R: Retries>(
506        &self,
507    ) -> (SimSender<T, O, R>, Stream<T, Self, Unbounded, O, R>)
508    where
509        Self: Sized + NoTick,
510        T: Serialize + DeserializeOwned,
511    {
512        let external_location: External<'a, ()> = External {
513            key: LocationKey::FIRST,
514            flow_state: self.flow_state().clone(),
515            _phantom: PhantomData,
516        };
517
518        let (external, stream) = self.source_external_bincode(&external_location);
519
520        (SimSender(external.port_id, PhantomData), stream)
521    }
522
523    /// Creates an external input stream for embedded deployment mode.
524    ///
525    /// The `name` parameter specifies the name of the generated function parameter
526    /// that will supply data to this stream at runtime. The generated function will
527    /// accept an `impl Stream<Item = T> + Unpin` argument with this name.
528    fn embedded_input<T>(
529        &self,
530        name: impl Into<String>,
531    ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
532    where
533        Self: Sized + NoTick,
534    {
535        let ident = syn::Ident::new(&name.into(), Span::call_site());
536
537        Stream::new(
538            self.clone(),
539            HydroNode::EmbeddedInput {
540                ident,
541                metadata: self.new_node_metadata(Stream::<
542                    T,
543                    Self,
544                    Unbounded,
545                    TotalOrder,
546                    ExactlyOnce,
547                >::collection_kind()),
548            },
549        )
550    }
551
552    /// Establishes a server on this location to receive a bidirectional connection from a single
553    /// client, identified by the given `External` handle. Returns a port handle for the external
554    /// process to connect to, a stream of incoming messages, and a handle to send outgoing
555    /// messages.
556    ///
557    /// # Example
558    /// ```rust
559    /// # #[cfg(feature = "deploy")] {
560    /// # use hydro_lang::prelude::*;
561    /// # use hydro_deploy::Deployment;
562    /// # use futures::{SinkExt, StreamExt};
563    /// # tokio_test::block_on(async {
564    /// # use bytes::Bytes;
565    /// # use hydro_lang::location::NetworkHint;
566    /// # use tokio_util::codec::LengthDelimitedCodec;
567    /// # let mut flow = FlowBuilder::new();
568    /// let node = flow.process::<()>();
569    /// let external = flow.external::<()>();
570    /// let (port, incoming, outgoing) =
571    ///     node.bind_single_client::<_, Bytes, LengthDelimitedCodec>(&external, NetworkHint::Auto);
572    /// outgoing.complete(incoming.map(q!(|data /* : Bytes */| {
573    ///     let mut resp: Vec<u8> = data.into();
574    ///     resp.push(42);
575    ///     resp.into() // : Bytes
576    /// })));
577    ///
578    /// # let mut deployment = Deployment::new();
579    /// let nodes = flow // ... with_process and with_external
580    /// #     .with_process(&node, deployment.Localhost())
581    /// #     .with_external(&external, deployment.Localhost())
582    /// #     .deploy(&mut deployment);
583    ///
584    /// deployment.deploy().await.unwrap();
585    /// deployment.start().await.unwrap();
586    ///
587    /// let (mut external_out, mut external_in) = nodes.connect(port).await;
588    /// external_in.send(vec![1, 2, 3].into()).await.unwrap();
589    /// assert_eq!(
590    ///     external_out.next().await.unwrap().unwrap(),
591    ///     vec![1, 2, 3, 42]
592    /// );
593    /// # });
594    /// # }
595    /// ```
596    #[expect(clippy::type_complexity, reason = "stream markers")]
597    fn bind_single_client<L, T, Codec: Encoder<T> + Decoder>(
598        &self,
599        from: &External<L>,
600        port_hint: NetworkHint,
601    ) -> (
602        ExternalBytesPort<NotMany>,
603        Stream<<Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
604        ForwardHandle<'a, Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>,
605    )
606    where
607        Self: Sized + NoTick,
608    {
609        let next_external_port_id = from
610            .flow_state
611            .borrow_mut()
612            .next_external_port
613            .get_and_increment();
614
615        let (fwd_ref, to_sink) =
616            self.forward_ref::<Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>();
617        let mut flow_state_borrow = self.flow_state().borrow_mut();
618
619        flow_state_borrow.push_root(HydroRoot::SendExternal {
620            to_external_key: from.key,
621            to_port_id: next_external_port_id,
622            to_many: false,
623            unpaired: false,
624            serialize_fn: None,
625            instantiate_fn: DebugInstantiate::Building,
626            input: Box::new(to_sink.ir_node.into_inner()),
627            op_metadata: HydroIrOpMetadata::new(),
628        });
629
630        let raw_stream: Stream<
631            Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
632            Self,
633            Unbounded,
634            TotalOrder,
635            ExactlyOnce,
636        > = Stream::new(
637            self.clone(),
638            HydroNode::ExternalInput {
639                from_external_key: from.key,
640                from_port_id: next_external_port_id,
641                from_many: false,
642                codec_type: quote_type::<Codec>().into(),
643                port_hint,
644                instantiate_fn: DebugInstantiate::Building,
645                deserialize_fn: None,
646                metadata: self.new_node_metadata(Stream::<
647                    Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
648                    Self,
649                    Unbounded,
650                    TotalOrder,
651                    ExactlyOnce,
652                >::collection_kind()),
653            },
654        );
655
656        (
657            ExternalBytesPort {
658                process_key: from.key,
659                port_id: next_external_port_id,
660                _phantom: PhantomData,
661            },
662            raw_stream.flatten_ordered(),
663            fwd_ref,
664        )
665    }
666
667    /// Establishes a bidirectional connection from a single external client using bincode serialization.
668    ///
669    /// Returns a port handle for the external process to connect to, a stream of incoming messages,
670    /// and a handle to send outgoing messages. This is a convenience wrapper around
671    /// [`Location::bind_single_client`] that uses bincode for serialization.
672    ///
673    /// # Type Parameters
674    /// - `InT`: The type of incoming messages (must implement [`DeserializeOwned`])
675    /// - `OutT`: The type of outgoing messages (must implement [`Serialize`])
676    #[expect(clippy::type_complexity, reason = "stream markers")]
677    fn bind_single_client_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
678        &self,
679        from: &External<L>,
680    ) -> (
681        ExternalBincodeBidi<InT, OutT, NotMany>,
682        Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
683        ForwardHandle<'a, Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>,
684    )
685    where
686        Self: Sized + NoTick,
687    {
688        let next_external_port_id = from
689            .flow_state
690            .borrow_mut()
691            .next_external_port
692            .get_and_increment();
693
694        let (fwd_ref, to_sink) =
695            self.forward_ref::<Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>();
696        let mut flow_state_borrow = self.flow_state().borrow_mut();
697
698        let root = get_this_crate();
699
700        let out_t_type = quote_type::<OutT>();
701        let ser_fn: syn::Expr = syn::parse_quote! {
702            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#out_t_type, _>(
703                |b| #root::runtime_support::bincode::serialize(&b).unwrap().into()
704            )
705        };
706
707        flow_state_borrow.push_root(HydroRoot::SendExternal {
708            to_external_key: from.key,
709            to_port_id: next_external_port_id,
710            to_many: false,
711            unpaired: false,
712            serialize_fn: Some(ser_fn.into()),
713            instantiate_fn: DebugInstantiate::Building,
714            input: Box::new(to_sink.ir_node.into_inner()),
715            op_metadata: HydroIrOpMetadata::new(),
716        });
717
718        let in_t_type = quote_type::<InT>();
719
720        let deser_fn: syn::Expr = syn::parse_quote! {
721            |res| {
722                let b = res.unwrap();
723                #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap()
724            }
725        };
726
727        let raw_stream: Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce> = Stream::new(
728            self.clone(),
729            HydroNode::ExternalInput {
730                from_external_key: from.key,
731                from_port_id: next_external_port_id,
732                from_many: false,
733                codec_type: quote_type::<LengthDelimitedCodec>().into(),
734                port_hint: NetworkHint::Auto,
735                instantiate_fn: DebugInstantiate::Building,
736                deserialize_fn: Some(deser_fn.into()),
737                metadata: self.new_node_metadata(Stream::<
738                    InT,
739                    Self,
740                    Unbounded,
741                    TotalOrder,
742                    ExactlyOnce,
743                >::collection_kind()),
744            },
745        );
746
747        (
748            ExternalBincodeBidi {
749                process_key: from.key,
750                port_id: next_external_port_id,
751                _phantom: PhantomData,
752            },
753            raw_stream,
754            fwd_ref,
755        )
756    }
757
758    /// Establishes a server on this location to receive bidirectional connections from multiple
759    /// external clients using raw bytes.
760    ///
761    /// Unlike [`Location::bind_single_client`], this method supports multiple concurrent client
762    /// connections. Each client is assigned a unique `u64` identifier.
763    ///
764    /// Returns:
765    /// - A port handle for external processes to connect to
766    /// - A keyed stream of incoming messages, keyed by client ID
767    /// - A keyed stream of membership events (client joins/leaves), keyed by client ID
768    /// - A handle to send outgoing messages, keyed by client ID
769    #[expect(clippy::type_complexity, reason = "stream markers")]
770    fn bidi_external_many_bytes<L, T, Codec: Encoder<T> + Decoder>(
771        &self,
772        from: &External<L>,
773        port_hint: NetworkHint,
774    ) -> (
775        ExternalBytesPort<Many>,
776        KeyedStream<u64, <Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
777        KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
778        ForwardHandle<'a, KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>,
779    )
780    where
781        Self: Sized + NoTick,
782    {
783        let next_external_port_id = from
784            .flow_state
785            .borrow_mut()
786            .next_external_port
787            .get_and_increment();
788
789        let (fwd_ref, to_sink) =
790            self.forward_ref::<KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>();
791        let mut flow_state_borrow = self.flow_state().borrow_mut();
792
793        flow_state_borrow.push_root(HydroRoot::SendExternal {
794            to_external_key: from.key,
795            to_port_id: next_external_port_id,
796            to_many: true,
797            unpaired: false,
798            serialize_fn: None,
799            instantiate_fn: DebugInstantiate::Building,
800            input: Box::new(to_sink.entries().ir_node.into_inner()),
801            op_metadata: HydroIrOpMetadata::new(),
802        });
803
804        let raw_stream: Stream<
805            Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
806            Self,
807            Unbounded,
808            TotalOrder,
809            ExactlyOnce,
810        > = Stream::new(
811            self.clone(),
812            HydroNode::ExternalInput {
813                from_external_key: from.key,
814                from_port_id: next_external_port_id,
815                from_many: true,
816                codec_type: quote_type::<Codec>().into(),
817                port_hint,
818                instantiate_fn: DebugInstantiate::Building,
819                deserialize_fn: None,
820                metadata: self.new_node_metadata(Stream::<
821                    Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
822                    Self,
823                    Unbounded,
824                    TotalOrder,
825                    ExactlyOnce,
826                >::collection_kind()),
827            },
828        );
829
830        let membership_stream_ident = syn::Ident::new(
831            &format!(
832                "__hydro_deploy_many_{}_{}_membership",
833                from.key, next_external_port_id
834            ),
835            Span::call_site(),
836        );
837        let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
838        let raw_membership_stream: KeyedStream<
839            u64,
840            bool,
841            Self,
842            Unbounded,
843            TotalOrder,
844            ExactlyOnce,
845        > = KeyedStream::new(
846            self.clone(),
847            HydroNode::Source {
848                source: HydroSource::Stream(membership_stream_expr.into()),
849                metadata: self.new_node_metadata(KeyedStream::<
850                    u64,
851                    bool,
852                    Self,
853                    Unbounded,
854                    TotalOrder,
855                    ExactlyOnce,
856                >::collection_kind()),
857            },
858        );
859
860        (
861            ExternalBytesPort {
862                process_key: from.key,
863                port_id: next_external_port_id,
864                _phantom: PhantomData,
865            },
866            raw_stream
867                .flatten_ordered() // TODO(shadaj): this silently drops framing errors, decide on right defaults
868                .into_keyed(),
869            raw_membership_stream.map(q!(|join| {
870                if join {
871                    MembershipEvent::Joined
872                } else {
873                    MembershipEvent::Left
874                }
875            })),
876            fwd_ref,
877        )
878    }
879
880    /// Establishes a server on this location to receive bidirectional connections from multiple
881    /// external clients using bincode serialization.
882    ///
883    /// Unlike [`Location::bind_single_client_bincode`], this method supports multiple concurrent
884    /// client connections. Each client is assigned a unique `u64` identifier.
885    ///
886    /// Returns:
887    /// - A port handle for external processes to connect to
888    /// - A keyed stream of incoming messages, keyed by client ID
889    /// - A keyed stream of membership events (client joins/leaves), keyed by client ID
890    /// - A handle to send outgoing messages, keyed by client ID
891    ///
892    /// # Type Parameters
893    /// - `InT`: The type of incoming messages (must implement [`DeserializeOwned`])
894    /// - `OutT`: The type of outgoing messages (must implement [`Serialize`])
895    #[expect(clippy::type_complexity, reason = "stream markers")]
896    fn bidi_external_many_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
897        &self,
898        from: &External<L>,
899    ) -> (
900        ExternalBincodeBidi<InT, OutT, Many>,
901        KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
902        KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
903        ForwardHandle<'a, KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
904    )
905    where
906        Self: Sized + NoTick,
907    {
908        let next_external_port_id = from
909            .flow_state
910            .borrow_mut()
911            .next_external_port
912            .get_and_increment();
913
914        let (fwd_ref, to_sink) =
915            self.forward_ref::<KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>();
916        let mut flow_state_borrow = self.flow_state().borrow_mut();
917
918        let root = get_this_crate();
919
920        let out_t_type = quote_type::<OutT>();
921        let ser_fn: syn::Expr = syn::parse_quote! {
922            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(u64, #out_t_type), _>(
923                |(id, b)| (id, #root::runtime_support::bincode::serialize(&b).unwrap().into())
924            )
925        };
926
927        flow_state_borrow.push_root(HydroRoot::SendExternal {
928            to_external_key: from.key,
929            to_port_id: next_external_port_id,
930            to_many: true,
931            unpaired: false,
932            serialize_fn: Some(ser_fn.into()),
933            instantiate_fn: DebugInstantiate::Building,
934            input: Box::new(to_sink.entries().ir_node.into_inner()),
935            op_metadata: HydroIrOpMetadata::new(),
936        });
937
938        let in_t_type = quote_type::<InT>();
939
940        let deser_fn: syn::Expr = syn::parse_quote! {
941            |res| {
942                let (id, b) = res.unwrap();
943                (id, #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap())
944            }
945        };
946
947        let raw_stream: KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce> =
948            KeyedStream::new(
949                self.clone(),
950                HydroNode::ExternalInput {
951                    from_external_key: from.key,
952                    from_port_id: next_external_port_id,
953                    from_many: true,
954                    codec_type: quote_type::<LengthDelimitedCodec>().into(),
955                    port_hint: NetworkHint::Auto,
956                    instantiate_fn: DebugInstantiate::Building,
957                    deserialize_fn: Some(deser_fn.into()),
958                    metadata: self.new_node_metadata(KeyedStream::<
959                        u64,
960                        InT,
961                        Self,
962                        Unbounded,
963                        TotalOrder,
964                        ExactlyOnce,
965                    >::collection_kind()),
966                },
967            );
968
969        let membership_stream_ident = syn::Ident::new(
970            &format!(
971                "__hydro_deploy_many_{}_{}_membership",
972                from.key, next_external_port_id
973            ),
974            Span::call_site(),
975        );
976        let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
977        let raw_membership_stream: KeyedStream<
978            u64,
979            bool,
980            Self,
981            Unbounded,
982            TotalOrder,
983            ExactlyOnce,
984        > = KeyedStream::new(
985            self.clone(),
986            HydroNode::Source {
987                source: HydroSource::Stream(membership_stream_expr.into()),
988                metadata: self.new_node_metadata(KeyedStream::<
989                    u64,
990                    bool,
991                    Self,
992                    Unbounded,
993                    TotalOrder,
994                    ExactlyOnce,
995                >::collection_kind()),
996            },
997        );
998
999        (
1000            ExternalBincodeBidi {
1001                process_key: from.key,
1002                port_id: next_external_port_id,
1003                _phantom: PhantomData,
1004            },
1005            raw_stream,
1006            raw_membership_stream.map(q!(|join| {
1007                if join {
1008                    MembershipEvent::Joined
1009                } else {
1010                    MembershipEvent::Left
1011                }
1012            })),
1013            fwd_ref,
1014        )
1015    }
1016
1017    /// Constructs a [`Singleton`] materialized at this location with the given static value.
1018    ///
1019    /// # Example
1020    /// ```rust
1021    /// # #[cfg(feature = "deploy")] {
1022    /// # use hydro_lang::prelude::*;
1023    /// # use futures::StreamExt;
1024    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1025    /// let tick = process.tick();
1026    /// let singleton = tick.singleton(q!(5));
1027    /// # singleton.all_ticks()
1028    /// # }, |mut stream| async move {
1029    /// // 5
1030    /// # assert_eq!(stream.next().await.unwrap(), 5);
1031    /// # }));
1032    /// # }
1033    /// ```
1034    fn singleton<T>(&self, e: impl QuotedWithContext<'a, T, Self>) -> Singleton<T, Self, Bounded>
1035    where
1036        T: Clone,
1037        Self: Sized,
1038    {
1039        let e = e.splice_untyped_ctx(self);
1040
1041        Singleton::new(
1042            self.clone(),
1043            HydroNode::SingletonSource {
1044                value: e.into(),
1045                metadata: self.new_node_metadata(Singleton::<T, Self, Bounded>::collection_kind()),
1046            },
1047        )
1048    }
1049
1050    /// Generates a stream with values emitted at a fixed interval, with
1051    /// each value being the current time (as an [`tokio::time::Instant`]).
1052    ///
1053    /// The clock source used is monotonic, so elements will be emitted in
1054    /// increasing order.
1055    ///
1056    /// # Non-Determinism
1057    /// Because this stream is generated by an OS timer, it will be
1058    /// non-deterministic because each timestamp will be arbitrary.
1059    fn source_interval(
1060        &self,
1061        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1062        _nondet: NonDet,
1063    ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
1064    where
1065        Self: Sized + NoTick,
1066    {
1067        self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
1068            tokio::time::interval(interval)
1069        )))
1070    }
1071
1072    /// Generates a stream with values emitted at a fixed interval (with an
1073    /// initial delay), with each value being the current time
1074    /// (as an [`tokio::time::Instant`]).
1075    ///
1076    /// The clock source used is monotonic, so elements will be emitted in
1077    /// increasing order.
1078    ///
1079    /// # Non-Determinism
1080    /// Because this stream is generated by an OS timer, it will be
1081    /// non-deterministic because each timestamp will be arbitrary.
1082    fn source_interval_delayed(
1083        &self,
1084        delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1085        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1086        _nondet: NonDet,
1087    ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
1088    where
1089        Self: Sized + NoTick,
1090    {
1091        self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
1092            tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)
1093        )))
1094    }
1095
1096    /// Creates a forward reference for defining recursive or mutually-dependent dataflows.
1097    ///
1098    /// Returns a handle that must be completed with the actual stream, and a placeholder
1099    /// stream that can be used in the dataflow graph before the actual stream is defined.
1100    ///
1101    /// This is useful for implementing feedback loops or recursive computations where
1102    /// a stream depends on its own output.
1103    ///
1104    /// # Example
1105    /// ```rust
1106    /// # #[cfg(feature = "deploy")] {
1107    /// # use hydro_lang::prelude::*;
1108    /// # use hydro_lang::live_collections::stream::NoOrder;
1109    /// # use futures::StreamExt;
1110    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1111    /// // Create a forward reference for the feedback stream
1112    /// let (complete, feedback) = process.forward_ref::<Stream<i32, _, _, NoOrder>>();
1113    ///
1114    /// // Combine initial input with feedback, then increment
1115    /// let input: Stream<_, _, Unbounded> = process.source_iter(q!([1])).into();
1116    /// let output: Stream<_, _, _, NoOrder> = input.interleave(feedback).map(q!(|x| x + 1));
1117    ///
1118    /// // Complete the forward reference with the output
1119    /// complete.complete(output.clone());
1120    /// output
1121    /// # }, |mut stream| async move {
1122    /// // 2, 3, 4, 5, ...
1123    /// # assert_eq!(stream.next().await.unwrap(), 2);
1124    /// # assert_eq!(stream.next().await.unwrap(), 3);
1125    /// # assert_eq!(stream.next().await.unwrap(), 4);
1126    /// # }));
1127    /// # }
1128    /// ```
1129    fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
1130    where
1131        S: CycleCollection<'a, ForwardRef, Location = Self>,
1132    {
1133        let next_id = self.flow_state().borrow_mut().next_cycle_id();
1134        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
1135
1136        (
1137            ForwardHandle {
1138                completed: false,
1139                ident: ident.clone(),
1140                expected_location: Location::id(self),
1141                _phantom: PhantomData,
1142            },
1143            S::create_source(ident, self.clone()),
1144        )
1145    }
1146}
1147
1148#[cfg(feature = "deploy")]
1149#[cfg(test)]
1150mod tests {
1151    use std::collections::HashSet;
1152
1153    use futures::{SinkExt, StreamExt};
1154    use hydro_deploy::Deployment;
1155    use stageleft::q;
1156    use tokio_util::codec::LengthDelimitedCodec;
1157
1158    use crate::compile::builder::FlowBuilder;
1159    use crate::live_collections::stream::{ExactlyOnce, TotalOrder};
1160    use crate::location::{Location, NetworkHint};
1161    use crate::nondet::nondet;
1162
1163    #[tokio::test]
1164    async fn top_level_singleton_replay_cardinality() {
1165        let mut deployment = Deployment::new();
1166
1167        let mut flow = FlowBuilder::new();
1168        let node = flow.process::<()>();
1169        let external = flow.external::<()>();
1170
1171        let (in_port, input) =
1172            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
1173        let singleton = node.singleton(q!(123));
1174        let tick = node.tick();
1175        let out = input
1176            .batch(&tick, nondet!(/** test */))
1177            .cross_singleton(singleton.clone().snapshot(&tick, nondet!(/** test */)))
1178            .cross_singleton(
1179                singleton
1180                    .snapshot(&tick, nondet!(/** test */))
1181                    .into_stream()
1182                    .count(),
1183            )
1184            .all_ticks()
1185            .send_bincode_external(&external);
1186
1187        let nodes = flow
1188            .with_process(&node, deployment.Localhost())
1189            .with_external(&external, deployment.Localhost())
1190            .deploy(&mut deployment);
1191
1192        deployment.deploy().await.unwrap();
1193
1194        let mut external_in = nodes.connect(in_port).await;
1195        let mut external_out = nodes.connect(out).await;
1196
1197        deployment.start().await.unwrap();
1198
1199        external_in.send(1).await.unwrap();
1200        assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
1201
1202        external_in.send(2).await.unwrap();
1203        assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
1204    }
1205
1206    #[tokio::test]
1207    async fn tick_singleton_replay_cardinality() {
1208        let mut deployment = Deployment::new();
1209
1210        let mut flow = FlowBuilder::new();
1211        let node = flow.process::<()>();
1212        let external = flow.external::<()>();
1213
1214        let (in_port, input) =
1215            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
1216        let tick = node.tick();
1217        let singleton = tick.singleton(q!(123));
1218        let out = input
1219            .batch(&tick, nondet!(/** test */))
1220            .cross_singleton(singleton.clone())
1221            .cross_singleton(singleton.into_stream().count())
1222            .all_ticks()
1223            .send_bincode_external(&external);
1224
1225        let nodes = flow
1226            .with_process(&node, deployment.Localhost())
1227            .with_external(&external, deployment.Localhost())
1228            .deploy(&mut deployment);
1229
1230        deployment.deploy().await.unwrap();
1231
1232        let mut external_in = nodes.connect(in_port).await;
1233        let mut external_out = nodes.connect(out).await;
1234
1235        deployment.start().await.unwrap();
1236
1237        external_in.send(1).await.unwrap();
1238        assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
1239
1240        external_in.send(2).await.unwrap();
1241        assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
1242    }
1243
1244    #[tokio::test]
1245    async fn external_bytes() {
1246        let mut deployment = Deployment::new();
1247
1248        let mut flow = FlowBuilder::new();
1249        let first_node = flow.process::<()>();
1250        let external = flow.external::<()>();
1251
1252        let (in_port, input) = first_node.source_external_bytes(&external);
1253        let out = input.send_bincode_external(&external);
1254
1255        let nodes = flow
1256            .with_process(&first_node, deployment.Localhost())
1257            .with_external(&external, deployment.Localhost())
1258            .deploy(&mut deployment);
1259
1260        deployment.deploy().await.unwrap();
1261
1262        let mut external_in = nodes.connect(in_port).await.1;
1263        let mut external_out = nodes.connect(out).await;
1264
1265        deployment.start().await.unwrap();
1266
1267        external_in.send(vec![1, 2, 3].into()).await.unwrap();
1268
1269        assert_eq!(external_out.next().await.unwrap(), vec![1, 2, 3]);
1270    }
1271
1272    #[tokio::test]
1273    async fn multi_external_source() {
1274        let mut deployment = Deployment::new();
1275
1276        let mut flow = FlowBuilder::new();
1277        let first_node = flow.process::<()>();
1278        let external = flow.external::<()>();
1279
1280        let (in_port, input, _membership, complete_sink) =
1281            first_node.bidi_external_many_bincode(&external);
1282        let out = input.entries().send_bincode_external(&external);
1283        complete_sink.complete(
1284            first_node
1285                .source_iter::<(u64, ()), _>(q!([]))
1286                .into_keyed()
1287                .weaken_ordering(),
1288        );
1289
1290        let nodes = flow
1291            .with_process(&first_node, deployment.Localhost())
1292            .with_external(&external, deployment.Localhost())
1293            .deploy(&mut deployment);
1294
1295        deployment.deploy().await.unwrap();
1296
1297        let (_, mut external_in_1) = nodes.connect_bincode(in_port.clone()).await;
1298        let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
1299        let external_out = nodes.connect(out).await;
1300
1301        deployment.start().await.unwrap();
1302
1303        external_in_1.send(123).await.unwrap();
1304        external_in_2.send(456).await.unwrap();
1305
1306        assert_eq!(
1307            external_out.take(2).collect::<HashSet<_>>().await,
1308            vec![(0, 123), (1, 456)].into_iter().collect()
1309        );
1310    }
1311
1312    #[tokio::test]
1313    async fn second_connection_only_multi_source() {
1314        let mut deployment = Deployment::new();
1315
1316        let mut flow = FlowBuilder::new();
1317        let first_node = flow.process::<()>();
1318        let external = flow.external::<()>();
1319
1320        let (in_port, input, _membership, complete_sink) =
1321            first_node.bidi_external_many_bincode(&external);
1322        let out = input.entries().send_bincode_external(&external);
1323        complete_sink.complete(
1324            first_node
1325                .source_iter::<(u64, ()), _>(q!([]))
1326                .into_keyed()
1327                .weaken_ordering(),
1328        );
1329
1330        let nodes = flow
1331            .with_process(&first_node, deployment.Localhost())
1332            .with_external(&external, deployment.Localhost())
1333            .deploy(&mut deployment);
1334
1335        deployment.deploy().await.unwrap();
1336
1337        // intentionally skipped to test stream waking logic
1338        let (_, mut _external_in_1) = nodes.connect_bincode(in_port.clone()).await;
1339        let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
1340        let mut external_out = nodes.connect(out).await;
1341
1342        deployment.start().await.unwrap();
1343
1344        external_in_2.send(456).await.unwrap();
1345
1346        assert_eq!(external_out.next().await.unwrap(), (1, 456));
1347    }
1348
1349    #[tokio::test]
1350    async fn multi_external_bytes() {
1351        let mut deployment = Deployment::new();
1352
1353        let mut flow = FlowBuilder::new();
1354        let first_node = flow.process::<()>();
1355        let external = flow.external::<()>();
1356
1357        let (in_port, input, _membership, complete_sink) = first_node
1358            .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1359        let out = input.entries().send_bincode_external(&external);
1360        complete_sink.complete(
1361            first_node
1362                .source_iter(q!([]))
1363                .into_keyed()
1364                .weaken_ordering(),
1365        );
1366
1367        let nodes = flow
1368            .with_process(&first_node, deployment.Localhost())
1369            .with_external(&external, deployment.Localhost())
1370            .deploy(&mut deployment);
1371
1372        deployment.deploy().await.unwrap();
1373
1374        let mut external_in_1 = nodes.connect(in_port.clone()).await.1;
1375        let mut external_in_2 = nodes.connect(in_port).await.1;
1376        let external_out = nodes.connect(out).await;
1377
1378        deployment.start().await.unwrap();
1379
1380        external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1381        external_in_2.send(vec![4, 5].into()).await.unwrap();
1382
1383        assert_eq!(
1384            external_out.take(2).collect::<HashSet<_>>().await,
1385            vec![
1386                (0, (&[1u8, 2, 3] as &[u8]).into()),
1387                (1, (&[4u8, 5] as &[u8]).into())
1388            ]
1389            .into_iter()
1390            .collect()
1391        );
1392    }
1393
1394    #[tokio::test]
1395    async fn single_client_external_bytes() {
1396        let mut deployment = Deployment::new();
1397        let mut flow = FlowBuilder::new();
1398        let first_node = flow.process::<()>();
1399        let external = flow.external::<()>();
1400        let (port, input, complete_sink) = first_node
1401            .bind_single_client::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1402        complete_sink.complete(input.map(q!(|data| {
1403            let mut resp: Vec<u8> = data.into();
1404            resp.push(42);
1405            resp.into() // : Bytes
1406        })));
1407
1408        let nodes = flow
1409            .with_process(&first_node, deployment.Localhost())
1410            .with_external(&external, deployment.Localhost())
1411            .deploy(&mut deployment);
1412
1413        deployment.deploy().await.unwrap();
1414        deployment.start().await.unwrap();
1415
1416        let (mut external_out, mut external_in) = nodes.connect(port).await;
1417
1418        external_in.send(vec![1, 2, 3].into()).await.unwrap();
1419        assert_eq!(
1420            external_out.next().await.unwrap().unwrap(),
1421            vec![1, 2, 3, 42]
1422        );
1423    }
1424
1425    #[tokio::test]
1426    async fn echo_external_bytes() {
1427        let mut deployment = Deployment::new();
1428
1429        let mut flow = FlowBuilder::new();
1430        let first_node = flow.process::<()>();
1431        let external = flow.external::<()>();
1432
1433        let (port, input, _membership, complete_sink) = first_node
1434            .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1435        complete_sink
1436            .complete(input.map(q!(|bytes| { bytes.into_iter().map(|x| x + 1).collect() })));
1437
1438        let nodes = flow
1439            .with_process(&first_node, deployment.Localhost())
1440            .with_external(&external, deployment.Localhost())
1441            .deploy(&mut deployment);
1442
1443        deployment.deploy().await.unwrap();
1444
1445        let (mut external_out_1, mut external_in_1) = nodes.connect(port.clone()).await;
1446        let (mut external_out_2, mut external_in_2) = nodes.connect(port).await;
1447
1448        deployment.start().await.unwrap();
1449
1450        external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1451        external_in_2.send(vec![4, 5].into()).await.unwrap();
1452
1453        assert_eq!(external_out_1.next().await.unwrap().unwrap(), vec![2, 3, 4]);
1454        assert_eq!(external_out_2.next().await.unwrap().unwrap(), vec![5, 6]);
1455    }
1456
1457    #[tokio::test]
1458    async fn echo_external_bincode() {
1459        let mut deployment = Deployment::new();
1460
1461        let mut flow = FlowBuilder::new();
1462        let first_node = flow.process::<()>();
1463        let external = flow.external::<()>();
1464
1465        let (port, input, _membership, complete_sink) =
1466            first_node.bidi_external_many_bincode(&external);
1467        complete_sink.complete(input.map(q!(|text: String| { text.to_uppercase() })));
1468
1469        let nodes = flow
1470            .with_process(&first_node, deployment.Localhost())
1471            .with_external(&external, deployment.Localhost())
1472            .deploy(&mut deployment);
1473
1474        deployment.deploy().await.unwrap();
1475
1476        let (mut external_out_1, mut external_in_1) = nodes.connect_bincode(port.clone()).await;
1477        let (mut external_out_2, mut external_in_2) = nodes.connect_bincode(port).await;
1478
1479        deployment.start().await.unwrap();
1480
1481        external_in_1.send("hi".to_owned()).await.unwrap();
1482        external_in_2.send("hello".to_owned()).await.unwrap();
1483
1484        assert_eq!(external_out_1.next().await.unwrap(), "HI");
1485        assert_eq!(external_out_2.next().await.unwrap(), "HELLO");
1486    }
1487}