1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package ubic.basecode.util.r;
20
21 import org.apache.commons.lang3.StringUtils;
22 import org.rosuda.REngine.REXP;
23 import org.rosuda.REngine.REXPMismatchException;
24 import org.rosuda.REngine.REngineException;
25 import org.rosuda.REngine.RList;
26 import org.rosuda.REngine.Rserve.RConnection;
27 import org.rosuda.REngine.Rserve.RserveException;
28 import ubic.basecode.dataStructure.matrix.DenseDoubleMatrix;
29 import ubic.basecode.dataStructure.matrix.DoubleMatrix;
30 import ubic.basecode.util.Configuration;
31
32 import java.io.File;
33 import java.io.IOException;
34 import java.util.Iterator;
35 import java.util.List;
36
37
38
39
40
41 public class RServeClient extends AbstractRClient {
42
43
44
45
46 private static final int DEFAULT_PORT = 6311;
47
48 private static final int MAX_CONNECT_TRIES = 10;
49
50 private static final int MAX_EVAL_TRIES = 3;
51
52 private final static String os = System.getProperty( "os.name" ).toLowerCase();
53
54 protected static String findRserveCommand() {
55 String rserveExecutable = Configuration.getString( "rserve.start.command" );
56 if ( StringUtils.isBlank( rserveExecutable ) ) {
57 log.info( "Rserve command not configured? Trying fallbacks" );
58 if ( os.startsWith( "windows" ) ) {
59 rserveExecutable = System.getenv( "R_HOME" ) + File.separator + "library" + File.separator + "Rserve"
60 + File.separator + "Rserve.exe";
61 } else {
62 rserveExecutable = "R CMD Rserve";
63 }
64 }
65 return rserveExecutable;
66 }
67
68 private RConnection connection = null;
69
70
71
72
73
74
75 protected RServeClient() throws IOException {
76 if ( !connect() ) {
77 throw new IOException( "Could not connect to Rserve" );
78 }
79 }
80
81
82
83
84
85 protected RServeClient( String host ) throws IOException {
86 if ( !connect( host, DEFAULT_PORT ) ) {
87 throw new IOException( "Could not connect to Rserve" );
88 }
89 }
90
91
92
93
94
95
96 @Override
97 public void assign( String argName, double[] arg ) {
98 checkConnection();
99
100 try {
101 connection.assign( argName, arg );
102 } catch ( REngineException e ) {
103 throw new RuntimeException( e );
104 }
105
106 }
107
108
109
110
111
112
113 @Override
114 public void assign( String arg0, int[] arg1 ) {
115 if ( StringUtils.isBlank( arg0 ) ) {
116 throw new IllegalArgumentException( "Must supply valid variable name" );
117 }
118 checkConnection();
119 try {
120 connection.assign( arg0, arg1 );
121 } catch ( REngineException e ) {
122 throw new RuntimeException( e );
123 }
124 }
125
126
127
128
129
130
131
132
133
134
135
136 @Override
137 public void assign( String sym, String ct ) {
138 if ( StringUtils.isBlank( sym ) ) {
139 throw new IllegalArgumentException( "Must supply valid variable name" );
140 }
141 try {
142 this.checkConnection();
143 connection.assign( sym, ct );
144 } catch ( RserveException e ) {
145 throw new RuntimeException( "Assignment failed: " + sym + " value " + ct, e );
146 }
147 }
148
149
150
151
152
153
154 @Override
155 public void assign( String argName, String[] array ) {
156 if ( array == null || array.length == 0 ) {
157 throw new IllegalArgumentException( "Array must not be null or empty" );
158 }
159 if ( StringUtils.isBlank( argName ) ) {
160 throw new IllegalArgumentException( "Must supply valid variable name" );
161 }
162 try {
163 log.debug( "assign: " + argName + "<-" + array.length + " strings." );
164 this.checkConnection();
165 connection.assign( argName, array );
166 } catch ( REngineException e ) {
167 throw new RuntimeException( "Failure with assignment: " + argName + "<-" + array.length + " strings." + e );
168 }
169 }
170
171
172
173
174
175 public boolean connect() {
176 return connect( true );
177 }
178
179 @Override
180 public void disconnect() {
181 if ( connection != null && connection.isConnected() ) connection.close();
182 connection = null;
183 }
184
185
186
187
188
189
190 @Override
191 public REXP eval( String command ) {
192 log.debug( "eval: " + command );
193
194 int lockValue = 0;
195 try {
196
197
198
199
200 for ( int i = 0; i < MAX_EVAL_TRIES + 1; i++ ) {
201 RuntimeException ex = null;
202 try {
203 checkConnection();
204 lockValue = connection.lock();
205 REXP r = connection.eval( "try(" + command + ", silent=T)" );
206 if ( r == null ) return null;
207
208 if ( r.inherits( "try-error" ) ) {
209
210
211
212 throw new RuntimeException( "Error from R: " + r.asString() );
213 }
214 return r;
215
216 } catch ( RserveException e ) {
217 ex = new RuntimeException( "Error excecuting " + command + ": " + e.getMessage(), e );
218 } catch ( REXPMismatchException e ) {
219 throw new RuntimeException( "Error processing apparent error object returned by " + command + ": "
220 + e.getMessage(), e );
221 }
222
223 if ( i == MAX_EVAL_TRIES ) {
224 throw ex;
225 }
226
227 try {
228 log.debug( "Eval failed, retrying" );
229 Thread.sleep( 200 );
230 } catch ( InterruptedException e ) {
231 return null;
232 }
233
234 }
235
236 throw new RuntimeException( "Evaluation failed! No details available" );
237 } finally {
238 if ( lockValue != 0 ) connection.unlock( lockValue );
239 }
240 }
241
242
243
244
245
246
247 @Override
248 public void finalize() {
249 this.disconnect();
250 }
251
252
253
254
255
256
257 @Override
258 public String getLastError() {
259 return connection.getLastError();
260 }
261
262
263
264
265 @Override
266 public boolean isConnected() {
267 if ( connection != null && connection.isConnected() ) return true;
268 return false;
269 }
270
271
272
273
274
275
276 @Override
277 public DoubleMatrix<String, String> retrieveMatrix( String variableName ) {
278 try {
279 log.debug( "Retrieving " + variableName );
280
281
282
283
284
285 REXP r = this.eval( "data.frame(t(" + variableName + "))" );
286 if ( r == null ) throw new IllegalArgumentException( variableName + " not found in R context" );
287
288 RList dataframe = r.asList();
289 int numrows = dataframe.size();
290 double[][] results = new double[numrows][];
291 int i = 0;
292 for ( Iterator<?> it = dataframe.iterator(); it.hasNext(); ) {
293 REXP next = ( REXP ) it.next();
294 double[] row = next.asDoubles();
295 results[i] = row;
296 i++;
297 }
298
299 DoubleMatrix<String, String> resultObject = new DenseDoubleMatrix<String, String>( results );
300
301 retrieveRowAndColumnNames( variableName, resultObject );
302 return resultObject;
303 } catch ( REXPMismatchException e ) {
304 throw new RuntimeException( "Failed to get back matrix for variable " + variableName, e );
305 }
306
307 }
308
309
310
311
312
313
314 @Override
315 public void voidEval( String command ) {
316 if ( command == null ) throw new IllegalArgumentException( "Null command" );
317 this.checkConnection();
318
319 log.debug( "voidEval: " + command );
320 eval( command );
321
322 }
323
324
325
326
327 private void checkConnection() {
328 if ( !this.isConnected() ) {
329
330
331
332
333
334 log.warn( "Not connected, trying to reconnect" );
335 boolean ok = false;
336 for ( int i = 0; i < MAX_CONNECT_TRIES; i++ ) {
337 try {
338 Thread.sleep( 200 );
339 } catch ( InterruptedException e ) {
340 return;
341 }
342 ok = this.connect();
343 if ( ok ) break;
344 }
345 if ( !ok ) {
346 throw new RuntimeException( "Not connected" );
347 }
348 }
349 }
350
351
352
353
354 private boolean connect( boolean beQuiet ) {
355 if ( connection != null && connection.isConnected() ) {
356 return true;
357 }
358 int tries = 3;
359 Exception ex = null;
360 for ( int i = 0; i < tries; i++ ) {
361 try {
362 connection = new RConnection();
363 return true;
364 } catch ( RserveException e ) {
365 ex = e;
366 try {
367 Thread.sleep( 100 );
368 } catch ( InterruptedException e1 ) {
369 return false;
370 }
371 }
372 }
373 if ( !beQuiet ) {
374 log.error( "Could not connect to RServe: " + ( ex == null ? "" : ex.getMessage() ) );
375 }
376 return false;
377 }
378
379
380
381
382
383
384 private boolean connect( String host, int port ) {
385 if ( connection != null && connection.isConnected() ) {
386 return true;
387 }
388 try {
389 connection = new RConnection( host, port );
390 } catch ( RserveException e ) {
391 log.error( "Could not connect to RServe: " + e.getMessage() );
392 return false;
393 }
394 log.info( "Connected via RServe." );
395 return true;
396 }
397
398
399
400
401
402
403 private void retrieveRowAndColumnNames( String variableName, DoubleMatrix<String, String> resultObject ) {
404 List<String> rowNames = this.stringListEval( "dimnames(" + variableName + ")[1][[1]]" );
405
406 if ( rowNames.size() == resultObject.rows() ) {
407 resultObject.setRowNames( rowNames );
408 }
409
410 List<String> colNames = this.stringListEval( "dimnames(" + variableName + ")[2][[1]]" );
411
412 if ( colNames.size() == resultObject.columns() ) {
413 resultObject.setColumnNames( colNames );
414 }
415 }
416
417 }