00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 #include "stdafx.h"
00032 #include "HTTPStreamingFileSource.h"
00033
00034
00035 HTTPStreamingFileSource::HTTPStreamingFileSource(void)
00036 : mBufferLock(NULL)
00037 , mIsChunked(false)
00038 , mIsFirstChunk(true)
00039 , mChunkRemains(0)
00040 , mNumLeftovers(0)
00041 , mCurrentAbsoluteReadPosition(0)
00042 , mMemoryBuffer(NULL)
00043 , mContentLength(-1)
00044 , mIsBufferFilling(false)
00045 , mFirstPass(true)
00046 , mStreamStartBuffer(NULL)
00047 , mStreamStartBufferLength(0)
00048 , mApparentReadPosition(0)
00049 {
00050 mBufferLock = new CCritSec;
00051 #ifdef OGGCODECS_LOGGING
00052 debugLog.open("d:\\zen\\logs\\htttp.log", ios_base::out | ios_base::app);
00053 fileDump.open("d:\\zen\\logs\\filedump.ogg", ios_base::out|ios_base::binary);
00054 rawDump.open("D:\\zen\\logs\\rawdump.out", ios_base::out|ios_base::binary);
00055 #endif
00056
00057 mInterBuff = new unsigned char[RECV_BUFF_SIZE* 2];
00058 mMemoryBuffer = new CircularBuffer(MEMORY_BUFFER_SIZE);
00059 mStreamStartBuffer = new unsigned char[STREAM_START_BUFFER_SIZE];
00060
00061 }
00062
00063 HTTPStreamingFileSource::~HTTPStreamingFileSource(void)
00064 {
00065
00066 close();
00067
00068 #ifdef OGGCODECS_LOGGING
00069 debugLog.close();
00070 fileDump.close();
00071 rawDump.close();
00072 #endif
00073 delete mBufferLock;
00074 delete[] mInterBuff;
00075
00076 delete mMemoryBuffer;
00077 delete[] mStreamStartBuffer;
00078 }
00079
00080 void HTTPStreamingFileSource::unChunk(unsigned char* inBuff, unsigned long inNumBytes)
00081 {
00082
00083
00084 ASSERT(inNumBytes > 2);
00085 rawDump.write((char*)inBuff, inNumBytes);
00086 debugLog<<"UnChunk"<<endl;
00087 unsigned long locNumBytesLeft = inNumBytes;
00088
00089 memcpy((void*)(mInterBuff + mNumLeftovers), (const void*)inBuff, inNumBytes);
00090 locNumBytesLeft += mNumLeftovers;
00091 mNumLeftovers = 0;
00092 unsigned char* locWorkingBuffPtr = mInterBuff;
00093
00094 debugLog<<"inNumBytes = "<<inNumBytes<<endl;
00095
00096 while (locNumBytesLeft > 8) {
00097 debugLog<<"---"<<endl;
00098 debugLog<<"Bytes left = "<<locNumBytesLeft<<endl;
00099 debugLog<<"ChunkRemaining = "<<mChunkRemains<<endl;
00100
00101 if (mChunkRemains == 0) {
00102 debugLog<<"Zero bytes of chunk remains"<<endl;
00103
00104
00105 string locTemp;
00106
00107 if (mIsFirstChunk) {
00108 debugLog<<"+++++++++++++++ It's the first chunk ++++++++++++"<<endl;
00109 mIsFirstChunk = false;
00110 locTemp = (char*)locWorkingBuffPtr;
00111 } else {
00112 debugLog<<"Not the first chunk"<<endl;
00113 debugLog<<"Skip bytes = "<<(int)locWorkingBuffPtr[0]<<(int)locWorkingBuffPtr[1]<<endl;
00114 locTemp = (char*)(locWorkingBuffPtr + 2);
00115 locWorkingBuffPtr+=2;
00116 locNumBytesLeft -= 2;
00117 }
00118
00119
00120
00121
00122
00123
00124
00125 size_t locChunkSizePos = locTemp.find("\r\n");
00126
00127
00128 if (locChunkSizePos != string::npos) {
00129 debugLog<<"Found the size bytes "<<endl;
00130
00131 string locChunkSizeStr = locTemp.substr(0, locChunkSizePos);
00132 debugLog<<"!!!!! Sizing bytes " << locChunkSizeStr<<endl;
00133 char* locDummyPtr = NULL;
00134
00135
00136 mChunkRemains = strtol(locChunkSizeStr.c_str(), &locDummyPtr, 16);
00137
00138
00139
00140 unsigned long locGuffSize = (unsigned long)(locChunkSizeStr.size() + 2);
00141 locWorkingBuffPtr += locGuffSize;
00142 locNumBytesLeft -= locGuffSize;
00143 } else {
00144 debugLog<<"******************* FAILED TO FIND SIZE BYTES "<<endl;
00145
00146
00147 }
00148 }
00149
00150
00151 if (mChunkRemains == 0) {
00152 debugLog<<" ??? EOF ???"<<endl;
00153 return;
00154 }
00155
00156 debugLog<<"locNumBytesLeft = "<<locNumBytesLeft<<endl;
00157 debugLog<<"mChunkRemains = "<<mChunkRemains<<endl;
00158
00159 if (locNumBytesLeft < mChunkRemains) {
00160
00161
00162 mMemoryBuffer->write((const unsigned char*)locWorkingBuffPtr, locNumBytesLeft );
00163 fileDump.write((char*)locWorkingBuffPtr, locNumBytesLeft);
00164 locWorkingBuffPtr += locNumBytesLeft;
00165 mChunkRemains -= locNumBytesLeft;
00166 locNumBytesLeft = 0;
00167 } else {
00168
00169 mMemoryBuffer->write((const unsigned char*)locWorkingBuffPtr, mChunkRemains );
00170 fileDump.write((char*)locWorkingBuffPtr, mChunkRemains);
00171 locWorkingBuffPtr += mChunkRemains;
00172 locNumBytesLeft -= mChunkRemains;
00173 mChunkRemains = 0;
00174 }
00175
00176 }
00177
00178 if (locNumBytesLeft != 0) {
00179 debugLog<<"There is a non- zero amount of bytes leftover... buffer them up for next time..."<<endl;
00180 memcpy((void*)mInterBuff, (const void*)locWorkingBuffPtr, locNumBytesLeft);
00181 mNumLeftovers = locNumBytesLeft;
00182 }
00183
00184
00185 if (strcmp((const char*)mInterBuff, "1000") == 0) {
00186 debugLog<<"---- Probably failure point"<<endl;
00187 int x= x;
00188 }
00189 }
00190 void HTTPStreamingFileSource::DataProcessLoop() {
00191
00192 int locNumRead = 0;
00193 char* locBuff = NULL;
00194 DWORD locCommand = 0;
00195 bool locSeenAny = false;
00196 debugLog<<"Starting dataprocessloop"<<endl;
00197
00198 mIsBufferFilling = false;
00199
00200 locBuff = new char[RECV_BUFF_SIZE];
00201
00202 while(true) {
00203 if (mMemoryBuffer->numBytesAvail() <= MEMORY_BUFFER_LOW_TIDE) {
00204
00205 if(CheckRequest(&locCommand) == TRUE) {
00206 if (GetRequest() == THREAD_EXIT) {
00207 debugLog<<"Thread Data Process loop received breakout signal..."<<endl;
00208 delete[] locBuff;
00209 return;
00210 } else {
00211
00212 Reply(S_OK);
00213 }
00214 }
00215 } else {
00216
00217
00218 mIsBufferFilling = false;
00219 if (GetRequest() == THREAD_EXIT) {
00220 debugLog<<"Thread Data Process loop received breakout signal..."<<endl;
00221 delete[] locBuff;
00222 return;
00223 } else {
00224 Reply(S_OK);
00225 }
00226 }
00227
00228 mIsBufferFilling = true;
00229
00230
00231
00232
00233
00234
00235 locNumRead = recv(mSocket, locBuff, RECV_BUFF_SIZE, 0);
00236
00237 if (locNumRead == SOCKET_ERROR) {
00238 int locErr = WSAGetLastError();
00239 debugLog<<"Socket error receiving - Err No = "<<locErr<<endl;
00240 mWasError = true;
00241 break;
00242 }
00243
00244 if (locNumRead == 0) {
00245 debugLog<<"Read last bytes..."<<endl;
00246 mIsEOF = true;
00247 delete[] locBuff;
00248 return;
00249 }
00250
00251 {
00252 CAutoLock locLock(mBufferLock);
00253
00254 if (mSeenResponse) {
00255
00256
00257 if (mIsChunked) {
00258 unChunk((unsigned char*)locBuff, locNumRead);
00259 } else {
00260 mMemoryBuffer->write((const unsigned char*)locBuff, locNumRead);
00261 }
00262
00263
00264
00265 } else {
00266
00267
00268
00269
00270
00271
00272
00273
00274
00275 string locTemp = locBuff;
00276
00277 size_t locPos = locTemp.find("\r\n\r\n");
00278 if (locPos != string::npos) {
00279
00280
00281 mSeenResponse = true;
00282 mLastResponse = locTemp.substr(0, locPos);
00283 debugLog<<"HTTP Response:"<<endl;
00284 debugLog<<mLastResponse<<endl;
00285
00286 unsigned short locResponseCode = getHTTPResponseCode(mLastResponse);
00287
00288 mRetryAt = "";
00289 if (locResponseCode == 301) {
00290 size_t locLocPos = mLastResponse.find("Location: ");
00291 if (locLocPos != string::npos) {
00292 locLocPos += 10;
00293 size_t locEndPos = mLastResponse.find("\r", locLocPos);
00294 if (locEndPos != string::npos) {
00295 if (locEndPos > locLocPos) {
00296 mRetryAt = mLastResponse.substr(locLocPos, locEndPos - locLocPos);
00297 debugLog<<"Retry URL = "<<mRetryAt<<endl;
00298 }
00299 }
00300 }
00301
00302 debugLog<<"Setting error to true"<<endl;
00303 mIsEOF = true;
00304 mWasError = true;
00305
00306 } else if (locResponseCode >= 300) {
00307 debugLog<<"Setting error to true"<<endl;
00308 mIsEOF = true;
00309 mWasError = true;
00310
00311 } else {
00312
00313
00314
00315 mContentLength = -1;
00316 size_t locContentLengthPos = mLastResponse.find("Content-Length: ");
00317 if (locContentLengthPos != string::npos) {
00318 locContentLengthPos += 16;
00319
00320 size_t locEndPos = mLastResponse.find("\r", locContentLengthPos);
00321 if ((locEndPos != string::npos) && (locEndPos > locContentLengthPos)) {
00322 string locLengthString = mLastResponse.substr(locContentLengthPos, locEndPos - locContentLengthPos);
00323
00324
00325 try {
00326 __int64 locContentLength = StringHelper::stringToNum(locLengthString);
00327 mContentLength = locContentLength;
00328 } catch(...) {
00329 mContentLength = -1;
00330 }
00331 }
00332 }
00333 }
00334
00335 if (locTemp.find("Transfer-Encoding: chunked") != string::npos) {
00336 mIsChunked = true;
00337 }
00338
00339 char* locBuff2 = locBuff + locPos + 4;
00340 locTemp = locBuff2;
00341
00342 if (mIsChunked) {
00343 if (locNumRead - locPos - 4 > 0) {
00344 unChunk((unsigned char*)locBuff2, locNumRead - locPos - 4);
00345 }
00346 } else {
00347
00348 if (locNumRead - locPos - 4 > 0) {
00349 mMemoryBuffer->write((const unsigned char*)locBuff2, (locNumRead - (locPos + 4)));
00350 }
00351 }
00352 }
00353 }
00354 }
00355 }
00356
00357 delete[] locBuff;
00358 }
00359
00360 unsigned short HTTPStreamingFileSource::getHTTPResponseCode(string inHTTPResponse)
00361 {
00362 size_t locPos = inHTTPResponse.find(" ");
00363 if (locPos != string::npos) {
00364 string locCodeString = inHTTPResponse.substr(locPos + 1, 3);
00365 try {
00366 unsigned short locCode = (unsigned short)StringHelper::stringToNum(locCodeString);
00367 return locCode;
00368 } catch(...) {
00369 return 0;
00370 }
00371 } else {
00372 return 0;
00373 }
00374 }
00375 string HTTPStreamingFileSource::shouldRetryAt()
00376 {
00377 return mRetryAt;
00378 }
00379
00380 DWORD HTTPStreamingFileSource::ThreadProc(void) {
00381
00382 while(true) {
00383 DWORD locThreadCommand = GetRequest();
00384
00385 switch(locThreadCommand) {
00386 case THREAD_EXIT:
00387
00388 Reply(S_OK);
00389 return S_OK;
00390
00391 case THREAD_RUN:
00392
00393 Reply(S_OK);
00394 DataProcessLoop();
00395 break;
00396
00397 }
00398
00399
00400 }
00401 return S_OK;
00402 }
00403 unsigned long HTTPStreamingFileSource::seek(unsigned long inPos)
00404 {
00405
00406
00407
00408
00409
00410
00411
00412 mFirstPass = false;
00413 if (mCurrentAbsoluteReadPosition <= mStreamStartBufferLength) {
00414
00415 mApparentReadPosition = inPos;
00416 return inPos;
00417 } else {
00418
00419 mCurrentAbsoluteReadPosition = inPos;
00420
00421
00422
00423
00424
00425 if ((mContentLength != -1) || (inPos == 0)) {
00426 close();
00427 closeSocket();
00428 clear();
00429
00430 open(mSourceLocation, inPos);
00431 return inPos;
00432
00433 } else {
00434 return (unsigned long) -1;
00435 }
00436 }
00437
00438
00439
00440
00441
00442
00443 }
00444
00445
00446 void HTTPStreamingFileSource::close() {
00447
00448
00449 if (ThreadExists() == TRUE) {
00450
00451 CallWorker(THREAD_EXIT);
00452
00453 Close();
00454
00455 }
00456
00457
00458
00459
00460
00461 closeSocket();
00462 }
00463
00464 bool HTTPStreamingFileSource::startThread() {
00465 if (ThreadExists() == FALSE) {
00466 Create();
00467 }
00468 CallWorker(THREAD_RUN);
00469 return true;
00470 }
00471 bool HTTPStreamingFileSource::open(string inSourceLocation, unsigned long inStartByte) {
00472
00473
00474 mSeenResponse = false;
00475 mLastResponse = "";
00476
00477
00478 {
00479 CAutoLock locLock(mBufferLock);
00480
00482
00483
00484
00485
00486
00487
00488
00490
00491
00492
00493
00494
00495
00496
00497
00498
00499 mMemoryBuffer->reset();
00500 }
00501
00502 bool locIsOK = setupSocket(inSourceLocation);
00503
00504 if (!locIsOK) {
00505
00506 closeSocket();
00507 return false;
00508 }
00509
00510
00511
00512
00513 httpRequest(assembleRequest(mFileName, inStartByte));
00514
00515 locIsOK = startThread();
00516
00517
00518 return locIsOK;
00519 }
00520 void HTTPStreamingFileSource::clear() {
00521
00522 debugLog<<"Setting error to false";
00523 mIsEOF = false;
00524 mWasError = false;
00525 mRetryAt = "";
00526 mIsFirstChunk = true;
00527 mChunkRemains = 0;
00528 mNumLeftovers = 0;
00529
00530
00531 mFirstPass = true;
00532
00533 mCurrentAbsoluteReadPosition = 0;
00534 mApparentReadPosition = 0;
00535
00536
00537 }
00538 bool HTTPStreamingFileSource::isError()
00539 {
00540 return mWasError;
00541 }
00542 bool HTTPStreamingFileSource::isEOF() {
00543 {
00544 CAutoLock locLock(mBufferLock);
00545 unsigned long locSizeBuffed = mMemoryBuffer->numBytesAvail();
00546
00547
00548 if ((locSizeBuffed == 0) && mIsEOF) {
00549
00550 return true;
00551 } else {
00552
00553 return false;
00554 }
00555 }
00556
00557 }
00558 unsigned long HTTPStreamingFileSource::read(char* outBuffer, unsigned long inNumBytes) {
00559
00560
00561
00562 unsigned long locNumRead = 0;
00563 {
00564 CAutoLock locLock(mBufferLock);
00565
00566 if (!mFirstPass && (mApparentReadPosition < mStreamStartBufferLength)) {
00567
00568 unsigned long locBytesStillBuffered = mStreamStartBufferLength - mApparentReadPosition;
00569
00570 unsigned long locBytesToRead = (inNumBytes <= locBytesStillBuffered) ? inNumBytes
00571 : locBytesStillBuffered;
00572
00573 memcpy((void*)outBuffer, (const void*)(mStreamStartBuffer + mApparentReadPosition), locBytesToRead);
00574 mApparentReadPosition += locBytesToRead;
00575 locNumRead = locBytesToRead;
00576
00577 } else {
00578
00579 if((mMemoryBuffer->numBytesAvail() == 0) || mWasError) {
00580
00581
00582 } else {
00583
00584
00585
00586 locNumRead = mMemoryBuffer->read((unsigned char*)outBuffer, inNumBytes, true);
00587
00588
00589 if (mFirstPass && (locNumRead > 0) && (mStreamStartBufferLength < STREAM_START_BUFFER_SIZE)) {
00590
00591
00592
00593 unsigned long locSpaceLeftInStartBuffer = STREAM_START_BUFFER_SIZE - mStreamStartBufferLength;
00594
00595
00596 unsigned long locBytesToCopy = (locNumRead <= locSpaceLeftInStartBuffer) ? locNumRead
00597 : locSpaceLeftInStartBuffer;
00598
00599 memcpy((void*)(mStreamStartBuffer + mStreamStartBufferLength), (const void*)outBuffer, locBytesToCopy);
00600 mStreamStartBufferLength += locBytesToCopy;
00601 }
00602
00603
00604 if (locNumRead > 0) {
00605 debugLog<<locNumRead<<" bytes read from buffer at "<<mCurrentAbsoluteReadPosition<< endl;
00606 }
00607
00608 mCurrentAbsoluteReadPosition += locNumRead;
00609
00610
00611
00612
00613
00614
00615 }
00616 }
00617 }
00618
00619 if ((mMemoryBuffer->numBytesAvail() <= MEMORY_BUFFER_LOW_TIDE) && (!mIsBufferFilling) && (!mIsEOF)) {
00620 CallWorker(THREAD_RUN);
00621 }
00622
00623
00624 return locNumRead;
00625 }