aboutsummaryrefslogtreecommitdiff
path: root/C/MtDec.h
blob: 9864cc87412410fd4072b04300b223f7ac94c3b7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
/* MtDec.h -- Multi-thread Decoder
2018-07-04 : Igor Pavlov : Public domain */

#ifndef __MT_DEC_H
#define __MT_DEC_H

#include "7zTypes.h"

#ifndef _7ZIP_ST
#include "Threads.h"
#endif

EXTERN_C_BEGIN

#ifndef _7ZIP_ST

#ifndef _7ZIP_ST
  #define MTDEC__THREADS_MAX 32
#else
  #define MTDEC__THREADS_MAX 1
#endif


typedef struct
{
  ICompressProgress *progress;
  SRes res;
  UInt64 totalInSize;
  UInt64 totalOutSize;
  CCriticalSection cs;
} CMtProgress;

void MtProgress_Init(CMtProgress *p, ICompressProgress *progress);
SRes MtProgress_Progress_ST(CMtProgress *p);
SRes MtProgress_ProgressAdd(CMtProgress *p, UInt64 inSize, UInt64 outSize);
SRes MtProgress_GetError(CMtProgress *p);
void MtProgress_SetError(CMtProgress *p, SRes res);

struct _CMtDec;

typedef struct
{
  struct _CMtDec *mtDec;
  unsigned index;
  void *inBuf;

  size_t inDataSize_Start; // size of input data in start block
  UInt64 inDataSize;       // total size of input data in all blocks

  CThread thread;
  CAutoResetEvent canRead;
  CAutoResetEvent canWrite;
  void  *allocaPtr;
} CMtDecThread;

void MtDecThread_FreeInBufs(CMtDecThread *t);


typedef enum
{
  MTDEC_PARSE_CONTINUE, // continue this block with more input data
  MTDEC_PARSE_OVERFLOW, // MT buffers overflow, need switch to single-thread
  MTDEC_PARSE_NEW,      // new block
  MTDEC_PARSE_END       // end of block threading. But we still can return to threading after Write(&needContinue)
} EMtDecParseState;

typedef struct
{
  // in
  int startCall;
  const Byte *src;
  size_t srcSize;
      // in  : (srcSize == 0) is allowed
      // out : it's allowed to return less that actually was used ?
  int srcFinished;

  // out
  EMtDecParseState state;
  BoolInt canCreateNewThread;
  UInt64 outPos; // check it (size_t)
} CMtDecCallbackInfo;


typedef struct
{
  void (*Parse)(void *p, unsigned coderIndex, CMtDecCallbackInfo *ci);
  
  // PreCode() and Code():
  // (SRes_return_result != SZ_OK) means stop decoding, no need another blocks
  SRes (*PreCode)(void *p, unsigned coderIndex);
  SRes (*Code)(void *p, unsigned coderIndex,
      const Byte *src, size_t srcSize, int srcFinished,
      UInt64 *inCodePos, UInt64 *outCodePos, int *stop);
  // stop - means stop another Code calls


  /* Write() must be called, if Parse() was called
      set (needWrite) if
      {
         && (was not interrupted by progress)
         && (was not interrupted in previous block)
      }

    out:
      if (*needContinue), decoder still need to continue decoding with new iteration,
         even after MTDEC_PARSE_END
      if (*canRecode), we didn't flush current block data, so we still can decode current block later.
  */
  SRes (*Write)(void *p, unsigned coderIndex,
      BoolInt needWriteToStream,
      const Byte *src, size_t srcSize,
      // int srcFinished,
      BoolInt *needContinue,
      BoolInt *canRecode);
} IMtDecCallback;



typedef struct _CMtDec
{
  /* input variables */
  
  size_t inBufSize;        /* size of input block */
  unsigned numThreadsMax;
  // size_t inBlockMax;
  unsigned numThreadsMax_2;

  ISeqInStream *inStream;
  // const Byte *inData;
  // size_t inDataSize;

  ICompressProgress *progress;
  ISzAllocPtr alloc;

  IMtDecCallback *mtCallback;
  void *mtCallbackObject;

  
  /* internal variables */
  
  size_t allocatedBufsSize;

  BoolInt exitThread;
  WRes exitThreadWRes;

  UInt64 blockIndex;
  BoolInt isAllocError;
  BoolInt overflow;
  SRes threadingErrorSRes;

  BoolInt needContinue;

  // CAutoResetEvent finishedEvent;

  SRes readRes;
  SRes codeRes;

  BoolInt wasInterrupted;

  unsigned numStartedThreads_Limit;
  unsigned numStartedThreads;

  Byte *crossBlock;
  size_t crossStart;
  size_t crossEnd;
  UInt64 readProcessed;
  BoolInt readWasFinished;
  UInt64 inProcessed;

  unsigned filledThreadStart;
  unsigned numFilledThreads;

  #ifndef _7ZIP_ST
  BoolInt needInterrupt;
  UInt64 interruptIndex;
  CMtProgress mtProgress;
  CMtDecThread threads[MTDEC__THREADS_MAX];
  #endif
} CMtDec;


void MtDec_Construct(CMtDec *p);
void MtDec_Destruct(CMtDec *p);

/*
MtDec_Code() returns:
  SZ_OK - in most cases
  MY_SRes_HRESULT_FROM_WRes(WRes_error) - in case of unexpected error in threading function
*/
  
SRes MtDec_Code(CMtDec *p);
Byte *MtDec_GetCrossBuff(CMtDec *p);

int MtDec_PrepareRead(CMtDec *p);
const Byte *MtDec_Read(CMtDec *p, size_t *inLim);

#endif

EXTERN_C_END

#endif