Skip to main content
Version: 0.11.0

Use of 0.X SDK

Linkis provides a convenient interface for calling JAVA and SCALA, which can be used only by introducing the ujes-client module

1 Introduce dependent modules

<dependency>
<groupId>com.webank.wedatasphere.linkis</groupId>
<artifactId>linkis-ujes-client</artifactId>
<version>0.9.3</version>
</dependency>

2 Java test code

Establish the Java test class UJESClientImplTestJ, the specific interface meaning can be seen in the notes:

package com.wedatasphere.linkis.ujes.test;

import com.webank.wedatasphere.linkis.common.utils.Utils;
import com.webank.wedatasphere.linkis.httpclient.dws.authentication.StaticAuthenticationStrategy;
import com.webank.wedatasphere.linkis.httpclient.dws.authentication.TokenAuthenticationStrategy;
import com.webank.wedatasphere.linkis.httpclient.dws.config.DWSClientConfig;
import com.webank.wedatasphere.linkis.httpclient.dws.config.DWSClientConfigBuilder;
import com.webank.wedatasphere.linkis.ujes.client.UJESClient;
import com.webank.wedatasphere.linkis.ujes.client.UJESClientImpl;
import com.webank.wedatasphere.linkis.ujes.client.request.JobExecuteAction;
import com.webank.wedatasphere.linkis.ujes.client.request.ResultSetAction;
import com.webank.wedatasphere.linkis.ujes.client.response.JobExecuteResult;
import com.webank.wedatasphere.linkis.ujes.client.response.JobInfoResult;
import com.webank.wedatasphere.linkis.ujes.client.response.JobProgressResult;
import org.apache.commons.io.IOUtils;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class UJESClientTest {

public static void main(String[] args){

String user = "hadoop";
String executeCode = "show databases;";

// 1. Configure DWSClientBuilder, get a DWSClientConfig through DWSClientBuilder
DWSClientConfig clientConfig = ((DWSClientConfigBuilder) (DWSClientConfigBuilder.newBuilder()
.addUJESServerUrl("http://${ip}:${port}") //Specify ServerUrl, the address of the linkis server-side gateway, such as http://{ip}:{port}
.connectionTimeout(30000) //connectionTimeOut client connection timeout
.discoveryEnabled(false).discoveryFrequency(1, TimeUnit.MINUTES) //Whether to enable registration discovery, if enabled, the newly launched Gateway will be automatically discovered
.loadbalancerEnabled(true) // Whether to enable load balancing, if registration discovery is not enabled, load balancing is meaningless
.maxConnectionSize(5) //Specify the maximum number of connections, that is, the maximum number of concurrent
.retryEnabled(false).readTimeout(30000) //execution failed, whether to allow retry
.setAuthenticationStrategy(new StaticAuthenticationStrategy()) //AuthenticationStrategy Linkis authentication method
.setAuthTokenKey("${username}").setAuthTokenValue("${password}"))) //Authentication key, generally the user name; authentication value, generally the password corresponding to the user name
.setDWSVersion("v1").build(); //Linkis background protocol version, the current version is v1

// 2. Get a UJESClient through DWSClientConfig
UJESClient client = new UJESClientImpl(clientConfig);

try {
// 3. Start code execution
System.out.println("user: "+ user + ", code: [" + executeCode + "]");
Map<String, Object> startupMap = new HashMap<String, Object>();
startupMap.put("wds.linkis.yarnqueue", "default"); // Various startup parameters can be stored in startupMap, see linkis management console configuration
JobExecuteResult jobExecuteResult = client.execute(JobExecuteAction.builder()
.setCreator("linkisClient-Test") //creator, the system name of the client requesting linkis, used for system-level isolation
.addExecuteCode(executeCode) //ExecutionCode The code to be executed
.setEngineType((JobExecuteAction.EngineType) JobExecuteAction.EngineType$.MODULE$.HIVE()) // The execution engine type of the linkis that you want to request, such as Spark hive, etc.
.setUser(user) //User, request user; used for user-level multi-tenant isolation
.setStartupParams(startupMap)
.build());
System.out.println("execId: "+ jobExecuteResult.getExecID() + ", taskId:" + jobExecuteResult.taskID());

// 4. Get the execution status of the script
JobInfoResult jobInfoResult = client.getJobInfo(jobExecuteResult);
int sleepTimeMills = 1000;
while(!jobInfoResult.isCompleted()) {
// 5. Get the execution progress of the script
JobProgressResult progress = client.progress(jobExecuteResult);
Utils.sleepQuietly(sleepTimeMills);
jobInfoResult = client.getJobInfo(jobExecuteResult);
}

// 6. Get the job information of the script
JobInfoResult jobInfo = client.getJobInfo(jobExecuteResult);
// 7. Get the list of result sets (if the user submits multiple SQL at a time, multiple result sets will be generated)
String resultSet = jobInfo.getResultSetList(client)[0];
// 8. Get a specific result set through a result set information
Object fileContents = client.resultSet(ResultSetAction.builder().setPath(resultSet).setUser(jobExecuteResult.getUser()).build()).getFileContent();
System.out.println("fileContents: "+ fileContents);

} catch (Exception e) {
e.printStackTrace();
IOUtils.closeQuietly(client);
}
IOUtils.closeQuietly(client);
}
}

