shikigrid/mesh/packet_muxer.go

145 lines
3.6 KiB
Go

package mesh
import (
"fmt"
"github.com/evilsocket/islazy/async"
"github.com/evilsocket/islazy/log"
"github.com/gopacket/gopacket"
"github.com/gopacket/gopacket/pcap"
"strings"
"time"
)
const (
// ErrIfaceNotUp is the error message NewPacketMuxer compares against the
// error returned while activating the pcap handle in order to detect that
// the interface is down. A string comparison is used because gopacket does
// not export its pcap errors.
// ref. https://github.com/gopacket/gopacket/blob/96986c90e3e5c7e01deed713ff8058e357c0c047/pcap/pcap.go#L281
ErrIfaceNotUp = "Interface Not Up"
)
var (
// SnapLength is the snapshot length NewPacketMuxer passes to SetSnapLen
// when configuring the capture handle.
SnapLength = 65536
// ReadTimeout is the read timeout, expressed in milliseconds, that
// NewPacketMuxer passes to SetTimeout when configuring the capture handle.
ReadTimeout = 100
)
// PacketCallback is the signature of the function a PacketMuxer invokes for
// every packet handed to its work queue.
type PacketCallback func(pkt gopacket.Packet)
// PacketMuxer captures packets from a network interface through pcap and
// dispatches them to a user supplied callback by means of a work queue.
type PacketMuxer struct {
// iface is the name of the interface the capture handle is opened on.
iface string
// filter is the BPF filter expression applied to the handle ("" for none).
filter string
// handle is the activated pcap handle used for both reading and writing.
handle *pcap.Handle
// source is the packet source built on top of handle.
source *gopacket.PacketSource
// channel is the packets channel obtained from source.
channel chan gopacket.Packet
// queue runs onPacket for every packet added to it.
queue *async.WorkQueue
// stop is an unbuffered channel used by Stop to terminate the goroutine
// launched by Start.
stop chan struct{}
// onPacket is the callback invoked for each packet; it is a no-op until
// replaced with OnPacket.
onPacket PacketCallback
}
// dummyPacketCallback is the no-op PacketCallback installed on a freshly
// created PacketMuxer, so that a callback is always available to call.
func dummyPacketCallback(_ gopacket.Packet) {}
// NewPacketMuxer opens a live capture on iface and returns a muxer whose work
// queue runs the packet callback on the given number of workers.
//
// The capture handle is configured with SnapLength and ReadTimeout; monitor
// mode is requested but a failure to enable it is only logged. If activating
// the handle fails because the interface is down, the interface is brought up
// with ActivateInterface and the whole procedure is retried exactly once.
// When filter is not empty it is installed as a BPF filter on the handle.
//
// Packets are only dispatched to the callback once Start has been called, and
// the callback is a no-op until one is installed with OnPacket.
func NewPacketMuxer(iface, filter string, workers int) (mux *PacketMuxer, err error) {
	var handle *pcap.Handle

	for retry := 0; ; retry++ {
		var ifaceDown bool
		if handle, ifaceDown, err = activateMuxerHandle(iface); err == nil {
			break
		}
		// Only an "interface down" failure on the very first attempt is
		// recoverable, anything else is reported to the caller.
		if !ifaceDown || retry > 0 {
			return nil, err
		}

		log.Info("interface %s is down, bringing it up ...", iface)
		if err = ActivateInterface(iface); err != nil {
			return nil, err
		}
	}

	if filter != "" {
		if err = handle.SetBPFFilter(filter); err != nil {
			// Nobody else holds a reference to the activated handle yet, so
			// it must be released here or it would leak.
			handle.Close()
			return nil, fmt.Errorf("error setting BPF filter '%s': %w", filter, err)
		}
	}

	mux = &PacketMuxer{
		iface:    iface,
		filter:   filter,
		handle:   handle,
		stop:     make(chan struct{}),
		onPacket: dummyPacketCallback,
	}
	mux.source = gopacket.NewPacketSource(mux.handle, mux.handle.LinkType())
	mux.channel = mux.source.Packets()
	mux.queue = async.NewQueue(workers, func(arg async.Job) {
		mux.onPacket(arg.(gopacket.Packet))
	})

	return mux, nil
}

