Skip to content

Commit be7a732

Browse files
committed
feat(backend): fix
1 parent 60fdfb5 commit be7a732

File tree

4 files changed

+19
-348
lines changed

4 files changed

+19
-348
lines changed

backend/modules/observability/infra/repo/ck/annotation.go

Lines changed: 13 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,6 @@ func (a *AnnotationCkDaoImpl) List(ctx context.Context, params *ListAnnotationsP
121121
return annotations, nil
122122
}
123123

124-
// annoSqlParam 内部SQL构建参数
125124
type annoSqlParam struct {
126125
Tables []string
127126
StartTime int64
@@ -167,37 +166,28 @@ func (a *AnnotationCkDaoImpl) buildSql(ctx context.Context, param *annoSqlParam)
167166

168167
// buildSingleSql 构建单表查询SQL
169168
func (a *AnnotationCkDaoImpl) buildSingleSql(ctx context.Context, db *gorm.DB, tableName string, param *annoSqlParam) (*gorm.DB, error) {
170-
sqlQuery := db.Table(tableName)
171-
if param.StartTime > 0 {
172-
sqlQuery = sqlQuery.Where("start_time >= ?", param.StartTime)
173-
}
174-
if param.EndTime > 0 {
175-
sqlQuery = sqlQuery.Where("start_time <= ?", param.EndTime)
176-
}
169+
sqlQuery := db.
170+
Table(tableName).
171+
Where("deleted_at = 0")
172+
177173
if param.ID != "" {
178174
sqlQuery = sqlQuery.Where("id = ?", param.ID)
179175
}
180176
if len(param.SpanIDs) > 0 {
181177
sqlQuery = sqlQuery.Where("span_id IN (?)", param.SpanIDs)
182178
}
179+
sqlQuery = sqlQuery.
180+
Where("start_time >= ?", param.StartTime).
181+
Where("start_time <= ?", param.EndTime)
183182
if param.DescByUpdatedAt {
184183
sqlQuery = sqlQuery.Order(clause.OrderBy{Columns: []clause.OrderByColumn{
185184
{Column: clause.Column{Name: "updated_at"}, Desc: true},
186185
}})
186+
} else {
187+
sqlQuery = sqlQuery.Order(clause.OrderBy{Columns: []clause.OrderByColumn{
188+
{Column: clause.Column{Name: "created_at"}, Desc: false},
189+
}})
187190
}
188-
// 添加软删除过滤
189-
sqlQuery = sqlQuery.Where("deleted_at = 0")
190-
191-
// 添加分区优化查询
192-
if param.StartTime > 0 && param.EndTime > 0 {
193-
partitions := convertIntoPartitions(param.StartTime, param.EndTime)
194-
sqlQuery = sqlQuery.
195-
Where("start_date >= ?", partitions[0]).
196-
Where("start_date <= ?", partitions[len(partitions)-1])
197-
}
198-
199-
if param.Limit > 0 {
200-
sqlQuery = sqlQuery.Limit(int(param.Limit))
201-
}
191+
sqlQuery = sqlQuery.Limit(int(param.Limit))
202192
return sqlQuery, nil
203-
}
193+
}

backend/modules/observability/infra/repo/ck/gorm_gen/model/observability_annotation.gen.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/modules/observability/infra/repo/ck/spans.go

Lines changed: 5 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"fmt"
1010
"strconv"
1111
"strings"
12-
"time"
1312

1413
"gorm.io/gorm"
1514
"gorm.io/gorm/clause"
@@ -26,19 +25,11 @@ const (
2625
QueryTypeGetTrace = "get_trace"
2726
QueryTypeListSpans = "list_spans"
2827

29-
// 自动评估标签
30-
AnnotationAutoEvaluateFieldPrefix = "evaluator_version_"
3128
// 人工标注标签
3229
AnnotationManualFeedbackFieldPrefix = "manual_feedback_"
33-
// Coze会话反馈标签
34-
AnnotationCozeChatFeedbackField = "feedback_coze"
3530

36-
// 自动评估标签类型
37-
AnnotationAutoEvaluateType = "auto_evaluate"
3831
// 人工标注标签类型
3932
AnnotationManualFeedbackType = "manual_feedback"
40-
// Coze反馈标签类型
41-
AnnotationCozeChatFeedbackType = "coze_feedback"
4233
)
4334

