之前有一个业务需求:
有一个 Flink 任务代码是动态生成的,新任务生成后,提交到目标服务器集群,启动新任务,读取对应数据流
所以要能够将这个动态文件打为 jar 包,传输到目标服务器,并提交 jar 包到集群
整体流程:
写 3 个脚本:
bat脚本:位于本地 SpringBoot 项目中,只需执行打 jar 包的命令
bat脚本:位于本地 SpringBoot 项目中,将 jar 包传输到目标服务器
shell脚本:位于目标服务器上,提交 jar 包任务
第一个脚本 Java 可以直接调用,后面两个脚本,需要和远程服务器建立ssh链接,再调用指定的脚本
1 脚本文件 1.1 打 jar 包 命名为 .bat 文件,放在项目根目录下
执行时会进行打包
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @echo off echo ==========================start to install all modules=============================== if not exist "%M2_HOME%\bin\mvn.cmd" echo Please set the M2_HOME variable in your environment,We need maven! & EXIT /B 2 set SERVER=common-starter eureka-server config-server device-demo message-system report-system udp-server cd %~dp0 do ( call mvn clean package ) timeout /T 3 /NOBREAK exit
1.2 jar 包传输
需要将打好的jar包 从本地上传至 linux 服务器的目标目录下
在此之前需配置ssh无密码登录
1 2 3 4 5 6 打开 windows powershell: 执行生成密钥ssh-keygen的命令: ssh-keygen -t rsa 遇到输入一路回车,返回生成秘钥的位置:回车回车回车 上传秘钥到远端服务器: cat ~/.ssh/id_*.pub | ssh root@hadoop102 'cat >> .ssh/authorized_keys'
创建 .bat 文件,执行该脚本会将对应文件移动到对应地址
1 2 3 echo 连接远程服务器 scp -r jar包地址 root@hadoop102:/opt/XXX/XXX(目标地址目录) exit
1.3 提交任务到集群 当前 jar 包存在服务器目录下,在确认启动了集群后
创建 shell 脚本,执行该脚本会将 jar 包任务提交到 Flink 集群
1 2 3 4 5 6 7 8 9 10 #!/bin/bash case $1 in "start"){ for i in hadoop102 do echo "========== $i ==========" ssh $i "/opt/Environment_DataLake/flink-1.13.3/bin/flink run -m hadoop102:8081 -c com.dwc.test.motor /opt/Environment_DataLake/flink-1.13.3/job/DataToLake-1.0-SNAPSHOT.jar" done };; esac
2 Java调用脚本 2.1 调用本地的脚本 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 import java.io.BufferedReader;import java.io.File;import java.io.IOException;import java.io.InputStream;import java.io.InputStreamReader;public class startbat { public static void main (String[] args) { String batPath = "XXX.bat" ; String batPath2 = "XXXX.bat" ; File batFile = new File (batPath); boolean batFileExist = batFile.exists(); System.out.println("batFileExist:" + batFileExist); if (batFileExist) { System.out.println("😊😊😊😊😊😊😊😊😊😊😊😊😊正在执行编译😊😊😊😊😊😊😊😊😊😊😊😊😊" ); callCmd(batPath); System.out.println("😊😊😊😊😊😊😊😊😊😊😊😊😊正在上传至linux系统😊😊😊😊😊😊😊😊😊😊😊😊😊" ); callCmd(batPath2); } System.out.println("😊😊😊😊😊😊😊😊😊😊😊😊😊bat脚本运行完成😊😊😊😊😊😊😊😊😊😊😊😊😊" ); } private static void callCmd (String locationCmd) { StringBuilder sb = new StringBuilder (); try { Process child = Runtime.getRuntime().exec(locationCmd); InputStream in = child.getInputStream(); BufferedReader bufferedReader=new BufferedReader (new InputStreamReader (in)); String line; while ((line=bufferedReader.readLine())!=null ) { sb.append(line + "\n" ); } in.close(); try { child.waitFor(); } catch (InterruptedException e) { System.out.println(e); } System.out.println("sb:" + sb.toString()); System.out.println("callCmd execute finished" ); } catch (IOException e) { System.out.println(e); } } }
2.2 调用远程linux脚本 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 import ch.ethz.ssh2.Connection;import ch.ethz.ssh2.Session;import ch.ethz.ssh2.StreamGobbler;import org.apache.commons.lang3.StringUtils;import org.apache.flink.calcite.shaded.org.apache.commons.io.IOUtils;import java.io.*;public class RemoteExecuteCommand { public static void main (String[] args) throws Exception { String linuxIP = "ip地址" ; String usrName = "用户名" ; String passwd = "密码" ; RemoteExecuteCommand rec = new RemoteExecuteCommand (linuxIP, usrName, passwd); System.out.println("😊😊😊😊😊😊😊😊😊😊😊😊😊正在上传任务至集群😊😊😊😊😊😊😊😊😊😊😊😊😊" ); System.out.println(rec.execute("XXX.shell" )); System.out.println(rec.execute("jps" )); System.out.println("😊😊😊😊😊😊😊😊😊😊😊😊😊job上传完成😊😊😊😊😊😊😊😊😊😊😊😊😊" ); } private static String DEFAULTCHART = "UTF-8" ; private Connection conn; private String ip; private String userName; private String userPwd; public RemoteExecuteCommand (String ip, String userName, String userPwd) { this .ip = ip; this .userName = userName; this .userPwd = userPwd; } public Boolean login () throws Exception { boolean flg = false ; try { conn = new Connection (ip); conn.connect(); flg = conn.authenticateWithPassword(userName, userPwd); } catch (IOException e) { throw new Exception ("远程连接服务器失败" , e); } return flg; } public String execute (String cmd) throws Exception { String result = "" ; Session session = null ; try { if (login()) { session = conn.openSession(); session.execCommand(cmd); result = processStdout(session.getStdout(), DEFAULTCHART); if (StringUtils.isBlank(result)) { result = processStdout(session.getStderr(), DEFAULTCHART); } conn.close(); session.close(); } } catch (IOException e) { throw new Exception ("命令执行失败" , e); } finally { if (conn != null ) { conn.close(); } if (session != null ) { session.close(); } } return result; } private String processStdout (InputStream in, String charset) throws Exception { InputStream stdout = new StreamGobbler (in); StringBuffer buffer = new StringBuffer (); InputStreamReader isr = null ; BufferedReader br = null ; try { isr = new InputStreamReader (stdout, charset); br = new BufferedReader (isr); String line = null ; while ((line = br.readLine()) != null ) { buffer.append(line + "\n" ); } } catch (UnsupportedEncodingException e) { throw new Exception ("不支持的编码字符集异常" , e); } catch (IOException e) { throw new Exception ("读取失败" , e); } finally { IOUtils.close(br); IOUtils.close(isr); IOUtils.close(stdout); } return buffer.toString(); } }