22
33import ch .qos .logback .classic .Level ;
44import ch .qos .logback .classic .Logger ;
5- import com .devshawn .kafka .gitops .config .KafkaGitopsConfig ;
65import com .devshawn .kafka .gitops .config .KafkaGitopsConfigLoader ;
76import com .devshawn .kafka .gitops .config .ManagerConfig ;
87import com .devshawn .kafka .gitops .domain .confluent .ServiceAccount ;
@@ -61,19 +60,20 @@ public StateManager(ManagerConfig managerConfig, ParserService parserService) {
6160 initializeLogger (managerConfig .isVerboseRequested ());
6261 this .managerConfig = managerConfig ;
6362 this .objectMapper = initializeObjectMapper ();
64- KafkaGitopsConfig config = KafkaGitopsConfigLoader .load ();
65- this .kafkaService = new KafkaService ( config );
63+ this . kafkaService = new KafkaService ( KafkaGitopsConfigLoader .load () );
64+ this .schemaRegistryService = new SchemaRegistryService ( SchemaRegistryConfigLoader . load () );
6665 this .parserService = parserService ;
6766 this .roleService = new RoleService ();
6867 this .confluentCloudService = new ConfluentCloudService (objectMapper );
69- this .planManager = new PlanManager (managerConfig , kafkaService , objectMapper );
70- this .applyManager = new ApplyManager (managerConfig , kafkaService );
68+ this .planManager = new PlanManager (managerConfig , kafkaService , schemaRegistryService , objectMapper );
69+ this .applyManager = new ApplyManager (managerConfig , kafkaService , schemaRegistryService );
7170 }
7271
7372 public DesiredStateFile getAndValidateStateFile () {
7473 DesiredStateFile desiredStateFile = parserService .parseStateFile ();
7574 validateTopics (desiredStateFile );
7675 validateCustomAcls (desiredStateFile );
76+ validateSchemas (desiredStateFile );
7777 this .describeAclEnabled = StateUtil .isDescribeTopicAclEnabled (desiredStateFile );
7878 return desiredStateFile ;
7979 }
@@ -92,6 +92,7 @@ private DesiredPlan generatePlan() {
9292 planManager .planAcls (desiredState , desiredPlan );
9393 }
9494 planManager .planTopics (desiredState , desiredPlan );
95+ planManager .planSchemas (desiredState , desiredPlan );
9596 return desiredPlan .build ();
9697 }
9798
@@ -107,6 +108,7 @@ public DesiredPlan apply() {
107108 if (!managerConfig .isSkipAclsDisabled ()) {
108109 applyManager .applyAcls (desiredPlan );
109110 }
111+ applyManager .applySchemas (desiredPlan );
110112
111113 return desiredPlan ;
112114 }
@@ -147,6 +149,7 @@ private DesiredState getDesiredState() {
147149 .addAllPrefixedTopicsToIgnore (getPrefixedTopicsToIgnore (desiredStateFile ));
148150
149151 generateTopicsState (desiredState , desiredStateFile );
152+ generateSchemasState (desiredState , desiredStateFile );
150153
151154 if (isConfluentCloudEnabled (desiredStateFile )) {
152155 generateConfluentCloudServiceAcls (desiredState , desiredStateFile );
@@ -171,6 +174,10 @@ private void generateTopicsState(DesiredState.Builder desiredState, DesiredState
171174 }
172175 }
173176
177+ private void generateSchemasState (DesiredState .Builder desiredState , DesiredStateFile desiredStateFile ) {
178+ desiredState .putAllSchemas (desiredStateFile .getSchemas ());
179+ }
180+
174181 private void generateConfluentCloudServiceAcls (DesiredState .Builder desiredState , DesiredStateFile desiredStateFile ) {
175182 List <ServiceAccount > serviceAccounts = confluentCloudService .getServiceAccounts ();
176183 desiredStateFile .getServices ().forEach ((name , service ) -> {
@@ -323,6 +330,47 @@ private void validateTopics(DesiredStateFile desiredStateFile) {
323330 }
324331 }
325332
333+ private void validateSchemas (DesiredStateFile desiredStateFile ) {
334+ if (!desiredStateFile .getSchemas ().isEmpty ()) {
335+ SchemaRegistryConfig schemaRegistryConfig = SchemaRegistryConfigLoader .load ();
336+ desiredStateFile .getSchemas ().forEach ((s , schemaDetails ) -> {
337+ if (!schemaDetails .getType ().equalsIgnoreCase ("Avro" )) {
338+ throw new ValidationException (String .format ("Schema type %s is currently not supported." , schemaDetails .getType ()));
339+ }
340+ if (!Files .exists (Paths .get (schemaRegistryConfig .getConfig ().get ("SCHEMA_DIRECTORY" ) + "/" + schemaDetails .getFile ()))) {
341+ throw new ValidationException (String .format ("Schema file %s not found in schema directory at %s" , schemaDetails .getFile (), schemaRegistryConfig .getConfig ().get ("SCHEMA_DIRECTORY" )));
342+ }
343+ if (schemaDetails .getType ().equalsIgnoreCase ("Avro" )) {
344+ AvroSchemaProvider avroSchemaProvider = new AvroSchemaProvider ();
345+ if (schemaDetails .getReferences ().isEmpty () && schemaDetails .getType ().equalsIgnoreCase ("Avro" )) {
346+ Optional <ParsedSchema > parsedSchema = avroSchemaProvider .parseSchema (schemaRegistryService .loadSchemaFromDisk (schemaDetails .getFile ()), Collections .emptyList ());
347+ if (!parsedSchema .isPresent ()) {
348+ throw new ValidationException (String .format ("Avro schema %s could not be parsed." , schemaDetails .getFile ()));
349+ }
350+ } else {
351+ List <SchemaReference > schemaReferences = new ArrayList <>();
352+ schemaDetails .getReferences ().forEach (referenceDetails -> {
353+ SchemaReference schemaReference = new SchemaReference (referenceDetails .getName (), referenceDetails .getSubject (), referenceDetails .getVersion ());
354+ schemaReferences .add (schemaReference );
355+ });
356+ // we need to pass a schema registry client as a config because the underlying code validates against the current state
357+ avroSchemaProvider .configure (Collections .singletonMap (SchemaProvider .SCHEMA_VERSION_FETCHER_CONFIG , schemaRegistryService .createSchemaRegistryClient ()));
358+ try {
359+ Optional <ParsedSchema > parsedSchema = avroSchemaProvider .parseSchema (schemaRegistryService .loadSchemaFromDisk (schemaDetails .getFile ()), schemaReferences );
360+ if (!parsedSchema .isPresent ()) {
361+ throw new ValidationException (String .format ("Avro schema %s could not be parsed." , schemaDetails .getFile ()));
362+ }
363+ } catch (IllegalStateException ex ) {
364+ throw new ValidationException (String .format ("Reference validation error: %s" , ex .getMessage ()));
365+ } catch (RuntimeException ex ) {
366+ throw new ValidationException (String .format ("Error thrown when attempting to validate schema with reference" , ex .getMessage ()));
367+ }
368+ }
369+ }
370+ });
371+ }
372+ }
373+
326374 private boolean isConfluentCloudEnabled (DesiredStateFile desiredStateFile ) {
327375 if (desiredStateFile .getSettings ().isPresent () && desiredStateFile .getSettings ().get ().getCcloud ().isPresent ()) {
328376 return desiredStateFile .getSettings ().get ().getCcloud ().get ().isEnabled ();
0 commit comments