4435
type QueryParam struct {
@@ -116,7 +107,6 @@ type buildSqlParam struct {
116107
annoTable string
117108
queryParam *QueryParam
118109
db *gorm.DB
119-
partitions []string
120110
omitColumns []string
121111
}
122112

@@ -129,7 +119,6 @@ func (s *SpansCkDaoImpl) buildSql(ctx context.Context, param *QueryParam) (*gorm
129119
annoTable: param.AnnoTableMap[table],
130120
queryParam: param,
131121
db: db,
132-
partitions: convertIntoPartitions(param.StartTime, param.EndTime),
133122
omitColumns: param.OmitColumns,
134123
})
135124
if err != nil {
@@ -163,23 +152,11 @@ func (s *SpansCkDaoImpl) buildSingleSql(ctx context.Context, param *buildSqlPara
163152
if err != nil {
164153
return nil, err
165154
}
166-
queryColumns := getColumnStr(spanColumns, param.omitColumns)
167155
sqlQuery = param.db.
168156
Table(param.spanTable).
169-
Select(queryColumns).
170157
Where(sqlQuery).
171-
Where("start_date >= ?", param.partitions[0]).
172-
Where("start_date <= ?", param.partitions[len(param.partitions)-1]).
173158
Where("start_time >= ?", param.queryParam.StartTime).
174159
Where("start_time <= ?", param.queryParam.EndTime)
175-
if param.queryParam.QueryType == QueryTypeListSpans {
176-
// 针对list spans添加子查询
177-
subQuery, err := s.buildSubQuerySql(ctx, param)
178-
if err != nil {
179-
return nil, err
180-
}
181-
sqlQuery = sqlQuery.Where("start_time in (?)", subQuery)
182-
}
183160
if param.queryParam.OrderByStartTime {
184161
sqlQuery = sqlQuery.Order(clause.OrderBy{Columns: []clause.OrderByColumn{
185162
{Column: clause.Column{Name: "start_time"}, Desc: true},
@@ -200,8 +177,6 @@ func (s *SpansCkDaoImpl) buildSubQuerySql(ctx context.Context, param *buildSqlPa
200177
Table(param.spanTable).
201178
Select("start_time").
202179
Where(sqlQuery).
203-
Where("start_date >= ?", param.partitions[0]).
204-
Where("start_date <= ?", param.partitions[len(param.partitions)-1]).
205180
Where("start_time >= ?", param.queryParam.StartTime).
206181
Where("start_time <= ?", param.queryParam.EndTime)
207182
if param.queryParam.OrderByStartTime {
@@ -298,8 +273,7 @@ func (s *SpansCkDaoImpl) buildFieldCondition(ctx context.Context, db *gorm.DB, f
298273
if len(fieldValues) != 1 {
299274
return nil, fmt.Errorf("filter field %s should have one value", filter.FieldName)
300275
}
301-
// 使用hasTokens函数进行ClickHouse的token匹配
302-
queryChain = queryChain.Where(fmt.Sprintf("hasTokens(%s, ?)", filter.FieldName), fieldValues[0])
276+
queryChain = queryChain.Where(fmt.Sprintf("%s like ?", filter.FieldName), fmt.Sprintf("%%%v%%", fieldValues[0]))
303277
case loop_span.QueryTypeEnumEq:
304278
if len(fieldValues) != 1 {
305279
return nil, fmt.Errorf("filter field %s should have one value", filter.FieldName)
@@ -359,11 +333,7 @@ func (s *SpansCkDaoImpl) buildFieldCondition(ctx context.Context, db *gorm.DB, f
359333
}
360334

361335
func (s *SpansCkDaoImpl) isAnnotationFilter(fieldName string) bool {
362-
if strings.HasPrefix(fieldName, AnnotationAutoEvaluateFieldPrefix) {
363-
return true
364-
} else if strings.HasPrefix(fieldName, AnnotationManualFeedbackFieldPrefix) {
365-
return true
366-
} else if fieldName == AnnotationCozeChatFeedbackField {
336+
if strings.HasPrefix(fieldName, AnnotationManualFeedbackFieldPrefix) {
367337
return true
368338
} else {
369339
return false
@@ -373,17 +343,7 @@ func (s *SpansCkDaoImpl) isAnnotationFilter(fieldName string) bool {
373343
func (s *SpansCkDaoImpl) buildAnnotationSql(ctx context.Context, param *buildSqlParam, filter *loop_span.FilterField) (*gorm.DB, error) {
374344
queryChain := param.db
375345
fieldName := filter.FieldName
376-
if strings.HasPrefix(fieldName, AnnotationAutoEvaluateFieldPrefix) {
377-
// evaluator_version_{version_id}
378-
evaluatorVersionID := fieldName[len(AnnotationAutoEvaluateFieldPrefix):]
379-
if evaluatorVersionID == "" {
380-
return nil, fmt.Errorf("invalid auto evaluator field name %s", fieldName)
381-
}
382-
queryChain = queryChain.
383-
Where("annotation_type = ?", AnnotationAutoEvaluateType).
384-
Where("has(annotation_index, ?)", evaluatorVersionID).
385-
Where("get_json_object(metadata, '$.evaluator_version_id') = ?", evaluatorVersionID)
386-
} else if strings.HasPrefix(fieldName, AnnotationManualFeedbackFieldPrefix) {
346+
if strings.HasPrefix(fieldName, AnnotationManualFeedbackFieldPrefix) {
387347
// manual_feedback_{tag_key_id}
388348
tagKeyId := fieldName[len(AnnotationManualFeedbackFieldPrefix):]
389349
if tagKeyId == "" {
@@ -392,10 +352,6 @@ func (s *SpansCkDaoImpl) buildAnnotationSql(ctx context.Context, param *buildSql
392352
queryChain = queryChain.
393353
Where("annotation_type = ?", AnnotationManualFeedbackType).
394354
Where("key = ?", tagKeyId)
395-
} else if fieldName == AnnotationCozeChatFeedbackField {
396-
queryChain = queryChain.
397-
Where("annotation_type = ?", AnnotationCozeChatFeedbackType).
398-
Where("key = ?", "chat_feedback")
399355
} else {
400356
return nil, fmt.Errorf("field name %s not supported for annotation, not supposed to be here", fieldName)
401357
}
@@ -423,7 +379,7 @@ func (s *SpansCkDaoImpl) buildAnnotationSql(ctx context.Context, param *buildSql
423379
}
424380
queryChain = queryChain.Where(fieldSql)
425381
}
426-
param.queryParam.Filters.Traverse(func(f *loop_span.FilterField) error {
382+
_ = param.queryParam.Filters.Traverse(func(f *loop_span.FilterField) error {
427383
if f.FieldName == loop_span.SpanFieldSpaceId {
428384
commonSql, err := s.buildFieldCondition(ctx, param.db, f)
429385
if err != nil {
@@ -438,8 +394,6 @@ func (s *SpansCkDaoImpl) buildAnnotationSql(ctx context.Context, param *buildSql
438394
Select("span_id").
439395
Where(queryChain).
440396
Where("deleted_at = 0").
441-
Where("start_date >= ?", param.partitions[0]).
442-
Where("start_date <= ?", param.partitions[len(param.partitions)-1]).
443397
Where("start_time >= ?", param.queryParam.StartTime).
444398
Where("start_time <= ?", param.queryParam.EndTime)
445399
return param.db.Where("span_id in (?)", subsql), nil
@@ -559,65 +513,7 @@ func quoteSQLName(data string) string {
559513
return buf.String()
560514
}
561515

562-
// 时间分区转换函数
563-
func convertIntoPartitions(startAt, endAt int64) []string {
564-
// 将微秒时间戳转换为日期分区
565-
startTime := time.Unix(startAt/1000000, 0)
566-
endTime := time.Unix(endAt/1000000, 0)
567-
568-
startDate := startTime.Format("2006-01-02")
569-
endDate := endTime.Format("2006-01-02")
570-
571-
return []string{startDate, endDate}
572-
}
573-
574-
// 获取列字符串
575-
func getColumnStr(allColumns []string, omitColumns []string) string {
576-
if len(omitColumns) == 0 {
577-
return strings.Join(allColumns, ", ")
578-
}
579-
omitMap := make(map[string]bool)
580-
for _, col := range omitColumns {
581-
omitMap[col] = true
582-
}
583-
var resultColumns []string
584-
for _, col := range allColumns {
585-
if !omitMap[col] {
586-
resultColumns = append(resultColumns, col)
587-
}
588-
}
589-
return strings.Join(resultColumns, ", ")
590-
}
591-
592516
var (
593-
spanColumns = []string{
594-
"start_time",
595-
"logid",
596-
"span_id",
597-
"trace_id",
598-
"parent_id",
599-
"duration",
600-
"psm",
601-
"call_type",
602-
"space_id",
603-
"span_type",
604-
"span_name",
605-
"method",
606-
"status_code",
607-
"input",
608-
"output",
609-
"object_storage",
610-
"system_tags_string",
611-
"system_tags_long",
612-
"system_tags_float",
613-
"tags_string",
614-
"tags_long",
615-
"tags_bool",
616-
"tags_float",
617-
"tags_byte",
618-
"reserve_create_time",
619-
"logic_delete_date",
620-
}
621517
defSuperFieldsMap = map[string]bool{
622518
loop_span.SpanFieldStartTime: true,
623519
loop_span.SpanFieldSpanId: true,
@@ -637,4 +533,4 @@ var (
637533
loop_span.SpanFieldObjectStorage: true,
638534
loop_span.SpanFieldLogicDeleteDate: true,
639535
}
640-
)
536+
)

0 commit comments

Comments
 (0)