导出wireshark的FLV RAW数据并进行分析

发布于:2025-08-30 ⋅ 阅读:(18) ⋅ 点赞:(0)

import struct
import json
import argparse
from collections import namedtuple
from typing import List, Tuple, Dict, Optional, Any

# 定义数据结构
FLVHeader = namedtuple('FLVHeader', ['signature', 'version', 'flags', 'header_size'])
FLVTagHeader = namedtuple('FLVTagHeader', [
    'prev_tag_size', 'tag_type', 'data_size', 'timestamp', 'timestamp_ext', 'stream_id'
])

MetadataHeader = namedtuple('MetadataHeader', [
    'AMF1_type', 'size', 'onMetadata', "AMF2_type", 'arr_size'
])

MetadataArray = namedtuple('MetadataArray', [
    'duration', 'width', 'height', 'videodatarate', 'framerate', 'videocodecid', 'audiodatarate', 
    'audiosamplerate', 'audiosamplesize', 'stereo', 'audiocodecid', 'encoder', 'filesize', "custom_fields"
])

AudioHeader = namedtuple('AudioHeader', [
    'format', 'rate', 'size', 'type', 'aac_packet_type', 'audio_object_type',
    'sampling_index', 'channel_config'
])
VideoHeader = namedtuple('VideoHeader', [
    'frame_type', 'codec_id', 'avc_packet_type', 'composition_time'
])
NALUnit = namedtuple('NALUnit', ['type', 'ref_idc', 'size', 'data'])

# 常量定义
AUDIO_FORMATS = {
    0: "Linear PCM, platform endian",
    1: "ADPCM",
    2: "MP3",
    3: "Linear PCM, little endian",
    4: "Nellymoser 16kHz mono",
    5: "Nellymoser 8kHz mono",
    6: "Nellymoser",
    7: "G.711 A-law",
    8: "G.711 mu-law",
    10: "AAC",
    11: "Speex",
    14: "MP3 8kHz",
    15: "Device-specific sound"
}

SAMPLE_RATES = {
    0: "5.5kHz",
    1: "11kHz",
    2: "22kHz",
    3: "44kHz"
}

FRAME_TYPES = {
    1: "Keyframe",
    2: "Inter frame",
    3: "Disposable inter frame",
    4: "Generated keyframe",
    5: "Video info/command frame"
}

CODEC_IDS = {
    1: "JPEG",
    2: "Sorenson H.263",
    3: "Screen video",
    4: "On2 VP6",
    5: "On2 VP6 with alpha",
    6: "Screen video v2",
    7: "AVC"
}

AVC_PACKET_TYPES = {
     0: "AVC sequence header",
     1: "AVC NALU",
     2: "AVC end of sequence"
}

NALU_TYPES = {
    1: "Coded slice of a non-IDR picture",
    5: "Coded slice of an IDR picture",
    6: "Supplemental enhancement information (SEI)",
    7: "Sequence parameter set",
    8: "Picture parameter set",
    9: "Access unit delimiter",
    10: "End of sequence",
    11: "End of stream",
    12: "Filler data",
    13: "Sequence parameter set extension",
    14: "Prefix NAL unit",
    15: "Subset sequence parameter set",
    19: "Coded slice of an auxiliary coded picture without partitioning",
    20: "Coded slice extension"
}

NALU_REF_IDC = {
    0: "Disposable",
    1: "Lowest",
    2: "Low",
    3: "High"
}

SCRIPT_DATA_VALUE = {
    0: "Number",
    1: "Boolean",
    2: "String",
    3: "Object",
    4: "MovieClip (reserved, not supported)",
    5: "Null",
    6: "Undefined",
    7: "Reference",
    8: "ECMA array",
    9: "Object end marker",
    10: "Strict array",
    11: "Date",
    12: "Long string"
}

