00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #ifndef SRC_XRDCL_XRDCLXCPSRC_HH_
00026 #define SRC_XRDCL_XRDCLXCPSRC_HH_
00027
00028 #include "XrdCl/XrdClFile.hh"
00029 #include "XrdCl/XrdClSyncQueue.hh"
00030 #include "XrdSys/XrdSysPthread.hh"
00031
00032 namespace XrdCl
00033 {
00034
00035 class XCpCtx;
00036
00037 class XCpSrc
00038 {
00039 friend class ChunkHandler;
00040
00041 public:
00042
00053 XCpSrc( uint32_t chunkSize, uint8_t parallel, int64_t fileSize, XCpCtx *ctx );
00054
00058 void Start();
00059
00063 void Stop()
00064 {
00065 pRunning = false;
00066 }
00067
00071 void Delete()
00072 {
00073 XrdSysMutexHelper lck( pMtx );
00074 --pRefCount;
00075 if( !pRefCount )
00076 {
00077 lck.UnLock();
00078 delete this;
00079 }
00080 }
00081
00087 XCpSrc* Self()
00088 {
00089 XrdSysMutexHelper lck( pMtx );
00090 ++pRefCount;
00091 return this;
00092 }
00093
00097 bool IsRunning()
00098 {
00099 return pRunning;
00100 }
00101
00106 bool HasData()
00107 {
00108 XrdSysMutexHelper lck( pMtx );
00109 return pCurrentOffset < pBlkEnd || !pRecovered.empty() || !pOngoing.empty();
00110 }
00111
00112
00113
00119 uint64_t TransferRate();
00120
00126 static void DeleteChunk( ChunkInfo *&chunk )
00127 {
00128 if( chunk )
00129 {
00130 delete[] static_cast<char*>( chunk->buffer );
00131 delete chunk;
00132 chunk = 0;
00133 }
00134 }
00135
00136 private:
00137
00143 virtual ~XCpSrc();
00144
00148 static void* Run( void* arg );
00149
00154 void StartDownloading();
00155
00166 XRootDStatus Initialize();
00167
00175 XRootDStatus Recover();
00176
00189 XRootDStatus ReadChunks();
00190
00205 void Steal( XCpSrc *src );
00206
00215 XRootDStatus GetWork();
00216
00225 void ReportResponse( XRootDStatus *status, ChunkInfo *chunk, File *handle );
00226
00230 template<typename T>
00231 static void DeletePtr( T *&obj )
00232 {
00233 delete obj;
00234 obj = 0;
00235 }
00236
00243 static bool FilesEqual( File *f1, File *f2 )
00244 {
00245 if( !f1 || !f2 ) return false;
00246
00247 const std::string lastURL = "LastURL";
00248 std::string url1, url2;
00249
00250 f1->GetProperty( lastURL, url1 );
00251 f2->GetProperty( lastURL, url2 );
00252
00253
00254 size_t pos = url1.find( '?' );
00255 if( pos != std::string::npos )
00256 url1 = url1.substr( 0 , pos );
00257 pos = url2.find( '?' );
00258 if( pos != std::string::npos )
00259 url2 = url2.substr( 0 , pos );
00260
00261 return url1 == url2;
00262 }
00263
00267 uint32_t pChunkSize;
00268
00272 uint8_t pParallel;
00273
00277 int64_t pFileSize;
00278
00282 pthread_t pThread;
00283
00287 XCpCtx *pCtx;
00288
00292 std::string pUrl;
00293
00297 File *pFile;
00298
00299 std::map<File*, uint8_t> pFailed;
00300
00304 uint64_t pCurrentOffset;
00305
00309 uint64_t pBlkEnd;
00310
00314 uint64_t pDataTransfered;
00315
00320 std::map<uint64_t, uint64_t> pOngoing;
00321
00326 std::map<uint64_t, uint64_t> pRecovered;
00327
00334 SyncQueue<XRootDStatus*> pReports;
00335
00339 XrdSysRecMutex pMtx;
00340
00344 size_t pRefCount;
00345
00351 bool pRunning;
00352
00356 time_t pStartTime;
00357
00362 time_t pTransferTime;
00363 };
00364
00365 }
00366
00367 #endif