" (C) 2010-2012 by Holger Hans Peter Freyther All Rights Reserved This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with this program. If not, see . " Object subclass: OsmoUDPSocket [ | socket queue rx tx net_exit name on_data | OsmoUDPSocket class >> new [ ^self basicNew initialize ] initialize [ queue := SharedQueue new. net_exit := Semaphore new. ] name: aName [ name := aName ] onData: aBlock [ on_data := aBlock ] start: aSocket [ socket := aSocket. "Receive datagrams from the socket..." rx := self startRXProcess. "Send data to the MGWs" tx := [ [Processor activeProcess name: name, ' TX'. self runTXProcess] ensure: [net_exit signal]] fork. ] startRXProcess [ ^ [[Processor activeProcess name: name, ' RX'. self runRXProcess] ensure: [net_exit signal]] fork. ] runRXProcess [ [ | data | socket ensureReadable. socket isOpen ifFalse: [ ^self logNotice: name, ' socket closed.' area: #core]. data := socket next. on_data value: data. ] repeat. ] runTXProcess [ [ | data | data := queue next. data = nil ifTrue: [ ^self logNotice: name, ' TX asked to quit.' area: #core]. socket nextPut: data. ] repeat. ] stop [ socket ifNil: [^self]. "Close" socket close. queue nextPut: nil. "Wait for the process to exit" self logNotice: name, ' waiting for IO handlers to exit.' area: #core. net_exit wait; wait. "Forget things" socket := nil. tx := nil. rx := nil. ] queueData: aData [ queue nextPut: aData ] ]