From 7cb867319d0f8b1b0af3e7eb3f9833b091281320 Mon Sep 17 00:00:00 2001 From: Mathieu Baudier Date: Thu, 25 Feb 2010 10:23:06 +0000 Subject: [PATCH] Improve SSH support git-svn-id: https://svn.argeo.org/slc/trunk@3406 4cfe0d0a-d680-48aa-b62c-e0a02a3f76cc --- .../slc/osgi/OsgiExecutionResources.java | 2 +- .../org/argeo/slc/jsch/AbstractJschTask.java | 4 +- .../java/org/argeo/slc/jsch/RemoteExec.java | 259 ++++++++++++++---- .../main/java/org/argeo/slc/jsch/ScpTo.java | 106 ++++--- .../argeo/slc/jsch/SshFilesDeployment.java | 10 +- .../java/org/argeo/slc/jsch/SshShell.java | 125 +++++++++ .../java/org/argeo/slc/jsch/SshTarget.java | 2 +- 7 files changed, 405 insertions(+), 103 deletions(-) create mode 100644 runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/jsch/SshShell.java diff --git a/runtime/org.argeo.slc.support.osgi/src/main/java/org/argeo/slc/osgi/OsgiExecutionResources.java b/runtime/org.argeo.slc.support.osgi/src/main/java/org/argeo/slc/osgi/OsgiExecutionResources.java index 7ef87fcde..5582d9ca0 100644 --- a/runtime/org.argeo.slc.support.osgi/src/main/java/org/argeo/slc/osgi/OsgiExecutionResources.java +++ b/runtime/org.argeo.slc.support.osgi/src/main/java/org/argeo/slc/osgi/OsgiExecutionResources.java @@ -64,7 +64,7 @@ public class OsgiExecutionResources extends FileExecutionResources implements throw new SlcException("Cannot determine canonical path for " + path, e); } - if (log.isDebugEnabled()) + if (log.isTraceEnabled()) log.debug("OSGi local resource: " + file + " from " + resource); return file; } diff --git a/runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/jsch/AbstractJschTask.java b/runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/jsch/AbstractJschTask.java index 1fff474ee..f26108f4a 100644 --- a/runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/jsch/AbstractJschTask.java +++ b/runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/jsch/AbstractJschTask.java @@ -20,8 +20,8 @@ public abstract class AbstractJschTask implements Runnable { if (sshTarget.getSession() != null) { Session session = sshTarget.getSession(); if (session.isConnected()) { - if (log.isDebugEnabled()) - log.debug("Using cached sesison to " + getSshTarget() + if (log.isTraceEnabled()) + log.debug("Using cached session to " + getSshTarget() + " via SSH"); return session; } diff --git a/runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/jsch/RemoteExec.java b/runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/jsch/RemoteExec.java index 129520986..094b77fc1 100644 --- a/runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/jsch/RemoteExec.java +++ b/runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/jsch/RemoteExec.java @@ -1,22 +1,32 @@ package org.argeo.slc.jsch; import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import java.io.OutputStreamWriter; import java.util.ArrayList; +import java.util.HashMap; +import java.util.Hashtable; import java.util.List; +import java.util.Map; import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.argeo.slc.SlcException; import org.argeo.slc.core.execution.tasks.SystemCall; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.core.io.Resource; +import org.springframework.util.StringUtils; import com.jcraft.jsch.Channel; import com.jcraft.jsch.ChannelExec; +import com.jcraft.jsch.ChannelShell; import com.jcraft.jsch.Session; -public class RemoteExec extends AbstractJschTask { +public class RemoteExec extends AbstractJschTask implements InitializingBean { private final static Log log = LogFactory.getLog(RemoteExec.class); private Boolean failOnBadExitStatus = true; @@ -25,105 +35,220 @@ public class RemoteExec extends AbstractJschTask { private String command; private SystemCall systemCall; private List systemCalls = new ArrayList(); + private Resource script; + private Boolean xForwarding = false; + private Boolean agentForwarding = false; + private Boolean forceShell = false; + private Map env = new HashMap(); public void run(Session session) { + List commandsToUse = new ArrayList(commands); + String commandToUse = command; // convert system calls if (systemCall != null) { if (command != null) throw new SlcException("Cannot specify command AND systemCall"); - command = convertSystemCall(systemCall); + commandToUse = convertSystemCall(systemCall); } if (systemCalls.size() != 0) { - if (commands.size() != 0) + if (commandsToUse.size() != 0) throw new SlcException( "Cannot specify commands AND systemCalls"); for (SystemCall systemCall : systemCalls) - commands.add(convertSystemCall(systemCall)); + commandsToUse.add(convertSystemCall(systemCall)); + } + + if (script != null) { + if (commandsToUse.size() != 0) + throw new SlcException("Cannot specify commands and script"); + BufferedReader reader = null; + try { + reader = new BufferedReader(new InputStreamReader(script + .getInputStream())); + String line = null; + while ((line = reader.readLine()) != null) { + if (!StringUtils.hasText(line)) + continue; + commandsToUse.add(line); + } + } catch (IOException e) { + throw new SlcException("Cannot read script " + script, e); + } finally { + IOUtils.closeQuietly(reader); + } + } + + if (forceShell) { + commandsToUse.add(commandToUse); + commandToUse = null; } // execute command(s) - if (command != null) { - if (commands.size() != 0) + if (commandToUse != null) { + if (commandsToUse.size() != 0) throw new SlcException( "Specify either a single command or a list of commands."); - remoteExec(session, command); + remoteExec(session, commandToUse); } else { - if (commands.size() == 0) + if (commandsToUse.size() == 0) throw new SlcException( "Neither a single command or a list of commands has been specified."); - for (String cmd : commands) { - remoteExec(session, cmd); - } + remoteExec(session, commandsToUse); } } protected String convertSystemCall(SystemCall systemCall) { - // TODO: prepend environemnt variables + // TODO: prepend environment variables // TODO: deal with exec dir return systemCall.asCommand(); } - protected void remoteExec(Session session, String command) { - BufferedReader execIn = null; + protected void remoteExec(Session session, final List commands) { try { - Channel channel = session.openChannel("exec"); - ((ChannelExec) channel).setCommand(command); + final ChannelShell channel = (ChannelShell) session + .openChannel("shell"); + channel.setInputStream(null); + channel.setXForwarding(xForwarding); + channel.setAgentForwarding(agentForwarding); + channel.setEnv(new Hashtable(env)); - // X Forwarding - // channel.setXForwarding(true); + /* + * // Choose the pty-type "vt102". + * ((ChannelShell)channel).setPtyType("vt102"); + */ + // Writer thread + final BufferedWriter writer = new BufferedWriter( + new OutputStreamWriter(channel.getOutputStream())); - // channel.setInputStream(System.in); - channel.setInputStream(null); + channel.connect(); - ((ChannelExec) channel).setErrStream(System.err); + // write commands to shell + Thread writerThread = new Thread("Shell writer " + getSshTarget()) { + @Override + public void run() { + try { + for (String line : commands) { + if (!StringUtils.hasText(line)) + continue; + writer.write(line); + writer.newLine(); + } + writer.append("exit"); + writer.newLine(); + writer.flush(); + // channel.disconnect(); + } catch (IOException e) { + throw new SlcException("Cannot write to shell on " + + getSshTarget(), e); + } finally { + IOUtils.closeQuietly(writer); + } + } + }; + writerThread.start(); - InputStream in = channel.getInputStream(); + readStdOut(channel); + checkExitStatus(channel); + channel.disconnect(); - if (log.isDebugEnabled()) - log.debug("Run '" + command + "' on " + getSshTarget() + "..."); + } catch (Exception e) { + throw new SlcException("Cannot use SSH shell on " + getSshTarget(), + e); + } - channel.connect(); + } - // byte[] tmp = new byte[1024]; - while (true) { - execIn = new BufferedReader(new InputStreamReader(in)); - String line = null; - while ((line = execIn.readLine()) != null) { - if (!line.trim().equals("")) - log.info(line); - } + protected void remoteExec(Session session, String command) { + try { + final ChannelExec channel = (ChannelExec) session + .openChannel("exec"); + channel.setCommand(command); - if (channel.isClosed()) { - int exitStatus = channel.getExitStatus(); - if (exitStatus == 0) { - if (log.isTraceEnabled()) - log.trace("Remote execution exit status: " - + exitStatus); - } else { - String msg = "Remote execution failed with " - + " exit status: " + exitStatus; - if (failOnBadExitStatus) - throw new SlcException(msg); - else - log.error(msg); - } + channel.setInputStream(null); + channel.setXForwarding(xForwarding); + channel.setAgentForwarding(agentForwarding); + channel.setEnv(new Hashtable(env)); + channel.setErrStream(null); - break; - } - try { - Thread.sleep(1000); - } catch (Exception ee) { - } - } + // Standard Error + readStdErr(channel); + + if (log.isDebugEnabled()) + log.debug("Run '" + command + "' on " + getSshTarget() + "..."); + channel.connect(); + readStdOut(channel); + checkExitStatus(channel); channel.disconnect(); } catch (Exception e) { throw new SlcException("Cannot execute remotely '" + command + "' on " + getSshTarget(), e); + } + } + + protected void readStdErr(final ChannelExec channel) { + new Thread("stderr " + getSshTarget()) { + public void run() { + BufferedReader stdErr = null; + try { + InputStream in = channel.getErrStream(); + stdErr = new BufferedReader(new InputStreamReader(in)); + String line = null; + while ((line = stdErr.readLine()) != null) { + if (!line.trim().equals("")) + log.error(line); + } + } catch (IOException e) { + if (log.isDebugEnabled()) + log.error("Cannot read stderr from " + getSshTarget(), + e); + } finally { + IOUtils.closeQuietly(stdErr); + } + } + }.start(); + } + + protected void readStdOut(Channel channel) { + BufferedReader stdOut = null; + try { + InputStream in = channel.getInputStream(); + stdOut = new BufferedReader(new InputStreamReader(in)); + String line = null; + while ((line = stdOut.readLine()) != null) { + if (!line.trim().equals("")) + log.info(line); + } + } catch (IOException e) { + if (log.isDebugEnabled()) + log.error("Cannot read stdout from " + getSshTarget(), e); } finally { - IOUtils.closeQuietly(execIn); + IOUtils.closeQuietly(stdOut); + } + } + + protected void checkExitStatus(Channel channel) { + if (channel.isClosed()) { + int exitStatus = channel.getExitStatus(); + if (exitStatus == 0) { + if (log.isTraceEnabled()) + log.trace("Remote execution exit status: " + exitStatus); + } else { + String msg = "Remote execution failed with " + " exit status: " + + exitStatus; + if (failOnBadExitStatus) + throw new SlcException(msg); + else + log.error(msg); + } } + + } + + public void afterPropertiesSet() throws Exception { + // TODO Auto-generated method stub + } public void setCommand(String command) { @@ -146,4 +271,28 @@ public class RemoteExec extends AbstractJschTask { this.systemCalls = systemCalls; } + public void setScript(Resource script) { + this.script = script; + } + + public void setxForwarding(Boolean xForwarding) { + this.xForwarding = xForwarding; + } + + public void setAgentForwarding(Boolean agentForwarding) { + this.agentForwarding = agentForwarding; + } + + public void setEnv(Map env) { + this.env = env; + } + + public void setForceShell(Boolean forceShell) { + this.forceShell = forceShell; + } + + public List getCommands() { + return commands; + } + } diff --git a/runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/jsch/ScpTo.java b/runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/jsch/ScpTo.java index 07f428c4d..2a6da62de 100644 --- a/runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/jsch/ScpTo.java +++ b/runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/jsch/ScpTo.java @@ -1,14 +1,16 @@ package org.argeo.slc.jsch; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.ArrayList; import java.util.List; -import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -54,30 +56,7 @@ public class ScpTo extends AbstractJschTask { } if (localResource != null) { - try { - File lFile = localResource.getFile(); - uploadFile(session, lFile, remotePath); - } catch (IOException e) { - OutputStream out = null; - InputStream in = null; - File tempFile = null; - try { - tempFile = File.createTempFile(getClass().getSimpleName() - + "-" + localResource.getFilename(), null); - out = FileUtils.openOutputStream(tempFile); - in = localResource.getInputStream(); - IOUtils.copy(in, out); - uploadFile(session, tempFile, remotePath); - } catch (IOException e1) { - throw new SlcException("Can neither interpret resource " - + localResource - + " as file, nor create a temporary file", e1); - } finally { - IOUtils.closeQuietly(in); - IOUtils.closeQuietly(out); - FileUtils.deleteQuietly(tempFile); - } - } + uploadResource(session, localResource, remoteDir); } } @@ -127,8 +106,46 @@ public class ScpTo extends AbstractJschTask { return false; } - protected void uploadFile(Session session, File localFile, String remoteFile) { - InputStream in = null; + protected void uploadFile(Session session, File file, String remoteFile) { + try { + uploadFile(session, new FileInputStream(file), file.length(), file + .getPath(), file.toString(), remoteFile); + } catch (FileNotFoundException e) { + throw new SlcException("Cannot upload " + file, e); + } + } + + protected void uploadResource(Session session, Resource resource, + String remoteFile) { + try { + File lFile = resource.getFile(); + uploadFile(session, lFile, remotePath); + } catch (IOException e) { + // no underlying file found + // load the resource in memory before transferring it + InputStream in = null; + try { + in = resource.getInputStream(); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + IOUtils.copy(in, out); + byte[] arr = out.toByteArray(); + ByteArrayInputStream content = new ByteArrayInputStream(arr); + uploadFile(session, content, arr.length, resource.getURL() + .getPath(), resource.toString(), remotePath); + arr = null; + } catch (IOException e1) { + throw new SlcException("Can neither interpret resource " + + localResource + + " as file, nor create a temporary file", e1); + } finally { + IOUtils.closeQuietly(in); + // no need to close byte arrays streams + } + } + } + + protected void uploadFile(Session session, InputStream in, long size, + String path, String sourceDesc, String remoteFile) { OutputStream channelOut; InputStream channelIn; try { @@ -147,13 +164,13 @@ public class ScpTo extends AbstractJschTask { // send "C0644 filesize filename", where filename should not include // '/' - long filesize = localFile.length(); + long filesize = size; command = "C0644 " + filesize + " "; - int index = localFile.getPath().lastIndexOf('/'); + int index = path.lastIndexOf('/'); if (index > 0) { - command += localFile.getPath().substring(index + 1); + command += path.substring(index + 1); } else { - command += localFile.getPath(); + command += path; } command += "\n"; @@ -162,21 +179,22 @@ public class ScpTo extends AbstractJschTask { checkAck(channelIn); if (log.isTraceEnabled()) - log.debug("Start copy of " + localFile + " to " + remoteFile + log.debug("Start copy of " + sourceDesc + " to " + remoteFile + " on " + getSshTarget() + "..."); final long oneMB = 1024l;// in KB final long tenMB = 10 * oneMB;// in KB // send a content of lfile - in = new FileInputStream(localFile); byte[] buf = new byte[1024]; long cycleCount = 0; + long nbrOfBytes = 0; while (true) { int len = in.read(buf, 0, buf.length); if (len <= 0) break; channelOut.write(buf, 0, len); // out.flush(); + nbrOfBytes = nbrOfBytes + len; if (((cycleCount % oneMB) == 0) && cycleCount != 0)// each 1 MB System.out.print('#'); if (((cycleCount % (tenMB)) == 0) && cycleCount != 0)// each 10 @@ -190,25 +208,31 @@ public class ScpTo extends AbstractJschTask { channelOut.flush(); checkAck(channelIn); - if (log.isTraceEnabled()) - log.debug((cycleCount) + " KB sent to server. (" - + (cycleCount / oneMB + " MB)")); - if (log.isDebugEnabled()) - log.debug("Finished copy to " + remoteFile + " on " - + getSshTarget() + " from " + localFile); + log.debug("Transferred to " + remoteFile + " (" + + sizeDesc(nbrOfBytes) + ") on " + getSshTarget() + + " from " + sourceDesc); IOUtils.closeQuietly(channelOut); channel.disconnect(); } catch (Exception e) { - throw new SlcException("Cannot copy " + localFile + " to " - + remoteFile, e); + throw new SlcException("Cannot copy " + path + " to " + remoteFile, + e); } finally { IOUtils.closeQuietly(in); } } + protected String sizeDesc(Long nbrOfBytes) { + if (nbrOfBytes < 1024) + return nbrOfBytes + " B"; + else if (nbrOfBytes < 1024 * 1024) + return (nbrOfBytes / 1024) + " KB"; + else + return nbrOfBytes / (1024 * 1024) + " MB"; + } + public void setLocalResource(Resource localFile) { this.localResource = localFile; } diff --git a/runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/jsch/SshFilesDeployment.java b/runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/jsch/SshFilesDeployment.java index 5809ca844..97ba6c740 100644 --- a/runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/jsch/SshFilesDeployment.java +++ b/runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/jsch/SshFilesDeployment.java @@ -27,11 +27,9 @@ public class SshFilesDeployment extends AbstractJschTask implements Runnable { dir = targetBase + '/' + relPath.substring(0, lastIndexSubDir); else dir = targetBase; + if (!subDirs.contains(dir)) { - RemoteExec remoteExec = new RemoteExec(); - remoteExec.setCommand("mkdir -p " + dir); subDirs.add(dir); - multiTasks.getTasks().add(remoteExec); } // Copy resource @@ -44,6 +42,12 @@ public class SshFilesDeployment extends AbstractJschTask implements Runnable { // TODO: set permissions } + RemoteExec remoteExec = new RemoteExec(); + for (String dir : subDirs) { + remoteExec.getCommands().add("mkdir -p " + dir); + } + multiTasks.getTasks().add(0, remoteExec); + multiTasks.setSshTarget(getSshTarget()); multiTasks.run(session); } diff --git a/runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/jsch/SshShell.java b/runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/jsch/SshShell.java new file mode 100644 index 000000000..2ec2b72a1 --- /dev/null +++ b/runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/jsch/SshShell.java @@ -0,0 +1,125 @@ +package org.argeo.slc.jsch; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.argeo.slc.SlcException; +import org.springframework.core.io.Resource; +import org.springframework.util.StringUtils; + +import com.jcraft.jsch.Channel; +import com.jcraft.jsch.Session; + +public class SshShell extends AbstractJschTask { + private final static Log log = LogFactory.getLog(SshShell.class); + private Resource input; + + @Override + void run(Session session) { + try { + final Channel channel = session.openChannel("shell"); + + // Enable agent-forwarding. + // ((ChannelShell)channel).setAgentForwarding(true); + + // channel.setInputStream(System.in); + // channel.setInputStream(input.getInputStream()); + /* + * // a hack for MS-DOS prompt on Windows. + * channel.setInputStream(new FilterInputStream(System.in){ public + * int read(byte[] b, int off, int len)throws IOException{ return + * in.read(b, off, (len>1024?1024:len)); } }); + */ + + // channel.setOutputStream(System.out); + + /* + * // Choose the pty-type "vt102". + * ((ChannelShell)channel).setPtyType("vt102"); + */ + + /* + * // Set environment variable "LANG" as "ja_JP.eucJP". + * ((ChannelShell)channel).setEnv("LANG", "ja_JP.eucJP"); + */ + + // Writer thread + final BufferedWriter writer = new BufferedWriter( + new OutputStreamWriter(channel.getOutputStream())); + + // channel.connect(); + channel.connect(3 * 1000); + + // while (!channel.isConnected()) + // try { + // Thread.sleep(500); + // } catch (InterruptedException e1) { + // // silent + // } + + Thread writerThread = new Thread("Shell writer " + getSshTarget()) { + + @Override + public void run() { + + if (log.isDebugEnabled()) + log.debug("Start writing to shell"); + + BufferedReader reader = null; + try { + reader = new BufferedReader(new InputStreamReader(input + .getInputStream())); + String line = null; + while ((line = reader.readLine()) != null) { + if (!StringUtils.hasText(line)) + continue; + writer.write(line); + writer.newLine(); + } + writer.append("exit"); + writer.newLine(); + writer.flush(); + // channel.disconnect(); + } catch (IOException e) { + throw new SlcException("Cannot write to shell on " + + getSshTarget(), e); + } finally { + IOUtils.closeQuietly(reader); + } + } + }; + writerThread.start(); + + BufferedReader execIn = null; + try { + execIn = new BufferedReader(new InputStreamReader(channel + .getInputStream())); + String line = null; + while ((line = execIn.readLine()) != null) { + if (!line.trim().equals("")) + log.info(line); + } + } catch (Exception e) { + throw new SlcException("Cannot read from shell on " + + getSshTarget(), e); + } finally { + IOUtils.closeQuietly(execIn); + } + + } catch (Exception e) { + throw new SlcException("Cannot use SSH shell on " + getSshTarget(), + e); + } + } + + public void setInput(Resource input) { + this.input = input; + } + +} diff --git a/runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/jsch/SshTarget.java b/runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/jsch/SshTarget.java index 03f6990f5..2f7688945 100644 --- a/runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/jsch/SshTarget.java +++ b/runtime/org.argeo.slc.support.simple/src/main/java/org/argeo/slc/jsch/SshTarget.java @@ -67,7 +67,7 @@ public class SshTarget { } public String toString() { - return "ssh:" + getUser() + "@" + getHost() + ":" + getPort(); + return getUser() + "@" + getHost() + ":" + getPort(); } public Session getSession() { -- 2.39.2