OpenJPH
Open-source implementation of JPEG2000 Part-15
stream_expand_support.cpp
Go to the documentation of this file.
1//***************************************************************************/
2// This software is released under the 2-Clause BSD license, included
3// below.
4//
5// Copyright (c) 2024, Aous Naman
6// Copyright (c) 2024, Kakadu Software Pty Ltd, Australia
7// Copyright (c) 2024, The University of New South Wales, Australia
8//
9// Redistribution and use in source and binary forms, with or without
10// modification, are permitted provided that the following conditions are
11// met:
12//
13// 1. Redistributions of source code must retain the above copyright
14// notice, this list of conditions and the following disclaimer.
15//
16// 2. Redistributions in binary form must reproduce the above copyright
17// notice, this list of conditions and the following disclaimer in the
18// documentation and/or other materials provided with the distribution.
19//
20// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
21// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
22// TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
23// PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24// HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
26// TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
27// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
28// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
29// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
30// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31//***************************************************************************/
32// This file is part of the OpenJPH software implementation.
33// File: stream_expand_support.h
34// Author: Aous Naman
35// Date: 18 April 2024
36//***************************************************************************/
37
38#include <cassert>
39#include <cstddef>
40#include "ojph_threads.h"
43
44namespace ojph
45{
46namespace stex
47{
48
50//
51//
52// static comparison functions
53//
54//
56
58// Compares two 32 bit values, A with B, with the possibility A or B has
59// undergone overflow. This problem has no proper solution, but here we
60// assume that the value B approximately divides the space into two regions,
61// a region larger than B and a region smaller than B. This leaves one
62// undetermined value that lies at the opposite end of B, a case we just
63// ignore -- it is part of smaller.
64// NB: This is my current thinking -- I might be wrong
65static inline bool is_greater32(ui32 a, ui32 b)
66{
67 ui32 c = a - b;
68 return (c > 0u && c <= 0x7FFFFFFFu);
69}
70
72// Compares two 32 bit values, A with B, with the possibility A or B has
73// undergone overflow. This problem has no proper solution, but here we
74// assume that the value B approximately divides the space into two regions,
75// a region larger than B and a region smaller than B. This leaves one
76// undetermined value that lies at the opposite end of B, a case we just
77// ignore -- it is part of smaller.
78// NB: This is my current thinking -- I might be wrong
79static inline bool is_smaller32(ui32 a, ui32 b)
80{
81 ui32 c = a - b;
82 return (c >= 0x80000000u && c <= 0xFFFFFFFFu);
83}
84
86static inline bool is_greater24(ui32 a, ui32 b)
87{ return is_greater32(a << 8, b << 8); }
88
90static inline bool is_smaller24(ui32 a, ui32 b)
91{ return is_smaller32(a << 8, b << 8); }
92
94static inline ui32 clip_seq_num(ui32 n) { return (n & 0xFFFFFF); }
95
97//
98//
99//
100//
101//
103
105void packets_handler::init(bool quiet, ui32 num_packets,
106 frames_handler* frames)
107{
108 assert(this->num_packets == 0);
110 ui32 i = 0;
111 for (; i < num_packets - 1; ++i)
112 packet_store[i].init(packet_store + i + 1);
113 packet_store[i].init(NULL);
114 this->quiet = quiet;
115 this->num_packets = num_packets;
116 this->frames = frames;
117}
118
121{
122 assert(num_packets > 0 && p == in_use);
123
124 if (p != NULL) {
125 if (p->num_bytes == 0)
126 return p;
127
128 if (last_seq_num == 0) // initialization
130
131 // packet is old, and is ignored -- no need to included it in the
132 // lost packets, because this packet was considered lost previously.
133 // This also captures the case where the previous packet and this packet
134 // has the same sequence number, which is rather weird but possible
135 // if some intermediate network unit retransmits packets.
137 return p;
138 else if (p->get_seq_num() == clip_seq_num(last_seq_num + 1))
139 {
141 // see if we can push one packet from the top of the buffer
144 }
145 else // sequence larger than expected
146 {
147 // Place the packet in the in_use queue according to its sequence
148 // number; we may have to move it down the queue. The in_use queue is
149 // always arranged in an ascending order, where the top of the queue
150 // (pointed to by in_use) has the smallest sequence number.
151 if (in_use->next != NULL) // we have more than 1 packet in queue
152 {
153 rtp_packet* t = in_use;
154 while (t->next != NULL &&
156 t = t->next;
157
158 if (t->next != NULL && p->get_seq_num() == t->next->get_seq_num())
159 { // this is a repeated packet and must be removed
160 in_use = in_use->next;
161 p->next = avail;
162 avail = p;
163 }
164 else {
165 if (t == in_use) // at front of queue -- exactly where it should be
166 { } // do nothing
167 else if (t->next == NULL) { // at the end of queue
168 in_use = in_use->next; // remove p from the queue
169 t->next = p;
170 p->next = NULL;
171 }
172 else { // in the middle of the queue
173 in_use = in_use->next; // p removed from the start of queue
174 p->next = t->next;
175 t->next = p;
176 }
177 }
178 }
179
180 // If avail == NULL, all packets are being used (in_use), meaning
181 // the queue is already full. We push packets from to the top of in_use
182 // queue.
183 // If avail != NULL, we push one packet from the top of the buffer,
184 // if it has the correct sequence number.
185 if (avail == NULL ||
187 {
188 if (avail == NULL)
189 lost_packets +=
194 }
195 }
196 }
197
198 // move from avail to in_use -- there must be at least one packet in avail
199 assert(avail != NULL);
200 p = avail;
201 avail = avail->next;
202 p->next = in_use;
203 in_use = p;
204 return p;
205}
206
209{
210 // move all packets from in_use to avail
211 while (in_use)
212 {
213 rtp_packet *p = in_use;
214 in_use = in_use->next;
215 p->next = avail;
216 avail = p;
217 }
218}
219
222{
225 // move pack from in_use to avail; the packet must be equal to in_use
226 rtp_packet* p = in_use;
227 in_use = in_use->next;
228 p->next = avail;
229 avail = p;
230}
231
233//
234//
235//
236//
237//
239
242{
243 int t = done.fetch_add(-1, std::memory_order_acq_rel);
244 if (t == 1) // done is 0
246}
247
249//
250//
251//
252//
253//
255
258{
259 if (storers_store)
260 delete[] storers_store;
261 if (files_store)
262 delete[] files_store;
263}
264
266void frames_handler::init(bool quiet, const char *target_name,
267 thds::thread_pool* thread_pool)
268{
269 this->quiet = quiet;
270 this->num_threads = (ui32)thread_pool->get_num_threads();
271 this->target_name = target_name;
275 ui32 i = 0;
276 for (; i < num_files - 1; ++i) {
277 files_store[i].f.open(2 << 20, false);
278 files_store[i].f.close();
279 files_store[i].init(this, files_store + i + 1, storers_store + i,
282 }
283 files_store[i].f.open(2 << 20, false);
284 files_store[i].f.close();
285 files_store[i].init(this, NULL, storers_store + i, target_name);
287 this->thread_pool = thread_pool;
288}
289
292{
296
297 // check if any of the frames processed in other threads are done
299
300 // process newly received packet
302 { // main packet payload
303
304 // The existance of a previous frame means we did not get the marked
305 // packet. Here, we close the frame and move it to processing
306 if (in_use) {
307 ++trunc_frames;
309 }
310
311 // This is where we process a new frame, if there is space
312 if (avail)
313 {
314 // move from avail to in_use
315 in_use = avail;
316 avail = avail->next;
317 in_use->next = NULL;
318
319 assert(in_use->done.load(std::memory_order_acquire) == 0);
323 in_use->f.open();
324 in_use->f.write(p->get_data(), p->get_data_size());
325 }
326 else
327 ++lost_frames;
328
329 ++total_frames;
331 }
332 else
333 { // body packet payload
334 if (in_use != NULL)
335 {
336 if (p->get_time_stamp() == in_use->time_stamp)
337 { // this is a continuation of a previous frame
339 {
341 in_use->f.write(p->get_data(), p->get_data_size());
342 if (p->is_marked())
344 }
345 else {
346 // we must have missed packets
347 ++trunc_frames;
349 }
350 }
351 else
352 {
353 // This is a different frame and we did not get the marked packet.
354 // We close the older frame and send it for processing
355 ++trunc_frames;
357
359 {
360 ++total_frames;
362 }
363 }
364 }
365 else // no frame is being written
366 {
368 {
369 ++total_frames;
371 }
372 }
373 }
374}
375
377void frames_handler::get_stats(ui32& total_frames, ui32& trunc_frames,
378 ui32& lost_frames)
379{
381 trunc_frames = this->trunc_frames;
382 lost_frames = this->lost_frames;
383}
384
387{
388 // check if any of the frames processed in other threads are done
390
391 // check the file in in_use and terminate it
392 if (in_use != NULL)
393 {
394 // move from in_use to avail
395 in_use->f.close();
396 in_use->next = avail;
397 avail = in_use;
398 in_use = NULL;
399 }
400
401 return (processing != NULL);
402}
403
406{
407 // check if any of the frames processed in other threads are done
408 int nf = num_complete_files.load(std::memory_order_acquire);
409 if (nf > 0)
410 {
411 stex_file* f = processing, *pf = NULL;
412 while(f != NULL && nf > 0)
413 {
414 num_complete_files.fetch_add(-1, std::memory_order_relaxed);
415
416 if (f->done.load(std::memory_order_acquire) == 0)
417 {
418 // move f from processing to avail
419 f->time_stamp = 0;
420 f->last_seen_seq = 0;
421 f->frame_idx = 0;
422 if (f == processing)
423 {
425 f->next = avail;
426 avail = f;
427 f = processing; // for next test
428 }
429 else {
430 pf->next = f->next;
431 f->next = avail;
432 avail = f;
433 f = pf->next; // for next test
434 }
435 }
436 else
437 {
438 pf = f;
439 f = f->next;
440 }
441 nf = num_complete_files.load(std::memory_order_acquire);
442 }
443 }
444}
445
448{
449 in_use->f.close();
450 if (target_name) {
453 in_use->done.store(1, std::memory_order_relaxed);
455 }
456 else {
457 in_use->next = avail;
458 avail = in_use;
459 }
460 in_use = NULL;
461}
462
463} // !stex namespace
464} // !ojph namespace
size_t write(const void *ptr, size_t size) override
Call this function to write data to the memory file.
Definition: ojph_file.cpp:166
void open(size_t initial_size=65536, bool clear_mem=false)
Call this function to open a memory file.
Definition: ojph_file.cpp:120
void close() override
Definition: ojph_file.cpp:134
Assumes packets arrive in order.
ui32 num_threads
number of threads used for saving
j2k_frame_storer * storers_store
address for allocated frame storers
ui32 total_frames
total number of frames that were observed
stex_file * in_use
the frame that is being filled with data
void push(rtp_packet *p)
call this function to push rtp_packets to this object
bool flush()
This function is not used, and therefore it is not clear how to use it.
void get_stats(ui32 &total_frames, ui32 &trunc_frames, ui32 &lost_frames)
call this function to collect statistics about frames
stex_file * processing
frames that are being saved
stex_file * avail
available frames structures
ui32 last_time_stamp
last observed time stamp
ui32 last_seq_number
last observed sequence number
ui32 trunc_frames
truncated frames (because of a packet lostt)
void check_files_in_processing()
call this function to process stex_file for which processing is complete
void increment_num_complete_files()
other threads call this function to let frames_handler know that processing is done.
void send_to_processing()
Handles complete/truncated files and send them for storing.
ui32 lost_frames
frames for which main header was not received
void init(bool quiet, const char *target_name, thds::thread_pool *thread_pool)
call this function to initialize this object
const char * target_name
target file name template
thds::thread_pool * thread_pool
thread pool for processing frames
bool quiet
no informational info is printed when true
std::atomic_int32_t num_complete_files
ui32 num_files
maximum number of in-flight files.
stex_file * files_store
address for allocated files
rtp_packet * packet_store
address of packet memory allocation
rtp_packet * exchange(rtp_packet *p)
Call this function to get a packet from the packet chain.
frames_handler * frames
frames object
void consume_packet()
This function sends the packet in in_use (oldest) to frames handler object.
bool quiet
no informational info is printed when true
ui32 num_packets
maximum number of packets in packet_store
rtp_packet * in_use
start of used packet chain
ui32 last_seq_num
the last observed sequence number
ui32 lost_packets
number of lost packets – just statistics
void flush()
This function is not used, and therefore it is not clear how to use it.
rtp_packet * avail
start of available packets chain
void init(bool quiet, ui32 num_packets, frames_handler *frames)
call this to initialize packets_handler
Implements a pool of threads, and can queue tasks.
Definition: ojph_threads.h:98
void add_task(worker_thread_base *task)
Adds a task to the thread pool.
size_t get_num_threads()
Returns the number of threads in the thread pool.
Definition: ojph_threads.h:129
static bool is_smaller32(ui32 a, ui32 b)
static bool is_smaller24(ui32 a, ui32 b)
static bool is_greater32(ui32 a, ui32 b)
static bool is_greater24(ui32 a, ui32 b)
static ui32 clip_seq_num(ui32 n)
uint32_t ui32
Definition: ojph_defs.h:54
Saves a j2k frame to disk without decoding.
void init(stex_file *file, const char *name_template)
call this function to initialize its members
inteprets RTP header and payload, and holds received packets.
ui32 num_bytes
number of bytes
rtp_packet * next
used for linking packets
void init(rtp_packet *next)
Call this to link packets.
holds in memory j2k codestream together with other info
frames_handler * parent
the object holding this frame
ojph::mem_outfile f
holds in-memory j2k codestream
ui32 frame_idx
frame number in the sequence
void init(frames_handler *parent, stex_file *next, j2k_frame_storer *storer, const char *name_template)
call this function to initialize stex_file
void notify_file_completion()
other threads can call this function to signal completion of processing.
ui32 last_seen_seq
the last seen RTP sequence number
stex_file * next
used to create files chain
ui32 time_stamp
time stamp at which this file must be displayed
std::atomic_int done
saving is completed when 0 is reached
j2k_frame_storer * storer
stores a j2k frame using another thread