Skip to content

Commit 0103c5a

Browse files
authored
set job desc with algorithm name (#97)
1 parent 7ee3050 commit 0103c5a

18 files changed

+28
-9
lines changed

nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/BetweennessCentralityAlgo.scala

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -28,6 +28,7 @@ object BetweennessCentralityAlgo {
2828
dataset: Dataset[Row],
2929
betweennessConfig: BetweennessConfig,
3030
hasWeight: Boolean): DataFrame = {
31+
spark.sparkContext.setJobGroup(ALGORITHM, s"Running $ALGORITHM")
3132

3233
var encodeIdDf: DataFrame = null
3334

nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/BfsAlgo.scala

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -25,6 +25,8 @@ object BfsAlgo {
2525
* run the louvain algorithm for nebula graph
2626
*/
2727
def apply(spark: SparkSession, dataset: Dataset[Row], bfsConfig: BfsConfig): DataFrame = {
28+
spark.sparkContext.setJobGroup(ALGORITHM, s"Running $ALGORITHM")
29+
2830
var encodeIdDf: DataFrame = null
2931
var finalRoot: Long = 0
3032

nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/ClosenessAlgo.scala

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -35,6 +35,8 @@ object ClosenessAlgo {
3535
* run the Closeness algorithm for nebula graph
3636
*/
3737
def apply(spark: SparkSession, dataset: Dataset[Row], hasWeight: Boolean): DataFrame = {
38+
spark.sparkContext.setJobGroup(ALGORITHM, s"Running $ALGORITHM")
39+
3840
val graph: Graph[None.type, Double] = NebulaUtil.loadInitGraph(dataset, hasWeight)
3941
val closenessRDD = execute(graph)
4042
val schema = StructType(

nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/ClusteringCoefficientAlgo.scala

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -24,6 +24,7 @@ object ClusteringCoefficientAlgo {
2424
def apply(spark: SparkSession,
2525
dataset: Dataset[Row],
2626
coefficientConfig: CoefficientConfig): DataFrame = {
27+
spark.sparkContext.setJobGroup(ALGORITHM, s"Running $ALGORITHM")
2728

2829
var encodeIdDf: DataFrame = null
2930

nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/ConnectedComponentsAlgo.scala

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -10,7 +10,6 @@ import com.vesoft.nebula.algorithm.utils.{DecodeUtil, NebulaUtil}
1010
import org.apache.log4j.Logger
1111
import org.apache.spark.graphx.{Graph, VertexId, VertexRDD}
1212
import org.apache.spark.rdd.RDD
13-
import com.vesoft.nebula.algorithm.utils.NebulaUtil
1413
import org.apache.spark.graphx.lib.ConnectedComponents
1514
import org.apache.spark.sql.types.{DoubleType, LongType, StructField, StructType}
1615
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
@@ -27,6 +26,7 @@ object ConnectedComponentsAlgo {
2726
dataset: Dataset[Row],
2827
ccConfig: CcConfig,
2928
hasWeight: Boolean): DataFrame = {
29+
spark.sparkContext.setJobGroup(ALGORITHM, s"Running $ALGORITHM")
3030

3131
var encodeIdDf: DataFrame = null
3232

nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/DegreeStaticAlgo.scala

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -25,6 +25,8 @@ object DegreeStaticAlgo {
2525
def apply(spark: SparkSession,
2626
dataset: Dataset[Row],
2727
degreeConfig: DegreeStaticConfig = new DegreeStaticConfig): DataFrame = {
28+
spark.sparkContext.setJobGroup(ALGORITHM, s"Running $ALGORITHM")
29+
2830
var encodeIdDf: DataFrame = null
2931

3032
val graph: Graph[None.type, Double] = if (degreeConfig.encodeId) {

nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/DfsAlgo.scala

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -22,8 +22,11 @@ import scala.collection.mutable
2222

2323
object DfsAlgo {
2424
var iterNums = 0
25+
val ALGORITHM = "dfs"
2526

2627
def apply(spark: SparkSession, dataset: Dataset[Row], dfsConfig: DfsConfig): DataFrame = {
28+
spark.sparkContext.setJobGroup(ALGORITHM, s"Running $ALGORITHM")
29+
2730
var encodeIdDf: DataFrame = null
2831
var finalRoot: Long = 0
2932

nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/GraphTriangleCountAlgo.scala

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -13,8 +13,10 @@ import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructTyp
1313
* compute all graph's triangle count
1414
*/
1515
object GraphTriangleCountAlgo {
16+
val ALGORITHM = "graphTriangleCount"
1617

1718
def apply(spark: SparkSession, dataset: Dataset[Row]): DataFrame = {
19+
spark.sparkContext.setJobGroup(ALGORITHM, s"Running $ALGORITHM")
1820

1921
val triangleCount = TriangleCountAlgo(spark, dataset)
2022
val count = triangleCount

nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/HanpAlgo.scala

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -28,6 +28,7 @@ object HanpAlgo {
2828
hanpConfig: HanpConfig,
2929
hasWeight: Boolean,
3030
preferences: RDD[(VertexId, Double)] = null): DataFrame = {
31+
spark.sparkContext.setJobGroup(ALGORITHM, s"Running $ALGORITHM")
3132

3233
var encodeIdDf: DataFrame = null
3334

nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/JaccardAlgo.scala

Lines changed: 3 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -6,15 +6,11 @@
66
package com.vesoft.nebula.algorithm.lib
77

88
import com.vesoft.nebula.algorithm.config.JaccardConfig
9+
import com.vesoft.nebula.algorithm.lib.HanpAlgo.ALGORITHM
910
import com.vesoft.nebula.algorithm.utils.{DecodeUtil, NebulaUtil}
1011
import org.apache.log4j.Logger
1112
import org.apache.spark.graphx.Graph
12-
import org.apache.spark.ml.feature.{
13-
CountVectorizer,
14-
CountVectorizerModel,
15-
MinHashLSH,
16-
MinHashLSHModel
17-
}
13+
import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel, MinHashLSH, MinHashLSHModel}
1814
import org.apache.spark.ml.linalg.SparseVector
1915
import org.apache.spark.rdd.RDD
2016
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
@@ -30,6 +26,7 @@ object JaccardAlgo {
3026
* run the Jaccard algorithm for nebula graph
3127
*/
3228
def apply(spark: SparkSession, dataset: Dataset[Row], jaccardConfig: JaccardConfig): DataFrame = {
29+
spark.sparkContext.setJobGroup(ALGORITHM, s"Running $ALGORITHM")
3330

3431
var encodeIdDf: DataFrame = null
3532
var data: DataFrame = dataset

0 commit comments

Comments (0)