1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package ubic.basecode.util.r;
20  
21  import org.apache.commons.lang3.StringUtils;
22  import org.rosuda.REngine.REXP;
23  import org.rosuda.REngine.REXPMismatchException;
24  import org.rosuda.REngine.REngineException;
25  import org.rosuda.REngine.RList;
26  import org.rosuda.REngine.Rserve.RConnection;
27  import org.rosuda.REngine.Rserve.RserveException;
28  import ubic.basecode.dataStructure.matrix.DenseDoubleMatrix;
29  import ubic.basecode.dataStructure.matrix.DoubleMatrix;
30  import ubic.basecode.util.Configuration;
31  
32  import java.io.File;
33  import java.io.IOException;
34  import java.util.Iterator;
35  import java.util.List;
36  
37  
38  
39  
40  
41  public class RServeClient extends AbstractRClient {
42  
43      
44  
45  
    /** Port passed to {@link #connect(String, int)} by the host-only constructor. */
    private static final int DEFAULT_PORT = 6311;

    /** Maximum number of reconnection attempts made by {@link #checkConnection()}. */
    private static final int MAX_CONNECT_TRIES = 10;

    /** Number of retries {@link #eval(String)} performs after its first failed attempt. */
    private static final int MAX_EVAL_TRIES = 3;

    /**
     * Lower-cased value of the {@code os.name} system property; used to pick a platform-specific Rserve fallback.
     * NOTE(review): {@code toLowerCase()} is called without an explicit locale, so the result depends on the JVM
     * default locale - confirm whether that matters for the {@code startsWith( "windows" )} check.
     */
    private final static String os = System.getProperty( "os.name" ).toLowerCase();
53  
54      protected static String findRserveCommand() {
55          String rserveExecutable = Configuration.getString( "rserve.start.command" );
56          if ( StringUtils.isBlank( rserveExecutable ) ) {
57              log.info( "Rserve command not configured? Trying fallbacks" );
58              if ( os.startsWith( "windows" ) ) { 
59                  rserveExecutable = System.getenv( "R_HOME" ) + File.separator + "library" + File.separator + "Rserve"
60                          + File.separator + "Rserve.exe";
61              } else {
62                  rserveExecutable = "R CMD Rserve";
63              }
64          }
65          return rserveExecutable;
66      }
67  
    /** Current Rserve connection; starts out {@code null} and is reset to {@code null} by {@link #disconnect()}. */
    private RConnection connection = null;
69  
70      
71  
72  
73  
74  
75      protected RServeClient() throws IOException {
76          if ( !connect() ) {
77              throw new IOException( "Could not connect to Rserve" );
78          }
79      }
80  
81      
82  
83  
84  
85      protected RServeClient( String host ) throws IOException {
86          if ( !connect( host, DEFAULT_PORT ) ) {
87              throw new IOException( "Could not connect to Rserve" );
88          }
89      }
90  
91      
92  
93  
94  
95  
96      @Override
97      public void assign( String argName, double[] arg ) {
98          checkConnection();
99  
100         try {
101             connection.assign( argName, arg );
102         } catch ( REngineException e ) {
103             throw new RuntimeException( e );
104         }
105 
106     }
107 
108     
109 
110 
111 
112 
113     @Override
114     public void assign( String arg0, int[] arg1 ) {
115         if ( StringUtils.isBlank( arg0 ) ) {
116             throw new IllegalArgumentException( "Must supply valid variable name" );
117         }
118         checkConnection();
119         try {
120             connection.assign( arg0, arg1 );
121         } catch ( REngineException e ) {
122             throw new RuntimeException( e );
123         }
124     }
125 
126     
127 
128 
129 
130 
131     
132 
133 
134 
135 
136     @Override
137     public void assign( String sym, String ct ) {
138         if ( StringUtils.isBlank( sym ) ) {
139             throw new IllegalArgumentException( "Must supply valid variable name" );
140         }
141         try {
142             this.checkConnection();
143             connection.assign( sym, ct );
144         } catch ( RserveException e ) {
145             throw new RuntimeException( "Assignment failed: " + sym + " value " + ct, e );
146         }
147     }
148 
149     
150 
151 
152 
153 
154     @Override
155     public void assign( String argName, String[] array ) {
156         if ( array == null || array.length == 0 ) {
157             throw new IllegalArgumentException( "Array must not be null or empty" );
158         }
159         if ( StringUtils.isBlank( argName ) ) {
160             throw new IllegalArgumentException( "Must supply valid variable name" );
161         }
162         try {
163             log.debug( "assign: " + argName + "<-" + array.length + " strings." );
164             this.checkConnection();
165             connection.assign( argName, array );
166         } catch ( REngineException e ) {
167             throw new RuntimeException( "Failure with assignment: " + argName + "<-" + array.length + " strings." + e );
168         }
169     }
170 
171     
172 
173 
174 
175     public boolean connect() {
176         return connect( true );
177     }
178 
179     @Override
180     public void disconnect() {
181         if ( connection != null && connection.isConnected() ) connection.close();
182         connection = null;
183     }
184 
185     
186 
187 
188 
189 
190     @Override
191     public REXP eval( String command ) {
192         log.debug( "eval: " + command );
193 
194         int lockValue = 0;
195         try {
196 
197             
198 
199 
200             for ( int i = 0; i < MAX_EVAL_TRIES + 1; i++ ) {
201                 RuntimeException ex = null;
202                 try {
203                     checkConnection();
204                     lockValue = connection.lock();
205                     REXP r = connection.eval( "try(" + command + ", silent=T)" );
206                     if ( r == null ) return null;
207 
208                     if ( r.inherits( "try-error" ) ) {
209                         
210 
211 
212                         throw new RuntimeException( "Error from R: " + r.asString() );
213                     }
214                     return r;
215 
216                 } catch ( RserveException e ) {
217                     ex = new RuntimeException( "Error excecuting " + command + ": " + e.getMessage(), e );
218                 } catch ( REXPMismatchException e ) {
219                     throw new RuntimeException( "Error processing apparent error object returned by " + command + ": "
220                             + e.getMessage(), e );
221                 }
222 
223                 if ( i == MAX_EVAL_TRIES ) {
224                     throw ex;
225                 }
226 
227                 try {
228                     log.debug( "Eval failed, retrying" );
229                     Thread.sleep( 200 );
230                 } catch ( InterruptedException e ) {
231                     return null;
232                 }
233 
234             }
235 
236             throw new RuntimeException( "Evaluation failed! No details available" );
237         } finally {
238             if ( lockValue != 0 ) connection.unlock( lockValue );
239         }
240     }
241 
242     
243 
244 
245 
246 
247     @Override
248     public void finalize() {
249         this.disconnect();
250     }
251 
252     
253 
254 
255 
256 
257     @Override
258     public String getLastError() {
259         return connection.getLastError();
260     }
261 
262     
263 
264 
265     @Override
266     public boolean isConnected() {
267         if ( connection != null && connection.isConnected() ) return true;
268         return false;
269     }
270 
271     
272 
273 
274 
275 
276     @Override
277     public DoubleMatrix<String, String> retrieveMatrix( String variableName ) {
278         try {
279             log.debug( "Retrieving " + variableName );
280 
281             
282             
283 
284             
285             REXP r = this.eval( "data.frame(t(" + variableName + "))" );
286             if ( r == null ) throw new IllegalArgumentException( variableName + " not found in R context" );
287 
288             RList dataframe = r.asList();
289             int numrows = dataframe.size();
290             double[][] results = new double[numrows][];
291             int i = 0;
292             for ( Iterator<?> it = dataframe.iterator(); it.hasNext(); ) {
293                 REXP next = ( REXP ) it.next();
294                 double[] row = next.asDoubles();
295                 results[i] = row;
296                 i++;
297             }
298 
299             DoubleMatrix<String, String> resultObject = new DenseDoubleMatrix<String, String>( results );
300 
301             retrieveRowAndColumnNames( variableName, resultObject );
302             return resultObject;
303         } catch ( REXPMismatchException e ) {
304             throw new RuntimeException( "Failed to get back matrix for variable " + variableName, e );
305         }
306 
307     }
308 
309     
310 
311 
312 
313 
314     @Override
315     public void voidEval( String command ) {
316         if ( command == null ) throw new IllegalArgumentException( "Null command" );
317         this.checkConnection();
318 
319         log.debug( "voidEval: " + command );
320         eval( command );
321 
322     }
323 
324     
325 
326 
327     private void checkConnection() {
328         if ( !this.isConnected() ) {
329 
330             
331 
332 
333 
334             log.warn( "Not connected, trying to reconnect" );
335             boolean ok = false;
336             for ( int i = 0; i < MAX_CONNECT_TRIES; i++ ) {
337                 try {
338                     Thread.sleep( 200 );
339                 } catch ( InterruptedException e ) {
340                     return;
341                 }
342                 ok = this.connect();
343                 if ( ok ) break;
344             }
345             if ( !ok ) {
346                 throw new RuntimeException( "Not connected" );
347             }
348         }
349     }
350 
351     
352 
353 
354     private boolean connect( boolean beQuiet ) {
355         if ( connection != null && connection.isConnected() ) {
356             return true;
357         }
358         int tries = 3;
359         Exception ex = null;
360         for ( int i = 0; i < tries; i++ ) {
361             try {
362                 connection = new RConnection();
363                 return true;
364             } catch ( RserveException e ) {
365                 ex = e;
366                 try {
367                     Thread.sleep( 100 );
368                 } catch ( InterruptedException e1 ) {
369                     return false;
370                 }
371             }
372         }
373         if ( !beQuiet ) {
374             log.error( "Could not connect to RServe: " + ( ex == null ? "" : ex.getMessage() ) );
375         }
376         return false;
377     }
378 
379     
380 
381 
382 
383 
384     private boolean connect( String host, int port ) {
385         if ( connection != null && connection.isConnected() ) {
386             return true;
387         }
388         try {
389             connection = new RConnection( host, port );
390         } catch ( RserveException e ) {
391             log.error( "Could not connect to RServe: " + e.getMessage() );
392             return false;
393         }
394         log.info( "Connected via RServe." );
395         return true;
396     }
397 
398     
399 
400 
401 
402 
403     private void retrieveRowAndColumnNames( String variableName, DoubleMatrix<String, String> resultObject ) {
404         List<String> rowNames = this.stringListEval( "dimnames(" + variableName + ")[1][[1]]" );
405 
406         if ( rowNames.size() == resultObject.rows() ) {
407             resultObject.setRowNames( rowNames );
408         }
409 
410         List<String> colNames = this.stringListEval( "dimnames(" + variableName + ")[2][[1]]" );
411 
412         if ( colNames.size() == resultObject.columns() ) {
413             resultObject.setColumnNames( colNames );
414         }
415     }
416 
417 }