Ryu 源码阅读

SimpleMonitor13,SimpleSwitchRest13 两个类都继承自 SimpleSwitch13,分别扩展了流量监控和北向 RESTful API 功能,这篇以实验+源码剖析的方式简单总结下。

我的新书《LangChain编程从入门到实践》 已经开售!推荐正在学习AI应用开发的朋友购买阅读!
LangChain编程从入门到实践

Switching Hub

  • 传统交换机
    • 学习连接到传统交换机的主机的mac地址,并把其存在mac地址表中
    • 对于已经记录下来的mac地址,若是收到送往该mac地址的数据包时,就往对应的端口进行转发
    • 对于mac地址表中没有的数据包,则进行flooding
  • OpenFlow 交换机实现传统交换机功能
    • 对接收到的数据包向指定的端口转发
    • 把接收到的数据包发送给控制器(Packet-In)
    • 把从控制器接收到的数据包转发到指定的端口(Packet-Out)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# simple_switch_13.py  实现一个带MAC地址学习功能的二层交换机
from ryu.base import app_manager
from ryu.controller import ofp_event
from ryu.controller.handler import CONFIG_DISPATCHER, MAIN_DISPATCHER
from ryu.controller.handler import set_ev_cls
from ryu.ofproto import ofproto_v1_3
from ryu.lib.packet import packet
from ryu.lib.packet import ethernet
from ryu.lib.packet import ether_types


class SimpleSwitch13(app_manager.RyuApp):
OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION]

def __init__(self, *args, **kwargs):
super(SimpleSwitch13, self).__init__(*args, **kwargs)
# MAC地址表的定义
self.mac_to_port = {}
# 控制器向OpenFlow交换机发送Features Request消息,请求OpenFlow交换机上传自己的详细参数。
# OpenFlow交换机收到请求后,向控制器发送Features Reply消息,详细汇报自身参数,
# 包括支持的buffer、流表数以及Actions等
@set_ev_cls(ofp_event.EventOFPSwitchFeatures, CONFIG_DISPATCHER)
def switch_features_handler(self, ev):
# ev.msg是用来存对应事件的OpenFlow消息类别实体,这里指的是OFPSwitchFeatures
# datapath是和控制器通信的实体单元
datapath = ev.msg.datapath
ofproto = datapath.ofproto
parser = datapath.ofproto_parser
# install table-miss flow entry
#
# We specify NO BUFFER to max_len of the output action due to
# OVS bug. At this moment, if we specify a lesser number, e.g.,
# 128, OVS will send Packet-In with invalid buffer_id and
# truncated packet data. In that case, we cannot output packets
# correctly. The bug has been fixed in OVS v2.1.0.
# OpenFlow1.3版本为处理table miss事件专门引入的条目。并规定每一个flow table必须要支持table-miss flow entry去处理table miss情况。
# table-miss flow entry具备最低的优先级(0);必须至少能够支持使用CONTROLLER保留端口发送包,使用Clear-Actions指令丢包;
# table-miss flow entry和其他flow entry具有相同的特性:默认不存在,控制器可以随时添加或丢弃该条目,也可以到期;如果使用CONTROLLER保留端口发生数据包,Packet-In发送原因必须标明table-miss。

match = parser.OFPMatch()
# OFPActionOutput() :使用一个packet_out 消息去指定你想从交换机的哪个端口发送出数据包。在该应用中,按照标准指定通过CONTROLLER保留端口发生,所以选择了OFPP_CONTROLLER端口。第二个参数OFPCML_NO_BUFFER,指明:消息中必须包含完整的包,而不会被缓冲。
actions = [parser.OFPActionOutput(ofproto.OFPP_CONTROLLER,
ofproto.OFPCML_NO_BUFFER)]
# 下发Table-miss Flow Entry优先级为0的流表,即如果报文都没有匹配的话,则匹配该报文,并将其发送给控制器
self.add_flow(datapath, 0, match, actions)