class FLVProcessor:
    
    def __init__(self, input_file: str):
        self.input_file = input_file
        self.header: Optional[FLVHeader] = None
        self.tags: List[Tuple] = []
        self.raw_data: bytes = b''
    
    def read_file(self) -> None:
        #读取文件内容
        with open(self.input_file, 'rb') as f:
            self.raw_data = f.read()
    
    def _process_http_response(self, data: bytes) -> bytes:
        #处理HTTP响应,自动判断是否为chunk编码
        if not data.startswith(b'HTTP/1.1 200 OK'):
            return data
        
        # 查找HTTP头结束位置
        http_header_end = data.find(b'\r\n\r\n') + 4
        if http_header_end < 4:  # 没有找到完整的HTTP头
            return data
        
        # 检查是否为chunk编码
        headers = data[:http_header_end-4].split(b'\r\n')
        is_chunked = any(b'Transfer-Encoding: chunked' in h for h in headers)
        
        # 提取主体数据
        body_data = data[http_header_end:]
        
        # 如果是chunk编码,则解码
        if is_chunked:
            return self._decode_chunked_data(body_data)
        return body_data
    
    def _decode_chunked_data(self, data: bytes) -> bytes:
        #解码HTTP chunk传输编码的数据
        result = b''
        pos = 0
        while pos < len(data):
            # 查找块大小行结束位置
            chunk_size_end = data.find(b'\r\n', pos)
            if chunk_size_end == -1:
                break
            
            # 解析块大小(16进制)
            chunk_size_str = data[pos:chunk_size_end].decode('ascii').strip()
            try:
                chunk_size = int(chunk_size_str, 16)
            except ValueError:
                break
            
            # 0大小的块表示结束
            if chunk_size == 0:
                break
            
            # 移动到块数据开始位置
            chunk_start = chunk_size_end + 2
            chunk_end = chunk_start + chunk_size
            
            # 检查是否有足够的空间
            if chunk_end > len(data):
                break
            
            # 添加块数据到结果
            result += data[chunk_start:chunk_end]
            
            # 移动到下一个块开始位置(跳过CRLF)
            pos = chunk_end + 2
        
        return result
    
    def parse(self) -> None:
        #解析FLV文件
        if not self.raw_data:
            self.read_file()
        
        # 处理可能的HTTP响应
        flv_data = self._process_http_response(self.raw_data)
        
        # 验证FLV签名
        if not flv_data.startswith(b'FLV'):
            raise ValueError("不是有效的FLV文件, 签名不匹配")
        
        # 解析FLV头
        self.header = self._parse_flv_header(flv_data[:9])
        pos = 9
        
        # 解析标签
        while pos + 15 < len(flv_data):
            # 解析前一个标签大小
            prev_tag_size = struct.unpack('>I', flv_data[pos:pos+4])[0]
            pos += 4
            
            # 解析标签头
            tag_type = flv_data[pos]
            data_size = struct.unpack('>I', b'\x00' + flv_data[pos+1:pos+4])[0]
            timestamp = struct.unpack('>I', flv_data[pos+4:pos+7] + b'\x00')[0] >> 8
            timestamp_ext = flv_data[pos+7]
            stream_id = struct.unpack('>I', b'\x00' + flv_data[pos+8:pos+11])[0]
            
            tag_header = FLVTagHeader(
                prev_tag_size=prev_tag_size,
                tag_type=tag_type,
                data_size=data_size,
                timestamp=timestamp,
                timestamp_ext=timestamp_ext,
                stream_id=stream_id
            )
            
            pos += 11
            
            # 检查数据大小是否有效
            if pos + data_size > len(flv_data):
                print(f"警告: 数据大小({data_size})超出文件范围,终止解析")
                break
            
            # 读取标签数据
            tag_data = flv_data[pos:pos+data_size]
            pos += data_size
            
            # 根据标签类型解析
            if tag_type == 8:  # 音频
                audio_info = self._parse_audio_tag(tag_data)
                self.tags.append(('audio', tag_header, audio_info, tag_data))
            elif tag_type == 9:  # 视频
                video_info, nal_units, video_body = self._parse_video_tag(tag_data)
                self.tags.append(('video', tag_header, (video_info, nal_units, video_body), tag_data))
            elif tag_type == 18:  # 脚本数据/Metadata
                metadata_info, metadata_array = self._parse_flv_metadate(tag_data)
                self.tags.append(('metadata', tag_header, (metadata_info, metadata_array), tag_data))
                # self.tags.append(('metadata', tag_header, tag_data, tag_data))
            else:
                self.tags.append(('unknown', tag_header, tag_data, tag_data))
    
    def _parse_flv_header(self, header_data: bytes) -> FLVHeader:
        #解析FLV头
        signature = header_data[:3].decode('ascii')
        version = header_data[3]
        flags = header_data[4]
        header_size = struct.unpack('>I', header_data[5:9])[0]
        return FLVHeader(signature, version, flags, header_size)

    def _parse_flv_metadate(self , data: bytes) -> Tuple[Optional[MetadataHeader], Optional[MetadataArray]]:
        #解析Metadata
        if not data:
            return None, None
        pos = 0
        
        AMF1_type = data[pos]
        pos += 1

        # 字符串长度
        name_size = struct.unpack('>H', data[pos:pos+2])[0]
        pos += 2
        
        # 检查剩余数据是否足够
        if len(data) < pos + name_size + 1:
            return None, None

        # 提取字符串
        onMetadata = data[pos:pos+name_size].decode('utf-8')
        pos += name_size

        # AMF2类型
        AMF2_type = data[pos]
        pos += 1

        metadata_dict = {}

        if AMF2_type == 0x08:  # 0x08 ECMA数组
            arr_size = struct.unpack('>I', data[pos:pos+4])[0]
            pos += 4
            metadata_dict, pos = self._parse_amf_object(data, pos)

        elif AMF2_type == 0x03:  # 0x03 对象
            metadata_dict, pos = self._parse_amf_object(data, pos)
            arr_size = len(metadata_dict)

        elif AMF2_type == 0x0A:  # 0x0A 严格数组
            arr_size = struct.unpack('>I', data[pos:pos+4])[0]
            pos += 4
            metadata_dict, pos = self._parse_amf_strict_array(data, pos, arr_size)

        elif AMF2_type == 0x05:  # 0x05 Null
            pass  # metadata_dict保持为空

        elif AMF2_type == 0x0B:  # 0x0B Date
            timestamp = struct.unpack('>d', data[pos:pos+8])[0]
            pos += 8
            timezone = struct.unpack('>h', data[pos:pos+2])[0]
            pos += 2
            metadata_dict["date"] = f"Date({timestamp}, tz={timezone})"

        elif AMF2_type == 0x0C:  # 0x0C 长字符串
            str_size = struct.unpack('>I', data[pos:pos+4])[0]
            pos += 4
            metadata_dict["long_string"] = data[pos:pos+str_size].decode('utf-8', errors='replace')
            pos += str_size

        elif AMF2_type in [0x06, 0x0D]:  # 0x06 或 0x0D
            metadata_dict["unsupported"] = None

        else:  # 其他未实现类型(如0x04 MovieClip)
            raise ValueError(f"不支持的AMF2类型: {hex(AMF2_type)}")

        header = MetadataHeader(
            AMF1_type=AMF1_type,
            size=name_size,
            onMetadata=onMetadata,
            AMF2_type=AMF2_type,
            arr_size=arr_size
        )

        standard_data = {}
        custom_fields = {}
        standard_fields = {
            'duration': 'duration',
            'width': 'width',
            'height': 'height',
            'videodatarate': 'videodatarate',
            'framerate': 'framerate',
            'videocodecid': 'videocodecid',
            'audiodatarate': 'audiodatarate',
            'audiosamplerate': 'audiosamplerate',
            'audiosamplesize': 'audiosamplesize',
            'stereo': 'stereo',
            'audiocodecid': 'audiocodecid',
            'encoder': 'encoder',
            'filesize': 'filesize'
        }

        for key_in_data, mapped_key in standard_fields.items():
            if key_in_data in metadata_dict:
                standard_data[mapped_key] = metadata_dict[key_in_data]

        for key, value in metadata_dict.items():
            if key not in standard_fields:
                custom_fields[key] = value

        metadata = MetadataArray(
            duration=standard_data.get('duration'),
            width=standard_data.get('width'),
            height=standard_data.get('height'),
            videodatarate=standard_data.get('videodatarate'),
            framerate=standard_data.get('framerate'),
            videocodecid=standard_data.get('videocodecid'),
            audiodatarate=standard_data.get('audiodatarate'),
            audiosamplerate=standard_data.get('audiosamplerate'),
            audiosamplesize=standard_data.get('audiosamplesize'),
            stereo=standard_data.get('stereo'),
            audiocodecid=standard_data.get('audiocodecid'),
            encoder=standard_data.get('encoder'),
            filesize=standard_data.get('filesize'),
            custom_fields = custom_fields
        )
        return header, metadata


    def _parse_amf_value(self, data: bytes, pos: int) -> Tuple[Any, int]:
        #单值
        value_type = data[pos]
        pos += 1

        if value_type == 0x00:  # Double
            value = struct.unpack('>d', data[pos:pos+8])[0]
            pos += 8
        elif value_type == 0x01:  # Boolean
            value = bool(data[pos])
            pos += 1
        elif value_type == 0x02:  # String
            str_size = struct.unpack('>H', data[pos:pos+2])[0]
            pos += 2
            value = data[pos:pos+str_size].decode('latin-1')
            pos += str_size
        elif value_type == 0x03:  # Object
            value, pos = self._parse_amf_object(data, pos)
        elif value_type == AMF_NULL:  # Null
            value = None
        elif value_type == 0x06:  # Undefined
            value = "undefined"
        elif value_type == 0x08:  # ECMA Array
            arr_size = struct.unpack('>I', data[pos:pos+4])[0]
            pos += 4
            value, pos = self._parse_amf_object(data, pos)
        elif value_type == 0x0A:  # Strict Array
            arr_size = struct.unpack('>I', data[pos:pos+4])[0]
            pos += 4
            value, pos = self._parse_amf_strict_array(data, pos, arr_size)
        elif value_type == 0X0B:  # Date
            timestamp = struct.unpack('>d', data[pos:pos+8])[0]
            pos += 8
            timezone = struct.unpack('>h', data[pos:pos+2])[0]
            pos += 2
            value = f"Date({timestamp}, tz={timezone})"
        elif value_type == 0X0C:  # Long String
            str_size = struct.unpack('>I', data[pos:pos+4])[0]
            pos += 4
            value = data[pos:pos+str_size].decode('latin-1')
            pos += str_size
        else:
            raise ValueError(f"Unknown AMF type: {hex(value_type)}")

        return value, pos

    def _parse_amf_object(self, data: bytes, pos: int) -> Tuple[Dict[str, Any], int]:
        #object
        obj = {}
        while pos < len(data):
            # 检查对象结束标记(0x000009)
            if data[pos:pos+3] == b'\x00\x00\x09':
                pos += 3
                break

            # 解析键名
            key_size = struct.unpack('>H', data[pos:pos+2])[0]
            pos += 2
            key = data[pos:pos+key_size].decode('latin-1')
            pos += key_size

            # 解析值
            value, pos = self._parse_amf_value(data, pos)
            obj[key] = value

        return obj, pos

    def _parse_amf_strict_array(self, data: bytes, pos: int, size: int) -> Tuple[Dict[str, Any], int]:
        #strict array
        arr = {}
        for i in range(size):
            value, pos = self._parse_amf_value(data, pos)
            arr[str(i)] = value
        return arr, pos


    def _parse_audio_tag(self, data: bytes) -> Optional[AudioHeader]:
        #解析音频标签
        if not data:
            return None
        
        header_byte = data[0]
        format = (header_byte & 0xF0) >> 4
        rate = (header_byte & 0x0C) >> 2
        size = (header_byte & 0x02) >> 1
        audio_type = header_byte & 0x01
        
        aac_packet_type = None
        audio_object_type = None
        sampling_index = None
        channel_config = None
        
        if format == 10:  # AAC
            if len(data) > 1:
                aac_packet_type = data[1]
                if aac_packet_type == 0 and len(data) > 2:  # AAC sequence header
                    audio_specific_config = data[2:]
                    if audio_specific_config:
                        audio_object_type = (audio_specific_config[0] & 0xF8) >> 3
                        sampling_index = ((audio_specific_config[0] & 0x07) << 1) | ((audio_specific_config[1] & 0x80) >> 7)
                        channel_config = (audio_specific_config[1] & 0x78) >> 3
        
        return AudioHeader(
            format=format,
            rate=rate,
            size=size,
            type=audio_type,
            aac_packet_type=aac_packet_type,
            audio_object_type=audio_object_type,
            sampling_index=sampling_index,
            channel_config=channel_config
        )
    
    def _parse_video_tag(self, data: bytes) -> Tuple[Optional[VideoHeader], List[NALUnit], Optional[bytes]]:
        #解析视频标签
        if not data:
            return None, [], None
            
        header_byte = data[0]
        frame_type = (header_byte & 0xF0) >> 4
        codec_id = header_byte & 0x0F
        
        avc_packet_type = None
        composition_time = None
        nal_units = []
        video_body = None
        
        if codec_id == 7:  # AVC
            if len(data) > 4:
                avc_packet_type = data[1]
                composition_time = struct.unpack('>I', b'\x00' + data[2:5])[0]
                
                if avc_packet_type == 0:  # AVC sequence header
                    video_body = data[5:]
                elif avc_packet_type == 1:  # AVC NALU
                    pos = 5
                    while pos + 4 < len(data):
                        nalu_size = struct.unpack('>I', data[pos:pos+4])[0]
                        pos += 4
                        
                        if pos + nalu_size > len(data):
                            break
                        
                        nalu_data = data[pos:pos+nalu_size]
                        pos += nalu_size
                        
                        if nalu_data:
                            nalu_header = nalu_data[0]
                            nalu_type = nalu_header & 0x1F
                            nalu_ref_idc = (nalu_header & 0x60) >> 5
                            
                            nal_units.append(NALUnit(
                                type=nalu_type,
                                ref_idc=nalu_ref_idc,
                                size=nalu_size,
                                data=nalu_data
                            ))
                    video_body = data[5:pos]
                else:
                    video_body = data[5:]
        else:
            video_body = data[1:]
        
        return VideoHeader(
            frame_type=frame_type,
            codec_id=codec_id,
            avc_packet_type=avc_packet_type,
            composition_time=composition_time
        ), nal_units, video_body
    
    def save_as_json(self, output_path: str) -> None:
        #将解析结果保存为JSON
        if not self.header:
            raise ValueError("未解析FLV文件, 请先调用parse()方法")
        
        result = {
            'file': self.input_file,
            'header': {
                'signature': self.header.signature,
                'version': self.header.version,
                'flags': self.header.flags,
                'header_size': self.header.header_size
            },
            'statistics': {
                'total_tags': len(self.tags),
                'audio_tags': sum(1 for tag in self.tags if tag[0] == 'audio'),
                'video_tags': sum(1 for tag in self.tags if tag[0] == 'video'),
                'metadata_tags': sum(1 for tag in self.tags if tag[0] == 'metadata'),
                'unknown_tags': sum(1 for tag in self.tags if tag[0] == 'unknown')
            },
            'tags': []
        }
        
        for tag in self.tags:
            tag_type, tag_header, tag_data, raw_data = tag
            tag_dict = {
                'prev_tag_size': tag_header.prev_tag_size,
                'type': tag_type,
                'data_size': tag_header.data_size,
                'timestamp': tag_header.timestamp,
                'timestamp_extended': tag_header.timestamp_ext,
                'stream_id': tag_header.stream_id,
                'details': {}
            }
            
            if tag_type == 'audio':
                audio_header = tag_data
                tag_dict['details'] = {
                    'format': audio_header.format,
                    'format_description': AUDIO_FORMATS.get(audio_header.format, "Unknown"),
                    'sample_rate': audio_header.rate,
                    'sample_rate_description': SAMPLE_RATES.get(audio_header.rate, "Unknown"),
                    'sample_size': audio_header.size,
                    'channels': audio_header.type,
                    'aac_packet_type': audio_header.aac_packet_type,
                    'audio_object_type': audio_header.audio_object_type,
                    'sampling_index': audio_header.sampling_index,
                    'channel_config': audio_header.channel_config
                }
            elif tag_type == 'video':
                video_header, nal_units, video_body = tag_data
                tag_dict['details'] = {
                    'frame_type': video_header.frame_type,
                    'frame_type_description': FRAME_TYPES.get(video_header.frame_type, "Unknown"),
                    'codec_id': video_header.codec_id,
                    'codec_description': CODEC_IDS.get(video_header.codec_id, "Unknown"),
                    'avc_packet_type': video_header.avc_packet_type,
                    'avc_packet_type_description': AVC_PACKET_TYPES.get(video_header.avc_packet_type, "Unknown"),
                    'composition_time': video_header.composition_time,
                    'nal_units_count': len(nal_units),
                    'video_body_size': len(video_body) if video_body else 0,
                    'nal_units': [{
                        'type': unit.type,
                        'type_description': NALU_TYPES.get(unit.type, "Unknown"),
                        'ref_idc': unit.ref_idc,
                        'ref_idc_description': NALU_REF_IDC.get(unit.ref_idc, "Unknown"),
                        'size': unit.size
                    } for unit in nal_units]
                }
            elif tag_type == 'metadata':
                metadata_info, metadata_array = tag_data
                tag_dict['details'] = {
                    "AMF1_type": metadata_info.AMF1_type,
                    "AMF1_type_description": SCRIPT_DATA_VALUE[metadata_info.AMF1_type],
                    "size":metadata_info.size,
                    SCRIPT_DATA_VALUE[metadata_info.AMF1_type]: metadata_info.onMetadata,
                    "AMF2_type": metadata_info.AMF2_type,
                    "AMF2_type_description": SCRIPT_DATA_VALUE[metadata_info.AMF2_type],
                    SCRIPT_DATA_VALUE[metadata_info.AMF2_type] + " size": metadata_info.arr_size,
                    "Metadata_array_data": [{
                        "duration": metadata_array.duration,
                        "width": metadata_array.width,
                        "height": metadata_array.height,
                        "videodatarate": metadata_array.videodatarate,
                        "framerate": metadata_array.framerate,
                        "videocodecid": metadata_array.videocodecid,
                        "audiodatarate": metadata_array.audiodatarate,
                        "audiosamplerate": metadata_array.audiosamplerate,
                        "audiosamplesize": metadata_array.audiosamplesize,
                        "stereo": metadata_array.stereo,
                        "audiocodecid": metadata_array.audiocodecid,
                        "encoder": metadata_array.encoder,
                        "filesize": metadata_array.filesize,
                        **metadata_array.custom_fields
                    }]
                }
                # try:
                #     tag_dict['details'] = {'metadata': tag_data.decode('utf-8', errors='replace')}
                # except UnicodeDecodeError:
                #     tag_dict['details'] = {'metadata': '二进制数据,无法解码为文本'}
            
            result['tags'].append(tag_dict)
        
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(result, f, indent=2, ensure_ascii=False)
        print(f"已保存JSON解析结果到: {output_path}")

def main():
    parser = argparse.ArgumentParser(
        description='FLV文件解析与保存工具 - 自动处理HTTP和chunk编码 兼容AMF解析',
        formatter_class=argparse.RawTextHelpFormatter
    )
    parser.add_argument('input', help='输入FLV文件路径')
    parser.add_argument('--save-json', metavar='OUTPUT', help='将解析结果保存为JSON')
    
    args = parser.parse_args()
    
    try:
        processor = FLVProcessor(args.input)
        processor.parse()
    
        if args.save_json:
            processor.save_as_json(args.save_json)
        else:
            print("未指定输出操作, 使用python process.py input.raw --save-json out.json")
    
    except Exception as e:
        print(f"处理文件时出错: {str(e)}")

if __name__ == "__main__":
    main()

运行:

python3 flv.py cap.raw --save-json flv-ana.json


网站公告

今日签到

点亮在社区的每一天
去签到