From 6009696cb65422cc0dbc64b6af0c08649544f187 Mon Sep 17 00:00:00 2001 From: gauthiier Date: Fri, 4 Jan 2019 09:59:01 +0100 Subject: [PATCH] async www read write --- config.ini | 10 +++-- config/config.go | 40 ++++++++++--------- icecast/icecast.go | 22 +++++------ playlist/playlist.go | 12 +++--- serve.go | 2 +- www/socket.go | 73 +++++++++++++++++++++++++++++++++++ www/www.go | 91 +++++++++++++++++++++++++++++++++++--------- 7 files changed, 193 insertions(+), 57 deletions(-) create mode 100644 www/socket.go diff --git a/config.ini b/config.ini index 672d4b9..cac7d64 100644 --- a/config.ini +++ b/config.ini @@ -1,4 +1,4 @@ -[server] +[ice-server] name = iceice-server addr = 85.149.50.45 @@ -7,7 +7,7 @@ mount = wwww usr = source pwd = R1t4R1t4 -[ice] +[ice-stream] name = noise desc = description of noize @@ -19,4 +19,8 @@ pub = 0 [archive] -path = /Volumes/QQQ/wellwellwell/mp3 \ No newline at end of file +path = /Volumes/QQQ/wellwellwell/mp3 + +[www-server] + +port = 8718 \ No newline at end of file diff --git a/config/config.go b/config/config.go index 4edf03a..141f59c 100644 --- a/config/config.go +++ b/config/config.go @@ -4,7 +4,7 @@ import ( "github.com/go-ini/ini" ) -type server_t struct { +type ice_server_t struct { NAME string ADDR string PORT int @@ -14,7 +14,7 @@ type server_t struct { STYPE int } -type ice_t struct { +type ice_stream_t struct { NAME string DESC string GENRE string @@ -48,10 +48,11 @@ type archive_t struct { } type config_t struct { - Track Track_t - Server server_t - Ice ice_t - Archive archive_t + Track Track_t + IceServer ice_server_t + IceStream ice_stream_t + Archive archive_t + WWWport string } var Xcfg config_t @@ -63,23 +64,24 @@ func Loadconfig(filename string) error { return err } - Xcfg.Server.NAME = ini.Section("server").Key("name").Value() - Xcfg.Server.ADDR = ini.Section("server").Key("addr").Value() - Xcfg.Server.PORT, _ = ini.Section("server").Key("port").Int() - Xcfg.Server.MOUNT = ini.Section("server").Key("mount").Value() - Xcfg.Server.USR = ini.Section("server").Key("usr").Value() - Xcfg.Server.PWD = ini.Section("server").Key("pwd").Value() + Xcfg.IceServer.NAME = ini.Section("ice-server").Key("name").Value() + Xcfg.IceServer.ADDR = ini.Section("ice-server").Key("addr").Value() + Xcfg.IceServer.PORT, _ = ini.Section("ice-server").Key("port").Int() + Xcfg.IceServer.MOUNT = ini.Section("ice-server").Key("mount").Value() + Xcfg.IceServer.USR = ini.Section("ice-server").Key("usr").Value() + Xcfg.IceServer.PWD = ini.Section("ice-server").Key("pwd").Value() - Xcfg.Ice.NAME = ini.Section("ice").Key("name").Value() - Xcfg.Ice.DESC = ini.Section("ice").Key("desc").Value() - Xcfg.Ice.GENRE = ini.Section("ice").Key("genre").Value() - Xcfg.Ice.URL = ini.Section("ice").Key("url").Value() - Xcfg.Ice.IRC = ini.Section("ice").Key("irc").Value() - Xcfg.Ice.AIM = ini.Section("ice").Key("aim").Value() - Xcfg.Ice.PUB = ini.Section("ice").Key("pub").Value() + Xcfg.IceStream.NAME = ini.Section("ice-stream").Key("name").Value() + Xcfg.IceStream.DESC = ini.Section("ice-stream").Key("desc").Value() + Xcfg.IceStream.GENRE = ini.Section("ice-stream").Key("genre").Value() + Xcfg.IceStream.URL = ini.Section("ice-stream").Key("url").Value() + Xcfg.IceStream.IRC = ini.Section("ice-stream").Key("irc").Value() + Xcfg.IceStream.AIM = ini.Section("ice-stream").Key("aim").Value() + Xcfg.IceStream.PUB = ini.Section("ice-stream").Key("pub").Value() Xcfg.Archive.PATH = ini.Section("archive").Key("path").Value() + Xcfg.WWWport = ini.Section("www-server").Key("port").Value() Xcfg.Archive.ALBUMS = make(map[uint32]Album_t) diff --git a/icecast/icecast.go b/icecast/icecast.go index 566d9e5..5bed3e9 100644 --- a/icecast/icecast.go +++ b/icecast/icecast.go @@ -34,7 +34,7 @@ func Connect() (net.Conn, error){ // ips, err := net.LookupIP("www.google.ca") var sock net.Conn - host := config.Xcfg.Server.ADDR + ":" + strconv.Itoa(config.Xcfg.Server.PORT) + host := config.Xcfg.IceServer.ADDR + ":" + strconv.Itoa(config.Xcfg.IceServer.PORT) sock, err := net.Dial("tcp", host) if err != nil { @@ -48,14 +48,14 @@ func Connect() (net.Conn, error){ // icecast only for now // try PUT method - mount := config.Xcfg.Server.MOUNT + mount := config.Xcfg.IceServer.MOUNT s := spf("PUT %s HTTP/1.1\r\n", mount) if mount[0] != '/' { s = spf("PUT /%s HTTP/1.1\r\n", mount) } err = socket.Send(sock, []byte(s)) - s = spf("%s:%s", config.Xcfg.Server.USR, config.Xcfg.Server.PWD) + s = spf("%s:%s", config.Xcfg.IceServer.USR, config.Xcfg.IceServer.PWD) sb64 := "Authorization: Basic " + base64.StdEncoding.EncodeToString([]byte(s)) + "\r\n" err = socket.Send(sock, []byte(sb64)) @@ -65,19 +65,19 @@ func Connect() (net.Conn, error){ s = "Content-Type: audio/mp3\r\n" err = socket.Send(sock, []byte(s)) - s = spf("ice-name: %s\r\n", config.Xcfg.Ice.NAME) + s = spf("ice-name: %s\r\n", config.Xcfg.IceStream.NAME) err = socket.Send(sock, []byte(s)) - s = spf("ice-public: %s\r\n", config.Xcfg.Ice.PUB) + s = spf("ice-public: %s\r\n", config.Xcfg.IceStream.PUB) err = socket.Send(sock, []byte(s)) - s = spf("ice-url: %s\r\n", config.Xcfg.Ice.URL) + s = spf("ice-url: %s\r\n", config.Xcfg.IceStream.URL) err = socket.Send(sock, []byte(s)) - s = spf("ice-genre: %s\r\n", config.Xcfg.Ice.GENRE) + s = spf("ice-genre: %s\r\n", config.Xcfg.IceStream.GENRE) err = socket.Send(sock, []byte(s)) - s = spf("ice-description: %s\r\n", config.Xcfg.Ice.DESC) + s = spf("ice-description: %s\r\n", config.Xcfg.IceStream.DESC) err = socket.Send(sock, []byte(s)) s = "ice-audio-info: ice-bitrate=192000; ice-channels=2; ice-samplerate=44100\r\n" @@ -132,14 +132,14 @@ func Send(buff []byte) error { func Update() error { - host := config.Xcfg.Server.ADDR + ":" + strconv.Itoa(config.Xcfg.Server.PORT) + host := config.Xcfg.IceServer.ADDR + ":" + strconv.Itoa(config.Xcfg.IceServer.PORT) track := url.QueryEscape(config.Xcfg.Track.NAME) - mount := config.Xcfg.Server.MOUNT + mount := config.Xcfg.IceServer.MOUNT if mount[0] != '/' { mount = "/" + mount } - s := spf("%s:%s", config.Xcfg.Server.USR, config.Xcfg.Server.PWD) + s := spf("%s:%s", config.Xcfg.IceServer.USR, config.Xcfg.IceServer.PWD) sb64 := "Basic " + base64.StdEncoding.EncodeToString([]byte(s)) header := "GET /admin/metadata?mode=updinfo&mount=" + mount + "&song=" + track + " HTTP/1.0\r\n" + diff --git a/playlist/playlist.go b/playlist/playlist.go index 0f78ecf..8a64baf 100644 --- a/playlist/playlist.go +++ b/playlist/playlist.go @@ -63,12 +63,12 @@ func MakeRandom(name string, max int) (*Playlist, error) { return p, nil; } -func (p *Playlist) Encode() string { - - pp := p.Pretty() - res, _ := json.Marshal(pp) - return string(res) - +func (mp *MinimalPlaylist) Encode() []byte { + res, err := json.Marshal(mp) + if err != nil { + return nil + } + return res } func Decode(jsonstr string) (*Playlist, error) { diff --git a/serve.go b/serve.go index 7e0e56a..9827e7d 100644 --- a/serve.go +++ b/serve.go @@ -25,5 +25,5 @@ func main() { go audio.Start(p.Pop) - log.Fatal(http.ListenAndServe(":8718", nil)) + log.Fatal(http.ListenAndServe(":" + config.Xcfg.WWWport, nil)) } diff --git a/www/socket.go b/www/socket.go new file mode 100644 index 0000000..5c866a2 --- /dev/null +++ b/www/socket.go @@ -0,0 +1,73 @@ +package www + +import ( + "github.com/gorilla/websocket" +) + +type Socket struct { + wsc *websocket.Conn + send chan []byte + bundle *Bundle + closed bool +} + +func makeSocket(c *websocket.Conn, b *Bundle) *Socket { + s := &Socket { + wsc: c, + send: make(chan []byte, 1024), + bundle: b, + closed: false, + } + s.bundle.add <- s + return s +} + +func (s *Socket) close() { + if !s.closed { + pln("socket - close") + close(s.send) + s.wsc.Close() + s.bundle.remove <- s + s.closed = true + } +} + +type Bundle struct { + socks map[*Socket]bool + broadcast chan []byte + add chan *Socket + remove chan *Socket +} + + +func makeBundle() *Bundle { + return &Bundle { + socks: make(map[*Socket]bool), + broadcast: make(chan []byte, 1024), + add: make(chan *Socket), + remove: make(chan *Socket), + } +} + +func (b *Bundle) process() { + for { + select { + case sock := <-b.add: + b.socks[sock] = true + case sock := <-b.remove: + if b.socks[sock] { + delete(b.socks, sock) + } + case data := <-b.broadcast: + for s := range b.socks { + select { + case s.send <- data: + default: + pln("bundle -- socket close") + s.close() + delete(b.socks, s) + } + } + } + } +} \ No newline at end of file diff --git a/www/www.go b/www/www.go index 5d9d2c1..f1400e1 100644 --- a/www/www.go +++ b/www/www.go @@ -3,6 +3,7 @@ package www import ( "log" "strconv" + "sync" "html/template" "net/http" "github.com/gorilla/websocket" @@ -16,6 +17,9 @@ var p *playlist.Playlist var pp *playlist.PrettyPlaylist var mp *playlist.MinimalPlaylist +var bundle *Bundle +var mutex = &sync.Mutex{} + var pln = log.Println type op_t struct { @@ -26,7 +30,7 @@ type op_t struct { var upgrader = websocket.Upgrader{ ReadBufferSize: 512, - WriteBufferSize: 512, + WriteBufferSize: 1024, } func Init(playlist *playlist.Playlist) { @@ -34,6 +38,9 @@ func Init(playlist *playlist.Playlist) { p.CALLB = pop_callback pp = p.Pretty() mp = p.Minimal() + bundle = makeBundle() + go bundle.process() + http.HandleFunc("/pp", pp_handler) http.HandleFunc("/ppop", pp_operations) } @@ -49,37 +56,65 @@ func pp_handler(w http.ResponseWriter, r *http.Request) { } func pp_operations(w http.ResponseWriter, r *http.Request) { - pln("x") + pln("new connection") c, err := upgrader.Upgrade(w, r, nil) if err != nil { pln(err) return } - wsc = c // this should be hub - go readop(c) + + // pln("GGGG") + // s := &Socket{wsc: c, send: make(chan []byte, 1024), bundle: bundle} + // s.bundle.add <- s + + s := makeSocket(c, bundle) + + go s.read() + go s.write() + + // // this should be hub + // go readop(c) } -func readop(c *websocket.Conn) { +func (s *Socket) read() { + defer s.close() for { opdata := &op_t{} - if err := c.ReadJSON(&opdata); err != nil { - pln(err) - return //connection lost? + if err := s.wsc.ReadJSON(&opdata); err != nil { + pln("0") + // if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { + pln("connection closed - read") + break + // } } - if opdata.OP == "move" { if id, err := strconv.ParseUint(opdata.ID, 10, 32); err == nil { + mutex.Lock() p.Move(uint32(id), opdata.INDEX) pp = p.Pretty() mp = p.Minimal() - // pp.Print() + mutex.Unlock() + s.bundle.broadcast <- mp.Encode() /// crap this nees to be []byte } } + } +} - if err := c.WriteJSON(mp); err != nil { - pln(err) - return //connection lost? - } +func (s *Socket) write() { + for { + select { + case mp, _ := <-s.send: + w, err := s.wsc.NextWriter(websocket.TextMessage); + if err != nil { + // if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { + pln("connection closed - write") + return + // } + } + w.Write(mp) + // is there queued messages? + w.Close() + } } } @@ -87,8 +122,30 @@ func pop_callback(list *playlist.Playlist) { if list == p { pp = p.Pretty() mp = p.Minimal() - if wsc != nil { - wsc.WriteJSON(mp) // this should be hub broadcasted - } + bundle.broadcast <- mp.Encode() } } + +// func readop(c *websocket.Conn) { +// for { +// opdata := &op_t{} +// if err := c.ReadJSON(&opdata); err != nil { +// pln(err) +// return //connection lost? +// } + +// if opdata.OP == "move" { +// if id, err := strconv.ParseUint(opdata.ID, 10, 32); err == nil { +// p.Move(uint32(id), opdata.INDEX) +// pp = p.Pretty() +// mp = p.Minimal() +// // pp.Print() +// } +// } + +// if err := c.WriteJSON(mp); err != nil { +// pln(err) +// return //connection lost? +// } +// } +// }