# add_flow方法用来发送Flow Mod消息
def add_flow(self, datapath, priority, match, actions, buffer_id=None):
ofproto = datapath.ofproto
parser = datapath.ofproto_parser
# Instruction是1.0版本中Actions的拓展,Instruction主要负责将流表转发到其他Table,或者进行其他转发操作,# OFPIT_APPLY_ACTIONS: 立即应用actions操作到交换机
inst = [parser.OFPInstructionActions(ofproto.OFPIT_APPLY_ACTIONS,
actions)]
if buffer_id:
mod = parser.OFPFlowMod(datapath=datapath, buffer_id=buffer_id,
priority=priority, match=match,
instructions=inst)
else:
mod = parser.OFPFlowMod(datapath=datapath, priority=priority,
match=match, instructions=inst)
datapath.send_msg(mod)

# 处理Packet-in数据包,交换机状态为正常状态
@set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER)
def _packet_in_handler(self, ev):
# If you hit this you might want to increase
# the "miss_send_length" of your switch
if ev.msg.msg_len < ev.msg.total_len:
self.logger.debug("packet truncated: only %s of %s bytes",
ev.msg.msg_len, ev.msg.total_len)
msg = ev.msg
datapath = msg.datapath
ofproto = datapath.ofproto
parser = datapath.ofproto_parser
# match 用来存储数据包的Meta元数据
in_port = msg.match['in_port']
# data 接受数据包本身的信息
pkt = packet.Packet(msg.data)
eth = pkt.get_protocols(ethernet.ethernet)[0]

if eth.ethertype == ether_types.ETH_TYPE_LLDP:
# ignore lldp packet
return
dst = eth.dst
src = eth.src

# 更新mac地址表,每台交换机独立的dpid对应一个表
dpid = format(datapath.id, "d").zfill(16)
self.mac_to_port.setdefault(dpid, {})

self.logger.info("packet in %s %s %s %s", dpid, src, dst, in_port)

# learn a mac address to avoid FLOOD next time.
self.mac_to_port[dpid][src] = in_port

if dst in self.mac_to_port[dpid]:
out_port = self.mac_to_port[dpid][dst]
else:
out_port = ofproto.OFPP_FLOOD

actions = [parser.OFPActionOutput(out_port)]

# install a flow to avoid packet_in next time
if out_port != ofproto.OFPP_FLOOD:
match = parser.OFPMatch(in_port=in_port, eth_dst=dst, eth_src=src)
# verify if we have a valid buffer_id, if yes avoid to send both
# flow_mod & packet_out
if msg.buffer_id != ofproto.OFP_NO_BUFFER:
self.add_flow(datapath, 1, match, actions, msg.buffer_id)
return
else:
self.add_flow(datapath, 1, match, actions)
data = None
if msg.buffer_id == ofproto.OFP_NO_BUFFER:
data = msg.data

out = parser.OFPPacketOut(datapath=datapath, buffer_id=msg.buffer_id,
in_port=in_port, actions=actions, data=data)
datapath.send_msg(out)

实验测试

mininet端执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
$ sudo mn --topo single,3 --mac --switch ovsk,protocols=OpenFlow13 --controller remote
*** Creating network
*** Adding controller
Unable to contact the remote controller at 127.0.0.1:6653
Unable to contact the remote controller at 127.0.0.1:6633
Setting remote controller to 127.0.0.1:6653
*** Adding hosts:
h1 h2 h3
*** Adding switches:
s1
*** Adding links:
(h1, s1) (h2, s1) (h3, s1)
*** Configuring hosts
h1 h2 h3
*** Starting controller
c0
*** Starting 1 switches
s1 ...
*** Starting CLI:

ovs端执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
$ sudo ovs-vsctl show
0f735673-a50c-4059-95bf-6ee3fd50c39c
Bridge "s1"
Controller "tcp:127.0.0.1:6653"
Controller "ptcp:6654"
fail_mode: secure
Port "s1"
Interface "s1"
type: internal
Port "s1-eth2"
Interface "s1-eth2"
Port "s1-eth1"
Interface "s1-eth1"
Port "s1-eth3"
Interface "s1-eth3"
ovs_version: "2.9.5"