// activateMuxerHandle performs a single attempt at creating, configuring and
// activating a pcap handle on iface. The boolean result is true only when the
// activation failed with the ErrIfaceNotUp message, which tells the caller
// that bringing the interface up and retrying may help.
//
// Keeping one attempt per call makes the deferred CleanUp of the inactive
// handle run at the end of each attempt instead of piling up in the caller's
// retry loop.
func activateMuxerHandle(iface string) (*pcap.Handle, bool, error) {
	inactive, err := pcap.NewInactiveHandle(iface)
	if err != nil {
		return nil, false, fmt.Errorf("error while opening interface %s: %w", iface, err)
	}
	defer inactive.CleanUp()

	// Monitor mode is best effort: log the failure and keep going.
	if err = inactive.SetRFMon(true); err != nil {
		log.Warning("error while setting interface %s in monitor mode: %s", iface, err)
	}

	if err = inactive.SetSnapLen(SnapLength); err != nil {
		return nil, false, fmt.Errorf("error while setting snap len: %w", err)
	}

	/*
	 * We don't want to pcap.BlockForever otherwise pcap_close(handle)
	 * could hang waiting for a timeout to expire ...
	 */
	readTimeout := time.Duration(ReadTimeout) * time.Millisecond
	if err = inactive.SetTimeout(readTimeout); err != nil {
		return nil, false, fmt.Errorf("error while setting timeout: %w", err)
	}

	handle, err := inactive.Activate()
	if err != nil {
		// The raw message is inspected before wrapping, as gopacket does not
		// export an error value to compare against.
		return nil, err.Error() == ErrIfaceNotUp, fmt.Errorf("error while activating handle: %w", err)
	}

	return handle, false, nil
}
// OnPacket installs cb as the callback invoked for every packet processed by
// the work queue. Passing nil restores the no-op callback rather than storing
// a nil function, which would panic as soon as a worker tried to call it.
func (mux *PacketMuxer) OnPacket(cb PacketCallback) {
	if cb == nil {
		cb = dummyPacketCallback
	}
	mux.onPacket = cb
}
// Write injects data on the wire through the capture handle.
//
// When the write fails with a "temporarily unavailable" error it is retried,
// up to five attempts in total with a 200ms pause between attempts. If every
// attempt hits that transient condition nil is returned, as there is not much
// the caller can do about it. Any other error is returned immediately.
func (mux *PacketMuxer) Write(data []byte) error {
	const (
		maxAttempts = 5
		retryDelay  = 200 * time.Millisecond
	)

	for attempt := 1; attempt <= maxAttempts; attempt++ {
		err := mux.handle.WritePacketData(data)
		if err == nil {
			return nil
		}
		if !strings.Contains(err.Error(), "temporarily unavailable") {
			// Not a transient condition: report it instead of hiding it.
			return err
		}

		log.Debug("resource temporarily unavailable when sending data")
		// No point in waiting once the last attempt has been made.
		if attempt < maxAttempts {
			time.Sleep(retryDelay)
		}
	}

	// Only transient failures were seen; deliberately not treated as an error.
	return nil
}
// Start launches a goroutine that moves packets from the packet source
// channel to the work queue until Stop is called.
//
// If the packet source channel gets closed, the goroutine stops reading from
// it but keeps waiting for the stop signal, so that a later call to Stop does
// not block on its send.
func (mux *PacketMuxer) Start() {
	go func() {
		log.Debug("packet muxer started (iface:%s filter:%s)", mux.iface, mux.filter)

		packets := mux.channel
		for {
			select {
			case packet, ok := <-packets:
				if !ok {
					// A closed channel would yield nil packets forever;
					// a nil channel is never selected, which disables this
					// case while still honoring the stop signal.
					log.Warning("packet source for interface %s closed, no more packets will be processed", mux.iface)
					packets = nil
					continue
				}
				mux.queue.Add(async.Job(packet))
			case <-mux.stop:
				return
			}
		}
	}()
}
// Stop sends the stop signal to the goroutine launched by Start and then
// calls WaitDone on the work queue before returning.
//
// The stop channel is unbuffered, so the send blocks until that goroutine
// receives it.
func (mux *PacketMuxer) Stop() {
	log.Debug("stopping packet muxer ...")

	var signal struct{}
	mux.stop <- signal

	mux.queue.WaitDone()

	log.Debug("packet muxer stopped")
}