Run the above code to interact with Linkis

3 Scala test code

package com.wedatasphere.linkis.ujes.test

import java.util.concurrent.TimeUnit

import com.webank.wedatasphere.linkis.common.utils.Utils
import com.webank.wedatasphere.linkis.httpclient.dws.authentication.StaticAuthenticationStrategy
import com.webank.wedatasphere.linkis.httpclient.dws.config.DWSClientConfigBuilder
import com.webank.wedatasphere.linkis.ujes.client.UJESClient
import com.webank.wedatasphere.linkis.ujes.client.request.JobExecuteAction.EngineType
import com.webank.wedatasphere.linkis.ujes.client.request.{JobExecuteAction, ResultSetAction}
import org.apache.commons.io.IOUtils

object UJESClientImplTest extends App {

var executeCode = "show databases;"
var user = "hadoop"

// 1. Configure DWSClientBuilder, get a DWSClientConfig through DWSClientBuilder
val clientConfig = DWSClientConfigBuilder.newBuilder()
.addUJESServerUrl("http://${ip}:${port}") //Specify ServerUrl, the address of the Linkis server-side gateway, such as http://{ip}:{port}
.connectionTimeout(30000) //connectionTimeOut client connection timeout
.discoveryEnabled(false).discoveryFrequency(1, TimeUnit.MINUTES) //Whether to enable registration discovery, if enabled, the newly launched Gateway will be automatically discovered
.loadbalancerEnabled(true) // Whether to enable load balancing, if registration discovery is not enabled, load balancing is meaningless
.maxConnectionSize(5) //Specify the maximum number of connections, that is, the maximum number of concurrent
.retryEnabled(false).readTimeout(30000) //execution failed, whether to allow retry
.setAuthenticationStrategy(new StaticAuthenticationStrategy()) //AuthenticationStrategy Linkis authentication method
.setAuthTokenKey("${username}").setAuthTokenValue("${password}") //Authentication key, generally the user name; authentication value, generally the password corresponding to the user name
.setDWSVersion("v1").build() //Linkis backend protocol version, the current version is v1

// 2. Get a UJESClient through DWSClientConfig
val client = UJESClient(clientConfig)

try {
// 3. Start code execution
println("user: "+ user + ", code: [" + executeCode + "]")
val startupMap = new java.util.HashMap[String, Any]()
startupMap.put("wds.linkis.yarnqueue", "default") //Startup parameter configuration
val jobExecuteResult = client.execute(JobExecuteAction.builder()
.setCreator("LinkisClient-Test") //creator, requesting the system name of the Linkis client, used for system-level isolation
.addExecuteCode(executeCode) //ExecutionCode The code to be executed
.setEngineType(EngineType.SPARK) // The execution engine type of Linkis that you want to request, such as Spark hive, etc.
.setStartupParams(startupMap)
.setUser(user).build()) //User, request user; used for user-level multi-tenant isolation
println("execId: "+ jobExecuteResult.getExecID + ", taskId:" + jobExecuteResult.taskID)

// 4. Get the execution status of the script
var jobInfoResult = client.getJobInfo(jobExecuteResult)
val sleepTimeMills: Int = 1000
while (!jobInfoResult.isCompleted) {
// 5. Get the execution progress of the script
val progress = client.progress(jobExecuteResult)
val progressInfo = if (progress.getProgressInfo != null) progress.getProgressInfo.toList else List.empty
println("progress: "+ progress.getProgress + ", progressInfo:" + progressInfo)
Utils.sleepQuietly(sleepTimeMills)
jobInfoResult = client.getJobInfo(jobExecuteResult)
}
if (!jobInfoResult.isSucceed) {
println("Failed to execute job: "+ jobInfoResult.getMessage)
throw new Exception(jobInfoResult.getMessage)
}

// 6. Get the job information of the script
val jobInfo = client.getJobInfo(jobExecuteResult)
// 7. Get the list of result sets (if the user submits multiple SQL at a time, multiple result sets will be generated)
val resultSetList = jobInfoResult.getResultSetList(client)
println("All result set list:")
resultSetList.foreach(println)
val oneResultSet = jobInfo.getResultSetList(client).head
// 8. Get a specific result set through a result set information
val fileContents = client.resultSet(ResultSetAction.builder().setPath(oneResultSet).setUser(jobExecuteResult.getUser).build()).getFileContent
println("First fileContents: ")
println(fileContents)
} catch {
case e: Exception => {
e.printStackTrace()
}
}
IOUtils.closeQuietly(client)
}