因为此时的交换机中没有任何可匹配的转发策略,在mininet端执行pingall,结果全丢包

1
2
3
4
5
6
mininet> pingall
*** Ping: testing ping reachability
h1 -> X X
h2 -> X X
h3 -> X X
*** Results: 100% dropped (0/6 received)
再开一个终端,开启ryu

执行ryu-manager --verbose ryu.app.simple_switch_13

  • 在ovs端查看流表,Table-miss Flow Entry 已加入OVS

    1
    2
    3
    $ sudo ovs-ofctl -O OpenFlow13 dump-flows s1
    cookie=0x0, duration=28.824s, table=0, n_packets=0, n_bytes=0, priority=0 actions=CONTROLLER:65535

  • mininet端

    1
    2
    3
    4
    5
    6
    7
    8
    mininet> h1 ping -c1 h2
    PING 10.0.0.2 (10.0.0.2) 56(84) bytes of data.
    64 bytes from 10.0.0.2: icmp_seq=1 ttl=64 time=5.18 ms

    --- 10.0.0.2 ping statistics ---
    1 packets transmitted, 1 received, 0% packet loss, time 0ms
    rtt min/avg/max/mdev = 5.183/5.183/5.183/0.000 ms

  • ovs端再次查看OVS流表

    1
    2
    3
    4
    $ sudo ovs-ofctl -O OpenFlow13 dump-flows s1
    cookie=0x0, duration=190.004s, table=0, n_packets=2, n_bytes=140, priority=1,in_port="s1-eth2",dl_dst=00:00:00:00:00:01 actions=output:"s1-eth1"
    cookie=0x0, duration=190.003s, table=0, n_packets=1, n_bytes=42, priority=1,in_port="s1-eth1",dl_dst=00:00:00:00:00:02 actions=output:"s1-eth2"
    cookie=0x0, duration=648.460s, table=0, n_packets=6, n_bytes=392, priority=0 actions=CONTROLLER:65535
  • ryu端查看

    1
    2
    3
    4
    5
    6
    EVENT ofp_event->SimpleSwitch13 EventOFPPacketIn
    packet in 1 00:00:00:00:00:01 ff:ff:ff:ff:ff:ff 1
    EVENT ofp_event->SimpleSwitch13 EventOFPPacketIn
    packet in 1 00:00:00:00:00:02 00:00:00:00:00:01 2
    EVENT ofp_event->SimpleSwitch13 EventOFPPacketIn
    packet in 1 00:00:00:00:00:01 00:00:00:00:00:02 1

Traffic Monitor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
from operator import attrgetter

from ryu.app import simple_switch_13
from ryu.controller import ofp_event
from ryu.controller.handler import MAIN_DISPATCHER, DEAD_DISPATCHER
from ryu.controller.handler import set_ev_cls
from ryu.lib import hub


class SimpleMonitor13(simple_switch_13.SimpleSwitch13):

def __init__(self, *args, **kwargs):
super(SimpleMonitor13, self).__init__(*args, **kwargs)
# 增添datapaths字典,存储交换机id
self.datapaths = {}
# 引入hub.spawn()函数启动一个新线程,输入为一个新的方法_monitor
self.monitor_thread = hub.spawn(self._monitor)
# 创建一个EventOFPStateChange监听事件,监听MAIN_DISPATCHER,DEAD_DISPATCHER两种情况。
@set_ev_cls(ofp_event.EventOFPStateChange,
[MAIN_DISPATCHER, DEAD_DISPATCHER])
def _state_change_handler(self, ev):
datapath = ev.datapath
# 监听如果为MAIN_DISPATCHER,并且datapath.id不在datapath列表中,则证明是新加入的交换机
if ev.state == MAIN_DISPATCHER:
if datapath.id not in self.datapaths:
self.logger.debug('register datapath: %016x', datapath.id)
self.datapaths[datapath.id] = datapath
# 如果为DEAD_DISPATCHER,并且datapath.id在datapath列表中,则证明是掉线的交换机
elif ev.state == DEAD_DISPATCHER:
if datapath.id in self.datapaths:
self.logger.debug('unregister datapath: %016x', datapath.id)
del self.datapaths[datapath.id]
# _monitor方法,循环不断向datapath列表中的交换机发送Flow状态请求,和Port状态请求
def _monitor(self):
while True:
for dp in self.datapaths.values():
self._request_stats(dp)
hub.sleep(10)

