-
Notifications
You must be signed in to change notification settings - Fork 569
feat(connect): support connect openTelemetry and log for 1.6 #2961
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 34 commits
fcef98a
e59f4c5
e678d7e
b11139d
ddc7d97
78ec2c4
67c1a5c
08eb317
599ebaf
289ce6a
9dea756
faac7ab
f2ecc16
40a00d9
e0fc4e8
c3ee0b0
231b86a
57e4db1
651ab89
9650799
163c2af
a4066df
23abb0f
0e3e1e5
82abcd7
43345d4
febb8b5
a0de21a
0abd7d0
fe0cb08
d440ab2
ac93c25
2e4ac84
4395020
b62b540
7e8f404
b449d92
3ba9024
7342979
8b6c595
b2176e5
d99dd29
adfe066
0f08d0a
801f85e
3316b64
7bdc210
73bb11d
fef7ae5
64d8876
bb91092
9028c55
f73026b
edff8d7
364fbb9
a7e40c8
38dde3f
2a2e4b3
8a6ce0b
9c9f2f6
0c698dc
90e4803
0732256
2df6dab
d98c524
02c8db6
c7d7822
6d3814d
c113dce
d680e10
df2a0dd
5a4bcb7
0602752
fbb1319
5bd7389
0e8bf96
380c066
13b8079
f116cf0
f249a14
2129f92
bdb890a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,83 @@ | ||
| # AutoMQ Log Uploader Module | ||
|
|
||
| This module provides asynchronous S3 log upload capability based on Log4j 1.x. Other submodules only need to depend on this module and configure it simply to synchronize logs to object storage. Core components: | ||
|
|
||
| - `com.automq.log.uploader.S3RollingFileAppender`: Extends `RollingFileAppender`, pushes log events to the uploader while writing to local files. | ||
| - `com.automq.log.uploader.LogUploader`: Asynchronously buffers, compresses, and uploads logs; supports configuration switches and periodic cleanup. | ||
| - `com.automq.log.uploader.S3LogConfig`/`S3LogConfigProvider`: Abstracts the configuration required for uploading. The default implementation `PropertiesS3LogConfigProvider` reads from `automq-log.properties`. | ||
|
|
||
| ## Quick Integration | ||
|
|
||
| 1. Add dependency in your module's `build.gradle`: | ||
| ```groovy | ||
| implementation project(':automq-log-uploader') | ||
| ``` | ||
| 2. Create `automq-log.properties` in the resources directory (or customize `S3LogConfigProvider`): | ||
| ```properties | ||
| log.s3.enable=true | ||
| log.s3.bucket=0@s3://your-log-bucket?region=us-east-1 | ||
| log.s3.cluster.id=my-cluster | ||
| log.s3.node.id=1 | ||
| log.s3.selector.type=kafka | ||
| log.s3.selector.kafka.bootstrap.servers=PLAINTEXT://kafka:9092 | ||
| log.s3.selector.kafka.group.id=automq-log-uploader-my-cluster | ||
| ``` | ||
| 3. Reference the Appender in `log4j.properties`: | ||
| ```properties | ||
| log4j.appender.s3_uploader=com.automq.log.uploader.S3RollingFileAppender | ||
| log4j.appender.s3_uploader.File=logs/server.log | ||
| log4j.appender.s3_uploader.MaxFileSize=100MB | ||
| log4j.appender.s3_uploader.MaxBackupIndex=10 | ||
| log4j.appender.s3_uploader.layout=org.apache.log4j.PatternLayout | ||
| log4j.appender.s3_uploader.layout.ConversionPattern=[%d] %p %m (%c)%n | ||
| ``` | ||
| If you need to customize the configuration provider, you can set: | ||
| ```properties | ||
| log4j.appender.s3_uploader.configProviderClass=com.example.CustomS3LogConfigProvider | ||
| ``` | ||
|
|
||
| ## Key Configuration Description | ||
|
|
||
| | Configuration Item | Description | | ||
| | ------ | ---- | | ||
| | `log.s3.enable` | Whether to enable S3 upload function. | ||
| | `log.s3.bucket` | It is recommended to use AutoMQ Bucket URI (e.g. `0@s3://bucket?region=us-east-1&pathStyle=true`). If using a shorthand bucket name, additional fields such as `log.s3.region` need to be provided. | ||
| | `log.s3.cluster.id` / `log.s3.node.id` | Used to construct the object storage path `automq/logs/{cluster}/{node}/{hour}/{uuid}`. | ||
| | `log.s3.selector.type` | Leader election strategy (`static`, `nodeid`, `file`, `kafka`, or custom). | ||
| | `log.s3.primary.node` | Used with `static` strategy to indicate whether the current node is the primary node. | ||
| | `log.s3.selector.kafka.*` | Additional configuration required for Kafka leader election, such as `bootstrap.servers`, `group.id`, etc. | ||
| | `log.s3.active.controller` | **Deprecated**, please use `log.s3.selector.type=static` + `log.s3.primary.node=true`. | ||
|
|
||
| The upload schedule can be overridden by environment variables: | ||
|
|
||
| - `AUTOMQ_OBSERVABILITY_UPLOAD_INTERVAL`: Maximum upload interval (milliseconds). | ||
| - `AUTOMQ_OBSERVABILITY_CLEANUP_INTERVAL`: Retention period (milliseconds), old objects earlier than this time will be cleaned up. | ||
|
|
||
| ### Leader Election Strategies | ||
|
|
||
| To avoid multiple nodes executing S3 cleanup tasks simultaneously, the log uploader has a built-in leader election mechanism consistent with the OpenTelemetry module: | ||
|
|
||
| 1. **static**: Specify which node is the leader using `log.s3.primary.node=true|false`. | ||
| 2. **nodeid**: Becomes the leader node when `log.s3.node.id` equals `primaryNodeId`, which can be set in the URL or properties with `log.s3.selector.primary.node.id`. | ||
| 3. **file**: Uses a shared file for preemptive leader election, configure `log.s3.selector.file.leaderFile=/shared/leader`, `log.s3.selector.file.leaderTimeoutMs=60000`. | ||
| 4. **kafka**: Default strategy. All nodes join the same consumer group of a single-partition topic, the node holding the partition becomes the leader. Necessary configuration: | ||
| ```properties | ||
| log.s3.selector.type=kafka | ||
| log.s3.selector.kafka.bootstrap.servers=PLAINTEXT://kafka:9092 | ||
| log.s3.selector.kafka.topic=__automq_log_uploader_leader_cluster1 | ||
| log.s3.selector.kafka.group.id=automq-log-uploader-cluster1 | ||
| ``` | ||
| Advanced parameters such as security (SASL/SSL), timeout, etc. can be provided through `log.s3.selector.kafka.*`. | ||
| 5. **custom**: Implement `com.automq.log.uploader.selector.LogUploaderNodeSelectorProvider` and register it through SPI to introduce a custom leader election strategy. | ||
|
|
||
| ## Extension | ||
|
|
||
| If the application already has its own dependency injection/configuration method, you can implement `S3LogConfigProvider` and call it at startup: | ||
|
|
||
| ```java | ||
| import com.automq.log.uploader.S3RollingFileAppender; | ||
|
|
||
| S3RollingFileAppender.setConfigProvider(new CustomConfigProvider()); | ||
| ``` | ||
|
|
||
| All `S3RollingFileAppender` instances will share this provider. | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,19 @@ | ||
| plugins { | ||
|
||
| id 'java-library' | ||
| } | ||
|
|
||
| repositories { | ||
| mavenCentral() | ||
| } | ||
|
|
||
| dependencies { | ||
| api project(':s3stream') | ||
|
|
||
| implementation project(':clients') | ||
| implementation libs.reload4j | ||
| implementation libs.slf4jApi | ||
| implementation libs.slf4jBridge | ||
| implementation libs.nettyBuffer | ||
| implementation libs.guava | ||
| implementation libs.commonLang | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,201 @@ | ||
| /* | ||
| * Copyright 2025, AutoMQ HK Limited. | ||
| * | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package com.automq.log.uploader; | ||
|
|
||
| import com.automq.log.uploader.selector.LogUploaderNodeSelector; | ||
| import com.automq.log.uploader.selector.LogUploaderNodeSelectorFactory; | ||
| import com.automq.stream.s3.operator.BucketURI; | ||
| import com.automq.stream.s3.operator.ObjectStorage; | ||
| import com.automq.stream.s3.operator.ObjectStorageFactory; | ||
|
|
||
| import org.apache.commons.lang3.StringUtils; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import java.io.IOException; | ||
| import java.io.InputStream; | ||
| import java.util.HashMap; | ||
| import java.util.Locale; | ||
| import java.util.Map; | ||
| import java.util.Properties; | ||
|
|
||
| import static com.automq.log.uploader.LogConfigConstants.DEFAULT_LOG_S3_ACTIVE_CONTROLLER; | ||
| import static com.automq.log.uploader.LogConfigConstants.DEFAULT_LOG_S3_CLUSTER_ID; | ||
| import static com.automq.log.uploader.LogConfigConstants.DEFAULT_LOG_S3_ENABLE; | ||
| import static com.automq.log.uploader.LogConfigConstants.DEFAULT_LOG_S3_NODE_ID; | ||
| import static com.automq.log.uploader.LogConfigConstants.LOG_PROPERTIES_FILE; | ||
| import static com.automq.log.uploader.LogConfigConstants.LOG_S3_ACCESS_KEY; | ||
| import static com.automq.log.uploader.LogConfigConstants.LOG_S3_ACTIVE_CONTROLLER_KEY; | ||
| import static com.automq.log.uploader.LogConfigConstants.LOG_S3_BUCKET_KEY; | ||
| import static com.automq.log.uploader.LogConfigConstants.LOG_S3_CLUSTER_ID_KEY; | ||
| import static com.automq.log.uploader.LogConfigConstants.LOG_S3_ENABLE_KEY; | ||
| import static com.automq.log.uploader.LogConfigConstants.LOG_S3_ENDPOINT_KEY; | ||
| import static com.automq.log.uploader.LogConfigConstants.LOG_S3_NODE_ID_KEY; | ||
| import static com.automq.log.uploader.LogConfigConstants.LOG_S3_PRIMARY_NODE_KEY; | ||
| import static com.automq.log.uploader.LogConfigConstants.LOG_S3_REGION_KEY; | ||
| import static com.automq.log.uploader.LogConfigConstants.LOG_S3_SECRET_KEY; | ||
| import static com.automq.log.uploader.LogConfigConstants.LOG_S3_SELECTOR_PREFIX; | ||
| import static com.automq.log.uploader.LogConfigConstants.LOG_S3_SELECTOR_PRIMARY_NODE_ID_KEY; | ||
| import static com.automq.log.uploader.LogConfigConstants.LOG_S3_SELECTOR_TYPE_KEY; | ||
|
|
||
| public class DefaultS3LogConfig implements S3LogConfig { | ||
| private static final Logger LOGGER = LoggerFactory.getLogger(DefaultS3LogConfig.class); | ||
|
|
||
| private final Properties props; | ||
| private ObjectStorage objectStorage; | ||
| private LogUploaderNodeSelector nodeSelector; | ||
|
|
||
| public DefaultS3LogConfig() { | ||
| this(null); | ||
| } | ||
|
|
||
| public DefaultS3LogConfig(Properties overrideProps) { | ||
woshigaopp marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| this.props = new Properties(); | ||
| if (overrideProps != null) { | ||
| this.props.putAll(overrideProps); | ||
| } | ||
| if (overrideProps == null) { | ||
| try (InputStream input = getClass().getClassLoader().getResourceAsStream(LOG_PROPERTIES_FILE)) { | ||
| if (input != null) { | ||
| props.load(input); | ||
| LOGGER.info("Loaded log configuration from {}", LOG_PROPERTIES_FILE); | ||
| } else { | ||
| LOGGER.warn("Could not find {}, using default log configurations.", LOG_PROPERTIES_FILE); | ||
| } | ||
| } catch (IOException ex) { | ||
| LOGGER.error("Failed to load log configuration from {}.", LOG_PROPERTIES_FILE, ex); | ||
| } | ||
| } | ||
| initializeNodeSelector(); | ||
| } | ||
|
|
||
| @Override | ||
| public boolean isEnabled() { | ||
| return Boolean.parseBoolean(props.getProperty(LOG_S3_ENABLE_KEY, String.valueOf(DEFAULT_LOG_S3_ENABLE))); | ||
| } | ||
|
|
||
| @Override | ||
| public String clusterId() { | ||
| return props.getProperty(LOG_S3_CLUSTER_ID_KEY, DEFAULT_LOG_S3_CLUSTER_ID); | ||
| } | ||
|
|
||
| @Override | ||
| public int nodeId() { | ||
| return Integer.parseInt(props.getProperty(LOG_S3_NODE_ID_KEY, String.valueOf(DEFAULT_LOG_S3_NODE_ID))); | ||
| } | ||
|
|
||
| @Override | ||
| public synchronized ObjectStorage objectStorage() { | ||
| if (this.objectStorage != null) { | ||
| return this.objectStorage; | ||
| } | ||
| String bucket = props.getProperty(LOG_S3_BUCKET_KEY); | ||
| if (StringUtils.isBlank(bucket)) { | ||
| LOGGER.error("Mandatory log config '{}' is not set.", LOG_S3_BUCKET_KEY); | ||
| return null; | ||
| } | ||
|
|
||
| String normalizedBucket = bucket.trim(); | ||
| if (!normalizedBucket.contains("@")) { | ||
| String region = props.getProperty(LOG_S3_REGION_KEY); | ||
| if (StringUtils.isBlank(region)) { | ||
| LOGGER.error("'{}' must be provided when '{}' is not a full AutoMQ bucket URI.", | ||
| LOG_S3_REGION_KEY, LOG_S3_BUCKET_KEY); | ||
| return null; | ||
| } | ||
| String endpoint = props.getProperty(LOG_S3_ENDPOINT_KEY); | ||
| String accessKey = props.getProperty(LOG_S3_ACCESS_KEY); | ||
| String secretKey = props.getProperty(LOG_S3_SECRET_KEY); | ||
|
|
||
| StringBuilder builder = new StringBuilder("0@s3://").append(normalizedBucket) | ||
| .append("?region=").append(region.trim()); | ||
| if (StringUtils.isNotBlank(endpoint)) { | ||
| builder.append("&endpoint=").append(endpoint.trim()); | ||
| } | ||
| if (StringUtils.isNotBlank(accessKey) && StringUtils.isNotBlank(secretKey)) { | ||
| builder.append("&authType=static") | ||
| .append("&accessKey=").append(accessKey.trim()) | ||
| .append("&secretKey=").append(secretKey.trim()); | ||
| } | ||
| normalizedBucket = builder.toString(); | ||
| } | ||
|
|
||
| BucketURI logBucket = BucketURI.parse(normalizedBucket); | ||
| this.objectStorage = ObjectStorageFactory.instance().builder(logBucket).threadPrefix("s3-log-uploader").build(); | ||
| return this.objectStorage; | ||
| } | ||
|
|
||
| @Override | ||
| public LogUploaderNodeSelector nodeSelector() { | ||
| if (nodeSelector == null) { | ||
| initializeNodeSelector(); | ||
| } | ||
| return nodeSelector; | ||
| } | ||
|
|
||
| private void initializeNodeSelector() { | ||
| String selectorType = props.getProperty(LOG_S3_SELECTOR_TYPE_KEY, "static"); | ||
| Map<String, String> selectorConfig = new HashMap<>(); | ||
| Map<String, String> rawConfig = getPropertiesWithPrefix(LOG_S3_SELECTOR_PREFIX); | ||
| String normalizedType = selectorType == null ? "" : selectorType.toLowerCase(Locale.ROOT); | ||
| for (Map.Entry<String, String> entry : rawConfig.entrySet()) { | ||
| String key = entry.getKey(); | ||
| if (normalizedType.length() > 0 && key.toLowerCase(Locale.ROOT).startsWith(normalizedType + ".")) { | ||
| key = key.substring(normalizedType.length() + 1); | ||
| } | ||
| if ("type".equalsIgnoreCase(key) || key.isEmpty()) { | ||
| continue; | ||
| } | ||
| selectorConfig.putIfAbsent(key, entry.getValue()); | ||
| } | ||
|
|
||
| selectorConfig.putIfAbsent("isPrimaryUploader", | ||
woshigaopp marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| props.getProperty(LOG_S3_PRIMARY_NODE_KEY, | ||
| props.getProperty(LOG_S3_ACTIVE_CONTROLLER_KEY, String.valueOf(DEFAULT_LOG_S3_ACTIVE_CONTROLLER)))); | ||
woshigaopp marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| String primaryNodeId = props.getProperty(LOG_S3_SELECTOR_PRIMARY_NODE_ID_KEY); | ||
| if (StringUtils.isNotBlank(primaryNodeId)) { | ||
| selectorConfig.putIfAbsent("primaryNodeId", primaryNodeId.trim()); | ||
| } | ||
|
|
||
| try { | ||
| this.nodeSelector = LogUploaderNodeSelectorFactory.createSelector(selectorType, clusterId(), nodeId(), selectorConfig); | ||
| } catch (Exception e) { | ||
| LOGGER.error("Failed to create log uploader selector of type {}", selectorType, e); | ||
| this.nodeSelector = LogUploaderNodeSelector.staticSelector(false); | ||
| } | ||
| } | ||
|
|
||
| private Map<String, String> getPropertiesWithPrefix(String prefix) { | ||
| Map<String, String> result = new HashMap<>(); | ||
| if (prefix == null || prefix.isEmpty()) { | ||
| return result; | ||
| } | ||
| for (String key : props.stringPropertyNames()) { | ||
| if (key.startsWith(prefix)) { | ||
| String trimmed = key.substring(prefix.length()); | ||
| if (!trimmed.isEmpty()) { | ||
| result.put(trimmed, props.getProperty(key)); | ||
| } | ||
| } | ||
| } | ||
| return result; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,56 @@ | ||
| /* | ||
| * Copyright 2025, AutoMQ HK Limited. | ||
| * | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package com.automq.log.uploader; | ||
|
|
||
| public class LogConfigConstants { | ||
| private LogConfigConstants() { | ||
| } | ||
|
|
||
| public static final String LOG_PROPERTIES_FILE = "automq-log.properties"; | ||
|
|
||
| public static final String LOG_S3_ENABLE_KEY = "log.s3.enable"; | ||
| public static final boolean DEFAULT_LOG_S3_ENABLE = false; | ||
|
|
||
| public static final String LOG_S3_BUCKET_KEY = "log.s3.bucket"; | ||
| public static final String LOG_S3_REGION_KEY = "log.s3.region"; | ||
| public static final String LOG_S3_ENDPOINT_KEY = "log.s3.endpoint"; | ||
|
|
||
| public static final String LOG_S3_ACCESS_KEY = "log.s3.access.key"; | ||
| public static final String LOG_S3_SECRET_KEY = "log.s3.secret.key"; | ||
|
|
||
| public static final String LOG_S3_CLUSTER_ID_KEY = "log.s3.cluster.id"; | ||
| public static final String DEFAULT_LOG_S3_CLUSTER_ID = "automq-cluster"; | ||
|
|
||
| public static final String LOG_S3_NODE_ID_KEY = "log.s3.node.id"; | ||
| public static final int DEFAULT_LOG_S3_NODE_ID = 0; | ||
|
|
||
| /** | ||
| * @deprecated Use selector configuration instead. | ||
| */ | ||
| @Deprecated | ||
woshigaopp marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| public static final String LOG_S3_ACTIVE_CONTROLLER_KEY = "log.s3.active.controller"; | ||
| @Deprecated | ||
| public static final boolean DEFAULT_LOG_S3_ACTIVE_CONTROLLER = true; | ||
|
|
||
| public static final String LOG_S3_PRIMARY_NODE_KEY = "log.s3.primary.node"; | ||
| public static final String LOG_S3_SELECTOR_PRIMARY_NODE_ID_KEY = "log.s3.selector.primary.node.id"; | ||
| public static final String LOG_S3_SELECTOR_TYPE_KEY = "log.s3.selector.type"; | ||
| public static final String LOG_S3_SELECTOR_PREFIX = "log.s3.selector."; | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.