import struct
import json
import argparse
from collections import namedtuple
from typing import List, Tuple, Dict, Optional, Any
# --- Parsed-structure containers --------------------------------------------
FLVHeader = namedtuple('FLVHeader', ['signature', 'version', 'flags', 'header_size'])
FLVTagHeader = namedtuple('FLVTagHeader', [
    'prev_tag_size', 'tag_type', 'data_size', 'timestamp', 'timestamp_ext', 'stream_id'
])
MetadataHeader = namedtuple('MetadataHeader', [
    'AMF1_type', 'size', 'onMetadata', 'AMF2_type', 'arr_size'
])
MetadataArray = namedtuple('MetadataArray', [
    'duration', 'width', 'height', 'videodatarate', 'framerate', 'videocodecid', 'audiodatarate',
    'audiosamplerate', 'audiosamplesize', 'stereo', 'audiocodecid', 'encoder', 'filesize', "custom_fields"
])
AudioHeader = namedtuple('AudioHeader', [
    'format', 'rate', 'size', 'type', 'aac_packet_type', 'audio_object_type',
    'sampling_index', 'channel_config'
])
VideoHeader = namedtuple('VideoHeader', [
    'frame_type', 'codec_id', 'avc_packet_type', 'composition_time'
])
NALUnit = namedtuple('NALUnit', ['type', 'ref_idc', 'size', 'data'])

# --- Lookup tables (enumerations from the FLV / H.264 specifications) -------
AUDIO_FORMATS = {
    0: "Linear PCM, platform endian",
    1: "ADPCM",
    2: "MP3",
    3: "Linear PCM, little endian",
    4: "Nellymoser 16kHz mono",
    5: "Nellymoser 8kHz mono",
    6: "Nellymoser",
    7: "G.711 A-law",
    8: "G.711 mu-law",
    10: "AAC",
    11: "Speex",
    14: "MP3 8kHz",
    15: "Device-specific sound"
}
SAMPLE_RATES = {
    0: "5.5kHz",
    1: "11kHz",
    2: "22kHz",
    3: "44kHz"
}
FRAME_TYPES = {
    1: "Keyframe",
    2: "Inter frame",
    3: "Disposable inter frame",
    4: "Generated keyframe",
    5: "Video info/command frame"
}
CODEC_IDS = {
    1: "JPEG",
    2: "Sorenson H.263",
    3: "Screen video",
    4: "On2 VP6",
    5: "On2 VP6 with alpha",
    6: "Screen video v2",
    7: "AVC"
}
AVC_PACKET_TYPES = {
    0: "AVC sequence header",
    1: "AVC NALU",
    2: "AVC end of sequence"
}
NALU_TYPES = {
    1: "Coded slice of a non-IDR picture",
    5: "Coded slice of an IDR picture",
    6: "Supplemental enhancement information (SEI)",
    7: "Sequence parameter set",
    8: "Picture parameter set",
    9: "Access unit delimiter",
    10: "End of sequence",
    11: "End of stream",
    12: "Filler data",
    13: "Sequence parameter set extension",
    14: "Prefix NAL unit",
    15: "Subset sequence parameter set",
    19: "Coded slice of an auxiliary coded picture without partitioning",
    20: "Coded slice extension"
}
NALU_REF_IDC = {
    0: "Disposable",
    1: "Lowest",
    2: "Low",
    3: "High"
}
# AMF0 value-type markers as used in FLV script-data tags.
SCRIPT_DATA_VALUE = {
    0: "Number",
    1: "Boolean",
    2: "String",
    3: "Object",
    4: "MovieClip (reserved, not supported)",
    5: "Null",
    6: "Undefined",
    7: "Reference",
    8: "ECMA array",
    9: "Object end marker",
    10: "Strict array",
    11: "Date",
    12: "Long string"
}


