Open Lighting Architecture
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
StreamRpcChannel.h
1 /*
2  * This library is free software; you can redistribute it and/or
3  * modify it under the terms of the GNU Lesser General Public
4  * License as published by the Free Software Foundation; either
5  * version 2.1 of the License, or (at your option) any later version.
6  *
7  * This library is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
10  * Lesser General Public License for more details.
11  *
12  * You should have received a copy of the GNU Lesser General Public
13  * License along with this library; if not, write to the Free Software
14  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
15  *
16  * StreamRpcChannel.h
17  * Interface for the Stream RPC Channel
18  * Copyright (C) 2005-2008 Simon Newton
19  */
20 
21 #ifndef COMMON_RPC_STREAMRPCCHANNEL_H_
22 #define COMMON_RPC_STREAMRPCCHANNEL_H_
23 
24 #if HAVE_CONFIG_H
25 # include <config.h>
26 #endif
27 
28 #include <stdint.h>
29 #include <google/protobuf/service.h>
30 #include <ola/Callback.h>
31 #include <ola/io/Descriptor.h>
32 #include <ola/io/SelectServer.h>
33 #include <ola/util/SequenceNumber.h>
34 #include <memory>
35 
36 #include "ola/ExportMap.h"
37 
38 #include HASH_MAP_H
39 
40 namespace ola {
41 namespace rpc {
42 
// Pull the protobuf RPC types used by the declarations below into ola::rpc.
// NOTE(review): these using-declarations live in a header, so every file that
// includes it also gets these names inside ola::rpc.
43 using google::protobuf::Message;
44 using google::protobuf::MethodDescriptor;
45 using google::protobuf::RpcChannel;
46 using google::protobuf::RpcController;
47 using google::protobuf::Service;
48 
// Forward declaration only: this header refers to RpcMessage solely through
// pointers, so the full definition is not needed here.
49 class RpcMessage;
50 
52  /*
53  * These are requests on the server end that haven't completed yet.
54  */
55  public:
57  ~OutstandingRequest() {}
58 
59  int id;
60  RpcController *controller;
61  Message *response;
62 };
63 
65  /*
66  * These are requests on the client end that haven't completed yet.
67  */
68  public:
71 
72  int id;
73  RpcController *controller;
74  google::protobuf::Closure *callback;
75  Message *reply;
76 };
77 
78 
// StreamRpcChannel: a google::protobuf::RpcChannel that sends and receives
// RpcMessage objects over an ola::io::ConnectedDescriptor.
//
// One instance tracks both directions of traffic:
//   - m_requests  holds OutstandingRequest entries, keyed by an int.
//   - m_responses holds OutstandingResponse entries, keyed by an int.
//
// NOTE(review): the embedded listing numbers jump in several places
// (e.g. 99 -> 104 -> 109, 123 -> 136), so some public declarations are not
// visible here. In particular m_on_close has no visible setter, and no
// destructor is shown. Confirm against the real header.
// NOTE(review): no copy constructor or assignment operator is visible. If the
// class owns m_buffer, copying an instance would be unsafe; confirm.
85 class StreamRpcChannel: public RpcChannel {
86  public :
    // Create a channel.
    //   service:    stored as the service to dispatch requests to (see
    //               m_service); SetService() can replace it later.
    //   descriptor: the descriptor this channel reads from and writes to.
    //   export_map: optional, defaults to NULL. Presumably used to publish the
    //               K_RPC_* counters declared below; confirm in the .cpp.
96  StreamRpcChannel(Service *service,
97  ola::io::ConnectedDescriptor *descriptor,
98  ExportMap *export_map = NULL);
99 
104 
    // Replace the service that requests are dispatched to.
109  void SetService(Service *service) { m_service = service; }
110 
    // Returns true while m_requests contains at least one entry.
117  bool PendingRPCs() const { return !m_requests.empty(); }
118 
    // Presumably called when m_descriptor has data available to read;
    // confirm against the implementation.
122  void DescriptorReady();
123 
136 
    // Same parameter list as google::protobuf::RpcChannel::CallMethod.
    // Presumably the client-side entry point: sends `request` and runs `done`
    // once `response` has been filled in; confirm in the .cpp.
140  void CallMethod(
141  const MethodDescriptor *method,
142  RpcController *controller,
143  const Message *request,
144  Message *response,
145  google::protobuf::Closure *done);
146 
    // Presumably invoked when the service has finished handling `request`, so
    // the reply can be sent; confirm in the .cpp.
152  void RequestComplete(OutstandingRequest *request);
153 
    // Protocol version constant. ReadHeader() reports a version through its
    // first out-parameter, presumably checked against this value.
157  static const unsigned int PROTOCOL_VERSION = 1;
158 
159  private:
    // Map from an int key (presumably the message id) to an
    // OutstandingResponse.
160  typedef HASH_NAMESPACE::HASH_MAP_CLASS<int, OutstandingResponse*>
161  ResponseMap;
162 
    // Message I/O helpers. Their exact return conventions are not visible in
    // this header.
163  bool SendMsg(RpcMessage *msg);
164  int AllocateMsgBuffer(unsigned int size);
    // Writes the version and size it reads through the two out-parameters.
165  int ReadHeader(unsigned int *version, unsigned int *size) const;
166  bool HandleNewMsg(uint8_t *buffer, unsigned int size);
167  void HandleRequest(RpcMessage *msg);
168  void HandleStreamRequest(RpcMessage *msg);
169 
170  // server end
171  void SendRequestFailed(OutstandingRequest *request);
172  void SendNotImplemented(int msg_id);
173  void DeleteOutstandingRequest(OutstandingRequest *request);
174 
175  // client end
176  void HandleResponse(RpcMessage *msg);
177  void HandleFailedResponse(RpcMessage *msg);
178  void HandleCanceledResponse(RpcMessage *msg);
179  void HandleNotImplemented(RpcMessage *msg);
180 
181  void HandleChannelClose();
182 
183  Service *m_service; // service to dispatch requests to
    // Single-use callback, presumably run from HandleChannelClose(); confirm.
    // NOTE(review): std::auto_ptr was deprecated in C++11 and removed in C++17.
184  std::auto_ptr<SingleUseCallback0<void> > m_on_close;
185  // the descriptor to read/write to.
186  class ola::io::ConnectedDescriptor *m_descriptor;
    // Presumably the source of ids for outgoing requests; confirm.
187  SequenceNumber<uint32_t> m_sequence;
188  uint8_t *m_buffer; // buffer for incoming msgs
189  unsigned int m_buffer_size; // size of the buffer
190  unsigned int m_expected_size; // the total size of the current msg
191  unsigned int m_current_size; // the amount of data read for the current msg
    // Outstanding requests; PendingRPCs() reports whether this is non-empty.
192  HASH_NAMESPACE::HASH_MAP_CLASS<int, OutstandingRequest*> m_requests;
    // Outstanding responses.
193  ResponseMap m_responses;
194  ExportMap *m_export_map;
195  UIntMap *m_recv_type_map;
196 
    // String constants, defined out of line. Presumably the names of exported
    // variables; confirm in the .cpp.
197  static const char K_RPC_RECEIVED_TYPE_VAR[];
198  static const char K_RPC_RECEIVED_VAR[];
199  static const char K_RPC_SENT_ERROR_VAR[];
200  static const char K_RPC_SENT_VAR[];
201  static const char *K_RPC_VARIABLES[];
202  static const char STREAMING_NO_RESPONSE[];
    // Size limits; presumably the starting size and upper bound for m_buffer.
203  static const unsigned int INITIAL_BUFFER_SIZE = 1 << 11; // 2k
204  static const unsigned int MAX_BUFFER_SIZE = 1 << 20; // 1M
205 };
206 } // namespace rpc
207 } // namespace ola
208 #endif // COMMON_RPC_STREAMRPCCHANNEL_H_