00001 //------------------------------------------------------------------------------ 00002 // Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN) 00003 // Author: Lukasz Janyst <ljanyst@cern.ch> 00004 //------------------------------------------------------------------------------ 00005 // XRootD is free software: you can redistribute it and/or modify 00006 // it under the terms of the GNU Lesser General Public License as published by 00007 // the Free Software Foundation, either version 3 of the License, or 00008 // (at your option) any later version. 00009 // 00010 // XRootD is distributed in the hope that it will be useful, 00011 // but WITHOUT ANY WARRANTY; without even the implied warranty of 00012 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00013 // GNU General Public License for more details. 00014 // 00015 // You should have received a copy of the GNU Lesser General Public License 00016 // along with XRootD. If not, see <http://www.gnu.org/licenses/>. 00017 //------------------------------------------------------------------------------ 00018 00019 #ifndef __XRD_CL_STREAM_HH__ 00020 #define __XRD_CL_STREAM_HH__ 00021 00022 #include "XrdCl/XrdClPoller.hh" 00023 #include "XrdCl/XrdClStatus.hh" 00024 #include "XrdCl/XrdClURL.hh" 00025 #include "XrdCl/XrdClPostMasterInterfaces.hh" 00026 #include "XrdCl/XrdClChannelHandlerList.hh" 00027 #include "XrdCl/XrdClJobManager.hh" 00028 #include "XrdCl/XrdClInQueue.hh" 00029 #include "XrdCl/XrdClUtils.hh" 00030 00031 #include "XrdSys/XrdSysPthread.hh" 00032 #include "XrdNet/XrdNetAddr.hh" 00033 #include <list> 00034 #include <vector> 00035 00036 namespace XrdCl 00037 { 00038 class Message; 00039 class Channel; 00040 class TransportHandler; 00041 class TaskManager; 00042 struct SubStreamData; 00043 00044 //---------------------------------------------------------------------------- 00046 //---------------------------------------------------------------------------- 00047 class Stream 00048 { 00049 public: 00050 //------------------------------------------------------------------------ 00052 //------------------------------------------------------------------------ 00053 enum StreamStatus 00054 { 00055 Disconnected = 0, 00056 Connected = 1, 00057 Connecting = 2, 00058 Error = 3 00059 }; 00060 00061 //------------------------------------------------------------------------ 00063 //------------------------------------------------------------------------ 00064 Stream( const URL *url, uint16_t streamNum ); 00065 00066 //------------------------------------------------------------------------ 00068 //------------------------------------------------------------------------ 00069 ~Stream(); 00070 00071 //------------------------------------------------------------------------ 00073 //------------------------------------------------------------------------ 00074 Status Initialize(); 00075 00076 //------------------------------------------------------------------------ 00078 //------------------------------------------------------------------------ 00079 Status Send( Message *msg, 00080 OutgoingMsgHandler *handler, 00081 bool stateful, 00082 time_t expires ); 00083 00084 //------------------------------------------------------------------------ 00086 //------------------------------------------------------------------------ 00087 void SetTransport( TransportHandler *transport ) 00088 { 00089 pTransport = transport; 00090 } 00091 00092 //------------------------------------------------------------------------ 00094 //------------------------------------------------------------------------ 00095 void SetPoller( Poller *poller ) 00096 { 00097 pPoller = poller; 00098 } 00099 00100 //------------------------------------------------------------------------ 00102 //------------------------------------------------------------------------ 00103 void SetIncomingQueue( InQueue *incomingQueue ) 00104 { 00105 pIncomingQueue = incomingQueue; 00106 delete pQueueIncMsgJob; 00107 pQueueIncMsgJob = new QueueIncMsgJob( incomingQueue ); 00108 } 00109 00110 //------------------------------------------------------------------------ 00112 //------------------------------------------------------------------------ 00113 void SetChannelData( AnyObject *channelData ) 00114 { 00115 pChannelData = channelData; 00116 } 00117 00118 //------------------------------------------------------------------------ 00120 //------------------------------------------------------------------------ 00121 void SetTaskManager( TaskManager *taskManager ) 00122 { 00123 pTaskManager = taskManager; 00124 } 00125 00126 //------------------------------------------------------------------------ 00128 //------------------------------------------------------------------------ 00129 void SetJobManager( JobManager *jobManager ) 00130 { 00131 pJobManager = jobManager; 00132 } 00133 00134 //------------------------------------------------------------------------ 00138 //------------------------------------------------------------------------ 00139 Status EnableLink( PathID &path ); 00140 00141 //------------------------------------------------------------------------ 00143 //------------------------------------------------------------------------ 00144 void Disconnect( bool force = false ); 00145 00146 //------------------------------------------------------------------------ 00149 //------------------------------------------------------------------------ 00150 void Tick( time_t now ); 00151 00152 //------------------------------------------------------------------------ 00154 //------------------------------------------------------------------------ 00155 const URL *GetURL() const 00156 { 00157 return pUrl; 00158 } 00159 00160 //------------------------------------------------------------------------ 00162 //------------------------------------------------------------------------ 00163 uint16_t GetStreamNumber() const 00164 { 00165 return pStreamNum; 00166 } 00167 00168 //------------------------------------------------------------------------ 00170 //------------------------------------------------------------------------ 00171 void ForceConnect(); 00172 00173 //------------------------------------------------------------------------ 00175 //------------------------------------------------------------------------ 00176 const std::string &GetName() const 00177 { 00178 return pStreamName; 00179 } 00180 00181 //------------------------------------------------------------------------ 00183 //------------------------------------------------------------------------ 00184 void DisableIfEmpty( uint16_t subStream ); 00185 00186 //------------------------------------------------------------------------ 00188 //------------------------------------------------------------------------ 00189 void OnIncoming( uint16_t subStream, 00190 Message *msg, 00191 uint32_t bytesReceived ); 00192 00193 //------------------------------------------------------------------------ 00194 // Call when one of the sockets is ready to accept a new message 00195 //------------------------------------------------------------------------ 00196 std::pair<Message *, OutgoingMsgHandler *> 00197 OnReadyToWrite( uint16_t subStream ); 00198 00199 //------------------------------------------------------------------------ 00200 // Call when a message is written to the socket 00201 //------------------------------------------------------------------------ 00202 void OnMessageSent( uint16_t subStream, 00203 Message *msg, 00204 uint32_t bytesSent ); 00205 00206 //------------------------------------------------------------------------ 00208 //------------------------------------------------------------------------ 00209 void OnConnect( uint16_t subStream ); 00210 00211 //------------------------------------------------------------------------ 00213 //------------------------------------------------------------------------ 00214 void OnConnectError( uint16_t subStream, Status status ); 00215 00216 //------------------------------------------------------------------------ 00218 //------------------------------------------------------------------------ 00219 void OnError( uint16_t subStream, Status status ); 00220 00221 //------------------------------------------------------------------------ 00223 //------------------------------------------------------------------------ 00224 void ForceError( Status status ); 00225 00226 //------------------------------------------------------------------------ 00228 //------------------------------------------------------------------------ 00229 void OnReadTimeout( uint16_t subStream, bool &isBroken ); 00230 00231 //------------------------------------------------------------------------ 00233 //------------------------------------------------------------------------ 00234 void OnWriteTimeout( uint16_t subStream ); 00235 00236 //------------------------------------------------------------------------ 00238 //------------------------------------------------------------------------ 00239 void RegisterEventHandler( ChannelEventHandler *handler ); 00240 00241 //------------------------------------------------------------------------ 00243 //------------------------------------------------------------------------ 00244 void RemoveEventHandler( ChannelEventHandler *handler ); 00245 00246 //------------------------------------------------------------------------ 00255 //------------------------------------------------------------------------ 00256 std::pair<IncomingMsgHandler *, bool> 00257 InstallIncHandler( Message *msg, uint16_t stream ); 00258 00259 private: 00260 00261 //------------------------------------------------------------------------ 00262 // Job queuing the incoming messages 00263 //------------------------------------------------------------------------ 00264 class QueueIncMsgJob: public Job 00265 { 00266 public: 00267 QueueIncMsgJob( InQueue *queue ): pQueue( queue ) {}; 00268 virtual ~QueueIncMsgJob() {}; 00269 virtual void Run( void *arg ) 00270 { 00271 Message *msg = (Message *)arg; 00272 pQueue->AddMessage( msg ); 00273 } 00274 private: 00275 InQueue *pQueue; 00276 }; 00277 00278 //------------------------------------------------------------------------ 00279 // Job handling the incoming messages 00280 //------------------------------------------------------------------------ 00281 class HandleIncMsgJob: public Job 00282 { 00283 public: 00284 HandleIncMsgJob( IncomingMsgHandler *handler ): pHandler( handler ) {}; 00285 virtual ~HandleIncMsgJob() {}; 00286 virtual void Run( void *arg ) 00287 { 00288 Message *msg = (Message *)arg; 00289 pHandler->Process( msg ); 00290 delete this; 00291 } 00292 private: 00293 IncomingMsgHandler *pHandler; 00294 }; 00295 00296 //------------------------------------------------------------------------ 00298 //------------------------------------------------------------------------ 00299 void OnFatalError( uint16_t subStream, 00300 Status status, 00301 XrdSysMutexHelper &lock ); 00302 00303 //------------------------------------------------------------------------ 00305 //------------------------------------------------------------------------ 00306 void MonitorDisconnection( Status status ); 00307 00308 //------------------------------------------------------------------------ 00310 //------------------------------------------------------------------------ 00311 Status RequestClose( Message *resp ); 00312 00313 typedef std::vector<SubStreamData*> SubStreamList; 00314 00315 //------------------------------------------------------------------------ 00316 // Data members 00317 //------------------------------------------------------------------------ 00318 const URL *pUrl; 00319 uint16_t pStreamNum; 00320 std::string pStreamName; 00321 TransportHandler *pTransport; 00322 Poller *pPoller; 00323 TaskManager *pTaskManager; 00324 JobManager *pJobManager; 00325 XrdSysRecMutex pMutex; 00326 InQueue *pIncomingQueue; 00327 AnyObject *pChannelData; 00328 uint32_t pLastStreamError; 00329 Status pLastFatalError; 00330 uint16_t pStreamErrorWindow; 00331 uint16_t pConnectionCount; 00332 uint16_t pConnectionRetry; 00333 time_t pConnectionInitTime; 00334 uint16_t pConnectionWindow; 00335 SubStreamList pSubStreams; 00336 std::vector<XrdNetAddr> pAddresses; 00337 Utils::AddressType pAddressType; 00338 ChannelHandlerList pChannelEvHandlers; 00339 uint64_t pSessionId; 00340 00341 //------------------------------------------------------------------------ 00342 // Jobs 00343 //------------------------------------------------------------------------ 00344 QueueIncMsgJob *pQueueIncMsgJob; 00345 00346 //------------------------------------------------------------------------ 00347 // Monitoring info 00348 //------------------------------------------------------------------------ 00349 timeval pConnectionStarted; 00350 timeval pConnectionDone; 00351 uint64_t pBytesSent; 00352 uint64_t pBytesReceived; 00353 }; 00354 } 00355 00356 #endif // __XRD_CL_STREAM_HH__