要求:
输出两个程序,一个命令行程序(命令行参数用flag)和一个服务端程序。
命令行程序支持通过命令行参数配置下发IP或IP段、端口、扫描带宽,然后将消息推送到kafka里面。
服务端程序:
- 从kafka消费者接收扫描任务信息
- 通过调用masscan启动探测任务,获取进度和结果信息,进度写入Redis,结果信息写入Kafka。
- 要求对启动任务、kafka、整理流程进行封装。
- 要求启动2个server端,通过命令行程序下发2个不同网段,可以均匀的分配到2个server上面执行完成。
测试要求:
- 启动两个server端程序。
- 通过命令行程序下发两个任务,IP不一样。
- 看server端程序日志,是否均匀的扫描了两个任务。
前置准备:
安装docker
思路:
1. 系统架构设计
采用生产者-消费者模式:
- 命令行客户端作为生产者,将扫描任务发布到Kafka
- 两个服务端实例作为消费者,从Kafka获取任务并执行
2. 关键组件设计
任务表示:
- 使用JSON格式表示扫描任务,包含:
- IP范围(单个IP或CIDR格式)
- 端口范围
- 扫描带宽限制
- 任务状态
- 进度信息
- 使用JSON格式表示扫描任务,包含:
Kafka设计:
- 创建一个主题(如
scan-tasks
) - 使用单个分区确保任务顺序性(或根据需求设计分区策略)
- 考虑使用消费者组实现两个服务端的负载均衡
- 创建一个主题(如
Redis设计:
- 存储任务进度信息
- 使用Hash结构存储每个任务的进度百分比
- 设置适当的TTL防止数据无限增长
服务端负载均衡:
- 两个服务端加入同一个Kafka消费者组
- Kafka会自动将任务均匀分配给两个消费者
3. 执行流程
客户端流程:
- 解析命令行参数(IP范围、端口、带宽)
- 验证输入格式
- 创建Kafka生产者
- 将任务发布到Kafka主题
服务端流程:
- 初始化Kafka消费者(加入消费者组)
- 初始化Redis连接
- 循环消费任务:
a. 从Kafka获取任务
b. 更新Redis中任务状态为"running"
c. 调用masscan执行扫描:- 构造masscan命令行参数
- 启动masscan进程
- 监控进程输出和退出状态
d. 实时解析masscan输出,更新Redis中的进度
e. 扫描完成后: - 更新Redis中任务状态为"completed"
- 将完整结果发布到另一个Kafka主题(如
scan-result
)
4. 关键技术点
Masscan集成:
- 使用
exec.Command
启动masscan进程 - 实时解析masscan的标准输出和错误输出
- 根据输出计算扫描进度
- 使用
错误处理:
- 处理无效IP格式
- 处理masscan执行失败
- 处理Kafka/Redis连接问题
日志记录:
- 记录服务端操作日志
- 记录任务执行状态变化
- 记录错误信息
5. 测试验证思路
- 启动两个服务端实例
- 使用客户端提交两个不同网段的任务
- 观察:
- 两个服务端的日志输出
- 任务是否被均匀分配(一个服务端处理一个任务)
- 扫描进度是否正确更新
- 最终结果是否正确输出
6. 扩展考虑
任务优先级:
- 可以在任务中添加优先级字段
- 服务端根据优先级处理任务
任务超时:
- 添加任务超时机制
- 超时后重新分配任务
结果存储:
- 可以考虑将结果存入数据库而不仅是Kafka
水平扩展:
- 设计支持更多服务端实例的扩展方案
这个设计实现了基本的分布式扫描任务调度系统,核心是利用Kafka的消息队列特性实现任务分发,通过消费者组机制实现负载均衡,使用Redis作为共享状态存储。
实现:
项目结构:

kafka:
consumer
package kafka
import (
"context"
"errors"
"fmt"
"github.com/IBM/sarama"
"log"
"sync"
)
type MessageHandler func([]byte) error
type SaramaConsumer struct {
client sarama.ConsumerGroup
handlers map[string]MessageHandler
ready chan bool
ctx context.Context
cancel context.CancelFunc
consuming sync.WaitGroup
memberId string
groupId string
}
func NewKafkaConsumer(brokers []string, groupId string, topic []string) (*SaramaConsumer, error) {
config := sarama.NewConfig()
config.Version = sarama.V2_5_0_0 // 使用适当的 Kafka 版本
config.Consumer.Offse