OpenJPH
Open-source implementation of JPEG2000 Part-15
Loading...
Searching...
No Matches
ojph_stream_expand.cpp
Go to the documentation of this file.
1//***************************************************************************/
2// This software is released under the 2-Clause BSD license, included
3// below.
4//
5// Copyright (c) 2024, Aous Naman
6// Copyright (c) 2024, Kakadu Software Pty Ltd, Australia
7// Copyright (c) 2024, The University of New South Wales, Australia
8//
9// Redistribution and use in source and binary forms, with or without
10// modification, are permitted provided that the following conditions are
11// met:
12//
13// 1. Redistributions of source code must retain the above copyright
14// notice, this list of conditions and the following disclaimer.
15//
16// 2. Redistributions in binary form must reproduce the above copyright
17// notice, this list of conditions and the following disclaimer in the
18// documentation and/or other materials provided with the distribution.
19//
20// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
21// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
22// TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
23// PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24// HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
26// TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
27// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
28// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
29// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
30// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31//***************************************************************************/
32// This file is part of the OpenJPH software implementation.
33// File: ojph_stream_expand.cpp
34// Author: Aous Naman
35// Date: 17 April 2024
36//***************************************************************************/
37
38#include <iostream>
39#include "ojph_message.h"
40#include "ojph_arg.h"
41#include "ojph_sockets.h"
42#include "ojph_threads.h"
44
45#ifdef OJPH_OS_WINDOWS
46
47#else
48 #include <arpa/inet.h>
49#endif
50
52static
53bool get_arguments(int argc, char *argv[],
54 char *&recv_addr, char *&recv_port,
55 char *&src_addr, char *&src_port,
56 char *&target_name, ojph::ui32& num_threads,
57 ojph::ui32& num_inflight_packets,
58 ojph::ui32& recvfrm_buf_size, bool& blocking,
59 bool& quiet)
60{
61 ojph::cli_interpreter interpreter;
62 interpreter.init(argc, argv);
63
64 interpreter.reinterpret("-addr", recv_addr);
65 interpreter.reinterpret("-port", recv_port);
66 interpreter.reinterpret("-src_addr", src_addr);
67 interpreter.reinterpret("-src_port", src_port);
68 interpreter.reinterpret("-o", target_name);
69 interpreter.reinterpret("-num_threads", num_threads);
70 interpreter.reinterpret("-num_packets", num_inflight_packets);
71 interpreter.reinterpret("-recv_buf_size", recvfrm_buf_size);
72
73 blocking = interpreter.reinterpret("-blocking");
74 quiet = interpreter.reinterpret("-quiet");
75
76 if (interpreter.is_exhausted() == false) {
77 printf("The following arguments were not interpreted:\n");
78 ojph::argument t = interpreter.get_argument_zero();
79 t = interpreter.get_next_avail_argument(t);
80 while (t.is_valid()) {
81 printf("%s\n", t.arg);
82 t = interpreter.get_next_avail_argument(t);
83 }
84 return false;
85 }
86
87 if (recv_addr == NULL)
88 {
89 printf("Please use \"-addr\" to provide a receiving address, "
90 "\"localhost\" or a local network card IPv4 address.\n");
91 return false;
92 }
93 if (recv_port == NULL)
94 {
95 printf("Please use \"-port\" to provide a port number.\n");
96 return false;
97 }
98 if (num_threads < 1)
99 {
100 printf("Please set \"-num_threads\" to 1 or more.\n");
101 return false;
102 }
103 if (num_inflight_packets < 1)
104 {
105 printf("Please set \"-num_packets\" to 1 or more.\n");
106 return false;
107 }
108
109 return true;
110}
111
113int main(int argc, char* argv[])
114{
115 char *recv_addr = NULL;
116 char *recv_port = NULL;
117 char *src_addr = NULL;
118 char *src_port = NULL;
119 char *target_name = NULL;
120 ojph::ui32 num_threads = 2;
121 ojph::ui32 num_inflight_packets = 5;
122 ojph::ui32 recvfrm_buf_size = 65536;
123 bool blocking = false;
124 bool quiet = false;
125
126 if (argc <= 1) {
127 printf(
128 "\n"
129 "The following arguments are necessary:\n"
130 " -addr <receiving IPv4 address>, or\n"
131 " The address should be either localhost, or\n"
132 " a local network card IPv4 address\n"
133 " example: -addr 127.0.0.1\n"
134 " -port <listening port>\n"
135 "\n"
136 "The following arguments are options:\n"
137 " -src_addr <source ipv4 address>, packets from other sources\n"
138 " will be ignored. If not specified, then packets\n"
139 " from any source are accepted.\n"
140 " -src_port <source port>, packets from other source ports are\n"
141 " ignored. If not specified, then packets from any\n"
142 " port are accepted -- I would recommend not leaving\n"
143 " this one out.\n"
144 " -recv_buf_size <integer> recvfrom buffer size; default is 65536.\n"
145 " This is the size of the operating system's receive\n"
146 " buffer, before packets are picked by the program.\n"
147 " Larger buffers reduces the likelihood that a packet\n"
148 " is dropped before the program has a chance to pick it.\n"
149 " -blocking sets the receiving socket blocking mode to blocking.\n"
150 " The default mode is non-blocking. A blocking socket\n"
151 " increases the likelihood of not receiving some\n"
152 " packets; this is because the thread get into sleep\n"
153 " state, and therefore takes sometime to wakeup. A\n"
154 " non-blocking socket increase power consumption,\n"
155 " because it prevents the thread from sleeping.\n"
156 " -num_threads <integer> number of threads for decoding and\n"
157 " displaying files. This number also determines the\n"
158 " number of in-flight files, not completely\n"
159 " saved/processed yet. The number of files is set to\n"
160 " number of threads + 1\n"
161 " -num_packets <integer> number of in-flight packets; this is a\n"
162 " window of packets in which packets can be re-ordered.\n"
163 " -o <string> target file name without extension; the same\n"
164 " printf formating can be used. For example,\n"
165 " output_%%05d. An extension will be added, either .j2c\n"
166 " for original frames, or .ppm for decoded images.\n"
167 " -quiet use to stop printing informative messages.\n."
168 "\n"
169 );
170 exit(-1);
171 }
172 if (!get_arguments(argc, argv, recv_addr, recv_port, src_addr, src_port,
173 target_name, num_threads, num_inflight_packets,
174 recvfrm_buf_size, blocking, quiet))
175 {
176 exit(-1);
177 }
178
179 try {
180 ojph::thds::thread_pool thread_pool;
181 thread_pool.init(num_threads);
182 ojph::stex::frames_handler frames_handler;
183 frames_handler.init(quiet, target_name, &thread_pool);
184 ojph::stex::packets_handler packets_handler;
185 packets_handler.init(quiet, num_inflight_packets, &frames_handler);
187
188 // listening address/port
189 struct sockaddr_in server;
190 {
191 server.sin_family = AF_INET;
192 const char *p = recv_addr;
193 const char localhost[] = "127.0.0.1";
194 if (strcmp(recv_addr, "localhost") == 0)
195 p = localhost;
196 int result = inet_pton(AF_INET, p, &server.sin_addr);
197 if (result != 1)
198 OJPH_ERROR(0x02000001, "Please provide a valid IPv4 address when "
199 "using \"-addr,\" the provided address %s is not valid",
200 recv_addr);
201 ojph::ui16 port_number = 0;
202 port_number = (ojph::ui16)atoi(recv_port);
203 if (port_number == 0)
204 OJPH_ERROR(0x02000002, "Please provide a valid port number. "
205 "The number you provided is %d", recv_port);
206 server.sin_port = htons(port_number);
207 }
208
209 // create a socket
211 s = smanager.create_socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
212 if(s.intern() == OJPH_INVALID_SOCKET)
213 {
214 std::string err = smanager.get_last_error_message();
215 OJPH_ERROR(0x02000003, "Could not create socket: %s", err.data());
216 }
217
218 // change recv buffer size; default is 65536
219 if (::setsockopt(s.intern(), SOL_SOCKET, SO_RCVBUF,
220 (char*)&recvfrm_buf_size, sizeof(recvfrm_buf_size)) == -1)
221 {
222 std::string err = smanager.get_last_error_message();
223 OJPH_INFO(0x02000001,
224 "Failed to expand receive buffer: %s", err.data());
225 }
226
227 // set socket to non-blocking
228 if (s.set_blocking_mode(blocking) == false)
229 {
230 std::string err = smanager.get_last_error_message();
231 OJPH_INFO(0x02000002,
232 "Failed to set the socket's blocking mode to %s, with error %s",
233 blocking ? "blocking" : "non-blocking", err.data());
234 }
235
236 // bind to listening address
237 if(bind(s.intern(), (struct sockaddr *)&server, sizeof(server)) == -1)
238 {
239 std::string err = smanager.get_last_error_message();
240 OJPH_ERROR(0x02000004,
241 "Could not bind address to socket: %s", err.data());
242 }
243
244 if (!quiet) {
245 constexpr int buf_size = 128;
246 char buf[buf_size];
247 ojph::ui32 addr = smanager.get_addr(server);
248 const char* t = inet_ntop(AF_INET, &addr, buf, buf_size);
249 if (t == NULL) {
250 std::string err = smanager.get_last_error_message();
251 OJPH_INFO(0x02000005,
252 "Error converting source address: %s", err.data());
253 }
254 printf("Listining on %s, port %d\n", t, ntohs(server.sin_port));
255 }
256
257 // process the source IPv4 address and port
258 ojph::ui32 saddr = 0;
259 if (src_addr)
260 {
261 const char *p = src_addr;
262 const char localhost[] = "127.0.0.1";
263 if (strcmp(src_addr, "localhost") == 0)
264 p = localhost;
265 struct sockaddr_in t;
266 int result = inet_pton(AF_INET, p, &t.sin_addr);
267 if (result != 1)
268 OJPH_ERROR(0x02000005, "Please provide a valid IPv4 address when "
269 "using \"-src_addr,\" the provided address %s is not valid",
270 src_addr);
271 saddr = smanager.get_addr(t);
272 }
273 ojph::ui16 sport = 0;
274 if (src_addr)
275 {
276 sport = (ojph::ui16)atoi(src_port);
277 if (sport == 0)
278 OJPH_ERROR(0x02000006, "Please provide a valid port number. "
279 "The number you provided is %d", src_port);
280 }
281
282 // listen to incoming data, and forward it to packet_handler
283 struct sockaddr_in si_other;
284 socklen_t socklen = sizeof(si_other);
285 bool src_printed = false;
286 ojph::stex::rtp_packet* packet = NULL;
287 ojph::ui32 last_time_stamp = 0;
288 while (1)
289 {
290 if (packet == NULL || packet->num_bytes != 0)
291 packet = packets_handler.exchange(packet);
292 if (packet == NULL)
293 continue;
294 packet->num_bytes = 0;
295
296 // receive data
297 int num_bytes = (int)recvfrom(s.intern(), (char*)packet->data,
298 packet->max_size, 0, (struct sockaddr*)&si_other, &socklen);
299
300 if (num_bytes < 0) // error or non-blocking call
301 {
302 int last_error = smanager.get_last_error();
303 if (last_error != OJPH_EWOULDBLOCK)
304 {
305 std::string err = smanager.get_error_message(last_error);
306 OJPH_INFO(0x02000003, "Failed to receive data: %s", err.data());
307 }
308 continue; // if we wish to continue
309 }
310
311 if ((src_addr && saddr != smanager.get_addr(si_other)) ||
312 (src_port && sport != si_other.sin_port)) {
313 constexpr int buf_size = 128;
314 char buf[buf_size];
315 ojph::ui32 addr = smanager.get_addr(si_other);
316 const char* t = inet_ntop(AF_INET, &addr, buf, buf_size);
317 if (t == NULL) {
318 std::string err = smanager.get_last_error_message();
319 OJPH_INFO(0x02000004,
320 "Error converting source address: %s", err.data());
321 }
322 printf("Source mistmatch %s, port %d\n",
323 t, ntohs(si_other.sin_port));
324 continue;
325 }
326
327 packet->num_bytes = (ojph::ui32)num_bytes;
328
329 if (last_time_stamp == 0)
330 last_time_stamp = packet->get_time_stamp();
331
332 if (!quiet && !src_printed)
333 {
334 constexpr int buf_size = 128;
335 char buf[buf_size];
336 ojph::ui32 addr = smanager.get_addr(si_other);
337 const char* t = inet_ntop(AF_INET, &addr, buf, buf_size);
338 if (t == NULL) {
339 std::string err = smanager.get_last_error_message();
340 OJPH_INFO(0x02000005,
341 "Error converting source address: %s", err.data());
342 }
343 printf("Receiving data from %s, port %d\n",
344 t, ntohs(si_other.sin_port));
345 src_printed = true;
346 }
347
348 if (!quiet)
349 if (packet->get_time_stamp() >= last_time_stamp + 45000)
350 { // One second is 90000
351 last_time_stamp = packet->get_time_stamp();
352 ojph::ui32 lost_packets = packets_handler.get_num_lost_packets();
353 ojph::ui32 total_frames = 0, trunc_frames = 0, lost_frames = 0;
354 frames_handler.get_stats(total_frames, trunc_frames, lost_frames);
355
356 printf("Total frame %d, tuncated frames %d, lost frames %d, "
357 "packets lost %d\n",
358 total_frames, trunc_frames, lost_frames, lost_packets);
359 }
360 }
361 s.close();
362 }
363 catch (const std::exception& e)
364 {
365 const char *p = e.what();
366 if (strncmp(p, "ojph error", 10) != 0)
367 printf("%s\n", p);
368 exit(-1);
369 }
370
371 return 0;
372}
373
bool is_valid()
Definition ojph_arg.h:58
void init(int argc, char *argv[])
Definition ojph_arg.h:73
void reinterpret(const char *str, int &val)
Definition ojph_arg.h:146
argument get_argument_zero()
Definition ojph_arg.h:126
argument get_next_avail_argument(const argument &arg)
Definition ojph_arg.h:133
A small wrapper for some Winsock2 functionality.
std::string get_error_message(int errnum)
Abstracts obtaining a textual message for an errnum.
static ui32 get_addr(const sockaddr_in &addr)
Abstractly obtains the 32-bit IPv4 address integer.
socket create_socket(int domain, int type, int protocol)
Abstracts socket creation.
std::string get_last_error_message()
Abstracts obtaining a textual message for GetLastError/errno.
int get_last_error()
Abstracts getting the last error or errno.
A small wrapper for a socket that only abstracts Winsock2.
ojph_socket intern()
provides access to the internal socket handle
void close()
Abstracts socket closing function.
bool set_blocking_mode(bool block)
Sets the blocking mode.
Assumes packets arrive in order.
void get_stats(ui32 &total_frames, ui32 &trunc_frames, ui32 &lost_frames)
call this function to collect statistics about frames
void init(bool quiet, const char *target_name, thds::thread_pool *thread_pool)
call this function to initialize this object
Interprets new packets, buffers them if needed.
rtp_packet * exchange(rtp_packet *p)
Call this function to get a packet from the packet chain.
ui32 get_num_lost_packets() const
This function provides information about the observed number of lost packets.
void init(bool quiet, ui32 num_packets, frames_handler *frames)
call this to initialize packets_handler
Implements a pool of threads, and can queue tasks.
void init(size_t num_threads)
Initializes the thread pool.
uint16_t ui16
Definition ojph_defs.h:52
uint32_t ui32
Definition ojph_defs.h:54
#define OJPH_INFO(t,...)
MACROs to insert file and line number for info, warning, and error.
#define OJPH_ERROR(t,...)
#define OJPH_INVALID_SOCKET
#define OJPH_EWOULDBLOCK
int main(int argc, char *argv[])
static bool get_arguments(int argc, char *argv[], char *&recv_addr, char *&recv_port, char *&src_addr, char *&src_port, char *&target_name, ojph::ui32 &num_threads, ojph::ui32 &num_inflight_packets, ojph::ui32 &recvfrm_buf_size, bool &blocking, bool &quiet)
Interprets the RTP header and payload, and holds received packets.
ui32 num_bytes
number of bytes
ui8 data[max_size]
data in the packet
static constexpr int max_size
maximum packet size