1use std::fmt::Debug;
17use std::marker::PhantomData;
18use std::num::ParseIntError;
19use std::time::Duration;
20
21use bytes::{Bytes, BytesMut};
22use futures::stream::Stream as FuturesStream;
23use proc_macro2::Span;
24use quote::quote;
25use serde::de::DeserializeOwned;
26use serde::{Deserialize, Serialize};
27use slotmap::{Key, new_key_type};
28use stageleft::runtime_support::{FreeVariableWithContextWithProps, QuoteTokens};
29use stageleft::{QuotedWithContext, q, quote_type};
30use syn::parse_quote;
31use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
32
33use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot, HydroSource};
34use crate::forward_handle::ForwardRef;
35#[cfg(stageleft_runtime)]
36use crate::forward_handle::{CycleCollection, ForwardHandle};
37use crate::live_collections::boundedness::{Bounded, Unbounded};
38use crate::live_collections::keyed_stream::KeyedStream;
39use crate::live_collections::singleton::Singleton;
40use crate::live_collections::stream::{
41 ExactlyOnce, NoOrder, Ordering, Retries, Stream, TotalOrder,
42};
43use crate::location::dynamic::LocationId;
44use crate::location::external_process::{
45 ExternalBincodeBidi, ExternalBincodeSink, ExternalBytesPort, Many, NotMany,
46};
47use crate::nondet::NonDet;
48#[cfg(feature = "sim")]
49use crate::sim::SimSender;
50use crate::staging_util::get_this_crate;
51
52pub mod dynamic;
53
54#[expect(missing_docs, reason = "TODO")]
55pub mod external_process;
56pub use external_process::External;
57
58#[expect(missing_docs, reason = "TODO")]
59pub mod process;
60pub use process::Process;
61
62#[expect(missing_docs, reason = "TODO")]
63pub mod cluster;
64pub use cluster::Cluster;
65
66#[expect(missing_docs, reason = "TODO")]
67pub mod member_id;
68pub use member_id::{MemberId, TaglessMemberId};
69
70#[expect(missing_docs, reason = "TODO")]
71pub mod tick;
72pub use tick::{Atomic, NoTick, Tick};
73
74#[expect(missing_docs, reason = "TODO")]
75#[derive(PartialEq, Eq, Clone, Debug, Hash, Serialize, Deserialize)]
76pub enum MembershipEvent {
77 Joined,
78 Left,
79}
80
/// A hint describing which network endpoint an external connection should use.
///
/// Passed as `port_hint` to [`Location::bind_single_client`] and
/// [`Location::bidi_external_many_bytes`]; the bincode flavours of those
/// methods always pass [`NetworkHint::Auto`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum NetworkHint {
    /// No preference is expressed about the endpoint.
    Auto,
    /// Use a TCP port, optionally naming a specific port number.
    ///
    /// NOTE(review): `None` presumably lets the deployment pick any free
    /// port — confirm against the deployment backend.
    TcpPort(Option<u16>),
}
87
88pub(crate) fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
89 assert_eq!(Location::id(l1), Location::id(l2), "locations do not match");
90}
91
#[stageleft::export(LocationKey)]
new_key_type! {
    /// Slotmap-generated key type used to identify a location.
    pub struct LocationKey;
}
97
98impl std::fmt::Display for LocationKey {
99 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100 write!(f, "loc{:?}", self.data()) }
102}
103
104impl std::str::FromStr for LocationKey {
107 type Err = Option<ParseIntError>;
108
109 fn from_str(s: &str) -> Result<Self, Self::Err> {
110 let nvn = s.strip_prefix("loc").ok_or(None)?;
111 let (idx, ver) = nvn.split_once("v").ok_or(None)?;
112 let idx: u64 = idx.parse()?;
113 let ver: u64 = ver.parse()?;
114 Ok(slotmap::KeyData::from_ffi((ver << 32) | idx).into())
115 }
116}
117
impl LocationKey {
    /// Key built from the FFI value `0x0000_0001_0000_0001`.
    ///
    /// `Location::sim_input` uses this key for the stand-in external location
    /// it constructs.
    pub const FIRST: Self = Self(slotmap::KeyData::from_ffi(0x0000000100000001));

    /// Fixed key (FFI value `0x0000_00ff_0000_0001`) compiled only for tests.
    #[cfg(test)]
    pub const TEST_KEY_1: Self = Self(slotmap::KeyData::from_ffi(0x000000ff00000001));

