|
|
|
@ -138,11 +138,11 @@ func (rc *Connection) Do(do func() error) error {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Force process to soft-break so we can lock
|
|
|
|
|
log.Degubln("request unlocking of process loop for do()")
|
|
|
|
|
log.Debugln("request unlocking of process loop for do()")
|
|
|
|
|
rc.unlockChannel <- true
|
|
|
|
|
log.Degubln("process loop is unlocked for do()")
|
|
|
|
|
log.Debugln("process loop is unlocked for do()")
|
|
|
|
|
defer func() {
|
|
|
|
|
log.Degubln("giving up lock process loop after do() ")
|
|
|
|
|
log.Debugln("giving up lock process loop after do() ")
|
|
|
|
|
rc.unlockResponseChannel <- true
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
@ -169,18 +169,18 @@ func (rc *Connection) DoContext(ctx context.Context, do func(context.Context) er
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Force process to soft-break so we can lock
|
|
|
|
|
log.Degubln("request unlocking of process loop for do()")
|
|
|
|
|
log.Debugln("request unlocking of process loop for do()")
|
|
|
|
|
select {
|
|
|
|
|
case rc.unlockChannel <- true:
|
|
|
|
|
break
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
log.Degubln("giving up on unlocking process loop for do() because context cancelled")
|
|
|
|
|
log.Debugln("giving up on unlocking process loop for do() because context cancelled")
|
|
|
|
|
return ctx.Err()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
log.Degubln("process loop is unlocked for do()")
|
|
|
|
|
log.Debugln("process loop is unlocked for do()")
|
|
|
|
|
defer func() {
|
|
|
|
|
log.Degubln("giving up lock process loop after do() ")
|
|
|
|
|
log.Debugln("giving up lock process loop after do() ")
|
|
|
|
|
rc.unlockResponseChannel <- true
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
@ -195,7 +195,7 @@ func (rc *Connection) DoContext(ctx context.Context, do func(context.Context) er
|
|
|
|
|
// are not met on the local side (a nil error return does not mean the
|
|
|
|
|
// channel was opened successfully, because channels open asynchronously).
|
|
|
|
|
func (rc *Connection) RequestOpenChannel(ctype string, handler channels.Handler) (*channels.Channel, error) {
|
|
|
|
|
log.Degubln(fmt.Sprintf("requesting open channel of type %s", ctype))
|
|
|
|
|
log.Debugln(fmt.Sprintf("requesting open channel of type %s", ctype))
|
|
|
|
|
channel, err := rc.buildChannel(handler, rc.channelManager.OpenChannelRequest)
|
|
|
|
|
if err == nil {
|
|
|
|
|
response, err := handler.OpenOutbound(channel)
|
|
|
|
@ -209,7 +209,7 @@ func (rc *Connection) handleChannelOpening(channel *channels.Channel, response [
|
|
|
|
|
rc.SendRicochetPacket(rc.Conn, 0, response)
|
|
|
|
|
return channel, nil
|
|
|
|
|
}
|
|
|
|
|
log.Degubln(fmt.Sprintf("failed to request open channel: %v", err))
|
|
|
|
|
log.Debugln(fmt.Sprintf("failed to request open channel: %v", err))
|
|
|
|
|
rc.channelManager.RemoveChannel(channel.ID)
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
@ -268,7 +268,7 @@ func (rc *Connection) Process(handler Handler) error {
|
|
|
|
|
if rc.closed {
|
|
|
|
|
return utils.ConnectionClosedError
|
|
|
|
|
}
|
|
|
|
|
log.Degubln("entering process loop")
|
|
|
|
|
log.Debugln("entering process loop")
|
|
|
|
|
rc.processUserCallback(func() { handler.OnReady(rc) })
|
|
|
|
|
|
|
|
|
|
// There are exactly two ways out of this loop: a signal on breakChannel
|
|
|
|
@ -288,7 +288,7 @@ func (rc *Connection) Process(handler Handler) error {
|
|
|
|
|
<-rc.unlockResponseChannel
|
|
|
|
|
continue
|
|
|
|
|
case <-rc.breakChannel:
|
|
|
|
|
log.Degubln("process has ended after break")
|
|
|
|
|
log.Debugln("process has ended after break")
|
|
|
|
|
rc.breakResultChannel <- nil
|
|
|
|
|
return nil
|
|
|
|
|
case packet = <-rc.packetChannel:
|
|
|
|
@ -333,7 +333,7 @@ func (rc *Connection) Process(handler Handler) error {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if packet.Channel == 0 {
|
|
|
|
|
log.Degubln(fmt.Sprintf("received control packet on channel %d", packet.Channel))
|
|
|
|
|
log.Debugln(fmt.Sprintf("received control packet on channel %d", packet.Channel))
|
|
|
|
|
res := new(Protocol_Data_Control.Packet)
|
|
|
|
|
err := proto.Unmarshal(packet.Data[:], res)
|
|
|
|
|
if err == nil {
|
|
|
|
@ -346,11 +346,11 @@ func (rc *Connection) Process(handler Handler) error {
|
|
|
|
|
channel, found := rc.channelManager.GetChannel(packet.Channel)
|
|
|
|
|
if found {
|
|
|
|
|
if len(packet.Data) == 0 {
|
|
|
|
|
log.Degubln(fmt.Sprintf("removing channel %d", packet.Channel))
|
|
|
|
|
log.Debugln(fmt.Sprintf("removing channel %d", packet.Channel))
|
|
|
|
|
rc.channelManager.RemoveChannel(packet.Channel)
|
|
|
|
|
rc.processUserCallback(func() { channel.Handler.Closed(utils.ChannelClosedByPeerError) })
|
|
|
|
|
} else {
|
|
|
|
|
log.Degubln(fmt.Sprintf("received packet on %v channel %d", channel.Handler.Type(), packet.Channel))
|
|
|
|
|
log.Debugln(fmt.Sprintf("received packet on %v channel %d", channel.Handler.Type(), packet.Channel))
|
|
|
|
|
// Send The Ricochet Packet to the Handler
|
|
|
|
|
rc.processUserCallback(func() { channel.Handler.Packet(packet.Data[:]) })
|
|
|
|
|
}
|
|
|
|
@ -358,7 +358,7 @@ func (rc *Connection) Process(handler Handler) error {
|
|
|
|
|
// When a non-zero packet is received for an unknown
|
|
|
|
|
// channel, the recipient responds by closing
|
|
|
|
|
// that channel.
|
|
|
|
|
log.Degubln(fmt.Sprintf("received packet on unknown channel %d. closing.", packet.Channel))
|
|
|
|
|
log.Debugln(fmt.Sprintf("received packet on unknown channel %d. closing.", packet.Channel))
|
|
|
|
|
if len(packet.Data) != 0 {
|
|
|
|
|
rc.SendRicochetPacket(rc.Conn, packet.Channel, []byte{})
|
|
|
|
|
}
|
|
|
|
@ -395,7 +395,7 @@ func (rc *Connection) controlPacket(handler Handler, res *Protocol_Data_Control.
|
|
|
|
|
}
|
|
|
|
|
// Send Error Packet
|
|
|
|
|
response := rc.messageBuilder.RejectOpenChannel(opm.GetChannelIdentifier(), errorText)
|
|
|
|
|
log.Degubln(fmt.Sprintf("sending reject open channel for %v: %v", opm.GetChannelIdentifier(), errorText))
|
|
|
|
|
log.Debugln(fmt.Sprintf("sending reject open channel for %v: %v", opm.GetChannelIdentifier(), errorText))
|
|
|
|
|
rc.SendRicochetPacket(rc.Conn, 0, response)
|
|
|
|
|
|
|
|
|
|
} else if res.GetChannelResult() != nil {
|
|
|
|
@ -404,20 +404,20 @@ func (rc *Connection) controlPacket(handler Handler, res *Protocol_Data_Control.
|
|
|
|
|
// XXX Though not currently part of the protocol
|
|
|
|
|
// We should likely put these calls behind
|
|
|
|
|
// authentication.
|
|
|
|
|
log.Degubln("received keep alive packet")
|
|
|
|
|
log.Debugln("received keep alive packet")
|
|
|
|
|
respond, data := rc.ctrlChannel.ProcessKeepAlive(res.GetKeepAlive())
|
|
|
|
|
if respond {
|
|
|
|
|
log.Degubln("sending keep alive response")
|
|
|
|
|
log.Debugln("sending keep alive response")
|
|
|
|
|
rc.SendRicochetPacket(rc.Conn, 0, data)
|
|
|
|
|
}
|
|
|
|
|
} else if res.GetEnableFeatures() != nil {
|
|
|
|
|
log.Degubln("received enable features packet")
|
|
|
|
|
log.Debugln("received enable features packet")
|
|
|
|
|
data := rc.ctrlChannel.ProcessEnableFeatures(handler, res.GetEnableFeatures())
|
|
|
|
|
log.Degubln(fmt.Sprintf("sending featured enabled: %v", data))
|
|
|
|
|
log.Debugln(fmt.Sprintf("sending featured enabled: %v", data))
|
|
|
|
|
rc.SendRicochetPacket(rc.Conn, 0, data)
|
|
|
|
|
} else if res.GetFeaturesEnabled() != nil {
|
|
|
|
|
rc.SupportChannels = res.GetFeaturesEnabled().GetFeature()
|
|
|
|
|
log.Degubln(fmt.Sprintf("connection supports: %v", rc.SupportChannels))
|
|
|
|
|
log.Debugln(fmt.Sprintf("connection supports: %v", rc.SupportChannels))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -439,10 +439,10 @@ func (rc *Connection) Break() error {
|
|
|
|
|
rc.processBlockMutex.Lock()
|
|
|
|
|
defer rc.processBlockMutex.Unlock()
|
|
|
|
|
if rc.closed {
|
|
|
|
|
log.Degubln("ignoring break because connection is already closed")
|
|
|
|
|
log.Debugln("ignoring break because connection is already closed")
|
|
|
|
|
return utils.ConnectionClosedError
|
|
|
|
|
}
|
|
|
|
|
log.Degubln("breaking out of process loop")
|
|
|
|
|
log.Debugln("breaking out of process loop")
|
|
|
|
|
rc.breakChannel <- true
|
|
|
|
|
return <-rc.breakResultChannel // Wait for Process to End
|
|
|
|
|
}
|
|
|
|
|