def _request_stats(self, datapath):
self.logger.debug('send stats request: %016x', datapath.id)
ofproto = datapath.ofproto
parser = datapath.ofproto_parser

req = parser.OFPFlowStatsRequest(datapath)
datapath.send_msg(req)

req = parser.OFPPortStatsRequest(datapath, 0, ofproto.OFPP_ANY)
datapath.send_msg(req)
# 刚刚发送了请求,现在监听其回复
@set_ev_cls(ofp_event.EventOFPFlowStatsReply, MAIN_DISPATCHER)
def _flow_stats_reply_handler(self, ev):
body = ev.msg.body

self.logger.info('datapath '
'in-port eth-dst '
'out-port packets bytes')
self.logger.info('---------------- '
'-------- ----------------- '
'-------- -------- --------')
# 其中sorted ... lambda语法,指元组的排列顺序按照先in_port再eth_dst
for stat in sorted([flow for flow in body if flow.priority == 1],
key=lambda flow: (flow.match['in_port'],
flow.match['eth_dst'])):
self.logger.info('%016x %8x %17s %8x %8d %8d',
ev.msg.datapath.id,
stat.match['in_port'], stat.match['eth_dst'],
stat.instructions[0].actions[0].port,
stat.packet_count, stat.byte_count)

@set_ev_cls(ofp_event.EventOFPPortStatsReply, MAIN_DISPATCHER)
def _port_stats_reply_handler(self, ev):
body = ev.msg.body

self.logger.info('datapath port '
'rx-pkts rx-bytes rx-error '
'tx-pkts tx-bytes tx-error')
self.logger.info('---------------- -------- '
'-------- -------- -------- '
'-------- -------- --------')
for stat in sorted(body, key=attrgetter('port_no')):
self.logger.info('%016x %8x %8d %8d %8d %8d %8d %8d',
ev.msg.datapath.id, stat.port_no,
stat.rx_packets, stat.rx_bytes, stat.rx_errors,
stat.tx_packets, stat.tx_bytes, stat.tx_errors)

实验测试

  • mininet端
    执行sudo mn --topo single,3 --mac --switch ovsk,protocols=OpenFlow13 --controller remote
  • ryu端
    执行ryu-manager --verbose ryu.app.simple_monitor_13
  • mininet端
    执行h1 ping -c1 h2,可观察到实时的流量检测

REST Linkage

RYU提供了一个类似WSGI的web服务器功能。借助这个功能,我们可以创建一个REST API,基于创建的REST API,可以快速的将RYU系统与其他系统或者是浏览器相连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import json

from ryu.app import simple_switch_13
from ryu.controller import ofp_event
from ryu.controller.handler import CONFIG_DISPATCHER
from ryu.controller.handler import set_ev_cls
from ryu.app.wsgi import ControllerBase
from ryu.app.wsgi import Response
from ryu.app.wsgi import route
from ryu.app.wsgi import WSGIApplication
from ryu.lib import dpid as dpid_lib

simple_switch_instance_name = 'simple_switch_api_app'
url = '/simpleswitch/mactable/{dpid}'

# 继承SimpleSwitch13的功能,即具备父类交换机的基本功能。
# 注册WSGI服务
# 配置mac_to_port
class SimpleSwitchRest13(simple_switch_13.SimpleSwitch13):

_CONTEXTS = {'wsgi': WSGIApplication}

def __init__(self, *args, **kwargs):
super(SimpleSwitchRest13, self).__init__(*args, **kwargs)
self.switches = {}
# 通过上一步设置的_CONTEXTS成员变量,可以通过kwargs进行实例化一个WSGIApplication。
# 同时使用register方法注册该服务到controller类上。
wsgi = kwargs['wsgi']
wsgi.register(SimpleSwitchController,
{simple_switch_instance_name: self})

