async www read write
This commit is contained in:
@@ -0,0 +1,73 @@
|
||||
package www
|
||||
|
||||
import (
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
type Socket struct {
|
||||
wsc *websocket.Conn
|
||||
send chan []byte
|
||||
bundle *Bundle
|
||||
closed bool
|
||||
}
|
||||
|
||||
func makeSocket(c *websocket.Conn, b *Bundle) *Socket {
|
||||
s := &Socket {
|
||||
wsc: c,
|
||||
send: make(chan []byte, 1024),
|
||||
bundle: b,
|
||||
closed: false,
|
||||
}
|
||||
s.bundle.add <- s
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *Socket) close() {
|
||||
if !s.closed {
|
||||
pln("socket - close")
|
||||
close(s.send)
|
||||
s.wsc.Close()
|
||||
s.bundle.remove <- s
|
||||
s.closed = true
|
||||
}
|
||||
}
|
||||
|
||||
type Bundle struct {
|
||||
socks map[*Socket]bool
|
||||
broadcast chan []byte
|
||||
add chan *Socket
|
||||
remove chan *Socket
|
||||
}
|
||||
|
||||
|
||||
func makeBundle() *Bundle {
|
||||
return &Bundle {
|
||||
socks: make(map[*Socket]bool),
|
||||
broadcast: make(chan []byte, 1024),
|
||||
add: make(chan *Socket),
|
||||
remove: make(chan *Socket),
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Bundle) process() {
|
||||
for {
|
||||
select {
|
||||
case sock := <-b.add:
|
||||
b.socks[sock] = true
|
||||
case sock := <-b.remove:
|
||||
if b.socks[sock] {
|
||||
delete(b.socks, sock)
|
||||
}
|
||||
case data := <-b.broadcast:
|
||||
for s := range b.socks {
|
||||
select {
|
||||
case s.send <- data:
|
||||
default:
|
||||
pln("bundle -- socket close")
|
||||
s.close()
|
||||
delete(b.socks, s)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
+74
-17
@@ -3,6 +3,7 @@ package www
|
||||
import (
|
||||
"log"
|
||||
"strconv"
|
||||
"sync"
|
||||
"html/template"
|
||||
"net/http"
|
||||
"github.com/gorilla/websocket"
|
||||
@@ -16,6 +17,9 @@ var p *playlist.Playlist
|
||||
var pp *playlist.PrettyPlaylist
|
||||
var mp *playlist.MinimalPlaylist
|
||||
|
||||
var bundle *Bundle
|
||||
var mutex = &sync.Mutex{}
|
||||
|
||||
var pln = log.Println
|
||||
|
||||
type op_t struct {
|
||||
@@ -26,7 +30,7 @@ type op_t struct {
|
||||
|
||||
var upgrader = websocket.Upgrader{
|
||||
ReadBufferSize: 512,
|
||||
WriteBufferSize: 512,
|
||||
WriteBufferSize: 1024,
|
||||
}
|
||||
|
||||
func Init(playlist *playlist.Playlist) {
|
||||
@@ -34,6 +38,9 @@ func Init(playlist *playlist.Playlist) {
|
||||
p.CALLB = pop_callback
|
||||
pp = p.Pretty()
|
||||
mp = p.Minimal()
|
||||
bundle = makeBundle()
|
||||
go bundle.process()
|
||||
|
||||
http.HandleFunc("/pp", pp_handler)
|
||||
http.HandleFunc("/ppop", pp_operations)
|
||||
}
|
||||
@@ -49,37 +56,65 @@ func pp_handler(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
func pp_operations(w http.ResponseWriter, r *http.Request) {
|
||||
pln("x")
|
||||
pln("new connection")
|
||||
c, err := upgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
pln(err)
|
||||
return
|
||||
}
|
||||
wsc = c // this should be hub
|
||||
go readop(c)
|
||||
|
||||
// pln("GGGG")
|
||||
// s := &Socket{wsc: c, send: make(chan []byte, 1024), bundle: bundle}
|
||||
// s.bundle.add <- s
|
||||
|
||||
s := makeSocket(c, bundle)
|
||||
|
||||
go s.read()
|
||||
go s.write()
|
||||
|
||||
// // this should be hub
|
||||
// go readop(c)
|
||||
}
|
||||
|
||||
func readop(c *websocket.Conn) {
|
||||
func (s *Socket) read() {
|
||||
defer s.close()
|
||||
for {
|
||||
opdata := &op_t{}
|
||||
if err := c.ReadJSON(&opdata); err != nil {
|
||||
pln(err)
|
||||
return //connection lost?
|
||||
if err := s.wsc.ReadJSON(&opdata); err != nil {
|
||||
pln("0")
|
||||
// if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
|
||||
pln("connection closed - read")
|
||||
break
|
||||
// }
|
||||
}
|
||||
|
||||
if opdata.OP == "move" {
|
||||
if id, err := strconv.ParseUint(opdata.ID, 10, 32); err == nil {
|
||||
mutex.Lock()
|
||||
p.Move(uint32(id), opdata.INDEX)
|
||||
pp = p.Pretty()
|
||||
mp = p.Minimal()
|
||||
// pp.Print()
|
||||
mutex.Unlock()
|
||||
s.bundle.broadcast <- mp.Encode() /// crap this nees to be []byte
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := c.WriteJSON(mp); err != nil {
|
||||
pln(err)
|
||||
return //connection lost?
|
||||
}
|
||||
func (s *Socket) write() {
|
||||
for {
|
||||
select {
|
||||
case mp, _ := <-s.send:
|
||||
w, err := s.wsc.NextWriter(websocket.TextMessage);
|
||||
if err != nil {
|
||||
// if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
|
||||
pln("connection closed - write")
|
||||
return
|
||||
// }
|
||||
}
|
||||
w.Write(mp)
|
||||
// is there queued messages?
|
||||
w.Close()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -87,8 +122,30 @@ func pop_callback(list *playlist.Playlist) {
|
||||
if list == p {
|
||||
pp = p.Pretty()
|
||||
mp = p.Minimal()
|
||||
if wsc != nil {
|
||||
wsc.WriteJSON(mp) // this should be hub broadcasted
|
||||
}
|
||||
bundle.broadcast <- mp.Encode()
|
||||
}
|
||||
}
|
||||
|
||||
// func readop(c *websocket.Conn) {
|
||||
// for {
|
||||
// opdata := &op_t{}
|
||||
// if err := c.ReadJSON(&opdata); err != nil {
|
||||
// pln(err)
|
||||
// return //connection lost?
|
||||
// }
|
||||
|
||||
// if opdata.OP == "move" {
|
||||
// if id, err := strconv.ParseUint(opdata.ID, 10, 32); err == nil {
|
||||
// p.Move(uint32(id), opdata.INDEX)
|
||||
// pp = p.Pretty()
|
||||
// mp = p.Minimal()
|
||||
// // pp.Print()
|
||||
// }
|
||||
// }
|
||||
|
||||
// if err := c.WriteJSON(mp); err != nil {
|
||||
// pln(err)
|
||||
// return //connection lost?
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
Reference in New Issue
Block a user