async www read write

This commit is contained in:
gauthiier 2019-01-04 09:59:01 +01:00
parent ce232ce805
commit 6009696cb6
7 changed files with 193 additions and 57 deletions

View File

@ -1,4 +1,4 @@
[server]
[ice-server]
name = iceice-server
addr = 85.149.50.45
@ -7,7 +7,7 @@ mount = wwww
usr = source
pwd = R1t4R1t4
[ice]
[ice-stream]
name = noise
desc = description of noize
@ -20,3 +20,7 @@ pub = 0
[archive]
path = /Volumes/QQQ/wellwellwell/mp3
[www-server]
port = 8718

View File

@ -4,7 +4,7 @@ import (
"github.com/go-ini/ini"
)
type server_t struct {
type ice_server_t struct {
NAME string
ADDR string
PORT int
@ -14,7 +14,7 @@ type server_t struct {
STYPE int
}
type ice_t struct {
type ice_stream_t struct {
NAME string
DESC string
GENRE string
@ -48,10 +48,11 @@ type archive_t struct {
}
type config_t struct {
Track Track_t
Server server_t
Ice ice_t
Archive archive_t
Track Track_t
IceServer ice_server_t
IceStream ice_stream_t
Archive archive_t
WWWport string
}
var Xcfg config_t
@ -63,23 +64,24 @@ func Loadconfig(filename string) error {
return err
}
Xcfg.Server.NAME = ini.Section("server").Key("name").Value()
Xcfg.Server.ADDR = ini.Section("server").Key("addr").Value()
Xcfg.Server.PORT, _ = ini.Section("server").Key("port").Int()
Xcfg.Server.MOUNT = ini.Section("server").Key("mount").Value()
Xcfg.Server.USR = ini.Section("server").Key("usr").Value()
Xcfg.Server.PWD = ini.Section("server").Key("pwd").Value()
Xcfg.IceServer.NAME = ini.Section("ice-server").Key("name").Value()
Xcfg.IceServer.ADDR = ini.Section("ice-server").Key("addr").Value()
Xcfg.IceServer.PORT, _ = ini.Section("ice-server").Key("port").Int()
Xcfg.IceServer.MOUNT = ini.Section("ice-server").Key("mount").Value()
Xcfg.IceServer.USR = ini.Section("ice-server").Key("usr").Value()
Xcfg.IceServer.PWD = ini.Section("ice-server").Key("pwd").Value()
Xcfg.Ice.NAME = ini.Section("ice").Key("name").Value()
Xcfg.Ice.DESC = ini.Section("ice").Key("desc").Value()
Xcfg.Ice.GENRE = ini.Section("ice").Key("genre").Value()
Xcfg.Ice.URL = ini.Section("ice").Key("url").Value()
Xcfg.Ice.IRC = ini.Section("ice").Key("irc").Value()
Xcfg.Ice.AIM = ini.Section("ice").Key("aim").Value()
Xcfg.Ice.PUB = ini.Section("ice").Key("pub").Value()
Xcfg.IceStream.NAME = ini.Section("ice-stream").Key("name").Value()
Xcfg.IceStream.DESC = ini.Section("ice-stream").Key("desc").Value()
Xcfg.IceStream.GENRE = ini.Section("ice-stream").Key("genre").Value()
Xcfg.IceStream.URL = ini.Section("ice-stream").Key("url").Value()
Xcfg.IceStream.IRC = ini.Section("ice-stream").Key("irc").Value()
Xcfg.IceStream.AIM = ini.Section("ice-stream").Key("aim").Value()
Xcfg.IceStream.PUB = ini.Section("ice-stream").Key("pub").Value()
Xcfg.Archive.PATH = ini.Section("archive").Key("path").Value()
Xcfg.WWWport = ini.Section("www-server").Key("port").Value()
Xcfg.Archive.ALBUMS = make(map[uint32]Album_t)

View File

@ -34,7 +34,7 @@ func Connect() (net.Conn, error){
// ips, err := net.LookupIP("www.google.ca")
var sock net.Conn
host := config.Xcfg.Server.ADDR + ":" + strconv.Itoa(config.Xcfg.Server.PORT)
host := config.Xcfg.IceServer.ADDR + ":" + strconv.Itoa(config.Xcfg.IceServer.PORT)
sock, err := net.Dial("tcp", host)
if err != nil {
@ -48,14 +48,14 @@ func Connect() (net.Conn, error){
// icecast only for now
// try PUT method
mount := config.Xcfg.Server.MOUNT
mount := config.Xcfg.IceServer.MOUNT
s := spf("PUT %s HTTP/1.1\r\n", mount)
if mount[0] != '/' {
s = spf("PUT /%s HTTP/1.1\r\n", mount)
}
err = socket.Send(sock, []byte(s))
s = spf("%s:%s", config.Xcfg.Server.USR, config.Xcfg.Server.PWD)
s = spf("%s:%s", config.Xcfg.IceServer.USR, config.Xcfg.IceServer.PWD)
sb64 := "Authorization: Basic " + base64.StdEncoding.EncodeToString([]byte(s)) + "\r\n"
err = socket.Send(sock, []byte(sb64))
@ -65,19 +65,19 @@ func Connect() (net.Conn, error){
s = "Content-Type: audio/mp3\r\n"
err = socket.Send(sock, []byte(s))
s = spf("ice-name: %s\r\n", config.Xcfg.Ice.NAME)
s = spf("ice-name: %s\r\n", config.Xcfg.IceStream.NAME)
err = socket.Send(sock, []byte(s))
s = spf("ice-public: %s\r\n", config.Xcfg.Ice.PUB)
s = spf("ice-public: %s\r\n", config.Xcfg.IceStream.PUB)
err = socket.Send(sock, []byte(s))
s = spf("ice-url: %s\r\n", config.Xcfg.Ice.URL)
s = spf("ice-url: %s\r\n", config.Xcfg.IceStream.URL)
err = socket.Send(sock, []byte(s))
s = spf("ice-genre: %s\r\n", config.Xcfg.Ice.GENRE)
s = spf("ice-genre: %s\r\n", config.Xcfg.IceStream.GENRE)
err = socket.Send(sock, []byte(s))
s = spf("ice-description: %s\r\n", config.Xcfg.Ice.DESC)
s = spf("ice-description: %s\r\n", config.Xcfg.IceStream.DESC)
err = socket.Send(sock, []byte(s))
s = "ice-audio-info: ice-bitrate=192000; ice-channels=2; ice-samplerate=44100\r\n"
@ -132,14 +132,14 @@ func Send(buff []byte) error {
func Update() error {
host := config.Xcfg.Server.ADDR + ":" + strconv.Itoa(config.Xcfg.Server.PORT)
host := config.Xcfg.IceServer.ADDR + ":" + strconv.Itoa(config.Xcfg.IceServer.PORT)
track := url.QueryEscape(config.Xcfg.Track.NAME)
mount := config.Xcfg.Server.MOUNT
mount := config.Xcfg.IceServer.MOUNT
if mount[0] != '/' {
mount = "/" + mount
}
s := spf("%s:%s", config.Xcfg.Server.USR, config.Xcfg.Server.PWD)
s := spf("%s:%s", config.Xcfg.IceServer.USR, config.Xcfg.IceServer.PWD)
sb64 := "Basic " + base64.StdEncoding.EncodeToString([]byte(s))
header := "GET /admin/metadata?mode=updinfo&mount=" + mount + "&song=" + track + " HTTP/1.0\r\n" +

View File

@ -63,12 +63,12 @@ func MakeRandom(name string, max int) (*Playlist, error) {
return p, nil;
}
func (p *Playlist) Encode() string {
pp := p.Pretty()
res, _ := json.Marshal(pp)
return string(res)
func (mp *MinimalPlaylist) Encode() []byte {
res, err := json.Marshal(mp)
if err != nil {
return nil
}
return res
}
func Decode(jsonstr string) (*Playlist, error) {

View File

@ -25,5 +25,5 @@ func main() {
go audio.Start(p.Pop)
log.Fatal(http.ListenAndServe(":8718", nil))
log.Fatal(http.ListenAndServe(":" + config.Xcfg.WWWport, nil))
}

73
www/socket.go Normal file
View File

@ -0,0 +1,73 @@
package www
import (
"github.com/gorilla/websocket"
)
type Socket struct {
wsc *websocket.Conn
send chan []byte
bundle *Bundle
closed bool
}
func makeSocket(c *websocket.Conn, b *Bundle) *Socket {
s := &Socket {
wsc: c,
send: make(chan []byte, 1024),
bundle: b,
closed: false,
}
s.bundle.add <- s
return s
}
func (s *Socket) close() {
if !s.closed {
pln("socket - close")
close(s.send)
s.wsc.Close()
s.bundle.remove <- s
s.closed = true
}
}
type Bundle struct {
socks map[*Socket]bool
broadcast chan []byte
add chan *Socket
remove chan *Socket
}
func makeBundle() *Bundle {
return &Bundle {
socks: make(map[*Socket]bool),
broadcast: make(chan []byte, 1024),
add: make(chan *Socket),
remove: make(chan *Socket),
}
}
func (b *Bundle) process() {
for {
select {
case sock := <-b.add:
b.socks[sock] = true
case sock := <-b.remove:
if b.socks[sock] {
delete(b.socks, sock)
}
case data := <-b.broadcast:
for s := range b.socks {
select {
case s.send <- data:
default:
pln("bundle -- socket close")
s.close()
delete(b.socks, s)
}
}
}
}
}

View File

@ -3,6 +3,7 @@ package www
import (
"log"
"strconv"
"sync"
"html/template"
"net/http"
"github.com/gorilla/websocket"
@ -16,6 +17,9 @@ var p *playlist.Playlist
var pp *playlist.PrettyPlaylist
var mp *playlist.MinimalPlaylist
var bundle *Bundle
var mutex = &sync.Mutex{}
var pln = log.Println
type op_t struct {
@ -26,7 +30,7 @@ type op_t struct {
var upgrader = websocket.Upgrader{
ReadBufferSize: 512,
WriteBufferSize: 512,
WriteBufferSize: 1024,
}
func Init(playlist *playlist.Playlist) {
@ -34,6 +38,9 @@ func Init(playlist *playlist.Playlist) {
p.CALLB = pop_callback
pp = p.Pretty()
mp = p.Minimal()
bundle = makeBundle()
go bundle.process()
http.HandleFunc("/pp", pp_handler)
http.HandleFunc("/ppop", pp_operations)
}
@ -49,36 +56,64 @@ func pp_handler(w http.ResponseWriter, r *http.Request) {
}
func pp_operations(w http.ResponseWriter, r *http.Request) {
pln("x")
pln("new connection")
c, err := upgrader.Upgrade(w, r, nil)
if err != nil {
pln(err)
return
}
wsc = c // this should be hub
go readop(c)
// pln("GGGG")
// s := &Socket{wsc: c, send: make(chan []byte, 1024), bundle: bundle}
// s.bundle.add <- s
s := makeSocket(c, bundle)
go s.read()
go s.write()
// // this should be hub
// go readop(c)
}
func readop(c *websocket.Conn) {
func (s *Socket) read() {
defer s.close()
for {
opdata := &op_t{}
if err := c.ReadJSON(&opdata); err != nil {
pln(err)
return //connection lost?
if err := s.wsc.ReadJSON(&opdata); err != nil {
pln("0")
// if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
pln("connection closed - read")
break
// }
}
if opdata.OP == "move" {
if id, err := strconv.ParseUint(opdata.ID, 10, 32); err == nil {
mutex.Lock()
p.Move(uint32(id), opdata.INDEX)
pp = p.Pretty()
mp = p.Minimal()
// pp.Print()
mutex.Unlock()
s.bundle.broadcast <- mp.Encode() /// crap this nees to be []byte
}
}
}
}
if err := c.WriteJSON(mp); err != nil {
pln(err)
return //connection lost?
func (s *Socket) write() {
for {
select {
case mp, _ := <-s.send:
w, err := s.wsc.NextWriter(websocket.TextMessage);
if err != nil {
// if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
pln("connection closed - write")
return
// }
}
w.Write(mp)
// is there queued messages?
w.Close()
}
}
}
@ -87,8 +122,30 @@ func pop_callback(list *playlist.Playlist) {
if list == p {
pp = p.Pretty()
mp = p.Minimal()
if wsc != nil {
wsc.WriteJSON(mp) // this should be hub broadcasted
}
bundle.broadcast <- mp.Encode()
}
}
// func readop(c *websocket.Conn) {
// for {
// opdata := &op_t{}
// if err := c.ReadJSON(&opdata); err != nil {
// pln(err)
// return //connection lost?
// }
// if opdata.OP == "move" {
// if id, err := strconv.ParseUint(opdata.ID, 10, 32); err == nil {
// p.Move(uint32(id), opdata.INDEX)
// pp = p.Pretty()
// mp = p.Minimal()
// // pp.Print()
// }
// }
// if err := c.WriteJSON(mp); err != nil {
// pln(err)
// return //connection lost?
// }
// }
// }