Building a GitOps-Driven Real-Time Configuration Delivery System with Flux CD, etcd, and Flutter


Managing feature flags has long been a pain point in our mobile app development workflow. Our initial approach involved hard-coding them, leading to release cycles that stretched for weeks. We then evolved to a backend API-controlled system, but this introduced new dependencies and latency. Our most recent solution used Kubernetes ConfigMaps, updated via a CI/CD pipeline and read by the application backend. While this decoupled configuration from code, the entire process remained a black box for our mobile developers, fraught with delays and uncertainty. A single flag change request, from Git commit to its effect on a test application, could take anywhere from a few minutes to half an hour, with its status being completely opaque.

Our objective was clear: provide the mobile team with a near real-time, state-transparent feature flag management platform. The core requirements were twofold: the change process had to adhere to GitOps principles, with the Git repository as the single source of truth for all configuration, and there had to be an intuitive interface that reflected the precise state of live flags in real time. This interface needed to serve not just developers but also product managers, offering at-a-glance clarity.

Initial Concept and Technology Selection

The core of this system lies in reconciling two seemingly contradictory requirements: “real-time” and “GitOps.” The GitOps workflow is inherently asynchronous, whereas a user interface demands synchronous, real-time feedback.

  1. GitOps Engine: We were already heavy users of Flux CD to manage the declarative state of our entire Kubernetes cluster. Therefore, continuing to use Flux CD to manage a Custom Resource for feature flags was the most natural choice. This ensured that the single source of truth for our configuration remained in Git.

  2. State Store & Notification Hub: This was the linchpin of the architecture. Directly polling the Kubernetes API Server for the status of ConfigMaps or Custom Resources was not feasible; it would place immense pressure on the API Server and fail to guarantee real-time updates. We needed a component that could actively “push” changes.

    • Redis Pub/Sub? A possible candidate, but it lacks robust persistence and strong consistency guarantees. We couldn’t risk losing state change events if a subscription service restarted.
    • etcd? This was very appealing. As the cornerstone of Kubernetes itself, etcd provides everything we need: strong consistency, transactional reads/writes, and an extremely powerful Watch API. The Watch mechanism allows any client to subscribe to all changes under a specific key prefix in real time. We decided to set up a dedicated etcd cluster for our platform, decoupling it from the Kubernetes cluster’s own etcd to prevent any potential performance impact. A minimal sketch of the Watch API appears just after this list.
  3. Frontend Interface: The team wanted a cross-platform client that could run on iOS, Android, and even the web. Flutter was the best choice here, offering single-codebase, multi-platform deployment.

  4. The Connecting Bridge: How could we push changes from etcd to the Flutter client in real time? HTTP Polling was immediately ruled out. WebSockets were an option, but gRPC Streaming offered superior performance, stronger type constraints, and a more efficient binary protocol. We could build a Go service that acts as a gRPC server. This service would use etcd’s Watch API to monitor state on one end and push any changes to the Flutter client via a gRPC stream on the other.
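
To make the Watch mechanism concrete, the sketch below shows the smallest possible subscriber using the official clientv3 package; the endpoint is a placeholder for our dedicated cluster:

package main

import (
	"context"
	"fmt"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
	// Connect to the dedicated application etcd cluster (placeholder endpoint).
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"etcd.flags.svc:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	// Watch delivers every change under the prefix as soon as etcd commits it.
	for watchResp := range cli.Watch(context.Background(), "/featureflags/", clientv3.WithPrefix()) {
		for _, ev := range watchResp.Events {
			fmt.Printf("%s %s -> %s\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
		}
	}
}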

The final architecture diagram is as follows:

graph TD
    subgraph Git Repository
        A[Developer Commits FeatureFlag YAML]
    end

    subgraph Kubernetes Cluster
        B[Flux CD] --> C{Reconciles CR}
        C --> D[FeatureFlag CR]
        E[Custom Controller] -- Watches --> D
        E -- Writes State --> F[Application etcd Cluster]
    end

    subgraph Backend Service
        G[gRPC Streaming Service] -- Watches --> F
    end

    subgraph Client
        H[Flutter App] -- gRPC Stream --> G
    end

    A --> B

Phased Implementation: From CRD to Real-Time UI

Phase 1: Defining the CRD and Syncing State to etcd

First, we needed to define a FeatureFlag CustomResourceDefinition (CRD) to make it a first-class citizen in our GitOps workflow.

crd/featureflag.crd.yaml:

apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: featureflags.runtime.mycorp.com
spec:
  group: runtime.mycorp.com
  names:
    kind: FeatureFlag
    listKind: FeatureFlagList
    plural: featureflags
    singular: featureflag
  scope: Namespaced
  versions:
    - name: v1alpha1
      served: true
      storage: true
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              properties:
                appName:
                  type: string
                  description: "The name of the application this flag belongs to."
                flags:
                  type: object
                  description: "A map of feature flags."
                  additionalProperties:
                    type: boolean
              required:
                - appName
                - flags
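
These CRD fields map onto Go types scaffolded by kubebuilder. A minimal sketch of api/v1alpha1/featureflag_types.go, with field names inferred from the schema above (kubebuilder markers and the generated deepcopy methods are omitted), looks roughly like this:

api/v1alpha1/featureflag_types.go (sketch):

package v1alpha1

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// FeatureFlagSpec mirrors the openAPIV3Schema defined in the CRD.
type FeatureFlagSpec struct {
	// AppName is the application these flags belong to.
	AppName string `json:"appName"`
	// Flags maps a flag name to its boolean state.
	Flags map[string]bool `json:"flags"`
}

// FeatureFlag is the Schema for the featureflags API.
type FeatureFlag struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec FeatureFlagSpec `json:"spec,omitempty"`
}

// FeatureFlagList contains a list of FeatureFlag.
type FeatureFlagList struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata,omitempty"`
	Items           []FeatureFlag `json:"items"`
}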

Next, we wrote a Kubernetes Controller whose sole responsibility is to watch for changes to FeatureFlag resources and write the contents of spec.flags into our separate etcd cluster. We designed the key format to be /featureflags/{appName}/{flagName}.

internal/controller/featureflag_controller.go:

package controller

import (
	"context"
	"fmt"
	"path"
	"time"

	"go.etcd.io/etcd/client/v3"
	"k8s.io/apimachinery/pkg/runtime"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/log"

	runtimev1alpha1 "mycorp.com/featureflag-operator/api/v1alpha1"
)

// FeatureFlagReconciler reconciles a FeatureFlag object
type FeatureFlagReconciler struct {
	client.Client
	Scheme     *runtime.Scheme
	EtcdClient *clientv3.Client
}

const (
	etcdRequestTimeout = 5 * time.Second
	etcdKeyPrefix      = "/featureflags"
)

//+kubebuilder:rbac:groups=runtime.mycorp.com,resources=featureflags,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=runtime.mycorp.com,resources=featureflags/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=runtime.mycorp.com,resources=featureflags/finalizers,verbs=update

func (r *FeatureFlagReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	logger := log.FromContext(ctx)
	logger.Info("Reconciling FeatureFlag")

	var featureFlag runtimev1alpha1.FeatureFlag
	if err := r.Get(ctx, req.NamespacedName, &featureFlag); err != nil {
		// We ignore 'not found' errors, which occur when a resource is deleted after the reconcile request is queued.
		// In a production environment, a finalizer should be used to ensure keys in etcd are cleaned up upon deletion.
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	appName := featureFlag.Spec.AppName
	logger.Info("Syncing flags for application", "appName", appName)

	// Use an etcd transaction to ensure atomic updates.
	// This is critical in a real project to prevent inconsistent states from partial updates.
	ops := []clientv3.Op{}
	for flagName, flagValue := range featureFlag.Spec.Flags {
		key := path.Join(etcdKeyPrefix, appName, flagName)
		value := fmt.Sprintf("%t", flagValue)
		
		// clientv3.OpPut creates a KV write operation.
		ops = append(ops, clientv3.OpPut(key, value))
	}

	if len(ops) > 0 {
		etcdCtx, cancel := context.WithTimeout(ctx, etcdRequestTimeout)
		defer cancel()

		// Execute all Put operations in a single transaction.
		if _, err := r.EtcdClient.Txn(etcdCtx).Then(ops...).Commit(); err != nil {
			logger.Error(err, "Failed to commit transaction to etcd")
			// Return an error to trigger an automatic retry by controller-runtime.
			return ctrl.Result{}, err
		}
	}
	
	logger.Info("Successfully synced flags to etcd", "appName", appName, "flagCount", len(featureFlag.Spec.Flags))

	return ctrl.Result{}, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *FeatureFlagReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&runtimev1alpha1.FeatureFlag{}).
		Complete(r)
}

The core logic of this controller is in the Reconcile function. It fetches the FeatureFlag resource, then builds an etcd transaction to atomically write all flag states. Using a transaction is key for production-grade code, as it avoids intermediate states when updating multiple flags.
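
The reconciler receives its EtcdClient at startup. Roughly, the wiring in the operator’s entry point looks like the sketch below; the etcd endpoint is a placeholder, and the AddToScheme helper comes from the kubebuilder-generated api/v1alpha1 package:

cmd/main.go (sketch):

package main

import (
	"fmt"
	"os"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
	"k8s.io/apimachinery/pkg/runtime"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
	ctrl "sigs.k8s.io/controller-runtime"

	runtimev1alpha1 "mycorp.com/featureflag-operator/api/v1alpha1"
	"mycorp.com/featureflag-operator/internal/controller"
)

func fatal(err error) {
	fmt.Fprintln(os.Stderr, err)
	os.Exit(1)
}

func main() {
	// Register both the built-in and the FeatureFlag API types.
	scheme := runtime.NewScheme()
	utilruntime.Must(clientgoscheme.AddToScheme(scheme))
	utilruntime.Must(runtimev1alpha1.AddToScheme(scheme))

	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{Scheme: scheme})
	if err != nil {
		fatal(err)
	}

	// The dedicated application etcd cluster, separate from the Kubernetes etcd.
	etcdClient, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"etcd.flags.svc:2379"}, // placeholder endpoint
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		fatal(err)
	}
	defer etcdClient.Close()

	if err := (&controller.FeatureFlagReconciler{
		Client:     mgr.GetClient(),
		Scheme:     mgr.GetScheme(),
		EtcdClient: etcdClient,
	}).SetupWithManager(mgr); err != nil {
		fatal(err)
	}

	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
		fatal(err)
	}
}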

Now, a developer simply needs to commit a FeatureFlag YAML file to the Git repository. Flux CD will automatically apply it to the cluster, and our controller will listen for the change and update etcd.

apps/my-mobile-app/featureflags.yaml:

apiVersion: runtime.mycorp.com/v1alpha1
kind: FeatureFlag
metadata:
  name: my-mobile-app-flags
  namespace: mobile-apps
spec:
  appName: "my-mobile-app"
  flags:
    newOnboardingFlow: true
    enableChatV2: false
    useNewPaymentGateway: true
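
Once Flux applies this manifest, the controller writes three keys to the application etcd cluster: /featureflags/my-mobile-app/newOnboardingFlow = "true", /featureflags/my-mobile-app/enableChatV2 = "false", and /featureflags/my-mobile-app/useNewPaymentGateway = "true".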

Phase 2: Building the gRPC Real-Time Streaming Service

This is the bridge connecting the backend state to the frontend UI. We start by defining the service interface with Protobuf.

proto/flags.proto:

syntax = "proto3";

package flags;

option go_package = "mycorp.com/flag-streamer/gen/flags";

service FlagStreamer {
  // WatchAppFlags establishes a streaming connection to receive feature flag
  // changes for a specific app. When a client initiates a request, the server
  // first pushes a full snapshot of all flags. Subsequently, any changes are
  // pushed in real-time.
  rpc WatchAppFlags(WatchRequest) returns (stream FlagUpdate);
}

message WatchRequest {
  string app_name = 1;
}

message FlagUpdate {
  // EventType: INITIAL, UPDATE, DELETE
  enum EventType {
    UNKNOWN = 0;
    INITIAL = 1; // The initial full dataset.
    UPDATE = 2;  // An incremental update.
    DELETE = 3;  // Deletion (not yet implemented in the controller).
  }

  EventType event_type = 1;
  // Key-value map of flags.
  // For INITIAL, this contains all flags.
  // For UPDATE, it contains only the changed flags.
  map<string, bool> flags = 2;
}

The Go server implementation is the heart of this project. grpc-go runs each WatchAppFlags call in its own goroutine, so every connected client gets a dedicated handler that watches etcd for the lifetime of its stream.

cmd/streamer/main.go (Partial Implementation):

package main

import (
	// ... imports
	"go.etcd.io/etcd/client/v3"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	pb "mycorp.com/flag-streamer/gen/flags"
)

type server struct {
	pb.UnimplementedFlagStreamerServer
	etcdClient *clientv3.Client
}

const etcdKeyPrefix = "/featureflags"

func (s *server) WatchAppFlags(req *pb.WatchRequest, stream pb.FlagStreamer_WatchAppFlagsServer) error {
	appName := req.GetAppName()
	if appName == "" {
		return status.Error(codes.InvalidArgument, "appName is required")
	}

	ctx := stream.Context()
	logger := log.FromContext(ctx).WithValues("appName", appName)
	logger.Info("New client connected for watching flags")

	// 1. Send the initial full dataset.
	// This is crucial for clients on cold start, allowing them to get the current state immediately.
	keyPrefix := path.Join(etcdKeyPrefix, appName)
	resp, err := s.etcdClient.Get(ctx, keyPrefix, clientv3.WithPrefix())
	if err != nil {
		logger.Error(err, "Failed to get initial flags from etcd")
		return status.Error(codes.Internal, "failed to fetch initial state")
	}

	initialFlags := make(map[string]bool)
	for _, kv := range resp.Kvs {
		flagName := path.Base(string(kv.Key))
		val, _ := strconv.ParseBool(string(kv.Value))
		initialFlags[flagName] = val
	}

	if err := stream.Send(&pb.FlagUpdate{
		EventType: pb.FlagUpdate_INITIAL,
		Flags:     initialFlags,
	}); err != nil {
		logger.Error(err, "Failed to send initial flags")
		return status.Error(codes.Internal, "failed to send initial state")
	}

	// 2. Start a watcher for subsequent changes.
	// We start watching from the etcd revision of the Get response + 1 to ensure no changes are missed.
	watchChan := s.etcdClient.Watch(ctx, keyPrefix, clientv3.WithPrefix(), clientv3.WithRev(resp.Header.Revision+1))

	logger.Info("Starting to watch for updates", "fromRevision", resp.Header.Revision+1)

	for {
		select {
		case <-ctx.Done():
			// Client disconnected.
			logger.Info("Client disconnected")
			return ctx.Err()
		case watchResp, ok := <-watchChan:
			if !ok {
				logger.Info("Watch channel closed")
				return status.Error(codes.Internal, "etcd watch channel closed")
			}
			if err := watchResp.Err(); err != nil {
				logger.Error(err, "Etcd watch error")
				return status.Error(codes.Internal, "etcd watch failed")
			}

			// Batch received events to reduce gRPC message frequency.
			updatedFlags := make(map[string]bool)
			for _, event := range watchResp.Events {
				// We only care about PUT events, as DELETE logic is not yet implemented.
				if event.Type == clientv3.EventTypePut {
					flagName := path.Base(string(event.Kv.Key))
					val, _ := strconv.ParseBool(string(event.Kv.Value))
					updatedFlags[flagName] = val
				}
			}

			if len(updatedFlags) > 0 {
				if err := stream.Send(&pb.FlagUpdate{
					EventType: pb.FlagUpdate_UPDATE,
					Flags:     updatedFlags,
				}); err != nil {
					logger.Error(err, "Failed to send update to client")
					// A send failure usually means the client has disconnected;
					// returning ends the stream and cancels the etcd watch via ctx.
					return err
				}
				logger.Info("Sent update to client", "updateCount", len(updatedFlags))
			}
		}
	}
}

// main function to setup and start gRPC server...

The robustness of this code is evident in several details:

  • Get-then-Watch Pattern: Upon connection, the handler first Gets the full dataset, then Watches for changes starting from the Get response’s revision plus one. This is a standard pattern that guarantees no events are lost between the snapshot and the start of the watch.
  • Context Management: The entire Watch loop is governed by stream.Context(). As soon as the Flutter client disconnects, ctx.Done() is triggered, allowing the goroutine to exit gracefully and release its resources.
  • Error Handling: Errors from both etcd and gRPC are handled, returning appropriate gRPC status codes to the client.
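
The elided main function only has to wire these pieces together: create the shared etcd client, register the server implementation, and serve. A rough sketch, assuming a plain-text listener on :50051, a placeholder etcd endpoint, and the usual net, os, fmt, and time imports alongside those shown earlier:

func main() {
	// One etcd client shared by all streams; endpoint is a placeholder.
	etcdClient, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"etcd.flags.svc:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		fmt.Fprintf(os.Stderr, "failed to connect to etcd: %v\n", err)
		os.Exit(1)
	}
	defer etcdClient.Close()

	lis, err := net.Listen("tcp", ":50051")
	if err != nil {
		fmt.Fprintf(os.Stderr, "failed to listen: %v\n", err)
		os.Exit(1)
	}

	// Plain-text gRPC for now; TLS is discussed in the limitations section.
	grpcServer := grpc.NewServer()
	pb.RegisterFlagStreamerServer(grpcServer, &server{etcdClient: etcdClient})

	if err := grpcServer.Serve(lis); err != nil {
		fmt.Fprintf(os.Stderr, "gRPC server stopped: %v\n", err)
		os.Exit(1)
	}
}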

Phase 3: Flutter Client Integration

On the Flutter side, we use the grpc and protobuf packages to communicate with our service. First, we generate the Dart code from our .proto file using the protoc compiler.

lib/services/flag_service.dart:

import 'package:grpc/grpc.dart';
import 'package:rxdart/rxdart.dart';

import '../generated/flags.pbgrpc.dart';

// A simple singleton service to manage feature flag state.
class FlagService {
  FlagService._();
  static final instance = FlagService._();

  late FlagStreamerClient _client;
  // A BehaviorSubject caches the latest value, allowing new subscribers to
  // immediately receive the current state.
  final _flagsSubject = BehaviorSubject<Map<String, bool>>.seeded({});

  // Expose a read-only Stream to the outside world.
  Stream<Map<String, bool>> get flags$ => _flagsSubject.stream;
  Map<String, bool> get currentFlags => _flagsSubject.value;

  ClientChannel? _channel;
  ResponseStream<FlagUpdate>? _stream;

  void connect(String appName) {
    // Avoid duplicate connections.
    if (_channel != null) {
      disconnect();
    }

    print('Connecting to FlagStreamer service for app: $appName...');
    _channel = ClientChannel(
      'your-grpc-service-host', // Replace with your gRPC service address
      port: 50051,
      options: const ChannelOptions(credentials: ChannelCredentials.insecure()),
    );
    _client = FlagStreamerClient(_channel!);

    _stream = _client.watchAppFlags(WatchRequest()..appName = appName);

    _stream!.listen(
      (FlagUpdate update) {
        print('Received flag update: ${update.eventType}');
        final current = Map<String, bool>.from(_flagsSubject.value);

        switch (update.eventType) {
          case FlagUpdate_EventType.INITIAL:
            // Received the full dataset; replace the current state with a copy
            // so we never hold a reference into the protobuf message.
            _flagsSubject.add(Map<String, bool>.from(update.flags));
            break;
          case FlagUpdate_EventType.UPDATE:
            // Received an incremental update, merge it into the current state.
            current.addAll(update.flags);
            _flagsSubject.add(current);
            break;
          default:
            // Ignore other event types.
            break;
        }
      },
      onError: (e) {
        print('Flag stream error: $e');
        // In a production environment, implement reconnection logic here.
        disconnect();
      },
      onDone: () {
        print('Flag stream closed by server.');
        disconnect();
      },
    );
  }

  void disconnect() {
    print('Disconnecting from FlagStreamer service...');
    _stream?.cancel();
    _channel?.shutdown();
    _stream = null;
    _channel = null;
  }
}

In the UI layer, a StreamBuilder can be used to consume this Stream, allowing for elegant and automatic UI refreshes.

lib/widgets/flag_display_widget.dart:

import 'package:flutter/material.dart';
import '../services/flag_service.dart';

class FlagDisplayWidget extends StatelessWidget {
  const FlagDisplayWidget({Key? key}) : super(key: key);

  
  @override
  Widget build(BuildContext context) {
    return StreamBuilder<Map<String, bool>>(
      stream: FlagService.instance.flags$,
      builder: (context, snapshot) {
        if (!snapshot.hasData || snapshot.data!.isEmpty) {
          return const Center(child: CircularProgressIndicator());
        }

        final flags = snapshot.data!;
        final flagItems = flags.entries.toList()
          ..sort((a, b) => a.key.compareTo(b.key));

        return ListView.builder(
          itemCount: flagItems.length,
          itemBuilder: (context, index) {
            final item = flagItems[index];
            return ListTile(
              title: Text(item.key),
              trailing: Switch(
                value: item.value,
                onChanged: null, // UI is read-only
                activeColor: Colors.green,
              ),
            );
          },
        );
      },
    );
  }
}

Once a developer calls FlagService.instance.connect('my-mobile-app') in main.dart or another application entry point, the entire data flow is established. From that point on, any flag change that reaches etcd is reflected in the Flutter UI within milliseconds, without any user interaction; the end-to-end latency from a Git commit is dominated by how quickly Flux reconciles the repository.

Limitations and Future Iterations

While this architecture achieves its core goals, there are areas that require refinement for a production environment.

First, the gRPC service is currently a single point of failure. For high availability, multiple instances should be deployed and fronted by a gRPC load balancer (e.g., a Layer 4 network load balancer or a service mesh) to distribute client connections.

Second, the security posture is minimal. The gRPC communication should be secured with TLS (Transport Layer Security), and an authentication/authorization mechanism (like JWT-based tokens) is needed to ensure that only legitimate clients can subscribe to a specific application’s flag information.
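
As a rough sketch of the TLS half of that hardening, grpc-go can terminate TLS directly from a certificate pair; the newTLSServer helper and the file paths below are illustrative rather than part of the current service:

import (
	"fmt"
	"os"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
)

// newTLSServer builds a gRPC server that terminates TLS itself.
// The certificate and key paths are placeholders for illustration.
func newTLSServer() *grpc.Server {
	creds, err := credentials.NewServerTLSFromFile(
		"/etc/flag-streamer/tls.crt",
		"/etc/flag-streamer/tls.key",
	)
	if err != nil {
		fmt.Fprintf(os.Stderr, "failed to load TLS credentials: %v\n", err)
		os.Exit(1)
	}
	return grpc.NewServer(grpc.Creds(creds))
}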

Finally, as the number of feature flags grows, the initial full data synchronization payload could become large. Future optimizations could include a more fine-grained subscription model, where clients subscribe only to the flags they care about, or implementing a more efficient compression algorithm on the server. etcd itself has limits on value sizes; very large configurations might require splitting values or storing them elsewhere, with etcd only holding an index.