    /// Fixed key (FFI value `0x0000_00ff_0000_0002`) compiled only for tests.
    #[cfg(test)]
    pub const TEST_KEY_2: Self = Self(slotmap::KeyData::from_ffi(0x000000ff00000002));
}
131
132impl<Ctx> FreeVariableWithContextWithProps<Ctx, ()> for LocationKey {
134 type O = LocationKey;
135
136 fn to_tokens(self, _ctx: &Ctx) -> (QuoteTokens, ())
137 where
138 Self: Sized,
139 {
140 let root = get_this_crate();
141 let n = Key::data(&self).as_ffi();
142 (
143 QuoteTokens {
144 prelude: None,
145 expr: Some(quote! {
146 #root::location::LocationKey::from(#root::runtime_support::slotmap::KeyData::from_ffi(#n))
147 }),
148 },
149 (),
150 )
151 }
152}
153
/// The kind of a location; the variants mirror the `Process`, `Cluster` and
/// `External` location types re-exported from this module.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, Serialize)]
pub enum LocationType {
    /// Corresponds to the `Process` location type.
    Process,
    /// Corresponds to the `Cluster` location type.
    Cluster,
    /// Corresponds to the `External` location type.
    External,
}
164
/// Common interface of the location types in this module.
///
/// The provided methods create ticks, sources (streams, singletons),
/// connections to external processes and forward references, each attached
/// to the location they are called on.
#[expect(
    private_bounds,
    reason = "only internal Hydro code can define location types"
)]
pub trait Location<'a>: dynamic::DynLocation {
    /// The location type returned by [`Location::root`]; itself a [`Location`].
    type Root: Location<'a>;

    /// Returns the root location associated with this location.
    ///
    /// NOTE(review): presumably the top-level location that encloses `self`
    /// (and `self` itself when already top-level) — confirm against the
    /// implementors.
    fn root(&self) -> Self::Root;
