| /////////////////////////////////////////////////////////////////////////////// |
| // |
| /// \file stream_encoder_mt.c |
| /// \brief Multithreaded .xz Stream encoder |
| // |
| // Author: Lasse Collin |
| // |
| // This file has been put into the public domain. |
| // You can do whatever you want with this file. |
| // |
| /////////////////////////////////////////////////////////////////////////////// |
| |
| #include "filter_encoder.h" |
| #include "easy_preset.h" |
| #include "block_encoder.h" |
| #include "index_encoder.h" |
| #include "outqueue.h" |
| |
| |
/// Largest Block size accepted from the application. Capping it at
/// UINT64_MAX / LZMA_THREADS_MAX means a Block size multiplied by the
/// number of threads cannot overflow a 64-bit integer, which simplifies
/// the overflow checks elsewhere in this file.
#define BLOCK_SIZE_MAX (UINT64_MAX / LZMA_THREADS_MAX)
| |
| |
/// State of a worker thread.
///
/// NOTE: The order of the enumerators is significant. Code in this file
/// compares states with <= THR_FINISH ("has a Block to encode") and
/// >= THR_STOP ("must abandon the current Block").
typedef enum {
	/// No Block is assigned; the thread sleeps until it is given work.
	THR_IDLE,

	/// A Block is being encoded and more input may still arrive.
	THR_RUN,

	/// A Block is being encoded and no more input will be added
	/// to the input buffer.
	THR_FINISH,

	/// The main thread wants the worker to abandon whatever it is
	/// doing while keeping the thread itself alive.
	THR_STOP,

	/// The main thread wants the worker to exit. Thread cancellation
	/// could be used for this, but since stopping has to be supported
	/// anyway, handling exit the same way is the lazier solution.
	THR_EXIT,

} worker_state;
| |
| |
| typedef struct worker_thread_s worker_thread; |
| struct worker_thread_s { |
| worker_state state; |
| |
| /// Input buffer of coder->block_size bytes. The main thread will |
| /// put new input into this and update in_size accordingly. Once |
| /// no more input is coming, state will be set to THR_FINISH. |
| uint8_t *in; |
| |
| /// Amount of data available in the input buffer. This is modified |
| /// only by the main thread. |
| size_t in_size; |
| |
| /// Output buffer for this thread. This is set by the main |
| /// thread every time a new Block is started with this thread |
| /// structure. |
| lzma_outbuf *outbuf; |
| |
| /// Pointer to the main structure is needed when putting this |
| /// thread back to the stack of free threads. |
| lzma_coder *coder; |
| |
| /// The allocator is set by the main thread. Since a copy of the |
| /// pointer is kept here, the application must not change the |
| /// allocator before calling lzma_end(). |
| const lzma_allocator *allocator; |
| |
| /// Amount of uncompressed data that has already been compressed. |
| uint64_t progress_in; |
| |
| /// Amount of compressed data that is ready. |
| uint64_t progress_out; |
| |
| /// Block encoder |
| lzma_next_coder block_encoder; |
| |
| /// Compression options for this Block |
| lzma_block block_options; |
| |
| /// Next structure in the stack of free worker threads. |
| worker_thread *next; |
| |
| pthread_mutex_t mutex; |
| pthread_cond_t cond; |
| |
| /// The ID of this thread is used to join the thread |
| /// when it's not needed anymore. |
| pthread_t thread_id; |
| }; |
| |
| |
| struct lzma_coder_s { |
| enum { |
| SEQ_STREAM_HEADER, |
| SEQ_BLOCK, |
| SEQ_INDEX, |
| SEQ_STREAM_FOOTER, |
| } sequence; |
| |
| /// Start a new Block every block_size bytes of input unless |
| /// LZMA_FULL_FLUSH or LZMA_FULL_BARRIER is used earlier. |
| size_t block_size; |
| |
| /// The filter chain currently in use |
| lzma_filter filters[LZMA_FILTERS_MAX + 1]; |
| |
| |
| /// Index to hold sizes of the Blocks |
| lzma_index *index; |
| |
| /// Index encoder |
| lzma_next_coder index_encoder; |
| |
| |
| /// Stream Flags for encoding the Stream Header and Stream Footer. |
| lzma_stream_flags stream_flags; |
| |
| /// Buffer to hold Stream Header and Stream Footer. |
| uint8_t header[LZMA_STREAM_HEADER_SIZE]; |
| |
| /// Read position in header[] |
| size_t header_pos; |
| |
| |
| /// Output buffer queue for compressed data |
| lzma_outq outq; |
| |
| |
| /// True if wait_max is used. |
| bool has_timeout; |
| |
| /// Maximum wait time if cannot use all the input and cannot |
| /// fill the output buffer. |
| struct timespec wait_max; |
| |
| |
| /// Error code from a worker thread |
| lzma_ret thread_error; |
| |
| /// Array of allocated thread-specific structures |
| worker_thread *threads; |
| |
| /// Number of structures in "threads" above. This is also the |
| /// number of threads that will be created at maximum. |
| uint32_t threads_max; |
| |
| /// Number of thread structures that have been initialized, and |
| /// thus the number of worker threads actually created so far. |
| uint32_t threads_initialized; |
| |
| /// Stack of free threads. When a thread finishes, it puts itself |
| /// back into this stack. This starts as empty because threads |
| /// are created only when actually needed. |
| worker_thread *threads_free; |
| |
| /// The most recent worker thread to which the main thread writes |
| /// the new input from the application. |
| worker_thread *thr; |
| |
| |
| /// Amount of uncompressed data in Blocks that have already |
| /// been finished. |
| uint64_t progress_in; |
| |
| /// Amount of compressed data in Stream Header + Blocks that |
| /// have already been finished. |
| uint64_t progress_out; |
| |
| |
| pthread_mutex_t mutex; |
| mythread_cond cond; |
| }; |
| |
| |
| /// Tell the main thread that something has gone wrong. |
| static void |
| worker_error(worker_thread *thr, lzma_ret ret) |
| { |
| assert(ret != LZMA_OK); |
| assert(ret != LZMA_STREAM_END); |
| |
| mythread_sync(thr->coder->mutex) { |
| if (thr->coder->thread_error == LZMA_OK) |
| thr->coder->thread_error = ret; |
| |
| mythread_cond_signal(&thr->coder->cond); |
| } |
| |
| return; |
| } |
| |
| |
| static worker_state |
| worker_encode(worker_thread *thr, worker_state state) |
| { |
| assert(thr->progress_in == 0); |
| assert(thr->progress_out == 0); |
| |
| // Set the Block options. |
| thr->block_options = (lzma_block){ |
| .version = 0, |
| .check = thr->coder->stream_flags.check, |
| .compressed_size = thr->coder->outq.buf_size_max, |
| .uncompressed_size = thr->coder->block_size, |
| |
| // TODO: To allow changing the filter chain, the filters |
| // array must be copied to each worker_thread. |
| .filters = thr->coder->filters, |
| }; |
| |
| // Calculate maximum size of the Block Header. This amount is |
| // reserved in the beginning of the buffer so that Block Header |
| // along with Compressed Size and Uncompressed Size can be |
| // written there. |
| lzma_ret ret = lzma_block_header_size(&thr->block_options); |
| if (ret != LZMA_OK) { |
| worker_error(thr, ret); |
| return THR_STOP; |
| } |
| |
| // Initialize the Block encoder. |
| ret = lzma_block_encoder_init(&thr->block_encoder, |
| thr->allocator, &thr->block_options); |
| if (ret != LZMA_OK) { |
| worker_error(thr, ret); |
| return THR_STOP; |
| } |
| |
| size_t in_pos = 0; |
| size_t in_size = 0; |
| |
| thr->outbuf->size = thr->block_options.header_size; |
| const size_t out_size = thr->coder->outq.buf_size_max; |
| |
| do { |
| mythread_sync(thr->mutex) { |
| // Store in_pos and out_pos into *thr so that |
| // an application may read them via |
| // lzma_get_progress() to get progress information. |
| // |
| // NOTE: These aren't updated when the encoding |
| // finishes. Instead, the final values are taken |
| // later from thr->outbuf. |
| thr->progress_in = in_pos; |
| thr->progress_out = thr->outbuf->size; |
| |
| while (in_size == thr->in_size |
| && thr->state == THR_RUN) |
| pthread_cond_wait(&thr->cond, &thr->mutex); |
| |
| state = thr->state; |
| in_size = thr->in_size; |
| } |
| |
| // Return if we were asked to stop or exit. |
| if (state >= THR_STOP) |
| return state; |
| |
| lzma_action action = state == THR_FINISH |
| ? LZMA_FINISH : LZMA_RUN; |
| |
| // Limit the amount of input given to the Block encoder |
| // at once. This way this thread can react fairly quickly |
| // if the main thread wants us to stop or exit. |
| static const size_t in_chunk_max = 16384; |
| size_t in_limit = in_size; |
| if (in_size - in_pos > in_chunk_max) { |
| in_limit = in_pos + in_chunk_max; |
| action = LZMA_RUN; |
| } |
| |
| ret = thr->block_encoder.code( |
| thr->block_encoder.coder, thr->allocator, |
| thr->in, &in_pos, in_limit, thr->outbuf->buf, |
| &thr->outbuf->size, out_size, action); |
| } while (ret == LZMA_OK); |
| |
| if (ret != LZMA_STREAM_END) { |
| worker_error(thr, ret); |
| return THR_STOP; |
| } |
| |
| assert(state == THR_FINISH); |
| |
| // Encode the Block Header. By doing it after the compression, |
| // we can store the Compressed Size and Uncompressed Size fields. |
| ret = lzma_block_header_encode(&thr->block_options, thr->outbuf->buf); |
| if (ret != LZMA_OK) { |
| worker_error(thr, ret); |
| return THR_STOP; |
| } |
| |
| // Set the size information that will be read by the main thread |
| // to write the Index field. |
| thr->outbuf->unpadded_size |
| = lzma_block_unpadded_size(&thr->block_options); |
| assert(thr->outbuf->unpadded_size != 0); |
| thr->outbuf->uncompressed_size = thr->block_options.uncompressed_size; |
| |
| return THR_FINISH; |
| } |
| |
| |
| static void * |
| worker_start(void *thr_ptr) |
| { |
| worker_thread *thr = thr_ptr; |
| worker_state state = THR_IDLE; // Init to silence a warning |
| |
| while (true) { |
| // Wait for work. |
| mythread_sync(thr->mutex) { |
| while (true) { |
| // The thread is already idle so if we are |
| // requested to stop, just set the state. |
| if (thr->state == THR_STOP) { |
| thr->state = THR_IDLE; |
| pthread_cond_signal(&thr->cond); |
| } |
| |
| state = thr->state; |
| if (state != THR_IDLE) |
| break; |
| |
| pthread_cond_wait(&thr->cond, &thr->mutex); |
| } |
| } |
| |
| assert(state != THR_IDLE); |
| assert(state != THR_STOP); |
| |
| if (state <= THR_FINISH) |
| state = worker_encode(thr, state); |
| |
| if (state == THR_EXIT) |
| break; |
| |
| // Mark the thread as idle unless the main thread has |
| // told us to exit. Signal is needed for the case |
| // where the main thread is waiting for the threads to stop. |
| mythread_sync(thr->mutex) { |
| if (thr->state != THR_EXIT) { |
| thr->state = THR_IDLE; |
| pthread_cond_signal(&thr->cond); |
| } |
| } |
| |
| mythread_sync(thr->coder->mutex) { |
| // Mark the output buffer as finished if |
| // no errors occurred. |
| thr->outbuf->finished = state == THR_FINISH; |
| |
| // Update the main progress info. |
| thr->coder->progress_in |
| += thr->outbuf->uncompressed_size; |
| thr->coder->progress_out += thr->outbuf->size; |
| thr->progress_in = 0; |
| thr->progress_out = 0; |
| |
| // Return this thread to the stack of free threads. |
| thr->next = thr->coder->threads_free; |
| thr->coder->threads_free = thr; |
| |
| mythread_cond_signal(&thr->coder->cond); |
| } |
| } |
| |
| // Exiting, free the resources. |
| pthread_mutex_destroy(&thr->mutex); |
| pthread_cond_destroy(&thr->cond); |
| |
| lzma_next_end(&thr->block_encoder, thr->allocator); |
| lzma_free(thr->in, thr->allocator); |
| return NULL; |
| } |
| |
| |
| /// Make the threads stop but not exit. Optionally wait for them to stop. |
| static void |
| threads_stop(lzma_coder *coder, bool wait) |
| { |
| // Tell the threads to stop. |
| for (uint32_t i = 0; i < coder->threads_initialized; ++i) { |
| mythread_sync(coder->threads[i].mutex) { |
| coder->threads[i].state = THR_STOP; |
| pthread_cond_signal(&coder->threads[i].cond); |
| } |
| } |
| |
| if (!wait) |
| return; |
| |
| // Wait for the threads to settle in the idle state. |
| for (uint32_t i = 0; i < coder->threads_initialized; ++i) { |
| mythread_sync(coder->threads[i].mutex) { |
| while (coder->threads[i].state != THR_IDLE) |
| pthread_cond_wait(&coder->threads[i].cond, |
| &coder->threads[i].mutex); |
| } |
| } |
| |
| return; |
| } |
| |
| |
| /// Stop the threads and free the resources associated with them. |
| /// Wait until the threads have exited. |
| static void |
| threads_end(lzma_coder *coder, const lzma_allocator *allocator) |
| { |
| for (uint32_t i = 0; i < coder->threads_initialized; ++i) { |
| mythread_sync(coder->threads[i].mutex) { |
| coder->threads[i].state = THR_EXIT; |
| pthread_cond_signal(&coder->threads[i].cond); |
| } |
| } |
| |
| for (uint32_t i = 0; i < coder->threads_initialized; ++i) { |
| int ret = pthread_join(coder->threads[i].thread_id, NULL); |
| assert(ret == 0); |
| (void)ret; |
| } |
| |
| lzma_free(coder->threads, allocator); |
| return; |
| } |
| |
| |
| /// Initialize a new worker_thread structure and create a new thread. |
| static lzma_ret |
| initialize_new_thread(lzma_coder *coder, const lzma_allocator *allocator) |
| { |
| worker_thread *thr = &coder->threads[coder->threads_initialized]; |
| |
| thr->in = lzma_alloc(coder->block_size, allocator); |
| if (thr->in == NULL) |
| return LZMA_MEM_ERROR; |
| |
| if (pthread_mutex_init(&thr->mutex, NULL)) |
| goto error_mutex; |
| |
| if (pthread_cond_init(&thr->cond, NULL)) |
| goto error_cond; |
| |
| thr->state = THR_IDLE; |
| thr->allocator = allocator; |
| thr->coder = coder; |
| thr->progress_in = 0; |
| thr->progress_out = 0; |
| thr->block_encoder = LZMA_NEXT_CODER_INIT; |
| |
| if (mythread_create(&thr->thread_id, &worker_start, thr)) |
| goto error_thread; |
| |
| ++coder->threads_initialized; |
| coder->thr = thr; |
| |
| return LZMA_OK; |
| |
| error_thread: |
| pthread_cond_destroy(&thr->cond); |
| |
| error_cond: |
| pthread_mutex_destroy(&thr->mutex); |
| |
| error_mutex: |
| lzma_free(thr->in, allocator); |
| return LZMA_MEM_ERROR; |
| } |
| |
| |
| static lzma_ret |
| get_thread(lzma_coder *coder, const lzma_allocator *allocator) |
| { |
| // If there are no free output subqueues, there is no |
| // point to try getting a thread. |
| if (!lzma_outq_has_buf(&coder->outq)) |
| return LZMA_OK; |
| |
| // If there is a free structure on the stack, use it. |
| mythread_sync(coder->mutex) { |
| if (coder->threads_free != NULL) { |
| coder->thr = coder->threads_free; |
| coder->threads_free = coder->threads_free->next; |
| } |
| } |
| |
| if (coder->thr == NULL) { |
| // If there are no uninitialized structures left, return. |
| if (coder->threads_initialized == coder->threads_max) |
| return LZMA_OK; |
| |
| // Initialize a new thread. |
| return_if_error(initialize_new_thread(coder, allocator)); |
| } |
| |
| // Reset the parts of the thread state that have to be done |
| // in the main thread. |
| mythread_sync(coder->thr->mutex) { |
| coder->thr->state = THR_RUN; |
| coder->thr->in_size = 0; |
| coder->thr->outbuf = lzma_outq_get_buf(&coder->outq); |
| pthread_cond_signal(&coder->thr->cond); |
| } |
| |
| return LZMA_OK; |
| } |
| |
| |
| static lzma_ret |
| stream_encode_in(lzma_coder *coder, const lzma_allocator *allocator, |
| const uint8_t *restrict in, size_t *restrict in_pos, |
| size_t in_size, lzma_action action) |
| { |
| while (*in_pos < in_size |
| || (coder->thr != NULL && action != LZMA_RUN)) { |
| if (coder->thr == NULL) { |
| // Get a new thread. |
| const lzma_ret ret = get_thread(coder, allocator); |
| if (coder->thr == NULL) |
| return ret; |
| } |
| |
| // Copy the input data to thread's buffer. |
| size_t thr_in_size = coder->thr->in_size; |
| lzma_bufcpy(in, in_pos, in_size, coder->thr->in, |
| &thr_in_size, coder->block_size); |
| |
| // Tell the Block encoder to finish if |
| // - it has got block_size bytes of input; or |
| // - all input was used and LZMA_FINISH, LZMA_FULL_FLUSH, |
| // or LZMA_FULL_BARRIER was used. |
| // |
| // TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER. |
| const bool finish = thr_in_size == coder->block_size |
| || (*in_pos == in_size && action != LZMA_RUN); |
| |
| bool block_error = false; |
| |
| mythread_sync(coder->thr->mutex) { |
| if (coder->thr->state == THR_IDLE) { |
| // Something has gone wrong with the Block |
| // encoder. It has set coder->thread_error |
| // which we will read a few lines later. |
| block_error = true; |
| } else { |
| // Tell the Block encoder its new amount |
| // of input and update the state if needed. |
| coder->thr->in_size = thr_in_size; |
| |
| if (finish) |
| coder->thr->state = THR_FINISH; |
| |
| pthread_cond_signal(&coder->thr->cond); |
| } |
| } |
| |
| if (block_error) { |
| lzma_ret ret; |
| |
| mythread_sync(coder->mutex) { |
| ret = coder->thread_error; |
| } |
| |
| return ret; |
| } |
| |
| if (finish) |
| coder->thr = NULL; |
| } |
| |
| return LZMA_OK; |
| } |
| |
| |
| /// Wait until more input can be consumed, more output can be read, or |
| /// an optional timeout is reached. |
| static bool |
| wait_for_work(lzma_coder *coder, struct timespec *wait_abs, |
| bool *has_blocked, bool has_input) |
| { |
| if (coder->has_timeout && !*has_blocked) { |
| // Every time when stream_encode_mt() is called via |
| // lzma_code(), *has_block starts as false. We set it |
| // to true here and calculate the absolute time when |
| // we must return if there's nothing to do. |
| // |
| // The idea of *has_blocked is to avoid unneeded calls |
| // to mythread_cond_abstime(), which may do a syscall |
| // depending on the operating system. |
| *has_blocked = true; |
| *wait_abs = coder->wait_max; |
| mythread_cond_abstime(&coder->cond, wait_abs); |
| } |
| |
| bool timed_out = false; |
| |
| mythread_sync(coder->mutex) { |
| // There are four things that we wait. If one of them |
| // becomes possible, we return. |
| // - If there is input left, we need to get a free |
| // worker thread and an output buffer for it. |
| // - Data ready to be read from the output queue. |
| // - A worker thread indicates an error. |
| // - Time out occurs. |
| while ((!has_input || coder->threads_free == NULL |
| || !lzma_outq_has_buf(&coder->outq)) |
| && !lzma_outq_is_readable(&coder->outq) |
| && coder->thread_error == LZMA_OK |
| && !timed_out) { |
| if (coder->has_timeout) |
| timed_out = mythread_cond_timedwait( |
| &coder->cond, &coder->mutex, |
| wait_abs) != 0; |
| else |
| mythread_cond_wait(&coder->cond, |
| &coder->mutex); |
| } |
| } |
| |
| return timed_out; |
| } |
| |
| |
| static lzma_ret |
| stream_encode_mt(lzma_coder *coder, const lzma_allocator *allocator, |
| const uint8_t *restrict in, size_t *restrict in_pos, |
| size_t in_size, uint8_t *restrict out, |
| size_t *restrict out_pos, size_t out_size, lzma_action action) |
| { |
| switch (coder->sequence) { |
| case SEQ_STREAM_HEADER: |
| lzma_bufcpy(coder->header, &coder->header_pos, |
| sizeof(coder->header), |
| out, out_pos, out_size); |
| if (coder->header_pos < sizeof(coder->header)) |
| return LZMA_OK; |
| |
| coder->header_pos = 0; |
| coder->sequence = SEQ_BLOCK; |
| |
| // Fall through |
| |
| case SEQ_BLOCK: { |
| // Initialized to silence warnings. |
| lzma_vli unpadded_size = 0; |
| lzma_vli uncompressed_size = 0; |
| lzma_ret ret = LZMA_OK; |
| |
| // These are for wait_for_work(). |
| bool has_blocked = false; |
| struct timespec wait_abs; |
| |
| while (true) { |
| mythread_sync(coder->mutex) { |
| // Check for Block encoder errors. |
| ret = coder->thread_error; |
| if (ret != LZMA_OK) { |
| assert(ret != LZMA_STREAM_END); |
| break; |
| } |
| |
| // Try to read compressed data to out[]. |
| ret = lzma_outq_read(&coder->outq, |
| out, out_pos, out_size, |
| &unpadded_size, |
| &uncompressed_size); |
| } |
| |
| if (ret == LZMA_STREAM_END) { |
| // End of Block. Add it to the Index. |
| ret = lzma_index_append(coder->index, |
| allocator, unpadded_size, |
| uncompressed_size); |
| |
| // If we didn't fill the output buffer yet, |
| // try to read more data. Maybe the next |
| // outbuf has been finished already too. |
| if (*out_pos < out_size) |
| continue; |
| } |
| |
| if (ret != LZMA_OK) { |
| // coder->thread_error was set or |
| // lzma_index_append() failed. |
| threads_stop(coder, false); |
| return ret; |
| } |
| |
| // Check if the last Block was finished. |
| if (action == LZMA_FINISH |
| && *in_pos == in_size |
| && lzma_outq_is_empty( |
| &coder->outq)) |
| break; |
| |
| // Try to give uncompressed data to a worker thread. |
| ret = stream_encode_in(coder, allocator, |
| in, in_pos, in_size, action); |
| if (ret != LZMA_OK) { |
| threads_stop(coder, false); |
| return ret; |
| } |
| |
| // Return if |
| // - we have used all the input and expect to |
| // get more input; or |
| // - the output buffer has been filled. |
| // |
| // TODO: Support flushing. |
| if ((*in_pos == in_size && action != LZMA_FINISH) |
| || *out_pos == out_size) |
| return LZMA_OK; |
| |
| // Neither in nor out has been used completely. |
| // Wait until there's something we can do. |
| if (wait_for_work(coder, &wait_abs, &has_blocked, |
| *in_pos < in_size)) |
| return LZMA_TIMED_OUT; |
| } |
| |
| // All Blocks have been encoded and the threads have stopped. |
| // Prepare to encode the Index field. |
| return_if_error(lzma_index_encoder_init( |
| &coder->index_encoder, allocator, |
| coder->index)); |
| coder->sequence = SEQ_INDEX; |
| |
| // Update the progress info to take the Index and |
| // Stream Footer into account. Those are very fast to encode |
| // so in terms of progress information they can be thought |
| // to be ready to be copied out. |
| coder->progress_out += lzma_index_size(coder->index) |
| + LZMA_STREAM_HEADER_SIZE; |
| } |
| |
| // Fall through |
| |
| case SEQ_INDEX: { |
| // Call the Index encoder. It doesn't take any input, so |
| // those pointers can be NULL. |
| const lzma_ret ret = coder->index_encoder.code( |
| coder->index_encoder.coder, allocator, |
| NULL, NULL, 0, |
| out, out_pos, out_size, LZMA_RUN); |
| if (ret != LZMA_STREAM_END) |
| return ret; |
| |
| // Encode the Stream Footer into coder->buffer. |
| coder->stream_flags.backward_size |
| = lzma_index_size(coder->index); |
| if (lzma_stream_footer_encode(&coder->stream_flags, |
| coder->header) != LZMA_OK) |
| return LZMA_PROG_ERROR; |
| |
| coder->sequence = SEQ_STREAM_FOOTER; |
| } |
| |
| // Fall through |
| |
| case SEQ_STREAM_FOOTER: |
| lzma_bufcpy(coder->header, &coder->header_pos, |
| sizeof(coder->header), |
| out, out_pos, out_size); |
| return coder->header_pos < sizeof(coder->header) |
| ? LZMA_OK : LZMA_STREAM_END; |
| } |
| |
| assert(0); |
| return LZMA_PROG_ERROR; |
| } |
| |
| |
| static void |
| stream_encoder_mt_end(lzma_coder *coder, const lzma_allocator *allocator) |
| { |
| // Threads must be killed before the output queue can be freed. |
| threads_end(coder, allocator); |
| lzma_outq_end(&coder->outq, allocator); |
| |
| for (size_t i = 0; coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i) |
| lzma_free(coder->filters[i].options, allocator); |
| |
| lzma_next_end(&coder->index_encoder, allocator); |
| lzma_index_end(coder->index, allocator); |
| |
| mythread_cond_destroy(&coder->cond); |
| pthread_mutex_destroy(&coder->mutex); |
| |
| lzma_free(coder, allocator); |
| return; |
| } |
| |
| |
| /// Options handling for lzma_stream_encoder_mt_init() and |
| /// lzma_stream_encoder_mt_memusage() |
| static lzma_ret |
| get_options(const lzma_mt *options, lzma_options_easy *opt_easy, |
| const lzma_filter **filters, uint64_t *block_size, |
| uint64_t *outbuf_size_max) |
| { |
| // Validate some of the options. |
| if (options == NULL) |
| return LZMA_PROG_ERROR; |
| |
| if (options->flags != 0 || options->threads == 0 |
| || options->threads > LZMA_THREADS_MAX) |
| return LZMA_OPTIONS_ERROR; |
| |
| if (options->filters != NULL) { |
| // Filter chain was given, use it as is. |
| *filters = options->filters; |
| } else { |
| // Use a preset. |
| if (lzma_easy_preset(opt_easy, options->preset)) |
| return LZMA_OPTIONS_ERROR; |
| |
| *filters = opt_easy->filters; |
| } |
| |
| // Block size |
| if (options->block_size > 0) { |
| if (options->block_size > BLOCK_SIZE_MAX) |
| return LZMA_OPTIONS_ERROR; |
| |
| *block_size = options->block_size; |
| } else { |
| // Determine the Block size from the filter chain. |
| *block_size = lzma_mt_block_size(*filters); |
| if (*block_size == 0) |
| return LZMA_OPTIONS_ERROR; |
| |
| assert(*block_size <= BLOCK_SIZE_MAX); |
| } |
| |
| // Calculate the maximum amount output that a single output buffer |
| // may need to hold. This is the same as the maximum total size of |
| // a Block. |
| // |
| // FIXME: As long as the encoder keeps the whole input buffer |
| // available and doesn't start writing output before finishing |
| // the Block, it could use lzma_stream_buffer_bound() and use |
| // uncompressed LZMA2 chunks if the data doesn't compress. |
| *outbuf_size_max = *block_size + *block_size / 16 + 16384; |
| |
| return LZMA_OK; |
| } |
| |
| |
| static void |
| get_progress(lzma_coder *coder, uint64_t *progress_in, uint64_t *progress_out) |
| { |
| // Lock coder->mutex to prevent finishing threads from moving their |
| // progress info from the worker_thread structure to lzma_coder. |
| mythread_sync(coder->mutex) { |
| *progress_in = coder->progress_in; |
| *progress_out = coder->progress_out; |
| |
| for (size_t i = 0; i < coder->threads_initialized; ++i) { |
| mythread_sync(coder->threads[i].mutex) { |
| *progress_in += coder->threads[i].progress_in; |
| *progress_out += coder->threads[i] |
| .progress_out; |
| } |
| } |
| } |
| |
| return; |
| } |
| |
| |
| static lzma_ret |
| stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator, |
| const lzma_mt *options) |
| { |
| lzma_next_coder_init(&stream_encoder_mt_init, next, allocator); |
| |
| // Get the filter chain. |
| lzma_options_easy easy; |
| const lzma_filter *filters; |
| uint64_t block_size; |
| uint64_t outbuf_size_max; |
| return_if_error(get_options(options, &easy, &filters, |
| &block_size, &outbuf_size_max)); |
| |
| #if SIZE_MAX < UINT64_MAX |
| if (block_size > SIZE_MAX) |
| return LZMA_MEM_ERROR; |
| #endif |
| |
| // FIXME TODO: Validate the filter chain so that we can give |
| // an error in this function instead of delaying it to the first |
| // call to lzma_code(). |
| |
| // Validate the Check ID. |
| if ((unsigned int)(options->check) > LZMA_CHECK_ID_MAX) |
| return LZMA_PROG_ERROR; |
| |
| if (!lzma_check_is_supported(options->check)) |
| return LZMA_UNSUPPORTED_CHECK; |
| |
| // Allocate and initialize the base structure if needed. |
| if (next->coder == NULL) { |
| next->coder = lzma_alloc(sizeof(lzma_coder), allocator); |
| if (next->coder == NULL) |
| return LZMA_MEM_ERROR; |
| |
| // For the mutex and condition variable initializations |
| // the error handling has to be done here because |
| // stream_encoder_mt_end() doesn't know if they have |
| // already been initialized or not. |
| if (pthread_mutex_init(&next->coder->mutex, NULL)) { |
| lzma_free(next->coder, allocator); |
| next->coder = NULL; |
| return LZMA_MEM_ERROR; |
| } |
| |
| if (mythread_cond_init(&next->coder->cond)) { |
| pthread_mutex_destroy(&next->coder->mutex); |
| lzma_free(next->coder, allocator); |
| next->coder = NULL; |
| return LZMA_MEM_ERROR; |
| } |
| |
| next->code = &stream_encode_mt; |
| next->end = &stream_encoder_mt_end; |
| next->get_progress = &get_progress; |
| // next->update = &stream_encoder_mt_update; |
| |
| next->coder->filters[0].id = LZMA_VLI_UNKNOWN; |
| next->coder->index_encoder = LZMA_NEXT_CODER_INIT; |
| next->coder->index = NULL; |
| memzero(&next->coder->outq, sizeof(next->coder->outq)); |
| next->coder->threads = NULL; |
| next->coder->threads_max = 0; |
| next->coder->threads_initialized = 0; |
| } |
| |
| // Basic initializations |
| next->coder->sequence = SEQ_STREAM_HEADER; |
| next->coder->block_size = (size_t)(block_size); |
| next->coder->thread_error = LZMA_OK; |
| next->coder->thr = NULL; |
| |
| // Allocate the thread-specific base structures. |
| assert(options->threads > 0); |
| if (next->coder->threads_max != options->threads) { |
| threads_end(next->coder, allocator); |
| |
| next->coder->threads = NULL; |
| next->coder->threads_max = 0; |
| |
| next->coder->threads_initialized = 0; |
| next->coder->threads_free = NULL; |
| |
| next->coder->threads = lzma_alloc( |
| options->threads * sizeof(worker_thread), |
| allocator); |
| if (next->coder->threads == NULL) |
| return LZMA_MEM_ERROR; |
| |
| next->coder->threads_max = options->threads; |
| } else { |
| // Reuse the old structures and threads. Tell the running |
| // threads to stop and wait until they have stopped. |
| threads_stop(next->coder, true); |
| } |
| |
| // Output queue |
| return_if_error(lzma_outq_init(&next->coder->outq, allocator, |
| outbuf_size_max, options->threads)); |
| |
| // Timeout |
| if (options->timeout > 0) { |
| next->coder->wait_max.tv_sec = options->timeout / 1000; |
| next->coder->wait_max.tv_nsec |
| = (options->timeout % 1000) * 1000000L; |
| next->coder->has_timeout = true; |
| } else { |
| next->coder->has_timeout = false; |
| } |
| |
| // Free the old filter chain and copy the new one. |
| for (size_t i = 0; next->coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i) |
| lzma_free(next->coder->filters[i].options, allocator); |
| |
| return_if_error(lzma_filters_copy(options->filters, |
| next->coder->filters, allocator)); |
| |
| // Index |
| lzma_index_end(next->coder->index, allocator); |
| next->coder->index = lzma_index_init(allocator); |
| if (next->coder->index == NULL) |
| return LZMA_MEM_ERROR; |
| |
| // Stream Header |
| next->coder->stream_flags.version = 0; |
| next->coder->stream_flags.check = options->check; |
| return_if_error(lzma_stream_header_encode( |
| &next->coder->stream_flags, next->coder->header)); |
| |
| next->coder->header_pos = 0; |
| |
| // Progress info |
| next->coder->progress_in = 0; |
| next->coder->progress_out = LZMA_STREAM_HEADER_SIZE; |
| |
| return LZMA_OK; |
| } |
| |
| |
| extern LZMA_API(lzma_ret) |
| lzma_stream_encoder_mt(lzma_stream *strm, const lzma_mt *options) |
| { |
| lzma_next_strm_init(stream_encoder_mt_init, strm, options); |
| |
| strm->internal->supported_actions[LZMA_RUN] = true; |
| // strm->internal->supported_actions[LZMA_SYNC_FLUSH] = true; |
| // strm->internal->supported_actions[LZMA_FULL_FLUSH] = true; |
| // strm->internal->supported_actions[LZMA_FULL_BARRIER] = true; |
| strm->internal->supported_actions[LZMA_FINISH] = true; |
| |
| return LZMA_OK; |
| } |
| |
| |
| // This function name is a monster but it's consistent with the older |
| // monster names. :-( 31 chars is the max that C99 requires so in that |
| // sense it's not too long. ;-) |
| extern LZMA_API(uint64_t) |
| lzma_stream_encoder_mt_memusage(const lzma_mt *options) |
| { |
| lzma_options_easy easy; |
| const lzma_filter *filters; |
| uint64_t block_size; |
| uint64_t outbuf_size_max; |
| |
| if (get_options(options, &easy, &filters, &block_size, |
| &outbuf_size_max) != LZMA_OK) |
| return UINT64_MAX; |
| |
| // Memory usage of the input buffers |
| const uint64_t inbuf_memusage = options->threads * block_size; |
| |
| // Memory usage of the filter encoders |
| uint64_t filters_memusage |
| = lzma_raw_encoder_memusage(options->filters); |
| if (filters_memusage == UINT64_MAX) |
| return UINT64_MAX; |
| |
| filters_memusage *= options->threads; |
| |
| // Memory usage of the output queue |
| const uint64_t outq_memusage = lzma_outq_memusage( |
| outbuf_size_max, options->threads); |
| if (outq_memusage == UINT64_MAX) |
| return UINT64_MAX; |
| |
| // Sum them with overflow checking. |
| uint64_t total_memusage = LZMA_MEMUSAGE_BASE + sizeof(lzma_coder) |
| + options->threads * sizeof(worker_thread); |
| |
| if (UINT64_MAX - total_memusage < inbuf_memusage) |
| return UINT64_MAX; |
| |
| total_memusage += inbuf_memusage; |
| |
| if (UINT64_MAX - total_memusage < filters_memusage) |
| return UINT64_MAX; |
| |
| total_memusage += filters_memusage; |
| |
| if (UINT64_MAX - total_memusage < outq_memusage) |
| return UINT64_MAX; |
| |
| return total_memusage + outq_memusage; |
| } |