Roc Toolkit internal modules
Roc Toolkit: real-time audio streaming
Loading...
Searching...
No Matches
network_loop.h
Go to the documentation of this file.
1/*
2 * Copyright (c) 2015 Roc Streaming authors
3 *
4 * This Source Code Form is subject to the terms of the Mozilla Public
5 * License, v. 2.0. If a copy of the MPL was not distributed with this
6 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 */
8
9//! @file roc_netio/target_libuv/roc_netio/network_loop.h
10//! @brief Network event loop thread.
11
12#ifndef ROC_NETIO_NETWORK_LOOP_H_
13#define ROC_NETIO_NETWORK_LOOP_H_
14
15#include <uv.h>
16
18#include "roc_core/atomic.h"
19#include "roc_core/attributes.h"
20#include "roc_core/iarena.h"
21#include "roc_core/ipool.h"
22#include "roc_core/list.h"
23#include "roc_core/mpsc_queue.h"
25#include "roc_core/optional.h"
26#include "roc_core/semaphore.h"
27#include "roc_core/thread.h"
30#include "roc_netio/iconn.h"
36#include "roc_netio/resolver.h"
39#include "roc_netio/udp_port.h"
40#include "roc_packet/iwriter.h"
42
43namespace roc {
44namespace netio {
45
46//! Network event loop thread.
47//! @remarks
48//! This class is a task-based facade for the whole roc_netio module.
50 private ICloseHandler,
52 private core::Thread {
53public:
54 //! Opaque port handle.
55 typedef struct PortHandle* PortHandle;
56
57 //! Subclasses for specific tasks.
58 class Tasks {
59 public:
60 //! Add UDP datagram sender/receiver port.
61 class AddUdpPort : public NetworkTask {
62 public:
63 //! Set task parameters.
64 //! @remarks
65 //! Updates @p config with the actual bind address.
67
68 //! Get created port handle.
69 //! @pre
70 //! Should be called only after success() is true.
72
73 private:
74 friend class NetworkLoop;
75
76 UdpConfig* config_;
77 };
78
79 //! Start sending on UDP port.
80 class StartUdpSend : public NetworkTask {
81 public:
82 //! Set task parameters.
83 //! @remarks
84 //! get_outbound_writer() returns a writer for packets to be sent. It may be
85 //! used from another thread. It doesn't block the caller.
87
88 //! Get created writer for outbound packets.
89 //! @pre
90 //! Should be called only after success() is true.
92
93 private:
94 friend class NetworkLoop;
95
96 packet::IWriter* outbound_writer_;
97 };
98
99 //! Start receiving on UDP port.
100 class StartUdpRecv : public NetworkTask {
101 public:
102 //! Set task parameters.
103 //! @remarks
104 //! Received packets will be passed to @p inbound_writer.
105 //! It is invoked from network thread. It should not block the caller.
106 StartUdpRecv(PortHandle handle, packet::IWriter& inbound_writer);
107
108 private:
109 friend class NetworkLoop;
110
111 packet::IWriter* inbound_writer_;
112 };
113
114 //! Add TCP server port.
115 class AddTcpServerPort : public NetworkTask {
116 public:
117 //! Set task parameters.
118 //! @remarks
119 //! - Updates @p config with the actual bind address.
120 //! - Listens for incoming connections and passes new connections
121 //! to @p conn_acceptor. It should return a handler that will be
122 //! notified when connection state changes.
124
125 //! Get created port handle.
126 //! @pre
127 //! Should be called only after success() is true.
129
130 private:
131 friend class NetworkLoop;
132
133 TcpServerConfig* config_;
134 IConnAcceptor* conn_acceptor_;
135 };
136
137 //! Add TCP client port.
138 class AddTcpClientPort : public NetworkTask {
139 public:
140 //! Set task parameters.
141 //! @remarks
142 //! - Updates @p config with the actual bind address.
143 //! - Notifies @p conn_handler when connection state changes.
145
146 //! Get created port handle.
147 //! @pre
148 //! Should be called only after success() is true.
150
151 private:
152 friend class NetworkLoop;
153
154 TcpClientConfig* config_;
155 IConnHandler* conn_handler_;
156 };
157
158 //! Remove port.
159 class RemovePort : public NetworkTask {
160 public:
161 //! Set task parameters.
163
164 private:
165 friend class NetworkLoop;
166 };
167
168 //! Resolve endpoint address.
169 class ResolveEndpointAddress : public NetworkTask {
170 public:
171 //! Set task parameters.
172 //! @remarks
173 //! Gets endpoint hostname, resolves it, and writes the resolved IP address
174 //! and the port from the endpoint to the resulting SocketAddr.
176
177 //! Get resolved address.
178 //! @pre
179 //! Should be called only after success() is true.
181
182 private:
183 friend class NetworkLoop;
184
185 ResolverRequest resolve_req_;
186 };
187 };
188
189 //! Initialize.
190 //! @remarks
191 //! Start background thread if the object was successfully constructed.
192 NetworkLoop(core::IPool& packet_pool, core::IPool& buffer_pool, core::IArena& arena);
193
194 //! Destroy. Stop all receivers and senders.
195 //! @remarks
196 //! Wait until background thread finishes.
197 virtual ~NetworkLoop();
198
199 //! Check if the object was successfully constructed.
200 bool is_valid() const;
201
202 //! Get number of receiver and sender ports.
203 size_t num_ports() const;
204
205 //! Enqueue a task for asynchronous execution and return.
206 //! The task should not be destroyed until the completer is called.
207 //! The @p completer will be invoked on event loop thread after the
208 //! task completes.
210
211 //! Enqueue a task for asynchronous execution and wait for its completion.
212 //! The task should not be destroyed until this method returns.
213 //! Should not be called from schedule() callback.
214 //! @returns
215 //! true if the task succeeded or false if it failed.
217
218private:
219 static void task_sem_cb_(uv_async_t* handle);
220 static void stop_sem_cb_(uv_async_t* handle);
221
222 virtual void handle_terminate_completed(IConn&, void*);
223 virtual void handle_close_completed(BasicPort&, void*);
224 virtual void handle_resolved(ResolverRequest& req);
225
226 virtual void run();
227
228 void process_pending_tasks_();
229 void finish_task_(NetworkTask&);
230
231 void async_terminate_conn_port_(const core::SharedPtr<TcpConnectionPort>& port,
232 NetworkTask* task);
233 AsyncOperationStatus async_close_port_(const core::SharedPtr<BasicPort>& port,
234 NetworkTask* task);
235 void finish_closing_port_(const core::SharedPtr<BasicPort>& port, NetworkTask* task);
236
237 void update_num_ports_();
238
239 void close_all_sems_();
240 void close_all_ports_();
241
242 void task_add_udp_port_(NetworkTask&);
243 void task_start_udp_send_(NetworkTask&);
244 void task_start_udp_recv_(NetworkTask&);
245 void task_add_tcp_server_(NetworkTask&);
246 void task_add_tcp_client_(NetworkTask&);
247 void task_remove_port_(NetworkTask&);
248 void task_resolve_endpoint_address_(NetworkTask&);
249
250 packet::PacketFactory packet_factory_;
251 core::IArena& arena_;
252
253 bool started_;
254
255 uv_loop_t loop_;
256 bool loop_initialized_;
257
258 uv_async_t stop_sem_;
259 bool stop_sem_initialized_;
260
261 uv_async_t task_sem_;
262 bool task_sem_initialized_;
263
265
266 Resolver resolver_;
267
268 core::List<BasicPort> open_ports_;
269 core::List<BasicPort> closing_ports_;
270
271 core::Atomic<int> num_open_ports_;
272};
273
274} // namespace netio
275} // namespace roc
276
277#endif // ROC_NETIO_NETWORK_LOOP_H_
Atomic.
Compiler attributes.
#define ROC_ATTR_NODISCARD
Emit warning if function result is not checked.
Definition attributes.h:31
Base class for ports.
Network endpoint URI.
Atomic integer. Provides sequential consistency. For a fine-grained memory order control,...
Definition atomic.h:26
Memory arena interface.
Definition iarena.h:23
Memory pool interface.
Definition ipool.h:23
Intrusive doubly-linked list.
Definition list.h:40
Thread-safe lock-free node-based intrusive multi-producer single-consumer queue.
Definition mpsc_queue.h:45
Shared ownership intrusive pointer.
Definition shared_ptr.h:32
Base class for thread objects.
Definition thread.h:27
virtual void run()=0
Method to be executed in thread.
Base class for ports.
Definition basic_port.h:40
Close handler interface.
Connection acceptor interface.
Connection event handler interface.
Connection interface.
Definition iconn.h:30
Network task completion handler.
Resolver request result handler interface.
Termination handler interface.
PortHandle get_handle() const
Get created port handle.
AddTcpClientPort(TcpClientConfig &config, IConnHandler &conn_handler)
Set task parameters.
AddTcpServerPort(TcpServerConfig &config, IConnAcceptor &conn_acceptor)
Set task parameters.
PortHandle get_handle() const
Get created port handle.
AddUdpPort(UdpConfig &config)
Set task parameters.
PortHandle get_handle() const
Get created port handle.
RemovePort(PortHandle handle)
Set task parameters.
const address::SocketAddr & get_address() const
Get resolved address.
ResolveEndpointAddress(const address::EndpointUri &endpoint_uri)
Set task parameters.
StartUdpRecv(PortHandle handle, packet::IWriter &inbound_writer)
Set task parameters.
StartUdpSend(PortHandle handle)
Set task parameters.
packet::IWriter & get_outbound_writer() const
Get created writer for outbound packets.
Subclasses for specific tasks.
bool is_valid() const
Check if the object was successfully constructed.
NetworkLoop(core::IPool &packet_pool, core::IPool &buffer_pool, core::IArena &arena)
Initialize.
size_t num_ports() const
Get number of receiver and sender ports.
ROC_ATTR_NODISCARD bool schedule_and_wait(NetworkTask &task)
Enqueue a task for asynchronous execution and wait for its completion. The task should not be destroy...
void schedule(NetworkTask &task, INetworkTaskCompleter &completer)
Enqueue a task for asynchronous execution and return. The task should not be destroyed until the call...
virtual ~NetworkLoop()
Destroy. Stop all receivers and senders.
struct PortHandle * PortHandle
Opaque port handle.
Base class for network loop tasks.
Hostname resolver.
Definition resolver.h:25
Packet writer interface.
Definition iwriter.h:23
Memory arena interface.
Close handler interface.
Connection interface.
Connection acceptor interface.
Connection event handler interface.
Network task completion handler.
Memory pool interface.
Termination handler interface.
Packet writer interface.
Intrusive doubly-linked list.
Multi-producer single-consumer queue.
MpscQueue node.
Network I/O.
AsyncOperationStatus
Asynchronous operation status.
Root namespace.
Network task.
Optionally constructed object.
Packet factory.
Hostname resolver.
Socket address.
TCP connection parameters.
TCP server parameters.
UDP port parameters.
Definition udp_port.h:32
TCP connection.
Thread.
UDP port.