# 重写父类的switch_features_handler函数,存储datapath到switches,初始化MAC 地址表
@set_ev_cls(ofp_event.EventOFPSwitchFeatures, CONFIG_DISPATCHER)
def switch_features_handler(self, ev):
super(SimpleSwitchRest13, self).switch_features_handler(ev)
datapath = ev.msg.datapath
self.switches[datapath.id] = datapath
self.mac_to_port.setdefault(datapath.id, {})
# 该方法将MAC地址和端口注册到指定的交换机。该方法主要被REST API的PUT方法所调用
def set_mac_to_port(self, dpid, entry):
mac_table = self.mac_to_port.setdefault(dpid, {})
datapath = self.switches.get(dpid)

entry_port = entry['port']
entry_mac = entry['mac']

if datapath is not None:
parser = datapath.ofproto_parser
if entry_port not in mac_table.values():

for mac, port in mac_table.items():

# from known device to new device
actions = [parser.OFPActionOutput(entry_port)]
match = parser.OFPMatch(in_port=port, eth_dst=entry_mac)
self.add_flow(datapath, 1, match, actions)

# from new device to known device
actions = [parser.OFPActionOutput(port)]
match = parser.OFPMatch(in_port=entry_port, eth_dst=mac)
self.add_flow(datapath, 1, match, actions)

mac_table.update({entry_mac: entry_port})
return mac_table


class SimpleSwitchController(ControllerBase):

def __init__(self, req, link, data, **config):
super(SimpleSwitchController, self).__init__(req, link, data, **config)
self.simple_switch_app = data[simple_switch_instance_name]
# 借助route装饰器关联方法和URL。参数如下:
# 第一个参数:任何自定义名称
# 第二个参数:指明URL
# 第三个参数:指定http方法
# 第四个参数:指明指定位置的格式,URL(/simpleswitch/mactable/{dpid} 匹配DPID_PATTERN的描述
@route('simpleswitch', url, methods=['GET'],
requirements={'dpid': dpid_lib.DPID_PATTERN})
# 当使用GET方式访问到该REST API接口时,调用list_mac_table函数
def list_mac_table(self, req, **kwargs):

simple_switch = self.simple_switch_app
dpid = kwargs['dpid']

if dpid not in simple_switch.mac_to_port:
return Response(status=404)

mac_table = simple_switch.mac_to_port.get(dpid, {})
body = json.dumps(mac_table)
return Response(content_type='application/json', text=body)

@route('simpleswitch', url, methods=['PUT'],
requirements={'dpid': dpid_lib.DPID_PATTERN})
def put_mac_table(self, req, **kwargs):

simple_switch = self.simple_switch_app
dpid = kwargs['dpid']
try:
new_entry = req.json if req.body else {}
except ValueError:
raise Response(status=400)

if dpid not in simple_switch.mac_to_port:
return Response(status=404)

try:
mac_table = simple_switch.set_mac_to_port(dpid, new_entry)
body = json.dumps(mac_table)
return Response(content_type='application/json', text=body)
except Exception as e:
return Response(status=500)

实验测试

  • mininet端
    执行sudo mn --topo single,3 --mac --switch ovsk,protocols=OpenFlow13 --controller remote
  • ryu端
    执行ryu-manager --verbose ryu.app.simple_switch_rest_13
  • mininet端
    执行h1 ping -c1 h2,并观察ryu端的变化

使用curl调用 RESTful API 获取mac地址表
curl -X GET http://127.0.0.1:8080/simpleswitch/mactable /0000000000000001
使用curl调用 RESTful API 进行mac地址表的注册
curl -X PUT -d '{"mac" : "00:00:00:00:00:03", "port" : 3}' http://127.0.0.1:8080/simpleswitch/mactable/0000000000000001
查看流表可以看得到刚刚PUT的请求,也转化为流表的形式下发了

参考链接

RYUBook

作者

莫尔索

发布于

2020-03-07

更新于

2024-05-19

许可协议

评论