2016-07-24 1 views
0

Je suis en train de mettre en place un serveur de radio dans Elixirflux Elixir à tous les abonnés

Un processus fonctionne toujours et la lecture d'un fichier (mp3) et publier sur le sujet « : radio », actuellement à des fins de test quand il il se termine recommence

Chaque connexion est abonné au sujet « : radio »

Je ne comprends pas comment envoyer les morceaux à toutes les connexions souscrites, la connexion fermée après 2 ou 3 morceaux

defmodule Plugtest do 
    import Plug.Conn 

    def init(opts), do: opts 

    def start() do 
    Plug.Adapters.Cowboy.http(Plugtest, []) 
    {:ok, _pid} = PubSub.start_link() 
    spawn(fn -> stream_from_file("./song.mp3", 128) end) 
    end 

    def call(conn, _opts) do 
    conn = conn 
    |> send_chunked(200) 
    |> put_resp_content_type("audio/mpeg") 

    :ok = PubSub.subscribe(spawn(fn -> send_chunk_to_connection(conn) end), :radio) 
# File.stream!("./song.mp3", [], 128) |> Enum.into(conn) # test purpose only 
    end 

    defp send_chunk_to_connection(conn) do 
    receive do 
     {:radio_data, data} -> 
     IO.inspect "* #{inspect self()} * [ #{inspect conn.owner} ] [ #{inspect data} ]" 
#  Enum.into(data, conn) # not working TODO send chunk to connection 
     {:ok, conn} = chunk(conn, data) 
     send_chunk_to_connection(conn) 
    end 
    end 

    defp stream_from_file(fpath, bytes) do 
    File.stream!(fpath, [], bytes) 
    |> Enum.each(fn chunk -> 
     PubSub.publish(:radio, {:radio_data, chunk}) 
    end) 
    stream_from_file(fpath, bytes) 
    end 

end 

Stacktrace:

[error] Process #PID<0.274.0> raised an exception 
     ** (MatchError) no match of right hand side value: {:error, :closed}  
      (plugtest) lib/plugtest.ex:26: Plugtest.send_chunk_to_connection/1 

dépendances:

defp deps do 
    [{:plug, "~> 1.0"}, {:cowboy, "~> 1.0"}, {:pubsub, "~> 0.0.2"}] 
    end 

modifier après @maxdec commentaire

defmodule Plugtest do 
    import Plug.Conn 

    @file_path "./song.mp3" 
    @port 4000 
    @chunk_size 128 

    def init(opts), do: opts 

    def start() do 
    Plug.Adapters.Cowboy.http Plugtest, [], port: @port 
    {:ok, _pid} = PubSub.start_link() 
    spawn fn -> 
     stream_from_file(@file_path, @chunk_size) 
    end 
    end 

    def call(conn, _opts) do 
    conn = conn 
    |> send_chunked(200) 
    |> put_resp_content_type("audio/mpeg") 

    :ok = PubSub.subscribe(spawn(fn -> send_chunk_to_connection(conn) end), :radio) 
# File.stream!("./song.mp3", [], 128) |> Enum.into(conn) # test purpose only 
    conn 
    end 
    defp send_chunk_to_connection(conn) do 
    receive do 
     {:radio_data, data} -> 
     case chunk(conn, data) do 
      {:ok, conn} -> send_chunk_to_connection(conn) 
      {:error, err} -> IO.puts err # do nothing, as something went wrong (client disconnection or something else...) 
     end 
    end 
    end 

    defp stream_from_file(fpath, bytes) do 
    File.stream!(fpath, [], bytes) 
    |> Enum.each(fn chunk -> 
     PubSub.publish(:radio, {:radio_data, chunk}) 
    end) 
    stream_from_file(fpath, bytes) 
    end 

end 
+0

dois-je envoyer des en-têtes uniques pour faire le codage chunked de travail pour l'audio via http? – IddoE

Répondre

1

Après un coup d'œil, je pense qu'il ya 2 choses que vous devez corriger:

  1. PlugTest est un Plug donc devrait retourner conn (ce n'est pas votre problème si). Il faut aussi des blocs en attendant les événements (receive):

    def call(conn, _opts) do 
        conn = conn 
        |> send_chunked(200) 
        |> put_resp_content_type("audio/mpeg") 
    
        :ok = PubSub.subscribe(self(), :radio) 
        send_chunk_to_connection(conn) 
    end 
    
  2. Dans send_chunk_to_connection vous devez faire:

    defp send_chunk_to_connection(conn) do 
        receive do 
        {:radio_data, data} -> 
         case chunk(conn, data) do 
         {:ok, conn} -> send_chunk_to_connection(conn) 
         {:error, err} -> IO.puts err; conn # do nothing, as something went wrong (client disconnection or something else...) 
         end 
        end 
    end 
    
+0

tout d'abord, merci, son ne fonctionne pas, j'ai changé d'appel/2, ne comprenait pas quoi faire à send_chunk_to_connection/1, ajouté le code ci-dessus – IddoE

+0

@JimWest: J'ai modifié ma réponse pour le rendre plus clair – maxdec

+0

essayé, est "fermé", je ne reçois pas 2 # send_chunk_to_connection # {: ok, conn}, où est le Enum.into qui envoie des données à la connexion? (aussi, code édité ci-dessus) – IddoE