| 명령어 | 설명 |
|---|---|
| kafka-topics.sh --list --bootstrap-server localhost:9092 | 토픽 목록 |
| kafka-topics.sh --create --topic my-topic --partitions 3 --replication-factor 1 --bootstrap-server localhost:9092 | 토픽 생성 |
| kafka-topics.sh --describe --topic my-topic --bootstrap-server localhost:9092 | 토픽 설명 |
| kafka-topics.sh --delete --topic my-topic --bootstrap-server localhost:9092 | 토픽 삭제 |
| kafka-topics.sh --alter --topic my-topic --partitions 5 --bootstrap-server localhost:9092 | 파티션 추가 |
| kafka-console-producer.sh --topic my-topic --bootstrap-server localhost:9092 | 콘솔 프로듀서 |
| kafka-console-consumer.sh --topic my-topic --from-beginning --bootstrap-server localhost:9092 | 콘솔 컨슈머 |
| kafka-console-consumer.sh --topic my-topic --group my-group --bootstrap-server localhost:9092 | 그룹으로 컨슈머 |
| kafka-consumer-groups.sh --list --bootstrap-server localhost:9092 | 컨슈머 그룹 목록 |
| kafka-consumer-groups.sh --describe --group my-group --bootstrap-server localhost:9092 | 그룹 설명 |
// --- Producer configuration ---
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// Send a message (fire-and-forget: errors are not observed by the caller)
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "value");
producer.send(record);

// Send with a callback invoked on completion (success or failure)
producer.send(record, (metadata, exception) -> {
    if (exception == null) {
        System.out.println("Sent to: " + metadata.partition());
    }
});
producer.close();

// --- Reliability settings (set on props BEFORE constructing the producer) ---
props.put("acks", "all");              // wait for all in-sync replicas to acknowledge
props.put("retries", 3);               // retry transient send failures
props.put("enable.idempotence", true); // exactly-once per partition; requires acks=all

// --- Performance settings ---
props.put("batch.size", 16384);          // max batch size in bytes
props.put("linger.ms", 5);               // wait up to 5 ms to fill a batch before sending
props.put("buffer.memory", 33554432);    // total producer buffer memory (32 MB)
props.put("compression.type", "snappy"); // compress record batches

// --- Consumer configuration ---
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));

// Poll loop: fetch records and print offset/key/value forever
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset=%d, key=%s, value=%s%n",
            record.offset(), record.key(), record.value());
    }
}

// --- Offset management ---
props.put("auto.offset.reset", "earliest"); // where to start when no committed offset: earliest or latest
props.put("enable.auto.commit", false);     // disable auto-commit; commit manually below

// Manual commit
consumer.commitSync();  // blocking commit
consumer.commitAsync(); // non-blocking commit

// Seek to a specific position
consumer.seek(partition, offset);
consumer.seekToBeginning(partitions);
consumer.seekToEnd(partitions);

// Assign specific partitions manually (alternative to subscribe())
consumer.assign(Arrays.asList(new TopicPartition("topic", 0)));

// --- Kafka Streams configuration ---
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

StreamsBuilder builder = new StreamsBuilder();

// Read from a topic
KStream<String, String> source = builder.stream("input-topic");

// Transform: keep values longer than 5 chars, then upper-case them
KStream<String, String> transformed = source
    .filter((key, value) -> value.length() > 5)
    .mapValues(value -> value.toUpperCase());

// Write to a topic
transformed.to("output-topic");

// Build and start the topology
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

// Group by key and count
KTable<String, Long> counts = source
    .groupByKey()
    .count();

// Windowed aggregation (5-minute windows)
// NOTE(review): TimeWindows.of / JoinWindows.of are deprecated in Kafka 3.x in
// favor of ofSizeWithNoGrace(...) — confirm the client version before updating.
KTable<Windowed<String>, Long> windowedCounts = source
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
    .count();

// Join two streams by key within a 5-minute window
KStream<String, String> joined = stream1.join(
    stream2,
    (v1, v2) -> v1 + "-" + v2,
    JoinWindows.of(Duration.ofMinutes(5))
);

version: "3"
# Single-broker Kafka + ZooKeeper stack for local development.
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181          # port the broker uses to reach ZooKeeper
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper                          # start ZooKeeper before the broker
    ports:
      - "9092:9092"                        # expose the broker to the host
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Address returned to clients; localhost works only for clients on the host.
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      # Single broker, so the internal offsets topic must use replication factor 1.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1