1use std::io::Error;
2use std::pin::Pin;
3
4use bytes::{Bytes, BytesMut};
5use dfir_lang::graph::DfirGraph;
6use futures::{Sink, Stream};
7use serde::Serialize;
8use serde::de::DeserializeOwned;
9use stageleft::QuotedWithContext;
10
11use crate::compile::builder::ExternalPortId;
12use crate::location::dynamic::LocationId;
13use crate::location::member_id::TaglessMemberId;
14use crate::location::{LocationKey, MembershipEvent, NetworkHint};
15
16pub trait Deploy<'a> {
17 type Meta: Default;
18 type InstantiateEnv;
19
20 type Process: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
21 type Cluster: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
22 type External: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv>
23 + RegisterPort<'a, Self>;
24
25 fn o2o_sink_source(
26 p1: &Self::Process,
27 p1_port: &<Self::Process as Node>::Port,
28 p2: &Self::Process,
29 p2_port: &<Self::Process as Node>::Port,
30 ) -> (syn::Expr, syn::Expr);
31 fn o2o_connect(
32 p1: &Self::Process,
33 p1_port: &<Self::Process as Node>::Port,
34 p2: &Self::Process,
35 p2_port: &<Self::Process as Node>::Port,
36 ) -> Box<dyn FnOnce()>;
37
38 fn o2m_sink_source(
39 p1: &Self::Process,
40 p1_port: &<Self::Process as Node>::Port,
41 c2: &Self::Cluster,
42 c2_port: &<Self::Cluster as Node>::Port,
43 ) -> (syn::Expr, syn::Expr);
44 fn o2m_connect(
45 p1: &Self::Process,
46 p1_port: &<Self::Process as Node>::Port,
47 c2: &Self::Cluster,
48 c2_port: &<Self::Cluster as Node>::Port,
49 ) -> Box<dyn FnOnce()>;
50
51 fn m2o_sink_source(
52 c1: &Self::Cluster,
53 c1_port: &<Self::Cluster as Node>::Port,
54 p2: &Self::Process,
55 p2_port: &<Self::Process as Node>::Port,
56 ) -> (syn::Expr, syn::Expr);
57 fn m2o_connect(
58 c1: &Self::Cluster,
59 c1_port: &<Self::Cluster as Node>::Port,
60 p2: &Self::Process,
61 p2_port: &<Self::Process as Node>::Port,
62 ) -> Box<dyn FnOnce()>;
63
64 fn m2m_sink_source(
65 c1: &Self::Cluster,
66 c1_port: &<Self::Cluster as Node>::Port,
67 c2: &Self::Cluster,
68 c2_port: &<Self::Cluster as Node>::Port,
69 ) -> (syn::Expr, syn::Expr);
70 fn m2m_connect(
71 c1: &Self::Cluster,
72 c1_port: &<Self::Cluster as Node>::Port,
73 c2: &Self::Cluster,
74 c2_port: &<Self::Cluster as Node>::Port,
75 ) -> Box<dyn FnOnce()>;
76
77 fn e2o_many_source(
78 extra_stmts: &mut Vec<syn::Stmt>,
79 p2: &Self::Process,
80 p2_port: &<Self::Process as Node>::Port,
81 codec_type: &syn::Type,
82 shared_handle: String,
83 ) -> syn::Expr;
84 fn e2o_many_sink(shared_handle: String) -> syn::Expr;
85
86 fn e2o_source(
87 extra_stmts: &mut Vec<syn::Stmt>,
88 p1: &Self::External,
89 p1_port: &<Self::External as Node>::Port,
90 p2: &Self::Process,
91 p2_port: &<Self::Process as Node>::Port,
92 codec_type: &syn::Type,
93 shared_handle: String,
94 ) -> syn::Expr;
95 fn e2o_connect(
96 p1: &Self::External,
97 p1_port: &<Self::External as Node>::Port,
98 p2: &Self::Process,
99 p2_port: &<Self::Process as Node>::Port,
100 many: bool,
101 server_hint: NetworkHint,
102 ) -> Box<dyn FnOnce()>;
103
104 fn o2e_sink(
105 p1: &Self::Process,
106 p1_port: &<Self::Process as Node>::Port,
107 p2: &Self::External,
108 p2_port: &<Self::External as Node>::Port,
109 shared_handle: String,
110 ) -> syn::Expr;
111
112 fn cluster_ids(
113 of_cluster: LocationKey,
114 ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a;
115
116 fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a;
117
118 fn cluster_membership_stream(
119 location_id: &LocationId,
120 ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>;
121
122 fn register_embedded_input(
127 _env: &mut Self::InstantiateEnv,
128 _ident: &syn::Ident,
129 _element_type: &syn::Type,
130 ) {
131 panic!("register_embedded_input is only supported by EmbeddedDeploy");
132 }
133}
134
135pub trait ProcessSpec<'a, D>
136where
137 D: Deploy<'a> + ?Sized,
138{
139 fn build(self, location_key: LocationKey, name_hint: &str) -> D::Process;
140}
141
142pub trait IntoProcessSpec<'a, D>
143where
144 D: Deploy<'a> + ?Sized,
145{
146 type ProcessSpec: ProcessSpec<'a, D>;
147 fn into_process_spec(self) -> Self::ProcessSpec;
148}
149
150impl<'a, D, T> IntoProcessSpec<'a, D> for T
151where
152 D: Deploy<'a> + ?Sized,
153 T: ProcessSpec<'a, D>,
154{
155 type ProcessSpec = T;
156 fn into_process_spec(self) -> Self::ProcessSpec {
157 self
158 }
159}
160
161pub trait ClusterSpec<'a, D>
162where
163 D: Deploy<'a> + ?Sized,
164{
165 fn build(self, location_key: LocationKey, name_hint: &str) -> D::Cluster;
166}
167
168pub trait ExternalSpec<'a, D>
169where
170 D: Deploy<'a> + ?Sized,
171{
172 fn build(self, location_key: LocationKey, name_hint: &str) -> D::External;
173}
174
175pub trait Node {
176 type Port: Clone;
183 type Meta: Default;
184 type InstantiateEnv;
185
186 fn next_port(&self) -> Self::Port;
188
189 fn update_meta(&self, meta: &Self::Meta);
190
191 fn instantiate(
192 &self,
193 env: &mut Self::InstantiateEnv,
194 meta: &mut Self::Meta,
195 graph: DfirGraph,
196 extra_stmts: &[syn::Stmt],
197 sidecars: &[syn::Expr],
198 );
199}
200
201pub type DynSourceSink<Out, In, InErr> = (
202 Pin<Box<dyn Stream<Item = Out>>>,
203 Pin<Box<dyn Sink<In, Error = InErr>>>,
204);
205
206pub trait RegisterPort<'a, D>: Node + Clone
207where
208 D: Deploy<'a> + ?Sized,
209{
210 fn register(&self, external_port_id: ExternalPortId, port: Self::Port);
211
212 fn as_bytes_bidi(
213 &self,
214 external_port_id: ExternalPortId,
215 ) -> impl Future<Output = DynSourceSink<Result<BytesMut, Error>, Bytes, Error>> + 'a;
216
217 fn as_bincode_bidi<InT, OutT>(
218 &self,
219 external_port_id: ExternalPortId,
220 ) -> impl Future<Output = DynSourceSink<OutT, InT, Error>> + 'a
221 where
222 InT: Serialize + 'static,
223 OutT: DeserializeOwned + 'static;
224
225 fn as_bincode_sink<T>(
226 &self,
227 external_port_id: ExternalPortId,
228 ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a
229 where
230 T: Serialize + 'static;
231
232 fn as_bincode_source<T>(
233 &self,
234 external_port_id: ExternalPortId,
235 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
236 where
237 T: DeserializeOwned + 'static;
238}