class FLVProcessor:
    """Parses an FLV file (optionally wrapped in an HTTP/1.1 response, possibly
    chunk-encoded) into its header and a list of audio/video/metadata tags,
    and can serialize the result as JSON via :meth:`save_as_json`.
    """

    def __init__(self, input_file: str):
        self.input_file = input_file
        self.header: Optional[FLVHeader] = None
        # Each entry: (kind, FLVTagHeader, parsed payload, raw tag bytes).
        self.tags: List[Tuple] = []
        self.raw_data: bytes = b''

    def read_file(self) -> None:
        """Read the whole input file into memory as bytes."""
        with open(self.input_file, 'rb') as f:
            self.raw_data = f.read()

    def _process_http_response(self, data: bytes) -> bytes:
        """Strip an 'HTTP/1.1 200 OK' wrapper if present, de-chunking the body
        when Transfer-Encoding: chunked is advertised; otherwise return the
        input unchanged."""
        if not data.startswith(b'HTTP/1.1 200 OK'):
            return data
        # Locate the end of the HTTP header block.
        http_header_end = data.find(b'\r\n\r\n') + 4
        if http_header_end < 4:  # find() returned -1: no complete header
            return data
        headers = data[:http_header_end - 4].split(b'\r\n')
        is_chunked = any(b'Transfer-Encoding: chunked' in h for h in headers)
        body_data = data[http_header_end:]
        if is_chunked:
            return self._decode_chunked_data(body_data)
        return body_data

    def _decode_chunked_data(self, data: bytes) -> bytes:
        """Decode an HTTP chunked transfer-encoded body.

        Stops at the terminating zero-size chunk or the first malformed /
        truncated chunk, returning whatever was decoded so far.
        """
        parts: List[bytes] = []
        pos = 0
        while pos < len(data):
            # The chunk-size line is hex digits terminated by CRLF.
            chunk_size_end = data.find(b'\r\n', pos)
            if chunk_size_end == -1:
                break
            chunk_size_str = data[pos:chunk_size_end].decode('ascii').strip()
            try:
                chunk_size = int(chunk_size_str, 16)
            except ValueError:
                break
            if chunk_size == 0:  # terminating chunk
                break
            chunk_start = chunk_size_end + 2
            chunk_end = chunk_start + chunk_size
            if chunk_end > len(data):  # truncated chunk
                break
            parts.append(data[chunk_start:chunk_end])
            pos = chunk_end + 2  # skip the CRLF that follows the chunk data
        return b''.join(parts)

    def parse(self) -> None:
        """Parse the FLV stream into ``self.header`` and ``self.tags``.

        Reads the file lazily if :meth:`read_file` has not been called, and
        unwraps a possible HTTP response first.

        Raises:
            ValueError: if the payload does not start with the 'FLV' signature.
        """
        if not self.raw_data:
            self.read_file()
        flv_data = self._process_http_response(self.raw_data)
        if not flv_data.startswith(b'FLV'):
            raise ValueError("不是有效的FLV文件, 签名不匹配")
        self.header = self._parse_flv_header(flv_data[:9])
        pos = 9
        # A complete record needs 4 bytes PreviousTagSize + 11 bytes tag header.
        # BUG FIX: '<' (instead of '<=') skipped a final zero-payload tag whose
        # header ended exactly at EOF.
        while pos + 15 <= len(flv_data):
            prev_tag_size = struct.unpack('>I', flv_data[pos:pos + 4])[0]
            pos += 4
            tag_type = flv_data[pos]
            # DataSize and StreamID are big-endian 24-bit (UI24) fields.
            data_size = struct.unpack('>I', b'\x00' + flv_data[pos + 1:pos + 4])[0]
            timestamp = struct.unpack('>I', flv_data[pos + 4:pos + 7] + b'\x00')[0] >> 8
            timestamp_ext = flv_data[pos + 7]  # upper 8 bits of the 32-bit timestamp
            stream_id = struct.unpack('>I', b'\x00' + flv_data[pos + 8:pos + 11])[0]
            tag_header = FLVTagHeader(
                prev_tag_size=prev_tag_size,
                tag_type=tag_type,
                data_size=data_size,
                timestamp=timestamp,
                timestamp_ext=timestamp_ext,
                stream_id=stream_id
            )
            pos += 11
            # Bail out instead of slicing past the end of the buffer.
            if pos + data_size > len(flv_data):
                print(f"警告: 数据大小({data_size})超出文件范围,终止解析")
                break
            tag_data = flv_data[pos:pos + data_size]
            pos += data_size
            # Dispatch on the tag type defined by the FLV specification.
            if tag_type == 8:  # audio
                audio_info = self._parse_audio_tag(tag_data)
                self.tags.append(('audio', tag_header, audio_info, tag_data))
            elif tag_type == 9:  # video
                video_info, nal_units, video_body = self._parse_video_tag(tag_data)
                self.tags.append(('video', tag_header, (video_info, nal_units, video_body), tag_data))
            elif tag_type == 18:  # script data / metadata
                metadata_info, metadata_array = self._parse_flv_metadate(tag_data)
                self.tags.append(('metadata', tag_header, (metadata_info, metadata_array), tag_data))
            else:
                self.tags.append(('unknown', tag_header, tag_data, tag_data))

    def _parse_flv_header(self, header_data: bytes) -> FLVHeader:
        """Decode the 9-byte FLV file header."""
        signature = header_data[:3].decode('ascii')
        version = header_data[3]
        flags = header_data[4]  # TypeFlags: bit 0 = video present, bit 2 = audio present
        header_size = struct.unpack('>I', header_data[5:9])[0]
        return FLVHeader(signature, version, flags, header_size)

    def _parse_flv_metadate(self, data: bytes) -> Tuple[Optional[MetadataHeader], Optional[MetadataArray]]:
        """Parse a script-data tag body (typically the onMetaData event).

        Returns:
            (MetadataHeader, MetadataArray), or (None, None) when the payload
            is empty or truncated.

        Raises:
            ValueError: for AMF value types with no implemented decoder
                (e.g. 0x04 MovieClip).
        """
        if not data:
            return None, None
        pos = 0
        AMF1_type = data[pos]
        pos += 1
        # Event-name string length (UI16).
        name_size = struct.unpack('>H', data[pos:pos + 2])[0]
        pos += 2
        # Need the full name plus at least the second AMF type byte.
        if len(data) < pos + name_size + 1:
            return None, None
        onMetadata = data[pos:pos + name_size].decode('utf-8')
        pos += name_size
        AMF2_type = data[pos]
        pos += 1
        metadata_dict: Dict[str, Any] = {}
        # BUG FIX: arr_size was previously unassigned for the Null / Date /
        # long-string / undefined branches, raising NameError further down.
        arr_size = 0
        if AMF2_type == 0x08:  # ECMA array: UI32 count, then key/value pairs
            arr_size = struct.unpack('>I', data[pos:pos + 4])[0]
            pos += 4
            metadata_dict, pos = self._parse_amf_object(data, pos)
        elif AMF2_type == 0x03:  # anonymous object
            metadata_dict, pos = self._parse_amf_object(data, pos)
            arr_size = len(metadata_dict)
        elif AMF2_type == 0x0A:  # strict array: UI32 count, then values
            arr_size = struct.unpack('>I', data[pos:pos + 4])[0]
            pos += 4
            metadata_dict, pos = self._parse_amf_strict_array(data, pos, arr_size)
        elif AMF2_type == 0x05:  # Null: nothing to decode
            pass
        elif AMF2_type == 0x0B:  # Date: DOUBLE epoch-millis + SI16 timezone
            timestamp = struct.unpack('>d', data[pos:pos + 8])[0]
            pos += 8
            timezone = struct.unpack('>h', data[pos:pos + 2])[0]
            pos += 2
            metadata_dict["date"] = f"Date({timestamp}, tz={timezone})"
        elif AMF2_type == 0x0C:  # long string (UI32 length)
            str_size = struct.unpack('>I', data[pos:pos + 4])[0]
            pos += 4
            metadata_dict["long_string"] = data[pos:pos + str_size].decode('utf-8', errors='replace')
            pos += str_size
        elif AMF2_type in [0x06, 0x0D]:  # undefined / unsupported markers
            metadata_dict["unsupported"] = None
        else:  # other unimplemented types (e.g. 0x04 MovieClip)
            raise ValueError(f"不支持的AMF2类型: {hex(AMF2_type)}")
        header = MetadataHeader(
            AMF1_type=AMF1_type,
            size=name_size,
            onMetadata=onMetadata,
            AMF2_type=AMF2_type,
            arr_size=arr_size
        )
        # Split decoded values into the well-known onMetaData fields and
        # everything else (kept verbatim under custom_fields).
        standard_keys = (
            'duration', 'width', 'height', 'videodatarate', 'framerate',
            'videocodecid', 'audiodatarate', 'audiosamplerate',
            'audiosamplesize', 'stereo', 'audiocodecid', 'encoder', 'filesize'
        )
        custom_fields = {k: v for k, v in metadata_dict.items() if k not in standard_keys}
        # Positional args follow the MetadataArray field order exactly.
        metadata = MetadataArray(
            *(metadata_dict.get(k) for k in standard_keys),
            custom_fields=custom_fields
        )
        return header, metadata

    def _parse_amf_value(self, data: bytes, pos: int) -> Tuple[Any, int]:
        """Decode one AMF0 value starting at ``pos``; return (value, new_pos).

        Raises:
            ValueError: for AMF type markers with no decoder.
        """
        value_type = data[pos]
        pos += 1
        if value_type == 0x00:  # Number (IEEE-754 double)
            value = struct.unpack('>d', data[pos:pos + 8])[0]
            pos += 8
        elif value_type == 0x01:  # Boolean
            value = bool(data[pos])
            pos += 1
        elif value_type == 0x02:  # String (UI16 length)
            str_size = struct.unpack('>H', data[pos:pos + 2])[0]
            pos += 2
            # AMF0 strings are UTF-8 per spec (was latin-1, which mangled
            # any non-ASCII metadata values).
            value = data[pos:pos + str_size].decode('utf-8', errors='replace')
            pos += str_size
        elif value_type == 0x03:  # Object
            value, pos = self._parse_amf_object(data, pos)
        elif value_type == 0x05:  # Null (BUG FIX: was the undefined name AMF_NULL)
            value = None
        elif value_type == 0x06:  # Undefined
            value = "undefined"
        elif value_type == 0x08:  # ECMA array
            # The UI32 count is advisory; pairs end at the 0x000009 marker.
            pos += 4
            value, pos = self._parse_amf_object(data, pos)
        elif value_type == 0x0A:  # Strict array
            arr_size = struct.unpack('>I', data[pos:pos + 4])[0]
            pos += 4
            value, pos = self._parse_amf_strict_array(data, pos, arr_size)
        elif value_type == 0x0B:  # Date
            timestamp = struct.unpack('>d', data[pos:pos + 8])[0]
            pos += 8
            timezone = struct.unpack('>h', data[pos:pos + 2])[0]
            pos += 2
            value = f"Date({timestamp}, tz={timezone})"
        elif value_type == 0x0C:  # Long string (UI32 length)
            str_size = struct.unpack('>I', data[pos:pos + 4])[0]
            pos += 4
            value = data[pos:pos + str_size].decode('utf-8', errors='replace')
            pos += str_size
        else:
            raise ValueError(f"Unknown AMF type: {hex(value_type)}")
        return value, pos

    def _parse_amf_object(self, data: bytes, pos: int) -> Tuple[Dict[str, Any], int]:
        """Decode AMF0 key/value pairs until the object-end marker (0x000009)."""
        obj: Dict[str, Any] = {}
        while pos < len(data):
            if data[pos:pos + 3] == b'\x00\x00\x09':
                pos += 3
                break
            # Key: UI16 length followed by the (UTF-8) name bytes.
            key_size = struct.unpack('>H', data[pos:pos + 2])[0]
            pos += 2
            key = data[pos:pos + key_size].decode('utf-8', errors='replace')
            pos += key_size
            value, pos = self._parse_amf_value(data, pos)
            obj[key] = value
        return obj, pos

    def _parse_amf_strict_array(self, data: bytes, pos: int, size: int) -> Tuple[Dict[str, Any], int]:
        """Decode ``size`` consecutive AMF0 values, keyed by stringified index."""
        arr: Dict[str, Any] = {}
        for i in range(size):
            value, pos = self._parse_amf_value(data, pos)
            arr[str(i)] = value
        return arr, pos

    def _parse_audio_tag(self, data: bytes) -> Optional[AudioHeader]:
        """Parse an audio tag body; for an AAC sequence header also decode the
        AudioSpecificConfig. Returns None for an empty payload."""
        if not data:
            return None
        header_byte = data[0]
        format = (header_byte & 0xF0) >> 4
        rate = (header_byte & 0x0C) >> 2
        size = (header_byte & 0x02) >> 1
        audio_type = header_byte & 0x01
        aac_packet_type = None
        audio_object_type = None
        sampling_index = None
        channel_config = None
        if format == 10 and len(data) > 1:  # AAC
            aac_packet_type = data[1]
            if aac_packet_type == 0 and len(data) > 2:  # AAC sequence header
                audio_specific_config = data[2:]
                # BUG FIX: the bit fields below span two bytes, but the old
                # code only checked that the slice was non-empty and crashed
                # with IndexError on a 1-byte config.
                if len(audio_specific_config) >= 2:
                    audio_object_type = (audio_specific_config[0] & 0xF8) >> 3
                    sampling_index = ((audio_specific_config[0] & 0x07) << 1) | ((audio_specific_config[1] & 0x80) >> 7)
                    channel_config = (audio_specific_config[1] & 0x78) >> 3
        return AudioHeader(
            format=format,
            rate=rate,
            size=size,
            type=audio_type,
            aac_packet_type=aac_packet_type,
            audio_object_type=audio_object_type,
            sampling_index=sampling_index,
            channel_config=channel_config
        )

    def _parse_video_tag(self, data: bytes) -> Tuple[Optional[VideoHeader], List[NALUnit], Optional[bytes]]:
        """Parse a video tag body; for AVC payloads split out the
        length-prefixed NAL units. Returns (header, nal_units, body_bytes)."""
        if not data:
            return None, [], None
        header_byte = data[0]
        frame_type = (header_byte & 0xF0) >> 4
        codec_id = header_byte & 0x0F
        avc_packet_type = None
        composition_time = None
        nal_units: List[NALUnit] = []
        video_body = None
        if codec_id == 7:  # AVC
            if len(data) > 4:
                avc_packet_type = data[1]
                # BUG FIX: CompositionTime is a signed 24-bit value (SI24);
                # sign-extend instead of decoding it as unsigned.
                composition_time = struct.unpack(
                    '>i', (b'\xff' if data[2] & 0x80 else b'\x00') + data[2:5]
                )[0]
                if avc_packet_type == 0:  # AVC sequence header (decoder config record)
                    video_body = data[5:]
                elif avc_packet_type == 1:  # AVC NALUs, each with a UI32 length prefix
                    pos = 5
                    while pos + 4 <= len(data):  # need 4 bytes for the length prefix
                        nalu_size = struct.unpack('>I', data[pos:pos + 4])[0]
                        pos += 4
                        if pos + nalu_size > len(data):  # truncated NALU
                            break
                        nalu_data = data[pos:pos + nalu_size]
                        pos += nalu_size
                        if nalu_data:
                            nalu_header = nalu_data[0]
                            nal_units.append(NALUnit(
                                type=nalu_header & 0x1F,
                                ref_idc=(nalu_header & 0x60) >> 5,
                                size=nalu_size,
                                data=nalu_data
                            ))
                    video_body = data[5:pos]
                else:  # AVC end of sequence
                    video_body = data[5:]
        else:
            video_body = data[1:]
        return VideoHeader(
            frame_type=frame_type,
            codec_id=codec_id,
            avc_packet_type=avc_packet_type,
            composition_time=composition_time
        ), nal_units, video_body

    def save_as_json(self, output_path: str) -> None:
        """Serialize the parse results (header, statistics, per-tag details)
        to ``output_path`` as pretty-printed, UTF-8 JSON.

        Raises:
            ValueError: if :meth:`parse` has not been called yet.
        """
        if not self.header:
            raise ValueError("未解析FLV文件, 请先调用parse()方法")
        result = {
            'file': self.input_file,
            'header': {
                'signature': self.header.signature,
                'version': self.header.version,
                'flags': self.header.flags,
                'header_size': self.header.header_size
            },
            'statistics': {
                'total_tags': len(self.tags),
                'audio_tags': sum(1 for tag in self.tags if tag[0] == 'audio'),
                'video_tags': sum(1 for tag in self.tags if tag[0] == 'video'),
                'metadata_tags': sum(1 for tag in self.tags if tag[0] == 'metadata'),
                'unknown_tags': sum(1 for tag in self.tags if tag[0] == 'unknown')
            },
            'tags': []
        }
        for tag_type, tag_header, tag_data, _raw in self.tags:
            tag_dict = {
                'prev_tag_size': tag_header.prev_tag_size,
                'type': tag_type,
                'data_size': tag_header.data_size,
                'timestamp': tag_header.timestamp,
                'timestamp_extended': tag_header.timestamp_ext,
                'stream_id': tag_header.stream_id,
                'details': {}
            }
            # BUG FIX: each branch now guards against the None payloads the
            # parsers return for empty/truncated tags (previously
            # AttributeError), and uses .get() for unknown type codes
            # (previously KeyError).
            if tag_type == 'audio' and tag_data is not None:
                audio_header = tag_data
                tag_dict['details'] = {
                    'format': audio_header.format,
                    'format_description': AUDIO_FORMATS.get(audio_header.format, "Unknown"),
                    'sample_rate': audio_header.rate,
                    'sample_rate_description': SAMPLE_RATES.get(audio_header.rate, "Unknown"),
                    'sample_size': audio_header.size,
                    'channels': audio_header.type,
                    'aac_packet_type': audio_header.aac_packet_type,
                    'audio_object_type': audio_header.audio_object_type,
                    'sampling_index': audio_header.sampling_index,
                    'channel_config': audio_header.channel_config
                }
            elif tag_type == 'video':
                video_header, nal_units, video_body = tag_data
                if video_header is not None:
                    tag_dict['details'] = {
                        'frame_type': video_header.frame_type,
                        'frame_type_description': FRAME_TYPES.get(video_header.frame_type, "Unknown"),
                        'codec_id': video_header.codec_id,
                        'codec_description': CODEC_IDS.get(video_header.codec_id, "Unknown"),
                        'avc_packet_type': video_header.avc_packet_type,
                        'avc_packet_type_description': AVC_PACKET_TYPES.get(video_header.avc_packet_type, "Unknown"),
                        'composition_time': video_header.composition_time,
                        'nal_units_count': len(nal_units),
                        'video_body_size': len(video_body) if video_body else 0,
                        'nal_units': [{
                            'type': unit.type,
                            'type_description': NALU_TYPES.get(unit.type, "Unknown"),
                            'ref_idc': unit.ref_idc,
                            'ref_idc_description': NALU_REF_IDC.get(unit.ref_idc, "Unknown"),
                            'size': unit.size
                        } for unit in nal_units]
                    }
            elif tag_type == 'metadata':
                metadata_info, metadata_array = tag_data
                if metadata_info is not None and metadata_array is not None:
                    amf1_name = SCRIPT_DATA_VALUE.get(metadata_info.AMF1_type, "Unknown")
                    amf2_name = SCRIPT_DATA_VALUE.get(metadata_info.AMF2_type, "Unknown")
                    tag_dict['details'] = {
                        "AMF1_type": metadata_info.AMF1_type,
                        "AMF1_type_description": amf1_name,
                        "size": metadata_info.size,
                        amf1_name: metadata_info.onMetadata,
                        "AMF2_type": metadata_info.AMF2_type,
                        "AMF2_type_description": amf2_name,
                        amf2_name + " size": metadata_info.arr_size,
                        "Metadata_array_data": [{
                            "duration": metadata_array.duration,
                            "width": metadata_array.width,
                            "height": metadata_array.height,
                            "videodatarate": metadata_array.videodatarate,
                            "framerate": metadata_array.framerate,
                            "videocodecid": metadata_array.videocodecid,
                            "audiodatarate": metadata_array.audiodatarate,
                            "audiosamplerate": metadata_array.audiosamplerate,
                            "audiosamplesize": metadata_array.audiosamplesize,
                            "stereo": metadata_array.stereo,
                            "audiocodecid": metadata_array.audiocodecid,
                            "encoder": metadata_array.encoder,
                            "filesize": metadata_array.filesize,
                            **metadata_array.custom_fields
                        }]
                    }
            result['tags'].append(tag_dict)
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(result, f, indent=2, ensure_ascii=False)
        print(f"已保存JSON解析结果到: {output_path}")
def main():
    """CLI entry point: parse an FLV capture and optionally dump it as JSON."""
    arg_parser = argparse.ArgumentParser(
        description='FLV文件解析与保存工具 - 自动处理HTTP和chunk编码 兼容AMF解析',
        formatter_class=argparse.RawTextHelpFormatter,
    )
    arg_parser.add_argument('input', help='输入FLV文件路径')
    arg_parser.add_argument('--save-json', metavar='OUTPUT', help='将解析结果保存为JSON')
    options = arg_parser.parse_args()
    try:
        flv = FLVProcessor(options.input)
        flv.parse()
        if not options.save_json:
            # No output action requested: show the expected invocation.
            print("未指定输出操作, 使用python process.py input.raw --save-json out.json")
        else:
            flv.save_as_json(options.save_json)
    except Exception as e:
        # Top-level CLI boundary: report and exit cleanly.
        print(f"处理文件时出错: {str(e)}")


if __name__ == "__main__":
    main()
# Usage:
#   python3 flv.py cap.raw --save-json flv-ana.json