194
195 fn try_tick(&self) -> Option<Tick<Self>> {
202 if Self::is_top_level() {
203 let next_id = self.flow_state().borrow_mut().next_clock_id;
204 self.flow_state().borrow_mut().next_clock_id += 1;
205 Some(Tick {
206 id: next_id,
207 l: self.clone(),
208 })
209 } else {
210 None
211 }
212 }
213
214 fn id(&self) -> LocationId {
216 dynamic::DynLocation::id(self)
217 }
218
219 fn tick(&self) -> Tick<Self>
245 where
246 Self: NoTick,
247 {
248 let next_id = self.flow_state().borrow_mut().next_clock_id;
249 self.flow_state().borrow_mut().next_clock_id += 1;
250 Tick {
251 id: next_id,
252 l: self.clone(),
253 }
254 }
255
256 fn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
281 where
282 Self: Sized + NoTick,
283 {
284 Stream::new(
285 self.clone(),
286 HydroNode::Source {
287 source: HydroSource::Spin(),
288 metadata: self.new_node_metadata(Stream::<
289 (),
290 Self,
291 Unbounded,
292 TotalOrder,
293 ExactlyOnce,
294 >::collection_kind()),
295 },
296 )
297 }
298
299 fn source_stream<T, E>(
320 &self,
321 e: impl QuotedWithContext<'a, E, Self>,
322 ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
323 where
324 E: FuturesStream<Item = T> + Unpin,
325 Self: Sized + NoTick,
326 {
327 let e = e.splice_untyped_ctx(self);
328
329 Stream::new(
330 self.clone(),
331 HydroNode::Source {
332 source: HydroSource::Stream(e.into()),
333 metadata: self.new_node_metadata(Stream::<
334 T,
335 Self,
336 Unbounded,
337 TotalOrder,
338 ExactlyOnce,
339 >::collection_kind()),
340 },
341 )
342 }
343
344 fn source_iter<T, E>(
366 &self,
367 e: impl QuotedWithContext<'a, E, Self>,
368 ) -> Stream<T, Self, Bounded, TotalOrder, ExactlyOnce>
369 where
370 E: IntoIterator<Item = T>,
371 Self: Sized + NoTick,
372 {
373 let e = e.splice_typed_ctx(self);
374
375 Stream::new(
376 self.clone(),
377 HydroNode::Source {
378 source: HydroSource::Iter(e.into()),
379 metadata: self.new_node_metadata(
380 Stream::<T, Self, Bounded, TotalOrder, ExactlyOnce>::collection_kind(),
381 ),
382 },
383 )
384 }
385
386 fn source_cluster_members<C: 'a>(
420 &self,
421 cluster: &Cluster<'a, C>,
422 ) -> KeyedStream<MemberId<C>, MembershipEvent, Self, Unbounded>
423 where
424 Self: Sized + NoTick,
425 {
426 Stream::new(
427 self.clone(),
428 HydroNode::Source {
429 source: HydroSource::ClusterMembers(cluster.id()),
430 metadata: self.new_node_metadata(Stream::<
431 (TaglessMemberId, MembershipEvent),
432 Self,
433 Unbounded,
434 TotalOrder,
435 ExactlyOnce,
436 >::collection_kind()),
437 },
438 )
439 .map(q!(|(k, v)| (MemberId::from_tagless(k), v)))
440 .into_keyed()
441 }
442
443 fn source_external_bytes<L>(
451 &self,
452 from: &External<L>,
453 ) -> (
454 ExternalBytesPort,
455 Stream<BytesMut, Self, Unbounded, TotalOrder, ExactlyOnce>,
456 )
457 where
458 Self: Sized + NoTick,
459 {
460 let (port, stream, sink) =
461 self.bind_single_client::<_, Bytes, LengthDelimitedCodec>(from, NetworkHint::Auto);
462
463 sink.complete(self.source_iter(q!([])));
464
465 (port, stream)
466 }
467
468 #[expect(clippy::type_complexity, reason = "stream markers")]
475 fn source_external_bincode<L, T, O: Ordering, R: Retries>(
476 &self,
477 from: &External<L>,
478 ) -> (
479 ExternalBincodeSink<T, NotMany, O, R>,
480 Stream<T, Self, Unbounded, O, R>,
481 )
482 where
483 Self: Sized + NoTick,
484 T: Serialize + DeserializeOwned,
485 {
486 let (port, stream, sink) = self.bind_single_client_bincode::<_, T, ()>(from);
487 sink.complete(self.source_iter(q!([])));
488
489 (
490 ExternalBincodeSink {
491 process_key: from.key,
492 port_id: port.port_id,
493 _phantom: PhantomData,
494 },
495 stream.weaken_ordering().weaken_retries(),
496 )
497 }
498
/// Creates an input stream for simulation, returning the sender used to
/// feed it (available only with the `sim` feature).
///
/// A stand-in external location keyed by [`LocationKey::FIRST`] and sharing
/// this location's flow state is constructed, and the input is obtained
/// through [`Location::source_external_bincode`].
#[cfg(feature = "sim")]
#[expect(clippy::type_complexity, reason = "stream markers")]
fn sim_input<T, O: Ordering, R: Retries>(
    &self,
) -> (SimSender<T, O, R>, Stream<T, Self, Unbounded, O, R>)
where
    Self: Sized + NoTick,
    T: Serialize + DeserializeOwned,
{
    let stand_in: External<'a, ()> = External {
        key: LocationKey::FIRST,
        flow_state: self.flow_state().clone(),
        _phantom: PhantomData,
    };

    let (sink, input) = self.source_external_bincode(&stand_in);
    let sender = SimSender(sink.port_id, PhantomData);

    (sender, input)
}
522
523 fn embedded_input<T>(
529 &self,
530 name: impl Into<String>,
531 ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
532 where
533 Self: Sized + NoTick,
534 {
535 let ident = syn::Ident::new(&name.into(), Span::call_site());
536
537 Stream::new(
538 self.clone(),
539 HydroNode::EmbeddedInput {
540 ident,
541 metadata: self.new_node_metadata(Stream::<
542 T,
543 Self,
544 Unbounded,
545 TotalOrder,
546 ExactlyOnce,
547 >::collection_kind()),
548 },
549 )
550 }
551
/// Sets up a bidirectional connection with a single client on the external
/// process `from`, framing traffic with `Codec`.
///
/// Returns, in order:
/// * the port handle identifying the connection (`from.key` plus a freshly
///   allocated external port id),
/// * the stream of items decoded from the client,
/// * a [`ForwardHandle`] that must be completed with the stream of `T`
///   values to send back to the client.
///
/// `port_hint` is recorded on the input node.
#[expect(clippy::type_complexity, reason = "stream markers")]
fn bind_single_client<L, T, Codec: Encoder<T> + Decoder>(
    &self,
    from: &External<L>,
    port_hint: NetworkHint,
) -> (
    ExternalBytesPort<NotMany>,
    Stream<<Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
    ForwardHandle<'a, Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>,
)
where
    Self: Sized + NoTick,
{
    // Allocate the port id from the external location's flow state; the
    // same id is used for both the outgoing root and the incoming node.
    let next_external_port_id = from
        .flow_state
        .borrow_mut()
        .next_external_port
        .get_and_increment();

    // The outgoing stream is not known yet, so stand in a forward reference.
    let (fwd_ref, to_sink) =
        self.forward_ref::<Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>();
    // This guard lives until the end of the function.
    let mut flow_state_borrow = self.flow_state().borrow_mut();

    // Outgoing half: no serializer, the codec handles raw `T` values.
    flow_state_borrow.push_root(HydroRoot::SendExternal {
        to_external_key: from.key,
        to_port_id: next_external_port_id,
        to_many: false,
        unpaired: false,
        serialize_fn: None,
        instantiate_fn: DebugInstantiate::Building,
        input: Box::new(to_sink.ir_node.into_inner()),
        op_metadata: HydroIrOpMetadata::new(),
    });

    // Incoming half: items are the decoder's `Result`s.
    let raw_stream: Stream<
        Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
        Self,
        Unbounded,
        TotalOrder,
        ExactlyOnce,
    > = Stream::new(
        self.clone(),
        HydroNode::ExternalInput {
            from_external_key: from.key,
            from_port_id: next_external_port_id,
            from_many: false,
            codec_type: quote_type::<Codec>().into(),
            port_hint,
            instantiate_fn: DebugInstantiate::Building,
            deserialize_fn: None,
            metadata: self.new_node_metadata(Stream::<
                Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
                Self,
                Unbounded,
                TotalOrder,
                ExactlyOnce,
            >::collection_kind()),
        },
    );

    (
        ExternalBytesPort {
            process_key: from.key,
            port_id: next_external_port_id,
            _phantom: PhantomData,
        },
        // NOTE(review): flattening the `Result` items keeps only decoded
        // values; decode errors appear to be dropped silently — confirm.
        raw_stream.flatten_ordered(),
        fwd_ref,
    )
}
666
/// Sets up a bidirectional bincode connection with a single client on the
/// external process `from`.
///
/// Incoming frames use `LengthDelimitedCodec` and are deserialized into
/// `InT`; outgoing `OutT` values are serialized with bincode. The port hint
/// is always [`NetworkHint::Auto`].
///
/// Returns the port handle, the stream of received `InT` values, and a
/// [`ForwardHandle`] that must be completed with the stream of `OutT`
/// values to send back.
///
/// The generated (de)serialization closures call `unwrap()`, so a failed
/// decode or (de)serialization panics in the generated code.
#[expect(clippy::type_complexity, reason = "stream markers")]
fn bind_single_client_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
    &self,
    from: &External<L>,
) -> (
    ExternalBincodeBidi<InT, OutT, NotMany>,
    Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
    ForwardHandle<'a, Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>,
)
where
    Self: Sized + NoTick,
{
    // Allocate the port id shared by the outgoing root and incoming node.
    let next_external_port_id = from
        .flow_state
        .borrow_mut()
        .next_external_port
        .get_and_increment();

    // The outgoing stream is not known yet, so stand in a forward reference.
    let (fwd_ref, to_sink) =
        self.forward_ref::<Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>();
    // This guard lives until the end of the function.
    let mut flow_state_borrow = self.flow_state().borrow_mut();

    let root = get_this_crate();

    // Quoted serializer: `OutT` -> bincode bytes.
    let out_t_type = quote_type::<OutT>();
    let ser_fn: syn::Expr = syn::parse_quote! {
        #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#out_t_type, _>(
            |b| #root::runtime_support::bincode::serialize(&b).unwrap().into()
        )
    };

    flow_state_borrow.push_root(HydroRoot::SendExternal {
        to_external_key: from.key,
        to_port_id: next_external_port_id,
        to_many: false,
        unpaired: false,
        serialize_fn: Some(ser_fn.into()),
        instantiate_fn: DebugInstantiate::Building,
        input: Box::new(to_sink.ir_node.into_inner()),
        op_metadata: HydroIrOpMetadata::new(),
    });

    let in_t_type = quote_type::<InT>();

    // Quoted deserializer: unwraps the codec result, then decodes `InT`.
    let deser_fn: syn::Expr = syn::parse_quote! {
        |res| {
            let b = res.unwrap();
            #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap()
        }
    };

    let raw_stream: Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce> = Stream::new(
        self.clone(),
        HydroNode::ExternalInput {
            from_external_key: from.key,
            from_port_id: next_external_port_id,
            from_many: false,
            codec_type: quote_type::<LengthDelimitedCodec>().into(),
            port_hint: NetworkHint::Auto,
            instantiate_fn: DebugInstantiate::Building,
            deserialize_fn: Some(deser_fn.into()),
            metadata: self.new_node_metadata(Stream::<
                InT,
                Self,
                Unbounded,
                TotalOrder,
                ExactlyOnce,
            >::collection_kind()),
        },
    );

    (
        ExternalBincodeBidi {
            process_key: from.key,
            port_id: next_external_port_id,
            _phantom: PhantomData,
        },
        raw_stream,
        fwd_ref,
    )
}
757
/// Sets up a bidirectional connection that accepts many clients on the
/// external process `from`, framing traffic with `Codec`.
///
/// Returns, in order:
/// * the port handle identifying the connection,
/// * a keyed stream of decoded items, keyed by a `u64` client id,
/// * a keyed stream of [`MembershipEvent`]s for those client ids,
/// * a [`ForwardHandle`] that must be completed with the keyed stream of
///   `T` values to send to clients.
///
/// `port_hint` is recorded on the input node.
#[expect(clippy::type_complexity, reason = "stream markers")]
fn bidi_external_many_bytes<L, T, Codec: Encoder<T> + Decoder>(
    &self,
    from: &External<L>,
    port_hint: NetworkHint,
) -> (
    ExternalBytesPort<Many>,
    KeyedStream<u64, <Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
    KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
    ForwardHandle<'a, KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>,
)
where
    Self: Sized + NoTick,
{
    // Allocate the port id shared by the outgoing root, the incoming node
    // and the membership stream identifier.
    let next_external_port_id = from
        .flow_state
        .borrow_mut()
        .next_external_port
        .get_and_increment();

    // The outgoing stream is not known yet, so stand in a forward reference.
    let (fwd_ref, to_sink) =
        self.forward_ref::<KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>();
    // This guard lives until the end of the function.
    let mut flow_state_borrow = self.flow_state().borrow_mut();

    // Outgoing half: the keyed stream is sent as its `(id, value)` entries.
    flow_state_borrow.push_root(HydroRoot::SendExternal {
        to_external_key: from.key,
        to_port_id: next_external_port_id,
        to_many: true,
        unpaired: false,
        serialize_fn: None,
        instantiate_fn: DebugInstantiate::Building,
        input: Box::new(to_sink.entries().ir_node.into_inner()),
        op_metadata: HydroIrOpMetadata::new(),
    });

    // Incoming half: each item is the decoder's `Result`, tagged with the id.
    let raw_stream: Stream<
        Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
        Self,
        Unbounded,
        TotalOrder,
        ExactlyOnce,
    > = Stream::new(
        self.clone(),
        HydroNode::ExternalInput {
            from_external_key: from.key,
            from_port_id: next_external_port_id,
            from_many: true,
            codec_type: quote_type::<Codec>().into(),
            port_hint,
            instantiate_fn: DebugInstantiate::Building,
            deserialize_fn: None,
            metadata: self.new_node_metadata(Stream::<
                Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
                Self,
                Unbounded,
                TotalOrder,
                ExactlyOnce,
            >::collection_kind()),
        },
    );

    // Membership events come from a stream referenced by a generated
    // identifier built from the external key and the port id.
    let membership_stream_ident = syn::Ident::new(
        &format!(
            "__hydro_deploy_many_{}_{}_membership",
            from.key, next_external_port_id
        ),
        Span::call_site(),
    );
    let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
    let raw_membership_stream: KeyedStream<
        u64,
        bool,
        Self,
        Unbounded,
        TotalOrder,
        ExactlyOnce,
    > = KeyedStream::new(
        self.clone(),
        HydroNode::Source {
            source: HydroSource::Stream(membership_stream_expr.into()),
            metadata: self.new_node_metadata(KeyedStream::<
                u64,
                bool,
                Self,
                Unbounded,
                TotalOrder,
                ExactlyOnce,
            >::collection_kind()),
        },
    );

    (
        ExternalBytesPort {
            process_key: from.key,
            port_id: next_external_port_id,
            _phantom: PhantomData,
        },
        // NOTE(review): flattening the `Result` items keeps only decoded
        // values; decode errors appear to be dropped silently — confirm.
        raw_stream
            .flatten_ordered()
            .into_keyed(),
        // `true` marks a join, `false` a departure.
        raw_membership_stream.map(q!(|join| {
            if join {
                MembershipEvent::Joined
            } else {
                MembershipEvent::Left
            }
        })),
        fwd_ref,
    )
}
879
/// Sets up a bidirectional bincode connection that accepts many clients on
/// the external process `from`.
///
/// Incoming frames use `LengthDelimitedCodec` and are deserialized into
/// `InT`; outgoing `OutT` values are serialized with bincode. The port hint
/// is always [`NetworkHint::Auto`].
///
/// Returns, in order:
/// * the port handle identifying the connection,
/// * a keyed stream of received `InT` values, keyed by a `u64` client id,
/// * a keyed stream of [`MembershipEvent`]s for those client ids,
/// * a [`ForwardHandle`] that must be completed with the keyed stream of
///   `OutT` values to send to clients.
///
/// The generated (de)serialization closures call `unwrap()`, so a failed
/// decode or (de)serialization panics in the generated code.
#[expect(clippy::type_complexity, reason = "stream markers")]
fn bidi_external_many_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
    &self,
    from: &External<L>,
) -> (
    ExternalBincodeBidi<InT, OutT, Many>,
    KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
    KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
    ForwardHandle<'a, KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
)
where
    Self: Sized + NoTick,
{
    // Allocate the port id shared by the outgoing root, the incoming node
    // and the membership stream identifier.
    let next_external_port_id = from
        .flow_state
        .borrow_mut()
        .next_external_port
        .get_and_increment();

    // The outgoing stream is not known yet, so stand in a forward reference.
    let (fwd_ref, to_sink) =
        self.forward_ref::<KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>();
    // This guard lives until the end of the function.
    let mut flow_state_borrow = self.flow_state().borrow_mut();

    let root = get_this_crate();

    // Quoted serializer: `(id, OutT)` -> `(id, bincode bytes)`.
    let out_t_type = quote_type::<OutT>();
    let ser_fn: syn::Expr = syn::parse_quote! {
        #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(u64, #out_t_type), _>(
            |(id, b)| (id, #root::runtime_support::bincode::serialize(&b).unwrap().into())
        )
    };

    // Outgoing half: the keyed stream is sent as its `(id, value)` entries.
    flow_state_borrow.push_root(HydroRoot::SendExternal {
        to_external_key: from.key,
        to_port_id: next_external_port_id,
        to_many: true,
        unpaired: false,
        serialize_fn: Some(ser_fn.into()),
        instantiate_fn: DebugInstantiate::Building,
        input: Box::new(to_sink.entries().ir_node.into_inner()),
        op_metadata: HydroIrOpMetadata::new(),
    });

    let in_t_type = quote_type::<InT>();

    // Quoted deserializer: unwraps the codec result, keeps the id, and
    // decodes the payload into `InT`.
    let deser_fn: syn::Expr = syn::parse_quote! {
        |res| {
            let (id, b) = res.unwrap();
            (id, #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap())
        }
    };

    let raw_stream: KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce> =
        KeyedStream::new(
            self.clone(),
            HydroNode::ExternalInput {
                from_external_key: from.key,
                from_port_id: next_external_port_id,
                from_many: true,
                codec_type: quote_type::<LengthDelimitedCodec>().into(),
                port_hint: NetworkHint::Auto,
                instantiate_fn: DebugInstantiate::Building,
                deserialize_fn: Some(deser_fn.into()),
                metadata: self.new_node_metadata(KeyedStream::<
                    u64,
                    InT,
                    Self,
                    Unbounded,
                    TotalOrder,
                    ExactlyOnce,
                >::collection_kind()),
            },
        );

    // Membership events come from a stream referenced by a generated
    // identifier built from the external key and the port id.
    let membership_stream_ident = syn::Ident::new(
        &format!(
            "__hydro_deploy_many_{}_{}_membership",
            from.key, next_external_port_id
        ),
        Span::call_site(),
    );
    let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
    let raw_membership_stream: KeyedStream<
        u64,
        bool,
        Self,
        Unbounded,
        TotalOrder,
        ExactlyOnce,
    > = KeyedStream::new(
        self.clone(),
        HydroNode::Source {
            source: HydroSource::Stream(membership_stream_expr.into()),
            metadata: self.new_node_metadata(KeyedStream::<
                u64,
                bool,
                Self,
                Unbounded,
                TotalOrder,
                ExactlyOnce,
            >::collection_kind()),
        },
    );

    (
        ExternalBincodeBidi {
            process_key: from.key,
            port_id: next_external_port_id,
            _phantom: PhantomData,
        },
        raw_stream,
        // `true` marks a join, `false` a departure.
        raw_membership_stream.map(q!(|join| {
            if join {
                MembershipEvent::Joined
            } else {
                MembershipEvent::Left
            }
        })),
        fwd_ref,
    )
}
1016
1017 fn singleton<T>(&self, e: impl QuotedWithContext<'a, T, Self>) -> Singleton<T, Self, Bounded>
1035 where
1036 T: Clone,
1037 Self: Sized,
1038 {
1039 let e = e.splice_untyped_ctx(self);
1040
1041 Singleton::new(
1042 self.clone(),
1043 HydroNode::SingletonSource {
1044 value: e.into(),
1045 metadata: self.new_node_metadata(Singleton::<T, Self, Bounded>::collection_kind()),
1046 },
1047 )
1048 }
1049
1050 fn source_interval(
1060 &self,
1061 interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1062 _nondet: NonDet,
1063 ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
1064 where
1065 Self: Sized + NoTick,
1066 {
1067 self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
1068 tokio::time::interval(interval)
1069 )))
1070 }
1071
1072 fn source_interval_delayed(
1083 &self,
1084 delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1085 interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1086 _nondet: NonDet,
1087 ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
1088 where
1089 Self: Sized + NoTick,
1090 {
1091 self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
1092 tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)
1093 )))
1094 }
1095
1096 fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
1130 where
1131 S: CycleCollection<'a, ForwardRef, Location = Self>,
1132 {
1133 let next_id = self.flow_state().borrow_mut().next_cycle_id();
1134 let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
1135
1136 (
1137 ForwardHandle {
1138 completed: false,
1139 ident: ident.clone(),
1140 expected_location: Location::id(self),
1141 _phantom: PhantomData,
1142 },
1143 S::create_source(ident, self.clone()),
1144 )
1145 }
1146}
1147
1148#[cfg(feature = "deploy")]
1149#[cfg(test)]
1150mod tests {
1151 use std::collections::HashSet;
1152
1153 use futures::{SinkExt, StreamExt};
1154 use hydro_deploy::Deployment;
1155 use stageleft::q;
1156 use tokio_util::codec::LengthDelimitedCodec;
1157
1158 use crate::compile::builder::FlowBuilder;
1159 use crate::live_collections::stream::{ExactlyOnce, TotalOrder};
1160 use crate::location::{Location, NetworkHint};
1161 use crate::nondet::nondet;
1162
// A singleton created at the top level and snapshotted into a tick must
// contribute exactly one value per tick: each input `n` is expected back as
// `((n, 123), 1)`, where the trailing `1` is the count of the singleton's
// stream.
#[tokio::test]
async fn top_level_singleton_replay_cardinality() {
    let mut deployment = Deployment::new();

    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();
    let external = flow.external::<()>();

    let (in_port, input) =
        node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
    let singleton = node.singleton(q!(123));
    let tick = node.tick();
    let out = input
        .batch(&tick, nondet!())
        .cross_singleton(singleton.clone().snapshot(&tick, nondet!()))
        .cross_singleton(
            singleton
                .snapshot(&tick, nondet!())
                .into_stream()
                .count(),
        )
        .all_ticks()
        .send_bincode_external(&external);

    let nodes = flow
        .with_process(&node, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    // Connect both directions before starting the deployment.
    let mut external_in = nodes.connect(in_port).await;
    let mut external_out = nodes.connect(out).await;

    deployment.start().await.unwrap();

    external_in.send(1).await.unwrap();
    assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));

    // A second input checks that the singleton is seen once again, not
    // accumulated across ticks.
    external_in.send(2).await.unwrap();
    assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
}
1205
// Same check as the top-level variant, but the singleton is created
// directly on the tick: each input `n` is expected back as `((n, 123), 1)`.
#[tokio::test]
async fn tick_singleton_replay_cardinality() {
    let mut deployment = Deployment::new();

    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();
    let external = flow.external::<()>();

    let (in_port, input) =
        node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
    let tick = node.tick();
    let singleton = tick.singleton(q!(123));
    let out = input
        .batch(&tick, nondet!())
        .cross_singleton(singleton.clone())
        .cross_singleton(singleton.into_stream().count())
        .all_ticks()
        .send_bincode_external(&external);

    let nodes = flow
        .with_process(&node, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    // Connect both directions before starting the deployment.
    let mut external_in = nodes.connect(in_port).await;
    let mut external_out = nodes.connect(out).await;

    deployment.start().await.unwrap();

    external_in.send(1).await.unwrap();
    assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));

    external_in.send(2).await.unwrap();
    assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
}
1243
// Bytes sent through `source_external_bytes` are forwarded unchanged to a
// bincode output.
#[tokio::test]
async fn external_bytes() {
    let mut deployment = Deployment::new();

    let mut flow = FlowBuilder::new();
    let first_node = flow.process::<()>();
    let external = flow.external::<()>();

    let (in_port, input) = first_node.source_external_bytes(&external);
    let out = input.send_bincode_external(&external);

    let nodes = flow
        .with_process(&first_node, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    // Only the second element of the connected pair (used to send) is kept.
    let mut external_in = nodes.connect(in_port).await.1;
    let mut external_out = nodes.connect(out).await;

    deployment.start().await.unwrap();

    external_in.send(vec![1, 2, 3].into()).await.unwrap();

    assert_eq!(external_out.next().await.unwrap(), vec![1, 2, 3]);
}
1271
// Two clients connect to the same many-client bincode port; their messages
// are expected tagged with keys 0 and 1 respectively.
#[tokio::test]
async fn multi_external_source() {
    let mut deployment = Deployment::new();

    let mut flow = FlowBuilder::new();
    let first_node = flow.process::<()>();
    let external = flow.external::<()>();

    let (in_port, input, _membership, complete_sink) =
        first_node.bidi_external_many_bincode(&external);
    let out = input.entries().send_bincode_external(&external);
    // Nothing is sent back to the clients: complete with an empty stream.
    complete_sink.complete(
        first_node
            .source_iter::<(u64, ()), _>(q!([]))
            .into_keyed()
            .weaken_ordering(),
    );

    let nodes = flow
        .with_process(&first_node, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let (_, mut external_in_1) = nodes.connect_bincode(in_port.clone()).await;
    let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
    let external_out = nodes.connect(out).await;

    deployment.start().await.unwrap();

    external_in_1.send(123).await.unwrap();
    external_in_2.send(456).await.unwrap();

    // Compared as a set, so the arrival order of the two messages is not
    // asserted.
    assert_eq!(
        external_out.take(2).collect::<HashSet<_>>().await,
        vec![(0, 123), (1, 456)].into_iter().collect()
    );
}
1311
// Two clients connect but only the second one sends; its message is
// expected tagged with key 1.
#[tokio::test]
async fn second_connection_only_multi_source() {
    let mut deployment = Deployment::new();

    let mut flow = FlowBuilder::new();
    let first_node = flow.process::<()>();
    let external = flow.external::<()>();

    let (in_port, input, _membership, complete_sink) =
        first_node.bidi_external_many_bincode(&external);
    let out = input.entries().send_bincode_external(&external);
    // Nothing is sent back to the clients: complete with an empty stream.
    complete_sink.complete(
        first_node
            .source_iter::<(u64, ()), _>(q!([]))
            .into_keyed()
            .weaken_ordering(),
    );

    let nodes = flow
        .with_process(&first_node, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    // The first connection is opened and kept in scope but never used.
    let (_, mut _external_in_1) = nodes.connect_bincode(in_port.clone()).await;
    let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
    let mut external_out = nodes.connect(out).await;

    deployment.start().await.unwrap();

    external_in_2.send(456).await.unwrap();

    assert_eq!(external_out.next().await.unwrap(), (1, 456));
}
1348
// Two clients connect to the same many-client byte port; their frames are
// expected tagged with keys 0 and 1 respectively.
#[tokio::test]
async fn multi_external_bytes() {
    let mut deployment = Deployment::new();

    let mut flow = FlowBuilder::new();
    let first_node = flow.process::<()>();
    let external = flow.external::<()>();

    let (in_port, input, _membership, complete_sink) = first_node
        .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
    let out = input.entries().send_bincode_external(&external);
    // Nothing is sent back to the clients: complete with an empty stream.
    complete_sink.complete(
        first_node
            .source_iter(q!([]))
            .into_keyed()
            .weaken_ordering(),
    );

    let nodes = flow
        .with_process(&first_node, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    // Only the second element of each connected pair (used to send) is kept.
    let mut external_in_1 = nodes.connect(in_port.clone()).await.1;
    let mut external_in_2 = nodes.connect(in_port).await.1;
    let external_out = nodes.connect(out).await;

    deployment.start().await.unwrap();

    external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
    external_in_2.send(vec![4, 5].into()).await.unwrap();

    // Compared as a set, so the arrival order of the two frames is not
    // asserted.
    assert_eq!(
        external_out.take(2).collect::<HashSet<_>>().await,
        vec![
            (0, (&[1u8, 2, 3] as &[u8]).into()),
            (1, (&[4u8, 5] as &[u8]).into())
        ]
        .into_iter()
        .collect()
    );
}
1393
// A single-client byte connection echoes each frame back with the byte 42
// appended.
#[tokio::test]
async fn single_client_external_bytes() {
    let mut deployment = Deployment::new();
    let mut flow = FlowBuilder::new();
    let first_node = flow.process::<()>();
    let external = flow.external::<()>();
    let (port, input, complete_sink) = first_node
        .bind_single_client::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
    complete_sink.complete(input.map(q!(|data| {
        let mut resp: Vec<u8> = data.into();
        resp.push(42);
        // Convert back into the type the codec sends.
        resp.into()
    })));

    let nodes = flow
        .with_process(&first_node, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();
    deployment.start().await.unwrap();

    // Unlike the other tests in this module, the connection is made after
    // the deployment has been started.
    let (mut external_out, mut external_in) = nodes.connect(port).await;

    external_in.send(vec![1, 2, 3].into()).await.unwrap();
    assert_eq!(
        external_out.next().await.unwrap().unwrap(),
        vec![1, 2, 3, 42]
    );
}
1424
// A many-client byte connection replies to each client with its own frame,
// every byte incremented by one.
#[tokio::test]
async fn echo_external_bytes() {
    let mut deployment = Deployment::new();

    let mut flow = FlowBuilder::new();
    let first_node = flow.process::<()>();
    let external = flow.external::<()>();

    let (port, input, _membership, complete_sink) = first_node
        .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
    complete_sink
        .complete(input.map(q!(|bytes| { bytes.into_iter().map(|x| x + 1).collect() })));

    let nodes = flow
        .with_process(&first_node, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let (mut external_out_1, mut external_in_1) = nodes.connect(port.clone()).await;
    let (mut external_out_2, mut external_in_2) = nodes.connect(port).await;

    deployment.start().await.unwrap();

    external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
    external_in_2.send(vec![4, 5].into()).await.unwrap();

    // Each client must receive only the reply to its own frame.
    assert_eq!(external_out_1.next().await.unwrap().unwrap(), vec![2, 3, 4]);
    assert_eq!(external_out_2.next().await.unwrap().unwrap(), vec![5, 6]);
}
1456
// A many-client bincode connection replies to each client with its own
// string, upper-cased.
#[tokio::test]
async fn echo_external_bincode() {
    let mut deployment = Deployment::new();

    let mut flow = FlowBuilder::new();
    let first_node = flow.process::<()>();
    let external = flow.external::<()>();

    let (port, input, _membership, complete_sink) =
        first_node.bidi_external_many_bincode(&external);
    complete_sink.complete(input.map(q!(|text: String| { text.to_uppercase() })));

    let nodes = flow
        .with_process(&first_node, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let (mut external_out_1, mut external_in_1) = nodes.connect_bincode(port.clone()).await;
    let (mut external_out_2, mut external_in_2) = nodes.connect_bincode(port).await;

    deployment.start().await.unwrap();

    external_in_1.send("hi".to_owned()).await.unwrap();
    external_in_2.send("hello".to_owned()).await.unwrap();

    // Each client must receive only the reply to its own message.
    assert_eq!(external_out_1.next().await.unwrap(), "HI");
    assert_eq!(external_out_2.next().await.unwrap(), "HELLO");
}
1487}